import asyncio
import base64
import os
from uuid import uuid4

import httpx
from dotenv import load_dotenv

load_dotenv()


def get_env(key: str) -> str:
    """Return the environment variable *key*, or "" when it is unset."""
    return os.getenv(key, "")


class XBXS:
    """Async HTTP client for the ``/xiaobei-api`` endpoints.

    All requests go through one shared ``httpx.AsyncClient``; after a
    successful :meth:`login` the bearer token is attached to that client so
    every later call is authenticated.
    """

    # Keys copied out of the ``data`` object of the studentInfo response.
    _STUDENT_FIELDS = (
        "id",
        "studentNumber",
        "deptId",
        "deptName",
        "studentName",
        "studentIpone",
        "studentIdCard",
        "studentSex",
        "studentSchoolTime",
        "schoolId",
        "schoolName",
        "collegeId",
        "collegeName",
        "professionalId",
        "professionalName",
    )

    def __init__(self, u: str, p: str) -> None:
        """Store the credentials and build the shared HTTP client.

        Args:
            u: Account user name.
            p: Plain-text password; it is kept base64-encoded because that is
                the form sent in the login request body.
        """
        self.username = u
        self.password = base64.b64encode(p.encode()).decode()
        self.token = None
        self.client = httpx.AsyncClient(
            base_url="https://xiaobei.yinghuaonline.com",
            timeout=10,
            headers={
                "user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 Html5Plus/1.0 (Immersed/20) uni-app",
                "accept": "*/*",
                "accept-language": "zh-CN,zh-Hans;q=0.9",
                "accept-encoding": "gzip, deflate, br",
            },
        )

    async def _get_json(self, path: str):
        """GET *path*, raise on an HTTP error status, and return the JSON body."""
        resp = await self.client.get(path)
        resp.raise_for_status()
        return resp.json()

    async def get_captcha(self) -> dict:
        """Fetch a captcha challenge.

        Returns:
            A dict with the ``showCode`` and ``uuid`` values of the response.

        Raises:
            RuntimeError: If the response carries no ``uuid``.
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        result = await self._get_json("/xiaobei-api/captchaImage")
        if "uuid" not in result:
            raise RuntimeError(f"captcha error: {result}")
        return {
            "showCode": result.get("showCode", ""),
            "uuid": result.get("uuid", ""),
        }

    async def login(self) -> dict:
        """Log in with the stored credentials and remember the bearer token.

        Returns:
            The decoded JSON body of the login response.

        Raises:
            RuntimeError: If the captcha request returns no ``uuid``.
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        captcha = await self.get_captcha()
        resp = await self.client.post(
            "/xiaobei-api/userLogin",
            json={
                "username": self.username,
                "password": self.password,
                "code": captcha.get("showCode", ""),
                "uuid": captcha.get("uuid", ""),
                "appUuid": "",
            },
        )
        resp.raise_for_status()
        result = resp.json()
        self.token = result.get("token")
        # Only attach the header when a token actually came back; otherwise
        # every later request would carry the literal "Bearer None".
        if self.token:
            self.client.headers["Authorization"] = f"Bearer {self.token}"
        return result

    async def get_info(self) -> dict:
        """Return selected account fields from the getInfo endpoint.

        Missing fields default to "" (``roles`` defaults to ``[]``).

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        result = await self._get_json("/xiaobei-api/getInfo")
        # ``or {}`` also covers a response where "user" is present but null.
        user = result.get("user") or {}
        return {
            "userName": user.get("userName", ""),
            "nickName": user.get("nickName", ""),
            "roles": result.get("roles", []),
            "phonenumber": user.get("phonenumber", ""),
            "bindphonenumber": user.get("bindphonenumber", ""),
            "idCard": user.get("idCard", ""),
            "trtcId": user.get("trtcId", ""),
        }

    async def get_student_info(self) -> dict:
        """Return selected fields of the ``data`` object from studentInfo.

        Missing fields default to "".

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        result = await self._get_json("/xiaobei-api/student/studentInfo")
        # ``or {}`` also covers a response where "data" is present but null.
        data = result.get("data") or {}
        return {field: data.get(field, "") for field in self._STUDENT_FIELDS}

    async def dep_id(self):
        """Return the decoded JSON body of the student/deptId endpoint."""
        return await self._get_json("/xiaobei-api/student/deptId")

    async def get_user_sig2(self):
        """Return the decoded JSON body of the trtc/genUserSig2 endpoint."""
        return await self._get_json("/xiaobei-api/trtc/genUserSig2")

    async def get_know_list(self):
        """Return the decoded JSON body of the student/know/list endpoint."""
        return await self._get_json("/xiaobei-api/student/know/list")

    async def update_student_know(
        self, knowingID: str, image_path: str = "./image.jpg"
    ) -> None:
        """Submit the updateStudentKnow form together with a JPEG image.

        Args:
            knowingID: Value sent as the ``knowingId`` form field.
            image_path: Path of the JPEG uploaded as ``file0``.

        Raises:
            OSError: If *image_path* cannot be read.
            httpx.HTTPStatusError: On a 4xx/5xx response.
        """
        # Read the image eagerly so no file handle stays open across the
        # awaited request (and none is leaked if the request fails).
        with open(image_path, "rb") as image:
            image_bytes = image.read()

        resp = await self.client.post(
            "/xiaobei-api/student/know/updateStudentKnow",
            data={
                "address": "1388号",
                "remark": "无",
                "knowingId": knowingID,
                "location": "106.5346788194445:29.343291015625",
                "size": "1",
            },
            # Random upload filename; passing ``files`` makes httpx send the
            # form as multipart/form-data with the image attached.
            files={"file0": (str(uuid4()), image_bytes, "image/jpeg")},
        )
        resp.raise_for_status()

    async def main(self) -> None:
        """Log in, print the login response, and always close the client."""
        try:
            data = await self.login()
            print(data)
        finally:
            await self.client.aclose()


if __name__ == "__main__":
    asyncio.run(
        XBXS(
            get_env("XBXS_USERNAME"),
            get_env("XBXS_PASSWORD"),
        ).main()
    )