feat: Automatically use the at me list when the number of at all is insufficient or the bot is not an administrator

This commit is contained in:
LWR 2023-02-05 19:48:22 +08:00
parent 06d6b4eced
commit 4f7b48ddae
7 changed files with 108 additions and 73 deletions

View File

@ -130,9 +130,6 @@ class StarBot:
logger.info("开始运行 Ariadne 消息推送模块")
for bot in self.__datasource.bots:
bot.start_sender()
try:
Ariadne.launch_blocking()
except RuntimeError as ex:

View File

@ -171,7 +171,7 @@ class LiveReport(BaseModel):
生成弹幕词云
"""
return LiveReport(enabled=True, logo=None, logo_base64=None,
time=True, fans_change=True, fans_medal_change=True,guard_change=True,
time=True, fans_change=True, fans_medal_change=True, guard_change=True,
danmu=True, box=True, gift=True, sc=True, guard=True,
danmu_ranking=3, box_ranking=3, box_profit_ranking=3, gift_ranking=3, sc_ranking=3,
guard_list=True, box_profit_diagram=True,

View File

@ -174,8 +174,8 @@ class Up(BaseModel):
if is_reconnect:
logger.opt(colors=True).info(f"<magenta>[断线重连] {self.uname} ({self.room_id})</>")
if config.get("UP_DISCONNECT_CONNECT_MESSAGE"):
self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"),
lambda t: t.live_on.enabled)
await self.__bot.send_to_all_target(self, config.get("UP_DISCONNECT_CONNECT_MESSAGE"),
lambda t: t.live_on.enabled)
else:
logger.opt(colors=True).info(f"<magenta>[开播] {self.uname} ({self.room_id})</>")
@ -202,7 +202,7 @@ class Up(BaseModel):
"{cover}": "".join(["{urlpic=", arg_base["cover"], "}"])
}
await self.__bot.send_live_on_at(self)
self.__bot.send_live_on(self, args)
await self.__bot.send_live_on(self, args)
@self.__room.on("PREPARING")
async def live_off(event):
@ -223,8 +223,8 @@ class Up(BaseModel):
live_report_param = await self.__generate_live_report_param()
# 推送下播消息和直播报告
self.__bot.send_live_off(self, live_off_args)
self.__bot.send_live_report(self, live_report_param)
await self.__bot.send_live_off(self, live_off_args)
await self.__bot.send_live_report(self, live_report_param)
danmu_items = ["danmu", "danmu_ranking", "danmu_diagram", "danmu_cloud"]
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(danmu_items):
@ -372,7 +372,7 @@ class Up(BaseModel):
"{picture}": "".join(["{base64pic=", base64str, "}"])
}
await self.__bot.send_dynamic_at(self)
self.__bot.send_dynamic_update(self, dynamic_update_args)
await self.__bot.send_dynamic_update(self, dynamic_update_args)
async def __generate_live_report_param(self):
"""

View File

@ -1,9 +1,11 @@
import asyncio
import time
from asyncio import AbstractEventLoop
from typing import Optional, List, Dict, Any, Union, Callable
from graia.ariadne import Ariadne
from graia.ariadne.connection.config import config as AriadneConfig, HttpClientConfig, WebsocketClientConfig
from graia.ariadne.exception import RemoteException
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import At, AtAll
from graia.ariadne.model import LogConfig, MemberPerm
@ -12,6 +14,7 @@ from pydantic import BaseModel, PrivateAttr
from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget
from .room import Up
from ..exception.AtAllLimitedException import AtAllLimitedException
from ..painter.LiveReportGenerator import LiveReportGenerator
from ..utils import config, redis
from ..utils.AsyncEvent import AsyncEvent
@ -34,8 +37,11 @@ class Bot(BaseModel, AsyncEvent):
__bot: Optional[Ariadne] = PrivateAttr()
"""Ariadne 实例"""
__at_all_limited: Optional[int] = PrivateAttr()
"""@全体成员次数用尽时所在日期"""
__queue: Optional[List[Message]] = PrivateAttr()
"""待发送消息队列"""
"""消息补发队列"""
def __init__(self, **data: Any):
super().__init__(**data)
@ -49,40 +55,53 @@ class Bot(BaseModel, AsyncEvent):
),
log_config=LogConfig(log_level="DEBUG")
)
self.__at_all_limited = time.localtime(time.time() - 86400).tm_yday
self.__queue = []
# 注入 Bot 实例引用
for up in self.ups:
up.inject_bot(self)
def start_sender(self):
self.__loop.create_task(self.__sender())
logger.success(f"Bot [{self.qq}] 已启动")
def send_message(self, msg: Message):
self.__queue.append(msg)
async def __sender(self):
async def send_message(self, msg: Message):
"""
消息发送模块
"""
interval = config.get("MESSAGE_SEND_INTERVAL")
消息发送
while True:
if self.__queue:
msg = self.__queue[0]
if msg.type == PushType.Friend:
for message in msg.get_message_chains():
logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}")
await self.__bot.send_friend_message(msg.id, message)
else:
for message in await self.group_message_filter(msg):
logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}")
await self.__bot.send_group_message(msg.id, message)
self.__queue.pop(0)
await asyncio.sleep(interval)
else:
await asyncio.sleep(0.1)
Args:
msg: Message 实例
Raises:
"""
exception = None
if msg.type == PushType.Friend:
for message in msg.get_message_chains():
try:
await self.__bot.send_friend_message(msg.id, message)
logger.info(f"{self.qq} -> 好友[{msg.id}] : {message.safe_display}")
except RemoteException as ex:
logger.exception("消息推送模块异常", ex)
continue
else:
msgs = await self.group_message_filter(msg)
if any([(AtAll in x) for x in msg.get_message_chains()]) and all([(AtAll not in x) for x in msgs]):
exception = AtAllLimitedException()
for message in msgs:
try:
await self.__bot.send_group_message(msg.id, message)
logger.info(f"{self.qq} -> 群[{msg.id}] : {message.safe_display}")
except RemoteException as ex:
if "AT_ALL_LIMITED" in str(ex):
exception = AtAllLimitedException()
self.__at_all_limited = time.localtime(time.time()).tm_yday
continue
else:
logger.exception("消息推送模块异常", ex)
continue
if exception is not None:
raise exception
async def group_message_filter(self, message: Message) -> List[MessageChain]:
"""
@ -107,10 +126,14 @@ class Bot(BaseModel, AsyncEvent):
for chain in message.get_message_chains():
if AtAll in chain:
# 过滤 Bot 不是群管理员时的 @全体成员 消息
bot_info = await self.__bot.get_member(self.qq, message.id)
bot_info = await self.__bot.get_member(message.id, self.qq)
if bot_info.permission < MemberPerm.Administrator:
chain = chain.exclude(AtAll)
# 过滤已超出当日次数上限的 @全体成员 消息
if time.localtime(time.time()).tm_yday == self.__at_all_limited:
chain = chain.exclude(AtAll)
# 过滤多余的 @全体成员 消息
if chain.count(AtAll) > 1:
elements = [e for e in chain.exclude(AtAll)]
@ -137,7 +160,7 @@ class Bot(BaseModel, AsyncEvent):
return new_chains
def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True):
async def send_to_all_target(self, up: Up, msg: str, target_filter: Callable[[PushTarget], bool] = lambda t: True):
"""
发送消息至 UP 主下所有推送目标
@ -151,11 +174,11 @@ class Bot(BaseModel, AsyncEvent):
for target in up.targets:
if target_filter(target):
self.send_message(Message(id=target.id, content=msg, type=target.type))
await self.send_message(Message(id=target.id, content=msg, type=target.type))
def __send_push_message(self, up: Up,
type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]],
args: Dict[str, Any]):
async def __send_push_message(self, up: Up,
type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]],
args: Dict[str, Any]):
"""
发送推送消息至 UP 主下启用此推送类型的推送目标
@ -176,9 +199,9 @@ class Bot(BaseModel, AsyncEvent):
message = select.message
for arg, val in args.items():
message = message.replace(arg, str(val))
self.send_message(Message(id=target.id, content=message, type=target.type))
await self.send_message(Message(id=target.id, content=message, type=target.type))
def send_live_on(self, up: Up, args: Dict[str, Any]):
async def send_live_on(self, up: Up, args: Dict[str, Any]):
"""
发送开播消息至 UP 主下启用开播推送的推送目标
@ -186,24 +209,28 @@ class Bot(BaseModel, AsyncEvent):
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.live_on, args)
try:
await self.__send_push_message(up, lambda t: t.live_on, args)
except AtAllLimitedException:
await self.send_live_on_at(up, True)
async def send_live_on_at(self, up: Up):
async def send_live_on_at(self, up: Up, limited: bool = False):
"""
发送开播 @ 我列表中的 @ 消息
Args:
up: 要发送的 UP 主实例
limited: 是否为 @全体成员次数达到上限时发送默认False
"""
if not isinstance(up, Up):
return
for target in filter(lambda t: t.type == PushType.Group, up.targets):
if target.live_on.enabled:
if target.live_on.enabled and (limited or "{atall}" not in target.live_on.message):
ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_live_on_at(target.id)])
self.send_message(Message(id=target.id, content=ats, type=target.type))
await self.send_message(Message(id=target.id, content=ats, type=target.type))
def send_live_off(self, up: Up, args: Dict[str, Any]):
async def send_live_off(self, up: Up, args: Dict[str, Any]):
"""
发送下播消息至 UP 主下启用下播推送的推送目标
@ -211,9 +238,9 @@ class Bot(BaseModel, AsyncEvent):
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.live_off, args)
await self.__send_push_message(up, lambda t: t.live_off, args)
def send_live_report(self, up: Up, param: Dict[str, Any]):
async def send_live_report(self, up: Up, param: Dict[str, Any]):
"""
发送直播报告消息至 UP 主下启用直播报告推送的推送目标
@ -223,9 +250,11 @@ class Bot(BaseModel, AsyncEvent):
"""
for target in filter(lambda t: t.live_report.enabled, up.targets):
base64str = LiveReportGenerator.generate(param, target.live_report)
self.send_message(Message(id=target.id, content="".join(["{base64pic=", base64str, "}"]), type=target.type))
await self.send_message(
Message(id=target.id, content="".join(["{base64pic=", base64str, "}"]), type=target.type)
)
def send_dynamic_update(self, up: Up, args: Dict[str, Any]):
async def send_dynamic_update(self, up: Up, args: Dict[str, Any]):
"""
发送动态消息至 UP 主下启用动态推送的推送目标
@ -233,22 +262,26 @@ class Bot(BaseModel, AsyncEvent):
up: 要发送的 UP 主实例
args: 占位符参数
"""
self.__send_push_message(up, lambda t: t.dynamic_update, args)
try:
await self.__send_push_message(up, lambda t: t.dynamic_update, args)
except AtAllLimitedException:
await self.send_dynamic_at(up, True)
async def send_dynamic_at(self, up: Up):
async def send_dynamic_at(self, up: Up, limited: bool = False):
"""
发送动态 @ 我列表中的 @ 消息
Args:
up: 要发送的 UP 主实例
limited: 是否为 @全体成员次数达到上限时发送默认False
"""
if not isinstance(up, Up):
return
for target in filter(lambda t: t.type == PushType.Group, up.targets):
if target.dynamic_update.enabled:
if target.dynamic_update.enabled and (limited or "{atall}" not in target.dynamic_update.message):
ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_dynamic_at(target.id)])
self.send_message(Message(id=target.id, content=ats, type=target.type))
await self.send_message(Message(id=target.id, content=ats, type=target.type))
def __eq__(self, other):
if isinstance(other, Bot):

View File

@ -23,7 +23,7 @@ async def send(request: aiohttp.web.Request) -> aiohttp.web.Response:
target = datasource.get_target_by_key(key)
bot = datasource.get_bot_by_key(key)
msg = Message(id=target.id, content=message, type=target.type)
bot.send_message(msg)
await bot.send_message(msg)
return web.Response(text="success")
except DataSourceException:
logger.warning(f"HTTP API 推送失败, 不存在的推送 key: {key}")

View File

@ -0,0 +1,15 @@
"""
@ 全体成员次数达到上限异常
"""
from .ApiException import ApiException
class AtAllLimitedException(ApiException):
"""
@ 全体成员次数达到上限异常
"""
def __init__(self):
super().__init__()
self.msg = "今日 @ 全体成员次数已达到上限"

View File

@ -37,7 +37,7 @@ SIMPLE_CONFIG = {
"BUVID3": None,
# 成功连接所有主播直播间的最大等待时长,可使得日志输出顺序更加易读,一般无需修改此处,单位:秒
"WAIT_FOR_ALL_CONNECTION_TIMEOUT": 60,
"WAIT_FOR_ALL_CONNECTION_TIMEOUT": 30,
# 是否自动判断仅连接必要的直播间,即当某直播间的开播、下播、直播报告开关均未开启时,自动跳过连接直播间,以节省性能
"ONLY_CONNECT_NECESSARY_ROOM": False,
@ -78,9 +78,6 @@ SIMPLE_CONFIG = {
"USE_HTTP_API": False,
# HTTP API 端口
"HTTP_API_PORT": 8088,
# 消息发送间隔,消息发送过快容易被风控,单位:秒
"MESSAGE_SEND_INTERVAL": 0.5,
# 命令触发前缀
"COMMAND_PREFIX": "",
@ -94,9 +91,7 @@ SIMPLE_CONFIG = {
# 风控发送失败消息滞留时间上限消息因风控滞留超出此时长不会进行补发0 为无限制,单位:秒
"RESEND_TIME_LIMIT": 0,
# 是否补发开播推送、下播推送、直播报告、动态推送中的 @全体成员 和 @群成员 消息,可能造成不必要的打扰,不推荐开启
"RESEND_AT_MESSAGE": False,
# 是否补发除开播推送、下播推送、直播报告、动态推送外的其他消息,如群内命令所触发的回复消息
"RESEND_ALL_MESSAGE": False
"RESEND_AT_MESSAGE": False
}
FULL_CONFIG = {
@ -136,7 +131,7 @@ FULL_CONFIG = {
"BUVID3": None,
# 成功连接所有主播直播间的最大等待时长,可使得日志输出顺序更加易读,一般无需修改此处,单位:秒
"WAIT_FOR_ALL_CONNECTION_TIMEOUT": 60,
"WAIT_FOR_ALL_CONNECTION_TIMEOUT": 30,
# 是否自动判断仅连接必要的直播间,即当某直播间的开播、下播、直播报告开关均未开启时,自动跳过连接直播间,以节省性能
"ONLY_CONNECT_NECESSARY_ROOM": False,
@ -178,9 +173,6 @@ FULL_CONFIG = {
# HTTP API 端口
"HTTP_API_PORT": 8088,
# 消息发送间隔,消息发送过快容易被风控,单位:秒
"MESSAGE_SEND_INTERVAL": 0.5,
# 命令触发前缀
"COMMAND_PREFIX": "",
# 每个群开播 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改
@ -193,9 +185,7 @@ FULL_CONFIG = {
# 风控发送失败消息滞留时间上限消息因风控滞留超出此时长不会进行补发0 为无限制,单位:秒
"RESEND_TIME_LIMIT": 0,
# 是否补发开播推送、下播推送、直播报告、动态推送中的 @全体成员 和 @群成员 消息,可能造成不必要的打扰,不推荐开启
"RESEND_AT_MESSAGE": False,
# 是否补发除开播推送、下播推送、直播报告、动态推送外的其他消息,如群内命令所触发的回复消息
"RESEND_ALL_MESSAGE": False
"RESEND_AT_MESSAGE": False
}
use_config = SIMPLE_CONFIG