feat: 新增考试,可选(kimi,豆包,deepseek)模型进行答题,自己设置 API_KEY, 请不要相信 AI 的结果,最后交卷会提示你

This commit is contained in:
2025-12-12 19:17:36 +08:00
parent 6dfeac0d24
commit 14d4bc014d
4 changed files with 284 additions and 20 deletions

View File

@@ -23,6 +23,7 @@
- 自动浏览课程内容,模拟学习行为
- 自动获取学习记录,跳过已学课程
- 定时保持在线状态
- 自动进行使用 **豆包** 或者 **Deepseek** 进行答题或者考试
### 使用环境

View File

@@ -3,6 +3,7 @@ import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from httpx import Client, Proxy
load_dotenv()
@@ -24,9 +25,17 @@ class DOUBAO:
self.key = os.environ.get("DOUBAO_API_KEY")
if not self.key:
raise Exception("找不到 DOUBAO_API_KEY 请设置环境变量在运行代码")
httpx_client = Client(
proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址
timeout=30.0, # 可选:设置超时时间
verify=False,
)
self.client = OpenAI(
api_key=self.key,
base_url="https://ark.cn-beijing.volces.com/api/v3",
http_client=httpx_client,
)
def chat(self, question_json: dict) -> str:

73
kimi.py Normal file
View File

@@ -0,0 +1,73 @@
# pip install openai python-dotenv
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from httpx import Client, Proxy
load_dotenv()
class KIMI:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
# 默认从环境变量读取,也可以手动填
self.key = os.getenv(
"KIMI_API_KEY", os.getenv("KIMI_API_KEY")
)
if not self.key:
raise RuntimeError("找不到 KIMI_API_KEY请设置环境变量后再运行")
httpx_client = Client(
proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址
timeout=30.0, # 可选:设置超时时间
verify=False,
)
self.client = OpenAI(
api_key=self.key,
base_url="https://api.moonshot.cn/v1", # 已去掉多余空格
http_client=httpx_client,
)
def chat(self, question_json: dict) -> str:
resp = self.client.chat.completions.create(
model="kimi-k2-turbo-preview",
messages=[
{"role": "system", "content": self.system_prompt},
{
"role": "user",
"content": json.dumps(question_json, ensure_ascii=False),
},
],
temperature=0,
)
return resp.choices[0].message.content or ""
# 本地单条测试
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
print(KIMI().chat(question)) # 期望输出B

221
main.py
View File

@@ -3,7 +3,7 @@
文件名称: ckwk.py
作者: zhilv
邮箱: zhilv666@qq.com
版本: 1.2
版本: 1.3
--------------------------------------------
说明:
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
@@ -28,6 +28,7 @@ import warnings
from deepseek import DeepSeek
from doubao import DOUBAO
from kimi import KIMI
# 修复 ANTIALIAS 错误 - 添加猴子补丁
import PIL.Image
@@ -82,7 +83,7 @@ def replace_html_entities(text, replace_mode="keep_char"):
class CKWK:
def __init__(self, un: str, pw: str, host: str, ai_type="doubao") -> None:
def __init__(self, un: str, pw: str, host: str, ai_type="kimi") -> None:
"""
un: 用户名
pw: 密码
@@ -93,6 +94,8 @@ class CKWK:
self.ai = DOUBAO()
elif ai_type == "deepseek":
self.ai = DeepSeek()
elif ai_type == "kimi":
self.ai = KIMI()
else:
self.ai = DOUBAO()
@@ -109,19 +112,20 @@ class CKWK:
adapter = HTTPAdapter(max_retries=retry_strategy)
self.session.mount("http://", adapter)
self.session.mount("https://", adapter)
# self.session.verify = False
# self.session.proxies.update(
# {
# "http": "http://127.0.0.1:9000",
# "https": "http://127.0.0.1:9000",
# }
# )
self.session.verify = False
self.session.proxies.update(
{
"http": "http://127.0.0.1:9000",
"https": "http://127.0.0.1:9000",
}
)
self.username = un
self.host = host
self.password = pw
self.user = dict()
self.courses = []
self.works = []
self.exams = []
self.qas = []
self.ocr = ddddocr.DdddOcr()
self.session.headers.update(
@@ -233,7 +237,7 @@ class CKWK:
finally:
return video, state
def parse_q_a(self, html: str) -> None:
def parse_q_a_work(self, html: str) -> None:
tree = etree.HTML(html)
qas = tree.xpath('//div[@class="topic-item"]/form')
for qa in qas:
@@ -262,7 +266,36 @@ class CKWK:
}
self.qas.append(items)
def get_question_answer(self, courseID: str, nodeID: str, workID: str):
def parse_q_a_exam(self, html: str) -> None:
tree = etree.HTML(html)
qas = tree.xpath("//div/form")
for qa in qas:
items = {
"id": qa.xpath(
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
)[0],
"index": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
)[0],
"type": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
)[0],
"title": qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
)[0].strip(),
"chooies": [
{
"num": a.xpath('.//span[@class="num"]/text()')[0],
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
}
for a in qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
)
],
}
self.qas.append(items)
def get_question_answer_work(self, courseID: str, nodeID: str, workID: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
@@ -272,22 +305,47 @@ class CKWK:
)
if resp.status_code == 200 and resp.json().get("status", False):
qa_path = resp.json().get("url", "")
self.log(f'开始作业: {resp.json().get("msg", "")} url -> {qa_path}')
resp2 = self.session.get("https://{}{}".format(self.host, qa_path))
self.start_url = "https://{}{}".format(self.host, qa_path)
self.log(f'开始作业: {resp.json().get("msg", "")} url -> {self.start_url}')
resp2 = self.session.get(self.start_url)
self.qas = []
if resp.status_code == 200:
self.parse_q_a(resp2.text)
self.parse_q_a_work(resp2.text)
else:
self.log(f'开始作业: {resp.json().get("msg", "")}')
return False
return True
def work_submit(self, answerID: str, workID: str, answer: str, final: bool):
def get_question_answer_exam(self, courseID: str, nodeID: str, examId: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
"https://{}/user/exam/start".format(self.host),
params={"courseId": courseID, "nodeId": nodeID, "examId": examId},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
qa_path = resp.json().get("url", "")
self.start_url = "https://{}{}".format(self.host, qa_path)
self.log(f'开始考试: {resp.json().get("msg", "")} url -> {self.start_url}')
resp2 = self.session.get(self.start_url)
self.qas = []
if resp.status_code == 200:
self.parse_q_a_exam(resp2.text)
else:
self.log(f'开始考试: {resp.json().get("msg", "")}')
return False
return True
def work_submit(
self, answerID: str, workID: str, answer: str, final: bool, index: int
):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
data = {
"answerId": answerID,
"workId": workID,
"finish": "0" if final else "1",
"finish": "0",
}
if "," in answer:
answers = answer.split(",")
@@ -296,7 +354,7 @@ class CKWK:
# data.setdefault("answer[]", []).append(a)
else:
if answer == "":
data["answer"] = "C"
data["answer"] = "A"
data["answer"] = answer
resp = self.session.post(
@@ -304,9 +362,69 @@ class CKWK:
data=data,
headers=headers,
)
if not final:
aa = 1
while aa == 1:
print(f"作业链接: {self.start_url}")
aa = input("现在是最后一道题,请你去检查后输入 0 后自动提交:")
if aa == 0:
data["finish"] = "1"
resp = self.session.post(
f"https://{self.host}/user/work/submit",
data=data,
headers=headers,
)
break
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}'
f'{index}/{len(self.qas)} 提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}'
)
def exam_submit(
self, answerID: str, workID: str, answer: str, final: bool, index: int
):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
data = {
"answerId": answerID,
"examId": workID,
"finish": "0",
}
if "," in answer:
answers = answer.split(",")
data["answer[]"] = answers
# for a in answers:
# data.setdefault("answer[]", []).append(a)
else:
if answer == "":
data["answer"] = "A"
data["answer"] = answer
resp = self.session.post(
f"https://{self.host}/user/exam/submit",
data=data,
headers=headers,
)
if not final:
aa = 1
while aa == 1:
print(f"考试链接: {self.start_url}")
aa = input("现在是最后一道,请你去检查后输入 0 后自动提交:")
if aa == 0:
data["finish"] = "1"
resp = self.session.post(
f"https://{self.host}/user/exam/submit",
data=data,
headers=headers,
)
break
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'{index}/{len(self.qas)} 提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}'
)
def get_course(self, _id: str) -> tuple[int, int]:
@@ -419,6 +537,21 @@ class CKWK:
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1)
def get_exam_record(self, course_id: str, page=1):
resp = self.session.get(
f"https://{self.host}/user/study_record/exam.json",
params={
"courseId": course_id,
"page": page,
"_": str(int(time.time() * 1000)),
},
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} page -> {page}')
self.exams.extend(resp.json().get("list", []))
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_exam_record(course_id, page + 1)
def log(self, *args, **kwargs) -> None:
exit_flag = False
time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
@@ -607,7 +740,7 @@ class CKWK:
f'{work.get("title", "")} 已阅 {work.get("finalScore", "")}'
)
continue
if not self.get_question_answer(
if not self.get_question_answer_work(
c.get("id", ""), work.get("nodeId", ""), work.get("id", "")
):
continue
@@ -620,6 +753,51 @@ class CKWK:
work.get("id", ""),
chooise,
int(q.get("index", 1)) < len(self.qas),
q.get("index", 1),
)
# break
break
def exam(self):
while self.login():
self.get_user()
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
print(
f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}'
)
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}')
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs
for c in cs_list:
self.get_exam_record(c.get("id", ""))
print(self.exams)
for exam in self.exams:
self.qas = []
if "线下阅" in exam.get("state"):
self.log(
f'{exam.get("title", "")} 已阅 {exam.get("finalScore", "")}'
)
continue
print(exam)
if not self.get_question_answer_exam(
c.get("id", ""), exam.get("nodeId", ""), exam.get("id", "")
):
continue
for q in self.qas:
# print(q)
chooise = self.ai.chat(q)
chooise = chooise if chooise else "A"
self.exam_submit(
q.get("id", ""),
exam.get("id", ""),
chooise,
int(q.get("index", 1)) < len(self.qas),
q.get("index", 1),
)
# break
break
@@ -655,4 +833,7 @@ if __name__ == "__main__":
# CKWK(f"{u}", f"{p}", host_server[c][0]).run()
# 刷作业,不需要用的时候可以直接注释
CKWK(f"{u}", f"{p}", host_server[c][0]).work()
# CKWK(f"{u}", f"{p}", host_server[c][0]).work()
# 考试,不需要用的时候可以直接注释
CKWK(f"{u}", f"{p}", host_server[c][0]).exam()