refactor: Refactor redis util

This commit is contained in:
LWR 2023-01-31 19:20:37 +08:00
parent 67e775363c
commit 676bc9debf
3 changed files with 605 additions and 161 deletions

View File

@ -81,11 +81,11 @@ class StarBot:
f"房间号: <cyan>{up.room_id}</>) 的直播间状态: "
f"{'<green>直播中</>' if status == 1 else '<red>未开播</>'}")
if status == 1 and start_time != await redis.hgeti("StartTime", up.room_id):
if status == 1 and start_time != await redis.get_live_start_time(up.room_id):
await up.accumulate_and_reset_data()
await redis.hset("LiveStatus", up.room_id, status)
await redis.hset("StartTime", up.room_id, start_time)
await redis.set_live_status(up.room_id, status)
await redis.set_live_start_time(up.room_id, start_time)
# 连接直播间
for up in self.__datasource.get_up_list():

View File

@ -70,8 +70,8 @@ class Up(BaseModel):
self.__room.dispatch(name, data)
async def accumulate_and_reset_data(self):
await self.__accumulate_data()
await self.__reset_data()
await redis.accumulate_data(self.room_id)
await redis.reset_data(self.room_id)
def is_connecting(self):
return (self.__room is not None) and (self.__room.get_status() != 2)
@ -131,11 +131,11 @@ class Up(BaseModel):
logger.success(f"已重新连接到 {self.uname} 的直播间 {self.room_id}")
room_info = await self.__live_room.get_room_play_info()
last_status = await redis.hgeti("LiveStatus", self.room_id)
last_status = await redis.get_live_status(self.room_id)
now_status = room_info["live_status"]
if now_status != last_status:
await redis.hset("LiveStatus", self.room_id, now_status)
await redis.set_live_status(self.room_id, now_status)
if now_status == 1:
logger.warning(f"直播间 {self.room_id} 断线期间开播")
param = {
@ -165,11 +165,11 @@ class Up(BaseModel):
room_info = await self.__live_room.get_room_info()
self.uname = room_info["anchor_info"]["base_info"]["uname"]
await redis.hset("LiveStatus", self.room_id, 1)
await redis.set_live_status(self.room_id, 1)
# 是否为主播网络波动断线重连
now = int(time.time())
last = await redis.hgeti("EndTime", self.room_id)
last = await redis.get_live_end_time(self.room_id)
is_reconnect = (now - last) <= config.get("UP_DISCONNECT_CONNECT_INTERVAL")
if is_reconnect:
logger.opt(colors=True).info(f"<magenta>[断线重连] {self.uname} ({self.room_id})</>")
@ -186,10 +186,10 @@ class Up(BaseModel):
else:
fans_medal_count = room_info["anchor_info"]["medal_info"]["fansclub"]
guard_count = room_info["guard_info"]["count"]
await redis.hset("StartTime", self.room_id, live_start_time)
await redis.hset(f"FansCount:{self.room_id}", live_start_time, fans_count)
await redis.hset(f"FansMedalCount:{self.room_id}", live_start_time, fans_medal_count)
await redis.hset(f"GuardCount:{self.room_id}", live_start_time, guard_count)
await redis.set_live_start_time(self.room_id, live_start_time)
await redis.set_fans_count(self.room_id, live_start_time, fans_count)
await redis.set_fans_medal_count(self.room_id, live_start_time, fans_medal_count)
await redis.set_guard_count(self.room_id, live_start_time, guard_count)
await self.accumulate_and_reset_data()
@ -210,8 +210,8 @@ class Up(BaseModel):
"""
logger.debug(f"{self.uname} (PREPARING): {event}")
await redis.hset("LiveStatus", self.room_id, 0)
await redis.hset("EndTime", self.room_id, int(time.time()))
await redis.set_live_status(self.room_id, 0)
await redis.set_live_end_time(self.room_id, int(time.time()))
logger.opt(colors=True).info(f"<magenta>[下播] {self.uname} ({self.room_id})</>")
@ -239,13 +239,13 @@ class Up(BaseModel):
content = base[1]
# 弹幕统计
await redis.hincrby("RoomDanmuCount", self.room_id)
await redis.zincrby(f"UserDanmuCount:{self.room_id}", uid)
await redis.incr_room_danmu_count(self.room_id)
await redis.incr_user_danmu_count(self.room_id, uid)
# 弹幕词云所需弹幕记录
if isinstance(base[0][13], str):
await redis.rpush(f"RoomDanmu:{self.room_id}", content)
await redis.hincrby(f"RoomDanmuTime:{self.room_id}", int(time.time()))
await redis.add_room_danmu(self.room_id, content)
await redis.incr_room_danmu_time(self.room_id, int(time.time()))
gift_items = [
"box", "gift", "box_ranking", "box_profit_ranking", "gift_ranking",
@ -266,10 +266,10 @@ class Up(BaseModel):
# 礼物统计
if base["total_coin"] != 0 and base["discount_price"] != 0:
await redis.hincrbyfloat("RoomGiftProfit", self.room_id, price)
await redis.zincrby(f"UserGiftProfit:{self.room_id}", uid, price)
await redis.incr_room_gift_profit(self.room_id, price)
await redis.incr_user_gift_profit(self.room_id, uid, price)
await redis.hincrbyfloat(f"RoomGiftTime:{self.room_id}", int(time.time()), price)
await redis.incr_room_gift_time(self.room_id, int(time.time()), price)
# 盲盒统计
if base["blind_gift"] is not None:
@ -278,13 +278,13 @@ class Up(BaseModel):
gift_price = base["discount_price"] / 1000
profit = float("{:.1f}".format((gift_price * gift_num) - box_price))
await redis.hincrby("RoomBoxCount", self.room_id, gift_num)
await redis.zincrby(f"UserBoxCount:{self.room_id}", uid, gift_num)
box_profit_after = await redis.hincrbyfloat("RoomBoxProfit", self.room_id, profit)
await redis.zincrby(f"UserBoxProfit:{self.room_id}", uid, profit)
await redis.incr_room_box_count(self.room_id, gift_num)
await redis.incr_user_box_count(self.room_id, uid, gift_num)
box_profit_after = await redis.incr_room_box_profit(self.room_id, profit)
await redis.incr_user_box_profit(self.room_id, uid, profit)
await redis.rpush(f"RoomBoxProfitRecord:{self.room_id}", box_profit_after)
await redis.hincrby(f"RoomBoxTime:{self.room_id}", int(time.time()))
await redis.add_room_box_profit_record(self.room_id, box_profit_after)
await redis.incr_room_box_time(self.room_id, int(time.time()))
sc_items = ["sc", "sc_ranking", "sc_diagram"]
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(sc_items):
@ -300,10 +300,10 @@ class Up(BaseModel):
price = base["price"]
# SC 统计
await redis.hincrby("RoomScProfit", self.room_id, price)
await redis.zincrby(f"UserScProfit:{self.room_id}", uid, price)
await redis.incr_room_sc_profit(self.room_id, price)
await redis.incr_user_sc_profit(self.room_id, uid, price)
await redis.hincrby(f"RoomScTime:{self.room_id}", int(time.time()), price)
await redis.incr_room_sc_time(self.room_id, int(time.time()), price)
guard_items = ["guard", "guard_list", "guard_diagram"]
if not config.get("ONLY_HANDLE_NECESSARY_EVENT") or self.__any_live_report_item_enabled(guard_items):
@ -325,10 +325,10 @@ class Up(BaseModel):
"提督": "Commander",
"总督": "Governor"
}
await redis.hincrby(f"Room{type_mapping[guard_type]}Count", self.room_id, month)
await redis.zincrby(f"User{type_mapping[guard_type]}Count:{self.room_id}", uid, month)
await redis.incr_room_guard_count(type_mapping[guard_type], self.room_id, month)
await redis.incr_user_guard_count(type_mapping[guard_type], self.room_id, uid, month)
await redis.hincrby(f"RoomGuardTime:{self.room_id}", int(time.time()), month)
await redis.incr_room_guard_time(self.room_id, int(time.time()), month)
if self.__any_dynamic_update_enabled():
@self.__room.on("DYNAMIC_UPDATE")
@ -372,93 +372,6 @@ class Up(BaseModel):
}
self.__bot.send_dynamic_update(self, dynamic_update_args)
async def __accumulate_data(self):
"""
累计直播间数据
"""
# 累计弹幕数
await redis.hincrby("RoomDanmuTotal", self.room_id, await redis.hgeti("RoomDanmuCount", self.room_id))
await redis.zunionstore(f"UserDanmuTotal:{self.room_id}", f"UserDanmuCount:{self.room_id}")
# 累计盲盒数
await redis.hincrby("RoomBoxTotal", self.room_id, await redis.hgeti("RoomBoxCount", self.room_id))
await redis.zunionstore(f"UserBoxTotal:{self.room_id}", f"UserBoxCount:{self.room_id}")
# 累计盲盒盈亏
await redis.hincrbyfloat("RoomBoxProfitTotal", self.room_id, await redis.hgetf1("RoomBoxProfit", self.room_id))
await redis.zunionstore(f"UserBoxProfitTotal:{self.room_id}", f"UserBoxProfit:{self.room_id}")
# 累计礼物收益
await redis.hincrbyfloat("RoomGiftTotal", self.room_id, await redis.hgetf1("RoomGiftProfit", self.room_id))
await redis.zunionstore(f"UserGiftTotal:{self.room_id}", f"UserGiftProfit:{self.room_id}")
# 累计 SC 收益
await redis.hincrby("RoomScTotal", self.room_id, await redis.hgeti("RoomScProfit", self.room_id))
await redis.zunionstore(f"UserScTotal:{self.room_id}", f"UserScProfit:{self.room_id}")
# 累计舰长数
await redis.hincrby("RoomCaptainTotal", self.room_id, await redis.hgeti("RoomCaptainCount", self.room_id))
await redis.zunionstore(f"UserCaptainTotal:{self.room_id}", f"UserCaptainCount:{self.room_id}")
# 累计提督数
await redis.hincrby("RoomCommanderTotal", self.room_id, await redis.hgeti("RoomCommanderCount", self.room_id))
await redis.zunionstore(f"UserCommanderTotal:{self.room_id}", f"UserCommanderCount:{self.room_id}")
# 累计总督数
await redis.hincrby("RoomGovernorTotal", self.room_id, await redis.hgeti("RoomGovernorCount", self.room_id))
await redis.zunionstore(f"UserGovernorTotal:{self.room_id}", f"UserGovernorCount:{self.room_id}")
async def __reset_data(self):
"""
重置直播间数据
"""
# 清空弹幕记录
await redis.delete(f"RoomDanmu:{self.room_id}")
# 清空数据分布
await redis.delete(f"RoomDanmuTime:{self.room_id}")
await redis.delete(f"RoomBoxTime:{self.room_id}")
await redis.delete(f"RoomGiftTime:{self.room_id}")
await redis.delete(f"RoomScTime:{self.room_id}")
await redis.delete(f"RoomGuardTime:{self.room_id}")
# 重置弹幕数
await redis.hset(f"RoomDanmuCount", self.room_id, 0)
await redis.delete(f"UserDanmuCount:{self.room_id}")
# 重置盲盒数
await redis.hset(f"RoomBoxCount", self.room_id, 0)
await redis.delete(f"UserBoxCount:{self.room_id}")
# 重置盲盒盈亏
await redis.hset(f"RoomBoxProfit", self.room_id, 0)
await redis.delete(f"UserBoxProfit:{self.room_id}")
# 清空盲盒盈亏记录
await redis.delete(f"RoomBoxProfitRecord:{self.room_id}")
# 重置礼物收益
await redis.hset(f"RoomGiftProfit", self.room_id, 0)
await redis.delete(f"UserGiftProfit:{self.room_id}")
# 重置 SC 收益
await redis.hset(f"RoomScProfit", self.room_id, 0)
await redis.delete(f"UserScProfit:{self.room_id}")
# 重置舰长数
await redis.hset(f"RoomCaptainCount", self.room_id, 0)
await redis.delete(f"UserCaptainCount:{self.room_id}")
# 重置提督数
await redis.hset(f"RoomCommanderCount", self.room_id, 0)
await redis.delete(f"UserCommanderCount:{self.room_id}")
# 重置总督数
await redis.hset(f"RoomGovernorCount", self.room_id, 0)
await redis.delete(f"UserGovernorCount:{self.room_id}")
async def __generate_live_report_param(self):
"""
计算直播报告所需数据
@ -472,8 +385,8 @@ class Up(BaseModel):
})
# 直播时间段和直播时长
start_time = await redis.hgeti("StartTime", self.room_id)
end_time = await redis.hgeti("EndTime", self.room_id)
start_time = await redis.get_live_start_time(self.room_id)
end_time = await redis.get_live_end_time(self.room_id)
seconds = end_time - start_time
minute, second = divmod(seconds, 60)
hour, minute = divmod(minute, 60)
@ -492,16 +405,16 @@ class Up(BaseModel):
if self.__any_live_report_item_enabled(["fans_change", "fans_medal_change", "guard_change"]):
room_info = await self.__live_room.get_room_info()
if await redis.hexists(f"FansCount:{self.room_id}", start_time):
fans_count = await redis.hgeti(f"FansCount:{self.room_id}", start_time)
if await redis.exists_fans_count(self.room_id, start_time):
fans_count = await redis.get_fans_count(self.room_id, start_time)
else:
fans_count = -1
if await redis.hexists(f"FansMedalCount:{self.room_id}", start_time):
fans_medal_count = await redis.hgeti(f"FansMedalCount:{self.room_id}", start_time)
if await redis.exists_fans_medal_count(self.room_id, start_time):
fans_medal_count = await redis.get_fans_medal_count(self.room_id, start_time)
else:
fans_medal_count = -1
if await redis.hexists(f"GuardCount:{self.room_id}", start_time):
guard_count = await redis.hgeti(f"GuardCount:{self.room_id}", start_time)
if await redis.exists_guard_count(self.room_id, start_time):
guard_count = await redis.get_guard_count(self.room_id, start_time)
else:
guard_count = -1
@ -523,43 +436,43 @@ class Up(BaseModel):
})
# 直播数据
box_profit = await redis.hgetf1("RoomBoxProfit", self.room_id)
count = await redis.zcard("BoxProfit")
await redis.zadd("BoxProfit", f"{start_time}-{self.uid}-{self.uname}", box_profit)
rank = await redis.zrank("BoxProfit", f"{start_time}-{self.uid}-{self.uname}")
box_profit = await redis.get_room_box_profit(self.room_id)
count = await redis.len_box_profit_record()
await redis.add_box_profit_record(start_time, self.uid, self.uname, box_profit)
rank = await redis.rank_box_profit_record(start_time, self.uid, self.uname)
percent = float("{:.2f}".format(float("{:.4f}".format(rank / count)) * 100)) if count != 0 else 100
live_report_param.update({
# 弹幕相关
"danmu_count": await redis.hgeti("RoomDanmuCount", self.room_id),
"danmu_person_count": await redis.zcard(f"UserDanmuCount:{self.room_id}"),
"danmu_diagram": await redis.hgetalltuplei(f"RoomDanmuTime:{self.room_id}"),
"danmu_count": await redis.get_room_danmu_count(self.room_id),
"danmu_person_count": await redis.len_user_danmu_count(self.room_id),
"danmu_diagram": await redis.get_room_danmu_time(self.room_id),
# 盲盒相关
"box_count": await redis.hgeti("RoomBoxCount", self.room_id),
"box_person_count": await redis.zcard(f"UserBoxCount:{self.room_id}"),
"box_count": await redis.get_room_box_count(self.room_id),
"box_person_count": await redis.len_user_box_count(self.room_id),
"box_profit": box_profit,
"box_beat_percent": percent,
"box_profit_diagram": await redis.lrangef1(f"RoomBoxProfitRecord:{self.room_id}", 0, -1),
"box_diagram": await redis.hgetalltuplei(f"RoomBoxTime:{self.room_id}"),
"box_profit_diagram": await redis.get_room_box_profit_record(self.room_id),
"box_diagram": await redis.get_room_box_time(self.room_id),
# 礼物相关
"gift_profit": await redis.hgetf1("RoomGiftProfit", self.room_id),
"gift_person_count": await redis.zcard(f"UserGiftProfit:{self.room_id}"),
"gift_diagram": await redis.hgetalltuplef1(f"RoomGiftTime:{self.room_id}"),
"gift_profit": await redis.get_room_gift_profit(self.room_id),
"gift_person_count": await redis.len_user_gift_profit(self.room_id),
"gift_diagram": await redis.get_room_gift_time(self.room_id),
# SC醒目留言相关
"sc_profit": await redis.hgeti("RoomScProfit", self.room_id),
"sc_person_count": await redis.zcard(f"UserScProfit:{self.room_id}"),
"sc_diagram": await redis.hgetalltuplei(f"RoomScTime:{self.room_id}"),
"sc_profit": await redis.get_room_sc_profit(self.room_id),
"sc_person_count": await redis.len_user_sc_profit(self.room_id),
"sc_diagram": await redis.get_room_sc_time(self.room_id),
# 大航海相关
"captain_count": await redis.hgeti("RoomCaptainCount", self.room_id),
"commander_count": await redis.hgeti("RoomCommanderCount", self.room_id),
"governor_count": await redis.hgeti("RoomGovernorCount", self.room_id),
"guard_diagram": await redis.hgetalltuplei(f"RoomGuardTime:{self.room_id}")
"captain_count": await redis.get_room_captain_count(self.room_id),
"commander_count": await redis.get_room_commander_count(self.room_id),
"governor_count": await redis.get_room_governor_count(self.room_id),
"guard_diagram": await redis.get_room_guard_time(self.room_id)
})
# 弹幕排行
if self.__any_live_report_item_enabled("danmu_ranking"):
ranking_count = max(map(lambda t: t.live_report.danmu_ranking, self.targets))
danmu_ranking = await redis.zrevrangewithscoresi(f"UserDanmuCount:{self.room_id}", 0, ranking_count - 1)
danmu_ranking = await redis.get_user_danmu_count(self.room_id, 0, ranking_count - 1)
if danmu_ranking:
uids = [x[0] for x in danmu_ranking]
@ -575,7 +488,7 @@ class Up(BaseModel):
# 盲盒数量排行
if self.__any_live_report_item_enabled("box_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_ranking, self.targets))
box_ranking = await redis.zrevrangewithscoresi(f"UserBoxCount:{self.room_id}", 0, ranking_count - 1)
box_ranking = await redis.get_user_box_count(self.room_id, 0, ranking_count - 1)
if box_ranking:
uids = [x[0] for x in box_ranking]
@ -591,9 +504,7 @@ class Up(BaseModel):
# 盲盒盈亏排行
if self.__any_live_report_item_enabled("box_profit_ranking"):
ranking_count = max(map(lambda t: t.live_report.box_profit_ranking, self.targets))
box_profit_ranking = await redis.zrevrangewithscoresf1(
f"UserBoxProfit:{self.room_id}", 0, ranking_count - 1
)
box_profit_ranking = await redis.get_user_box_profit(self.room_id, 0, ranking_count - 1)
if box_profit_ranking:
uids = [x[0] for x in box_profit_ranking]
@ -609,7 +520,7 @@ class Up(BaseModel):
# 礼物排行
if self.__any_live_report_item_enabled("gift_ranking"):
ranking_count = max(map(lambda t: t.live_report.gift_ranking, self.targets))
gift_ranking = await redis.zrevrangewithscoresf1(f"UserGiftProfit:{self.room_id}", 0, ranking_count - 1)
gift_ranking = await redis.get_user_gift_profit(self.room_id, 0, ranking_count - 1)
if gift_ranking:
uids = [x[0] for x in gift_ranking]
@ -625,7 +536,7 @@ class Up(BaseModel):
# SC醒目留言排行
if self.__any_live_report_item_enabled("sc_ranking"):
ranking_count = max(map(lambda t: t.live_report.sc_ranking, self.targets))
sc_ranking = await redis.zrevrangewithscoresi(f"UserScProfit:{self.room_id}", 0, ranking_count - 1)
sc_ranking = await redis.get_user_sc_profit(self.room_id, 0, ranking_count - 1)
if sc_ranking:
uids = [x[0] for x in sc_ranking]
@ -640,9 +551,9 @@ class Up(BaseModel):
# 开通大航海观众列表
if self.__any_live_report_item_enabled("guard_list"):
captains = await redis.zrevrangewithscoresi(f"UserCaptainCount:{self.room_id}", 0, -1)
commanders = await redis.zrevrangewithscoresi(f"UserCommanderCount:{self.room_id}", 0, -1)
governors = await redis.zrevrangewithscoresi(f"UserGovernorCount:{self.room_id}", 0, -1)
captains = await redis.get_user_captain_count(self.room_id)
commanders = await redis.get_user_commander_count(self.room_id)
governors = await redis.get_user_governor_count(self.room_id)
if captains:
uids = [x[0] for x in captains]
@ -676,7 +587,7 @@ class Up(BaseModel):
# 弹幕词云
if self.__any_live_report_item_enabled("danmu_cloud"):
all_danmu = await redis.lrange(f"RoomDanmu:{self.room_id}", 0, -1)
all_danmu = await redis.get_room_danmu(self.room_id)
live_report_param.update({
"all_danmu": all_danmu

View File

@ -127,3 +127,536 @@ async def zunionstore(dest: str, source: Union[str, List[str]]):
await __redis.zunionstore(dest, [dest, source])
if isinstance(source, list):
await __redis.zunionstore(dest, source)
# StarBot
# 直播间状态0未开播1正在直播2轮播
async def get_live_status(room_id: int) -> int:
return await hgeti("LiveStatus", room_id)
async def set_live_status(room_id: int, status: int):
await hset("LiveStatus", room_id, status)
# 直播开始时间
async def get_live_start_time(room_id: int) -> int:
return await hgeti("StartTime", room_id)
async def set_live_start_time(room_id: int, start_time: int):
await hset("StartTime", room_id, start_time)
# 直播结束时间
async def get_live_end_time(room_id: int) -> int:
return await hgeti("EndTime", room_id)
async def set_live_end_time(room_id: int, end_time: int):
await hset("EndTime", room_id, end_time)
# 粉丝数
async def exists_fans_count(room_id: int, start_time: int) -> bool:
return await hexists(f"FansCount:{room_id}", start_time)
async def get_fans_count(room_id: int, start_time: int) -> int:
return await hgeti(f"FansCount:{room_id}", start_time)
async def set_fans_count(room_id: int, start_time: int, fans_count: int):
await hset(f"FansCount:{room_id}", start_time, fans_count)
# 粉丝团人数
async def exists_fans_medal_count(room_id: int, start_time: int) -> bool:
return await hexists(f"FansMedalCount:{room_id}", start_time)
async def get_fans_medal_count(room_id: int, start_time: int) -> int:
return await hgeti(f"FansMedalCount:{room_id}", start_time)
async def set_fans_medal_count(room_id: int, start_time: int, fans_medal_count: int):
await hset(f"FansMedalCount:{room_id}", start_time, fans_medal_count)
# 大航海人数
async def exists_guard_count(room_id: int, start_time: int) -> bool:
return await hexists(f"GuardCount:{room_id}", start_time)
async def get_guard_count(room_id: int, start_time: int) -> int:
return await hgeti(f"GuardCount:{room_id}", start_time)
async def set_guard_count(room_id: int, start_time: int, guard_count: int):
await hset(f"GuardCount:{room_id}", start_time, guard_count)
# 房间弹幕数量
async def get_room_danmu_count(room_id: int) -> int:
return await hgeti("RoomDanmuCount", room_id)
async def incr_room_danmu_count(room_id: int) -> int:
return await hincrby("RoomDanmuCount", room_id)
async def reset_room_danmu_count(room_id: int):
await hset("RoomDanmuCount", room_id, 0)
# 房间累计弹幕数量
async def accumulate_room_danmu_total(room_id: int) -> int:
return await hincrby("RoomDanmuTotal", room_id, await get_room_danmu_count(room_id))
# 用户弹幕数量
async def len_user_danmu_count(room_id: int) -> int:
return await zcard(f"UserDanmuCount:{room_id}")
async def get_user_danmu_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserDanmuCount:{room_id}", start, end)
async def incr_user_danmu_count(room_id: int, uid: int) -> float:
return await zincrby(f"UserDanmuCount:{room_id}", uid)
async def delete_user_danmu_count(room_id: int):
await delete(f"UserDanmuCount:{room_id}")
# 用户累计弹幕数量
async def accumulate_user_danmu_total(room_id: int):
await zunionstore(f"UserDanmuTotal:{room_id}", f"UserDanmuCount:{room_id}")
# 房间弹幕记录
async def get_room_danmu(room_id: int) -> List[str]:
return await lrange(f"RoomDanmu:{room_id}", 0, -1)
async def add_room_danmu(room_id: int, content: str):
await rpush(f"RoomDanmu:{room_id}", content)
async def delete_room_danmu(room_id: int):
await delete(f"RoomDanmu:{room_id}")
# 房间弹幕时间分布
async def get_room_danmu_time(room_id: int) -> List[Tuple[str, int]]:
return await hgetalltuplei(f"RoomDanmuTime:{room_id}")
async def incr_room_danmu_time(room_id: int, timestamp: int) -> int:
return await hincrby(f"RoomDanmuTime:{room_id}", timestamp)
async def delete_room_danmu_time(room_id: int):
await delete(f"RoomDanmuTime:{room_id}")
# 房间盲盒数量
async def get_room_box_count(room_id: int) -> int:
return await hgeti("RoomBoxCount", room_id)
async def incr_room_box_count(room_id: int, count: int) -> int:
return await hincrby("RoomBoxCount", room_id, count)
async def reset_room_box_count(room_id: int):
await hset("RoomBoxCount", room_id, 0)
# 房间累计盲盒数量
async def accumulate_room_box_total(room_id: int) -> int:
return await hincrby("RoomBoxTotal", room_id, await get_room_box_count(room_id))
# 用户盲盒数量
async def len_user_box_count(room_id: int) -> int:
return await zcard(f"UserBoxCount:{room_id}")
async def get_user_box_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserBoxCount:{room_id}", start, end)
async def incr_user_box_count(room_id: int, uid: int, count: int) -> float:
return await zincrby(f"UserBoxCount:{room_id}", uid, count)
async def delete_user_box_count(room_id: int):
await delete(f"UserBoxCount:{room_id}")
# 用户累计盲盒数量
async def accumulate_user_box_total(room_id: int):
await zunionstore(f"UserBoxTotal:{room_id}", f"UserBoxCount:{room_id}")
# 房间盲盒盈亏
async def get_room_box_profit(room_id: int) -> float:
return await hgetf1("RoomBoxProfit", room_id)
async def incr_room_box_profit(room_id: int, profit: float) -> float:
return await hincrbyfloat("RoomBoxProfit", room_id, profit)
async def reset_room_box_profit(room_id: int):
await hset("RoomBoxProfit", room_id, 0)
# 房间累计盲盒盈亏
async def accumulate_room_box_profit_total(room_id: int) -> float:
return await hincrbyfloat("RoomBoxProfitTotal", room_id, await get_room_box_profit(room_id))
# 用户盲盒盈亏
async def get_user_box_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrevrangewithscoresf1(f"UserBoxProfit:{room_id}", start, end)
async def incr_user_box_profit(room_id: int, uid: int, profit: float) -> float:
return await zincrby(f"UserBoxProfit:{room_id}", uid, profit)
async def delete_user_box_profit(room_id: int):
await delete(f"UserBoxProfit:{room_id}")
# 用户累计盲盒盈亏
async def accumulate_user_box_profit_total(room_id: int):
await zunionstore(f"UserBoxProfitTotal:{room_id}", f"UserBoxProfit:{room_id}")
# 房间盲盒盈亏记录,用于绘制直播报告中盲盒盈亏曲线图
async def get_room_box_profit_record(room_id: int) -> List[float]:
return await lrangef1(f"RoomBoxProfitRecord:{room_id}", 0, -1)
async def add_room_box_profit_record(room_id: int, profit: float):
await rpush(f"RoomBoxProfitRecord:{room_id}", profit)
async def delete_room_box_profit_record(room_id: int):
await delete(f"RoomBoxProfitRecord:{room_id}")
# 盲盒盈亏记录,用于计算直播报告中击败了百分之多少的直播间
async def len_box_profit_record() -> int:
return await zcard("BoxProfitRecord")
async def rank_box_profit_record(start_time: int, uid: int, uname: str) -> int:
return await zrank("BoxProfitRecord", f"{start_time}-{uid}-{uname}")
async def add_box_profit_record(start_time: int, uid: int, uname: str, profit: float):
await zadd("BoxProfitRecord", f"{start_time}-{uid}-{uname}", profit)
# 房间盲盒时间分布
async def get_room_box_time(room_id: int) -> List[Tuple[str, int]]:
return await hgetalltuplei(f"RoomBoxTime:{room_id}")
async def incr_room_box_time(room_id: int, timestamp: int) -> int:
return await hincrby(f"RoomBoxTime:{room_id}", timestamp)
async def delete_room_box_time(room_id: int):
await delete(f"RoomBoxTime:{room_id}")
# 房间礼物价值
async def get_room_gift_profit(room_id: int) -> float:
return await hgetf1("RoomGiftProfit", room_id)
async def incr_room_gift_profit(room_id: int, price: float) -> float:
return await hincrbyfloat("RoomGiftProfit", room_id, price)
async def reset_room_gift_profit(room_id: int):
await hset("RoomGiftProfit", room_id, 0)
# 房间累计礼物价值
async def accumulate_room_gift_total(room_id: int) -> float:
return await hincrbyfloat("RoomGiftTotal", room_id, await get_room_gift_profit(room_id))
# 用户礼物价值
async def len_user_gift_profit(room_id: int) -> int:
return await zcard(f"UserGiftProfit:{room_id}")
async def get_user_gift_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, float]]:
return await zrevrangewithscoresf1(f"UserGiftProfit:{room_id}", start, end)
async def incr_user_gift_profit(room_id: int, uid: int, price: float) -> float:
return await zincrby(f"UserGiftProfit:{room_id}", uid, price)
async def delete_user_gift_profit(room_id: int):
await delete(f"UserGiftProfit:{room_id}")
# 用户累计礼物价值
async def accumulate_user_gift_total(room_id: int):
await zunionstore(f"UserGiftTotal:{room_id}", f"UserGiftProfit:{room_id}")
# 房间礼物时间分布
async def get_room_gift_time(room_id: int) -> List[Tuple[str, float]]:
return await hgetalltuplef1(f"RoomGiftTime:{room_id}")
async def incr_room_gift_time(room_id: int, timestamp: int, price: float) -> float:
return await hincrbyfloat(f"RoomGiftTime:{room_id}", timestamp, price)
async def delete_room_gift_time(room_id: int):
await delete(f"RoomGiftTime:{room_id}")
# 房间 SC 价值
async def get_room_sc_profit(room_id: int) -> int:
return await hgeti("RoomScProfit", room_id)
async def incr_room_sc_profit(room_id: int, price: int) -> int:
return await hincrby("RoomScProfit", room_id, price)
async def reset_room_sc_profit(room_id: int):
await hset("RoomScProfit", room_id, 0)
# 房间累计 SC 价值
async def accumulate_room_sc_total(room_id: int) -> int:
return await hincrby("RoomScTotal", room_id, await get_room_sc_profit(room_id))
# 用户 SC 价值
async def len_user_sc_profit(room_id: int) -> int:
return await zcard(f"UserScProfit:{room_id}")
async def get_user_sc_profit(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserScProfit:{room_id}", start, end)
async def incr_user_sc_profit(room_id: int, uid: int, price: int) -> float:
return await zincrby(f"UserScProfit:{room_id}", uid, price)
async def delete_user_sc_profit(room_id: int):
await delete(f"UserScProfit:{room_id}")
# 用户累计 SC 价值
async def accumulate_user_sc_total(room_id: int):
await zunionstore(f"UserScTotal:{room_id}", f"UserScProfit:{room_id}")
# 房间 SC 时间分布
async def get_room_sc_time(room_id: int) -> List[Tuple[str, int]]:
return await hgetalltuplei(f"RoomScTime:{room_id}")
async def incr_room_sc_time(room_id: int, timestamp: int, price: int) -> int:
return await hincrby(f"RoomScTime:{room_id}", timestamp, price)
async def delete_room_sc_time(room_id: int):
await delete(f"RoomScTime:{room_id}")
# 房间大航海数量
async def get_room_captain_count(room_id: int) -> int:
return await hgeti("RoomCaptainCount", room_id)
async def get_room_commander_count(room_id: int) -> int:
return await hgeti("RoomCommanderCount", room_id)
async def get_room_governor_count(room_id: int) -> int:
return await hgeti("RoomGovernorCount", room_id)
async def incr_room_guard_count(type_str: str, room_id: int, month: int) -> int:
return await hincrby(f"Room{type_str}Count", room_id, month)
async def reset_room_guard_count(room_id: int):
await hset("RoomCaptainCount", room_id, 0)
await hset("RoomCommanderCount", room_id, 0)
await hset("RoomGovernorCount", room_id, 0)
# 房间累计大航海数量
async def accumulate_room_guard_total(room_id: int):
await hincrby("RoomCaptainTotal", room_id, await get_room_captain_count(room_id))
await hincrby("RoomCommanderTotal", room_id, await get_room_commander_count(room_id))
await hincrby("RoomGovernorTotal", room_id, await get_room_governor_count(room_id))
# 用户大航海数量
async def get_user_captain_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserCaptainCount:{room_id}", start, end)
async def get_user_commander_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserCommanderCount:{room_id}", start, end)
async def get_user_governor_count(room_id: int, start: int = 0, end: int = -1) -> List[Tuple[str, int]]:
return await zrevrangewithscoresi(f"UserGovernorCount:{room_id}", start, end)
async def incr_user_guard_count(type_str: str, room_id: int, uid: int, month: int) -> float:
return await zincrby(f"User{type_str}Count:{room_id}", uid, month)
async def delete_user_guard_count(room_id: int):
await delete(f"UserCaptainCount:{room_id}")
await delete(f"UserCommanderCount:{room_id}")
await delete(f"UserGovernorCount:{room_id}")
# 用户累计大航海数量
async def accumulate_user_guard_total(room_id: int):
await zunionstore(f"UserCaptainTotal:{room_id}", f"UserCaptainCount:{room_id}")
await zunionstore(f"UserCommanderTotal:{room_id}", f"UserCommanderCount:{room_id}")
await zunionstore(f"UserGovernorTotal:{room_id}", f"UserGovernorCount:{room_id}")
# 房间大航海时间分布
async def get_room_guard_time(room_id: int) -> List[Tuple[str, int]]:
return await hgetalltuplei(f"RoomGuardTime:{room_id}")
async def incr_room_guard_time(room_id: int, timestamp: int, month: int) -> int:
return await hincrby(f"RoomGuardTime:{room_id}", timestamp, month)
async def delete_room_guard_time(room_id: int):
await delete(f"RoomGuardTime:{room_id}")
# 累计和重置数据
async def accumulate_data(room_id: int):
# 累计弹幕数
await accumulate_room_danmu_total(room_id)
await accumulate_user_danmu_total(room_id)
# 累计盲盒数
await accumulate_room_box_total(room_id)
await accumulate_user_box_total(room_id)
# 累计盲盒盈亏
await accumulate_room_box_profit_total(room_id)
await accumulate_user_box_profit_total(room_id)
# 累计礼物收益
await accumulate_room_gift_total(room_id)
await accumulate_user_gift_total(room_id)
# 累计 SC 收益
await accumulate_room_sc_total(room_id)
await accumulate_user_sc_total(room_id)
# 累计大航海数
await accumulate_room_guard_total(room_id)
await accumulate_user_guard_total(room_id)
async def reset_data(room_id: int):
# 清空弹幕记录
await delete_room_danmu(room_id)
# 清空数据分布
await delete_room_danmu_time(room_id)
await delete_room_box_time(room_id)
await delete_room_gift_time(room_id)
await delete_room_sc_time(room_id)
await delete_room_guard_time(room_id)
# 重置弹幕数
await reset_room_danmu_count(room_id)
await delete_user_danmu_count(room_id)
# 重置盲盒数
await reset_room_box_count(room_id)
await delete_user_box_count(room_id)
# 重置盲盒盈亏
await reset_room_box_profit(room_id)
await delete_user_box_profit(room_id)
# 清空盲盒盈亏记录
await delete_room_box_profit_record(room_id)
# 重置礼物收益
await reset_room_gift_profit(room_id)
await delete_user_gift_profit(room_id)
# 重置 SC 收益
await reset_room_sc_profit(room_id)
await delete_user_sc_profit(room_id)
# 重置大航海数
await reset_room_guard_count(room_id)
await delete_user_guard_count(room_id)