feat(*): 初始化代码仓库

This commit is contained in:
2025-11-05 13:00:58 +08:00
commit f45c1f4a1c
4 changed files with 448 additions and 0 deletions

393
main.py Normal file
View File

@@ -0,0 +1,393 @@
"""
--------------------------------------------
文件名称: ckwk.py
作者: zhilv
邮箱: zhilv666@qq.com
版本: 1.0
--------------------------------------------
说明:
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
作者不对因使用本脚本造成的任何损失或后果承担责任。
--------------------------------------------
"""
import time
import os
import requests
import re
import json
import random
import ddddocr
from datetime import datetime
from dotenv import load_dotenv
from lxml import etree
from threading import Thread
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import warnings
warnings.filterwarnings("ignore")
load_dotenv()
class CKWK:
def __init__(self, un: str, pw: str, host: str) -> None:
self.session = requests.Session()
# 定义重试策略
retry_strategy = Retry(
total=5, # 最多重试 5 次
backoff_factor=1, # 重试间隔1s, 2s, 4s, 8s...
status_forcelist=[429, 500, 502, 503, 504], # 哪些状态码触发重试
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 允许重试的方法
)
# 挂载适配器
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# self.session.verify = False
# self.session.proxies.update(
# {
# "http": "http://127.0.0.1:9000",
# "https": "http://127.0.0.1:9000",
# }
# )
self.username = un
self.host = host
self.password = pw
self.user = dict()
self.courses = []
self.ocr = ddddocr.DdddOcr()
self.session.headers.update(
{
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
"Accept": "application/json, text/javascript, */*; q=0.01",
"Accept-Encoding": "gzip, deflate, br, zstd",
}
)
def coder(self) -> str:
resp = self.session.get(
f"https://{self.host}/service/code",
params={"r": f"{random.random()}"},
)
if resp.status_code == 200:
return f"{self.ocr.classification(resp.content)}"
return ""
def login(self) -> bool:
yzm = self.coder()
resp = self.session.post(
f"https://{self.host}/user/login",
data={
"username": self.username,
"password": self.password,
"code": yzm,
"redirect": "",
},
)
if "登录成功" in resp.text:
return True
self.log(resp.text)
return False
def parse_user(self, html: str) -> None:
tree = etree.HTML(html)
try:
self.user = {
"name": tree.xpath('//div[@class="name"]/text()')[0],
"course_number": tree.xpath(
'//div[@class="intro"]/div[1]/div[2]/text()'
)[0],
"huping": tree.xpath('//div[@class="intro"]/div[2]/div[2]/text()')[0],
"lexueyuan": tree.xpath('//div[@class="intro"]/div[3]/div[2]/text()')[
0
],
"taolunzhuti": tree.xpath('//div[@class="intro"]/div[4]/div[2]/text()')[
0
],
"study_time": "".join(
tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()')
),
"courses": [
{
"name": course.xpath('.//div[@class="name"]/a/text()')[0],
"href": course.xpath('.//div[@class="name"]/a/@href')[0],
"id": course.xpath('.//div[@class="name"]/a/@href')[0].split(
"="
)[-1],
"progress": course.xpath(
'.//div[@class="progress"]/div[3]/text()'
)[0],
}
for course in tree.xpath('//div[@class="user-course"]/div')
],
}
# print(self.user)
except Exception as e:
self.log("[parse] 提取用户信息失败: ", e)
raise e
finally:
pass
def parse_course(self, html: str) -> tuple[int, int]:
tree = etree.HTML(html)
video = 0
state = 1
try:
if self.courses == []:
courses = [
{
"title": i.get("title", ""),
"href": i.get("href", ""),
"id": i.get("href", "").split("=")[-1],
"class": i.get("class", ""),
}
for i in tree.xpath(
# '//div[@class="item"]/a[contains(text(), "mp4")]'
'//div[@class="item"]/a'
# '//div[@class="item"]/a[not(contains(text(), "考试") or contains(text(), "课件") or contains(text(), "练习") or contains(text(), "实验"))]'
)
]
index = next(
(i for i, c in enumerate(courses) if c["class"] == "on"), None
)
if index is not None:
self.courses = courses[index:]
# print(self.courses)
video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0])
state = int(tree.xpath('.//input[@id="study-state"]/@value')[0])
except Exception as e:
self.log("[parse] 提取课程列表失败: ", e)
# raise e
finally:
return video, state
def get_course(self, _id: str) -> tuple[int, int]:
resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
if resp.status_code == 200 and "错误提示" not in resp.text:
return self.parse_course(resp.text)
elif "错误提示" in resp.text:
msg = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')[0]
self.log(msg)
return 0, 1
self.log(resp.text)
return 0, 1
def get_course_id(self, _id):
resp = self.session.get(
"https://{}/user/course?courseId={}".format(self.host, _id)
)
tree = etree.HTML(resp.text)
self.get_course(
"".join(
tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href')
).split("=")[-1]
)
def get_study_record(self, course_id: str, page=1):
resp = self.session.get(
f"https://{self.host}/user/study_record.json",
params={
"courseId": course_id,
"page": page,
"_": str(int(time.time() * 1000)),
},
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} page -> {page}')
self.courses.extend(resp.json().get("list", []))
if page <= resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1)
def log(self, *args, **kwargs) -> None:
exit_flag = False
time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
prefix = f'[{self.user.get("name", "")}] {time_str}'
# 检查 args 中是否有 \r若有则将前缀拼在 \r 之后
if args and isinstance(args[0], str) and "\r" in args[0]:
# 将前缀插入到最后一个 \r 后面
msg = args[0].rsplit("\r", 1)
log_content = (
f"{msg[0]}\r{prefix} {msg[1]}"
if len(msg) > 1
else f"{prefix} {args[0]}"
)
else:
log_content = f'{prefix} {" ".join(map(str, args))}'
print(log_content, **kwargs)
if exit_flag:
os._exit(0)
def online(self) -> bool:
resp = self.session.post(f"https://{self.host}/user/online")
if resp.status_code == 200 and resp.json().get("status", False):
return True
return False
def study_start(self, node_id: str) -> None:
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.post(
f"https://{self.host}/user/node/study",
data={
"nodeId": node_id,
"studyId": 0,
"studyTime": 1,
"code": self.coder(),
},
headers=headers,
)
try:
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} --> 1', end="", flush=True)
self.study_id = resp.json().get("studyId", 0)
except:
if resp.status_code == 200:
data = json.loads(
re.findall(">var data =(.*?);</script>", resp.text)[0]
)
self.log(data.get("msg", ""))
self.study_id = data.get("studyId", 0)
def study(self, node_id: str, study_time: str) -> None:
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.post(
f"https://{self.host}/user/node/study",
data={
"nodeId": node_id,
"studyId": self.study_id,
"studyTime": study_time,
},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}',
end="",
flush=True,
)
else:
self.study(node_id, study_time)
def study_over(self, node_id, study_time: str) -> bool:
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.post(
f"https://{self.host}/user/node/study",
data={
"nodeId": node_id,
"studyId": self.study_id,
"studyTime": study_time,
"close": "1",
},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}',
flush=True,
)
return True
return False
def get_user(self) -> None:
resp = self.session.get(f"https://{self.host}/user")
if resp.status_code == 200:
self.parse_user(resp.text)
else:
self.log(resp.text, exit=True)
def start_loop(self):
while True:
time.sleep(120)
self.online()
def run(self) -> None:
while self.login():
self.get_user()
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f"{i+1}: {_c.get("name", "")} {_c.get("progress", "")}")
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs
# for c in self.user.get("courses", []):
for c in cs_list:
# if c.get("progress", "0%").split("%")[0] == 100:
# self.log(f"{c.get("name", "")} --> {c.get("id", "")} 已经完成")
# continue
self.log(f"{c.get("name", "")} --> {c.get("id", "")}")
# self.get_course_id(c.get("id", ""))
self.get_study_record(c.get("id", ""))
# print(self.courses)
# os._exit(0)
for l in self.courses:
if "已学" in l.get("state", "") :
self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
)
continue
# elif "1" == l.get("lock", "0"):
# self.log(
# f'{l.get("name", "")} --> {l.get("id", "")} --> 未解锁'
# )
# continue
node_id = l.get("id", "")
total_time = 0
video_time, video_state = self.get_course(node_id)
self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> {video_time} --> {video_state}'
)
# if video_state == 2:
# self.log(
# f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
# )
# continue
if video_time <= 0:
continue
self.study_start(node_id)
while True:
elapsed = video_time - total_time
if elapsed <= 0:
self.study_over(node_id, f"{video_time}")
break
time.sleep(30)
total_time += 30
self.study(node_id, f"{total_time}")
self.log("所有课程已经结束")
break
if __name__ == "__main__":
host_server = [
["cqcst.yuruixxkj.com", "御瑞科技(选修课)"],
["cqcst.zjxkeji.com", "重庆城市科技学院其他1"],
["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台其他2"],
]
for i, h in enumerate(host_server):
print(f"{i+1}: {h[0]} {h[1]}")
c = int(input("请选择刷课平台: ")) - 1
u = os.environ.get("YURU_ACCOUNT", "")
p = os.environ.get(
"YURU_PASSWORD",
)
if u == "":
u = input("请输入账号: ")
if p == "":
p = input("请输入密码: ")
if u is None and p is None and c in [0, 1, 2]:
os._exit(0)
CKWK(f"{u}", f"{p}", host_server[c][0]).run()