feat: Command support

This commit is contained in:
LWR 2023-02-04 16:53:28 +08:00
parent 813c8d2d84
commit 06d6b4eced
21 changed files with 1718 additions and 27 deletions

89
starbot/commands/bind.py Normal file
View File

@ -0,0 +1,89 @@
from typing import Union, Optional
from graia.ariadne import Ariadne
from graia.ariadne.event.message import FriendMessage, GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch, ParamMatch, ResultValue
from graia.ariadne.model import Friend, Member, Group
from graia.ariadne.util.interrupt import FunctionWaiter
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..utils import config, redis
from ..utils.network import request
from ..utils.utils import remove_command_param_placeholder
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[FriendMessage, GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("绑定", "bind"),
"uid" @ ParamMatch()
)],
)
)
async def bind(app: Ariadne,
source: Source,
sender: Union[Friend, Group],
member: Optional[Member],
uid: MessageChain = ResultValue()):
if isinstance(sender, Group) and await redis.exists_disable_command("DenyBind", sender.id):
await app.send_message(sender, MessageChain("此命令已被禁用~"), quote=source)
return
uid = remove_command_param_placeholder(uid.display)
if not uid.isdigit() or int(uid) == 0:
await app.send_message(sender, MessageChain("请输入正确的UID~"), quote=source)
return
if isinstance(sender, Friend):
qq = sender.id
else:
qq = member.id
uid = int(uid)
info = await request("GET", f"https://api.live.bilibili.com/live_user/v1/Master/info?uid={uid}")
uname = info["info"]["uname"]
if not uname:
await app.send_message(
sender, MessageChain("要绑定的UID不正确\n请检查后重新绑定~"), quote=source
)
return
await app.send_message(
sender,
MessageChain(f"即将绑定B站账号: {uname}\n请在1分钟内发送\"{prefix}确认绑定\"以继续\n发送其他消息会取消本次绑定"),
quote=source
)
async def waiter(wait_source: Source,
wait_sender: Union[Friend, Group],
wait_member: Optional[Member],
wait_msg: MessageChain) -> bool:
if isinstance(wait_sender, Friend):
wait_qq = wait_sender.id
else:
wait_qq = wait_member.id
if wait_qq == qq:
nonlocal source
source = wait_source
if wait_msg.display == f"{prefix}确认绑定":
return True
else:
return False
result = await FunctionWaiter(waiter, [FriendMessage, GroupMessage]).wait(timeout=60)
if result:
await redis.bind_uid(qq, uid)
await app.send_message(sender, MessageChain("绑定成功~"), quote=source)

View File

@ -0,0 +1,58 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch, ParamMatch, ResultValue
from graia.ariadne.model import Group, Member, MemberPerm
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..utils import config, redis
from ..utils.utils import remove_command_param_placeholder
prefix = config.get("COMMAND_PREFIX")
master = config.get("MASTER_QQ")
disable_map = {
"绑定": "DenyBind",
"直播间数据": "DenyRoomData",
"直播间总数据": "DenyRoomDataTotal",
"我的数据": "DenyUserData",
"我的总数据": "DenyUserDataTotal",
}
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("禁用", "disable"),
"name" @ ParamMatch()
)],
)
)
async def disable_command(app: Ariadne,
source: Source,
group: Group,
member: Member,
name: MessageChain = ResultValue()):
if member.permission == MemberPerm.Member and member.id != master:
await app.send_message(group, MessageChain("您没有执行此命令的权限~"), quote=source)
return
name = remove_command_param_placeholder(name.display)
if name not in disable_map:
await app.send_message(
group, MessageChain(f"输入的命令名称不正确~\n支持的命令: {''.join(disable_map.keys())}"), quote=source
)
return
if redis.exists_disable_command(disable_map[name], group.id):
await app.send_message(group, "此命令已经是禁用状态~", quote=source)
return
await redis.add_disable_command(disable_map[name], group.id)
await app.send_message(group, "命令已禁用~", quote=source)

View File

@ -0,0 +1,50 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch
from graia.ariadne.model import Member, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
FullMatch("动态@我")
)],
)
)
async def dynamic_at_me(app: Ariadne, source: Source, sender: Group, member: Member):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if await redis.exists_dynamic_at(sender.id, member.id):
await app.send_message(sender, MessageChain("您已经在动态@名单中了~"), quote=source)
return
at_length_limit = config.get("COMMAND_DYNAMIC_AT_ME_LIMIT")
now_length = await redis.len_dynamic_at(sender.id)
if now_length >= at_length_limit:
await app.send_message(sender, MessageChain("本群的动态@名额已满~"), quote=source)
return
await redis.add_dynamic_at(sender.id, member.id)
await app.send_message(
sender,
MessageChain(f"您已经成功加入动态@名单~\n本群动态@名额还剩余 {max(0, at_length_limit - now_length - 1)}"),
quote=source
)

View File

@ -0,0 +1,49 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch
from graia.ariadne.model import Member, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch(
"取消动态@我", "退出动态@我", "动态不@我", "动态别@我"
)
)],
)
)
async def dynamic_at_me_cancel(app: Ariadne, source: Source, sender: Group, member: Member):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if not await redis.exists_dynamic_at(sender.id, member.id):
await app.send_message(sender, MessageChain("您不在动态@名单中~"), quote=source)
return
at_length_limit = config.get("COMMAND_DYNAMIC_AT_ME_LIMIT")
now_length = await redis.len_dynamic_at(sender.id)
await redis.delete_dynamic_at(sender.id, member.id)
await app.send_message(
sender,
MessageChain(f"您已成功退出动态@名单~\n本群动态@名额还剩余 {max(0, at_length_limit - now_length + 1)}"),
quote=source
)

View File

@ -0,0 +1,40 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch
from graia.ariadne.model import Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("动态@列表", "动态@名单")
)],
)
)
async def dynamic_at_me_list(app: Ariadne, source: Source, sender: Group):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if not await redis.len_dynamic_at(sender.id):
await app.send_message(sender, MessageChain("本群的动态@列表为空~"), quote=source)
return
ats = "\n".join({str(x) for x in await redis.range_dynamic_at(sender.id)})
await app.send_message(sender, MessageChain(f"本群的动态@列表如下:\n{ats}"), quote=source)

View File

@ -0,0 +1,58 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch, ParamMatch, ResultValue
from graia.ariadne.model import Group, Member, MemberPerm
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..utils import config, redis
from ..utils.utils import remove_command_param_placeholder
prefix = config.get("COMMAND_PREFIX")
master = config.get("MASTER_QQ")
disable_map = {
"绑定": "DenyBind",
"直播间数据": "DenyRoomData",
"直播间总数据": "DenyRoomDataTotal",
"我的数据": "DenyUserData",
"我的总数据": "DenyUserDataTotal",
}
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("启用", "enable"),
"name" @ ParamMatch()
)],
)
)
async def enable_command(app: Ariadne,
source: Source,
group: Group,
member: Member,
name: MessageChain = ResultValue()):
if member.permission == MemberPerm.Member and member.id != master:
await app.send_message(group, MessageChain("您没有执行此命令的权限~"), quote=source)
return
name = remove_command_param_placeholder(name.display)
if name not in disable_map:
await app.send_message(
group, MessageChain(f"输入的命令名称不正确~\n支持的命令: {''.join(disable_map.keys())}"), quote=source
)
return
if not redis.exists_disable_command(disable_map[name], group.id):
await app.send_message(group, "此命令已经是启用状态~", quote=source)
return
await redis.delete_disable_command(disable_map[name], group.id)
await app.send_message(group, "命令已启用~", quote=source)

View File

@ -0,0 +1,50 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch
from graia.ariadne.model import Member, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("开播@我", "直播@我")
)],
)
)
async def live_on_at_me(app: Ariadne, source: Source, sender: Group, member: Member):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if await redis.exists_live_on_at(sender.id, member.id):
await app.send_message(sender, MessageChain("您已经在开播@名单中了~"), quote=source)
return
at_length_limit = config.get("COMMAND_LIVE_ON_AT_ME_LIMIT")
now_length = await redis.len_live_on_at(sender.id)
if now_length >= at_length_limit:
await app.send_message(sender, MessageChain("本群的开播@名额已满~"), quote=source)
return
await redis.add_live_on_at(sender.id, member.id)
await app.send_message(
sender,
MessageChain(f"您已经成功加入开播@名单~\n本群开播@名额还剩余 {max(0, at_length_limit - now_length - 1)}"),
quote=source
)

View File

@ -0,0 +1,49 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch
from graia.ariadne.model import Member, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch(
"取消开播@我", "取消直播@我", "退出开播@我", "退出直播@我", "开播不@我", "直播不@我", "开播别@我", "直播别@我"
)
)],
)
)
async def live_on_at_me_cancel(app: Ariadne, source: Source, sender: Group, member: Member):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if not await redis.exists_live_on_at(sender.id, member.id):
await app.send_message(sender, MessageChain("您不在开播@名单中~"), quote=source)
return
at_length_limit = config.get("COMMAND_LIVE_ON_AT_ME_LIMIT")
now_length = await redis.len_live_on_at(sender.id)
await redis.delete_live_on_at(sender.id, member.id)
await app.send_message(
sender,
MessageChain(f"您已成功退出开播@名单~\n本群开播@名额还剩余 {max(0, at_length_limit - now_length + 1)}"),
quote=source
)

View File

@ -0,0 +1,40 @@
from graia.ariadne import Ariadne
from graia.ariadne.event.message import GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch, UnionMatch
from graia.ariadne.model import Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..utils import config, redis
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
UnionMatch("开播@列表", "直播@列表", "开播@名单", "直播@名单")
)],
)
)
async def live_on_at_me_list(app: Ariadne, source: Source, sender: Group):
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if not await redis.len_live_on_at(sender.id):
await app.send_message(sender, MessageChain("本群的开播@列表为空~"), quote=source)
return
ats = "\n".join({str(x) for x in await redis.range_live_on_at(sender.id)})
await app.send_message(sender, MessageChain(f"本群的开播@列表如下:\n{ats}"), quote=source)

View File

@ -0,0 +1,114 @@
import time
from typing import Union
from graia.ariadne import Ariadne
from graia.ariadne.event.message import FriendMessage, GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Image, Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch
from graia.ariadne.model import Friend, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..painter.PicGenerator import PicGenerator, Color
from ..utils import config, redis
from ..utils.utils import timestamp_format, get_unames_and_faces_by_uids, mask_round
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[FriendMessage, GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
FullMatch("直播间数据")
)],
)
)
async def room_data(app: Ariadne, source: Source, sender: Union[Friend, Group]):
if isinstance(sender, Group) and await redis.exists_disable_command("DenyRoomData", sender.id):
await app.send_message(sender, MessageChain("此命令已被禁用~"), quote=source)
return
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
for up in ups:
width = 1000
height = 100000
face_size = 100
uname, face = await get_unames_and_faces_by_uids([str(up.uid)])
uname = uname[0]
face = face[0]
generator = PicGenerator(width, height)
pic = generator.set_pos(175, 80).draw_rounded_rectangle(0, 0, width, height, 35, Color.WHITE).copy_bottom(35)
pic.draw_img_alpha(mask_round(face.resize((face_size, face_size)).convert("RGBA")), (50, 50))
pic.draw_section(f"{uname} 的直播间数据").set_pos(50, 150 + pic.row_space)
pic.draw_tip(f"UID: {up.uid} 房间号: {up.room_id}")
pic.draw_tip("此处为本直播间最近一场直播的数据")
pic.draw_tip("使用\"直播间总数据\"命令可查询本直播间累计总数据")
pic.draw_tip(f"查询时间: {timestamp_format(int(time.time()), '%Y/%m/%d %H:%M:%S')}")
status = await redis.get_live_status(up.room_id)
status_str = "正在直播" if status == 1 else "未开播"
status_color = Color.RED if status == 1 else Color.GREEN
pic.draw_text(["直播间状态: ", status_str], [Color.BLACK, status_color])
pic.draw_text("")
note_str = "本次直播数据: " if status == 1 else "上次直播数据: "
pic.draw_text(note_str)
danmu_count = await redis.get_room_danmu_count(up.room_id)
danmu_person_count = await redis.len_user_danmu_count(up.room_id)
box_count = await redis.get_room_box_count(up.room_id)
box_person_count = await redis.len_user_box_count(up.room_id)
box_profit = await redis.get_room_box_profit(up.room_id)
box_profit_color = Color.RED if box_profit > 0 else (Color.GREEN if box_profit < 0 else Color.GRAY)
gift_profit = await redis.get_room_gift_profit(up.room_id)
gift_person_count = await redis.len_user_gift_profit(up.room_id)
sc_profit = await redis.get_room_sc_profit(up.room_id)
sc_person_count = await redis.len_user_sc_profit(up.room_id)
captain_count = await redis.get_room_captain_count(up.room_id)
commander_count = await redis.get_room_commander_count(up.room_id)
governor_count = await redis.get_room_governor_count(up.room_id)
guard_person_count = await redis.len_user_guard_count(up.room_id)
pic.draw_text(
["收获弹幕数: ", str(danmu_count), " 条 (", str(danmu_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收到盲盒数: ", str(box_count), " 个 (", str(box_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(["盲盒盈亏: ", str(box_profit), ""], [Color.BLACK, box_profit_color, Color.BLACK])
pic.draw_text(
["收获礼物价值: ", str(gift_profit), " 元 (", str(gift_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收获 SC (醒目留言) 价值: ", str(sc_profit), " 元 (", str(sc_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收获大航海: ", f"舰长×{captain_count} ", f"提督×{commander_count} ", f"总督×{governor_count}"],
[Color.BLACK, Color.DEEPSKYBLUE, Color.FUCHSIA, Color.CRIMSON]
)
pic.draw_text(["开通大航海人数: ", str(guard_person_count)], [Color.BLACK, Color.LINK])
# 底部版权信息,请务必保留此处
pic.draw_text("")
pic.draw_text_right(25, "Designed By StarBot", Color.GRAY)
pic.draw_text_right(25, "https://github.com/Starlwr/StarBot", Color.LINK)
pic.crop_and_paste_bottom()
await app.send_message(sender, MessageChain(Image(base64=pic.base64())))

View File

@ -0,0 +1,106 @@
import time
from typing import Union
from graia.ariadne import Ariadne
from graia.ariadne.event.message import FriendMessage, GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Image, Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch
from graia.ariadne.model import Friend, Group
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..painter.PicGenerator import PicGenerator, Color
from ..utils import config, redis
from ..utils.utils import timestamp_format, get_unames_and_faces_by_uids, mask_round
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[FriendMessage, GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
FullMatch("直播间总数据")
)],
)
)
async def room_data_total(app: Ariadne, source: Source, sender: Union[Friend, Group]):
if isinstance(sender, Group) and await redis.exists_disable_command("DenyRoomDataTotal", sender.id):
await app.send_message(sender, MessageChain("此命令已被禁用~"), quote=source)
return
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
for up in ups:
width = 1000
height = 100000
face_size = 100
uname, face = await get_unames_and_faces_by_uids([str(up.uid)])
uname = uname[0]
face = face[0]
generator = PicGenerator(width, height)
pic = generator.set_pos(175, 80).draw_rounded_rectangle(0, 0, width, height, 35, Color.WHITE).copy_bottom(35)
pic.draw_img_alpha(mask_round(face.resize((face_size, face_size)).convert("RGBA")), (50, 50))
pic.draw_section(f"{uname} 的直播间总数据").set_pos(50, 150 + pic.row_space)
pic.draw_tip(f"UID: {up.uid} 房间号: {up.room_id}")
pic.draw_tip(f"数据自主播注册之日起开始累计")
pic.draw_tip(f"查询时间: {timestamp_format(int(time.time()), '%Y/%m/%d %H:%M:%S')}")
pic.draw_text("")
danmu_count = await redis.get_room_danmu_all(up.room_id)
danmu_person_count = await redis.len_user_danmu_all(up.room_id)
box_count = await redis.get_room_box_all(up.room_id)
box_person_count = await redis.len_user_box_all(up.room_id)
box_profit = await redis.get_room_box_profit_all(up.room_id)
box_profit_color = Color.RED if box_profit > 0 else (Color.GREEN if box_profit < 0 else Color.GRAY)
gift_profit = await redis.get_room_gift_all(up.room_id)
gift_person_count = await redis.len_user_gift_all(up.room_id)
sc_profit = await redis.get_room_sc_all(up.room_id)
sc_person_count = await redis.len_user_sc_all(up.room_id)
captain_count = await redis.get_room_captain_all(up.room_id)
commander_count = await redis.get_room_commander_all(up.room_id)
governor_count = await redis.get_room_governor_all(up.room_id)
guard_person_count = await redis.len_user_guard_all(up.room_id)
pic.draw_text(
["收获弹幕总数: ", str(danmu_count), " 条 (", str(danmu_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收到盲盒总数: ", str(box_count), " 个 (", str(box_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(["盲盒总盈亏: ", str(box_profit), ""], [Color.BLACK, box_profit_color, Color.BLACK])
pic.draw_text(
["收获礼物总价值: ", str(gift_profit), " 元 (", str(gift_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收获 SC (醒目留言) 总价值: ", str(sc_profit), " 元 (", str(sc_person_count), " 人)"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK, Color.BLACK]
)
pic.draw_text(
["收获大航海总月数: ", f"舰长×{captain_count} ", f"提督×{commander_count} ", f"总督×{governor_count}"],
[Color.BLACK, Color.DEEPSKYBLUE, Color.FUCHSIA, Color.CRIMSON]
)
pic.draw_text(["开通大航海总人数: ", str(guard_person_count)], [Color.BLACK, Color.LINK])
# 底部版权信息,请务必保留此处
pic.draw_text("")
pic.draw_text_right(25, "Designed By StarBot", Color.GRAY)
pic.draw_text_right(25, "https://github.com/Starlwr/StarBot", Color.LINK)
pic.crop_and_paste_bottom()
await app.send_message(sender, MessageChain(Image(base64=pic.base64())))

View File

@ -0,0 +1,167 @@
import time
from typing import Union, Optional
from graia.ariadne import Ariadne
from graia.ariadne.event.message import FriendMessage, GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Image, Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch
from graia.ariadne.model import Friend, Group, Member
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..painter.PicGenerator import PicGenerator, Color
from ..utils import config, redis
from ..utils.utils import timestamp_format, get_unames_and_faces_by_uids, mask_round, get_parallel_ranking
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[FriendMessage, GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
FullMatch("我的数据")
)],
)
)
async def user_data(app: Ariadne, source: Source, sender: Union[Friend, Group], member: Optional[Member]):
if isinstance(sender, Group) and await redis.exists_disable_command("DenyUserData", sender.id):
await app.send_message(sender, MessageChain("此命令已被禁用~"), quote=source)
return
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if isinstance(sender, Friend):
qq = sender.id
else:
qq = member.id
uid = await redis.get_bind_uid(qq)
if not uid:
await app.send_message(
sender, MessageChain(f"请先使用\"{prefix}绑定 [UID]\"命令绑定B站UID后再查询~\n命令示例:{prefix}绑定 114514")
)
return
uname, face = await get_unames_and_faces_by_uids([str(uid)])
uname = uname[0]
face = face[0]
if not uname:
await app.send_message(
sender, MessageChain(f"当前绑定的UID不正确\n请重新使用\"{prefix}绑定 [UID]\"命令绑定B站UID后再查询~")
)
return
for up in ups:
width = 1000
height = 100000
face_size = 100
generator = PicGenerator(width, height)
pic = generator.set_pos(175, 80).draw_rounded_rectangle(0, 0, width, height, 35, Color.WHITE).copy_bottom(35)
pic.draw_img_alpha(mask_round(face.resize((face_size, face_size)).convert("RGBA")), (50, 50))
pic.draw_section(f"{uname} 的数据").set_pos(50, 150 + pic.row_space)
pic.draw_tip("此处为在本直播间最近一场直播的数据")
pic.draw_tip("使用\"我的总数据\"命令可查询在本直播间的累计总数据")
pic.draw_tip(f"查询时间: {timestamp_format(int(time.time()), '%Y/%m/%d %H:%M:%S')}")
pic.draw_text("")
user_danmu_count = await redis.get_user_danmu_count(up.room_id, uid)
user_box_count = await redis.get_user_box_count(up.room_id, uid)
user_box_profit = await redis.get_user_box_profit(up.room_id, uid)
user_gift_profit = await redis.get_user_gift_profit(up.room_id, uid)
user_sc_profit = await redis.get_user_sc_profit(up.room_id, uid)
user_captain_count = await redis.get_user_captain_count(up.room_id, uid)
user_commander_count = await redis.get_user_commander_count(up.room_id, uid)
user_governor_count = await redis.get_user_governor_count(up.room_id, uid)
if not any([user_danmu_count, user_box_count, user_gift_profit, user_sc_profit,
user_captain_count, user_commander_count, user_governor_count]):
pic.draw_text_multiline(50, f"未查询到 {uname}{up.uname} 最近一场直播中的数据")
pic.draw_text("请先在直播间中互动后再来查询")
pic.draw_text(f"或尝试使用\"{prefix}我的总数据\"命令查询总数据")
else:
pic.draw_text_multiline(50, f"{uname}{up.uname} 最近一场直播中的数据:")
if user_danmu_count:
pic.draw_text("")
user_danmu_counts = [x[1] for x in await redis.range_user_danmu_count(up.room_id)]
rank, total, diff = get_parallel_ranking(user_danmu_count, user_danmu_counts)
pic.draw_text(
["发送弹幕数: ", str(user_danmu_count), " 条 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_box_count:
pic.draw_text("")
user_box_counts = [x[1] for x in await redis.range_user_box_count(up.room_id)]
rank, total, diff = get_parallel_ranking(user_box_count, user_box_counts)
pic.draw_text(
["开启盲盒数: ", str(user_box_count), " 个 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
pic.draw_text("")
user_box_profits = [x[1] for x in await redis.range_user_box_profit(up.room_id)]
rank, total, diff = get_parallel_ranking(user_box_profit, user_box_profits)
color = Color.RED if user_box_profit > 0 else (Color.GREEN if user_box_profit < 0 else Color.GRAY)
pic.draw_text(
["盲盒盈亏: ", str(user_box_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, color, Color.BLACK, Color.LINK]
)
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_gift_profit:
pic.draw_text("")
user_gift_profits = [x[1] for x in await redis.range_user_gift_profit(up.room_id)]
rank, total, diff = get_parallel_ranking(user_gift_profit, user_gift_profits)
pic.draw_text(
["送出礼物价值: ", str(user_gift_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_sc_profit:
pic.draw_text("")
user_sc_profits = [x[1] for x in await redis.range_user_sc_profit(up.room_id)]
rank, total, diff = get_parallel_ranking(user_sc_profit, user_sc_profits)
pic.draw_text(
["发送 SC (醒目留言) 价值: ", str(user_sc_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if any([user_captain_count, user_commander_count, user_governor_count]):
pic.draw_text("")
if user_captain_count:
pic.draw_text(f"开通舰长: {user_captain_count}", Color.DEEPSKYBLUE)
if user_commander_count:
pic.draw_text(f"开通提督: {user_commander_count}", Color.FUCHSIA)
if user_governor_count:
pic.draw_text(f"开通总督: {user_governor_count}", Color.CRIMSON)
# 底部版权信息,请务必保留此处
pic.draw_text("")
pic.draw_text_right(25, "Designed By StarBot", Color.GRAY)
pic.draw_text_right(25, "https://github.com/Starlwr/StarBot", Color.LINK)
pic.crop_and_paste_bottom()
await app.send_message(sender, MessageChain(Image(base64=pic.base64())))

View File

@ -0,0 +1,180 @@
import time
from typing import Union, Optional
from graia.ariadne import Ariadne
from graia.ariadne.event.message import FriendMessage, GroupMessage
from graia.ariadne.message.chain import MessageChain
from graia.ariadne.message.element import Image, Source
from graia.ariadne.message.parser.twilight import Twilight, FullMatch
from graia.ariadne.model import Friend, Group, Member
from graia.saya import Channel
from graia.saya.builtins.broadcast import ListenerSchema
from ..core.datasource import DataSource
from ..core.model import PushType
from ..painter.PicGenerator import PicGenerator, Color
from ..utils import config, redis
from ..utils.utils import timestamp_format, get_unames_and_faces_by_uids, mask_round, get_parallel_ranking, get_ratio
prefix = config.get("COMMAND_PREFIX")
channel = Channel.current()
@channel.use(
ListenerSchema(
listening_events=[FriendMessage, GroupMessage],
inline_dispatchers=[Twilight(
FullMatch(prefix),
FullMatch("我的总数据")
)],
)
)
async def user_data_total(app: Ariadne, source: Source, sender: Union[Friend, Group], member: Optional[Member]):
if isinstance(sender, Group) and await redis.exists_disable_command("DenyUserDataTotal", sender.id):
await app.send_message(sender, MessageChain("此命令已被禁用~"), quote=source)
return
datasource: DataSource = app.options["StarBotDataSource"]
ups = datasource.get_ups_by_target(sender.id, PushType.Group if isinstance(sender, Group) else PushType.Friend)
if not ups:
return
if isinstance(sender, Friend):
qq = sender.id
else:
qq = member.id
uid = await redis.get_bind_uid(qq)
if not uid:
await app.send_message(
sender, MessageChain(f"请先使用\"{prefix}绑定 [UID]\"命令绑定B站UID后再查询~\n命令示例:{prefix}绑定 114514")
)
return
uname, face = await get_unames_and_faces_by_uids([str(uid)])
uname = uname[0]
face = face[0]
if not uname:
await app.send_message(
sender, MessageChain(f"当前绑定的UID不正确\n请重新使用\"{prefix}绑定 [UID]\"命令绑定B站UID后再查询~")
)
return
for up in ups:
width = 1000
height = 100000
face_size = 100
generator = PicGenerator(width, height)
pic = generator.set_pos(175, 80).draw_rounded_rectangle(0, 0, width, height, 35, Color.WHITE).copy_bottom(35)
pic.draw_img_alpha(mask_round(face.resize((face_size, face_size)).convert("RGBA")), (50, 50))
pic.draw_section(f"{uname} 的总数据").set_pos(50, 150 + pic.row_space)
pic.draw_tip(f"数据自主播注册之日起开始累计")
pic.draw_tip(f"查询时间: {timestamp_format(int(time.time()), '%Y/%m/%d %H:%M:%S')}")
pic.draw_text("")
user_danmu_count = await redis.get_user_danmu_all(up.room_id, uid)
user_box_count = await redis.get_user_box_all(up.room_id, uid)
user_box_profit = await redis.get_user_box_profit_all(up.room_id, uid)
user_gift_profit = await redis.get_user_gift_all(up.room_id, uid)
user_sc_profit = await redis.get_user_sc_all(up.room_id, uid)
user_captain_count = await redis.get_user_captain_all(up.room_id, uid)
user_commander_count = await redis.get_user_commander_all(up.room_id, uid)
user_governor_count = await redis.get_user_governor_all(up.room_id, uid)
if not any([user_danmu_count, user_box_count, user_gift_profit, user_sc_profit,
user_captain_count, user_commander_count, user_governor_count]):
pic.draw_text_multiline(50, f"未查询到 {uname}{up.uname} 房间的数据")
pic.draw_text("请先在直播间中互动后再来查询")
else:
pic.draw_text_multiline(50, f"{uname}{up.uname} 房间的总数据:")
if user_danmu_count:
pic.draw_text("")
user_danmu_counts = [x[1] for x in await redis.range_user_danmu_all(up.room_id)]
room_danmu_count = await redis.get_room_danmu_all(up.room_id)
ratio = get_ratio(user_danmu_count, room_danmu_count)
rank, total, diff = get_parallel_ranking(user_danmu_count, user_danmu_counts)
pic.draw_text(
["发送弹幕总数: ", str(user_danmu_count), " 条 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
pic.draw_text(["占据了弹幕总数的 ", ratio], [Color.BLACK, Color.LINK])
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_box_count:
pic.draw_text("")
user_box_counts = [x[1] for x in await redis.range_user_box_all(up.room_id)]
room_box_count = await redis.get_room_box_all(up.room_id)
ratio = get_ratio(user_box_count, room_box_count)
rank, total, diff = get_parallel_ranking(user_box_count, user_box_counts)
pic.draw_text(
["开启盲盒总数: ", str(user_box_count), " 个 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
pic.draw_text(["占据了盲盒总数的 ", ratio], [Color.BLACK, Color.LINK])
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
pic.draw_text("")
user_box_profits = [x[1] for x in await redis.range_user_box_profit_all(up.room_id)]
room_box_profit = await redis.get_room_box_profit_all(up.room_id)
rank, total, diff = get_parallel_ranking(user_box_profit, user_box_profits)
color = Color.RED if user_box_profit > 0 else (Color.GREEN if user_box_profit < 0 else Color.GRAY)
room_color = Color.RED if room_box_profit > 0 else (Color.GREEN if room_box_profit < 0 else Color.GRAY)
pic.draw_text(
["盲盒总盈亏: ", str(user_box_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, color, Color.BLACK, Color.LINK]
)
pic.draw_text(["直播间盲盒总盈亏: ", str(room_box_profit), ""], [Color.BLACK, room_color, Color.BLACK])
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_gift_profit:
pic.draw_text("")
user_gift_profits = [x[1] for x in await redis.range_user_gift_all(up.room_id)]
room_gift_profit = await redis.get_room_gift_all(up.room_id)
ratio = get_ratio(user_gift_profit, room_gift_profit)
rank, total, diff = get_parallel_ranking(user_gift_profit, user_gift_profits)
pic.draw_text(
["送出礼物总价值: ", str(user_gift_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
pic.draw_text(["占据了礼物总价值的 ", ratio], [Color.BLACK, Color.LINK])
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if user_sc_profit:
pic.draw_text("")
user_sc_profits = [x[1] for x in await redis.range_user_sc_all(up.room_id)]
room_sc_profit = await redis.get_room_sc_all(up.room_id)
ratio = get_ratio(user_sc_profit, room_sc_profit)
rank, total, diff = get_parallel_ranking(user_sc_profit, user_sc_profits)
pic.draw_text(
["发送 SC (醒目留言) 总价值: ", str(user_sc_profit), " 元 排名: ", f"{rank}/{total}"],
[Color.BLACK, Color.LINK, Color.BLACK, Color.LINK]
)
pic.draw_text(["占据了 SC (醒目留言) 总价值的 ", ratio], [Color.BLACK, Color.LINK])
if diff is not None:
pic.draw_text(["距离上一名还需: ", str(diff), ""], [Color.BLACK, Color.LINK, Color.BLACK])
if any([user_captain_count, user_commander_count, user_governor_count]):
pic.draw_text("")
if user_captain_count:
pic.draw_text(f"开通舰长: {user_captain_count}", Color.DEEPSKYBLUE)
if user_commander_count:
pic.draw_text(f"开通提督: {user_commander_count}", Color.FUCHSIA)
if user_governor_count:
pic.draw_text(f"开通总督: {user_governor_count}", Color.CRIMSON)
# 底部版权信息,请务必保留此处
pic.draw_text("")
pic.draw_text_right(25, "Designed By StarBot", Color.GRAY)
pic.draw_text_right(25, "https://github.com/Starlwr/StarBot", Color.LINK)
pic.crop_and_paste_bottom()
await app.send_message(sender, MessageChain(Image(base64=pic.base64())))

View File

@ -1,9 +1,11 @@
import asyncio
import os
import sys
from creart import create
from graia.ariadne import Ariadne
from graia.broadcast import Broadcast
from graia.saya import Saya
from loguru import logger
from .datasource import DataSource
@ -41,6 +43,7 @@ class StarBot:
datasource: 推送配置数据源
"""
self.__datasource = datasource
Ariadne.options["StarBotDataSource"] = datasource
async def __main(self):
"""
@ -105,6 +108,19 @@ class StarBot:
if config.get("USE_HTTP_API"):
asyncio.get_event_loop().create_task(http_init(self.__datasource))
# 载入命令
logger.info("开始载入命令模块")
commands_path = os.path.dirname(os.path.dirname(__file__)) + "\\commands"
saya = create(Saya)
with saya.module_context():
for root, dirs, files in os.walk(commands_path, topdown=False):
for name in files:
if name.endswith(".py"):
name = name[:-3]
saya.require(f"starbot.commands.{name}")
logger.success(f"{name} 命令模块载入成功")
# 启动消息推送模块
if not self.__datasource.bots:
logger.error("不存在需要启动的 Bot 账号, 请先在数据源中配置完毕后再重新运行")
@ -141,6 +157,7 @@ class StarBot:
logger.add(sys.stderr, format=logger_format, level="INFO")
logger.disable("graia.ariadne.model")
logger.disable("graia.ariadne.service")
logger.disable("graia.saya")
logger.disable("launart")
bcc = create(Broadcast)

View File

@ -7,7 +7,7 @@ import pymysql
from loguru import logger
from pydantic import ValidationError
from .model import LiveOn, LiveOff, LiveReport, DynamicUpdate, PushTarget
from .model import LiveOn, LiveOff, LiveReport, DynamicUpdate, PushTarget, PushType
from .room import Up
from .sender import Bot
from ..exception.DataSourceException import DataSourceException
@ -91,6 +91,27 @@ class DataSource(metaclass=abc.ABCMeta):
raise DataSourceException(f"不存在的 UID: {uid}")
return up
def get_ups_by_target(self, target_id: int, target_type: PushType) -> List[Up]:
"""
根据推送目标号码和推送目标类型获取 Up 实例列表
Args:
target_id: 需要获取 Up 的推送目标号码
target_type: 需要获取 Up 的推送目标类型
Returns:
Up 实例列表
"""
ups = []
for up in self.__up_list:
for target in up.targets:
if target_id == target.id and target_type == target.type:
ups.append(up)
break
return ups
def get_target_by_key(self, key: str) -> PushTarget:
"""
根据推送 key 获取 PushTarget 实例用于 HTTP API 推送

View File

@ -201,6 +201,7 @@ class Up(BaseModel):
"{url}": f"https://live.bilibili.com/{self.room_id}",
"{cover}": "".join(["{urlpic=", arg_base["cover"], "}"])
}
await self.__bot.send_live_on_at(self)
self.__bot.send_live_on(self, args)
@self.__room.on("PREPARING")
@ -370,6 +371,7 @@ class Up(BaseModel):
"{url}": url_map.get(dynamic_type, f"https://t.bilibili.com/{dynamic_id}"),
"{picture}": "".join(["{base64pic=", base64str, "}"])
}
await self.__bot.send_dynamic_at(self)
self.__bot.send_dynamic_update(self, dynamic_update_args)
async def __generate_live_report_param(self):
@ -472,7 +474,7 @@ class Up(BaseModel):
# 弹幕排行
if self.__any_live_report_item_enabled("danmu_ranking"):
ranking_count = max(map(lambda t: t.live_report.danmu_ranking, self.targets))
danmu_ranking = await redis.get_user_danmu_count(self.room_id, 0, ranking_count - 1)
danmu_ranking = await redis.rev_range_user_danmu_count(self.room_id, 0, ranking_count - 1)
if danmu_ranking:
uids = [x[0] for x in danmu_ranking]
@ -488,7 +490,7 @@ class Up(BaseModel):
# 盲盒数量排行
if self.__any_live_report_item_enabled("box_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_ranking, self.targets))
box_ranking = await redis.get_user_box_count(self.room_id, 0, ranking_count - 1)
box_ranking = await redis.rev_range_user_box_count(self.room_id, 0, ranking_count - 1)
if box_ranking:
uids = [x[0] for x in box_ranking]
@ -504,7 +506,7 @@ class Up(BaseModel):
# 盲盒盈亏排行
if self.__any_live_report_item_enabled("box_profit_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_profit_ranking, self.targets))
box_profit_ranking = await redis.get_user_box_profit(self.room_id, 0, ranking_count - 1)
box_profit_ranking = await redis.rev_range_user_box_profit(self.room_id, 0, ranking_count - 1)
if box_profit_ranking:
uids = [x[0] for x in box_profit_ranking]
@ -520,7 +522,7 @@ class Up(BaseModel):
# 礼物排行
if self.__any_live_report_item_enabled("gift_ranking"):
ranking_count = max(map(lambda t: t.live_report.gift_ranking, self.targets))
gift_ranking = await redis.get_user_gift_profit(self.room_id, 0, ranking_count - 1)
gift_ranking = await redis.rev_range_user_gift_profit(self.room_id, 0, ranking_count - 1)
if gift_ranking:
uids = [x[0] for x in gift_ranking]
@ -536,7 +538,7 @@ class Up(BaseModel):
# SC醒目留言排行
if self.__any_live_report_item_enabled("sc_ranking"):
ranking_count = max(map(lambda t: t.live_report.sc_ranking, self.targets))
sc_ranking = await redis.get_user_sc_profit(self.room_id, 0, ranking_count - 1)
sc_ranking = await redis.rev_range_user_sc_profit(self.room_id, 0, ranking_count - 1)
if sc_ranking:
uids = [x[0] for x in sc_ranking]
@ -551,9 +553,9 @@ class Up(BaseModel):
# 开通大航海观众列表
if self.__any_live_report_item_enabled("guard_list"):
captains = await redis.get_user_captain_count(self.room_id)
commanders = await redis.get_user_commander_count(self.room_id)
governors = await redis.get_user_governor_count(self.room_id)
captains = await redis.rev_range_user_captain_count(self.room_id)
commanders = await redis.rev_range_user_commander_count(self.room_id)
governors = await redis.rev_range_user_governor_count(self.room_id)
if captains:
uids = [x[0] for x in captains]

View File

@ -13,7 +13,7 @@ from pydantic import BaseModel, PrivateAttr
from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget
from .room import Up
from ..painter.LiveReportGenerator import LiveReportGenerator
from ..utils import config
from ..utils import config, redis
from ..utils.AsyncEvent import AsyncEvent
@ -121,6 +121,15 @@ class Bot(BaseModel, AsyncEvent):
# 过滤已不在群内的群成员的 @ 消息
member_list = [member.id for member in await self.__bot.get_member_list(message.id)]
elements = [e for e in chain if (not isinstance(e, At)) or (e.target in member_list)]
# 移除开播 @ 列表和动态 @ 列表中的元素
filtered = [e for e in chain if (isinstance(e, At)) and (e.target not in member_list)]
for at in filtered:
if await redis.exists_live_on_at(message.id, at.target):
await redis.delete_live_on_at(message.id, at.target)
if await redis.exists_dynamic_at(message.id, at.target):
await redis.delete_dynamic_at(message.id, at.target)
chain = MessageChain(elements)
if len(chain) != 0:
@ -179,6 +188,21 @@ class Bot(BaseModel, AsyncEvent):
"""
self.__send_push_message(up, lambda t: t.live_on, args)
async def send_live_on_at(self, up: Up):
"""
发送开播 @ 我列表中的 @ 消息
Args:
up: 要发送的 UP 主实例
"""
if not isinstance(up, Up):
return
for target in filter(lambda t: t.type == PushType.Group, up.targets):
if target.live_on.enabled:
ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_live_on_at(target.id)])
self.send_message(Message(id=target.id, content=ats, type=target.type))
def send_live_off(self, up: Up, args: Dict[str, Any]):
"""
发送下播消息至 UP 主下启用下播推送的推送目标
@ -211,6 +235,21 @@ class Bot(BaseModel, AsyncEvent):
"""
self.__send_push_message(up, lambda t: t.dynamic_update, args)
async def send_dynamic_at(self, up: Up):
"""
发送动态 @ 我列表中的 @ 消息
Args:
up: 要发送的 UP 主实例
"""
if not isinstance(up, Up):
return
for target in filter(lambda t: t.type == PushType.Group, up.targets):
if target.dynamic_update.enabled:
ats = " ".join(["{at" + str(x) + "}" for x in await redis.range_dynamic_at(target.id)])
self.send_message(Message(id=target.id, content=ats, type=target.type))
def __eq__(self, other):
if isinstance(other, Bot):
return self.qq == other.qq

View File

@ -506,6 +506,62 @@ class PicGenerator:
"""
return int(self.__draw.textlength(s, self.__text_font))
def draw_text_multiline(self,
margin: int,
texts: Union[str, List[str]],
colors: Optional[
Union[Color, Tuple[int, int, int], List[Union[Color, Tuple[int, int, int]]]]
] = None,
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制多行文本文本到达边界处会自动换行绘制结束后会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
传入文本列表和颜色列表可将多行文本绘制为不同颜色文本列表和颜色列表需一一对应
颜色列表少于文本列表时将使用默认黑色 (0, 0, 0)颜色列表多于文本列表时将舍弃多余颜色
Args:
margin: 外边距
texts: 文本内容
colors: 字体颜色默认黑色 (0, 0, 0)
xy: 绘图坐标默认自适应绘图坐标
"""
if colors is None:
colors = []
if isinstance(texts, str):
texts = [texts]
if isinstance(colors, (Color, tuple)):
colors = [colors]
for i in range(len(texts) - len(colors)):
colors.append(Color.BLACK)
for i in range(len(colors)):
if isinstance(colors[i], Color):
colors[i] = colors[i].value
if xy is None:
x = self.x
for i in range(len(texts)):
for c in texts[i]:
length = int(self.__draw.textlength(c, self.__text_font))
if self.x + length > self.width - margin:
self.move_pos(x - self.x, self.__text_font.size + self.__ROW_SPACE)
self.__draw.text(self.__xy, c, colors[i], self.__text_font)
self.move_pos(int(self.__draw.textlength(c, self.__text_font)), 0)
self.move_pos(x - self.x, self.__text_font.size + self.__ROW_SPACE)
else:
x = xy[0]
for i in range(len(texts)):
for c in texts[i]:
length = int(self.__draw.textlength(c, self.__text_font))
if xy[0] + length > self.width - margin:
xy = x, xy[1] + self.__text_font.size + self.__ROW_SPACE
self.__draw.text(xy, c, colors[i], self.__text_font)
xy = xy[0] + self.__draw.textlength(c, self.__text_font), xy[1]
return self
def show(self):
"""
显示图片
@ -533,4 +589,4 @@ class PicGenerator:
io = BytesIO()
self.__canvas.save(io, format="PNG")
return base64.b64encode(io.getvalue()).decode()
return base64.b64encode(io.getvalue()).decode()

View File

@ -84,6 +84,10 @@ SIMPLE_CONFIG = {
# 命令触发前缀
"COMMAND_PREFIX": "",
# 每个群开播 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改
"COMMAND_LIVE_ON_AT_ME_LIMIT": 20,
# 每个群动态 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改
"COMMAND_DYNAMIC_AT_ME_LIMIT": 20,
# 是否启用风控消息补发
"BAN_RESEND": False,
@ -179,6 +183,10 @@ FULL_CONFIG = {
# 命令触发前缀
"COMMAND_PREFIX": "",
# 每个群开播 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改
"COMMAND_LIVE_ON_AT_ME_LIMIT": 20,
# 每个群动态 @ 我命令人数上限,单次 @ 人数过多容易被风控,不推荐修改
"COMMAND_DYNAMIC_AT_ME_LIMIT": 20,
# 是否启用风控消息补发
"BAN_RESEND": True,

View File

@ -1,4 +1,4 @@
from typing import Any, Optional, Union, Tuple, List
from typing import Any, Optional, Union, Tuple, List, Set
import aioredis
from loguru import logger
@ -37,7 +37,7 @@ async def lrange(key: str, start: int, end: int) -> List[str]:
return [x.decode() for x in await __redis.lrange(key, start, end)]
async def lrangei(key: str, start: int, end: int) -> List[float]:
async def lrangei(key: str, start: int, end: int) -> List[int]:
return [int(x) for x in await __redis.lrange(key, start, end)]
@ -91,6 +91,28 @@ async def hincrbyfloat(key: str, hkey: Union[str, int], value: Optional[float] =
return await __redis.hincrbyfloat(key, hkey, value)
# Set
async def scard(key: str) -> int:
return await __redis.scard(key)
async def sismember(key: str, member: Union[str, int]) -> bool:
return await __redis.sismember(key, member)
async def smembers(key: str) -> Set[int]:
return {int(x) for x in await __redis.smembers(key)}
async def sadd(key: str, member: Union[str, int]):
await __redis.sadd(key, member)
async def srem(key: str, member: Union[str, int]):
await __redis.srem(key, member)
# Zset
async def zcard(key: str) -> int:
@ -104,6 +126,28 @@ async def zrank(key: str, member: str) -> int:
return rank
async def zscore(key: str, member: Union[str, int]) -> float:
score = await __redis.zscore(key, member)
if score is None:
return 0.0
return score
async def zrange(key: str, start: int, end: int) -> List[str]:
return [x.decode() for x in await __redis.zrange(key, start, end)]
async def zrangewithscoresi(key: str, start: int, end: int) -> List[Tuple[str, int]]:
return [(x[0].decode(), int(x[1])) for x in await __redis.zrange(key, start, end, withscores=True)]
async def zrangewithscoresf1(key: str, start: int, end: int) -> List[Tuple[str, float]]:
return [
(x[0].decode(), float("{:.1f}".format(float(x[1]))))
for x in await __redis.zrange(key, start, end, withscores=True)
]
async def zrevrangewithscoresi(key: str, start: int, end: int) -> List[Tuple[str, int]]:
return [(x[0].decode(), int(x[1])) for x in await __redis.zrevrange(key, start, end, True)]
@ -219,17 +263,35 @@ async def reset_room_danmu_count(room_id: int):
# 房间累计弹幕数量
async def get_room_danmu_total(room_id: int) -> int:
return await hgeti("RoomDanmuTotal", room_id)
async def accumulate_room_danmu_total(room_id: int) -> int:
return await hincrby("RoomDanmuTotal", room_id, await get_room_danmu_count(room_id))
# 房间总弹幕数量
async def get_room_danmu_all(room_id: int) -> int:
return await get_room_danmu_count(room_id) + await get_room_danmu_total(room_id)
# 用户弹幕数量
async def get_user_danmu_count(room_id: int, uid: int) -> int:
return int(await zscore(f"UserDanmuCount:{room_id}", uid))
async def len_user_danmu_count(room_id: int) -> int:
return await zcard(f"UserDanmuCount:{room_id}")
async def get_user_danmu_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def range_user_danmu_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrangewithscoresi(f"UserDanmuCount:{room_id}", start, end)
async def rev_range_user_danmu_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserDanmuCount:{room_id}", start, end)
@ -243,10 +305,34 @@ async def delete_user_danmu_count(room_id: int):
# 用户累计弹幕数量
async def get_user_danmu_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserDanmuTotal:{room_id}", uid))
async def accumulate_user_danmu_total(room_id: int):
await zunionstore(f"UserDanmuTotal:{room_id}", f"UserDanmuCount:{room_id}")
# 用户总弹幕数量
async def len_user_danmu_all(room_id: int) -> int:
return len(
set(await zrange(f"UserDanmuCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserDanmuTotal:{room_id}", 0, -1)))
)
async def get_user_danmu_all(room_id: int, uid: int) -> int:
return await get_user_danmu_count(room_id, uid) + await get_user_danmu_total(room_id, uid)
async def range_user_danmu_all(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
await zunionstore(f"TempDanmuTotal:{room_id}", [f"UserDanmuCount:{room_id}", f"UserDanmuTotal:{room_id}"])
result = await zrangewithscoresi(f"TempDanmuTotal:{room_id}", start, end)
await delete(f"TempDanmuTotal:{room_id}")
return result
# 房间弹幕记录
async def get_room_danmu(room_id: int) -> List[str]:
@ -291,17 +377,35 @@ async def reset_room_box_count(room_id: int):
# 房间累计盲盒数量
async def get_room_box_total(room_id: int) -> int:
return await hgeti("RoomBoxTotal", room_id)
async def accumulate_room_box_total(room_id: int) -> int:
return await hincrby("RoomBoxTotal", room_id, await get_room_box_count(room_id))
# 房间总盲盒数量
async def get_room_box_all(room_id: int) -> int:
return await get_room_box_count(room_id) + await get_room_box_total(room_id)
# 用户盲盒数量
async def get_user_box_count(room_id: int, uid: int) -> int:
return int(await zscore(f"UserBoxCount:{room_id}", uid))
async def len_user_box_count(room_id: int) -> int:
return await zcard(f"UserBoxCount:{room_id}")
async def get_user_box_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def range_user_box_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrangewithscoresi(f"UserBoxCount:{room_id}", start, end)
async def rev_range_user_box_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserBoxCount:{room_id}", start, end)
@ -315,10 +419,34 @@ async def delete_user_box_count(room_id: int):
# 用户累计盲盒数量
async def get_user_box_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserBoxTotal:{room_id}", uid))
async def accumulate_user_box_total(room_id: int):
await zunionstore(f"UserBoxTotal:{room_id}", f"UserBoxCount:{room_id}")
# 用户总盲盒数量
async def len_user_box_all(room_id: int) -> int:
return len(
set(await zrange(f"UserBoxCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserBoxTotal:{room_id}", 0, -1)))
)
async def get_user_box_all(room_id: int, uid: int) -> int:
return await get_user_box_count(room_id, uid) + await get_user_box_total(room_id, uid)
async def range_user_box_all(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
await zunionstore(f"TempBoxTotal:{room_id}", [f"UserBoxCount:{room_id}", f"UserBoxTotal:{room_id}"])
result = await zrangewithscoresi(f"TempBoxTotal:{room_id}", start, end)
await delete(f"TempBoxTotal:{room_id}")
return result
# 房间盲盒盈亏
async def get_room_box_profit(room_id: int) -> float:
@ -335,13 +463,31 @@ async def reset_room_box_profit(room_id: int):
# 房间累计盲盒盈亏
async def get_room_box_profit_total(room_id: int) -> float:
return await hgetf1("RoomBoxProfitTotal", room_id)
async def accumulate_room_box_profit_total(room_id: int) -> float:
return await hincrbyfloat("RoomBoxProfitTotal", room_id, await get_room_box_profit(room_id))
# 房间总盲盒盈亏
async def get_room_box_profit_all(room_id: int) -> float:
return float("{:.1f}".format(await get_room_box_profit(room_id) + await get_room_box_profit_total(room_id)))
# 用户盲盒盈亏
async def get_user_box_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
async def get_user_box_profit(room_id: int, uid: int) -> float:
return float("{:.1f}".format(await zscore(f"UserBoxProfit:{room_id}", uid)))
async def range_user_box_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrangewithscoresf1(f"UserBoxProfit:{room_id}", start, end)
async def rev_range_user_box_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrevrangewithscoresf1(f"UserBoxProfit:{room_id}", start, end)
@ -355,10 +501,29 @@ async def delete_user_box_profit(room_id: int):
# 用户累计盲盒盈亏
async def get_user_box_profit_total(room_id: int, uid: int) -> float:
return float("{:.1f}".format(await zscore(f"UserBoxProfitTotal:{room_id}", uid)))
async def accumulate_user_box_profit_total(room_id: int):
await zunionstore(f"UserBoxProfitTotal:{room_id}", f"UserBoxProfit:{room_id}")
# 用户总盲盒盈亏
async def get_user_box_profit_all(room_id: int, uid: int) -> float:
return float("{:.1f}".format(
await get_user_box_profit(room_id, uid) + await get_user_box_profit_total(room_id, uid)
))
async def range_user_box_profit_all(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
await zunionstore(f"TempBoxProfitTotal:{room_id}", [f"UserBoxProfit:{room_id}", f"UserBoxProfitTotal:{room_id}"])
result = await zrangewithscoresf1(f"TempBoxProfitTotal:{room_id}", start, end)
await delete(f"TempBoxProfitTotal:{room_id}")
return result
# 房间盲盒盈亏记录,用于绘制直播报告中盲盒盈亏曲线图
async def get_room_box_profit_record(room_id: int) -> List[float]:
@ -417,17 +582,35 @@ async def reset_room_gift_profit(room_id: int):
# 房间累计礼物价值
async def get_room_gift_total(room_id: int) -> float:
return await hgetf1("RoomGiftTotal", room_id)
async def accumulate_room_gift_total(room_id: int) -> float:
return await hincrbyfloat("RoomGiftTotal", room_id, await get_room_gift_profit(room_id))
# 房间总礼物价值
async def get_room_gift_all(room_id: int) -> float:
return float("{:.1f}".format(await get_room_gift_profit(room_id) + await get_room_gift_total(room_id)))
# 用户礼物价值
async def get_user_gift_profit(room_id: int, uid: int) -> float:
return float("{:.1f}".format(await zscore(f"UserGiftProfit:{room_id}", uid)))
async def len_user_gift_profit(room_id: int) -> int:
return await zcard(f"UserGiftProfit:{room_id}")
async def get_user_gift_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
async def range_user_gift_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrangewithscoresf1(f"UserGiftProfit:{room_id}", start, end)
async def rev_range_user_gift_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrevrangewithscoresf1(f"UserGiftProfit:{room_id}", start, end)
@ -441,10 +624,36 @@ async def delete_user_gift_profit(room_id: int):
# 用户累计礼物价值
async def get_user_gift_total(room_id: int, uid: int) -> float:
return float("{:.1f}".format(await zscore(f"UserGiftTotal:{room_id}", uid)))
async def accumulate_user_gift_total(room_id: int):
await zunionstore(f"UserGiftTotal:{room_id}", f"UserGiftProfit:{room_id}")
# 用户总礼物价值
async def len_user_gift_all(room_id: int) -> int:
return len(
set(await zrange(f"UserGiftProfit:{room_id}", 0, -1))
.union(set(await zrange(f"UserGiftTotal:{room_id}", 0, -1)))
)
async def get_user_gift_all(room_id: int, uid: int) -> float:
return float("{:.1f}".format(
await get_user_gift_profit(room_id, uid) + await get_user_gift_total(room_id, uid)
))
async def range_user_gift_all(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
await zunionstore(f"TempGiftTotal:{room_id}", [f"UserGiftProfit:{room_id}", f"UserGiftTotal:{room_id}"])
result = await zrangewithscoresf1(f"TempGiftTotal:{room_id}", start, end)
await delete(f"TempGiftTotal:{room_id}")
return result
# 房间礼物时间分布
async def get_room_gift_time(room_id: int) -> List[Tuple[str, float]]:
@ -475,17 +684,35 @@ async def reset_room_sc_profit(room_id: int):
# 房间累计 SC 价值
async def get_room_sc_total(room_id: int) -> int:
return await hgeti("RoomScTotal", room_id)
async def accumulate_room_sc_total(room_id: int) -> int:
return await hincrby("RoomScTotal", room_id, await get_room_sc_profit(room_id))
# 房间总 SC 价值
async def get_room_sc_all(room_id: int) -> int:
return await get_room_sc_profit(room_id) + await get_room_sc_total(room_id)
# 用户 SC 价值
async def get_user_sc_profit(room_id: int, uid: int) -> int:
return int(await zscore(f"UserScProfit:{room_id}", uid))
async def len_user_sc_profit(room_id: int) -> int:
return await zcard(f"UserScProfit:{room_id}")
async def get_user_sc_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def range_user_sc_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrangewithscoresi(f"UserScProfit:{room_id}", start, end)
async def rev_range_user_sc_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserScProfit:{room_id}", start, end)
@ -499,10 +726,34 @@ async def delete_user_sc_profit(room_id: int):
# 用户累计 SC 价值
async def get_user_sc_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserScTotal:{room_id}", uid))
async def accumulate_user_sc_total(room_id: int):
await zunionstore(f"UserScTotal:{room_id}", f"UserScProfit:{room_id}")
# 用户总 SC 价值
async def len_user_sc_all(room_id: int) -> int:
return len(
set(await zrange(f"UserScProfit:{room_id}", 0, -1))
.union(set(await zrange(f"UserScTotal:{room_id}", 0, -1)))
)
async def get_user_sc_all(room_id: int, uid: int) -> int:
return await get_user_sc_profit(room_id, uid) + await get_user_sc_total(room_id, uid)
async def range_user_sc_all(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
await zunionstore(f"TempScTotal:{room_id}", [f"UserScProfit:{room_id}", f"UserScTotal:{room_id}"])
result = await zrangewithscoresi(f"TempScTotal:{room_id}", start, end)
await delete(f"TempScTotal:{room_id}")
return result
# 房间 SC 时间分布
async def get_room_sc_time(room_id: int) -> List[Tuple[str, int]]:
@ -543,23 +794,81 @@ async def reset_room_guard_count(room_id: int):
# 房间累计大航海数量
async def get_room_captain_total(room_id: int) -> int:
return await hgeti("RoomCaptainTotal", room_id)
async def get_room_commander_total(room_id: int) -> int:
return await hgeti("RoomCommanderTotal", room_id)
async def get_room_governor_total(room_id: int) -> int:
return await hgeti("RoomGovernorTotal", room_id)
async def accumulate_room_guard_total(room_id: int):
await hincrby("RoomCaptainTotal", room_id, await get_room_captain_count(room_id))
await hincrby("RoomCommanderTotal", room_id, await get_room_commander_count(room_id))
await hincrby("RoomGovernorTotal", room_id, await get_room_governor_count(room_id))
# 房间总大航海数量
async def get_room_captain_all(room_id: int) -> int:
return await get_room_captain_count(room_id) + await get_room_captain_total(room_id)
async def get_room_commander_all(room_id: int) -> int:
return await get_room_commander_count(room_id) + await get_room_commander_total(room_id)
async def get_room_governor_all(room_id: int) -> int:
return await get_room_governor_count(room_id) + await get_room_governor_total(room_id)
# 用户大航海数量
async def get_user_captain_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def get_user_captain_count(room_id: int, uid: int) -> int:
return int(await zscore(f"UserCaptainCount:{room_id}", uid))
async def get_user_commander_count(room_id: int, uid: int) -> int:
return int(await zscore(f"UserCommanderCount:{room_id}", uid))
async def get_user_governor_count(room_id: int, uid: int) -> int:
return int(await zscore(f"UserGovernorCount:{room_id}", uid))
async def len_user_captain_count(room_id: int) -> int:
return await zcard(f"UserCaptainCount:{room_id}")
async def len_user_commander_count(room_id: int) -> int:
return await zcard(f"UserCommanderCount:{room_id}")
async def len_user_governor_count(room_id: int) -> int:
return await zcard(f"UserGovernorCount:{room_id}")
async def len_user_guard_count(room_id: int) -> int:
return len(
set(await zrange(f"UserCaptainCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserCommanderCount:{room_id}", 0, -1)))
.union(set(await zrange(f"UserGovernorCount:{room_id}", 0, -1)))
)
async def rev_range_user_captain_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserCaptainCount:{room_id}", start, end)
async def get_user_commander_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def rev_range_user_commander_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserCommanderCount:{room_id}", start, end)
async def get_user_governor_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
async def rev_range_user_governor_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserGovernorCount:{room_id}", start, end)
@ -575,12 +884,70 @@ async def delete_user_guard_count(room_id: int):
# 用户累计大航海数量
async def get_user_captain_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserCaptainTotal:{room_id}", uid))
async def get_user_commander_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserCommanderTotal:{room_id}", uid))
async def get_user_governor_total(room_id: int, uid: int) -> int:
return int(await zscore(f"UserGovernorTotal:{room_id}", uid))
async def accumulate_user_guard_total(room_id: int):
await zunionstore(f"UserCaptainTotal:{room_id}", f"UserCaptainCount:{room_id}")
await zunionstore(f"UserCommanderTotal:{room_id}", f"UserCommanderCount:{room_id}")
await zunionstore(f"UserGovernorTotal:{room_id}", f"UserGovernorCount:{room_id}")
# 用户总大航海数量
async def len_user_captain_all(room_id: int) -> int:
return len(
set(await zrange(f"UserCaptainCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserCaptainTotal:{room_id}", 0, -1)))
)
async def len_user_commander_all(room_id: int) -> int:
return len(
set(await zrange(f"UserCommanderCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserCommanderTotal:{room_id}", 0, -1)))
)
async def len_user_governor_all(room_id: int) -> int:
return len(
set(await zrange(f"UserGovernorCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserGovernorTotal:{room_id}", 0, -1)))
)
async def len_user_guard_all(room_id: int) -> int:
return len(
set(await zrange(f"UserCaptainCount:{room_id}", 0, -1))
.union(set(await zrange(f"UserCaptainTotal:{room_id}", 0, -1)))
.union(set(await zrange(f"UserCommanderCount:{room_id}", 0, -1)))
.union(set(await zrange(f"UserCommanderTotal:{room_id}", 0, -1)))
.union(set(await zrange(f"UserGovernorCount:{room_id}", 0, -1)))
.union(set(await zrange(f"UserGovernorTotal:{room_id}", 0, -1)))
)
async def get_user_captain_all(room_id: int, uid: int) -> int:
return await get_user_captain_count(room_id, uid) + await get_user_captain_total(room_id, uid)
async def get_user_commander_all(room_id: int, uid: int) -> int:
return await get_user_commander_count(room_id, uid) + await get_user_commander_total(room_id, uid)
async def get_user_governor_all(room_id: int, uid: int) -> int:
return await get_user_governor_count(room_id, uid) + await get_user_governor_total(room_id, uid)
# 房间大航海时间分布
async def get_room_guard_time(room_id: int) -> List[Tuple[str, int]]:
@ -660,3 +1027,71 @@ async def reset_data(room_id: int):
# 重置大航海数
await reset_room_guard_count(room_id)
await delete_user_guard_count(room_id)
# 用户绑定
async def get_bind_uid(qq: int) -> int:
return await hgeti("BindUid", qq)
async def bind_uid(qq: int, uid: int):
await hset("BindUid", qq, uid)
# 开播 @ 我
async def len_live_on_at(_id: int) -> int:
return await scard(f"LiveOnAtMe:{_id}")
async def exists_live_on_at(_id: int, qq: int) -> bool:
return await sismember(f"LiveOnAtMe:{_id}", qq)
async def range_live_on_at(_id: int) -> Set[int]:
return await smembers(f"LiveOnAtMe:{_id}")
async def add_live_on_at(_id: int, qq: int):
await sadd(f"LiveOnAtMe:{_id}", qq)
async def delete_live_on_at(_id: int, qq: int):
await srem(f"LiveOnAtMe:{_id}", qq)
# 动态 @ 我
async def len_dynamic_at(_id: int) -> int:
return await scard(f"DynamicAtMe:{_id}")
async def exists_dynamic_at(_id: int, qq: int) -> bool:
return await sismember(f"DynamicAtMe:{_id}", qq)
async def range_dynamic_at(_id: int) -> Set[int]:
return await smembers(f"DynamicAtMe:{_id}")
async def add_dynamic_at(_id: int, qq: int):
await sadd(f"DynamicAtMe:{_id}", qq)
async def delete_dynamic_at(_id: int, qq: int):
await srem(f"DynamicAtMe:{_id}", qq)
# 命令禁用
async def exists_disable_command(name: str, _id: int) -> bool:
return await sismember(name, _id)
async def add_disable_command(name: str, _id: int):
await sadd(name, _id)
async def delete_disable_command(name: str, _id: int):
await srem(name, _id)

View File

@ -2,17 +2,19 @@
通用工具库
"""
import asyncio
import bisect
import json
import os
import time
from io import BytesIO
from typing import Tuple, List, Dict, Sized, Optional, Any
from typing import Tuple, List, Dict, Sized, Optional, Any, Union
from PIL import Image, ImageDraw
from . import config
from .Credential import Credential
from .network import get_session, request
from ..exception import ResponseCodeException
def get_api(field: str) -> Dict:
@ -157,10 +159,71 @@ async def get_unames_and_faces_by_uids(uids: List[str]) -> Tuple[List[str], List
Returns:
昵称列表和头像图片列表组成的元组
"""
async def illegal_face():
face = Image.new("RGBA", (300, 300), (255, 255, 255, 255))
return face
user_info_url = f"https://api.vc.bilibili.com/account/v1/user/cards?uids={','.join(uids)}"
infos_list = await request("GET", user_info_url)
try:
infos_list = await request("GET", user_info_url)
except ResponseCodeException:
return [], []
infos = dict(zip([x["mid"] for x in infos_list], infos_list))
unames = [infos[int(uid)]["name"] for uid in uids]
download_face_tasks = [open_url_image(infos[int(uid)]["face"]) for uid in uids]
faces = await asyncio.gather(*download_face_tasks)
return (unames, faces)
unames = [infos[int(uid)]["name"] if int(uid) in infos else "" for uid in uids]
download_face_tasks = [
open_url_image(infos[int(uid)]["face"]) if int(uid) in infos else illegal_face() for uid in uids
]
faces = list(await asyncio.gather(*download_face_tasks, return_exceptions=True))
for i in range(len(faces)):
if isinstance(faces[i], Exception):
faces[i] = await illegal_face()
return unames, faces
def remove_command_param_placeholder(param: str) -> str:
"""
移除命令参数中括号占位符
Args:
param: 传入参数
Returns:
处理后的参数
"""
return param.replace("[", "").replace("]", "").replace("", "").replace("", "").replace("", "").replace("", "")
def get_parallel_ranking(score: Union[int, float],
scores: List[Union[int, float]]) -> Tuple[int, int, Optional[Union[int, float]]]:
"""
获取分数在分数列表中的排名存在并列情况优先取高名次
Args:
score: 分数
scores: 从小到大有序分数列表
Returns:
名次参与排名元素个数和距离上一名的差值组成的元组
"""
index = bisect.bisect_right(scores, score)
total = len(scores)
rank = total - index + 1
diff = scores[index] - score if index < total else None
if isinstance(diff,float):
diff = float("{:.1f}".format(diff))
return rank, total, diff
def get_ratio(count: Union[int, float], total: Union[int, float]) -> str:
"""
获取数量在总数量中所占比例
Args:
count: 数量
total: 总数量
Returns:
百分比字符串精确到两位小数
"""
return "{:.2f}".format(count / total * 100) + " %"