From 063608a3add64366b3d9ed062c1cfd51b3e9d23d Mon Sep 17 00:00:00 2001 From: LWR Date: Thu, 16 Feb 2023 21:38:22 +0800 Subject: [PATCH] feat: MySQL datasource dynamically adds user and dynamically reloads push targets support --- starbot/core/datasource.py | 203 +++++++++++++++++++++++++------------ starbot/core/room.py | 8 ++ starbot/utils/redis.py | 8 ++ 3 files changed, 156 insertions(+), 63 deletions(-) diff --git a/starbot/core/datasource.py b/starbot/core/datasource.py index 06048ef..0ebd5d6 100644 --- a/starbot/core/datasource.py +++ b/starbot/core/datasource.py @@ -91,6 +91,24 @@ class DataSource(metaclass=abc.ABCMeta): raise DataSourceException(f"不存在的 UID: {uid}") return up + def get_bot(self, qq: int) -> Bot: + """ + 根据 QQ 获取 Bot 实例 + + Args: + qq: 需要获取 Bot 的 QQ + + Returns: + Bot 实例 + + Raises: + DataSourceException: QQ 不存在 + """ + bot = next((b for b in self.bots if b.qq == qq), None) + if bot is None: + raise DataSourceException(f"不存在的 QQ: {qq}") + return bot + def get_ups_by_target(self, target_id: int, target_type: PushType) -> List[Up]: """ 根据推送目标号码和推送目标类型获取 Up 实例列表 @@ -272,6 +290,82 @@ class MySQLDataSource(DataSource): except pymysql.err.Error as ex: raise DataSourceException(f"从 MySQL 中读取配置时发生了错误 {ex}") + async def __load_targets(self, uid: int) -> List[PushTarget]: + """ + 从 MySQL 中读取指定 UID 的推送配置 + + Args: + uid: 要读取配置的 UID + + Returns: + 推送目标列表 + """ + live_on = await self.__query( + "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " + "FROM `groups` AS `g` LEFT JOIN `live_on` AS `l` " + "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " + f"WHERE g.`uid` = {uid} " + "ORDER BY g.`index`" + ) + live_off = await self.__query( + "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " + "FROM `groups` AS `g` LEFT JOIN `live_off` AS `l` " + "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " + f"WHERE g.`uid` = {uid} " + "ORDER BY g.`index`" + ) + live_report = await self.__query( + "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, " + "`enabled`, `logo`, `logo_base64`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, " + "`danmu`, `box`, `gift`, `sc`, `guard`, " + "`danmu_ranking`, `box_ranking`, `box_profit_ranking`, `gift_ranking`, `sc_ranking`, " + "`guard_list`, `box_profit_diagram`, `danmu_diagram`, `box_diagram`, `gift_diagram`, " + "`sc_diagram`, `guard_diagram`, `danmu_cloud` " + "FROM `groups` AS `g` LEFT JOIN `live_report` AS `l` " + "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " + f"WHERE g.`uid` = {uid} " + "ORDER BY g.`index`" + ) + dynamic_update = await self.__query( + "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " + "FROM `groups` AS `g` LEFT JOIN `dynamic_update` AS `d` " + "ON g.`uid` = d.`uid` AND g.`index` = d.`index` " + f"WHERE g.`uid` = {uid} " + "ORDER BY g.`index`" + ) + + targets = [] + for i, target in enumerate(live_on): + if all((live_on[i]["enabled"], live_on[i]["message"])): + on = LiveOn(**live_on[i]) + else: + on = LiveOn() + if all((live_off[i]["enabled"], live_off[i]["message"])): + off = LiveOff(**live_off[i]) + else: + off = LiveOff() + if live_report[i]["enabled"]: + report = LiveReport(**live_report[i]) + else: + report = LiveReport() + if all((dynamic_update[i]["enabled"], dynamic_update[i]["message"])): + update = DynamicUpdate(**dynamic_update[i]) + else: + update = DynamicUpdate() + + targets.append( + PushTarget( + id=target["num"], + type=target["type"], + live_on=on, + live_off=off, + live_report=report, + dynamic_update=update + ) + ) + + return targets + async def load(self): """ 从 MySQL 中初始化配置 @@ -296,69 +390,7 @@ class MySQLDataSource(DataSource): for now_user in bot_users: uid = now_user.get("uid") - live_on = await self.__query( - "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " - "FROM `groups` AS `g` LEFT JOIN `live_on` AS `l` " - "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " - f"WHERE g.`uid` = {uid} " - "ORDER BY g.`index`" - ) - live_off = await self.__query( - "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " - "FROM `groups` AS `g` LEFT JOIN `live_off` AS `l` " - "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " - f"WHERE g.`uid` = {uid} " - "ORDER BY g.`index`" - ) - live_report = await self.__query( - "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, " - "`enabled`, `logo`, `logo_base64`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, " - "`danmu`, `box`, `gift`, `sc`, `guard`, " - "`danmu_ranking`, `box_ranking`, `box_profit_ranking`, `gift_ranking`, `sc_ranking`, " - "`guard_list`, `box_profit_diagram`, `danmu_diagram`, `box_diagram`, `gift_diagram`, " - "`sc_diagram`, `guard_diagram`, `danmu_cloud` " - "FROM `groups` AS `g` LEFT JOIN `live_report` AS `l` " - "ON g.`uid` = l.`uid` AND g.`index` = l.`index` " - f"WHERE g.`uid` = {uid} " - "ORDER BY g.`index`" - ) - dynamic_update = await self.__query( - "SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, `enabled`, `message` " - "FROM `groups` AS `g` LEFT JOIN `dynamic_update` AS `d` " - "ON g.`uid` = d.`uid` AND g.`index` = d.`index` " - f"WHERE g.`uid` = {uid} " - "ORDER BY g.`index`" - ) - - targets = [] - for i, target in enumerate(live_on): - if all((live_on[i]["enabled"], live_on[i]["message"])): - on = LiveOn(**live_on[i]) - else: - on = LiveOn() - if all((live_off[i]["enabled"], live_off[i]["message"])): - off = LiveOff(**live_off[i]) - else: - off = LiveOff() - if live_report[i]["enabled"]: - report = LiveReport(**live_report[i]) - else: - report = LiveReport() - if all((dynamic_update[i]["enabled"], dynamic_update[i]["message"])): - update = DynamicUpdate(**dynamic_update[i]) - else: - update = DynamicUpdate() - - targets.append( - PushTarget( - id=target["num"], - type=target["type"], - live_on=on, - live_off=off, - live_report=report, - dynamic_update=update - ) - ) + targets = await self.__load_targets(uid) ups.append(Up(uid=uid, targets=targets)) @@ -366,3 +398,48 @@ class MySQLDataSource(DataSource): super().format_data() logger.success(f"成功从 MySQL 中导入了 {len(self.get_up_list())} 个 UP 主") + + async def reload_targets(self, up: Union[int, Up]): + """ + 重新从 MySQL 中读取特定 Up 的推送配置 + + Args: + up: 需要重载配置的 Up 实例或其 UID + """ + if isinstance(up, int): + try: + up = self.get_up(up) + except DataSourceException: + logger.warning(f"重载配置时出现异常, UID: {up} 不存在") + return + + logger.info(f"开始从 MySQL 中重载 {up.uname} (UID: {up.uid}, 房间号: {up.room_id}) 的推送配置") + + if not self.__pool: + await self.__connect() + + up.targets = await self.__load_targets(up.uid) + + super().format_data() + logger.success(f"已成功重载 {up.uname} (UID: {up.uid}, 房间号: {up.room_id}) 的推送配置") + + async def load_new(self, uid: int): + """ + 从 MySQL 中追加读取指定 UID 的用户 + + Args: + uid: 需要追加读取配置的 UID + """ + user = await self.__query(f"SELECT * FROM `bot` WHERE uid = {uid}") + if len(user) == 0: + logger.error(f"载入 UID: {uid} 的推送配置失败, UID 不存在") + raise DataSourceException(f"载入 UID: {uid} 的推送配置失败, UID 不存在") + + bot = user[0].get("bot") + targets = await self.__load_targets(uid) + up = Up(uid=uid, targets=targets) + self.get_bot(bot).ups.append(up) + super().format_data() + logger.success(f"已成功载入 UID: {uid} 的推送配置") + + await up.connect() diff --git a/starbot/core/room.py b/starbot/core/room.py index f0253b4..eab9f87 100644 --- a/starbot/core/room.py +++ b/starbot/core/room.py @@ -155,6 +155,14 @@ class Up(BaseModel): else: logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}") + if not await redis.exists_live_status(self.room_id): + room_info = await self.__live_room.get_room_play_info() + status = room_info["live_status"] + await redis.set_live_status(self.room_id, status) + if status == 1: + start_time = room_info["live_time"] + await redis.set_live_start_time(self.room_id, start_time) + self.__is_reconnect = True @self.__room.on("LIVE") diff --git a/starbot/utils/redis.py b/starbot/utils/redis.py index 4186e2a..c83aeef 100644 --- a/starbot/utils/redis.py +++ b/starbot/utils/redis.py @@ -207,6 +207,10 @@ async def zunionstore(dest: str, source: Union[str, List[str]]): # 直播间状态,0:未开播,1:正在直播,2:轮播 +async def exists_live_status(room_id: int) -> bool: + return await hexists("LiveStatus", room_id) + + async def get_live_status(room_id: int) -> int: return await hgeti("LiveStatus", room_id) @@ -217,6 +221,10 @@ async def set_live_status(room_id: int, status: int): # 直播开始时间 +async def exists_live_start_time(room_id: int) -> bool: + return await hexists("StartTime", room_id) + + async def get_live_start_time(room_id: int) -> int: return await hgeti("StartTime", room_id)