848 lines
32 KiB
Python
848 lines
32 KiB
Python
"""
|
||
--------------------------------------------
|
||
文件名称: ckwk.py
|
||
作者: zhilv
|
||
邮箱: zhilv666@qq.com
|
||
版本: 1.3
|
||
--------------------------------------------
|
||
说明:
|
||
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
|
||
作者不对因使用本脚本造成的任何损失或后果承担责任。
|
||
--------------------------------------------
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
import warnings
|
||
from datetime import datetime
|
||
from threading import Thread
|
||
|
||
import ddddocr
|
||
|
||
# 修复 ANTIALIAS 错误 - 添加猴子补丁
|
||
import PIL.Image
|
||
import requests
|
||
from dotenv import load_dotenv
|
||
from lxml import etree # type: ignore
|
||
from requests.adapters import HTTPAdapter
|
||
from urllib3.util.retry import Retry
|
||
|
||
from deepseek import DeepSeek
|
||
from doubao import DOUBAO
|
||
from kimi import KIMI
|
||
|
||
if not hasattr(PIL.Image, "ANTIALIAS"):
|
||
PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS # type: ignore
|
||
|
||
warnings.filterwarnings("ignore")
|
||
|
||
|
||
load_dotenv()
|
||
|
||
isReply = False
|
||
|
||
|
||
def replace_html_entities(text, replace_mode="keep_char"):
|
||
"""
|
||
替换网页中的 HTML 实体字符(如 “、” 等)
|
||
:param text: 原始网页文本
|
||
:param replace_mode: 替换模式:
|
||
- "keep_char": 替换为对应的普通字符(默认,如 “ → 「)
|
||
- "remove": 直接去除实体字符(如 “ → 空字符串)
|
||
:return: 处理后的文本
|
||
"""
|
||
# 常见 HTML 实体映射表(可根据需求扩展)
|
||
html_entities = {
|
||
# 引号类(用户示例中的核心场景)
|
||
"“": "「", # 左双引号
|
||
"”": "」", # 右双引号
|
||
"‘": "「", # 左单引号
|
||
"’": "」", # 右单引号
|
||
# 其他常见实体(可选保留/扩展)
|
||
"&": "&", # 和号
|
||
"<": "<", # 小于号
|
||
">": ">", # 大于号
|
||
" ": " ", # 非换行空格
|
||
"©": "©", # 版权符号
|
||
"®": "®", # 注册商标符号
|
||
}
|
||
|
||
# 根据模式调整替换目标
|
||
target_map = (
|
||
html_entities if replace_mode == "keep_char" else {k: "" for k in html_entities}
|
||
)
|
||
|
||
# 批量替换
|
||
result = text
|
||
for entity, target in target_map.items():
|
||
result = result.replace(entity, target)
|
||
|
||
return result
|
||
|
||
|
||
class CKWK:
|
||
def __init__(self, un: str, pw: str, host: str, ai_type="deepseek") -> None:
|
||
"""
|
||
un: 用户名
|
||
pw: 密码
|
||
host: 网站域名
|
||
ai_type: ai类别取值为 'doubao' 或者 'deepseek'
|
||
"""
|
||
if ai_type == "doubao":
|
||
self.ai = DOUBAO()
|
||
elif ai_type == "deepseek":
|
||
self.ai = DeepSeek()
|
||
elif ai_type == "kimi":
|
||
self.ai = KIMI()
|
||
else:
|
||
self.ai = DOUBAO()
|
||
|
||
self.session = requests.Session()
|
||
# 定义重试策略
|
||
retry_strategy = Retry(
|
||
total=5, # 最多重试 5 次
|
||
backoff_factor=1, # 重试间隔:1s, 2s, 4s, 8s...
|
||
status_forcelist=[429, 500, 502, 503, 504], # 哪些状态码触发重试
|
||
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 允许重试的方法
|
||
)
|
||
|
||
# 挂载适配器
|
||
adapter = HTTPAdapter(max_retries=retry_strategy)
|
||
self.session.mount("http://", adapter)
|
||
self.session.mount("https://", adapter)
|
||
# self.session.verify = False
|
||
# self.session.proxies.update(
|
||
# {
|
||
# "http": "http://127.0.0.1:9000",
|
||
# "https": "http://127.0.0.1:9000",
|
||
# }
|
||
# )
|
||
self.username = un
|
||
self.host = host
|
||
self.password = pw
|
||
self.user = dict()
|
||
self.courses = []
|
||
self.works = []
|
||
self.exams = []
|
||
self.qas = []
|
||
self.ocr = ddddocr.DdddOcr()
|
||
self.session.headers.update(
|
||
{
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
|
||
"Accept": "application/json, text/javascript, */*; q=0.01",
|
||
"Accept-Encoding": "gzip, deflate, br, zstd",
|
||
}
|
||
)
|
||
|
||
def coder(self) -> str:
|
||
resp = self.session.get(
|
||
f"https://{self.host}/service/code",
|
||
params={"r": f"{random.random()}"},
|
||
)
|
||
if resp.status_code == 200:
|
||
return f"{self.ocr.classification(resp.content)}"
|
||
return ""
|
||
|
||
def login(self) -> bool:
|
||
yzm = self.coder()
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/login",
|
||
data={
|
||
"username": self.username,
|
||
"password": self.password,
|
||
"code": yzm,
|
||
"redirect": "",
|
||
},
|
||
)
|
||
if "登录成功" in resp.text:
|
||
return True
|
||
try:
|
||
data = json.loads(re.findall(" =(.*?);", resp.text)[0])
|
||
self.log(data.get("msg", ""))
|
||
finally:
|
||
return False
|
||
|
||
def parse_user(self, html: str) -> None:
|
||
tree = etree.HTML(html)
|
||
try:
|
||
self.user = {
|
||
"name": tree.xpath('//div[@class="name"]/text()')[0],
|
||
"course_number": tree.xpath(
|
||
'//div[@class="intro"]/div[1]/div[2]/text()'
|
||
)[0],
|
||
"huping": tree.xpath('//div[@class="intro"]/div[2]/div[2]/text()')[0],
|
||
"lexueyuan": tree.xpath('//div[@class="intro"]/div[3]/div[2]/text()')[
|
||
0
|
||
],
|
||
"taolunzhuti": tree.xpath('//div[@class="intro"]/div[4]/div[2]/text()')[
|
||
0
|
||
],
|
||
"study_time": "".join(
|
||
tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()')
|
||
),
|
||
"courses": [
|
||
{
|
||
"name": course.xpath('.//div[@class="name"]/a/text()')[0],
|
||
"href": course.xpath('.//div[@class="name"]/a/@href')[0],
|
||
"id": course.xpath('.//div[@class="name"]/a/@href')[0].split(
|
||
"="
|
||
)[-1],
|
||
"progress": course.xpath(
|
||
'.//div[@class="progress"]/div[3]/text()'
|
||
)[0],
|
||
}
|
||
for course in tree.xpath('//div[@class="user-course"]/div')
|
||
],
|
||
}
|
||
# print(self.user)
|
||
except Exception as e:
|
||
self.log("[parse] 提取用户信息失败: ", e)
|
||
raise e
|
||
finally:
|
||
pass
|
||
|
||
def parse_course(self, html: str) -> tuple[int, int]:
|
||
tree = etree.HTML(html)
|
||
video = 0
|
||
state = 1
|
||
try:
|
||
if self.courses == []:
|
||
courses = [
|
||
{
|
||
"title": i.get("title", ""),
|
||
"href": i.get("href", ""),
|
||
"id": i.get("href", "").split("=")[-1],
|
||
"class": i.get("class", ""),
|
||
}
|
||
for i in tree.xpath(
|
||
# '//div[@class="item"]/a[contains(text(), "mp4")]'
|
||
'//div[@class="item"]/a'
|
||
# '//div[@class="item"]/a[not(contains(text(), "考试") or contains(text(), "课件") or contains(text(), "练习") or contains(text(), "实验"))]'
|
||
)
|
||
]
|
||
index = next(
|
||
(i for i, c in enumerate(courses) if c["class"] == "on"), None
|
||
)
|
||
|
||
if index is not None:
|
||
self.courses = courses[index:]
|
||
# print(self.courses)
|
||
video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0])
|
||
state = int(tree.xpath('.//input[@id="study-state"]/@value')[0])
|
||
except Exception as e:
|
||
self.log("[parse] 提取课程列表失败: ", e)
|
||
# raise e
|
||
finally:
|
||
return video, state
|
||
|
||
def parse_q_a_work(self, html: str) -> None:
|
||
tree = etree.HTML(html)
|
||
qas = tree.xpath('//div[@class="topic-item"]/form')
|
||
for qa in qas:
|
||
items = {
|
||
"id": qa.xpath(
|
||
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
|
||
)[0],
|
||
"index": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
|
||
)[0],
|
||
"type": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
|
||
)[0],
|
||
"title": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
|
||
)[0].strip(),
|
||
"chooies": [
|
||
{
|
||
"num": a.xpath('.//span[@class="num"]/text()')[0],
|
||
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
|
||
}
|
||
for a in qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
|
||
)
|
||
],
|
||
}
|
||
self.qas.append(items)
|
||
|
||
def parse_q_a_exam(self, html: str) -> None:
|
||
tree = etree.HTML(html)
|
||
qas = tree.xpath("//div/form")
|
||
for qa in qas:
|
||
items = {
|
||
"id": qa.xpath(
|
||
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
|
||
)[0],
|
||
"index": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
|
||
)[0],
|
||
"type": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
|
||
)[0],
|
||
"title": qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
|
||
)[0].strip(),
|
||
"chooies": [
|
||
{
|
||
"num": a.xpath('.//span[@class="num"]/text()')[0],
|
||
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
|
||
}
|
||
for a in qa.xpath(
|
||
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
|
||
)
|
||
],
|
||
}
|
||
self.qas.append(items)
|
||
|
||
def get_question_answer_work(self, courseID: str, nodeID: str, workID: str):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.get(
|
||
"https://{}/user/work/start".format(self.host),
|
||
params={"courseId": courseID, "nodeId": nodeID, "workId": workID},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
qa_path = resp.json().get("url", "")
|
||
self.start_url = "https://{}{}".format(self.host, qa_path)
|
||
self.log(f"开始作业: {resp.json().get('msg', '')} url -> {self.start_url}")
|
||
resp2 = self.session.get(self.start_url)
|
||
self.qas = []
|
||
if resp.status_code == 200:
|
||
self.parse_q_a_work(resp2.text)
|
||
else:
|
||
self.log(f"开始作业: {resp.json().get('msg', '')}")
|
||
return False
|
||
return True
|
||
|
||
def get_question_answer_exam(self, courseID: str, nodeID: str, examId: str):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.get(
|
||
"https://{}/user/exam/start".format(self.host),
|
||
params={"courseId": courseID, "nodeId": nodeID, "examId": examId},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
qa_path = resp.json().get("url", "")
|
||
self.start_url = "https://{}{}".format(self.host, qa_path)
|
||
self.log(f"开始考试: {resp.json().get('msg', '')} url -> {self.start_url}")
|
||
resp2 = self.session.get(self.start_url)
|
||
self.qas = []
|
||
if resp.status_code == 200:
|
||
self.parse_q_a_exam(resp2.text)
|
||
else:
|
||
self.log(f"开始考试: {resp.json().get('msg', '')}")
|
||
return False
|
||
return True
|
||
|
||
def work_submit(
|
||
self, answerID: str, workID: str, answer: str, final: bool, index: int
|
||
):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
data = {
|
||
"answerId": answerID,
|
||
"workId": workID,
|
||
"finish": "0",
|
||
}
|
||
if "," in answer:
|
||
answers = answer.split(",")
|
||
data["answer[]"] = answers
|
||
# for a in answers:
|
||
# data.setdefault("answer[]", []).append(a)
|
||
else:
|
||
if answer == "":
|
||
data["answer"] = "A"
|
||
data["answer"] = answer
|
||
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/work/submit",
|
||
data=data,
|
||
headers=headers,
|
||
)
|
||
|
||
if not final:
|
||
aa = 1
|
||
while aa == 1:
|
||
print(f"作业链接: {self.start_url}")
|
||
aa = input("现在是最后一道题,请你去检查后输入 0 后自动提交:")
|
||
aa = int(aa)
|
||
if aa == 0:
|
||
data["finish"] = "1"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/work/submit",
|
||
data=data,
|
||
headers=headers,
|
||
)
|
||
break
|
||
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}"
|
||
)
|
||
|
||
def exam_submit(
|
||
self, answerID: str, workID: str, answer: str, final: bool, index: int
|
||
):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
data = {
|
||
"answerId": answerID,
|
||
"examId": workID,
|
||
"finish": "0",
|
||
}
|
||
if "," in answer:
|
||
answers = answer.split(",")
|
||
data["answer[]"] = answers
|
||
# for a in answers:
|
||
# data.setdefault("answer[]", []).append(a)
|
||
else:
|
||
if answer == "":
|
||
data["answer"] = "A"
|
||
data["answer"] = answer
|
||
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/exam/submit",
|
||
data=data,
|
||
headers=headers,
|
||
)
|
||
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}"
|
||
)
|
||
|
||
if not final:
|
||
aa = 1
|
||
while aa == 1:
|
||
print(f"考试链接: {self.start_url}")
|
||
aa = int(input("现在是最后一道,请你去检查后输入 0 后自动提交:"))
|
||
if aa == 0:
|
||
data["finish"] = "1"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/exam/submit",
|
||
data=data,
|
||
headers=headers,
|
||
)
|
||
break
|
||
|
||
def get_course(self, _id: str) -> tuple[int, int]:
|
||
resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
|
||
if resp.status_code == 200 and "错误提示" not in resp.text:
|
||
return self.parse_course(resp.text)
|
||
elif "错误提示" in resp.text:
|
||
msg = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')[0]
|
||
self.log(msg)
|
||
return 0, 1
|
||
self.log(resp.text)
|
||
return 0, 1
|
||
|
||
def add_reply(self, content: str, courseId: str, nodeId: str):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node_discuss/add_reply",
|
||
data={
|
||
"content": replace_html_entities(content),
|
||
"images": "",
|
||
"files": "",
|
||
"courseId": courseId,
|
||
"nodeId": nodeId,
|
||
},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f"添加评论: {resp.json().get('msg', '')} content -> {replace_html_entities(content)}"
|
||
)
|
||
|
||
def delete_reply(self, courseId: str, nodeId: str, replyId: str):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/node_discuss/delete_reply",
|
||
params={"courseId": courseId, "nodeId": nodeId, "replyId": replyId},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f"删除评论: {resp.json().get('msg', '')}")
|
||
|
||
def reply(self, courseId: str, nodeId: str):
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/node_discuss/reply",
|
||
params={
|
||
"courseId": courseId,
|
||
"nodeId": nodeId,
|
||
"_": str(int(time.time() * 1000)),
|
||
},
|
||
headers=headers,
|
||
)
|
||
if (
|
||
resp.status_code == 200
|
||
and resp.json().get("status", False)
|
||
and resp.json().get("pageInfo", {}).get("recordsCount", 0) > 0
|
||
):
|
||
self.add_reply(
|
||
resp.json().get("list", [{}])[0].get("content", ""), courseId, nodeId
|
||
)
|
||
# self.delete_reply(
|
||
# courseId, nodeId, resp.json().get("list", [{}])[0].get("id", "")
|
||
# )
|
||
# return resp.json().get("list", [{}])[0]
|
||
# return {}
|
||
|
||
def get_course_id(self, _id):
|
||
resp = self.session.get(
|
||
"https://{}/user/course?courseId={}".format(self.host, _id)
|
||
)
|
||
tree = etree.HTML(resp.text)
|
||
|
||
self.get_course(
|
||
"".join(
|
||
tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href')
|
||
).split("=")[-1]
|
||
)
|
||
|
||
def get_study_record(self, course_id: str, page=1):
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/study_record.json",
|
||
params={
|
||
"courseId": course_id,
|
||
"page": page,
|
||
"_": str(int(time.time() * 1000)),
|
||
},
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f"{resp.json().get('msg', '')} page -> {page}")
|
||
self.courses.extend(resp.json().get("list", []))
|
||
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
|
||
self.get_study_record(course_id, page + 1)
|
||
|
||
def get_work_record(self, course_id: str, user_id: str, page=1):
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/study_record/work.json",
|
||
params={
|
||
"courseId": course_id,
|
||
# "userId": user_id,
|
||
"page": page,
|
||
"_": str(int(time.time() * 1000)),
|
||
},
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f"{resp.json().get('msg', '')} page -> {page}")
|
||
self.works.extend(resp.json().get("list", []))
|
||
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
|
||
self.get_study_record(course_id, page + 1)
|
||
|
||
def get_exam_record(self, course_id: str, page=1):
|
||
resp = self.session.get(
|
||
f"https://{self.host}/user/study_record/exam.json",
|
||
params={
|
||
"courseId": course_id,
|
||
"page": page,
|
||
"_": str(int(time.time() * 1000)),
|
||
},
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f"{resp.json().get('msg', '')} page -> {page}")
|
||
self.exams.extend(resp.json().get("list", []))
|
||
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
|
||
self.get_exam_record(course_id, page + 1)
|
||
|
||
def log(self, *args, **kwargs) -> None:
|
||
exit_flag = False
|
||
time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
|
||
prefix = f"[{self.user.get('name', '')}] {time_str}"
|
||
# 检查 args 中是否有 \r,若有则将前缀拼在 \r 之后
|
||
if args and isinstance(args[0], str) and "\r" in args[0]:
|
||
# 将前缀插入到最后一个 \r 后面
|
||
msg = args[0].rsplit("\r", 1)
|
||
log_content = (
|
||
f"{msg[0]}\r{prefix} {msg[1]}"
|
||
if len(msg) > 1
|
||
else f"{prefix} {args[0]}"
|
||
)
|
||
else:
|
||
log_content = f"{prefix} {' '.join(map(str, args))}"
|
||
print(log_content, **kwargs)
|
||
if exit_flag:
|
||
os._exit(0)
|
||
|
||
def online(self) -> bool:
|
||
resp = self.session.post(f"https://{self.host}/user/online")
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
return True
|
||
return False
|
||
|
||
def study_start(self, node_id: str) -> None:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": 0,
|
||
"studyTime": 1,
|
||
"code": self.coder(),
|
||
},
|
||
headers=headers,
|
||
)
|
||
try:
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(f"{resp.json().get('msg', '')} --> 1", end="", flush=True)
|
||
self.study_id = resp.json().get("studyId", 0)
|
||
except:
|
||
if resp.status_code == 200:
|
||
data = json.loads(
|
||
re.findall(">var data =(.*?);</script>", resp.text)[0]
|
||
)
|
||
self.log(data.get("msg", ""))
|
||
self.study_id = data.get("studyId", 0)
|
||
|
||
def study(self, node_id: str, study_time: str) -> None:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": self.study_id,
|
||
"studyTime": study_time,
|
||
},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}",
|
||
end="",
|
||
flush=True,
|
||
)
|
||
else:
|
||
self.study(node_id, study_time)
|
||
|
||
def study_over(self, node_id, study_time: str) -> bool:
|
||
headers = self.session.headers
|
||
headers["x-requested-with"] = "XMLHttpRequest"
|
||
resp = self.session.post(
|
||
f"https://{self.host}/user/node/study",
|
||
data={
|
||
"nodeId": node_id,
|
||
"studyId": self.study_id,
|
||
"studyTime": study_time,
|
||
"close": "1",
|
||
},
|
||
headers=headers,
|
||
)
|
||
if resp.status_code == 200 and resp.json().get("status", False):
|
||
self.log(
|
||
f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}",
|
||
flush=True,
|
||
)
|
||
return True
|
||
return False
|
||
|
||
def get_user(self) -> None:
|
||
resp = self.session.get(f"https://{self.host}/user")
|
||
if resp.status_code == 200:
|
||
self.parse_user(resp.text)
|
||
else:
|
||
self.log(resp.text, exit=True)
|
||
|
||
def start_loop(self):
|
||
while True:
|
||
time.sleep(120)
|
||
self.online()
|
||
|
||
def run(self) -> None:
|
||
print("当前为刷课模式: ")
|
||
while self.login():
|
||
self.get_user()
|
||
thread = Thread(target=self.start_loop, daemon=True)
|
||
thread.start()
|
||
cs = self.user.get("courses", [])
|
||
print(
|
||
f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
|
||
)
|
||
if len(cs) > 1:
|
||
for i, _c in enumerate(cs):
|
||
print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
|
||
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
|
||
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
|
||
cs_list = cs
|
||
for c in cs_list:
|
||
print(f"课程名: {c.get('name', '')}\n\t进度: {c.get('progress', '')}")
|
||
self.log(f"{c.get('name', '')} --> {c.get('id', '')}")
|
||
|
||
self.get_study_record(c.get("id", ""))
|
||
|
||
for l in self.courses:
|
||
if "已学" in l.get("state", ""):
|
||
# 用于将之前已学过的视频进行补评论,请自行解除注释
|
||
# if isReply:
|
||
# self.reply(c.get("id", ""), l.get("id", ""))
|
||
self.log(
|
||
f"{l.get('name', '')} --> {l.get('id', '')} --> 已经学习过了"
|
||
)
|
||
continue
|
||
|
||
if isReply:
|
||
self.reply(c.get("id", ""), l.get("id", ""))
|
||
|
||
node_id = l.get("id", "")
|
||
total_time = 0
|
||
video_time, video_state = self.get_course(node_id)
|
||
self.log(
|
||
f"{l.get('name', '')} --> {l.get('id', '')} --> {video_time} --> {video_state}"
|
||
)
|
||
|
||
if video_time <= 0:
|
||
continue
|
||
|
||
self.study_start(node_id)
|
||
|
||
while True:
|
||
elapsed = video_time - total_time
|
||
if elapsed <= 0:
|
||
self.study_over(node_id, f"{video_time}")
|
||
break
|
||
time.sleep(30)
|
||
total_time += 30
|
||
self.study(node_id, f"{total_time}")
|
||
self.log("所有课程已经结束")
|
||
break
|
||
|
||
def work(self):
|
||
print("当前为刷作业模式: ")
|
||
while self.login():
|
||
self.get_user()
|
||
thread = Thread(target=self.start_loop, daemon=True)
|
||
thread.start()
|
||
cs = self.user.get("courses", [])
|
||
print(
|
||
f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
|
||
)
|
||
if len(cs) > 1:
|
||
for i, _c in enumerate(cs):
|
||
print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
|
||
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
|
||
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
|
||
cs_list = cs
|
||
for c in cs_list:
|
||
self.get_work_record(c.get("id", ""), "")
|
||
# print(self.works)
|
||
for work in self.works:
|
||
self.qas = []
|
||
if "已阅" in work.get("state"):
|
||
self.log(
|
||
f"{work.get('title', '')} 已阅 {work.get('finalScore', '')}"
|
||
)
|
||
continue
|
||
if not self.get_question_answer_work(
|
||
c.get("id", ""), work.get("nodeId", ""), work.get("id", "")
|
||
):
|
||
continue
|
||
for q in self.qas:
|
||
# print(q)
|
||
chooise = self.ai.chat(q)
|
||
chooise = chooise if chooise else "C"
|
||
self.work_submit(
|
||
q.get("id", ""),
|
||
work.get("id", ""),
|
||
chooise,
|
||
int(q.get("index", 1)) < len(self.qas),
|
||
q.get("index", 1),
|
||
)
|
||
time.sleep(2)
|
||
# break
|
||
break
|
||
|
||
def exam(self):
|
||
while self.login():
|
||
self.get_user()
|
||
thread = Thread(target=self.start_loop, daemon=True)
|
||
thread.start()
|
||
cs = self.user.get("courses", [])
|
||
print(
|
||
f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
|
||
)
|
||
if len(cs) > 1:
|
||
for i, _c in enumerate(cs):
|
||
print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
|
||
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
|
||
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
|
||
cs_list = cs
|
||
for c in cs_list:
|
||
self.get_exam_record(c.get("id", ""))
|
||
print(self.exams)
|
||
for exam in self.exams:
|
||
self.qas = []
|
||
if "线下阅" in exam.get("state"):
|
||
self.log(
|
||
f"{exam.get('title', '')} 已阅 {exam.get('finalScore', '')}"
|
||
)
|
||
continue
|
||
print(exam)
|
||
if not self.get_question_answer_exam(
|
||
c.get("id", ""), exam.get("nodeId", ""), exam.get("id", "")
|
||
):
|
||
continue
|
||
for q in self.qas:
|
||
# print(q)
|
||
chooise = self.ai.chat(q)
|
||
chooise = chooise if chooise else "A"
|
||
self.exam_submit(
|
||
q.get("id", ""),
|
||
exam.get("id", ""),
|
||
chooise,
|
||
int(q.get("index", 1)) < len(self.qas),
|
||
q.get("index", 1),
|
||
)
|
||
time.sleep(2)
|
||
# break
|
||
break
|
||
|
||
|
||
if __name__ == "__main__":
|
||
host_server = [
|
||
# ["cqcst.yuruixxkj.com", "御瑞科技(选修课)"],
|
||
# ["cqcst.zjxkeji.com", "重庆城市科技学院(其他1)"],
|
||
# ["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台(其他2)"],
|
||
["cqcst.leykeji.com", "劳动课程测评考试平台"],
|
||
# ["cqcst.zjxkeji.com", "公益课程平台"],
|
||
["cqcst.suwankj.com", "在线课程测评考试平台"],
|
||
# ["cqcst.yuruixxkj.com", "在线测评考试平台"],
|
||
]
|
||
for i, h in enumerate(host_server):
|
||
print(f"{i + 1}: {h[0]} {h[1]}")
|
||
c = int(input("请选择刷课平台: ")) - 1
|
||
u = os.environ.get("YURU_ACCOUNT", "")
|
||
p = os.environ.get("YURU_PASSWORD", "")
|
||
if u == "":
|
||
u = input("请输入账号: ")
|
||
if p == "":
|
||
p = input("请输入密码: ")
|
||
|
||
# 刷课程评论, 不需要用的时候可以直接注释
|
||
# r = input("是否需要评论(不需要[0],需要[1]): ")
|
||
# if int(r) == 1:
|
||
# isReply = True
|
||
# else:
|
||
# isReply = False
|
||
# print(bool(isReply))
|
||
# if u is None and p is None and c in [0, 1, 2]:
|
||
# os._exit(0)
|
||
|
||
# 刷视频,不需要用的时候可以直接注释
|
||
# CKWK(f"{u}", f"{p}", host_server[c][0]).run()
|
||
|
||
# 刷作业,不需要用的时候可以直接注释
|
||
# CKWK(f"{u}", f"{p}", host_server[c][0]).work()
|
||
|
||
# 考试,不需要用的时候可以直接注释
|
||
CKWK(f"{u}", f"{p}", host_server[c][0]).exam()
|