feat(work): 新增刷作业模式,具体使用请查看 main 函数

This commit is contained in:
2025-11-28 20:12:51 +08:00
parent 2957e88770
commit 4dd77bf0b7
6 changed files with 307 additions and 19 deletions

Binary file not shown.

Binary file not shown.

64
deepseek.py Normal file
View File

@@ -0,0 +1,64 @@
# Please install OpenAI SDK first: `pip3 install openai`
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
class DeepSeek:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
self.key = os.environ.get("DEEPSEEK_API_KEY")
if self.key:
raise Exception("找不到 DEEPSEEK_API_KEY 请设置环境变量在运行代码")
self.client = OpenAI(
api_key=self.key,
base_url="https://api.deepseek.com",
)
def chat(self, question_json: dict) -> str:
response = self.client.chat.completions.create(
model="deepseek-chat",
# model="deepseek-reasoner",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": json.dumps(question_json)},
],
stream=False,
)
# print(response.choices[0].message.content)
return (
response.choices[0].message.content
if response.choices[0].message.content
else ""
)
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
DeepSeek().chat(question)

65
doubao.py Normal file
View File

@@ -0,0 +1,65 @@
# Please install OpenAI SDK first: `pip3 install openai`
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
class DOUBAO:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
self.key = os.environ.get("DOUBAO_API_KEY")
if self.key:
raise Exception("找不到 DOUBAO_API_KEY 请设置环境变量在运行代码")
self.client = OpenAI(
api_key=self.key,
base_url="https://ark.cn-beijing.volces.com/api/v3",
)
def chat(self, question_json: dict) -> str:
response = self.client.chat.completions.create(
# model="deepseek-chat",
model="doubao-seed-code-preview-251028",
# model="deepseek-reasoner",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": json.dumps(question_json)},
],
stream=False,
)
# print(response.choices[0].message.content)
return (
response.choices[0].message.content
if response.choices[0].message.content
else ""
)
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
DOUBAO().chat(question)

196
main.py
View File

@@ -26,6 +26,9 @@ from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry from urllib3.util.retry import Retry
import warnings import warnings
from deepseek import DeepSeek
from doubao import DOUBAO
# 修复 ANTIALIAS 错误 - 添加猴子补丁 # 修复 ANTIALIAS 错误 - 添加猴子补丁
import PIL.Image import PIL.Image
@@ -79,7 +82,20 @@ def replace_html_entities(text, replace_mode="keep_char"):
class CKWK: class CKWK:
def __init__(self, un: str, pw: str, host: str) -> None: def __init__(self, un: str, pw: str, host: str, ai_type="doubao") -> None:
"""
un: 用户名
pw: 密码
host: 网站域名
ai_type: ai类别取值为 'doubao' 或者 'deepseek'
"""
if ai_type == "doubao":
self.ai = DOUBAO()
elif ai_type == "deepseek":
self.ai = DeepSeek()
else:
self.ai = DOUBAO()
self.session = requests.Session() self.session = requests.Session()
# 定义重试策略 # 定义重试策略
retry_strategy = Retry( retry_strategy = Retry(
@@ -93,18 +109,20 @@ class CKWK:
adapter = HTTPAdapter(max_retries=retry_strategy) adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter) self.session.mount("http://", adapter)
self.session.mount("https://", adapter) self.session.mount("https://", adapter)
# self.session.verify = False self.session.verify = False
# self.session.proxies.update( self.session.proxies.update(
# { {
# "http": "http://127.0.0.1:9000", "http": "http://127.0.0.1:9000",
# "https": "http://127.0.0.1:9000", "https": "http://127.0.0.1:9000",
# } }
# ) )
self.username = un self.username = un
self.host = host self.host = host
self.password = pw self.password = pw
self.user = dict() self.user = dict()
self.courses = [] self.courses = []
self.works = []
self.qas = []
self.ocr = ddddocr.DdddOcr() self.ocr = ddddocr.DdddOcr()
self.session.headers.update( self.session.headers.update(
{ {
@@ -215,6 +233,82 @@ class CKWK:
finally: finally:
return video, state return video, state
def parse_q_a(self, html: str) -> None:
tree = etree.HTML(html)
qas = tree.xpath('//div[@class="topic-item"]/form')
for qa in qas:
items = {
"id": qa.xpath(
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
)[0],
"index": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
)[0],
"type": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
)[0],
"title": qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
)[0].strip(),
"chooies": [
{
"num": a.xpath('.//span[@class="num"]/text()')[0],
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
}
for a in qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
)
],
}
self.qas.append(items)
def get_question_answer(self, courseID: str, nodeID: str, workID: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
"https://{}/user/work/start".format(self.host),
params={"courseId": courseID, "nodeId": nodeID, "workId": workID},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
qa_path = resp.json().get("url", "")
self.log(f'开始作业: {resp.json().get("msg", "")} url -> {qa_path}')
resp2 = self.session.get("https://{}{}".format(self.host, qa_path))
if resp.status_code == 200:
self.parse_q_a(resp2.text)
else:
self.log(f'开始作业: {resp.json().get("msg", "")}')
return False
return True
def work_submit(self, answerID: str, workID: str, answer: str, final: bool):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
data = {
"answerId": answerID,
"workId": workID,
"finish": "0" if final else "1",
}
if "," in answer:
answers = answer.split(",")
data["answer[]"] = answers
# for a in answers:
# data.setdefault("answer[]", []).append(a)
else:
if answer == "":
data["answer"] = "C"
data["answer"] = answer
resp = self.session.post(
f"https://{self.host}/user/work/submit",
data=data,
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}'
)
def get_course(self, _id: str) -> tuple[int, int]: def get_course(self, _id: str) -> tuple[int, int]:
resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id)) resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
if resp.status_code == 200 and "错误提示" not in resp.text: if resp.status_code == 200 and "错误提示" not in resp.text:
@@ -306,7 +400,23 @@ class CKWK:
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} page -> {page}') self.log(f'{resp.json().get("msg", "")} page -> {page}')
self.courses.extend(resp.json().get("list", [])) self.courses.extend(resp.json().get("list", []))
if page <= resp.json().get("pageInfo", {}).get("pageCount", 0): if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1)
def get_work_record(self, course_id: str, user_id: str, page=1):
resp = self.session.get(
f"https://{self.host}/user/study_record/work.json",
params={
"courseId": course_id,
# "userId": user_id,
"page": page,
"_": str(int(time.time() * 1000)),
},
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} page -> {page}')
self.works.extend(resp.json().get("list", []))
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1) self.get_study_record(course_id, page + 1)
def log(self, *args, **kwargs) -> None: def log(self, *args, **kwargs) -> None:
@@ -414,6 +524,7 @@ class CKWK:
self.online() self.online()
def run(self) -> None: def run(self) -> None:
print("当前为刷课模式: ")
while self.login(): while self.login():
self.get_user() self.get_user()
thread = Thread(target=self.start_loop, daemon=True) thread = Thread(target=self.start_loop, daemon=True)
@@ -443,7 +554,7 @@ class CKWK:
f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了' f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
) )
continue continue
if isReply: if isReply:
self.reply(c.get("id", ""), l.get("id", "")) self.reply(c.get("id", ""), l.get("id", ""))
@@ -470,6 +581,48 @@ class CKWK:
self.log("所有课程已经结束") self.log("所有课程已经结束")
break break
def work(self):
print("当前为刷作业模式: ")
while self.login():
self.get_user()
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
print(
f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}'
)
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}')
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs
for c in cs_list:
self.get_work_record(c.get("id", ""), "")
# print(self.works)
for work in self.works:
self.qas = []
if "已阅" in work.get("state"):
self.log(
f'{work.get("title", "")} 已阅 {work.get("finalScore", "")}'
)
continue
if not self.get_question_answer(
c.get("id", ""), work.get("nodeId", ""), work.get("id", "")
):
continue
for q in self.qas:
# print(q)
chooise = self.ai.chat(q)
self.work_submit(
q.get("id", ""),
work.get("id", ""),
chooise,
int(q.get("index", 1)) < len(self.qas),
)
# break
break
if __name__ == "__main__": if __name__ == "__main__":
host_server = [ host_server = [
@@ -486,12 +639,17 @@ if __name__ == "__main__":
u = input("请输入账号: ") u = input("请输入账号: ")
if p == "": if p == "":
p = input("请输入密码: ") p = input("请输入密码: ")
r = input("是否需要评论(不需要[0],需要[1]): ")
if int(r) == 1: # 刷课
isReply = True # r = input("是否需要评论(不需要[0],需要[1]): ")
else: # if int(r) == 1:
isReply = False # isReply = True
print(bool(isReply)) # else:
if u is None and p is None and c in [0, 1, 2]: # isReply = False
os._exit(0) # print(bool(isReply))
CKWK(f"{u}", f"{p}", host_server[c][0]).run() # if u is None and p is None and c in [0, 1, 2]:
# os._exit(0)
# CKWK(f"{u}", f"{p}", host_server[c][0]).run()
# 刷作业
CKWK(f"{u}", f"{p}", host_server[c][0]).work()

View File

@@ -8,5 +8,6 @@ dependencies = [
"ddddocr>=1.5.6", "ddddocr>=1.5.6",
"dotenv>=0.9.9", "dotenv>=0.9.9",
"lxml>=6.0.2", "lxml>=6.0.2",
"openai>=2.8.1",
"requests>=2.32.5", "requests>=2.32.5",
] ]