"""
--------------------------------------------
File:    ckwk.py
Author:  zhilv
Email:   zhilv666@qq.com
Version: 1.0
--------------------------------------------
Notice:
    This script is for learning and technical research only; using it for
    any illegal purpose is prohibited.
    The author accepts no responsibility for any loss or consequence caused
    by the use of this script.
--------------------------------------------
"""

import json
import os
import random
import re
import time
import warnings
from datetime import datetime
from threading import Thread

import ddddocr
import requests
from dotenv import load_dotenv
from lxml import etree
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

warnings.filterwarnings("ignore")
load_dotenv()


class CKWK:
    """HTTP client that logs into one host and reports study progress.

    One instance wraps a single ``requests.Session`` for a single account.
    ``run()`` is the entry point: it logs in, reads the user's course list,
    loads the lesson ("node") list of each chosen course and reports study
    time for every lesson that is not already marked as studied.
    """

    # Seconds between two progress reports while a lesson is being "watched".
    HEARTBEAT_SECONDS = 30
    # Seconds between two keep-alive posts made by the background thread.
    KEEPALIVE_SECONDS = 120

    def __init__(self, un: str, pw: str, host: str) -> None:
        """Create the session, retry policy, OCR engine and default headers.

        Args:
            un: Account name sent as ``username`` on login.
            pw: Password sent as ``password`` on login.
            host: Host name (no scheme) that every request is sent to.
        """
        self.session = requests.Session()
        # Retry policy: up to 5 retries with back-off on the listed status
        # codes, for the listed HTTP methods (POST included).
        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
        )
        # Mount the retrying adapter for both schemes.
        adapter = HTTPAdapter(max_retries=retry_strategy)
        self.session.mount("http://", adapter)
        self.session.mount("https://", adapter)
        self.username = un
        self.host = host
        self.password = pw
        self.user = dict()
        self.courses = []
        # Id of the study record currently open; set by study_start().
        # Initialised here so study()/study_over() never hit a missing
        # attribute when a start request failed.
        self.study_id = 0
        self.ocr = ddddocr.DdddOcr()
        self.session.headers.update(
            {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
                "Accept": "application/json, text/javascript, */*; q=0.01",
                "Accept-Encoding": "gzip, deflate, br, zstd",
            }
        )

    @staticmethod
    def _json(resp) -> dict:
        """Return the response body as a dict, or ``{}`` if it is not one.

        ``Response.json()`` raises a ``ValueError`` subclass on a non-JSON
        body; callers here only ever use ``.get`` on the result, so an empty
        dict is the natural "no usable payload" value.
        """
        try:
            payload = resp.json()
        except ValueError:
            return {}
        return payload if isinstance(payload, dict) else {}

    def _post_study(self, data: dict):
        """POST ``data`` to the study endpoint and return the response.

        The original code set ``x-requested-with`` on an alias of
        ``session.headers``, so the header stayed on the session for every
        later request.  That effect is kept (later requests may rely on it)
        but is now done explicitly in this one place.
        """
        self.session.headers["x-requested-with"] = "XMLHttpRequest"
        return self.session.post(
            f"https://{self.host}/user/node/study",
            data=data,
        )

    def coder(self) -> str:
        """Fetch a captcha image and return the text recognised by ddddocr.

        Returns:
            The recognised text, or ``""`` when the image request did not
            return HTTP 200.
        """
        resp = self.session.get(
            f"https://{self.host}/service/code",
            params={"r": f"{random.random()}"},
        )
        if resp.status_code == 200:
            return f"{self.ocr.classification(resp.content)}"
        return ""

    def login(self) -> bool:
        """Submit the credentials together with a freshly solved captcha.

        Returns:
            True when the response text contains the success marker;
            otherwise the response text is logged and False is returned.
        """
        yzm = self.coder()
        resp = self.session.post(
            f"https://{self.host}/user/login",
            data={
                "username": self.username,
                "password": self.password,
                "code": yzm,
                "redirect": "",
            },
        )
        if "登录成功" in resp.text:
            return True
        self.log(resp.text)
        return False

    def parse_user(self, html: str) -> None:
        """Extract the user summary and course list from the user page.

        The result is stored in ``self.user``.  Each entry of
        ``self.user["courses"]`` has the keys ``name``, ``href``, ``id``
        (the text after the last ``=`` of ``href``) and ``progress``.

        Raises:
            Exception: Any extraction error is logged and re-raised.
        """
        tree = etree.HTML(html)

        def intro(position: int) -> str:
            # Second cell of the ``position``-th row of the "intro" block.
            return tree.xpath(
                f'//div[@class="intro"]/div[{position}]/div[2]/text()'
            )[0]

        try:
            name = tree.xpath('//div[@class="name"]/text()')[0]
            courses = []
            for course in tree.xpath('//div[@class="user-course"]/div'):
                href = course.xpath('.//div[@class="name"]/a/@href')[0]
                courses.append(
                    {
                        "name": course.xpath('.//div[@class="name"]/a/text()')[0],
                        "href": href,
                        "id": href.split("=")[-1],
                        "progress": course.xpath(
                            './/div[@class="progress"]/div[3]/text()'
                        )[0],
                    }
                )
            self.user = {
                "name": name,
                "course_number": intro(1),
                "huping": intro(2),
                "lexueyuan": intro(3),
                "taolunzhuti": intro(4),
                "study_time": "".join(
                    tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()')
                ),
                "courses": courses,
            }
        except Exception as e:
            self.log("[parse] 提取用户信息失败: ", e)
            raise

    def parse_course(self, html: str) -> tuple[int, int]:
        """Read the video duration and study state from a lesson page.

        When ``self.courses`` is still empty, it is also filled with the
        lesson links found on the page, starting at the link whose class is
        ``"on"``.

        Returns:
            ``(video, state)`` taken from the ``video-duration`` and
            ``study-state`` inputs.  Values that could not be read keep
            their defaults (``0`` and ``1``); the error is logged, not
            raised.
        """
        tree = etree.HTML(html)
        video = 0
        state = 1
        try:
            if not self.courses:
                courses = [
                    {
                        "title": i.get("title", ""),
                        "href": i.get("href", ""),
                        "id": i.get("href", "").split("=")[-1],
                        "class": i.get("class", ""),
                    }
                    for i in tree.xpath('//div[@class="item"]/a')
                ]
                index = next(
                    (i for i, c in enumerate(courses) if c["class"] == "on"), None
                )
                if index is not None:
                    self.courses = courses[index:]
            video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0])
            state = int(tree.xpath('.//input[@id="study-state"]/@value')[0])
        except Exception as e:
            # Best effort: keep whatever was parsed so far.  (The original
            # returned from ``finally``, which also swallowed
            # KeyboardInterrupt/SystemExit.)
            self.log("[parse] 提取课程列表失败: ", e)
        return video, state

    def get_course(self, _id: str) -> tuple[int, int]:
        """Fetch a lesson page and return its ``(video, state)`` pair.

        Returns:
            The result of ``parse_course`` on success, or ``(0, 1)`` when
            the page is an error page or the status is not 200 (the reason
            is logged).
        """
        resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
        if "错误提示" in resp.text:
            names = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')
            # Fall back to the raw body if the error page has no message node.
            self.log(names[0] if names else resp.text)
            return 0, 1
        if resp.status_code == 200:
            return self.parse_course(resp.text)
        self.log(resp.text)
        return 0, 1

    def get_course_id(self, _id) -> None:
        """Open a course page and load the lesson its intro block links to."""
        resp = self.session.get(
            "https://{}/user/course?courseId={}".format(self.host, _id)
        )
        tree = etree.HTML(resp.text)
        self.get_course(
            "".join(
                tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href')
            ).split("=")[-1]
        )

    def get_study_record(self, course_id: str, page=1):
        """Append every study-record entry of a course to ``self.courses``.

        Pages are requested one after another starting at ``page`` until the
        ``pageCount`` reported by the server is reached, or a request fails
        or reports a falsy ``status``.
        """
        while True:
            resp = self.session.get(
                f"https://{self.host}/user/study_record.json",
                params={
                    "courseId": course_id,
                    "page": page,
                    "_": str(int(time.time() * 1000)),
                },
            )
            payload = self._json(resp) if resp.status_code == 200 else {}
            if not payload.get("status", False):
                return
            self.log(f'{payload.get("msg", "")} page -> {page}')
            self.courses.extend(payload.get("list", []) or [])
            page_info = payload.get("pageInfo", {}) or {}
            # NOTE(review): assumes pageCount is the total number of pages;
            # the original compared with "<=" and therefore requested one
            # page past the end.
            if page >= page_info.get("pageCount", 0):
                return
            page += 1

    def log(self, *args, **kwargs) -> None:
        """Print a message prefixed with the user name and a timestamp.

        If the first argument is a string containing ``\\r``, the prefix is
        inserted after its last ``\\r`` (so a line-rewrite still starts at
        column 0) and only that first argument is printed.

        Args:
            *args: Message parts, joined with spaces.
            **kwargs: Forwarded to ``print``.  The extra keyword ``exit``
                is consumed here: when truthy the process is terminated
                with ``os._exit(0)`` after printing.
        """
        # ``exit`` must not reach print(), which rejects unknown keywords.
        exit_flag = kwargs.pop("exit", False)
        time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
        prefix = f'[{self.user.get("name", "")}] {time_str}'
        if args and isinstance(args[0], str) and "\r" in args[0]:
            head, tail = args[0].rsplit("\r", 1)
            log_content = f"{head}\r{prefix} {tail}"
        else:
            log_content = f'{prefix} {" ".join(map(str, args))}'
        if exit_flag:
            # os._exit skips normal interpreter cleanup, so flush explicitly.
            kwargs["flush"] = True
        print(log_content, **kwargs)
        if exit_flag:
            os._exit(0)

    def online(self) -> bool:
        """Post the keep-alive request; True when the server reports success."""
        resp = self.session.post(f"https://{self.host}/user/online")
        return resp.status_code == 200 and bool(
            self._json(resp).get("status", False)
        )

    def study_start(self, node_id: str) -> bool:
        """Open a study record for a lesson and remember its id.

        On success ``self.study_id`` holds the id returned by the server.

        Returns:
            True when a study record was opened (or the fallback payload
            embedded in an HTML response could be read), False otherwise.
        """
        # Never carry the id of a previous lesson into this one.
        self.study_id = 0
        resp = self._post_study(
            {
                "nodeId": node_id,
                "studyId": 0,
                "studyTime": 1,
                "code": self.coder(),
            }
        )
        if resp.status_code != 200:
            return False
        try:
            payload = resp.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            if not payload.get("status", False):
                self.log(payload.get("msg", ""))
                return False
            # No newline: the next progress line overwrites this one.
            self.log(f'{payload.get("msg", "")} --> 1', end="", flush=True)
            self.study_id = payload.get("studyId", 0)
            return True
        # Not JSON: the page may embed the payload as ``var data = {...};``.
        found = re.findall(">var data =(.*?);", resp.text)
        try:
            data = json.loads(found[0]) if found else None
        except ValueError:
            data = None
        if not isinstance(data, dict):
            self.log(resp.text)
            return False
        self.log(data.get("msg", ""))
        self.study_id = data.get("studyId", 0)
        # NOTE(review): the embedded payload's own status is not checked,
        # matching the original behaviour of carrying on after this branch.
        return True

    def study(self, node_id: str, study_time: str, *, max_retries: int = 5) -> bool:
        """Report the accumulated study time of the open study record.

        The original retried through unbounded recursion; retries are now
        bounded so a persistent failure cannot end in ``RecursionError``.

        Args:
            node_id: Lesson id.
            study_time: Accumulated seconds to report.
            max_retries: Extra attempts made after the first failure.

        Returns:
            True when the server acknowledged the report, else False.
        """
        for attempt in range(max_retries + 1):
            resp = self._post_study(
                {
                    "nodeId": node_id,
                    "studyId": self.study_id,
                    "studyTime": study_time,
                }
            )
            payload = self._json(resp) if resp.status_code == 200 else {}
            if payload.get("status", False):
                self.log(
                    f'\r{" " * 100}\r{payload.get("msg", "")} --> {study_time}',
                    end="",
                    flush=True,
                )
                return True
            if attempt < max_retries:
                time.sleep(1)
        self.log(f"[study] 上报学习进度失败: {node_id} --> {study_time}")
        return False

    def study_over(self, node_id, study_time: str) -> bool:
        """Send the final report (``close=1``) for the open study record.

        Returns:
            True when the server acknowledged the report, else False.
        """
        resp = self._post_study(
            {
                "nodeId": node_id,
                "studyId": self.study_id,
                "studyTime": study_time,
                "close": "1",
            }
        )
        payload = self._json(resp) if resp.status_code == 200 else {}
        if payload.get("status", False):
            self.log(
                f'\r{" " * 100}\r{payload.get("msg", "")} --> {study_time}',
                flush=True,
            )
            return True
        return False

    def get_user(self) -> None:
        """Load the user page into ``self.user``; log and exit on non-200."""
        resp = self.session.get(f"https://{self.host}/user")
        if resp.status_code == 200:
            self.parse_user(resp.text)
        else:
            self.log(resp.text, exit=True)

    def start_loop(self):
        """Post a keep-alive forever; meant to run in a daemon thread."""
        while True:
            time.sleep(self.KEEPALIVE_SECONDS)
            try:
                self.online()
            except (requests.RequestException, ValueError) as e:
                # One failed ping must not kill the keep-alive thread.
                self.log("[online] 保持在线失败: ", e)

    def _select_courses(self, cs: list) -> list:
        """Ask which courses to process, in which order.

        With zero or one course there is nothing to choose and ``cs`` is
        returned unchanged.  Invalid entries are logged and skipped; an
        empty selection falls back to all courses.
        """
        if len(cs) <= 1:
            return cs
        for i, _c in enumerate(cs):
            print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}')
        cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
        chosen = []
        for token in cs_order.split(","):
            token = token.strip()
            if not token:
                continue
            try:
                number = int(token)
            except ValueError:
                number = 0
            if 1 <= number <= len(cs):
                chosen.append(cs[number - 1])
            else:
                self.log(f"忽略无效的课程序号: {token}")
        return chosen or cs

    def _study_node(self, node: dict) -> None:
        """Run the full start / heartbeat / close cycle for one lesson."""
        if "已学" in node.get("state", ""):
            self.log(
                f'{node.get("name", "")} --> {node.get("id", "")} --> 已经学习过了'
            )
            return
        node_id = node.get("id", "")
        video_time, video_state = self.get_course(node_id)
        self.log(
            f'{node.get("name", "")} --> {node.get("id", "")} --> {video_time} --> {video_state}'
        )
        if video_time <= 0:
            return
        if not self.study_start(node_id):
            self.log(
                f'{node.get("name", "")} --> {node.get("id", "")} --> 开始学习失败, 跳过'
            )
            return
        total_time = 0
        while video_time - total_time > 0:
            time.sleep(self.HEARTBEAT_SECONDS)
            total_time += self.HEARTBEAT_SECONDS
            self.study(node_id, f"{total_time}")
        self.study_over(node_id, f"{video_time}")

    def run(self) -> None:
        """Log in and work through the lessons of the selected courses.

        Does nothing further when the login fails (``login`` has already
        logged the server response).
        """
        if not self.login():
            return
        self.get_user()
        Thread(target=self.start_loop, daemon=True).start()
        # The original overwrote the user's selection with the full list.
        cs_list = self._select_courses(self.user.get("courses", []))
        for c in cs_list:
            self.log(f'{c.get("name", "")} --> {c.get("id", "")}')
            # Start each course with a fresh lesson list; otherwise lessons
            # of earlier courses are processed again with stale state.
            self.courses = []
            self.get_study_record(c.get("id", ""))
            for node in self.courses:
                self._study_node(node)
        self.log("所有课程已经结束")


def main() -> None:
    """Prompt for platform and credentials, then run the client."""
    host_server = [
        ["cqcst.yuruixxkj.com", "御瑞科技(选修课)"],
        ["cqcst.zjxkeji.com", "重庆城市科技学院(其他1)"],
        ["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台(其他2)"],
    ]
    for i, h in enumerate(host_server):
        print(f"{i+1}: {h[0]} {h[1]}")
    try:
        c = int(input("请选择刷课平台: ")) - 1
    except ValueError:
        c = -1
    # The original check could never fire (u/p are never None) and an input
    # of 0 silently selected the last host through negative indexing.
    if not 0 <= c < len(host_server):
        raise SystemExit("无效的平台编号")
    u = os.environ.get("YURU_ACCOUNT", "") or input("请输入账号: ")
    p = os.environ.get("YURU_PASSWORD", "") or input("请输入密码: ")
    if not u or not p:
        raise SystemExit("账号或密码不能为空")
    CKWK(f"{u}", f"{p}", host_server[c][0]).run()


if __name__ == "__main__":
    main()