diff --git a/README.md b/README.md index 6793cdc..f74a993 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,7 @@ - 自动浏览课程内容,模拟学习行为 - 自动获取学习记录,跳过已学课程 - 定时保持在线状态 +- 自动进行使用 **豆包** 或者 **Deepseek** 进行答题或者考试 ### 使用环境 diff --git a/doubao.py b/doubao.py index 7ac3bce..0b896b4 100644 --- a/doubao.py +++ b/doubao.py @@ -3,6 +3,7 @@ import os import json from openai import OpenAI from dotenv import load_dotenv +from httpx import Client, Proxy load_dotenv() @@ -24,9 +25,17 @@ class DOUBAO: self.key = os.environ.get("DOUBAO_API_KEY") if not self.key: raise Exception("找不到 DOUBAO_API_KEY 请设置环境变量在运行代码") + + httpx_client = Client( + proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址 + timeout=30.0, # 可选:设置超时时间 + verify=False, + ) + self.client = OpenAI( api_key=self.key, base_url="https://ark.cn-beijing.volces.com/api/v3", + http_client=httpx_client, ) def chat(self, question_json: dict) -> str: diff --git a/kimi.py b/kimi.py new file mode 100644 index 0000000..1270400 --- /dev/null +++ b/kimi.py @@ -0,0 +1,73 @@ +# pip install openai python-dotenv +import os +import json +from openai import OpenAI +from dotenv import load_dotenv +from httpx import Client, Proxy + +load_dotenv() + + +class KIMI: + system_prompt = ( + "You are a professional QA answer assistant.\n" + "The user will provide a JSON object containing a question.\n" + "Your job:\n" + "1. Read the 'title' and the 'chooies'.\n" + "2. Determine the correct answer(s).\n" + "3. Output ONLY the 'num' of the correct choices.\n" + "4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n" + "5. If you cannot determine the answer confidently, return an empty string.\n" + "6. Do not output explanations or any extra text." + ) + + def __init__(self) -> None: + # 默认从环境变量读取,也可以手动填 + self.key = os.getenv( + "KIMI_API_KEY", os.getenv("KIMI_API_KEY") + ) + if not self.key: + raise RuntimeError("找不到 KIMI_API_KEY,请设置环境变量后再运行") + + httpx_client = Client( + proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址 + timeout=30.0, # 可选:设置超时时间 + verify=False, + ) + + self.client = OpenAI( + api_key=self.key, + base_url="https://api.moonshot.cn/v1", # 已去掉多余空格 + http_client=httpx_client, + ) + + def chat(self, question_json: dict) -> str: + resp = self.client.chat.completions.create( + model="kimi-k2-turbo-preview", + messages=[ + {"role": "system", "content": self.system_prompt}, + { + "role": "user", + "content": json.dumps(question_json, ensure_ascii=False), + }, + ], + temperature=0, + ) + return resp.choices[0].message.content or "" + + +# 本地单条测试 +if __name__ == "__main__": + question = { + "id": "25594265", + "index": "1", + "type": "单选", + "title": "下列哪个属于AI视觉识别应用?", + "chooies": [ + {"num": "A", "txt": "音乐合成"}, + {"num": "B", "txt": "植物识别"}, + {"num": "C", "txt": "文案创作"}, + {"num": "D", "txt": "机器翻译"}, + ], + } + print(KIMI().chat(question)) # 期望输出:B diff --git a/main.py b/main.py index 1197d5a..53a9a59 100644 --- a/main.py +++ b/main.py @@ -3,7 +3,7 @@ 文件名称: ckwk.py 作者: zhilv 邮箱: zhilv666@qq.com -版本: 1.2 +版本: 1.3 -------------------------------------------- 说明: 本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。 @@ -28,6 +28,7 @@ import warnings from deepseek import DeepSeek from doubao import DOUBAO +from kimi import KIMI # 修复 ANTIALIAS 错误 - 添加猴子补丁 import PIL.Image @@ -82,7 +83,7 @@ def replace_html_entities(text, replace_mode="keep_char"): class CKWK: - def __init__(self, un: str, pw: str, host: str, ai_type="doubao") -> None: + def __init__(self, un: str, pw: str, host: str, ai_type="kimi") -> None: """ un: 用户名 pw: 密码 @@ -93,6 +94,8 @@ class CKWK: self.ai = DOUBAO() elif ai_type == "deepseek": self.ai = DeepSeek() + elif ai_type == "kimi": + self.ai = KIMI() else: self.ai = DOUBAO() @@ -109,19 +112,20 @@ class CKWK: adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) - # self.session.verify = False - # self.session.proxies.update( - # { - # "http": "http://127.0.0.1:9000", - # "https": "http://127.0.0.1:9000", - # } - # ) + self.session.verify = False + self.session.proxies.update( + { + "http": "http://127.0.0.1:9000", + "https": "http://127.0.0.1:9000", + } + ) self.username = un self.host = host self.password = pw self.user = dict() self.courses = [] self.works = [] + self.exams = [] self.qas = [] self.ocr = ddddocr.DdddOcr() self.session.headers.update( @@ -233,7 +237,7 @@ class CKWK: finally: return video, state - def parse_q_a(self, html: str) -> None: + def parse_q_a_work(self, html: str) -> None: tree = etree.HTML(html) qas = tree.xpath('//div[@class="topic-item"]/form') for qa in qas: @@ -262,7 +266,36 @@ class CKWK: } self.qas.append(items) - def get_question_answer(self, courseID: str, nodeID: str, workID: str): + def parse_q_a_exam(self, html: str) -> None: + tree = etree.HTML(html) + qas = tree.xpath("//div/form") + for qa in qas: + items = { + "id": qa.xpath( + './div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value' + )[0], + "index": qa.xpath( + './div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()' + )[0], + "type": qa.xpath( + './div[@class="courseexamcon-main"]/div/div[@class="type"]/text()' + )[0], + "title": qa.xpath( + './div[@class="courseexamcon-main"]/div[@class="name"]/text()' + )[0].strip(), + "chooies": [ + { + "num": a.xpath('.//span[@class="num"]/text()')[0], + "txt": a.xpath('.//span[@class="txt"]/text()')[0], + } + for a in qa.xpath( + './div[@class="courseexamcon-main"]/div[@class="list"]//li' + ) + ], + } + self.qas.append(items) + + def get_question_answer_work(self, courseID: str, nodeID: str, workID: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.get( @@ -272,22 +305,47 @@ class CKWK: ) if resp.status_code == 200 and resp.json().get("status", False): qa_path = resp.json().get("url", "") - self.log(f'开始作业: {resp.json().get("msg", "")} url -> {qa_path}') - resp2 = self.session.get("https://{}{}".format(self.host, qa_path)) + self.start_url = "https://{}{}".format(self.host, qa_path) + self.log(f'开始作业: {resp.json().get("msg", "")} url -> {self.start_url}') + resp2 = self.session.get(self.start_url) + self.qas = [] if resp.status_code == 200: - self.parse_q_a(resp2.text) + self.parse_q_a_work(resp2.text) else: self.log(f'开始作业: {resp.json().get("msg", "")}') return False return True - def work_submit(self, answerID: str, workID: str, answer: str, final: bool): + def get_question_answer_exam(self, courseID: str, nodeID: str, examId: str): + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + resp = self.session.get( + "https://{}/user/exam/start".format(self.host), + params={"courseId": courseID, "nodeId": nodeID, "examId": examId}, + headers=headers, + ) + if resp.status_code == 200 and resp.json().get("status", False): + qa_path = resp.json().get("url", "") + self.start_url = "https://{}{}".format(self.host, qa_path) + self.log(f'开始考试: {resp.json().get("msg", "")} url -> {self.start_url}') + resp2 = self.session.get(self.start_url) + self.qas = [] + if resp.status_code == 200: + self.parse_q_a_exam(resp2.text) + else: + self.log(f'开始考试: {resp.json().get("msg", "")}') + return False + return True + + def work_submit( + self, answerID: str, workID: str, answer: str, final: bool, index: int + ): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" data = { "answerId": answerID, "workId": workID, - "finish": "0" if final else "1", + "finish": "0", } if "," in answer: answers = answer.split(",") @@ -296,7 +354,7 @@ class CKWK: # data.setdefault("answer[]", []).append(a) else: if answer == "": - data["answer"] = "C" + data["answer"] = "A" data["answer"] = answer resp = self.session.post( @@ -304,9 +362,69 @@ class CKWK: data=data, headers=headers, ) + + if not final: + aa = 1 + while aa == 1: + print(f"作业链接: {self.start_url}") + aa = input("现在是最后一道题,请你去检查后输入 0 后自动提交:") + if aa == 0: + data["finish"] = "1" + resp = self.session.post( + f"https://{self.host}/user/work/submit", + data=data, + headers=headers, + ) + break + if resp.status_code == 200 and resp.json().get("status", False): self.log( - f'提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}' + f'{index}/{len(self.qas)} 提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}' + ) + + def exam_submit( + self, answerID: str, workID: str, answer: str, final: bool, index: int + ): + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + data = { + "answerId": answerID, + "examId": workID, + "finish": "0", + } + if "," in answer: + answers = answer.split(",") + data["answer[]"] = answers + # for a in answers: + # data.setdefault("answer[]", []).append(a) + else: + if answer == "": + data["answer"] = "A" + data["answer"] = answer + + resp = self.session.post( + f"https://{self.host}/user/exam/submit", + data=data, + headers=headers, + ) + + if not final: + aa = 1 + while aa == 1: + print(f"考试链接: {self.start_url}") + aa = input("现在是最后一道,请你去检查后输入 0 后自动提交:") + if aa == 0: + data["finish"] = "1" + resp = self.session.post( + f"https://{self.host}/user/exam/submit", + data=data, + headers=headers, + ) + break + + if resp.status_code == 200 and resp.json().get("status", False): + self.log( + f'{index}/{len(self.qas)} 提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}' ) def get_course(self, _id: str) -> tuple[int, int]: @@ -419,6 +537,21 @@ class CKWK: if page < resp.json().get("pageInfo", {}).get("pageCount", 0): self.get_study_record(course_id, page + 1) + def get_exam_record(self, course_id: str, page=1): + resp = self.session.get( + f"https://{self.host}/user/study_record/exam.json", + params={ + "courseId": course_id, + "page": page, + "_": str(int(time.time() * 1000)), + }, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log(f'{resp.json().get("msg", "")} page -> {page}') + self.exams.extend(resp.json().get("list", [])) + if page < resp.json().get("pageInfo", {}).get("pageCount", 0): + self.get_exam_record(course_id, page + 1) + def log(self, *args, **kwargs) -> None: exit_flag = False time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]") @@ -607,7 +740,7 @@ class CKWK: f'{work.get("title", "")} 已阅 {work.get("finalScore", "")}' ) continue - if not self.get_question_answer( + if not self.get_question_answer_work( c.get("id", ""), work.get("nodeId", ""), work.get("id", "") ): continue @@ -620,6 +753,51 @@ class CKWK: work.get("id", ""), chooise, int(q.get("index", 1)) < len(self.qas), + q.get("index", 1), + ) + # break + break + + def exam(self): + while self.login(): + self.get_user() + thread = Thread(target=self.start_loop, daemon=True) + thread.start() + cs = self.user.get("courses", []) + print( + f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}' + ) + if len(cs) > 1: + for i, _c in enumerate(cs): + print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}') + cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") + cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] + cs_list = cs + for c in cs_list: + self.get_exam_record(c.get("id", "")) + print(self.exams) + for exam in self.exams: + self.qas = [] + if "线下阅" in exam.get("state"): + self.log( + f'{exam.get("title", "")} 已阅 {exam.get("finalScore", "")}' + ) + continue + print(exam) + if not self.get_question_answer_exam( + c.get("id", ""), exam.get("nodeId", ""), exam.get("id", "") + ): + continue + for q in self.qas: + # print(q) + chooise = self.ai.chat(q) + chooise = chooise if chooise else "A" + self.exam_submit( + q.get("id", ""), + exam.get("id", ""), + chooise, + int(q.get("index", 1)) < len(self.qas), + q.get("index", 1), ) # break break @@ -655,4 +833,7 @@ if __name__ == "__main__": # CKWK(f"{u}", f"{p}", host_server[c][0]).run() # 刷作业,不需要用的时候可以直接注释 - CKWK(f"{u}", f"{p}", host_server[c][0]).work() + # CKWK(f"{u}", f"{p}", host_server[c][0]).work() + + # 考试,不需要用的时候可以直接注释 + CKWK(f"{u}", f"{p}", host_server[c][0]).exam()