diff --git a/__pycache__/deepseek.cpython-313.pyc b/__pycache__/deepseek.cpython-313.pyc new file mode 100644 index 0000000..5c8b719 Binary files /dev/null and b/__pycache__/deepseek.cpython-313.pyc differ diff --git a/__pycache__/doubao.cpython-313.pyc b/__pycache__/doubao.cpython-313.pyc new file mode 100644 index 0000000..403486d Binary files /dev/null and b/__pycache__/doubao.cpython-313.pyc differ diff --git a/deepseek.py b/deepseek.py new file mode 100644 index 0000000..b440529 --- /dev/null +++ b/deepseek.py @@ -0,0 +1,64 @@ +# Please install OpenAI SDK first: `pip3 install openai` +import os +import json +from openai import OpenAI +from dotenv import load_dotenv + +load_dotenv() + + +class DeepSeek: + system_prompt = ( + "You are a professional QA answer assistant.\n" + "The user will provide a JSON object containing a question.\n" + "Your job:\n" + "1. Read the 'title' and the 'chooies'.\n" + "2. Determine the correct answer(s).\n" + "3. Output ONLY the 'num' of the correct choices.\n" + "4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n" + "5. If you cannot determine the answer confidently, return an empty string.\n" + "6. Do not output explanations or any extra text." + ) + + def __init__(self) -> None: + self.key = os.environ.get("DEEPSEEK_API_KEY") + if self.key: + raise Exception("找不到 DEEPSEEK_API_KEY 请设置环境变量在运行代码") + self.client = OpenAI( + api_key=self.key, + base_url="https://api.deepseek.com", + ) + + def chat(self, question_json: dict) -> str: + response = self.client.chat.completions.create( + model="deepseek-chat", + # model="deepseek-reasoner", + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": json.dumps(question_json)}, + ], + stream=False, + ) + + # print(response.choices[0].message.content) + return ( + response.choices[0].message.content + if response.choices[0].message.content + else "" + ) + + +if __name__ == "__main__": + question = { + "id": "25594265", + "index": "1", + "type": "单选", + "title": "下列哪个属于AI视觉识别应用?", + "chooies": [ + {"num": "A", "txt": "音乐合成"}, + {"num": "B", "txt": "植物识别"}, + {"num": "C", "txt": "文案创作"}, + {"num": "D", "txt": "机器翻译"}, + ], + } + DeepSeek().chat(question) diff --git a/doubao.py b/doubao.py new file mode 100644 index 0000000..375393b --- /dev/null +++ b/doubao.py @@ -0,0 +1,65 @@ +# Please install OpenAI SDK first: `pip3 install openai` +import os +import json +from openai import OpenAI +from dotenv import load_dotenv + +load_dotenv() + + +class DOUBAO: + system_prompt = ( + "You are a professional QA answer assistant.\n" + "The user will provide a JSON object containing a question.\n" + "Your job:\n" + "1. Read the 'title' and the 'chooies'.\n" + "2. Determine the correct answer(s).\n" + "3. Output ONLY the 'num' of the correct choices.\n" + "4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n" + "5. If you cannot determine the answer confidently, return an empty string.\n" + "6. Do not output explanations or any extra text." + ) + + def __init__(self) -> None: + self.key = os.environ.get("DOUBAO_API_KEY") + if self.key: + raise Exception("找不到 DOUBAO_API_KEY 请设置环境变量在运行代码") + self.client = OpenAI( + api_key=self.key, + base_url="https://ark.cn-beijing.volces.com/api/v3", + ) + + def chat(self, question_json: dict) -> str: + response = self.client.chat.completions.create( + # model="deepseek-chat", + model="doubao-seed-code-preview-251028", + # model="deepseek-reasoner", + messages=[ + {"role": "system", "content": self.system_prompt}, + {"role": "user", "content": json.dumps(question_json)}, + ], + stream=False, + ) + + # print(response.choices[0].message.content) + return ( + response.choices[0].message.content + if response.choices[0].message.content + else "" + ) + + +if __name__ == "__main__": + question = { + "id": "25594265", + "index": "1", + "type": "单选", + "title": "下列哪个属于AI视觉识别应用?", + "chooies": [ + {"num": "A", "txt": "音乐合成"}, + {"num": "B", "txt": "植物识别"}, + {"num": "C", "txt": "文案创作"}, + {"num": "D", "txt": "机器翻译"}, + ], + } + DOUBAO().chat(question) diff --git a/main.py b/main.py index a6db6b1..af15fd1 100644 --- a/main.py +++ b/main.py @@ -26,6 +26,9 @@ from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import warnings +from deepseek import DeepSeek +from doubao import DOUBAO + # 修复 ANTIALIAS 错误 - 添加猴子补丁 import PIL.Image @@ -79,7 +82,20 @@ def replace_html_entities(text, replace_mode="keep_char"): class CKWK: - def __init__(self, un: str, pw: str, host: str) -> None: + def __init__(self, un: str, pw: str, host: str, ai_type="doubao") -> None: + """ + un: 用户名 + pw: 密码 + host: 网站域名 + ai_type: ai类别取值为 'doubao' 或者 'deepseek' + """ + if ai_type == "doubao": + self.ai = DOUBAO() + elif ai_type == "deepseek": + self.ai = DeepSeek() + else: + self.ai = DOUBAO() + self.session = requests.Session() # 定义重试策略 retry_strategy = Retry( @@ -93,18 +109,20 @@ class CKWK: adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) - # self.session.verify = False - # self.session.proxies.update( - # { - # "http": "http://127.0.0.1:9000", - # "https": "http://127.0.0.1:9000", - # } - # ) + self.session.verify = False + self.session.proxies.update( + { + "http": "http://127.0.0.1:9000", + "https": "http://127.0.0.1:9000", + } + ) self.username = un self.host = host self.password = pw self.user = dict() self.courses = [] + self.works = [] + self.qas = [] self.ocr = ddddocr.DdddOcr() self.session.headers.update( { @@ -215,6 +233,82 @@ class CKWK: finally: return video, state + def parse_q_a(self, html: str) -> None: + tree = etree.HTML(html) + qas = tree.xpath('//div[@class="topic-item"]/form') + for qa in qas: + items = { + "id": qa.xpath( + './div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value' + )[0], + "index": qa.xpath( + './div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()' + )[0], + "type": qa.xpath( + './div[@class="courseexamcon-main"]/div/div[@class="type"]/text()' + )[0], + "title": qa.xpath( + './div[@class="courseexamcon-main"]/div[@class="name"]/text()' + )[0].strip(), + "chooies": [ + { + "num": a.xpath('.//span[@class="num"]/text()')[0], + "txt": a.xpath('.//span[@class="txt"]/text()')[0], + } + for a in qa.xpath( + './div[@class="courseexamcon-main"]/div[@class="list"]//li' + ) + ], + } + self.qas.append(items) + + def get_question_answer(self, courseID: str, nodeID: str, workID: str): + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + resp = self.session.get( + "https://{}/user/work/start".format(self.host), + params={"courseId": courseID, "nodeId": nodeID, "workId": workID}, + headers=headers, + ) + if resp.status_code == 200 and resp.json().get("status", False): + qa_path = resp.json().get("url", "") + self.log(f'开始作业: {resp.json().get("msg", "")} url -> {qa_path}') + resp2 = self.session.get("https://{}{}".format(self.host, qa_path)) + if resp.status_code == 200: + self.parse_q_a(resp2.text) + else: + self.log(f'开始作业: {resp.json().get("msg", "")}') + return False + return True + + def work_submit(self, answerID: str, workID: str, answer: str, final: bool): + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + data = { + "answerId": answerID, + "workId": workID, + "finish": "0" if final else "1", + } + if "," in answer: + answers = answer.split(",") + data["answer[]"] = answers + # for a in answers: + # data.setdefault("answer[]", []).append(a) + else: + if answer == "": + data["answer"] = "C" + data["answer"] = answer + + resp = self.session.post( + f"https://{self.host}/user/work/submit", + data=data, + headers=headers, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log( + f'提交答案: {resp.json().get("msg", "")} {answerID} => {answer} {final}' + ) + def get_course(self, _id: str) -> tuple[int, int]: resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id)) if resp.status_code == 200 and "错误提示" not in resp.text: @@ -306,7 +400,23 @@ class CKWK: if resp.status_code == 200 and resp.json().get("status", False): self.log(f'{resp.json().get("msg", "")} page -> {page}') self.courses.extend(resp.json().get("list", [])) - if page <= resp.json().get("pageInfo", {}).get("pageCount", 0): + if page < resp.json().get("pageInfo", {}).get("pageCount", 0): + self.get_study_record(course_id, page + 1) + + def get_work_record(self, course_id: str, user_id: str, page=1): + resp = self.session.get( + f"https://{self.host}/user/study_record/work.json", + params={ + "courseId": course_id, + # "userId": user_id, + "page": page, + "_": str(int(time.time() * 1000)), + }, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log(f'{resp.json().get("msg", "")} page -> {page}') + self.works.extend(resp.json().get("list", [])) + if page < resp.json().get("pageInfo", {}).get("pageCount", 0): self.get_study_record(course_id, page + 1) def log(self, *args, **kwargs) -> None: @@ -414,6 +524,7 @@ class CKWK: self.online() def run(self) -> None: + print("当前为刷课模式: ") while self.login(): self.get_user() thread = Thread(target=self.start_loop, daemon=True) @@ -443,7 +554,7 @@ class CKWK: f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了' ) continue - + if isReply: self.reply(c.get("id", ""), l.get("id", "")) @@ -470,6 +581,48 @@ class CKWK: self.log("所有课程已经结束") break + def work(self): + print("当前为刷作业模式: ") + while self.login(): + self.get_user() + thread = Thread(target=self.start_loop, daemon=True) + thread.start() + cs = self.user.get("courses", []) + print( + f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}' + ) + if len(cs) > 1: + for i, _c in enumerate(cs): + print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}') + cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") + cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] + cs_list = cs + for c in cs_list: + self.get_work_record(c.get("id", ""), "") + # print(self.works) + for work in self.works: + self.qas = [] + if "已阅" in work.get("state"): + self.log( + f'{work.get("title", "")} 已阅 {work.get("finalScore", "")}' + ) + continue + if not self.get_question_answer( + c.get("id", ""), work.get("nodeId", ""), work.get("id", "") + ): + continue + for q in self.qas: + # print(q) + chooise = self.ai.chat(q) + self.work_submit( + q.get("id", ""), + work.get("id", ""), + chooise, + int(q.get("index", 1)) < len(self.qas), + ) + # break + break + if __name__ == "__main__": host_server = [ @@ -486,12 +639,17 @@ if __name__ == "__main__": u = input("请输入账号: ") if p == "": p = input("请输入密码: ") - r = input("是否需要评论(不需要[0],需要[1]): ") - if int(r) == 1: - isReply = True - else: - isReply = False - print(bool(isReply)) - if u is None and p is None and c in [0, 1, 2]: - os._exit(0) - CKWK(f"{u}", f"{p}", host_server[c][0]).run() + + # 刷课 + # r = input("是否需要评论(不需要[0],需要[1]): ") + # if int(r) == 1: + # isReply = True + # else: + # isReply = False + # print(bool(isReply)) + # if u is None and p is None and c in [0, 1, 2]: + # os._exit(0) + # CKWK(f"{u}", f"{p}", host_server[c][0]).run() + + # 刷作业 + CKWK(f"{u}", f"{p}", host_server[c][0]).work() diff --git a/pyproject.toml b/pyproject.toml index cafa223..c1053d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,5 +8,6 @@ dependencies = [ "ddddocr>=1.5.6", "dotenv>=0.9.9", "lxml>=6.0.2", + "openai>=2.8.1", "requests>=2.32.5", ]