From 4f7b48ddaedea4719382595c7004732c54cb46c0 Mon Sep 17 00:00:00 2001 From: LWR Date: Sun, 5 Feb 2023 19:48:22 +0800 Subject: [PATCH] feat: Automatically use the at me list when the number of at all is insufficient or the bot is not an administrator --- starbot/core/bot.py | 3 - starbot/core/model.py | 2 +- starbot/core/room.py | 12 +- starbot/core/sender.py | 129 +++++++++++++-------- starbot/core/server.py | 2 +- starbot/exception/AtAllLimitedException.py | 15 +++ starbot/utils/config.py | 18 +-- 7 files changed, 108 insertions(+), 73 deletions(-) create mode 100644 starbot/exception/AtAllLimitedException.py diff --git a/starbot/core/bot.py b/starbot/core/bot.py index d991a5e..c8791e2 100644 --- a/starbot/core/bot.py +++ b/starbot/core/bot.py @@ -130,9 +130,6 @@ class StarBot: logger.info("开始运行 Ariadne 消息推送模块") - for bot in self.__datasource.bots: - bot.start_sender() - try: Ariadne.launch_blocking() except RuntimeError as ex: diff --git a/starbot/core/model.py b/starbot/core/model.py index 9692963..88fc276 100644 --- a/starbot/core/model.py +++ b/starbot/core/model.py @@ -171,7 +171,7 @@ class LiveReport(BaseModel): 生成弹幕词云 """ return LiveReport(enabled=True, logo=None, logo_base64=None, - time=True, fans_change=True, fans_medal_change=True,guard_change=True, + time=True, fans_change=True, fans_medal_change=True, guard_change=True, danmu=True, box=True, gift=True, sc=True, guard=True, danmu_ranking=3, box_ranking=3, box_profit_ranking=3, gift_ranking=3, sc_ranking=3, guard_list=True, box_profit_diagram=True, diff --git a/starbot/core/room.py b/starbot/core/room.py index 859f27e..59adc5d 100644 --- a/starbot/core/room.py +++ b/starbot/core/room.py @@ -174,8 +174,8 @@ class Up(BaseModel): if is_reconnect: logger.opt(colors=True).info(f"[断线重连] {self.uname} ({self.room_id})") if config.get("UP_DISCONNECT_CONNECT_MESSAGE"): - self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"), - lambda t: t.live_on.enabled) + await self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"), + lambda t: t.live_on.enabled) else: logger.opt(colors=True).info(f"[开播] {self.uname} ({self.room_id})") @@ -202,7 +202,7 @@ class Up(BaseModel): "{cover}": "".join(["{urlpic=", arg_base["cover"], "}"]) } await self.__bot.send_live_on_at(self) - self.__bot.send_live_on(self, args) + await self.__bot.send_live_on(self, args) @self.__room.on("PREPARING") async def live_off(event): @@ -223,8 +223,8 @@ class Up(BaseModel): live_report_param = await self.__generate_live_report_param() # 推送下播消息和直播报告 - self.__bot.send_live_off(self, live_off_args) - self.__bot.send_live_report(self, live_report_param) + await self.__bot.send_live_off(self, live_off_args) + await self.__bot.send_live_report(self, live_report_param) danmu_items = ["danmu", "danmu_ranking", "danmu_diagram", "danmu_cloud"] if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(danmu_items): @@ -372,7 +372,7 @@ class Up(BaseModel): "{picture}": "".join(["{base64pic=", base64str, "}"]) } await self.__bot.send_dynamic_at(self) - self.__bot.send_dynamic_update(self, dynamic_update_args) + await self.__bot.send_dynamic_update(self, dynamic_update_args) async def __generate_live_report_param(self): """ diff --git a/starbot/core/sender.py b/starbot/core/sender.py index e613bbe..d56620a 100644 --- a/starbot/core/sender.py +++ b/starbot/core/sender.py @@ -1,9 +1,11 @@ import asyncio +import time from asyncio import AbstractEventLoop from typing import Optional, List, Dict, Any, Union, Callable from graia.ariadne import Ariadne from graia.ariadne.connection.config import config as AriadneConfig, HttpClientConfig, WebsocketClientConfig +from graia.ariadne.exception import RemoteException from graia.ariadne.message.chain import MessageChain from graia.ariadne.message.element import At, AtAll from graia.ariadne.model import LogConfig, MemberPerm @@ -12,6 +14,7 @@ from pydantic import BaseModel, PrivateAttr from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget from .room import Up +from ..exception.AtAllLimitedException import AtAllLimitedException from ..painter.LiveReportGenerator import LiveReportGenerator from ..utils import config, redis from ..utils.AsyncEvent import AsyncEvent @@ -34,8 +37,11 @@ class Bot(BaseModel, AsyncEvent): __bot: Optional[Ariadne] = PrivateAttr() """Ariadne 实例""" + __at_all_limited: Optional[int] = PrivateAttr() + """@全体成员次数用尽时所在日期""" + __queue: Optional[List[Message]] = PrivateAttr() - """待发送消息队列""" + """消息补发队列""" def __init__(self, **data: Any): super().__init__(**data) @@ -49,40 +55,53 @@ class Bot(BaseModel, AsyncEvent): ), log_config=LogConfig(log_level="DEBUG") ) + self.__at_all_limited = time.localtime(time.time() - 86400).tm_yday self.__queue = [] # 注入 Bot 实例引用 for up in self.ups: up.inject_bot(self) - def start_sender(self): - self.__loop.create_task(self.__sender()) - logger.success(f"Bot [{self.qq}] 已启动") - - def send_message(self, msg: Message): - self.__queue.append(msg) - - async def __sender(self): + async def send_message(self, msg: Message): """ - 消息发送模块 - """ - interval = config.get("MESSAGE_SEND_INTERVAL") + 消息发送 - while True: - if self.__queue: - msg = self.__queue[0] - if msg.type == PushType.Friend: - for message in msg.get_message_chains(): - logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}") - await self.__bot.send_friend_message(msg.id, message) - else: - for message in await self.group_message_filter(msg): - logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}") - await self.__bot.send_group_message(msg.id, message) - self.__queue.pop(0) - await asyncio.sleep(interval) - else: - await asyncio.sleep(0.1) + Args: + msg: Message 实例 + + Raises: + + """ + exception = None + if msg.type == PushType.Friend: + for message in msg.get_message_chains(): + try: + await self.__bot.send_friend_message(msg.id, message) + logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}") + except RemoteException as ex: + logger.exception("消息推送模块异常", ex) + continue + else: + msgs = await self.group_message_filter(msg) + + if any([(AtAll in x) for x in msg.get_message_chains()]) and all([(AtAll not in x) for x in msgs]): + exception = AtAllLimitedException() + + for message in msgs: + try: + await self.__bot.send_group_message(msg.id, message) + logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}") + except RemoteException as ex: + if "AT_ALL_LIMITED" in str(ex): + exception = AtAllLimitedException() + self.__at_all_limited = time.localtime(time.time()).tm_yday + continue + else: + logger.exception("消息推送模块异常", ex) + continue + + if exception is not None: + raise exception async def group_message_filter(self, message: Message) -> List[MessageChain]: """ @@ -107,10 +126,14 @@ class Bot(BaseModel, AsyncEvent): for chain in message.get_message_chains(): if AtAll in chain: # 过滤 Bot 不是群管理员时的 @全体成员 消息 - bot_info = await self.__bot.get_member(self.qq, message.id) + bot_info = await self.__bot.get_member(message.id, self.qq) if bot_info.permission < MemberPerm.Administrator: chain = chain.exclude(AtAll) + # 过滤已超出当日次数上限的 @全体成员 消息 + if time.localtime(time.time()).tm_yday == self.__at_all_limited: + chain = chain.exclude(AtAll) + # 过滤多余的 @全体成员 消息 if chain.count(AtAll) > 1: elements = [e for e in chain.exclude(AtAll)] @@ -137,7 +160,7 @@ class Bot(BaseModel, AsyncEvent): return new_chains - def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True): + async def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True): """ 发送消息至 UP 主下所有推送目标 @@ -151,11 +174,11 @@ class Bot(BaseModel, AsyncEvent): for target in up.targets: if target_filter(target): - self.send_message(Message(id=target.id, content=msg, type=target.type)) + await self.send_message(Message(id=target.id, content=msg, type=target.type)) - def __send_push_message(self, up: Up, - type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]], - args: Dict[str, Any]): + async def __send_push_message(self, up: Up, + type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]], + args: Dict[str, Any]): """ 发送推送消息至 UP 主下启用此推送类型的推送目标 @@ -176,9 +199,9 @@ class Bot(BaseModel, AsyncEvent): message = select.message for arg, val in args.items(): message = message.replace(arg, str(val)) - self.send_message(Message(id=target.id, content=message, type=target.type)) + await self.send_message(Message(id=target.id, content=message, type=target.type)) - def send_live_on(self, up: Up, args: Dict[str, Any]): + async def send_live_on(self, up: Up, args: Dict[str, Any]): """ 发送开播消息至 UP 主下启用开播推送的推送目标 @@ -186,24 +209,28 @@ class Bot(BaseModel, AsyncEvent): up: 要发送的 UP 主实例 args: 占位符参数 """ - self.__send_push_message(up, lambda t: t.live_on, args) + try: + await self.__send_push_message(up, lambda t: t.live_on, args) + except AtAllLimitedException: + await self.send_live_on_at(up, True) - async def send_live_on_at(self, up: Up): + async def send_live_on_at(self, up: Up, limited: bool = False): """ 发送开播 @ 我列表中的 @ 消息 Args: up: 要发送的 UP 主实例 + limited: 是否为 @全体成员次数达到上限时发送。默认:False """ if not isinstance(up, Up): return for target in filter(lambda t: t.type == PushType.Group, up.targets): - if target.live_on.enabled: + if target.live_on.enabled and (limited or "{atall}" not in target.live_on.message): ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_live_on_at(target.id)]) - self.send_message(Message(id=target.id, content=ats, type=target.type)) + await self.send_message(Message(id=target.id, content=ats, type=target.type)) - def send_live_off(self, up: Up, args: Dict[str, Any]): + async def send_live_off(self, up: Up, args: Dict[str, Any]): """ 发送下播消息至 UP 主下启用下播推送的推送目标 @@ -211,9 +238,9 @@ class Bot(BaseModel, AsyncEvent): up: 要发送的 UP 主实例 args: 占位符参数 """ - self.__send_push_message(up, lambda t: t.live_off, args) + await self.__send_push_message(up, lambda t: t.live_off, args) - def send_live_report(self, up: Up, param: Dict[str, Any]): + async def send_live_report(self, up: Up, param: Dict[str, Any]): """ 发送直播报告消息至 UP 主下启用直播报告推送的推送目标 @@ -223,9 +250,11 @@ class Bot(BaseModel, AsyncEvent): """ for target in filter(lambda t: t.live_report.enabled, up.targets): base64str = LiveReportGenerator.generate(param, target.live_report) - self.send_message(Message(id=target.id, content="".join(["{base64pic=", base64str, "}"]), type=target.type)) + await self.send_message( + Message(id=target.id, content="".join(["{base64pic=", base64str, "}"]), type=target.type) + ) - def send_dynamic_update(self, up: Up, args: Dict[str, Any]): + async def send_dynamic_update(self, up: Up, args: Dict[str, Any]): """ 发送动态消息至 UP 主下启用动态推送的推送目标 @@ -233,22 +262,26 @@ class Bot(BaseModel, AsyncEvent): up: 要发送的 UP 主实例 args: 占位符参数 """ - self.__send_push_message(up, lambda t: t.dynamic_update, args) + try: + await self.__send_push_message(up, lambda t: t.dynamic_update, args) + except AtAllLimitedException: + await self.send_dynamic_at(up, True) - async def send_dynamic_at(self, up: Up): + async def send_dynamic_at(self, up: Up, limited: bool = False): """ 发送动态 @ 我列表中的 @ 消息 Args: up: 要发送的 UP 主实例 + limited: 是否为 @全体成员次数达到上限时发送。默认:False """ if not isinstance(up, Up): return for target in filter(lambda t: t.type == PushType.Group, up.targets): - if target.dynamic_update.enabled: + if target.dynamic_update.enabled and (limited or "{atall}" not in target.dynamic_update.message): ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_dynamic_at(target.id)]) - self.send_message(Message(id=target.id, content=ats, type=target.type)) + await self.send_message(Message(id=target.id, content=ats, type=target.type)) def __eq__(self, other): if isinstance(other, Bot): diff --git a/starbot/core/server.py b/starbot/core/server.py index 0b23933..1e0afd6 100644 --- a/starbot/core/server.py +++ b/starbot/core/server.py @@ -23,7 +23,7 @@ async def send(request: aiohttp.web.Request) -> aiohttp.web.Response: target = datasource.get_target_by_key(key) bot = datasource.get_bot_by_key(key) msg = Message(id=target.id, content=message, type=target.type) - bot.send_message(msg) + await bot.send_message(msg) return web.Response(text="success") except DataSourceException: logger.warning(f"HTTP API 推送失败, 不存在的推送 key: {key}") diff --git a/starbot/exception/AtAllLimitedException.py b/starbot/exception/AtAllLimitedException.py new file mode 100644 index 0000000..98af4dd --- /dev/null +++ b/starbot/exception/AtAllLimitedException.py @@ -0,0 +1,15 @@ +""" +@ 全体成员次数达到上限异常 +""" + +from .ApiException import ApiException + + +class AtAllLimitedException(ApiException): + """ + @ 全体成员次数达到上限异常 + """ + + def __init__(self): + super().__init__() + self.msg = "今日 @ 全体成员次数已达到上限" diff --git a/starbot/utils/config.py b/starbot/utils/config.py index d9b79ff..0facb95 100644 --- a/starbot/utils/config.py +++ b/starbot/utils/config.py @@ -37,7 +37,7 @@ SIMPLE_CONFIG = { "BUVID3": None, # 成功连接所有主播直播间的最大等待时长,可使得日志输出顺序更加易读,一般无需修改此处,单位:秒 - "WAIT_FOR_ALL_CONNECTION_TIMEOUT": 60, + "WAIT_FOR_ALL_CONNECTION_TIMEOUT": 30, # 是否自动判断仅连接必要的直播间,即当某直播间的开播、下播、直播报告开关均未开启时,自动跳过连接直播间,以节省性能 "ONLY_CONNECT_NECESSARY_ROOM": False, @@ -78,9 +78,6 @@ SIMPLE_CONFIG = { "USE_HTTP_API": False, # HTTP API 端口 "HTTP_API_PORT": 8088, - - # 消息发送间隔,消息发送过快容易被风控,单位:秒 - "MESSAGE_SEND_INTERVAL": 0.5, # 命令触发前缀 "COMMAND_PREFIX": "", @@ -94,9 +91,7 @@ SIMPLE_CONFIG = { # 风控发送失败消息滞留时间上限,消息因风控滞留超出此时长不会进行补发,0 为无限制,单位:秒 "RESEND_TIME_LIMIT": 0, # 是否补发开播推送、下播推送、直播报告、动态推送中的 @全体成员 和 @群成员 消息,可能造成不必要的打扰,不推荐开启 - "RESEND_AT_MESSAGE": False, - # 是否补发除开播推送、下播推送、直播报告、动态推送外的其他消息,如群内命令所触发的回复消息 - "RESEND_ALL_MESSAGE": False + "RESEND_AT_MESSAGE": False } FULL_CONFIG = { @@ -136,7 +131,7 @@ FULL_CONFIG = { "BUVID3": None, # 成功连接所有主播直播间的最大等待时长,可使得日志输出顺序更加易读,一般无需修改此处,单位:秒 - "WAIT_FOR_ALL_CONNECTION_TIMEOUT": 60, + "WAIT_FOR_ALL_CONNECTION_TIMEOUT": 30, # 是否自动判断仅连接必要的直播间,即当某直播间的开播、下播、直播报告开关均未开启时,自动跳过连接直播间,以节省性能 "ONLY_CONNECT_NECESSARY_ROOM": False, @@ -178,9 +173,6 @@ FULL_CONFIG = { # HTTP API 端口 "HTTP_API_PORT": 8088, - # 消息发送间隔,消息发送过快容易被风控,单位:秒 - "MESSAGE_SEND_INTERVAL": 0.5, - # 命令触发前缀 "COMMAND_PREFIX": "", # 每个群开播 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改 @@ -193,9 +185,7 @@ FULL_CONFIG = { # 风控发送失败消息滞留时间上限,消息因风控滞留超出此时长不会进行补发,0 为无限制,单位:秒 "RESEND_TIME_LIMIT": 0, # 是否补发开播推送、下播推送、直播报告、动态推送中的 @全体成员 和 @群成员 消息,可能造成不必要的打扰,不推荐开启 - "RESEND_AT_MESSAGE": False, - # 是否补发除开播推送、下播推送、直播报告、动态推送外的其他消息,如群内命令所触发的回复消息 - "RESEND_ALL_MESSAGE": False + "RESEND_AT_MESSAGE": False } use_config = SIMPLE_CONFIG