109 lines
3.6 KiB
Python
109 lines
3.6 KiB
Python
import time
|
|
import numpy as np
|
|
from typing import Dict, TypedDict
|
|
|
|
from apscheduler.schedulers.background import BackgroundScheduler
|
|
from bilibili_api import Credential, Danmaku, sync
|
|
from bilibili_api.live import LiveDanmaku, LiveRoom
|
|
|
|
import config
|
|
|
|
rng = np.random.default_rng()
|
|
|
|
ROOMID = config.ROOMID
|
|
# 凭证
|
|
credential = Credential(
|
|
sessdata=config.SESSDATA,
|
|
bili_jct=config.BILI_JCT
|
|
)
|
|
# 监听直播间弹幕
|
|
monitor = LiveDanmaku(ROOMID, credential=credential)
|
|
# 用来发送弹幕
|
|
sender = LiveRoom(ROOMID, credential=credential)
|
|
# 发送用 UID
|
|
UID = 1153907392
|
|
|
|
class Gift(TypedDict):
|
|
'''用户赠送礼物列表
|
|
username: 用户名
|
|
last_gift_time: 最后一次赠送礼物时时间戳
|
|
gift_list: 赠送的礼物字典,格式 礼物名: 数量'''
|
|
username: str
|
|
last_gift_time: int
|
|
gift_list: Dict[str, int]
|
|
|
|
user_list: Dict[int, Gift] = dict()
|
|
sched = BackgroundScheduler(timezone="Asia/Shanghai") # 定时任务
|
|
|
|
@monitor.on("DANMU_MSG")
|
|
async def recv(event):
|
|
# 发送者UID
|
|
uid = event["data"]["info"][2][0]
|
|
# 排除自己发送的弹幕
|
|
if uid == UID:
|
|
return
|
|
# 弹幕文本
|
|
msg = event["data"]["info"][1]
|
|
if msg == "ping":
|
|
await sender.send_danmaku(Danmaku("pang!"))
|
|
|
|
@monitor.on("SEND_GIFT")
|
|
async def on_gift(event):
|
|
info = event['data']['data']
|
|
# print(info)
|
|
uid = info['uid']
|
|
user = user_list.get(uid)
|
|
if user:
|
|
# 如果用户列表中有该用户 则更新他的礼物字典以及礼物时间戳
|
|
num = user['gift_list'].get(info['giftName'], 0)
|
|
user['gift_list'][info['giftName']] = num + info['num']
|
|
user['last_gift_time'] = int(time.time())
|
|
print(f"update {uid}")
|
|
else:
|
|
# 不存在则以现在时间及礼物新建 Gift 对象
|
|
user_list[uid] = Gift(
|
|
username=info['uname'],
|
|
last_gift_time=int(time.time()),
|
|
gift_list={info['giftName']: info['num']}
|
|
)
|
|
print(f"new {uid}")
|
|
# 开启一个监控
|
|
sched.add_job(check, 'interval', seconds=1, args=[uid], id=str(uid))
|
|
print(f"start monitoring {uid}")
|
|
|
|
@monitor.on("PRAPERING")
|
|
async def on_preparing(event):
|
|
await sender.send_danmaku(Danmaku("欢迎魚魚准备上班,搬好小板凳了喵~ヽ(*・ω・)ノ "))
|
|
|
|
@monitor.on("LIVE")
|
|
async def on_live_start(event):
|
|
current_hour = time.localtime().tm_hour
|
|
choices = ["直播开始啦,快来看魚宝!", "欢迎魚宝进入直播间!", "上班啦,魚宝今日顺利!"]
|
|
if 11 <= current_hour < 14:
|
|
choices.append("魚宝中午好,吃饭了喵?")
|
|
elif 18 <= current_hour < 21:
|
|
choices.append("魚宝晚上好,记得早点下班睡觉!")
|
|
danmu = rng.choice(choices)
|
|
await sender.send_danmaku(Danmaku(danmu))
|
|
|
|
def check(uid: int):
|
|
'判断是否超过阈值并输出'
|
|
user = user_list.get(uid)
|
|
if user:
|
|
if int(time.time()) - user.get('last_gift_time', 0) > 5: # 5 秒超时开始回复
|
|
sched.remove_job(str(uid)) # 移除该监控任务
|
|
print(f"watch {uid} end")
|
|
# print(user_list.pop(uid)) # 释放用户
|
|
whatever = user_list.pop(uid)
|
|
danmu = '感谢' + whatever['username'] + '赠送的' + '、'.join([str(v) + '个' + k for k, v in whatever['gift_list'].items()]) + '!'
|
|
sync(sender.send_danmaku(Danmaku(danmu)))
|
|
else:
|
|
print(f"keep watching {uid}...")
|
|
else:
|
|
print(f"cannot find {uid}")
|
|
|
|
if __name__ == '__main__':
|
|
sched.start() # 启动定时任务
|
|
|
|
sync(monitor.connect()) # 连接直播间
|