397 lines
14 KiB
Python
397 lines
14 KiB
Python
"""
|
||
--------------------------------------------
|
||
文件名称: ckwk.py
|
||
作者: zhilv
|
||
邮箱: zhilv666@qq.com
|
||
版本: 1.0
|
||
--------------------------------------------
|
||
说明:
|
||
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
|
||
作者不对因使用本脚本造成的任何损失或后果承担责任。
|
||
--------------------------------------------
|
||
"""
|
||
|
||
import time
|
||
import os
|
||
import requests
|
||
import re
|
||
import json
|
||
import random
|
||
import ddddocr
|
||
from datetime import datetime
|
||
from dotenv import load_dotenv
|
||
from lxml import etree # type: ignore
|
||
from threading import Thread
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
import warnings
|
||
|
||
# 修复 ANTIALIAS 错误 - 添加猴子补丁
|
||
import PIL.Image
|
||
if not hasattr(PIL.Image, 'ANTIALIAS'):
|
||
PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS # type: ignore
|
||
|
||
warnings.filterwarnings("ignore")
|
||
|
||
|
||
load_dotenv()
|
||
|
||
|
||
class CKWK:
|
||
def __init__(self, un: str, pw: str, host: str) -> None:
|
||
self.session = requests.Session()
|
||
# 定义重试策略
|
||
retry_strategy = Retry(
|
||
total=5, # 最多重试 5 次
|
||
backoff_factor=1, # 重试间隔:1s, 2s, 4s, 8s...
|
||
status_forcelist=[429, 500, 502, 503, 504], # 哪些状态码触发重试
|
||
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 允许重试的方法
|
||
)
|
||
|
||
# 挂载适配器
|
||
adapter = HTTPAdapter(max_retries=retry_strategy)
|
||
self.session.mount("http://", adapter)
|
||
self.session.mount("https://", adapter)
|
||
# self.session.verify = False
|
||
# self.session.proxies.update(
|
||
# {
|
||
# "http": "http://127.0.0.1:9000",
|
||
# "https": "http://127.0.0.1:9000",
|
||
# }
|
||
# )
|
||
self.username = un
|
||
self.host = host
|
||
self.password = pw
|
||
self.user = dict()
|
||
self.courses = []
|
||
self.ocr = ddddocr.DdddOcr()
|
||
self.session.headers.update(
|
||
{
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
|
||
"Accept": "application/json, text/javascript, */*; q=0.01",
|
||
"Accept-Encoding": "gzip, deflate, br, zstd",
|
||
}
|
||
)
|
||
|
||
def coder(self) -> str:
|
||
resp = self.session.get(
|
||
f"https://{self.host}/service/code",
|
||
params={"r": f"{random.random()}"},
|
||
)
|
||
if resp.status_code == 200:
|
||
return f"{self.ocr.classification(resp.content)}"
|
||
return ""
|
||
|
||
def login(self) -> bool:
|
||
yzm = self.coder()
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/login",
|
||
data={
|
||
"username": self.username,
|
||
"password": self.password,
|
||
"code": yzm,
|
||
"redirect": "",
|
||
},
|
||
)
|
||
if "登录成功" in resp.text:
|
||
return True
|
||
self.log(resp.text)
|
||
return False
|
||
|
||
def parse_user(self, html: str) -> None:
|
||
tree = etree.HTML(html)
|
||
try:
|
||
self.user = {
|
||
"name": tree.xpath('//div[@class="name"]/text()')[0],
|
||
"course_number": tree.xpath(
|
||
'//div[@class="intro"]/div[1]/div[2]/text()'
|
||
)[0],
|
||
"huping": tree.xpath('//div[@class="intro"]/div[2]/div[2]/text()')[0],
|
||
"lexueyuan": tree.xpath('//div[@class="intro"]/div[3]/div[2]/text()')[
|
||
0
|
||
],
|
||
"taolunzhuti": tree.xpath('//div[@class="intro"]/div[4]/div[2]/text()')[
|
||
0
|
||
],
|
||
"study_time": "".join(
|
||
tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()')
|
||
),
|
||
"courses": [
|
||
{
|
||
"name": course.xpath('.//div[@class="name"]/a/text()')[0],
|
||
"href": course.xpath('.//div[@class="name"]/a/@href')[0],
|
||
"id": course.xpath('.//div[@class="name"]/a/@href')[0].split(
|
||
"="
|
||
)[-1],
|
||
"progress": course.xpath(
|
||
'.//div[@class="progress"]/div[3]/text()'
|
||
)[0],
|
||
}
|
||
for course in tree.xpath('//div[@class="user-course"]/div')
|
||
],
|
||
}
|
||
# print(self.user)
|
||
except Exception as e:
|
||
self.log("[parse] 提取用户信息失败: ", e)
|
||
raise e
|
||
finally:
|
||
pass
|
||
|
||
def parse_course(self, html: str) -> tuple[int, int]:
|
||
tree = etree.HTML(html)
|
||
video = 0
|
||
state = 1
|
||
try:
|
||
if self.courses == []:
|
||
courses = [
|
||
{
|
||
"title": i.get("title", ""),
|
||
"href": i.get("href", ""),
|
||
"id": i.get("href", "").split("=")[-1],
|
||
"class": i.get("class", ""),
|
||
}
|
||
for i in tree.xpath(
|
||
# '//div[@class="item"]/a[contains(text(), "mp4")]'
|
||
'//div[@class="item"]/a'
|
||
# '//div[@class="item"]/a[not(contains(text(), "考试") or contains(text(), "课件") or contains(text(), "练习") or contains(text(), "实验"))]'
|
||
)
|
||
]
|
||
index = next(
|
||
(i for i, c in enumerate(courses) if c["class"] == "on"), None
|
||
)
|
||
|
||
if index is not None:
|
||
self.courses = courses[index:]
|
||
# print(self.courses)
|
||
video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0])
|
||
state = int(tree.xpath('.//input[@id="study-state"]/@value')[0])
|
||
except Exception as e:
|
||
self.log("[parse] 提取课程列表失败: ", e)
|
||
# raise e
|
||
finally:
|
||
return video, state
|
||
|
||
def get_course(self, _id: str) -> tuple[int, int]:
|
||
resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
|
||
if resp.status_code == 200 and "错误提示" not in resp.text:
|
||
return self.parse_course(resp.text)
|
||
elif "错误提示" in resp.text:
|
||
msg = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')[0]
|
||
self.log(msg)
|
||
return 0, 1
|
||
self.log(resp.text)
|
||
return 0, 1
|
||
|
||
def get_course_id(self, _id):
|
||
resp = self.session.get(
|
||
"https://{}/user/course?courseId={}".format(self.host, _id)
|
||
)
|
||
tree = etree.HTML(resp.text)
|
||
|
||
self.get_course(
|
||
"".join(
|
||
tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href')
|
||
).split("=")[-1]
|
||
)
|
||
|
||
def get_study_record(self, course_id: str, page=1):
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/study_record.json",
|
||
params={
|
||
"courseId": course_id,
|
||
"page": page,
|
||
"_": str(int(time.time() * 1000)),
|
||
},
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f'{resp.json().get("msg", "")} page -> {page}')
|
||
self.courses.extend(resp.json().get("list", []))
|
||
if page <= resp.json().get("pageInfo", {}).get("pageCount", 0):
|
||
self.get_study_record(course_id, page + 1)
|
||
|
||
def log(self, *args, **kwargs) -> None:
|
||
exit_flag = False
|
||
time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
|
||
prefix = f'[{self.user.get("name", "")}] {time_str}'
|
||
# 检查 args 中是否有 \r,若有则将前缀拼在 \r 之后
|
||
if args and isinstance(args[0], str) and "\r" in args[0]:
|
||
# 将前缀插入到最后一个 \r 后面
|
||
msg = args[0].rsplit("\r", 1)
|
||
log_content = (
|
||
f"{msg[0]}\r{prefix} {msg[1]}"
|
||
if len(msg) > 1
|
||
else f"{prefix} {args[0]}"
|
||
)
|
||
else:
|
||
log_content = f'{prefix} {" ".join(map(str, args))}'
|
||
print(log_content, **kwargs)
|
||
if exit_flag:
|
||
os._exit(0)
|
||
|
||
def online(self) -> bool:
|
||
resp = self.session.post(f"https://{self.host}/user/online")
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
return True
|
||
return False
|
||
|
||
def study_start(self, node_id: str) -> None:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": 0,
|
||
"studyTime": 1,
|
||
"code": self.coder(),
|
||
},
|
||
headers=headers,
|
||
)
|
||
try:
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f'{resp.json().get("msg", "")} --> 1', end="", flush=True)
|
||
self.study_id = resp.json().get("studyId", 0)
|
||
except:
|
||
if resp.status_code == 200:
|
||
data = json.loads(
|
||
re.findall(">var data =(.*?);</script>", resp.text)[0]
|
||
)
|
||
self.log(data.get("msg", ""))
|
||
self.study_id = data.get("studyId", 0)
|
||
|
||
def study(self, node_id: str, study_time: str) -> None:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": self.study_id,
|
||
"studyTime": study_time,
|
||
},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}',
|
||
end="",
|
||
flush=True,
|
||
)
|
||
else:
|
||
self.study(node_id, study_time)
|
||
|
||
def study_over(self, node_id, study_time: str) -> bool:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": self.study_id,
|
||
"studyTime": study_time,
|
||
"close": "1",
|
||
},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}',
|
||
flush=True,
|
||
)
|
||
return True
|
||
return False
|
||
|
||
def get_user(self) -> None:
|
||
resp = self.session.get(f"https://{self.host}/user")
|
||
if resp.status_code == 200:
|
||
self.parse_user(resp.text)
|
||
else:
|
||
self.log(resp.text, exit=True)
|
||
|
||
def start_loop(self):
|
||
while True:
|
||
time.sleep(120)
|
||
self.online()
|
||
|
||
def run(self) -> None:
|
||
while self.login():
|
||
self.get_user()
|
||
thread = Thread(target=self.start_loop, daemon=True)
|
||
thread.start()
|
||
cs = self.user.get("courses", [])
|
||
if len(cs) > 1:
|
||
for i, _c in enumerate(cs):
|
||
print(f"{i+1}: {_c.get("name", "")} {_c.get("progress", "")}")
|
||
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
|
||
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
|
||
print(f"{cs[0].get("name", "")} {cs[0].get("progress", "")}")
|
||
cs_list = cs
|
||
# for c in self.user.get("courses", []):
|
||
for c in cs_list:
|
||
# if c.get("progress", "0%").split("%")[0] == 100:
|
||
# self.log(f"{c.get("name", "")} --> {c.get("id", "")} 已经完成")
|
||
# continue
|
||
self.log(f"{c.get("name", "")} --> {c.get("id", "")}")
|
||
# self.get_course_id(c.get("id", ""))
|
||
self.get_study_record(c.get("id", ""))
|
||
# print(self.courses)
|
||
# os._exit(0)
|
||
for l in self.courses:
|
||
if "已学" in l.get("state", ""):
|
||
self.log(
|
||
f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
|
||
)
|
||
continue
|
||
# elif "1" == l.get("lock", "0"):
|
||
# self.log(
|
||
# f'{l.get("name", "")} --> {l.get("id", "")} --> 未解锁'
|
||
# )
|
||
# continue
|
||
|
||
node_id = l.get("id", "")
|
||
total_time = 0
|
||
video_time, video_state = self.get_course(node_id)
|
||
self.log(
|
||
f'{l.get("name", "")} --> {l.get("id", "")} --> {video_time} --> {video_state}'
|
||
)
|
||
# if video_state == 2:
|
||
# self.log(
|
||
# f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
|
||
# )
|
||
# continue
|
||
if video_time <= 0:
|
||
continue
|
||
|
||
self.study_start(node_id)
|
||
|
||
while True:
|
||
elapsed = video_time - total_time
|
||
if elapsed <= 0:
|
||
self.study_over(node_id, f"{video_time}")
|
||
break
|
||
time.sleep(30)
|
||
total_time += 30
|
||
self.study(node_id, f"{total_time}")
|
||
self.log("所有课程已经结束")
|
||
break
|
||
|
||
|
||
if __name__ == "__main__":
|
||
host_server = [
|
||
["cqcst.yuruixxkj.com", "御瑞科技(选修课)"],
|
||
["cqcst.zjxkeji.com", "重庆城市科技学院(其他1)"],
|
||
["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台(其他2)"],
|
||
]
|
||
for i, h in enumerate(host_server):
|
||
print(f"{i+1}: {h[0]} {h[1]}")
|
||
c = int(input("请选择刷课平台: ")) - 1
|
||
u = os.environ.get("YURU_ACCOUNT", "")
|
||
p = os.environ.get("YURU_PASSWORD", "")
|
||
if u == "":
|
||
u = input("请输入账号: ")
|
||
if p == "":
|
||
p = input("请输入密码: ")
|
||
if u is None and p is None and c in [0, 1, 2]:
|
||
os._exit(0)
|
||
CKWK(f"{u}", f"{p}", host_server[c][0]).run()
|