refactor: Refactor wbi utils

This commit is contained in:
LWR 2025-01-07 22:41:19 +08:00
parent 06334653da
commit 79afbf8a96
3 changed files with 48 additions and 101 deletions

View File

@ -6,10 +6,8 @@
import json
import time
from enum import Enum
from typing import List, Dict
from deprecated import deprecated
from typing import List
from .wbi import encWbi, getWbiKeys
from ..utils.Credential import Credential
from ..utils.network import request
from ..utils.utils import get_api
@ -102,7 +100,7 @@ class User:
用户相关
"""
def __init__(self, uid: int, credential: Credential = None, wbi_keys: Dict[str, str] = None):
def __init__(self, uid: int, credential: Credential = None):
"""
Args:
uid: 用户 UID
@ -114,12 +112,7 @@ class User:
credential = Credential()
self.credential = credential
self.__self_info = None
self.__wbi_keys = wbi_keys
def set_wbi_keys(self, wbi_keys: Dict[str, str]) -> None:
self.__wbi_keys = wbi_keys
@deprecated(reason='you should use get_user_info_wbi')
async def get_user_info(self):
"""
获取用户信息昵称性别生日签名头像 URL空间横幅 URL
@ -130,16 +123,6 @@ class User:
}
return await request("GET", url=api["url"], params=params, credential=self.credential)
async def get_user_info_wbi(self):
if self.__wbi_keys == None:
raise RuntimeError("wbi keys is None")
api = API["info"]["info_wbi"]
params = {
"mid": self.uid
}
params_wbi = encWbi(params, **self.__wbi_keys)
return await request("GET", url=api["url"], params=params, credential=self.credential)
async def __get_self_info(self):
"""
获取自己的信息如果存在缓存则使用缓存
@ -172,7 +155,6 @@ class User:
}
return await request("GET", url=api["url"], params=params, credential=self.credential)
@deprecated(reason="you should use get_live_info_wbi")
async def get_live_info(self):
"""
获取用户直播间信息
@ -183,19 +165,6 @@ class User:
}
return await request("GET", url=api["url"], params=params, credential=self.credential)
async def get_live_info_wbi(self):
"""
获取用户直播间信息
"""
if self.__wbi_keys == None:
raise RuntimeError("wbi keys is None")
api = API["info"]["live_wbi"]
params = {
"mid": self.uid
}
params_wbi = encWbi(params, **self.__wbi_keys)
return await request("GET", url=api["url"], params=params_wbi, credential=self.credential)
async def get_videos(self,
tid: int = 0,
pn: int = 1,
@ -270,7 +239,6 @@ class User:
}
return await request("GET", url=api["url"], params=params, credential=self.credential)
@deprecated(reason="https://github.com/SocialSisterYi/bilibili-API-collect/issues/852")
async def get_dynamics(self, offset: int = 0, need_top: bool = False):
"""
获取用户动态

View File

@ -1,66 +0,0 @@
# https://socialsisteryi.github.io/bilibili-API-collect/docs/misc/sign/wbi.html#python
from functools import reduce
from hashlib import md5
import urllib.parse
import time
import requests
mixinKeyEncTab = [
46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49,
33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40,
61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11,
36, 20, 34, 44, 52
]
def getMixinKey(orig: str):
'对 imgKey 和 subKey 进行字符顺序打乱编码'
return reduce(lambda s, i: s + orig[i], mixinKeyEncTab, '')[:32]
def encWbi(params: dict, img_key: str, sub_key: str):
'为请求参数进行 wbi 签名'
mixin_key = getMixinKey(img_key + sub_key)
curr_time = round(time.time())
params['wts'] = curr_time # 添加 wts 字段
params = dict(sorted(params.items())) # 按照 key 重排参数
# 过滤 value 中的 "!'()*" 字符
params = {
k : ''.join(filter(lambda chr: chr not in "!'()*", str(v)))
for k, v
in params.items()
}
query = urllib.parse.urlencode(params) # 序列化参数
wbi_sign = md5((query + mixin_key).encode()).hexdigest() # 计算 w_rid
params['w_rid'] = wbi_sign
return params
def getWbiKeys() -> tuple[str, str]:
'获取最新的 img_key 和 sub_key'
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
'Referer': 'https://www.bilibili.com/'
}
resp = requests.get('https://api.bilibili.com/x/web-interface/nav', headers=headers)
resp.raise_for_status()
json_content = resp.json()
img_url: str = json_content['data']['wbi_img']['img_url']
sub_url: str = json_content['data']['wbi_img']['sub_url']
img_key = img_url.rsplit('/', 1)[1].split('.')[0]
sub_key = sub_url.rsplit('/', 1)[1].split('.')[0]
return img_key, sub_key
if __name__ == '__main__':
img_key, sub_key = getWbiKeys()
signed_params = encWbi(
params={
'foo': '114',
'bar': '514',
'baz': 1919810
},
img_key=img_key,
sub_key=sub_key
)
query = urllib.parse.urlencode(signed_params)
print(signed_params)
print(query)

45
starbot/utils/wbi.py Normal file
View File

@ -0,0 +1,45 @@
# https://socialsisteryi.github.io/bilibili-API-collect/docs/misc/sign/wbi.html#python
from functools import reduce
from hashlib import md5
import urllib.parse
import time
from starbot.utils.network import request
from starbot.utils.utils import get_credential
mixinKeyEncTab = [
46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49,
33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40,
61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11,
36, 20, 34, 44, 52
]
def get_mixin_key(orig: str):
return reduce(lambda s, i: s + orig[i], mixinKeyEncTab, '')[:32]
def enc_wbi(params: dict, img_key: str, sub_key: str):
mixin_key = get_mixin_key(img_key + sub_key)
curr_time = round(time.time())
params['wts'] = curr_time # 添加 wts 字段
params = dict(sorted(params.items())) # 按照 key 重排参数
# 过滤 value 中的 "!'()*" 字符
params = {
k: ''.join(filter(lambda c: c not in "!'()*", str(v)))
for k, v
in params.items()
}
query = urllib.parse.urlencode(params) # 序列化参数
wbi_sign = md5((query + mixin_key).encode()).hexdigest() # 计算 w_rid
params['w_rid'] = wbi_sign
return params
async def get_wbi_keys() -> tuple[str, str]:
"""获取最新的 img_key 和 sub_key"""
result = await request("GET", "https://api.bilibili.com/x/web-interface/nav", credential=get_credential())
img_url = result['wbi_img']['img_url']
sub_url = result['wbi_img']['sub_url']
return img_url.rsplit('/', 1)[1].split('.')[0], sub_url.rsplit('/', 1)[1].split('.')[0]