This commit is contained in:
Yingjie Wang 2024-10-29 11:50:16 +08:00
commit c84ee68cd8
3 changed files with 1232 additions and 0 deletions

13
Pipfile Normal file
View File

@ -0,0 +1,13 @@
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
bilibili-api-python = "*"
numpy = "*"
[dev-packages]
[requires]
python_version = "3.11"

1109
Pipfile.lock generated Normal file

File diff suppressed because it is too large Load Diff

110
bilibot.py Normal file
View File

@ -0,0 +1,110 @@
import time
import numpy as np
from typing import Dict, TypedDict
from apscheduler.schedulers.background import BackgroundScheduler
from bilibili_api import Credential, Danmaku, sync
from bilibili_api.live import LiveDanmaku, LiveRoom
import config
rng = np.random.default_rng()
ROOMID = config.ROOMID
# 凭证
credential = Credential(
sessdata=config.SESSDATA,
bili_jct=config.BILI_JCT
)
# 监听直播间弹幕
monitor = LiveDanmaku(ROOMID, credential=credential)
# 用来发送弹幕
sender = LiveRoom(ROOMID, credential=credential)
# 发送用 UID
UID = 1153907392
class Gift(TypedDict):
'''用户赠送礼物列表
username: 用户名
last_gift_time: 最后一次赠送礼物时时间戳
gift_list: 赠送的礼物字典格式 礼物名: 数量'''
username: str
last_gift_time: int
gift_list: Dict[str, int]
user_list: Dict[int, Gift] = dict()
sched = BackgroundScheduler(timezone="Asia/Shanghai") # 定时任务
@monitor.on("DANMU_MSG")
async def recv(event):
# 发送者UID
uid = event["data"]["info"][2][0]
# 排除自己发送的弹幕
if uid == UID:
return
# 弹幕文本
msg = event["data"]["info"][1]
if msg == "ping":
await sender.send_danmaku(Danmaku("pang!"))
@monitor.on("SEND_GIFT")
async def on_gift(event):
info = event['data']['data']
# print(info)
uid = info['uid']
user = user_list.get(uid)
if user:
# 如果用户列表中有该用户 则更新他的礼物字典以及礼物时间戳
num = user['gift_list'].get(info['giftName'], 0)
user['gift_list'][info['giftName']] = num + info['num']
user['last_gift_time'] = int(time.time())
print(f"update {uid}")
else:
# 不存在则以现在时间及礼物新建 Gift 对象
user_list[uid] = Gift(
username=info['uname'],
last_gift_time=int(time.time()),
gift_list={info['giftName']: info['num']}
)
print(f"new {uid}")
# 开启一个监控
sched.add_job(check, 'interval', seconds=1, args=[uid], id=str(uid))
print(f"start monitoring {uid}")
@monitor.on("PRAPERING")
async def on_preparing(event):
await sender.send_danmaku(Danmaku("欢迎魚魚准备上班,搬好小板凳了喵~ヽ(*・ω・)ノ "))
@monitor.on("LIVE")
async def on_live_start(event):
beijing_tz = timezone(timedelta(hours=8))
now = datetime.now(beijing_tz)
current_hour = now.hour
choices = ["直播开始啦,快来看魚宝!", "欢迎魚宝进入直播间!", "上班啦,魚宝今日顺利!"]
if 11 <= current_hour < 14:
choices.append("魚宝中午好,吃饭了喵?")
elif 18 <= current_hour < 21:
choices.append("魚宝晚上好,记得早点下班睡觉!")
danmu = rng.choice(choices)
await sender.send_danmaku(Danmaku(danmu))
def check(uid: int):
'判断是否超过阈值并输出'
user = user_list.get(uid)
if user:
if int(time.time()) - user.get('last_gift_time', 0) > 5: # 5 秒超时开始回复
sched.remove_job(str(uid)) # 移除该监控任务
print(f"watch {uid} end")
# print(user_list.pop(uid)) # 释放用户
whatever = user_list.pop(uid)
danmu = '感谢' + whatever['username'] + '赠送的' + ''.join([str(v) + '' + k for k, v in whatever['gift_list'].items()]) + '!'
sync(sender.send_danmaku(Danmaku(danmu)))
else:
print(f"keep watching {uid}...")
else:
print(f"cannot find {uid}")
if __name__ == '__main__':
sched.start() # 启动定时任务
sync(monitor.connect()) # 连接直播间