""" -------------------------------------------- 文件名称: ckwk.py 作者: zhilv 邮箱: zhilv666@qq.com 版本: 1.3 -------------------------------------------- 说明: 本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。 作者不对因使用本脚本造成的任何损失或后果承担责任。 -------------------------------------------- """ import json import os import random import re import time import warnings from datetime import datetime from threading import Thread import ddddocr # 修复 ANTIALIAS 错误 - 添加猴子补丁 import PIL.Image import requests from dotenv import load_dotenv from lxml import etree # type: ignore from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from deepseek import DeepSeek from doubao import DOUBAO from kimi import KIMI if not hasattr(PIL.Image, "ANTIALIAS"): PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS # type: ignore warnings.filterwarnings("ignore") load_dotenv() isReply = False def replace_html_entities(text, replace_mode="keep_char"): """ 替换网页中的 HTML 实体字符(如 “、” 等) :param text: 原始网页文本 :param replace_mode: 替换模式: - "keep_char": 替换为对应的普通字符(默认,如 “ → 「) - "remove": 直接去除实体字符(如 “ → 空字符串) :return: 处理后的文本 """ # 常见 HTML 实体映射表(可根据需求扩展) html_entities = { # 引号类(用户示例中的核心场景) "“": "「", # 左双引号 "”": "」", # 右双引号 "‘": "「", # 左单引号 "’": "」", # 右单引号 # 其他常见实体(可选保留/扩展) "&": "&", # 和号 "<": "<", # 小于号 ">": ">", # 大于号 " ": " ", # 非换行空格 "©": "©", # 版权符号 "®": "®", # 注册商标符号 } # 根据模式调整替换目标 target_map = ( html_entities if replace_mode == "keep_char" else {k: "" for k in html_entities} ) # 批量替换 result = text for entity, target in target_map.items(): result = result.replace(entity, target) return result class CKWK: def __init__(self, un: str, pw: str, host: str, ai_type="kimi") -> None: """ un: 用户名 pw: 密码 host: 网站域名 ai_type: ai类别取值为 'doubao' 或者 'deepseek' """ if ai_type == "doubao": self.ai = DOUBAO() elif ai_type == "deepseek": self.ai = DeepSeek() elif ai_type == "kimi": self.ai = KIMI() else: self.ai = DOUBAO() self.session = requests.Session() # 定义重试策略 retry_strategy = Retry( total=5, # 最多重试 5 次 backoff_factor=1, # 重试间隔:1s, 2s, 4s, 8s... status_forcelist=[429, 500, 502, 503, 504], # 哪些状态码触发重试 allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 允许重试的方法 ) # 挂载适配器 adapter = HTTPAdapter(max_retries=retry_strategy) self.session.mount("http://", adapter) self.session.mount("https://", adapter) # self.session.verify = False # self.session.proxies.update( # { # "http": "http://127.0.0.1:9000", # "https": "http://127.0.0.1:9000", # } # ) self.username = un self.host = host self.password = pw self.user = dict() self.courses = [] self.works = [] self.exams = [] self.qas = [] self.ocr = ddddocr.DdddOcr() self.session.headers.update( { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0", "Accept": "application/json, text/javascript, */*; q=0.01", "Accept-Encoding": "gzip, deflate, br, zstd", } ) def coder(self) -> str: resp = self.session.get( f"https://{self.host}/service/code", params={"r": f"{random.random()}"}, ) if resp.status_code == 200: return f"{self.ocr.classification(resp.content)}" return "" def login(self) -> bool: yzm = self.coder() resp = self.session.post( f"https://{self.host}/user/login", data={ "username": self.username, "password": self.password, "code": yzm, "redirect": "", }, ) if "登录成功" in resp.text: return True try: data = json.loads(re.findall(" =(.*?);", resp.text)[0]) self.log(data.get("msg", "")) finally: return False def parse_user(self, html: str) -> None: tree = etree.HTML(html) try: self.user = { "name": tree.xpath('//div[@class="name"]/text()')[0], "course_number": tree.xpath( '//div[@class="intro"]/div[1]/div[2]/text()' )[0], "huping": tree.xpath('//div[@class="intro"]/div[2]/div[2]/text()')[0], "lexueyuan": tree.xpath('//div[@class="intro"]/div[3]/div[2]/text()')[ 0 ], "taolunzhuti": tree.xpath('//div[@class="intro"]/div[4]/div[2]/text()')[ 0 ], "study_time": "".join( tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()') ), "courses": [ { "name": course.xpath('.//div[@class="name"]/a/text()')[0], "href": course.xpath('.//div[@class="name"]/a/@href')[0], "id": course.xpath('.//div[@class="name"]/a/@href')[0].split( "=" )[-1], "progress": course.xpath( './/div[@class="progress"]/div[3]/text()' )[0], } for course in tree.xpath('//div[@class="user-course"]/div') ], } # print(self.user) except Exception as e: self.log("[parse] 提取用户信息失败: ", e) raise e finally: pass def parse_course(self, html: str) -> tuple[int, int]: tree = etree.HTML(html) video = 0 state = 1 try: if self.courses == []: courses = [ { "title": i.get("title", ""), "href": i.get("href", ""), "id": i.get("href", "").split("=")[-1], "class": i.get("class", ""), } for i in tree.xpath( # '//div[@class="item"]/a[contains(text(), "mp4")]' '//div[@class="item"]/a' # '//div[@class="item"]/a[not(contains(text(), "考试") or contains(text(), "课件") or contains(text(), "练习") or contains(text(), "实验"))]' ) ] index = next( (i for i, c in enumerate(courses) if c["class"] == "on"), None ) if index is not None: self.courses = courses[index:] # print(self.courses) video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0]) state = int(tree.xpath('.//input[@id="study-state"]/@value')[0]) except Exception as e: self.log("[parse] 提取课程列表失败: ", e) # raise e finally: return video, state def parse_q_a_work(self, html: str) -> None: tree = etree.HTML(html) qas = tree.xpath('//div[@class="topic-item"]/form') for qa in qas: items = { "id": qa.xpath( './div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value' )[0], "index": qa.xpath( './div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()' )[0], "type": qa.xpath( './div[@class="courseexamcon-main"]/div/div[@class="type"]/text()' )[0], "title": qa.xpath( './div[@class="courseexamcon-main"]/div[@class="name"]/text()' )[0].strip(), "chooies": [ { "num": a.xpath('.//span[@class="num"]/text()')[0], "txt": a.xpath('.//span[@class="txt"]/text()')[0], } for a in qa.xpath( './div[@class="courseexamcon-main"]/div[@class="list"]//li' ) ], } self.qas.append(items) def parse_q_a_exam(self, html: str) -> None: tree = etree.HTML(html) qas = tree.xpath("//div/form") for qa in qas: items = { "id": qa.xpath( './div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value' )[0], "index": qa.xpath( './div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()' )[0], "type": qa.xpath( './div[@class="courseexamcon-main"]/div/div[@class="type"]/text()' )[0], "title": qa.xpath( './div[@class="courseexamcon-main"]/div[@class="name"]/text()' )[0].strip(), "chooies": [ { "num": a.xpath('.//span[@class="num"]/text()')[0], "txt": a.xpath('.//span[@class="txt"]/text()')[0], } for a in qa.xpath( './div[@class="courseexamcon-main"]/div[@class="list"]//li' ) ], } self.qas.append(items) def get_question_answer_work(self, courseID: str, nodeID: str, workID: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.get( "https://{}/user/work/start".format(self.host), params={"courseId": courseID, "nodeId": nodeID, "workId": workID}, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): qa_path = resp.json().get("url", "") self.start_url = "https://{}{}".format(self.host, qa_path) self.log(f"开始作业: {resp.json().get('msg', '')} url -> {self.start_url}") resp2 = self.session.get(self.start_url) self.qas = [] if resp.status_code == 200: self.parse_q_a_work(resp2.text) else: self.log(f"开始作业: {resp.json().get('msg', '')}") return False return True def get_question_answer_exam(self, courseID: str, nodeID: str, examId: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.get( "https://{}/user/exam/start".format(self.host), params={"courseId": courseID, "nodeId": nodeID, "examId": examId}, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): qa_path = resp.json().get("url", "") self.start_url = "https://{}{}".format(self.host, qa_path) self.log(f"开始考试: {resp.json().get('msg', '')} url -> {self.start_url}") resp2 = self.session.get(self.start_url) self.qas = [] if resp.status_code == 200: self.parse_q_a_exam(resp2.text) else: self.log(f"开始考试: {resp.json().get('msg', '')}") return False return True def work_submit( self, answerID: str, workID: str, answer: str, final: bool, index: int ): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" data = { "answerId": answerID, "workId": workID, "finish": "0", } if "," in answer: answers = answer.split(",") data["answer[]"] = answers # for a in answers: # data.setdefault("answer[]", []).append(a) else: if answer == "": data["answer"] = "A" data["answer"] = answer resp = self.session.post( f"https://{self.host}/user/work/submit", data=data, headers=headers, ) if not final: aa = 1 while aa == 1: print(f"作业链接: {self.start_url}") aa = input("现在是最后一道题,请你去检查后输入 0 后自动提交:") if aa == 0: data["finish"] = "1" resp = self.session.post( f"https://{self.host}/user/work/submit", data=data, headers=headers, ) break if resp.status_code == 200 and resp.json().get("status", False): self.log( f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}" ) def exam_submit( self, answerID: str, workID: str, answer: str, final: bool, index: int ): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" data = { "answerId": answerID, "examId": workID, "finish": "0", } if "," in answer: answers = answer.split(",") data["answer[]"] = answers # for a in answers: # data.setdefault("answer[]", []).append(a) else: if answer == "": data["answer"] = "A" data["answer"] = answer resp = self.session.post( f"https://{self.host}/user/exam/submit", data=data, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): self.log( f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}" ) if not final: aa = 1 while aa == 1: print(f"考试链接: {self.start_url}") aa = int(input("现在是最后一道,请你去检查后输入 0 后自动提交:")) if aa == 0: data["finish"] = "1" resp = self.session.post( f"https://{self.host}/user/exam/submit", data=data, headers=headers, ) break def get_course(self, _id: str) -> tuple[int, int]: resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id)) if resp.status_code == 200 and "错误提示" not in resp.text: return self.parse_course(resp.text) elif "错误提示" in resp.text: msg = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')[0] self.log(msg) return 0, 1 self.log(resp.text) return 0, 1 def add_reply(self, content: str, courseId: str, nodeId: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.post( f"https://{self.host}/user/node_discuss/add_reply", data={ "content": replace_html_entities(content), "images": "", "files": "", "courseId": courseId, "nodeId": nodeId, }, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): self.log( f"添加评论: {resp.json().get('msg', '')} content -> {replace_html_entities(content)}" ) def delete_reply(self, courseId: str, nodeId: str, replyId: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.get( f"https://{self.host}/user/node_discuss/delete_reply", params={"courseId": courseId, "nodeId": nodeId, "replyId": replyId}, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): self.log(f"删除评论: {resp.json().get('msg', '')}") def reply(self, courseId: str, nodeId: str): headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.get( f"https://{self.host}/user/node_discuss/reply", params={ "courseId": courseId, "nodeId": nodeId, "_": str(int(time.time() * 1000)), }, headers=headers, ) if ( resp.status_code == 200 and resp.json().get("status", False) and resp.json().get("pageInfo", {}).get("recordsCount", 0) > 0 ): self.add_reply( resp.json().get("list", [{}])[0].get("content", ""), courseId, nodeId ) # self.delete_reply( # courseId, nodeId, resp.json().get("list", [{}])[0].get("id", "") # ) # return resp.json().get("list", [{}])[0] # return {} def get_course_id(self, _id): resp = self.session.get( "https://{}/user/course?courseId={}".format(self.host, _id) ) tree = etree.HTML(resp.text) self.get_course( "".join( tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href') ).split("=")[-1] ) def get_study_record(self, course_id: str, page=1): resp = self.session.get( f"https://{self.host}/user/study_record.json", params={ "courseId": course_id, "page": page, "_": str(int(time.time() * 1000)), }, ) if resp.status_code == 200 and resp.json().get("status", False): self.log(f"{resp.json().get('msg', '')} page -> {page}") self.courses.extend(resp.json().get("list", [])) if page < resp.json().get("pageInfo", {}).get("pageCount", 0): self.get_study_record(course_id, page + 1) def get_work_record(self, course_id: str, user_id: str, page=1): resp = self.session.get( f"https://{self.host}/user/study_record/work.json", params={ "courseId": course_id, # "userId": user_id, "page": page, "_": str(int(time.time() * 1000)), }, ) if resp.status_code == 200 and resp.json().get("status", False): self.log(f"{resp.json().get('msg', '')} page -> {page}") self.works.extend(resp.json().get("list", [])) if page < resp.json().get("pageInfo", {}).get("pageCount", 0): self.get_study_record(course_id, page + 1) def get_exam_record(self, course_id: str, page=1): resp = self.session.get( f"https://{self.host}/user/study_record/exam.json", params={ "courseId": course_id, "page": page, "_": str(int(time.time() * 1000)), }, ) if resp.status_code == 200 and resp.json().get("status", False): self.log(f"{resp.json().get('msg', '')} page -> {page}") self.exams.extend(resp.json().get("list", [])) if page < resp.json().get("pageInfo", {}).get("pageCount", 0): self.get_exam_record(course_id, page + 1) def log(self, *args, **kwargs) -> None: exit_flag = False time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]") prefix = f"[{self.user.get('name', '')}] {time_str}" # 检查 args 中是否有 \r,若有则将前缀拼在 \r 之后 if args and isinstance(args[0], str) and "\r" in args[0]: # 将前缀插入到最后一个 \r 后面 msg = args[0].rsplit("\r", 1) log_content = ( f"{msg[0]}\r{prefix} {msg[1]}" if len(msg) > 1 else f"{prefix} {args[0]}" ) else: log_content = f"{prefix} {' '.join(map(str, args))}" print(log_content, **kwargs) if exit_flag: os._exit(0) def online(self) -> bool: resp = self.session.post(f"https://{self.host}/user/online") if resp.status_code == 200 and resp.json().get("status", False): return True return False def study_start(self, node_id: str) -> None: headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.post( f"https://{self.host}/user/node/study", data={ "nodeId": node_id, "studyId": 0, "studyTime": 1, "code": self.coder(), }, headers=headers, ) try: if resp.status_code == 200 and resp.json().get("status", False): self.log(f"{resp.json().get('msg', '')} --> 1", end="", flush=True) self.study_id = resp.json().get("studyId", 0) except: if resp.status_code == 200: data = json.loads( re.findall(">var data =(.*?);", resp.text)[0] ) self.log(data.get("msg", "")) self.study_id = data.get("studyId", 0) def study(self, node_id: str, study_time: str) -> None: headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.post( f"https://{self.host}/user/node/study", data={ "nodeId": node_id, "studyId": self.study_id, "studyTime": study_time, }, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): self.log( f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}", end="", flush=True, ) else: self.study(node_id, study_time) def study_over(self, node_id, study_time: str) -> bool: headers = self.session.headers headers["x-requested-with"] = "XMLHttpRequest" resp = self.session.post( f"https://{self.host}/user/node/study", data={ "nodeId": node_id, "studyId": self.study_id, "studyTime": study_time, "close": "1", }, headers=headers, ) if resp.status_code == 200 and resp.json().get("status", False): self.log( f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}", flush=True, ) return True return False def get_user(self) -> None: resp = self.session.get(f"https://{self.host}/user") if resp.status_code == 200: self.parse_user(resp.text) else: self.log(resp.text, exit=True) def start_loop(self): while True: time.sleep(120) self.online() def run(self) -> None: print("当前为刷课模式: ") while self.login(): self.get_user() thread = Thread(target=self.start_loop, daemon=True) thread.start() cs = self.user.get("courses", []) print( f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}" ) if len(cs) > 1: for i, _c in enumerate(cs): print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}") cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] cs_list = cs for c in cs_list: print(f"课程名: {c.get('name', '')}\n\t进度: {c.get('progress', '')}") self.log(f"{c.get('name', '')} --> {c.get('id', '')}") self.get_study_record(c.get("id", "")) for l in self.courses: if "已学" in l.get("state", ""): # 用于将之前已学过的视频进行补评论,请自行解除注释 # if isReply: # self.reply(c.get("id", ""), l.get("id", "")) self.log( f"{l.get('name', '')} --> {l.get('id', '')} --> 已经学习过了" ) continue if isReply: self.reply(c.get("id", ""), l.get("id", "")) node_id = l.get("id", "") total_time = 0 video_time, video_state = self.get_course(node_id) self.log( f"{l.get('name', '')} --> {l.get('id', '')} --> {video_time} --> {video_state}" ) if video_time <= 0: continue self.study_start(node_id) while True: elapsed = video_time - total_time if elapsed <= 0: self.study_over(node_id, f"{video_time}") break time.sleep(30) total_time += 30 self.study(node_id, f"{total_time}") self.log("所有课程已经结束") break def work(self): print("当前为刷作业模式: ") while self.login(): self.get_user() thread = Thread(target=self.start_loop, daemon=True) thread.start() cs = self.user.get("courses", []) print( f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}" ) if len(cs) > 1: for i, _c in enumerate(cs): print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}") cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] cs_list = cs for c in cs_list: self.get_work_record(c.get("id", ""), "") # print(self.works) for work in self.works: self.qas = [] if "已阅" in work.get("state"): self.log( f"{work.get('title', '')} 已阅 {work.get('finalScore', '')}" ) continue if not self.get_question_answer_work( c.get("id", ""), work.get("nodeId", ""), work.get("id", "") ): continue for q in self.qas: # print(q) chooise = self.ai.chat(q) chooise = chooise if chooise else "C" self.work_submit( q.get("id", ""), work.get("id", ""), chooise, int(q.get("index", 1)) < len(self.qas), q.get("index", 1), ) time.sleep(2) # break break def exam(self): while self.login(): self.get_user() thread = Thread(target=self.start_loop, daemon=True) thread.start() cs = self.user.get("courses", []) print( f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}" ) if len(cs) > 1: for i, _c in enumerate(cs): print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}") cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] cs_list = cs for c in cs_list: self.get_exam_record(c.get("id", "")) print(self.exams) for exam in self.exams: self.qas = [] if "线下阅" in exam.get("state"): self.log( f"{exam.get('title', '')} 已阅 {exam.get('finalScore', '')}" ) continue print(exam) if not self.get_question_answer_exam( c.get("id", ""), exam.get("nodeId", ""), exam.get("id", "") ): continue for q in self.qas: # print(q) chooise = self.ai.chat(q) chooise = chooise if chooise else "A" self.exam_submit( q.get("id", ""), exam.get("id", ""), chooise, int(q.get("index", 1)) < len(self.qas), q.get("index", 1), ) time.sleep(2) # break break if __name__ == "__main__": host_server = [ # ["cqcst.yuruixxkj.com", "御瑞科技(选修课)"], # ["cqcst.zjxkeji.com", "重庆城市科技学院(其他1)"], # ["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台(其他2)"], ["cqcst.leykeji.com", "劳动课程测评考试平台"], # ["cqcst.zjxkeji.com", "公益课程平台"], ["cqcst.suwankj.com", "在线课程测评考试平台"], # ["cqcst.yuruixxkj.com", "在线测评考试平台"], ] for i, h in enumerate(host_server): print(f"{i + 1}: {h[0]} {h[1]}") c = int(input("请选择刷课平台: ")) - 1 u = os.environ.get("YURU_ACCOUNT", "") p = os.environ.get("YURU_PASSWORD", "") if u == "": u = input("请输入账号: ") if p == "": p = input("请输入密码: ") # 刷课程评论, 不需要用的时候可以直接注释 # r = input("是否需要评论(不需要[0],需要[1]): ") # if int(r) == 1: # isReply = True # else: # isReply = False # print(bool(isReply)) # if u is None and p is None and c in [0, 1, 2]: # os._exit(0) # 刷视频,不需要用的时候可以直接注释 # CKWK(f"{u}", f"{p}", host_server[c][0]).run() # 刷作业,不需要用的时候可以直接注释 CKWK(f"{u}", f"{p}", host_server[c][0]).work() # 考试,不需要用的时候可以直接注释 # CKWK(f"{u}", f"{p}", host_server[c][0]).exam()