Files
cksd/models/ammeter_usage.py

120 lines
3.8 KiB
Python

import time
import aiosqlite
from database import get_conn
class AmmeterUsageModel:
@classmethod
async def create_table(cls):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
CREATE TABLE IF NOT EXISTS ammeter_usage (
id INTEGER PRIMARY KEY AUTOINCREMENT,
room_id INTEGER,
room_name TEXT,
left_ele REAL,
left_money REAL,
left_free_ele REAL,
left_free_money REAL,
ele_price REAL,
mon_time INTEGER,
created_at INTEGER UNIQUE
)
"""
)
await conn.commit()
@classmethod
async def insert(cls, data: dict):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(
"""
INSERT OR IGNORE INTO ammeter_usage (
room_id, room_name,
left_ele, left_money,
left_free_ele, left_free_money,
ele_price, mon_time, created_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
data["roomId"],
data["roomName"],
data["leftEle"],
data["leftMoney"],
data["leftFreeEle"],
data["leftFreeMoney"],
data["elePrice"],
data["monTime"],
int(time.time() * 1000),
),
)
await conn.commit()
@classmethod
async def exec(cls, sql: str, *args, **kwargs):
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
await conn.execute(sql, *args, **kwargs)
await conn.commit()
@classmethod
async def get_recent(cls, room_id: int, days: int = 30):
"""查询最近N天数据"""
now = int(time.time() * 1000)
start = now - days * 24 * 60 * 60 * 1000
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
cur = await conn.execute(
"""
SELECT created_at, left_ele, left_free_ele, left_money
FROM ammeter_usage
WHERE room_id = ?
AND created_at >= ?
ORDER BY created_at ASC
""",
(room_id, start),
)
rows = await cur.fetchall()
return [dict(r) for r in rows]
@classmethod
async def get_range(cls, room_id: int, start: int, end: int):
"""按时间区间查询"""
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
cur = await conn.execute(
"""
SELECT *
FROM ammeter_usage
WHERE room_id = ?
AND created_at BETWEEN ? AND ?
ORDER BY created_at ASC
""",
(room_id, start, end),
)
rows = await cur.fetchall()
return [dict(r) for r in rows]
@classmethod
async def list(cls, room_id: int, page: int = 1, limit: int = 20):
"""分页查询,用于表格展示"""
offset = (page - 1) * limit
async with get_conn() as conn:
conn.row_factory = aiosqlite.Row
cur = await conn.execute(
"""
SELECT *
FROM ammeter_usage
WHERE room_id = ?
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""",
(room_id, limit, offset),
)
rows = await cur.fetchall()
return [dict(r) for r in rows]