"""Collect water/electricity meter data and push the rendered charts."""

import asyncio
import base64
import hashlib
import logging

import httpx

import config
from notify.wechat_company import send_image
from services import collector
from services.render import render_ammeter_chart, render_water_chart

logger = logging.getLogger("task.sd")


def image_to_base64_md5(image_path: str) -> tuple[str, str]:
    """Read an image file and return its Base64 text and MD5 hex digest.

    Args:
        image_path: Path of the image file to read.

    Returns:
        A ``(base64_text, md5_hex)`` tuple computed from the raw file bytes.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    logger.debug("读取图片并计算 MD5/Base64: %s", image_path)
    with open(image_path, "rb") as f:
        data = f.read()

    md5 = hashlib.md5(data).hexdigest()
    b64 = base64.b64encode(data).decode("utf-8")
    logger.debug("图片处理完成 md5=%s", md5)
    return b64, md5


class SD:
    """Async HTTP client for the ``zhsd.cqcst.edu.cn`` meter service.

    The instance owns an ``httpx.AsyncClient``. Release it with
    ``await sd.aclose()`` or by using the instance as an async context
    manager (``async with SD() as sd: ...``).
    """

    def __init__(self) -> None:
        logger.info("初始化 SD 客户端")
        self.client = httpx.AsyncClient(
            base_url="https://zhsd.cqcst.edu.cn",
            timeout=30,
            # Debugging toggles kept from the original (route through a
            # local proxy / skip TLS verification):
            # proxy="http://127.0.0.1:9000",
            # verify=False,
            headers={
                "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                "Accept-Encoding": "gzip",
                "User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15",
                "Connection": "Keep-Alive",
                "Host": "zhsd.cqcst.edu.cn",
            },
        )

    async def aclose(self) -> None:
        """Close the underlying HTTP client and release its connections."""
        await self.client.aclose()

    async def __aenter__(self) -> "SD":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()

    async def get_cookie(self) -> None:
        """Call the third-party login endpoint with the configured identity.

        The response body is not used; presumably the server sets session
        cookies that the shared client keeps for later requests (verify
        against the service).
        """
        logger.info("获取登录 Cookie")
        resp = await self.client.get(
            "/cqbn/service/weixin/thirdLogin",
            params={
                "uid": config.UID,
                "userNo": config.USER_NO,
                "schoolAccount": config.SCHOOL_ACCOUNT,
                "userName": config.USER_NAME,
            },
        )
        logger.debug("Cookie 响应状态码: %s", resp.status_code)
        # NOTE(review): the status check is deliberately left disabled, as in
        # the original. Presumably this endpoint answers with a redirect,
        # which raise_for_status() would report as an error -- confirm.
        # resp.raise_for_status()

    async def get_user_info(self):
        """Fetch the user-info endpoint and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the response status is not successful.
        """
        logger.debug("获取用户信息")
        resp = await self.client.get("/cqbn/service/find/userinfo")
        resp.raise_for_status()
        result = resp.json()
        logger.debug("用户信息返回成功")
        return result

    async def get_water(self):
        """Fetch the water-balance endpoint and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the response status is not successful.
        """
        logger.info("请求水表数据")
        resp = await self.client.get(
            "/cqbn/service/waterBalance",
            params={"type": "3", "systemType": "1"},
        )
        resp.raise_for_status()
        result = resp.json()
        logger.info("水表数据获取成功")
        return result

    async def get_ammeter(self):
        """Fetch the ammeter-balance endpoint and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the response status is not successful.
        """
        logger.info("请求电表数据")
        resp = await self.client.get(
            "/cqbn/service/ammeterBalance",
            params={"type": "1"},
        )
        resp.raise_for_status()
        result = resp.json()
        logger.info("电表数据获取成功")
        return result

    async def fetch_and_save(self) -> None:
        """Log in, fetch water and ammeter data, and hand both to the collector.

        Any exception is logged with its traceback and swallowed, so a
        failed run does not propagate to the caller.
        """
        logger.info("===== 开始采集水电数据 =====")
        try:
            await self.get_cookie()

            water = await self.get_water()
            await collector.handle_water_response(water)
            logger.info("水表数据已入库")

            ammeter = await self.get_ammeter()
            await collector.handle_ammeter_response(ammeter)
            logger.info("电表数据已入库")

            logger.info("===== 水电数据采集完成 =====")
        except Exception:
            # Task boundary: record the failure instead of crashing the caller.
            logger.exception("水电数据采集失败")

    async def push(self) -> None:
        """Render the ammeter and water charts and send each one as an image.

        A chart whose renderer returns a falsy path is skipped with a
        warning. Any exception is logged with its traceback and swallowed.
        """
        logger.info("===== 开始推送水电图表 =====")
        try:
            ele_img_path = await render_ammeter_chart()
            if ele_img_path:
                logger.info("生成电表图表成功: %s", ele_img_path)
                ele_b64, ele_md5 = image_to_base64_md5(ele_img_path)
                await send_image(ele_b64, ele_md5)
                logger.info("电表图表推送完成")
            else:
                logger.warning("未生成电表图表,跳过推送")

            water_img_path = await render_water_chart()
            if water_img_path:
                logger.info("生成水表图表成功: %s", water_img_path)
                water_b64, water_md5 = image_to_base64_md5(water_img_path)
                await send_image(water_b64, water_md5)
                logger.info("水表图表推送完成")
            else:
                logger.warning("未生成水表图表,跳过推送")

            logger.info("===== 图表推送完成 =====")
        except Exception:
            # Task boundary: record the failure instead of crashing the caller.
            logger.exception("水电图表推送失败")


async def _main() -> None:
    """Run one collection pass and always close the HTTP client afterwards."""
    async with SD() as sd:
        await sd.fetch_and_save()


if __name__ == "__main__":
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    )
    asyncio.run(_main())