feat: Live on and live off message push support

This commit is contained in:
LWR 2022-11-23 01:08:45 +08:00
parent dc6125f142
commit f1ecba3a61
3 changed files with 242 additions and 16 deletions

View File

@ -1,4 +1,5 @@
import asyncio
import time
import typing
from asyncio import AbstractEventLoop
from typing import Optional, List, Any
@ -38,7 +39,7 @@ class Up(BaseModel):
"""直播间连接实例"""
__is_reconnect: Optional[bool] = PrivateAttr()
"""是否为断线重连"""
"""是否为接直播间"""
__loop: Optional[AbstractEventLoop] = PrivateAttr()
"""asyncio 事件循环"""
@ -56,13 +57,13 @@ class Up(BaseModel):
def inject_bot(self, bot):
self.__bot = bot
def any_live_on_enabled(self):
def __any_live_on_enabled(self):
return any(map(lambda conf: conf.enabled, map(lambda group: group.live_on, self.targets)))
def any_live_off_enabled(self):
def __any_live_off_enabled(self):
return any(map(lambda conf: conf.enabled, map(lambda group: group.live_off, self.targets)))
def any_live_report_enabled(self):
def __any_live_report_enabled(self):
return any(map(lambda conf: conf.enabled, map(lambda group: group.live_report, self.targets)))
async def connect(self):
@ -74,13 +75,13 @@ class Up(BaseModel):
user_info = await user.get_user_info()
self.uname = user_info["name"]
if user_info["live_room"] is None:
raise LiveException(f"UP 主 {self.uname} ( UID: {self.uid} ) 还未开通直播间~")
raise LiveException(f"UP 主 {self.uname} ( UID: {self.uid} ) 还未开通直播间")
self.room_id = user_info["live_room"]["roomid"]
self.__room = LiveDanmaku(self.room_id, credential=get_credential())
# 开播推送开关和下播推送开关均处于关闭状态时跳过连接直播间,以节省性能
if config.get("ONLY_CONNECT_NECESSARY_ROOM"):
if not any([self.any_live_on_enabled(), self.any_live_off_enabled(), self.any_live_report_enabled()]):
if not any([self.__any_live_on_enabled(), self.__any_live_off_enabled(), self.__any_live_report_enabled()]):
logger.warning(f"{self.uname} 的开播, 下播和直播报告开关均处于关闭状态, 跳过连接直播间")
return
@ -90,8 +91,10 @@ class Up(BaseModel):
@self.__room.on("VERIFICATION_SUCCESSFUL")
async def on_link(event):
logger.debug(f"{self.uname} (VERIFICATION_SUCCESSFUL): {event}")
if self.__is_reconnect:
logger.success(f"直播间 {self.room_id} 断线重连成功")
logger.success(f"直播间 {self.room_id} 成功")
live_room = LiveRoom(self.room_id, get_credential())
room_info = await live_room.get_room_play_info()
@ -100,17 +103,157 @@ class Up(BaseModel):
if now_status != last_status:
await redis.hset("LiveStatus", self.room_id, now_status)
if last_status == 1:
logger.warning(f"直播间 {self.room_id} 断线期间下播")
pass
if now_status == 1:
logger.warning(f"直播间 {self.room_id} 断线期间开播")
pass
param = {
"data": {
"live_time": 0
}
}
await live_on(param)
if last_status == 1:
logger.warning(f"直播间 {self.room_id} 断线期间下播")
param = {}
await live_off(param)
else:
self.__is_reconnect = True
logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}")
@self.__room.on("LIVE")
async def live_on(event):
logger.debug(f"{self.uname} (LIVE): {event}")
# 是否为真正开播
if "live_time" in event["data"]:
live_room = LiveRoom(self.room_id, get_credential())
room_info = await live_room.get_room_info()
self.uname = room_info["anchor_info"]["base_info"]["uname"]
await redis.hset("LiveStatus", self.room_id, 1)
# 是否为主播网络波动断线重连
now = int(time.time())
last = await redis.hgeti("EndTime", self.room_id)
is_reconnect = (now - last) <= config.get("UP_DISCONNECT_CONNECT_INTERVAL")
if is_reconnect:
logger.opt(colors=True).info(f"<magenta>[断线重连] {self.uname} ({self.room_id})</>")
if config.get("UP_DISCONNECT_CONNECT_MESSAGE"):
self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"),
lambda t: t.live_on.enabled)
else:
logger.opt(colors=True).info(f"<magenta>[开播] {self.uname} ({self.room_id})</>")
await redis.hset("StartTime", self.room_id, room_info["room_info"]["live_start_time"])
await self.__accumulate_data()
await self.__reset_data()
# 推送消息
arg_base = room_info["room_info"]
args = {
"{uname}": self.uname,
"{title}": arg_base["title"],
"{url}": f"https://live.bilibili.com/{self.room_id}",
"{cover}": "".join(["{urlpic=", arg_base["cover"], "}"])
}
self.__bot.send_live_on(self, args)
@self.__room.on("PREPARING")
async def live_off(event):
logger.debug(f"{self.uname} (PREPARING): {event}")
await redis.hset("LiveStatus", self.room_id, 0)
await redis.hset("EndTime", self.room_id, int(time.time()))
logger.opt(colors=True).info(f"<magenta>[下播] {self.uname} ({self.room_id})</>")
# 推送消息
args = {
"{uname}": self.uname
}
self.__bot.send_live_off(self, args)
async def __accumulate_data(self):
"""
累计直播间数据
"""
# 盲盒记录,用于统计击败直播间百分比
if await redis.hgeti("RoomBoxCount", self.room_id) > 0:
await redis.rpush("BoxProfit", await redis.hgetf1("RoomBoxProfit", self.room_id))
# 累计弹幕数
await redis.hincrby("RoomDanmuTotal", self.room_id, await redis.hgeti("RoomDanmuCount", self.room_id))
await redis.zunionstore(f"UserDanmuTotal:{self.room_id}", f"UserDanmuCount:{self.room_id}")
# 累计盲盒数
await redis.hincrby("RoomBoxTotal", self.room_id, await redis.hgeti("RoomBoxCount", self.room_id))
await redis.zunionstore(f"UserBoxTotal:{self.room_id}", f"UserBoxCount:{self.room_id}")
# 累计盲盒盈亏
await redis.hincrbyfloat("RoomBoxProfitTotal", self.room_id, await redis.hgetf1("RoomBoxProfit", self.room_id))
await redis.zunionstore(f"UserBoxProfitTotal:{self.room_id}", f"UserBoxProfit:{self.room_id}")
# 累计礼物收益
await redis.hincrbyfloat("RoomGiftTotal", self.room_id, await redis.hgetf1("RoomGiftProfit", self.room_id))
await redis.zunionstore(f"UserGiftTotal:{self.room_id}", f"UserGiftProfit:{self.room_id}")
# 累计 SC 收益
await redis.hincrby("RoomScTotal", self.room_id, await redis.hgeti("RoomScProfit", self.room_id))
await redis.zunionstore(f"UserScTotal:{self.room_id}", f"UserScProfit:{self.room_id}")
# 累计舰长数
await redis.hincrby("RoomCaptainTotal", self.room_id, await redis.hgeti("RoomCaptainCount", self.room_id))
await redis.zunionstore(f"UserCaptainTotal:{self.room_id}", f"UserCaptainCount:{self.room_id}")
# 累计提督数
await redis.hincrby("RoomCommanderTotal", self.room_id, await redis.hgeti("RoomCommanderCount", self.room_id))
await redis.zunionstore(f"UserCommanderTotal:{self.room_id}", f"UserCommanderCount:{self.room_id}")
# 累计总督数
await redis.hincrby("RoomGovernorTotal", self.room_id, await redis.hgeti("RoomGovernorCount", self.room_id))
await redis.zunionstore(f"UserGovernorTotal:{self.room_id}", f"UserGovernorCount:{self.room_id}")
async def __reset_data(self):
"""
重置直播间数据
"""
# 清空弹幕记录
await redis.delete(f"RoomDanmu:{self.room_id}")
# 重置弹幕数
await redis.hset(f"RoomDanmuCount", self.room_id, 0)
await redis.delete(f"UserDanmuCount:{self.room_id}")
# 重置盲盒数
await redis.hset(f"RoomBoxCount", self.room_id, 0)
await redis.delete(f"UserBoxCount:{self.room_id}")
# 重置盲盒盈亏
await redis.hset(f"RoomBoxProfit", self.room_id, 0)
await redis.delete(f"UserBoxProfit:{self.room_id}")
# 重置礼物收益
await redis.hset(f"RoomGiftProfit", self.room_id, 0)
await redis.delete(f"UserGiftProfit:{self.room_id}")
# 重置 SC 收益
await redis.hset(f"RoomScProfit", self.room_id, 0)
await redis.delete(f"UserScProfit:{self.room_id}")
# 重置舰长数
await redis.hset(f"RoomCaptainCount", self.room_id, 0)
await redis.delete(f"UserCaptainCount:{self.room_id}")
# 重置提督数
await redis.hset(f"RoomCommanderCount", self.room_id, 0)
await redis.delete(f"UserCommanderCount:{self.room_id}")
# 重置总督数
await redis.hset(f"RoomGovernorCount", self.room_id, 0)
await redis.delete(f"UserGovernorCount:{self.room_id}")
def __eq__(self, other):
if isinstance(other, Up):
return self.uid == other.uid

View File

@ -1,6 +1,6 @@
import asyncio
from asyncio import AbstractEventLoop
from typing import List, Optional, Any
from typing import Optional, List, Dict, Any, Union, Callable
from graia.ariadne import Ariadne
from graia.ariadne.connection.config import config as AriadneConfig, HttpClientConfig, WebsocketClientConfig
@ -10,7 +10,7 @@ from graia.ariadne.model import LogConfig, MemberPerm
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from .model import PushType, Message
from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget
from .room import Up
from ..utils import config
from ..utils.AsyncEvent import AsyncEvent
@ -72,11 +72,11 @@ class Bot(BaseModel, AsyncEvent):
msg = self.__queue[0]
if msg.type == PushType.Friend:
for message in msg.get_message_chains():
logger.info(f"{self.qq} -> 好友[{msg.id}] : {message}")
logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}")
await self.__bot.send_friend_message(msg.id, message)
else:
for message in await self.group_message_filter(msg):
logger.info(f"{self.qq} -> 群[{msg.id}] : {message}")
logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}")
await self.__bot.send_group_message(msg.id, message)
self.__queue.pop(0)
await asyncio.sleep(interval)
@ -122,10 +122,81 @@ class Bot(BaseModel, AsyncEvent):
elements = [e for e in chain if (not isinstance(e, At)) or (e.target in member_list)]
chain = MessageChain(elements)
new_chains.append(chain)
if len(chain) != 0:
new_chains.append(chain)
return new_chains
def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True):
"""
发送消息至 UP 主下所有推送目标
Args:
up: 要发送的 UP 主实例
msg: 要发送的消息内容可使用占位符
target_filter: 推送目标过滤器如传入 lambda t: t.live_on.enabled 代表发送所有启用开播推送的群默认lambda t: True
"""
if not isinstance(up, Up):
return
for target in up.targets:
if target_filter(target):
self.send_message(Message(id=target.id, content=msg, type=target.type))
def __send_push_message(self, up: Up,
type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]],
args: Dict):
"""
发送推送消息至 UP 主下启用此推送类型的推送目标
Args:
up: 要发送的 UP 主实例
type_selector: 推送类型选择器如传入 lambda t: t.live_on 代表推送开播推送消息
args: 占位符参数
"""
if not isinstance(up, Up):
return
for target in up.targets:
select = type_selector(target)
if not isinstance(select, (LiveOn, LiveOff, DynamicUpdate)):
return
if select.enabled:
for arg, val in args.items():
select.message = select.message.replace(arg, str(val))
self.send_message(Message(id=target.id, content=select.message, type=target.type))
def send_live_on(self, up: Up, args: Dict):
"""
发送开播消息至 UP 主下启用开播推送的推送目标
Args:
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.live_on, args)
def send_live_off(self, up: Up, args: Dict):
"""
发送下播消息至 UP 主下启用下播推送的推送目标
Args:
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.live_off, args)
def send_dynamic_update(self, up: Up, args: Dict):
"""
发送动态消息至 UP 主下启用动态推送的推送目标
Args:
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.dynamic_update, args)
def __eq__(self, other):
if isinstance(other, Bot):
return self.qq == other.qq

View File

@ -41,6 +41,11 @@ SIMPLE_CONFIG = {
# 是否自动判断仅处理必要的直播事件,例如当某直播间的下播推送和直播报告中均不包含弹幕相关功能,则不再处理此直播间的弹幕事件,以节省性能
"ONLY_HANDLE_NECESSARY_EVENT": False,
# 主播下播后再开播视为主播网络波动断线重连的时间间隔,在此时间内重新开播不会重新计算本次直播数据,且不重复 @全体成员,单位:秒
"UP_DISCONNECT_CONNECT_INTERVAL": 120,
# 视为主播网络波动断线重连时,需发送的额外提示消息
"UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知",
# Bot 主人 QQ用于接收部分 Bot 异常通知等
"MASTER_QQ": None,
@ -109,6 +114,11 @@ FULL_CONFIG = {
# 是否自动判断仅处理必要的直播事件,例如当某直播间的下播推送和直播报告中均不包含弹幕相关功能,则不再处理此直播间的弹幕事件,以节省性能
"ONLY_HANDLE_NECESSARY_EVENT": False,
# 主播下播后再开播视为主播网络波动断线重连的时间间隔,在此时间内重新开播不会重新计算本次直播数据,且不重复 @全体成员,单位:秒
"UP_DISCONNECT_CONNECT_INTERVAL": 120,
# 视为主播网络波动断线重连时,需发送的额外提示消息
"UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知",
# Bot 主人 QQ用于接收 Bot 异常通知等
"MASTER_QQ": None,
@ -161,6 +171,7 @@ def use_simple_config():
Mirai 连接端口 7827
未设置登录 B 站账号所需 Cookie 数据
不启用节省性能优化
主播下播后 2 分钟内开播视为主播网络波动并发送提示消息 "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知"
未设置 Bot 主人 QQ
不使用 HTTP 代理
不开启 HTTP API 推送
@ -182,6 +193,7 @@ def use_full_config():
Mirai 连接端口 7827
未设置登录 B 站账号所需 Cookie 数据
不启用节省性能优化
主播下播后 2 分钟内开播视为主播网络波动并发送提示消息 "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知"
未设置 Bot 主人 QQ
不使用 HTTP 代理
开启 HTTP API 推送 (port: 8088)