feat: 初始化项目并提交一次代码

This commit is contained in:
2025-12-17 20:25:15 +08:00
commit 355098025f
19 changed files with 585 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__
data
fonts/*.ttf

21
LICENSE Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2025 zhanghoulin
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

45
README.md Normal file
View File

@@ -0,0 +1,45 @@
## 城科水电监控
**进行水费和电费的实时监控程序, 默认获取前24条数据进行绘图**
### 实现功能
- 获取**水费**和**电费**使用情况
- 使用 `matplotlib` 进行绘图可视化
- 使用企业微信机器人进行推送图片
### 使用
- 配置当前目录下的 `config.py` 文件
- 手动去 Github 下载 [LXGWWenKai-Medium 字体](https://github.com/lxgw/LxgwWenKai) 保存到fonts文件夹中或者使用 wget 进行获取,然后修改配置文件
```sh
curl -L -o ./fonts/LXGWWenKaiMono-Medium.ttf https://github.5700.cf/https://github.com/lxgw/LxgwWenKai/releases/download/v1.521/LXGWWenKaiMono-Medium.ttf
```
- `UID` 获取,使用支付宝进行扫描下方二维码即可获得 `uid`
![UID 获取](./images/image.png)
- `USER_NO` 获取,通过抓包获取,使用抓包软件进行抓取微信小程序**扫呗团餐**,对响应体进行搜索`user_no`即可
![USER_NO 获取](./images/image2.png)
- `SCHOOL_ACCOUNT` 这是学号(可选)
- `USER_NAME` 这是姓名(可选)
- `WECHAT_COMANY_BOT_KEY` 企业微信机器人 key
- 安装依赖
```sh
pip install aiosqlite apscheduler httpx matplotlib
```
- 通过 `scheduler.py` 修改监控时长配置(不懂的请问 AI
### 📜 License
This project is licensed under the [MIT License](LICENSE).

19
config.py Normal file
View File

@@ -0,0 +1,19 @@
from dotenv import load_dotenv
import os
load_dotenv()
DB_PATH = os.getenv("DB_PATH", "data/data.db")
FONT_PATH = os.path.join(os.getcwd(), "fonts", "LXGWWenKai-Medium.ttf")
# 获取水电信息配置
UID = os.getenv("UID", "1111111111111111") # 支付宝的 uid
USER_NO = os.getenv("USER_NO", "11111111111111111111111111111111") # 小程序的 user_no
SCHOOL_ACCOUNT = os.getenv("SCHOOL_ACCOUNT", "1111111111") # 学号
USER_NAME = os.getenv("USER_NAME", "张三") # 姓名
# 推送相关参数
WECHAT_COMANY_BOT_KEY = os.getenv(
"WECHAT_COMANY_BOT_KEY", "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
)

10
database.py Normal file
View File

@@ -0,0 +1,10 @@
import aiosqlite
import config
import os
def get_conn():
if not os.path.exists(config.DB_PATH):
open(config.DB_PATH, "w").close()
return aiosqlite.connect(config.DB_PATH, timeout=30)

0
fonts/.gitkeep Normal file
View File

BIN
images/image.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 56 KiB

BIN
images/image2.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 125 KiB

40
main.py Normal file
View File

@@ -0,0 +1,40 @@
# main.py / 启动入口
import asyncio
import os
from tasks.water_ammeter import SD
import config
from models.water_usage import WaterUsageModel
from models.ammeter_usage import AmmeterUsageModel
from scheduler import start_scheduler
import logging
import warnings
warnings.filterwarnings("ignore", message="Glyph .* missing from font")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
)
# 确保数据库目录存在
db_dir = os.path.dirname(config.DB_PATH)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir)
async def main():
# await WaterUsageModel.create_table()
# await AmmeterUsageModel.create_table()
# start_scheduler()
await SD().push()
while True:
await asyncio.sleep(3600)
if __name__ == "__main__":
asyncio.run(main())

0
models/__init__.py Normal file
View File

55
models/ammeter_usage.py Normal file
View File

@@ -0,0 +1,55 @@
import time
import aiosqlite
from database import get_conn
class AmmeterUsageModel:
@classmethod
async def create_table(cls):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS ammeter_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_id INTEGER,
room_name TEXT,
left_ele REAL,
left_money REAL,
left_free_ele REAL,
left_free_money REAL,
ele_price REAL,
mon_time INTEGER,
created_at INTEGER UNIQUE
)
"""
)
await conn.commit()
@classmethod
async def insert(cls, data: dict):
print(data)
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
INSERT OR IGNORE INTO ammeter_usage (
room_id, room_name,
left_ele, left_money,
left_free_ele, left_free_money,
ele_price, mon_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
data["roomId"],
data["roomName"],
data["leftEle"],
data["leftMoney"],
data["leftFreeEle"],
data["leftFreeMoney"],
data["elePrice"],
data["monTime"],
int(time.time() * 1000),
),
)
await conn.commit()

55
models/water_usage.py Normal file
View File

@@ -0,0 +1,55 @@
import time
import aiosqlite
from database import get_conn
class WaterUsageModel:
@classmethod
async def create_table(cls):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS water_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_id INTEGER,
room_name TEXT,
left_water REAL,
left_money REAL,
left_free_water REAL,
left_free_money REAL,
water_price REAL,
mon_time INTEGER,
created_at INTEGER UNIQUE
)
"""
)
await conn.commit()
@classmethod
async def insert(cls, data: dict):
print(data)
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
INSERT OR IGNORE INTO water_usage (
room_id, room_name,
left_water, left_money,
left_free_water, left_free_money,
water_price, mon_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
data["roomId"],
data["roomName"],
data["leftWater"],
data["leftMoney"],
data["leftFreeWater"],
data["leftFreeMoney"],
data["coldWaterPrice"],
data["monTime"],
int(time.time() * 1000),
),
)
await conn.commit()

12
notify/wechat_company.py Normal file
View File

@@ -0,0 +1,12 @@
import httpx
import config
async def send_image(base64_str: str, md5: str):
url = f"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={config.WECHAT_COMANY_BOT_KEY}"
payload = {"msgtype": "image", "image": {"base64": base64_str, "md5": md5}}
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(url, json=payload)
resp.raise_for_status()

41
scheduler.py Normal file
View File

@@ -0,0 +1,41 @@
import logging
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from tasks.water_ammeter import SD
logger = logging.getLogger("scheduler")
sd = SD()
def start_scheduler():
logger.info("初始化 AsyncIOSchedulerAsia/Shanghai")
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
scheduler.add_job(
sd.fetch_and_save,
trigger="cron",
minute="*",
id="water_ammeter_job",
replace_existing=True,
max_instances=1,
coalesce=True,
)
logger.info("已添加任务water_ammeter_job每小时整点执行")
scheduler.add_job(
sd.push,
trigger="cron",
hour=19,
minute=0,
id="push_gzh",
replace_existing=True,
max_instances=1,
coalesce=True,
)
logger.info("已添加任务push_gzh每天 08:00 执行)")
scheduler.start()
logger.info("调度器启动完成")
return scheduler

13
services/collector.py Normal file
View File

@@ -0,0 +1,13 @@
# services/collector.py
from models.water_usage import WaterUsageModel
from models.ammeter_usage import AmmeterUsageModel
async def handle_water_response(resp: dict):
if resp.get("statusCode") == "200":
await WaterUsageModel.insert(resp["resultObject"])
async def handle_ammeter_response(resp: dict):
if resp.get("statusCode") == "200":
await AmmeterUsageModel.insert(resp["resultObject"])

31
services/plot.py Normal file
View File

@@ -0,0 +1,31 @@
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib import font_manager
import config
import logging
logging.getLogger("matplotlib").setLevel(logging.WARNING)
# 从字体文件创建字体对象
FONT = font_manager.FontProperties(fname=config.FONT_PATH)
# 负号修复
plt.rcParams["axes.unicode_minus"] = False
def plot_line(times, values, title, ylabel, out_path):
plt.figure(figsize=(8, 4))
plt.plot(times, values, marker="o")
plt.title(title, fontproperties=FONT)
plt.xlabel("时间", fontproperties=FONT)
plt.ylabel(ylabel, fontproperties=FONT)
plt.xticks(rotation=45, fontproperties=FONT)
plt.grid(True)
plt.tight_layout()
plt.savefig(out_path, dpi=150)
plt.close()

40
services/render.py Normal file
View File

@@ -0,0 +1,40 @@
from services.statistics import get_ammeter_left_ele, get_water_left
from services.plot import plot_line
async def render_ammeter_chart():
times, values = await get_ammeter_left_ele(limit=24)
if not times:
return None
out = "data/ammeter.png"
plot_line(
times=times,
values=values,
title="宿舍电量剩余趋势",
ylabel="剩余电量(度)",
out_path=out,
)
return out
async def render_water_chart():
times, values = await get_water_left(limit=24)
if not times:
return None
out = "data/water.png"
plot_line(
times=times,
values=values,
title="宿舍水量剩余趋势",
ylabel="剩余水量(吨)",
out_path=out,
)
return out

48
services/statistics.py Normal file
View File

@@ -0,0 +1,48 @@
import time
import aiosqlite
from database import get_conn
async def get_ammeter_left_ele(limit=24):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
cursor = await conn.execute(
"""
SELECT created_at, left_ele
FROM ammeter_usage
ORDER BY created_at DESC
LIMIT ?
""",
(limit,),
)
rows = await cursor.fetchall()
rows.reverse()
times = [time.strftime("%H:%M", time.localtime(r[0] / 1000)) for r in rows]
values = [r[1] for r in rows]
return times, values
async def get_water_left(limit=24):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
cursor = await conn.execute(
"""
SELECT created_at, left_water
FROM water_usage
ORDER BY created_at DESC
LIMIT ?
""",
(limit,),
)
rows = await cursor.fetchall()
rows.reverse()
times = [time.strftime("%H:%M", time.localtime(r[0] / 1000)) for r in rows]
values = [r[1] for r in rows]
return times, values

152
tasks/water_ammeter.py Normal file
View File

@@ -0,0 +1,152 @@
import asyncio
import httpx
import logging
import base64
import hashlib
from notify.wechat_company import send_image
from services.render import render_ammeter_chart, render_water_chart
from services import collector
import config
logger = logging.getLogger("task.sd")
def image_to_base64_md5(image_path: str) -> tuple[str, str]:
logger.debug("读取图片并计算 MD5/Base64: %s", image_path)
with open(image_path, "rb") as f:
data = f.read()
md5 = hashlib.md5(data).hexdigest()
b64 = base64.b64encode(data).decode("utf-8")
logger.debug("图片处理完成 md5=%s", md5)
return b64, md5
class SD:
def __init__(self) -> None:
logger.info("初始化 SD 客户端")
self.client = httpx.AsyncClient(
base_url="https://zhsd.cqcst.edu.cn",
timeout=30,
proxy="http://127.0.0.1:9000",
verify=False,
headers={
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Encoding": "gzip",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_3 like Mac OS X) AppleWebKit/605.1.15",
"Connection": "Keep-Alive",
"Host": "zhsd.cqcst.edu.cn",
},
)
async def get_cookie(self):
logger.info("获取登录 Cookie")
resp = await self.client.get(
"/cqbn/service/weixin/thirdLogin",
params={
"uid": config.UID,
"userNo": config.USER_NO,
"schoolAccount": config.SCHOOL_ACCOUNT,
"userName": config.USER_NAME,
},
)
logger.debug("Cookie 响应状态码: %s", resp.status_code)
# resp.raise_for_status()
async def get_user_info(self):
logger.debug("获取用户信息")
resp = await self.client.get("/cqbn/service/find/userinfo")
resp.raise_for_status()
result = resp.json()
logger.debug("用户信息返回成功")
return result
async def get_water(self):
logger.info("请求水表数据")
resp = await self.client.get(
"/cqbn/service/waterBalance",
params={"type": "3", "systemType": "1"},
)
resp.raise_for_status()
result = resp.json()
logger.info("水表数据获取成功")
return result
async def get_ammeter(self):
logger.info("请求电表数据")
resp = await self.client.get(
"/cqbn/service/ammeterBalance",
params={"type": "1"},
)
resp.raise_for_status()
result = resp.json()
logger.info("电表数据获取成功")
return result
async def fetch_and_save(self):
logger.info("===== 开始采集水电数据 =====")
try:
await self.get_cookie()
water = await self.get_water()
await collector.handle_water_response(water)
logger.info("水表数据已入库")
ammeter = await self.get_ammeter()
await collector.handle_ammeter_response(ammeter)
logger.info("电表数据已入库")
logger.info("===== 水电数据采集完成 =====")
except Exception:
logger.exception("水电数据采集失败")
async def push(self):
logger.info("===== 开始推送水电图表 =====")
try:
ele_img_path = await render_ammeter_chart()
if ele_img_path:
logger.info("生成电表图表成功: %s", ele_img_path)
ele_b64, ele_md5 = image_to_base64_md5(ele_img_path)
await send_image(ele_b64, ele_md5)
logger.info("电表图表推送完成")
else:
logger.warning("未生成电表图表,跳过推送")
water_img_path = await render_water_chart()
if water_img_path:
logger.info("生成水表图表成功: %s", water_img_path)
water_b64, water_md5 = image_to_base64_md5(water_img_path)
await send_image(water_b64, water_md5)
logger.info("水表图表推送完成")
else:
logger.warning("未生成水表图表,跳过推送")
logger.info("===== 图表推送完成 =====")
except Exception:
logger.exception("水电图表推送失败")
if __name__ == "__main__":
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
)
asyncio.run(SD().fetch_and_save())