feat: Live report support

This commit is contained in:
LWR 2022-12-15 21:18:21 +08:00
parent 1ca5af4413
commit 52e43eb1bb
7 changed files with 1091 additions and 24 deletions

View File

@ -1,18 +1,24 @@
import asyncio
import base64
import os
import time
import typing
from asyncio import AbstractEventLoop
from typing import Optional, List, Any
from collections import Counter
from io import BytesIO
from typing import Optional, Any, Union, List
import jieba
from loguru import logger
from pydantic import BaseModel, PrivateAttr
from wordcloud import WordCloud
from .live import LiveDanmaku, LiveRoom
from .model import PushTarget
from .user import User
from ..exception import LiveException
from ..utils import config, redis
from ..utils.utils import get_credential
from ..utils.utils import get_credential, timestamp_format
if typing.TYPE_CHECKING:
from .sender import Bot
@ -35,6 +41,12 @@ class Up(BaseModel):
room_id: Optional[int] = None
"""主播直播间房间号,无需手动传入,会自动获取"""
__user: Optional[User] = PrivateAttr()
"""用户实例,用于获取用户相关信息"""
__live_room: Optional[LiveRoom] = PrivateAttr()
"""直播间实例,用于获取直播间相关信息"""
__room: Optional[LiveDanmaku] = PrivateAttr()
"""直播间连接实例"""
@ -49,6 +61,8 @@ class Up(BaseModel):
def __init__(self, **data: Any):
super().__init__(**data)
self.__user = None
self.__live_room = None
self.__room = None
self.__is_reconnect = False
self.__loop = asyncio.get_event_loop()
@ -66,17 +80,24 @@ class Up(BaseModel):
def __any_live_report_enabled(self):
return any(map(lambda conf: conf.enabled, map(lambda group: group.live_report, self.targets)))
def __any_live_report_item_enabled(self, attribute: Union[str, List[str]]):
if isinstance(attribute, list):
return any([self.__any_live_report_item_enabled(a) for a in attribute])
return any(map(lambda t: t.live_report.enabled and t.live_report.__getattribute__(attribute), self.targets))
async def connect(self):
"""
连接直播间
"""
self.__user = User(self.uid, get_credential())
if not all([self.uname, self.room_id]):
user = User(self.uid, get_credential())
user_info = await user.get_user_info()
user_info = await self.__user.get_user_info()
self.uname = user_info["name"]
if user_info["live_room"] is None:
raise LiveException(f"UP 主 {self.uname} ( UID: {self.uid} ) 还未开通直播间")
self.room_id = user_info["live_room"]["roomid"]
self.__live_room = LiveRoom(self.room_id, get_credential())
self.__room = LiveDanmaku(self.room_id, credential=get_credential())
# 开播推送开关和下播推送开关均处于关闭状态时跳过连接直播间,以节省性能
@ -91,13 +112,15 @@ class Up(BaseModel):
@self.__room.on("VERIFICATION_SUCCESSFUL")
async def on_link(event):
"""
连接成功事件
"""
logger.debug(f"{self.uname} (VERIFICATION_SUCCESSFUL): {event}")
if self.__is_reconnect:
logger.success(f"直播间 {self.room_id} 重新连接成功")
logger.success(f"已重新连接到 {self.uname}直播间 {self.room_id}")
live_room = LiveRoom(self.room_id, get_credential())
room_info = await live_room.get_room_play_info()
room_info = await self.__live_room.get_room_play_info()
last_status = await redis.hgeti("LiveStatus", self.room_id)
now_status = room_info["live_status"]
@ -116,18 +139,20 @@ class Up(BaseModel):
param = {}
await live_off(param)
else:
self.__is_reconnect = True
logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}")
logger.success(f"已成功连接到 {self.uname} 的直播间 {self.room_id}")
self.__is_reconnect = True
@self.__room.on("LIVE")
async def live_on(event):
"""
开播事件
"""
logger.debug(f"{self.uname} (LIVE): {event}")
# 是否为真正开播
if "live_time" in event["data"]:
live_room = LiveRoom(self.room_id, get_credential())
room_info = await live_room.get_room_info()
room_info = await self.__live_room.get_room_info()
self.uname = room_info["anchor_info"]["base_info"]["uname"]
await redis.hset("LiveStatus", self.room_id, 1)
@ -143,12 +168,23 @@ class Up(BaseModel):
lambda t: t.live_on.enabled)
else:
logger.opt(colors=True).info(f"<magenta>[开播] {self.uname} ({self.room_id})</>")
await redis.hset("StartTime", self.room_id, room_info["room_info"]["live_start_time"])
live_start_time = room_info["room_info"]["live_start_time"]
fans_count = room_info["anchor_info"]["relation_info"]["attention"]
if room_info["anchor_info"]["medal_info"] is None:
fans_medal_count = 0
else:
fans_medal_count = room_info["anchor_info"]["medal_info"]["fansclub"]
guard_count = room_info["guard_info"]["count"]
await redis.hset("StartTime", self.room_id, live_start_time)
await redis.hset(f"FansCount:{self.room_id}", live_start_time, fans_count)
await redis.hset(f"FansMedalCount:{self.room_id}", live_start_time, fans_medal_count)
await redis.hset(f"GuardCount:{self.room_id}", live_start_time, guard_count)
await self.__accumulate_data()
await self.__reset_data()
# 推送消息
# 推送开播消息
arg_base = room_info["room_info"]
args = {
"{uname}": self.uname,
@ -160,6 +196,9 @@ class Up(BaseModel):
@self.__room.on("PREPARING")
async def live_off(event):
"""
下播事件
"""
logger.debug(f"{self.uname} (PREPARING): {event}")
await redis.hset("LiveStatus", self.room_id, 0)
@ -167,21 +206,111 @@ class Up(BaseModel):
logger.opt(colors=True).info(f"<magenta>[下播] {self.uname} ({self.room_id})</>")
# 推送消息
args = {
# 生成下播消息和直播报告占位符参数
live_off_args = {
"{uname}": self.uname
}
self.__bot.send_live_off(self, args)
live_report_param = await self.__generate_live_report_param()
# 推送下播消息和直播报告
self.__bot.send_live_off(self, live_off_args)
self.__bot.send_live_report(self, live_report_param)
danmu_items = ["danmu", "danmu_cloud"]
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(danmu_items):
@self.__room.on("DANMU_MSG")
async def on_danmu(event):
"""
弹幕事件
"""
logger.debug(f"{self.uname} (DANMU_MSG): {event}")
base = event["data"]["info"]
uid = base[2][0]
content = base[1]
# 弹幕统计
await redis.hincrby("RoomDanmuCount", self.room_id)
await redis.zincrby(f"UserDanmuCount:{self.room_id}", uid)
# 弹幕词云所需弹幕记录
if isinstance(base[0][13], str):
await redis.rpush(f"RoomDanmu:{self.room_id}", content)
gift_items = ["box", "gift"]
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(gift_items):
@self.__room.on("SEND_GIFT")
async def on_gift(event):
"""
礼物事件
"""
logger.debug(f"{self.uname} (SEND_GIFT): {event}")
base = event["data"]["data"]
uid = base["uid"]
num = base["num"]
price = float("{:.1f}".format((base["discount_price"] / 1000) * num))
# 礼物统计
if base["total_coin"] != 0 and base["discount_price"] != 0:
await redis.hincrbyfloat("RoomGiftProfit", self.room_id, price)
await redis.zincrby(f"UserGiftProfit:{self.room_id}", uid, price)
# 盲盒统计
if base["blind_gift"] is not None:
box_price = base["total_coin"] / 1000
gift_num = base["num"]
gift_price = base["discount_price"] / 1000
profit = float("{:.1f}".format((gift_price * gift_num) - box_price))
await redis.hincrby("RoomBoxCount", self.room_id, gift_num)
await redis.zincrby(f"UserBoxCount:{self.room_id}", uid, gift_num)
await redis.hincrbyfloat("RoomBoxProfit", self.room_id, profit)
await redis.zincrby(f"UserBoxProfit:{self.room_id}", uid, profit)
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled("sc"):
@self.__room.on("SUPER_CHAT_MESSAGE")
async def on_sc(event):
"""
SC醒目留言事件
"""
logger.debug(f"{self.uname} (SUPER_CHAT_MESSAGE): {event}")
base = event["data"]["data"]
uid = base["uid"]
price = base["price"]
# SC 统计
await redis.hincrby("RoomScProfit", self.room_id, price)
await redis.zincrby(f"UserScProfit:{self.room_id}", uid, price)
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled("guard"):
@self.__room.on("GUARD_BUY")
async def on_guard(event):
"""
大航海事件
"""
logger.debug(f"{self.uname} (GUARD_BUY): {event}")
base = event["data"]["data"]
uid = base["uid"]
guard_type = base["gift_name"]
month = base["num"]
# 上舰统计
type_mapping = {
"舰长": "Captain",
"提督": "Commander",
"总督": "Governor"
}
await redis.hincrby(f"Room{type_mapping[guard_type]}Count", self.room_id, month)
await redis.zincrby(f"User{type_mapping[guard_type]}Count:{self.room_id}", uid, month)
async def __accumulate_data(self):
"""
累计直播间数据
"""
# 盲盒记录,用于统计击败直播间百分比
if await redis.hgeti("RoomBoxCount", self.room_id) > 0:
await redis.rpush("BoxProfit", await redis.hgetf1("RoomBoxProfit", self.room_id))
# 累计弹幕数
await redis.hincrby("RoomDanmuTotal", self.room_id, await redis.hgeti("RoomDanmuCount", self.room_id))
await redis.zunionstore(f"UserDanmuTotal:{self.room_id}", f"UserDanmuCount:{self.room_id}")
@ -254,6 +383,127 @@ class Up(BaseModel):
await redis.hset(f"RoomGovernorCount", self.room_id, 0)
await redis.delete(f"UserGovernorCount:{self.room_id}")
async def __generate_live_report_param(self):
"""
计算直播报告所需数据
"""
live_report_param = {}
# 主播信息
live_report_param.update({
"uname": self.uname,
"room_id": self.room_id
})
# 直播时间段和直播时长
start_time = await redis.hgeti("StartTime", self.room_id)
end_time = await redis.hgeti("EndTime", self.room_id)
seconds = end_time - start_time
minute, second = divmod(seconds, 60)
hour, minute = divmod(minute, 60)
live_report_param.update({
"start_time": timestamp_format(start_time),
"end_time": timestamp_format(end_time),
"hour": hour,
"minute": minute,
"second": second
})
# 基础数据变动
if self.__any_live_report_item_enabled(["fans_change", "fans_medal_change", "guard_change"]):
room_info = await self.__live_room.get_room_info()
live_start_time = await redis.hgeti("StartTime", self.room_id)
if await redis.hexists(f"FansCount:{self.room_id}", live_start_time):
fans_count = await redis.hgeti(f"FansCount:{self.room_id}", live_start_time)
else:
fans_count = -1
if await redis.hexists(f"FansMedalCount:{self.room_id}", live_start_time):
fans_medal_count = await redis.hgeti(f"FansMedalCount:{self.room_id}", live_start_time)
else:
fans_medal_count = -1
if await redis.hexists(f"GuardCount:{self.room_id}", live_start_time):
guard_count = await redis.hgeti(f"GuardCount:{self.room_id}", live_start_time)
else:
guard_count = -1
if room_info["anchor_info"]["medal_info"] is None:
fans_medal_count_after = 0
else:
fans_medal_count_after = room_info["anchor_info"]["medal_info"]["fansclub"]
live_report_param.update({
# 粉丝变动
"fans_before": fans_count,
"fans_after": room_info["anchor_info"]["relation_info"]["attention"],
# 粉丝团(粉丝勋章数)变动
"fans_medal_before": fans_medal_count,
"fans_medal_after": fans_medal_count_after,
# 大航海变动
"guard_before": guard_count,
"guard_after": room_info["guard_info"]["count"]
})
# 直播数据
box_profit = await redis.hgetf1("RoomBoxProfit", self.room_id)
count = await redis.zcard("BoxProfit")
await redis.zadd("BoxProfit", f"{start_time}-{self.uid}-{self.uname}", box_profit)
rank = await redis.zrank("BoxProfit", f"{start_time}-{self.uid}-{self.uname}")
percent = float("{:.2f}".format(float("{:.4f}".format(rank / count)) * 100)) if count != 0 else 100
live_report_param.update({
# 弹幕相关
"danmu_count": await redis.hgeti("RoomDanmuCount", self.room_id),
"danmu_person_count": await redis.zcard(f"UserDanmuCount:{self.room_id}"),
# 盲盒相关
"box_count": await redis.hgeti("RoomBoxCount", self.room_id),
"box_person_count": await redis.zcard(f"UserBoxCount:{self.room_id}"),
"box_profit": box_profit,
"box_beat_percent": percent,
# 礼物相关
"gift_profit": await redis.hgetf1("RoomGiftProfit", self.room_id),
"gift_person_count": await redis.zcard(f"UserGiftProfit:{self.room_id}"),
# SC醒目留言相关
"sc_profit": await redis.hgeti("RoomScProfit", self.room_id),
"sc_person_count": await redis.zcard(f"UserScProfit:{self.room_id}"),
# 大航海相关
"captain_count": await redis.hgeti("RoomCaptainCount", self.room_id),
"commander_count": await redis.hgeti("RoomCommanderCount", self.room_id),
"governor_count": await redis.hgeti("RoomGovernorCount", self.room_id)
})
# 弹幕词云
if self.__any_live_report_item_enabled("danmu_cloud"):
all_danmu = " ".join(await redis.lrange(f"RoomDanmu:{self.room_id}", 0, -1))
if len(all_danmu) == 0:
live_report_param.update({
"danmu_cloud": ""
})
else:
jieba.setLogLevel(jieba.logging.INFO)
words = list(jieba.cut(all_danmu))
counts = dict(Counter(words))
font_base_path = os.path.dirname(os.path.dirname(__file__))
io = BytesIO()
word_cloud = WordCloud(width=900,
height=450,
font_path=f"{font_base_path}/font/{config.get('DANMU_CLOUD_FONT')}",
background_color=config.get("DANMU_CLOUD_BACKGROUND_COLOR"),
max_font_size=config.get("DANMU_CLOUD_MAX_FONT_SIZE"),
max_words=config.get("DANMU_CLOUD_MAX_WORDS"))
word_cloud.generate_from_frequencies(counts)
word_cloud.to_image().save(io, format="png")
base64_str = base64.b64encode(io.getvalue()).decode()
live_report_param.update({
"danmu_cloud": base64_str
})
return live_report_param
def __eq__(self, other):
if isinstance(other, Up):
return self.uid == other.uid

View File

@ -14,6 +14,7 @@ from .model import LiveOn, LiveOff, DynamicUpdate, Message, PushType, PushTarget
from .room import Up
from ..utils import config
from ..utils.AsyncEvent import AsyncEvent
from ..utils.Painter import LiveReportGenerator
class Bot(BaseModel, AsyncEvent):
@ -145,7 +146,7 @@ class Bot(BaseModel, AsyncEvent):
def __send_push_message(self, up: Up,
type_selector: Callable[[PushTarget], Union[LiveOn, LiveOff, DynamicUpdate]],
args: Dict):
args: Dict[str, Any]):
"""
发送推送消息至 UP 主下启用此推送类型的推送目标
@ -167,7 +168,7 @@ class Bot(BaseModel, AsyncEvent):
select.message = select.message.replace(arg, str(val))
self.send_message(Message(id=target.id, content=select.message, type=target.type))
def send_live_on(self, up: Up, args: Dict):
def send_live_on(self, up: Up, args: Dict[str, Any]):
"""
发送开播消息至 UP 主下启用开播推送的推送目标
@ -177,7 +178,7 @@ class Bot(BaseModel, AsyncEvent):
"""
self.__send_push_message(up, lambda t: t.live_on, args)
def send_live_off(self, up: Up, args: Dict):
def send_live_off(self, up: Up, args: Dict[str, Any]):
"""
发送下播消息至 UP 主下启用下播推送的推送目标
@ -187,7 +188,19 @@ class Bot(BaseModel, AsyncEvent):
"""
self.__send_push_message(up, lambda t: t.live_off, args)
def send_dynamic_update(self, up: Up, args: Dict):
def send_live_report(self, up: Up, param: Dict[str, Any]):
"""
发送直播报告消息至 UP 主下启用直播报告推送的推送目标
Args:
up: 要发送的 UP 主实例
param: 直播报告参数
"""
for target in filter(lambda t: t.live_report.enabled, up.targets):
base64str = LiveReportGenerator.generate(param, target.live_report)
self.send_message(Message(id=target.id, content="".join(["{base64pic=", base64str, "}"]), type=target.type))
def send_dynamic_update(self, up: Up, args: Dict[str, Any]):
"""
发送动态消息至 UP 主下启用动态推送的推送目标

BIN
starbot/font/bold.ttf Normal file

Binary file not shown.

BIN
starbot/font/normal.ttf Normal file

Binary file not shown.

758
starbot/utils/Painter.py Normal file
View File

@ -0,0 +1,758 @@
import base64
import os
from enum import Enum
from io import BytesIO
from typing import Optional, Union, Tuple, List, Dict, Any
from PIL import Image, ImageDraw, ImageFont
from ..core.model import LiveReport
from ..utils import config
class Color(Enum):
"""
常用颜色 RGB 枚举
+ BLACK: 黑色
+ WHITE: 白色
+ GRAY: 灰色
+ RED: 红色
+ GREEN: 绿色
+ CRIMSON: 总督红
+ FUCHSIA: 提督紫
+ DEEPSKYBLUE: 舰长蓝
+ LINK: 超链接蓝
"""
BLACK = (0, 0, 0)
WHITE = (255, 255, 255)
GRAY = (169, 169, 169)
RED = (150, 0, 0)
GREEN = (0, 150, 0)
CRIMSON = (220, 20, 60)
FUCHSIA = (255, 0, 255)
DEEPSKYBLUE = (0, 191, 255)
LINK = (6, 69, 173)
class PicGenerator:
"""
基于 Pillow 的绘图器可使用流式调用方法
"""
def __init__(self,
width: int,
height: int,
normal_font: str = config.get("PAINTER_NORMAL_FONT"),
bold_font: str = config.get("PAINTER_BOLD_FONT")):
"""
初始化绘图器
Args:
width: 画布宽度
height: 画布高度
normal_font: 普通字体路径默认config.get("PAINTER_NORMAL_FONT") = "normal.ttf"
bold_font: 粗体字体路径默认config.get("PAINTER_BOLD_FONT") = "bold.ttf"
"""
self.width = width
self.height = height
self.__canvas = Image.new("RGBA", (self.width, self.height))
self.__draw = ImageDraw.Draw(self.__canvas)
font_base_path = os.path.dirname(os.path.dirname(__file__))
self.__chapter_font = ImageFont.truetype(f"{font_base_path}/font/{bold_font}", 50)
self.__section_font = ImageFont.truetype(f"{font_base_path}/font/{bold_font}", 40)
self.__tip_font = ImageFont.truetype(f"{font_base_path}/font/{normal_font}", 25)
self.__text_font = ImageFont.truetype(f"{font_base_path}/font/{normal_font}", 30)
self.__xy = 0, 0
self.__ROW_SPACE = 25
def set_pos(self, x: int, y: int):
"""
设置绘图坐标
Args:
x: X 坐标
y: Y 坐标
"""
self.__xy = x, y
return self
def move_pos(self, x: int, y: int):
"""
移动绘图坐标
Args:
x: X 偏移量
y: Y 偏移量
"""
self.__xy = self.__xy[0] + x, self.__xy[1] + y
return self
def draw_rounded_rectangle(self,
x: int,
y: int,
width: int,
height: int,
radius: int,
color: Union[Color, Tuple[int, int, int]]):
"""
绘制一个圆角矩形此方法不会移动绘图坐标
Args:
x: 圆角矩形左上角的 x 坐标
y: 圆角矩形左上角的 y 坐标
width: 圆角矩形的宽度
height: 圆角矩形的高度
radius: 圆角矩形的圆角半径
color: 圆角矩形的背景颜色
"""
if isinstance(color, Color):
color = color.value
self.__draw.rounded_rectangle(((x, y), (x + width, y + height)), radius, color)
return self
def auto_size_img_by_limit(self,
img: Image.Image,
xy_limit: Tuple[int, int],
xy: Optional[Tuple[int, int]] = None) -> Image:
"""
将指定的图片限制为不可覆盖指定的点若其将要覆盖指定的点会自适应缩小图片至不会覆盖指定的点
Args:
img: 要限制的图片
xy_limit: 指定不可被覆盖的点
xy: 图片将要被绘制到的坐标默认自适应的当前绘图坐标
"""
if xy is None:
xy = self.__xy
return self.auto_size_img_by_limit_cls(img, xy_limit, xy)
@classmethod
def auto_size_img_by_limit_cls(cls, img: Image.Image, xy_limit: Tuple[int, int], xy: Tuple[int, int]) -> Image:
"""
将指定的图片限制为不可覆盖指定的点若其将要覆盖指定的点会自适应缩小图片至不会覆盖指定的点
Args:
img: 要限制的图片
xy_limit: 指定不可被覆盖的点
xy: 图片将要被绘制到的坐标
"""
cover_margin = config.get("PAINTER_AUTO_SIZE_BY_LIMIT_MARGIN")
xy_limit = xy_limit[0] - cover_margin, xy_limit[1] + cover_margin
x_cover = xy[0] + img.width - xy_limit[0]
if xy[1] >= xy_limit[1] or x_cover <= 0:
return img
width = img.width - x_cover
height = int(img.height * (width / img.width))
return img.resize((width, height))
def draw_img(self, img: Union[str, Image.Image], xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一张图片会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
img: 图片路径或 Image 图片实例
xy: 绘图坐标默认自适应绘图坐标
"""
if isinstance(img, str):
img = Image.open(img)
if xy is None:
self.__canvas.paste(img, self.__xy)
self.move_pos(0, img.height + self.__ROW_SPACE)
else:
self.__canvas.paste(img, xy)
return self
def draw_img_alpha(self, img: Union[str, Image.Image], xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一张透明背景图片会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
img: 透明背景图片路径或 Image 图片实例
xy: 绘图坐标默认自适应绘图坐标
"""
if isinstance(img, str):
img = Image.open(img)
if xy is None:
self.__canvas.paste(img, self.__xy, img)
self.move_pos(0, img.height + self.__ROW_SPACE)
else:
self.__canvas.paste(img, xy, img)
return self
def draw_img_with_border(self,
img: Union[str, Image.Image],
xy: Optional[Tuple[int, int]] = None,
color: Optional[Union[Color, Tuple[int, int, int]]] = Color.BLACK,
radius: int = 10,
width: int = 1):
"""
在当前绘图坐标绘制一张图片并附带圆角矩形边框会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
img: 图片路径或 Image 图片实例
xy: 绘图坐标默认自适应绘图坐标
color: 边框颜色默认黑色 (0, 0, 0)
radius: 边框圆角半径默认20
width: 边框粗细默认1
"""
if isinstance(img, str):
img = Image.open(img)
if isinstance(color, Color):
color = color.value
if xy is None:
xy = self.__xy
self.draw_img(img)
else:
self.draw_img(img, xy)
border = Image.new("RGBA", (img.width + (width * 2), img.height + (width * 2)))
ImageDraw.Draw(border).rounded_rectangle((0, 0, img.width, img.height), radius, (0, 0, 0, 0), color, width)
self.draw_img_alpha(border, (xy[0] - width, xy[1] - width))
return self
def draw_chapter(self,
chapter: str,
color: Union[Color, Tuple[int, int, int]] = Color.BLACK,
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一个章节标题会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
chapter: 章节名称
color: 字体颜色默认黑色 (0, 0, 0)
xy: 绘图坐标默认自适应绘图坐标
"""
if isinstance(color, Color):
color = color.value
if xy is None:
self.__draw.text(self.__xy, chapter, color, self.__chapter_font)
self.move_pos(0, self.__chapter_font.size + self.__ROW_SPACE)
else:
self.__draw.text(xy, chapter, color, self.__chapter_font)
return self
def draw_section(self,
section: str,
color: Union[Color, Tuple[int, int, int]] = Color.BLACK,
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一个小节标题会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
section: 小节名称
color: 字体颜色默认黑色 (0, 0, 0)
xy: 绘图坐标默认自适应绘图坐标
"""
if isinstance(color, Color):
color = color.value
if xy is None:
self.__draw.text(self.__xy, section, color, self.__section_font)
self.move_pos(0, self.__section_font.size + self.__ROW_SPACE)
else:
self.__draw.text(xy, section, color, self.__section_font)
return self
def draw_tip(self,
tip: str,
color: Union[Color, Tuple[int, int, int]] = Color.GRAY,
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一个提示会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
Args:
tip: 提示内容
color: 字体颜色默认灰色 (169, 169, 169)
xy: 绘图坐标默认自适应绘图坐标
"""
if isinstance(color, Color):
color = color.value
if xy is None:
self.__draw.text(self.__xy, tip, color, self.__tip_font)
self.move_pos(0, self.__tip_font.size + self.__ROW_SPACE)
else:
self.__draw.text(xy, tip, color, self.__tip_font)
return self
def draw_text(self,
texts: Union[str, List[str]],
colors: Optional[Union[Color, Tuple[int, int, int], List[Union[Color, Tuple[int, int, int]]]]] = None,
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一行文本会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
传入文本列表和颜色列表可将一行文本绘制为不同颜色文本列表和颜色列表需一一对应
颜色列表少于文本列表时将使用默认黑色 (0, 0, 0)颜色列表多于文本列表时将舍弃多余颜色
Args:
texts: 文本内容
colors: 字体颜色默认黑色 (0, 0, 0)
xy: 绘图坐标默认自适应绘图坐标
"""
if colors is None:
colors = []
if isinstance(texts, str):
texts = [texts]
if isinstance(colors, (Color, tuple)):
colors = [colors]
for i in range(len(texts) - len(colors)):
colors.append(Color.BLACK)
for i in range(len(colors)):
if isinstance(colors[i], Color):
colors[i] = colors[i].value
if xy is None:
x = self.__xy[0]
for i in range(len(texts)):
self.__draw.text(self.__xy, texts[i], colors[i], self.__text_font)
self.move_pos(int(self.__draw.textlength(texts[i], self.__text_font)), 0)
self.move_pos(x - self.__xy[0], self.__text_font.size + self.__ROW_SPACE)
else:
for i in range(len(texts)):
self.__draw.text(xy, texts[i], colors[i], self.__text_font)
xy = xy[0] + self.__draw.textlength(texts[i], self.__text_font), xy[1]
return self
def draw_text_right(self,
margin_right: int,
texts: Union[str, List[str]],
colors: Optional[Union[Color, Tuple[int, int, int],
List[Union[Color, Tuple[int, int, int]]]]] = None,
xy_limit: Optional[Tuple[int, int]] = (0, 0),
xy: Optional[Tuple[int, int]] = None):
"""
在当前绘图坐标绘制一行右对齐文本会自动移动绘图坐标保证不会覆盖指定的点会自动移动绘图坐标至下次绘图适合位置
也可手动传入绘图坐标手动传入时不会移动绘图坐标
传入文本列表和颜色列表可将一行文本绘制为不同颜色文本列表和颜色列表需一一对应
颜色列表少于文本列表时将使用默认黑色 (0, 0, 0)颜色列表多于文本列表时将舍弃多余颜色
Args:
margin_right: 右边距
texts: 文本内容
colors: 字体颜色默认黑色 (0, 0, 0)
xy_limit: 指定不可被覆盖的点默认(0, 0)
xy: 绘图坐标默认自适应绘图坐标
"""
if xy is None:
xy = self.__xy
x = self.width - self.__draw.textlength("".join(texts), self.__text_font) - margin_right
cover_margin = config.get("PAINTER_AUTO_SIZE_BY_LIMIT_MARGIN")
xy_limit = xy_limit[0] - cover_margin, xy_limit[1] + cover_margin
y = max(xy[1], xy_limit[1])
self.draw_text(texts, colors, (x, y))
self.move_pos(0, self.__text_font.size + self.__ROW_SPACE)
return self
def show(self):
"""
显示图片
"""
self.__canvas.show()
return self
def save(self, path: str):
"""
保存图片
Args:
path: 保存路径
"""
self.__canvas.save(path)
return self
def base64(self) -> str:
"""
结束绘图获取 Base64 字符串
Returns:
Base64 字符串
"""
io = BytesIO()
self.__canvas.save(io, format="PNG")
return base64.b64encode(io.getvalue()).decode()
class LiveReportGenerator:
"""
直播报告生成器
"""
@classmethod
def generate(cls, param: Dict[str, Any], model: LiveReport) -> str:
"""
根据传入直播报告参数生成直播报告图片
Args:
param: 直播报告参数
model: 直播报告配置实例
Returns:
直播报告图片的 Base64 字符串
"""
width = 1000
height = cls.__calc_height(param, model)
top_blank = 75
generator = PicGenerator(width, height)
pic = (generator.set_pos(50, 125)
.draw_rounded_rectangle(0, top_blank, width, height - top_blank, 35, Color.WHITE))
# 标题
pic.draw_chapter("直播报告")
# 防止后续绘图覆盖主播立绘
logo_limit = (0, 0)
# 主播立绘
if model.logo:
logo = cls.__get_logo(model)
base_left = 650
logo_left = base_left + int((width - base_left - logo.width) / 2)
if logo_left < base_left:
logo_left = base_left
pic.draw_img_alpha(logo, (logo_left, 0))
logo_limit = (logo_left, logo.height)
# 主播信息
uname = param.get('uname', '')
room_id = param.get('room_id', 0)
pic.draw_tip(f"{uname} ({room_id})")
# 直播时长
if model.time:
start_time = param.get('start_time', '')
end_time = param.get('end_time', '')
hour = param.get('hour', 0)
minute = param.get('minute', 0)
second = param.get('second', 0)
live_time_str = ""
if hour != 0:
live_time_str += f"{hour}"
if minute != 0:
live_time_str += f"{minute}"
if second != 0:
live_time_str += f"{second}"
if live_time_str == "":
live_time_str = "0 秒"
pic.draw_tip(f"{start_time} ~ {end_time} ({live_time_str.strip()})")
# 基础数据
if model.fans_change or model.fans_medal_change or model.guard_change:
pic.draw_section("基础数据")
if model.fans_change:
fans_before = param.get('fans_before', 0)
fans_after = param.get('fans_after', 0)
if fans_before == -1:
fans_before = "?"
diff = 0
else:
diff = fans_after - fans_before
if diff > 0:
pic.draw_text([f"粉丝: {fans_before}{fans_after} ", f"(+{diff})"], [Color.BLACK, Color.RED])
elif diff < 0:
pic.draw_text([f"粉丝: {fans_before}{fans_after} ", f"({diff})"], [Color.BLACK, Color.GREEN])
else:
pic.draw_text([f"粉丝: {fans_before}{fans_after} ", f"(+0)"], [Color.BLACK, Color.GRAY])
if model.fans_medal_change:
medal_before = param.get('fans_medal_before', 0)
medal_after = param.get('fans_medal_after', 0)
if medal_before == -1:
medal_before = "?"
diff = 0
else:
diff = medal_after - medal_before
if diff > 0:
pic.draw_text([f"粉丝团: {medal_before}{medal_after} ", f"(+{diff})"], [Color.BLACK, Color.RED])
elif diff < 0:
pic.draw_text([f"粉丝团: {medal_before}{medal_after} ", f"({diff})"], [Color.BLACK, Color.GREEN])
else:
pic.draw_text([f"粉丝团: {medal_before}{medal_after} ", f"(+0)"], [Color.BLACK, Color.GRAY])
if model.guard_change:
guard_before = param.get('guard_before', 0)
guard_after = param.get('guard_after', 0)
if guard_before == -1:
guard_before = "?"
diff = 0
else:
diff = guard_after - guard_before
if diff > 0:
pic.draw_text([f"大航海: {guard_before}{guard_after} ", f"(+{diff})"], [Color.BLACK, Color.RED])
elif diff < 0:
pic.draw_text([f"大航海: {guard_before}{guard_after} ", f"({diff})"], [Color.BLACK, Color.GREEN])
else:
pic.draw_text([f"大航海: {guard_before}{guard_after} ", f"(+0)"], [Color.BLACK, Color.GRAY])
# 直播数据
if model.danmu or model.box or model.gift or model.sc or model.guard:
(danmu_count, danmu_person_count,
box_count, box_person_count, box_profit, box_beat_percent,
gift_profit, gift_person_count,
sc_profit, sc_person_count,
captain_count, commander_count, governor_count) = cls.__get_live_params(param)
if any([danmu_count > 0, box_count > 0, gift_profit > 0, sc_profit > 0,
captain_count > 0, commander_count > 0, governor_count > 0]):
pic.draw_section("直播数据")
if model.danmu and danmu_count > 0:
pic.draw_text(f"弹幕: {danmu_count} 条 ({danmu_person_count} 人)")
if model.box and box_count > 0:
pic.draw_text(f"盲盒: {box_count} 个 ({box_person_count} 人)")
if box_profit > 0:
pic.draw_text(["盲盒赚了: ", f"{box_profit}", " 元 (击败了 ", f"{box_beat_percent}% ", "的直播间)"],
[Color.BLACK, Color.RED, Color.BLACK, Color.RED, Color.BLACK])
elif box_profit < 0:
pic.draw_text(["盲盒亏了: ", f"{abs(box_profit)}", " 元 (击败了 ", f"{box_beat_percent}% ", "的直播间)"],
[Color.BLACK, Color.GREEN, Color.BLACK, Color.GREEN, Color.BLACK])
else:
pic.draw_text(["盲盒不赚不亏 (击败了 ", f"{box_beat_percent}% ", "的直播间)"],
[Color.BLACK, Color.GRAY, Color.BLACK])
if model.gift and gift_profit > 0:
pic.draw_text(f"礼物: {gift_profit} 元 ({gift_person_count} 人)")
if model.sc and sc_profit > 0:
pic.draw_text(f"SC (醒目留言): {sc_profit} 元 ({sc_person_count} 人)")
if model.guard and any([captain_count > 0, commander_count > 0, governor_count > 0]):
texts = ["大航海: "]
colors = [Color.BLACK]
if captain_count > 0:
texts.append(f"舰长 × {captain_count} ")
colors.append(Color.DEEPSKYBLUE)
if commander_count > 0:
texts.append(f"提督 × {commander_count} ")
colors.append(Color.FUCHSIA)
if governor_count > 0:
texts.append(f"总督 × {governor_count} ")
colors.append(Color.CRIMSON)
pic.draw_text(texts, colors)
# 弹幕词云
if model.danmu_cloud:
base64_str = param.get('danmu_cloud', "")
if base64_str != "":
pic.draw_section("弹幕词云")
img_bytes = BytesIO(base64.b64decode(base64_str))
img = pic.auto_size_img_by_limit(Image.open(img_bytes), logo_limit)
pic.draw_img_with_border(img)
# 底部信息
pic.draw_text_right(50, "Designed By StarBot", Color.GRAY, logo_limit)
pic.draw_text_right(50, "https://github.com/Starlwr/StarBot", Color.LINK, logo_limit)
return pic.base64()
@classmethod
def __calc_height(cls, param: Dict[str, Any], model: LiveReport) -> int:
"""
根据传入直播报告参数计算直播报告图片高度
Args:
param: 直播报告参数
model: 直播报告配置实例
Returns:
直播报告图片高度
"""
x = 50
y = 0
width = 1000
height = 0
chapter_font_size = 50
section_font_size = 40
tip_font_size = 25
text_font_size = 30
text_row_space = 25
# 图片上边距
top_blank = 75
top_base_margin = 50
y += (top_blank + top_base_margin)
height = max(height, y)
# 标题
y += (chapter_font_size + text_row_space)
height = max(height, y)
# 主播立绘
logo_limit = (0, 0)
if model.logo:
logo = cls.__get_logo(model)
height = max(height, logo.height)
base_left = 650
logo_left = base_left + int((width - base_left - logo.width) / 2)
if logo_left < base_left:
logo_left = base_left
logo_limit = (logo_left, logo.height)
# 主播信息
y += (tip_font_size + text_row_space)
height = max(height, y)
# 直播时长
if model.time:
y += (tip_font_size + text_row_space)
height = max(height, y)
# 基础数据
if model.fans_change or model.fans_medal_change or model.guard_change:
y += (section_font_size + text_row_space)
height = max(height, y)
if model.fans_change:
y += (text_font_size + text_row_space)
height = max(height, y)
if model.fans_medal_change:
y += (text_font_size + text_row_space)
height = max(height, y)
if model.guard_change:
y += (text_font_size + text_row_space)
height = max(height, y)
# 直播数据
if model.danmu or model.box or model.gift or model.sc or model.guard:
(danmu_count, danmu_person_count,
box_count, box_person_count, box_profit, box_beat_percent,
gift_profit, gift_person_count,
sc_profit, sc_person_count,
captain_count, commander_count, governor_count) = cls.__get_live_params(param)
if any([danmu_count > 0, box_count > 0, gift_profit > 0, sc_profit > 0,
captain_count > 0, commander_count > 0, governor_count > 0]):
y += (section_font_size + text_row_space)
height = max(height, y)
if model.danmu and danmu_count > 0:
y += (text_font_size + text_row_space)
height = max(height, y)
if model.box and box_count > 0:
y += (text_font_size + text_row_space) * 2
height = max(height, y)
if model.gift and gift_profit > 0:
y += (text_font_size + text_row_space)
height = max(height, y)
if model.sc and sc_profit > 0:
y += (text_font_size + text_row_space)
height = max(height, y)
if model.guard and any([captain_count > 0, commander_count > 0, governor_count > 0]):
y += (text_font_size + text_row_space)
height = max(height, y)
# 弹幕词云
if model.danmu_cloud:
base64_str = param.get('danmu_cloud', "")
if base64_str != "":
y += (section_font_size + text_row_space)
height = max(height, y)
img_bytes = BytesIO(base64.b64decode(base64_str))
img = PicGenerator.auto_size_img_by_limit_cls(Image.open(img_bytes), logo_limit, (x, y))
y += (img.height + text_row_space)
height = max(height, y)
# 底部信息
height += (text_font_size + text_row_space) * 2
return height
@classmethod
def __get_logo(cls, model: LiveReport) -> Image:
"""
从直播报告实例中读取主播立绘图片
Args:
model: 直播报告配置实例
Returns:
主播立绘图片
"""
logo_bytes = BytesIO(base64.b64decode(model.logo))
logo = Image.open(logo_bytes)
logo = logo.crop(logo.getbbox())
logo_height = 800
logo_width = int(logo.width * (logo_height / logo.height))
logo = logo.resize((logo_width, logo_height))
return logo
@classmethod
def __get_live_params(cls, param: Dict[str, Any]) -> Tuple:
"""
从传入直播报告参数中取出直播相关参数
Args:
param: 直播报告参数
Returns:
直播相关参数
"""
# 弹幕相关
danmu_count = param.get('danmu_count', 0)
danmu_person_count = param.get('danmu_person_count', 0)
# 盲盒相关
box_count = param.get('box_count', 0)
box_person_count = param.get('box_person_count', 0)
box_profit = param.get('box_profit', 0.0)
box_beat_percent = param.get('box_beat_percent', 0.0)
# 礼物相关
gift_profit = param.get('gift_profit', 0.0)
gift_person_count = param.get('gift_person_count', 0)
# SC醒目留言相关
sc_profit = param.get('sc_profit', 0)
sc_person_count = param.get('sc_person_count', 0)
# 大航海相关
captain_count = param.get('captain_count', 0)
commander_count = param.get('commander_count', 0)
governor_count = param.get('governor_count', 0)
return (danmu_count, danmu_person_count,
box_count, box_person_count, box_profit, box_beat_percent,
gift_profit, gift_person_count,
sc_profit, sc_person_count,
captain_count, commander_count, governor_count)

View File

@ -46,6 +46,22 @@ SIMPLE_CONFIG = {
# 视为主播网络波动断线重连时,需发送的额外提示消息
"UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知",
# 绘图器普通字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 normal.ttf 为您的字体文件名
"PAINTER_NORMAL_FONT": "normal.ttf",
# 绘图器粗体字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 bold.ttf 为您的字体文件名
"PAINTER_BOLD_FONT": "bold.ttf",
# 绘图器自适应不覆盖已绘制图形的间距,单位:像素
"PAINTER_AUTO_SIZE_BY_LIMIT_MARGIN": 10,
# 弹幕词云字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 normal.ttf 为您的字体文件名
"DANMU_CLOUD_FONT": "normal.ttf",
# 弹幕词云图片背景色
"DANMU_CLOUD_BACKGROUND_COLOR": "white",
# 弹幕词云最大字号
"DANMU_CLOUD_MAX_FONT_SIZE": 300,
# 弹幕词云最多词数
"DANMU_CLOUD_MAX_WORDS": 100,
# Bot 主人 QQ用于接收部分 Bot 异常通知等
"MASTER_QQ": None,
@ -119,6 +135,22 @@ FULL_CONFIG = {
# 视为主播网络波动断线重连时,需发送的额外提示消息
"UP_DISCONNECT_CONNECT_MESSAGE": "检测到下播后短时间内重新开播,可能是由于主播网络波动引起,本次开播不再重复通知",
# 绘图器普通字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 normal.ttf 为您的字体文件名
"PAINTER_NORMAL_FONT": "normal.ttf",
# 绘图器粗体字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 bold.ttf 为您的字体文件名
"PAINTER_BOLD_FONT": "bold.ttf",
# 绘图器自适应不覆盖已绘制图形的间距,单位:像素
"PAINTER_AUTO_SIZE_BY_LIMIT_MARGIN": 10,
# 弹幕词云字体路径,如需自定义,请将字体放入 font 文件夹中后,修改配置中的 normal.ttf 为您的字体文件名
"DANMU_CLOUD_FONT": "normal.ttf",
# 弹幕词云图片背景色
"DANMU_CLOUD_BACKGROUND_COLOR": "white",
# 弹幕词云最大字号
"DANMU_CLOUD_MAX_FONT_SIZE": 200,
# 弹幕词云最多词数
"DANMU_CLOUD_MAX_WORDS": 80,
# Bot 主人 QQ用于接收 Bot 异常通知等
"MASTER_QQ": None,

View File

@ -4,6 +4,7 @@
import json
import os
import time
from typing import Dict
from . import config
@ -37,3 +38,16 @@ def get_credential() -> Credential:
bili_jct = config.get("BILI_JCT")
buvid3 = config.get("BUVID3")
return Credential(sessdata, bili_jct, buvid3)
def timestamp_format(timestamp: int) -> str:
"""
时间戳格式化为形如 11/04 00:00:00 的字符串形式
Args:
timestamp: 时间戳
Returns:
格式化后的字符串
"""
return time.strftime("%m/%d %H:%M:%S", time.localtime(timestamp))