feat: Add ranking lists and guard list into live report

This commit is contained in:
LWR 2023-01-17 20:27:05 +08:00
parent 30205e73e9
commit c23f4e6c1f
9 changed files with 670 additions and 121 deletions

View File

@ -291,8 +291,10 @@ class MySQLDataSource(DataSource):
)
live_report = await self.__query(
"SELECT g.`uid`, g.`uname`, g.`room_id`, `key`, `type`, `num`, "
"`enabled`, `logo`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, "
"`danmu`, `box`, `gift`, `sc`, `guard`, `danmu_cloud` "
"`enabled`, `logo`, `logo_base64`, `time`, `fans_change`, `fans_medal_change`, `guard_change`, "
"`danmu`, `box`, `gift`, `sc`, `guard`, "
"`danmu_ranking`, `box_ranking`, `box_profit_ranking`, `gift_ranking`, `sc_ranking`, "
"`guard_list`, `danmu_cloud` "
"FROM `groups` AS `g` LEFT JOIN `live_report` AS `l` "
"ON g.`uid` = l.`uid` AND g.`index` = l.`index` "
f"WHERE g.`uid` = {uid} "
@ -309,33 +311,19 @@ class MySQLDataSource(DataSource):
targets = []
for i, target in enumerate(live_on):
if all((live_on[i]["enabled"], live_on[i]["message"])):
on = LiveOn(enabled=live_on[i]["enabled"],
message=live_on[i]["message"])
on = LiveOn(**live_on[i])
else:
on = LiveOn()
if all((live_off[i]["enabled"], live_off[i]["message"])):
off = LiveOff(enabled=live_off[i]["enabled"],
message=live_off[i]["message"])
off = LiveOff(**live_off[i])
else:
off = LiveOff()
if live_report[i]["enabled"]:
report = LiveReport(enabled=live_report[i]["enabled"],
logo=live_report[i]["logo"],
time=live_report[i]["time"],
fans_change=live_report[i]["fans_change"],
fans_medal_change=live_report[i]["fans_medal_change"],
guard_change=live_report[i]["guard_change"],
danmu=live_report[i]["danmu"],
box=live_report[i]["box"],
gift=live_report[i]["gift"],
sc=live_report[i]["sc"],
guard=live_report[i]["guard"],
danmu_cloud=live_report[i]["danmu_cloud"])
report = LiveReport(**live_report[i])
else:
report = LiveReport()
if all((dynamic_update[i]["enabled"], dynamic_update[i]["message"])):
update = DynamicUpdate(enabled=dynamic_update[i]["enabled"],
message=dynamic_update[i]["message"])
update = DynamicUpdate(**dynamic_update[i])
else:
update = DynamicUpdate()

View File

@ -86,7 +86,10 @@ class LiveReport(BaseModel):
"""是否启用直播报告。默认False"""
logo: Optional[str] = None
"""主播立绘的 Base64 字符串会绘制在直播报告右上角合适位置。默认None"""
"""主播立绘的路径会绘制在直播报告右上角合适位置。默认None"""
logo_base64: Optional[str] = None
"""主播立绘的 Base64 字符串会绘制在直播报告右上角合适位置立绘路径不为空时优先使用路径。默认None"""
time: Optional[bool] = False
"""是否展示本场直播直播时间段和直播时长。默认False"""
@ -110,11 +113,29 @@ class LiveReport(BaseModel):
"""是否展示本场直播礼物收益、送礼物人数。默认False"""
sc: Optional[bool] = False
"""是否展示本场直播SC醒目留言收益、发送SC醒目留言人数。默认False"""
"""是否展示本场直播 SC醒目留言收益、发送 SC醒目留言人数。默认False"""
guard: Optional[bool] = False
"""是否展示本场直播开通大航海数。默认False"""
danmu_ranking = 0
"""展示本场直播弹幕排行榜的前多少名0 为不展示。默认0"""
box_ranking = 0
"""展示本场直播盲盒数量排行榜的前多少名0 为不展示。默认0"""
box_profit_ranking = 0
"""展示本场直播盲盒盈亏排行榜的前多少名0 为不展示。默认0"""
gift_ranking = 0
"""展示本场直播礼物排行榜的前多少名0 为不展示。默认0"""
sc_ranking = 0
"""展示本场直播 SC醒目留言排行榜的前多少名0 为不展示。默认0"""
guard_list = False
"""是否展示本场直播开通大航海观众列表。默认False"""
danmu_cloud: Optional[bool] = False
"""是否生成本场直播弹幕词云。默认False。默认False"""
@ -124,15 +145,23 @@ class LiveReport(BaseModel):
获取功能全部开启的默认 LiveReport 实例
默认配置启用直播报告生成弹幕词云
"""
return LiveReport(enabled=True, logo=None,
return LiveReport(enabled=True, logo=None, logo_base64=None,
time=True, fans_change=True, fans_medal_change=True,guard_change=True,
danmu=True, box=True, gift=True, sc=True, guard=True, danmu_cloud=True)
danmu=True, box=True, gift=True, sc=True, guard=True,
danmu_ranking=3, box_ranking=3, box_profit_ranking=3, gift_ranking=3, sc_ranking=3,
guard_list=True, danmu_cloud=True)
def __str__(self):
return (f"启用: {self.enabled}\n直播时长数据: {self.time}\n粉丝变动数据: {self.fans_change}\n"
f"粉丝团变动数据: {self.fans_medal_change}\n大航海变动数据: {self.guard_change}\n"
f"弹幕相关数据: {self.danmu}\n盲盒相关数据: {self.box}\n礼物相关数据: {self.gift}\n"
f"SC相关数据: {self.sc}\n大航海相关数据: {self.guard}\n生成弹幕词云: {self.danmu_cloud}")
f"SC相关数据: {self.sc}\n大航海相关数据: {self.guard}\n"
f"弹幕排行榜: {f'{self.danmu_ranking}' if self.danmu_ranking else False}\n"
f"盲盒数量排行榜: {f'{self.box_ranking}' if self.box_ranking else False}\n"
f"盲盒盈亏排行榜: {f'{self.box_profit_ranking}' if self.box_profit_ranking else False}\n"
f"礼物排行榜: {f'{self.gift_ranking}' if self.gift_ranking else False}\n"
f"SC 排行榜: {f'{self.sc_ranking}' if self.sc_ranking else False}\n"
f"开通大航海观众列表: {self.guard_list}\n生成弹幕词云: {self.danmu_cloud}")
class DynamicUpdate(BaseModel):

View File

@ -19,11 +19,13 @@ from .user import User
from ..exception import LiveException
from ..utils import config, redis
from ..utils.Painter import DynamicPicGenerator
from ..utils.utils import get_credential, timestamp_format
from ..utils.utils import get_credential, timestamp_format, get_unames_and_faces_by_uids
if typing.TYPE_CHECKING:
from .sender import Bot
jieba.setLogLevel(jieba.logging.INFO)
class Up(BaseModel):
"""
@ -527,6 +529,124 @@ class Up(BaseModel):
"governor_count": await redis.hgeti("RoomGovernorCount", self.room_id)
})
# 弹幕排行
if self.__any_live_report_item_enabled("danmu_ranking"):
ranking_count = max(map(lambda t: t.live_report.danmu_ranking, self.targets))
danmu_ranking = await redis.zrevrangewithscoresi(f"UserDanmuCount:{self.room_id}", 0, ranking_count - 1)
if danmu_ranking:
uids = [x[0] for x in danmu_ranking]
counts = [x[1] for x in danmu_ranking]
unames, faces = await get_unames_and_faces_by_uids(uids)
live_report_param.update({
"danmu_ranking_faces": faces,
"danmu_ranking_unames": unames,
"danmu_ranking_counts": counts
})
# 盲盒数量排行
if self.__any_live_report_item_enabled("box_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_ranking, self.targets))
box_ranking = await redis.zrevrangewithscoresi(f"UserBoxCount:{self.room_id}", 0, ranking_count - 1)
if box_ranking:
uids = [x[0] for x in box_ranking]
counts = [x[1] for x in box_ranking]
unames, faces = await get_unames_and_faces_by_uids(uids)
live_report_param.update({
"box_ranking_faces": faces,
"box_ranking_unames": unames,
"box_ranking_counts": counts
})
# 盲盒盈亏排行
if self.__any_live_report_item_enabled("box_profit_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_profit_ranking, self.targets))
box_profit_ranking = await redis.zrevrangewithscoresf1(
f"UserBoxProfit:{self.room_id}", 0, ranking_count - 1
)
if box_profit_ranking:
uids = [x[0] for x in box_profit_ranking]
counts = [x[1] for x in box_profit_ranking]
unames, faces = await get_unames_and_faces_by_uids(uids)
live_report_param.update({
"box_profit_ranking_faces": faces,
"box_profit_ranking_unames": unames,
"box_profit_ranking_counts": counts
})
# 礼物排行
if self.__any_live_report_item_enabled("gift_ranking"):
ranking_count = max(map(lambda t: t.live_report.gift_ranking, self.targets))
gift_ranking = await redis.zrevrangewithscoresf1(f"UserGiftProfit:{self.room_id}", 0, ranking_count - 1)
if gift_ranking:
uids = [x[0] for x in gift_ranking]
counts = [x[1] for x in gift_ranking]
unames, faces = await get_unames_and_faces_by_uids(uids)
live_report_param.update({
"gift_ranking_faces": faces,
"gift_ranking_unames": unames,
"gift_ranking_counts": counts
})
# SC醒目留言排行
if self.__any_live_report_item_enabled("sc_ranking"):
ranking_count = max(map(lambda t: t.live_report.sc_ranking, self.targets))
sc_ranking = await redis.zrevrangewithscoresi(f"UserScProfit:{self.room_id}", 0, ranking_count - 1)
if sc_ranking:
uids = [x[0] for x in sc_ranking]
counts = [x[1] for x in sc_ranking]
unames, faces = await get_unames_and_faces_by_uids(uids)
live_report_param.update({
"sc_ranking_faces": faces,
"sc_ranking_unames": unames,
"sc_ranking_counts": counts
})
# 开通大航海观众列表
if self.__any_live_report_item_enabled("guard_list"):
captains = await redis.zrevrangewithscoresi(f"UserCaptainCount:{self.room_id}", 0, -1)
commanders = await redis.zrevrangewithscoresi(f"UserCommanderCount:{self.room_id}", 0, -1)
governors = await redis.zrevrangewithscoresi(f"UserGovernorCount:{self.room_id}", 0, -1)
if captains:
uids = [x[0] for x in captains]
counts = [x[1] for x in captains]
unames, faces = await get_unames_and_faces_by_uids(uids)
captain_infos = [[faces[i], unames[i], counts[i]] for i in range(len(counts))]
live_report_param.update({
"captain_infos": captain_infos,
})
if commanders:
uids = [x[0] for x in commanders]
counts = [x[1] for x in commanders]
unames, faces = await get_unames_and_faces_by_uids(uids)
commander_infos = [[faces[i], unames[i], counts[i]] for i in range(len(counts))]
live_report_param.update({
"commander_infos": commander_infos,
})
if governors:
uids = [x[0] for x in governors]
counts = [x[1] for x in governors]
unames, faces = await get_unames_and_faces_by_uids(uids)
governor_infos = [[faces[i], unames[i], counts[i]] for i in range(len(counts))]
live_report_param.update({
"governor_infos": governor_infos,
})
# 弹幕词云
if self.__any_live_report_item_enabled("danmu_cloud"):
all_danmu = " ".join(await redis.lrange(f"RoomDanmu:{self.room_id}", 0, -1))
@ -536,7 +656,6 @@ class Up(BaseModel):
"danmu_cloud": ""
})
else:
jieba.setLogLevel(jieba.logging.INFO)
words = list(jieba.cut(all_danmu))
counts = dict(Counter(words))

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 9.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 48 KiB

View File

@ -26,6 +26,12 @@ class Color(Enum):
+ LIGHTGRAY: 淡灰色
+ RED: 红色
+ GREEN: 绿色
+ DEEPBLUE: 深蓝色
+ LIGHTBLUE: 浅蓝色
+ DEEPRED: 深红色
+ LIGHTRED: 浅红色
+ DEEPGREEN: 深绿色
+ LIGHTGREEN: 浅绿色
+ CRIMSON: 总督红
+ FUCHSIA: 提督紫
+ DEEPSKYBLUE: 舰长蓝
@ -38,6 +44,12 @@ class Color(Enum):
LIGHTGRAY = (244, 244, 244)
RED = (150, 0, 0)
GREEN = (0, 150, 0)
DEEPBLUE = (55, 187, 248)
LIGHTBLUE = (175, 238, 238)
DEEPRED = (240, 128, 128)
LIGHTRED = (255, 220, 220)
DEEPGREEN = (0, 255, 0)
LIGHTGREEN = (184, 255, 184)
CRIMSON = (220, 20, 60)
FUCHSIA = (255, 0, 255)
DEEPSKYBLUE = (0, 191, 255)
@ -104,6 +116,10 @@ class PicGenerator:
def row_space(self):
return self.__ROW_SPACE
@property
def img(self):
return self.__canvas
def set_row_space(self, row_space: int):
"""
设置默认行距
@ -342,6 +358,15 @@ class PicGenerator:
self.__draw.text(xy, chapter, color, self.__chapter_font)
return self
def get_chapter_length(self, s: str) -> int:
"""
获取绘制指定字符串的章节标题所需长度像素数
Args:
s: 要绘制的字符串
"""
return int(self.__draw.textlength(s, self.__chapter_font))
def draw_section(self,
section: str,
color: Union[Color, Tuple[int, int, int]] = Color.BLACK,
@ -365,6 +390,15 @@ class PicGenerator:
self.__draw.text(xy, section, color, self.__section_font)
return self
def get_section_length(self, s: str) -> int:
"""
获取绘制指定字符串的小节标题所需长度像素数
Args:
s: 要绘制的字符串
"""
return int(self.__draw.textlength(s, self.__section_font))
def draw_tip(self,
tip: str,
color: Union[Color, Tuple[int, int, int]] = Color.GRAY,
@ -388,6 +422,15 @@ class PicGenerator:
self.__draw.text(xy, tip, color, self.__tip_font)
return self
def get_tip_length(self, s: str) -> int:
"""
获取绘制指定字符串的提示所需长度像素数
Args:
s: 要绘制的字符串
"""
return int(self.__draw.textlength(s, self.__tip_font))
def draw_text(self,
texts: Union[str, List[str]],
colors: Optional[Union[Color, Tuple[int, int, int], List[Union[Color, Tuple[int, int, int]]]]] = None,
@ -461,6 +504,15 @@ class PicGenerator:
return self
def get_text_length(self, s: str) -> int:
"""
获取绘制指定字符串的文本所需长度像素数
Args:
s: 要绘制的字符串
"""
return int(self.__draw.textlength(s, self.__text_font))
def show(self):
"""
显示图片
@ -511,9 +563,10 @@ class LiveReportGenerator:
width = 1000
height = 10000
top_blank = 75
margin = 50
generator = PicGenerator(width, height)
pic = (generator.set_pos(50, 125)
pic = (generator.set_pos(margin, top_blank + margin)
.draw_rounded_rectangle(0, top_blank, width, height - top_blank, 35, Color.WHITE)
.copy_bottom(35))
@ -524,7 +577,7 @@ class LiveReportGenerator:
logo_limit = (0, 0)
# 主播立绘
if model.logo:
if model.logo or model.logo_base64:
logo = cls.__get_logo(model)
base_left = 650
@ -609,11 +662,24 @@ class LiveReportGenerator:
# 直播数据
if model.danmu or model.box or model.gift or model.sc or model.guard:
(danmu_count, danmu_person_count,
box_count, box_person_count, box_profit, box_beat_percent,
gift_profit, gift_person_count,
sc_profit, sc_person_count,
captain_count, commander_count, governor_count) = cls.__get_live_params(param)
# 弹幕相关
danmu_count = param.get('danmu_count', 0)
danmu_person_count = param.get('danmu_person_count', 0)
# 盲盒相关
box_count = param.get('box_count', 0)
box_person_count = param.get('box_person_count', 0)
box_profit = param.get('box_profit', 0.0)
box_beat_percent = param.get('box_beat_percent', 0.0)
# 礼物相关
gift_profit = param.get('gift_profit', 0.0)
gift_person_count = param.get('gift_person_count', 0)
# SC醒目留言相关
sc_profit = param.get('sc_profit', 0)
sc_person_count = param.get('sc_person_count', 0)
# 大航海相关
captain_count = param.get('captain_count', 0)
commander_count = param.get('commander_count', 0)
governor_count = param.get('governor_count', 0)
if any([danmu_count > 0, box_count > 0, gift_profit > 0, sc_profit > 0,
captain_count > 0, commander_count > 0, governor_count > 0]):
@ -654,6 +720,80 @@ class LiveReportGenerator:
colors.append(Color.CRIMSON)
pic.draw_text(texts, colors)
# 弹幕排行
if model.danmu_ranking:
faces = param.get("danmu_ranking_faces", [])
unames = param.get("danmu_ranking_unames", [])
counts = param.get("danmu_ranking_counts", [])
if counts:
pic.draw_section(f"弹幕排行 (Top {len(counts)})")
ranking_img = cls.__get_ranking(pic, faces, unames, counts, pic.width - (margin * 2))
pic.draw_img_alpha(pic.auto_size_img_by_limit(ranking_img, logo_limit))
# 盲盒数量排行
if model.box_ranking:
faces = param.get("box_ranking_faces", [])
unames = param.get("box_ranking_unames", [])
counts = param.get("box_ranking_counts", [])
if counts:
pic.draw_section(f"盲盒数量排行 (Top {len(counts)})")
ranking_img = cls.__get_ranking(pic, faces, unames, counts, pic.width - (margin * 2))
pic.draw_img_alpha(pic.auto_size_img_by_limit(ranking_img, logo_limit))
# 盲盒盈亏排行
if model.box_profit_ranking:
faces = param.get("box_profit_ranking_faces", [])
unames = param.get("box_profit_ranking_unames", [])
counts = param.get("box_profit_ranking_counts", [])
if counts:
pic.draw_section(f"盲盒盈亏排行 (Top {len(counts)})")
ranking_img = cls.__get_double_ranking(pic, faces, unames, counts, pic.width - (margin * 2))
pic.draw_img_alpha(pic.auto_size_img_by_limit(ranking_img, logo_limit))
# 礼物排行
if model.gift_ranking:
faces = param.get("gift_ranking_faces", [])
unames = param.get("gift_ranking_unames", [])
counts = param.get("gift_ranking_counts", [])
if counts:
pic.draw_section(f"礼物排行 (Top {len(counts)})")
ranking_img = cls.__get_ranking(pic, faces, unames, counts, pic.width - (margin * 2))
pic.draw_img_alpha(pic.auto_size_img_by_limit(ranking_img, logo_limit))
# SC醒目留言排行
if model.sc_ranking:
faces = param.get("sc_ranking_faces", [])
unames = param.get("sc_ranking_unames", [])
counts = param.get("sc_ranking_counts", [])
if counts:
pic.draw_section(f"SC(醒目留言)排行 (Top {len(counts)})")
ranking_img = cls.__get_ranking(pic, faces, unames, counts, pic.width - (margin * 2))
pic.draw_img_alpha(pic.auto_size_img_by_limit(ranking_img, logo_limit))
# 开通大航海观众列表
if model.guard_list:
captain_infos = param.get("captain_infos", [])
commander_infos = param.get("commander_infos", [])
governor_infos = param.get("governor_infos", [])
if any([captain_infos, commander_infos, governor_infos]):
pic.draw_section("本场开通大航海观众")
guard_list_img = cls.__get_guard_list(
pic, captain_infos, commander_infos, governor_infos, pic.width - (margin * 2)
)
pic.draw_img_alpha(pic.auto_size_img_by_limit(guard_list_img, logo_limit))
# 弹幕词云
if model.danmu_cloud:
base64_str = param.get('danmu_cloud', "")
@ -664,7 +804,7 @@ class LiveReportGenerator:
img = pic.auto_size_img_by_limit(Image.open(img_bytes), logo_limit)
pic.draw_img_with_border(img)
# 底部信息
# 底部版权信息,请务必保留此处
pic.set_row_space(10)
pic.draw_text_right(50, "Designed By StarBot", Color.GRAY, logo_limit)
pic.draw_text_right(50, "https://github.com/Starlwr/StarBot", Color.LINK, logo_limit)
@ -683,8 +823,12 @@ class LiveReportGenerator:
Returns:
主播立绘图片
"""
logo_bytes = BytesIO(base64.b64decode(model.logo))
logo = Image.open(logo_bytes)
if model.logo:
logo = Image.open(model.logo)
else:
logo_bytes = BytesIO(base64.b64decode(model.logo_base64))
logo = Image.open(logo_bytes)
logo = logo.crop(logo.getbbox())
logo_height = 800
@ -694,40 +838,279 @@ class LiveReportGenerator:
return logo
@classmethod
def __get_live_params(cls, param: Dict[str, Any]) -> Tuple:
def __get_rank_bar_pic(cls,
width: int,
height: int,
start_color: Union[Color, Tuple[int, int, int]] = Color.DEEPBLUE,
end_color: Union[Color, Tuple[int, int, int]] = Color.LIGHTBLUE,
reverse: bool = False) -> Image:
"""
从传入直播报告参数中取出直播相关参数
生成排行榜中排行条图片
Args:
param: 直播报告参数
Returns:
直播相关参数
width: 排行条长度
height: 排行条宽度
start_color: 排行条渐变起始颜色默认深蓝色 (57, 119, 230)
end_color: 排行条渐变终止颜色默认浅蓝色 (55, 187, 248)
reverse: 是否生成反向排行条用于双向排行榜的负数排行条默认False
"""
# 弹幕相关
danmu_count = param.get('danmu_count', 0)
danmu_person_count = param.get('danmu_person_count', 0)
# 盲盒相关
box_count = param.get('box_count', 0)
box_person_count = param.get('box_person_count', 0)
box_profit = param.get('box_profit', 0.0)
box_beat_percent = param.get('box_beat_percent', 0.0)
# 礼物相关
gift_profit = param.get('gift_profit', 0.0)
gift_person_count = param.get('gift_person_count', 0)
# SC醒目留言相关
sc_profit = param.get('sc_profit', 0)
sc_person_count = param.get('sc_person_count', 0)
# 大航海相关
captain_count = param.get('captain_count', 0)
commander_count = param.get('commander_count', 0)
governor_count = param.get('governor_count', 0)
if isinstance(start_color, Color):
start_color = start_color.value
if isinstance(end_color, Color):
end_color = end_color.value
if reverse:
start_color, end_color = end_color, start_color
return (danmu_count, danmu_person_count,
box_count, box_person_count, box_profit, box_beat_percent,
gift_profit, gift_person_count,
sc_profit, sc_person_count,
captain_count, commander_count, governor_count)
r_step = (end_color[0] - start_color[0]) / width
g_step = (end_color[1] - start_color[1]) / width
b_step = (end_color[2] - start_color[2]) / width
now_color = [start_color[0], start_color[1], start_color[2]]
bar = Image.new("RGBA", (width, 1))
draw = ImageDraw.Draw(bar)
for i in range(width):
draw.point((i, 0), (int(now_color[0]), int(now_color[1]), int(now_color[2])))
now_color[0] += r_step
now_color[1] += g_step
now_color[2] += b_step
bar = bar.resize((width, height))
mask = Image.new("L", (width, height), 255)
mask_draw = ImageDraw.Draw(mask)
if not reverse:
mask_draw.polygon(((width - height, height), (width, 0), (width, height)), 0)
else:
mask_draw.polygon(((0, 0), (0, height), (height, height)), 0)
bar.putalpha(mask)
return bar
@classmethod
def __get_ranking(cls,
pic: PicGenerator,
faces: List[Image.Image],
unames: List[str],
counts: Union[List[int], List[float]],
width: int,
start_color: Union[Color, Tuple[int, int, int]] = Color.DEEPBLUE,
end_color: Union[Color, Tuple[int, int, int]] = Color.LIGHTBLUE) -> Image:
"""
绘制排行榜
Args:
pic: 绘图器实例
faces: 头像图片列表按照数量列表降序排序
unames: 昵称列表按照数量列表降序排序
counts: 数量列表降序排序
width: 排行榜图片宽度
start_color: 排行条渐变起始颜色默认深蓝色 (57, 119, 230)
end_color: 排行条渐变终止颜色默认浅蓝色 (55, 187, 248)
"""
count = len(counts)
if count == 0 or len(faces) != len(unames) or len(unames) != len(counts):
raise ValueError
face_size = 100
offset = 10
bar_height = 30
bar_x = face_size - offset
top_bar_width = width - face_size + offset
top_count = counts[0]
chart = PicGenerator(width, (face_size * count) + (pic.row_space * (count - 1)))
chart.set_row_space(pic.row_space)
for i in range(count):
bar_width = int(counts[i] / top_count * top_bar_width)
if bar_width != 0:
bar = cls.__get_rank_bar_pic(bar_width, bar_height, start_color, end_color)
chart.draw_img_alpha(bar, (bar_x, chart.y + int((face_size - bar_height) / 2)))
chart.draw_tip(unames[i], Color.BLACK, (bar_x + (offset * 2), chart.y))
count_pos = (max(chart.x + bar_width, bar_x + (offset * 3) + chart.get_tip_length(unames[i])), chart.y)
chart.draw_tip(str(counts[i]), xy=count_pos)
chart.draw_img_alpha(mask_round(faces[i].resize((face_size, face_size)).convert("RGBA")))
return chart.img
@classmethod
def __get_double_ranking(cls,
pic: PicGenerator,
faces: List[Image.Image],
unames: List[str],
counts: Union[List[int], List[float]],
width: int,
start_color: Union[Color, Tuple[int, int, int]] = Color.DEEPRED,
end_color: Union[Color, Tuple[int, int, int]] = Color.LIGHTRED,
reverse_start_color: Union[Color, Tuple[int, int, int]] = Color.DEEPGREEN,
reverse_end_color: Union[Color, Tuple[int, int, int]] = Color.LIGHTGREEN) -> Image:
"""
绘制双向排行榜
Args:
pic: 绘图器实例
faces: 头像图片列表按照数量列表降序排序
unames: 昵称列表按照数量列表降序排序
counts: 数量列表降序排序
width: 排行榜图片宽度
start_color: 正向排行条渐变起始颜色数量为正时使用默认深红色 (57, 119, 230)
end_color: 正向排行条渐变终止颜色数量为正时使用默认浅红色 (55, 187, 248)
reverse_start_color: 反向排行条渐变起始颜色数量为负时使用默认深绿色 (57, 119, 230)
reverse_end_color: 反向排行条渐变终止颜色数量为负时使用默认浅绿色 (55, 187, 248)
"""
count = len(counts)
if count == 0 or len(faces) != len(unames) or len(unames) != len(counts):
raise ValueError
face_size = 100
offset = 10
bar_height = 30
face_x = int((width - face_size) / 2)
bar_x = face_x + face_size - offset
reverse_bar_x = face_x + offset
top_bar_width = (width - face_size) / 2 + offset
top_count = max(max(counts), abs(min(counts)))
chart = PicGenerator(width, (face_size * count) + (pic.row_space * (count - 1)))
chart.set_row_space(pic.row_space)
for i in range(count):
bar_width = int(abs(counts[i]) / top_count * top_bar_width)
if counts[i] > 0:
bar = cls.__get_rank_bar_pic(bar_width, bar_height, start_color, end_color)
chart.draw_img_alpha(bar, (bar_x, chart.y + int((face_size - bar_height) / 2)))
elif counts[i] < 0:
bar = cls.__get_rank_bar_pic(bar_width, bar_height, reverse_start_color, reverse_end_color, True)
chart.draw_img_alpha(bar, (reverse_bar_x - bar_width, chart.y + int((face_size - bar_height) / 2)))
if counts[i] >= 0:
chart.draw_tip(unames[i], Color.BLACK, (bar_x + (offset * 2), chart.y))
count_pos = (max(face_x + bar_width, bar_x + (offset * 3) + chart.get_tip_length(unames[i])), chart.y)
chart.draw_tip(str(counts[i]), xy=count_pos)
else:
uname_length = chart.get_tip_length(unames[i])
count_length = chart.get_tip_length(str(counts[i]))
chart.draw_tip(unames[i], Color.BLACK, (reverse_bar_x - (offset * 2) - uname_length, chart.y))
count_pos = (min(face_x + face_size - bar_width - count_length,
reverse_bar_x - (offset * 3) - uname_length - count_length), chart.y)
chart.draw_tip(str(counts[i]), xy=count_pos)
chart.set_pos(x=face_x).draw_img_alpha(mask_round(faces[i].resize((face_size, face_size)).convert("RGBA")))
chart.set_pos(x=0)
return chart.img
@classmethod
def __get_guard_line_pic(cls,
pic: PicGenerator,
width: int,
face_size: int,
faces: List[Image.Image],
unames: List[str],
counts: List[int],
icon: Image.Image,
color: Union[Color, Tuple[int, int, int]]) -> Image:
"""
生成大航海列表中每行图片
Args:
pic: 绘图器实例
width: 大航海列表中每行图片长度
face_size: 头像尺寸
faces: 头像图片列表按照数量列表降序排序
unames: 昵称列表按照数量列表降序排序
counts: 数量列表降序排序
icon: 大航海图标
"""
count = len(counts)
if count == 0 or len(faces) != len(unames) or len(unames) != len(counts):
raise ValueError
text_size = 30
icon_size = int(face_size * 1.5)
face_padding = int((icon_size - face_size) / 2)
margin = int((width - (icon_size * count)) / (count + 1))
xs = [margin + (i * (icon_size + margin)) for i in range(count)]
line = PicGenerator(width, icon_size + int(pic.row_space * 2.5) + (text_size * 2))
line.set_row_space(pic.row_space)
icon = icon.resize((icon_size, icon_size))
for i, x in enumerate(xs):
line.draw_img_alpha(mask_round(faces[i].resize((face_size, face_size))), (x + face_padding, face_padding))
if i != count - 1:
line.draw_img_alpha(icon, (x, 0))
else:
line.set_pos(x=x).draw_img_alpha(icon).set_pos(x=0)
for i, x in enumerate(xs):
uname = limit_str_length(unames[i], 8)
uname_length = line.get_text_length(uname)
uname_x_offset = int((icon_size - uname_length) / 2)
count = f"{counts[i]}"
count_length = line.get_text_length(count)
count_x_offset = int((icon_size - count_length) / 2)
line.draw_text(uname, color, (x + uname_x_offset, line.y))
line.draw_text(count, Color.BLACK, (x + count_x_offset, line.y + text_size + int(line.row_space / 2)))
return line.img
@classmethod
def __get_guard_list(cls,
pic: PicGenerator,
captains: List[List[Union[Image.Image, str, int]]],
commanders: List[List[Union[Image.Image, str, int]]],
governors: List[List[Union[Image.Image, str, int]]],
width: int) -> Image:
"""
绘制大航海列表
Args:
pic: 绘图器实例
captains: 舰长信息
commanders: 提督信息
governors: 总督信息
width: 大航海列表图片宽度
"""
face_size = 150
icon_size = int(face_size * 1.5)
text_size = 30
line_count = 3
line_height = icon_size + int(pic.row_space * 2.5) + (text_size * 2)
resource_base_path = os.path.dirname(os.path.dirname(__file__))
icon_map = {
0: Image.open(f"{resource_base_path}/resource/governor.png").convert("RGBA"),
1: Image.open(f"{resource_base_path}/resource/commander.png").convert("RGBA"),
2: Image.open(f"{resource_base_path}/resource/captain.png").convert("RGBA")
}
color_map = {
0: Color.CRIMSON,
1: Color.FUCHSIA,
2: Color.DEEPSKYBLUE
}
captains = split_list(captains, line_count)
commanders = split_list(commanders, line_count)
governors = split_list(governors, line_count)
img = PicGenerator(width, (len(governors) + len(commanders) + len(captains)) * line_height)
img.set_row_space(pic.row_space)
for i, lst in enumerate([governors, commanders, captains]):
if not lst:
continue
for line in lst:
faces = [x[0] for x in line]
unames = [x[1] for x in line]
counts = [x[2] for x in line]
img.draw_img_alpha(
cls.__get_guard_line_pic(pic, width, face_size, faces, unames, counts, icon_map[i], color_map[i])
).move_pos(0, -pic.row_space)
return img.img
class DynamicPicGenerator:
@ -781,7 +1164,7 @@ class DynamicPicGenerator:
await cls.__draw_by_type(pic, dynamic_type, card, dynamic_id, display,
text_margin, img_margin, False, origin_dynamic_id)
# 底部信息
# 底部版权信息,请务必保留此处
pic.move_pos(0, 15)
pic.draw_text_right(25, "Designed By StarBot", Color.GRAY)
pic.draw_text_right(25, "https://github.com/Starlwr/StarBot", Color.LINK)
@ -789,6 +1172,55 @@ class DynamicPicGenerator:
return pic.base64()
@classmethod
async def __draw_header(cls,
pic: PicGenerator,
face: Image.Image,
pendant: Image.Image,
official: int,
vip: bool,
uname: str,
timestamp: int) -> PicGenerator:
"""
绘制动态头部
Args:
pic: 绘图器实例
face: 头像图片
pendant: 头像挂件图片
official: 认证类型
vip: 是否为大会员
uname: 昵称
timestamp: 动态时间戳
"""
face_size = (100, 100)
face = face.resize(face_size, Resampling.LANCZOS)
face = mask_round(face)
pic.draw_img_alpha(face, (50, 50))
if pendant is not None:
pendant_size = (170, 170)
pendant = pendant.resize(pendant_size, Resampling.LANCZOS).convert('RGBA')
pic.draw_img_alpha(pendant, (15, 15))
pic.move_pos(15, 0)
if official == 0:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/personal.png"), (118, 118))
elif official == 1:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/business.png"), (118, 118))
elif vip:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/vip.png"), (118, 118))
if vip:
pic.draw_text(uname, Color.PINK)
else:
pic.draw_text(uname, Color.BLACK)
pic.draw_tip(timestamp_format(timestamp, "%Y-%m-%d %H:%M"))
pic.set_pos(y=200)
return pic
@classmethod
async def __draw_by_type(cls,
pic: PicGenerator,
@ -813,6 +1245,7 @@ class DynamicPicGenerator:
img_margin: 图片外边距
forward: 当前是否为转发动态的源动态
"""
async def download_img(mod: Dict[str, Any]):
"""
下载表情图片
@ -885,55 +1318,6 @@ class DynamicPicGenerator:
await cls.__draw_add_on_card(pic, add_on_card, text_margin, forward)
@classmethod
async def __draw_header(cls,
pic: PicGenerator,
face: Image.Image,
pendant: Image.Image,
official: int,
vip: bool,
uname: str,
timestamp: int) -> PicGenerator:
"""
绘制动态头部
Args:
pic: 绘图器实例
face: 头像图片
pendant: 头像挂件图片
official: 认证类型
vip: 是否为大会员
uname: 昵称
timestamp: 动态时间戳
"""
face_size = (100, 100)
face = face.resize(face_size, Resampling.LANCZOS)
face = mask_round(face)
pic.draw_img_alpha(face, (50, 50))
if pendant is not None:
pendant_size = (170, 170)
pendant = pendant.resize(pendant_size, Resampling.LANCZOS).convert('RGBA')
pic.draw_img_alpha(pendant, (15, 15))
pic.move_pos(15, 0)
if official == 0:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/personal.png"), (118, 118))
elif official == 1:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/business.png"), (118, 118))
elif vip:
pic.draw_img_alpha(Image.open(f"{cls.__resource_base_path}/resource/vip.png"), (118, 118))
if vip:
pic.draw_text(uname, Color.PINK)
else:
pic.draw_text(uname, Color.BLACK)
pic.draw_tip(timestamp_format(timestamp, "%Y-%m-%d %H:%M"))
pic.set_pos(y=200)
return pic
@classmethod
async def __get_content_line_imgs(cls, modules: List[Dict[str, Any]], width: int) -> List[Image.Image]:
"""

View File

@ -1,4 +1,4 @@
from typing import Any, Union, Optional, List
from typing import Any, Optional, Union, Tuple, List
import aioredis
from loguru import logger
@ -86,6 +86,16 @@ async def zrank(key: str, member: str) -> int:
return rank
async def zrevrangewithscoresi(key: str, start: int, end: int) -> List[Tuple[str, int]]:
return list(map(lambda x: (x[0].decode(), int(x[1])), await __redis.zrevrange(key, start, end, True)))
async def zrevrangewithscoresf1(key: str, start: int, end: int) -> List[Tuple[str, float]]:
return list(map(
lambda x: (x[0].decode(), float("{:.1f}".format(float(x[1])))), await __redis.zrevrange(key, start, end, True)
))
async def zadd(key: str, member: str, score: Union[int, float]):
await __redis.zadd(key, {member: score})

View File

@ -1,18 +1,18 @@
"""
通用工具库
"""
import asyncio
import json
import os
import time
from io import BytesIO
from typing import List, Dict, Optional, Any
from typing import Tuple, List, Dict, Sized, Optional, Any
from PIL import Image, ImageDraw
from . import config
from .Credential import Credential
from .network import get_session
from .network import get_session, request
def get_api(field: str) -> Dict:
@ -79,7 +79,7 @@ async def open_url_image(url: str) -> Optional[Image.Image]:
return image
def split_list(lst: List[Any], n: int) -> List[List[Any]]:
def split_list(lst: Sized, n: int) -> List[List[Any]]:
"""
将传入列表划分为若干子列表每个子列表包含 n 个元素
@ -145,3 +145,22 @@ def mask_rounded_rectangle(img: Image.Image, radius: int = 10) -> Image.Image:
mask_draw.rounded_rectangle((0, 0, img_width, img_height), radius, 255)
img.putalpha(mask)
return img
async def get_unames_and_faces_by_uids(uids: List[str]) -> Tuple[List[str], List[Image.Image]]:
"""
根据 UID 列表批量获取昵称和头像图片
Args:
uids: UID 列表
Returns:
昵称列表和头像图片列表组成的元组
"""
user_info_url = f"https://api.vc.bilibili.com/account/v1/user/cards?uids={','.join(uids)}"
infos_list = await request("GET", user_info_url)
infos = dict(zip([x["mid"] for x in infos_list], infos_list))
unames = [infos[int(uid)]["name"] for uid in uids]
download_face_tasks = [open_url_image(infos[int(uid)]["face"]) for uid in uids]
faces = await asyncio.gather(*download_face_tasks)
return (unames, faces)