diff --git a/starbot/core/room.py b/starbot/core/room.py index d69f07d..130a590 100644 --- a/starbot/core/room.py +++ b/starbot/core/room.py @@ -1,4 +1,5 @@ import asyncio +import time import typing from asyncio import AbstractEventLoop from typing import Optional, List, Any @@ -38,7 +39,7 @@ class Up(BaseModel): """直播间连接实例""" __is_reconnect: Optional[bool] = PrivateAttr() - """是否为断线重连""" + """是否为重新连接直播间""" __loop: Optional[AbstractEventLoop] = PrivateAttr() """asyncio 事件循环""" @@ -56,13 +57,13 @@ class Up(BaseModel): def inject_bot(self, bot): self.__bot = bot - def any_live_on_enabled(self): + def __any_live_on_enabled(self): return any(map(lambda conf: conf.enabled, map(lambda group: group.live_on, self.targets))) - def any_live_off_enabled(self): + def __any_live_off_enabled(self): return any(map(lambda conf: conf.enabled, map(lambda group: group.live_off, self.targets))) - def any_live_report_enabled(self): + def __any_live_report_enabled(self): return any(map(lambda conf: conf.enabled, map(lambda group: group.live_report, self.targets))) async def connect(self): @@ -74,13 +75,13 @@ class Up(BaseModel): user_info = await user.get_user_info() self.uname = user_info["name"] if user_info["live_room"] is None: - raise LiveException(f"UP 主 {self.uname} ( UID: {self.uid} ) 还未开通直播间~") + raise LiveException(f"UP 主 {self.uname} ( UID: {self.uid} ) 还未开通直播间") self.room_id = user_info["live_room"]["roomid"] self.__room = LiveDanmaku(self.room_id, credential=get_credential()) # 开播推送开关和下播推送开关均处于关闭状态时跳过连接直播间,以节省性能 if config.get("ONLY_CONNECT_NECESSARY_ROOM"): - if not any([self.any_live_on_enabled(), self.any_live_off_enabled(), self.any_live_report_enabled()]): + if not any([self.__any_live_on_enabled(), self.__any_live_off_enabled(), self.__any_live_report_enabled()]): logger.warning(f"{self.uname} 的开播, 下播和直播报告开关均处于关闭状态, 跳过连接直播间") return @@ -90,8 +91,10 @@ class Up(BaseModel): @self.__room.on("VERIFICATION_SUCCESSFUL") async def on_link(event): + logger.debug(f"{self.uname} (VERIFICATION_SUCCESSFUL): {event}") + if self.__is_reconnect: - logger.success(f"直播间 {self.room_id} 断线重连成功") + logger.success(f"直播间 {self.room_id} 重新连接成功") live_room = LiveRoom(self.room_id, get_credential()) room_info = await live_room.get_room_play_info() @@ -100,17 +103,157 @@ class Up(BaseModel): if now_status != last_status: await redis.hset("LiveStatus", self.room_id, now_status) - if last_status == 1: - logger.warning(f"直播间 {self.room_id} 断线期间下播") - pass if now_status == 1: logger.warning(f"直播间 {self.room_id} 断线期间开播") - pass + param = { + "data": { + "live_time": 0 + } + } + await live_on(param) + if last_status == 1: + logger.warning(f"直播间 {self.room_id} 断线期间下播") + param = {} + await live_off(param) else: self.__is_reconnect = True logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}") + @self.__room.on("LIVE") + async def live_on(event): + logger.debug(f"{self.uname} (LIVE): {event}") + + # 是否为真正开播 + if "live_time" in event["data"]: + live_room = LiveRoom(self.room_id, get_credential()) + room_info = await live_room.get_room_info() + self.uname = room_info["anchor_info"]["base_info"]["uname"] + + await redis.hset("LiveStatus", self.room_id, 1) + + # 是否为主播网络波动断线重连 + now = int(time.time()) + last = await redis.hgeti("EndTime", self.room_id) + is_reconnect = (now - last) <= config.get("UP_DISCONNECT_CONNECT_INTERVAL") + if is_reconnect: + logger.opt(colors=True).info(f"[断线重连] {self.uname} ({self.room_id})") + if config.get("UP_DISCONNECT_CONNECT_MESSAGE"): + self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"), + lambda t: t.live_on.enabled) + else: + logger.opt(colors=True).info(f"[开播] {self.uname} ({self.room_id})") + await redis.hset("StartTime", self.room_id, room_info["room_info"]["live_start_time"]) + + await self.__accumulate_data() + await self.__reset_data() + + # 推送消息 + arg_base = room_info["room_info"] + args = { + "{uname}": self.uname, + "{title}": arg_base["title"], + "{url}": f"https://live.bilibili.com/{self.room_id}", + "{cover}": "".join(["{urlpic=", arg_base["cover"], "}"]) + } + self.__bot.send_live_on(self, args) + + @self.__room.on("PREPARING") + async def live_off(event): + logger.debug(f"{self.uname} (PREPARING): {event}") + + await redis.hset("LiveStatus", self.room_id, 0) + await redis.hset("EndTime", self.room_id, int(time.time())) + + logger.opt(colors=True).info(f"[下播] {self.uname} ({self.room_id})") + + # 推送消息 + args = { + "{uname}": self.uname + } + self.__bot.send_live_off(self, args) + + async def __accumulate_data(self): + """ + 累计直播间数据 + """ + + # 盲盒记录,用于统计击败直播间百分比 + if await redis.hgeti("RoomBoxCount", self.room_id) > 0: + await redis.rpush("BoxProfit", await redis.hgetf1("RoomBoxProfit", self.room_id)) + + # 累计弹幕数 + await redis.hincrby("RoomDanmuTotal", self.room_id, await redis.hgeti("RoomDanmuCount", self.room_id)) + await redis.zunionstore(f"UserDanmuTotal:{self.room_id}", f"UserDanmuCount:{self.room_id}") + + # 累计盲盒数 + await redis.hincrby("RoomBoxTotal", self.room_id, await redis.hgeti("RoomBoxCount", self.room_id)) + await redis.zunionstore(f"UserBoxTotal:{self.room_id}", f"UserBoxCount:{self.room_id}") + + # 累计盲盒盈亏 + await redis.hincrbyfloat("RoomBoxProfitTotal", self.room_id, await redis.hgetf1("RoomBoxProfit", self.room_id)) + await redis.zunionstore(f"UserBoxProfitTotal:{self.room_id}", f"UserBoxProfit:{self.room_id}") + + # 累计礼物收益 + await redis.hincrbyfloat("RoomGiftTotal", self.room_id, await redis.hgetf1("RoomGiftProfit", self.room_id)) + await redis.zunionstore(f"UserGiftTotal:{self.room_id}", f"UserGiftProfit:{self.room_id}") + + # 累计 SC 收益 + await redis.hincrby("RoomScTotal", self.room_id, await redis.hgeti("RoomScProfit", self.room_id)) + await redis.zunionstore(f"UserScTotal:{self.room_id}", f"UserScProfit:{self.room_id}") + + # 累计舰长数 + await redis.hincrby("RoomCaptainTotal", self.room_id, await redis.hgeti("RoomCaptainCount", self.room_id)) + await redis.zunionstore(f"UserCaptainTotal:{self.room_id}", f"UserCaptainCount:{self.room_id}") + + # 累计提督数 + await redis.hincrby("RoomCommanderTotal", self.room_id, await redis.hgeti("RoomCommanderCount", self.room_id)) + await redis.zunionstore(f"UserCommanderTotal:{self.room_id}", f"UserCommanderCount:{self.room_id}") + + # 累计总督数 + await redis.hincrby("RoomGovernorTotal", self.room_id, await redis.hgeti("RoomGovernorCount", self.room_id)) + await redis.zunionstore(f"UserGovernorTotal:{self.room_id}", f"UserGovernorCount:{self.room_id}") + + async def __reset_data(self): + """ + 重置直播间数据 + """ + + # 清空弹幕记录 + await redis.delete(f"RoomDanmu:{self.room_id}") + + # 重置弹幕数 + await redis.hset(f"RoomDanmuCount", self.room_id, 0) + await redis.delete(f"UserDanmuCount:{self.room_id}") + + # 重置盲盒数 + await redis.hset(f"RoomBoxCount", self.room_id, 0) + await redis.delete(f"UserBoxCount:{self.room_id}") + + # 重置盲盒盈亏 + await redis.hset(f"RoomBoxProfit", self.room_id, 0) + await redis.delete(f"UserBoxProfit:{self.room_id}") + + # 重置礼物收益 + await redis.hset(f"RoomGiftProfit", self.room_id, 0) + await redis.delete(f"UserGiftProfit:{self.room_id}") + + # 重置 SC 收益 + await redis.hset(f"RoomScProfit", self.room_id, 0) + await redis.delete(f"UserScProfit:{self.room_id}") + + # 重置舰长数 + await redis.hset(f"RoomCaptainCount", self.room_id, 0) + await redis.delete(f"UserCaptainCount:{self.room_id}") + + # 重置提督数 + await redis.hset(f"RoomCommanderCount", self.room_id, 0) + await redis.delete(f"UserCommanderCount:{self.room_id}") + + # 重置总督数 + await redis.hset(f"RoomGovernorCount", self.room_id, 0) + await redis.delete(f"UserGovernorCount:{self.room_id}") + def __eq__(self, other): if isinstance(other, Up): return self.uid == other.uid diff --git a/starbot/core/sender.py b/starbot/core/sender.py index 4622f75..e8f4997 100644 --- a/starbot/core/sender.py +++ b/starbot/core/sender.py @@ -1,6 +1,6 @@ import asyncio from asyncio import AbstractEventLoop -from typing import List, Optional, Any +from typing import Optional, List, Dict, Any, Union, Callable from graia.ariadne import Ariadne from graia.ariadne.connection.config import config as AriadneConfig, HttpClientConfig, WebsocketClientConfig @@ -10,7 +10,7 @@ from graia.ariadne.model import LogConfig, MemberPerm from loguru import logger from pydantic import BaseModel, PrivateAttr -from .model import PushType, Message +from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget from .room import Up from ..utils import config from ..utils.AsyncEvent import AsyncEvent @@ -72,11 +72,11 @@ class Bot(BaseModel, AsyncEvent): msg = self.__queue[0] if msg.type == PushType.Friend: for message in msg.get_message_chains(): - logger.info(f"{self.qq} -> 好友[{msg.id}] : {message}") + logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}") await self.__bot.send_friend_message(msg.id, message) else: for message in await self.group_message_filter(msg): - logger.info(f"{self.qq} -> 群[{msg.id}] : {message}") + logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}") await self.__bot.send_group_message(msg.id, message) self.__queue.pop(0) await asyncio.sleep(interval) @@ -122,10 +122,81 @@ class Bot(BaseModel, AsyncEvent): elements = [e for e in chain if (not isinstance(e, At)) or (e.target in member_list)] chain = MessageChain(elements) - new_chains.append(chain) + if len(chain) != 0: + new_chains.append(chain) return new_chains + def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True): + """ + 发送消息至 UP 主下所有推送目标 + + Args: + up: 要发送的 UP 主实例 + msg: 要发送的消息内容,可使用占位符 + target_filter: 推送目标过滤器,如传入 lambda t: t.live_on.enabled 代表发送所有启用开播推送的群。默认:lambda t: True + """ + if not isinstance(up, Up): + return + + for target in up.targets: + if target_filter(target): + self.send_message(Message(id=target.id, content=msg, type=target.type)) + + def __send_push_message(self, up: Up, + type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]], + args: Dict): + """ + 发送推送消息至 UP 主下启用此推送类型的推送目标 + + Args: + up: 要发送的 UP 主实例 + type_selector: 推送类型选择器,如传入 lambda t: t.live_on 代表推送开播推送消息 + args: 占位符参数 + """ + if not isinstance(up, Up): + return + + for target in up.targets: + select = type_selector(target) + if not isinstance(select, (LiveOn, LiveOff, DynamicUpdate)): + return + + if select.enabled: + for arg, val in args.items(): + select.message = select.message.replace(arg, str(val)) + self.send_message(Message(id=target.id, content=select.message, type=target.type)) + + def send_live_on(self, up: Up, args: Dict): + """ + 发送开播消息至 UP 主下启用开播推送的推送目标 + + Args: + up: 要发送的 UP 主实例 + args: 占位符参数 + """ + self.__send_push_message(up, lambda t: t.live_on, args) + + def send_live_off(self, up: Up, args: Dict): + """ + 发送下播消息至 UP 主下启用下播推送的推送目标 + + Args: + up: 要发送的 UP 主实例 + args: 占位符参数 + """ + self.__send_push_message(up, lambda t: t.live_off, args) + + def send_dynamic_update(self, up: Up, args: Dict): + """ + 发送动态消息至 UP 主下启用动态推送的推送目标 + + Args: + up: 要发送的 UP 主实例 + args: 占位符参数 + """ + self.__send_push_message(up, lambda t: t.dynamic_update, args) + def __eq__(self, other): if isinstance(other, Bot): return self.qq == other.qq diff --git a/starbot/utils/config.py b/starbot/utils/config.py index b279834..302d5d3 100644 --- a/starbot/utils/config.py +++ b/starbot/utils/config.py @@ -41,6 +41,11 @@ SIMPLE_CONFIG = { # 是否自动判断仅处理必要的直播事件,例如当某直播间的下播推送和直播报告中均不包含弹幕相关功能,则不再处理此直播间的弹幕事件,以节省性能 "ONLY_HANDLE_NECESSARY_EVENT": False, + # 主播下播后再开播视为主播网络波动断线重连的时间间隔,在此时间内重新开播不会重新计算本次直播数据,且不重复 @全体成员,单位:秒 + "UP_DISCONNECT_CONNECT_INTERVAL": 120, + # 视为主播网络波动断线重连时,需发送的额外提示消息 + "UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知", + # Bot 主人 QQ,用于接收部分 Bot 异常通知等 "MASTER_QQ": None, @@ -109,6 +114,11 @@ FULL_CONFIG = { # 是否自动判断仅处理必要的直播事件,例如当某直播间的下播推送和直播报告中均不包含弹幕相关功能,则不再处理此直播间的弹幕事件,以节省性能 "ONLY_HANDLE_NECESSARY_EVENT": False, + # 主播下播后再开播视为主播网络波动断线重连的时间间隔,在此时间内重新开播不会重新计算本次直播数据,且不重复 @全体成员,单位:秒 + "UP_DISCONNECT_CONNECT_INTERVAL": 120, + # 视为主播网络波动断线重连时,需发送的额外提示消息 + "UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知", + # Bot 主人 QQ,用于接收 Bot 异常通知等 "MASTER_QQ": None, @@ -161,6 +171,7 @@ def use_simple_config(): Mirai 连接端口 7827 未设置登录 B 站账号所需 Cookie 数据 不启用节省性能优化 + 主播下播后 2 分钟内开播视为主播网络波动,并发送提示消息 "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知" 未设置 Bot 主人 QQ 不使用 HTTP 代理 不开启 HTTP API 推送 @@ -182,6 +193,7 @@ def use_full_config(): Mirai 连接端口 7827 未设置登录 B 站账号所需 Cookie 数据 不启用节省性能优化 + 主播下播后 2 分钟内开播视为主播网络波动,并发送提示消息 "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知" 未设置 Bot 主人 QQ 不使用 HTTP 代理 开启 HTTP API 推送 (port: 8088)