commit f45c1f4a1c54849e14222b162a42e47528c93c30 Author: zhilv Date: Wed Nov 5 13:00:58 2025 +0800 feat(*): 初始化代码仓库 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2d19fc7 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.html diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..88d6dae --- /dev/null +++ b/LICENSE @@ -0,0 +1,18 @@ +MIT License + +Copyright (c) 2025 zhilv + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +associated documentation files (the "Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the +following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial +portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT +LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO +EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..bd5a4cb --- /dev/null +++ b/README.md @@ -0,0 +1,36 @@ +# 重庆城市科技学院刷课 + +## CKWK 自动刷课脚本 + +### 介绍 + +`CKWK` 是一个 Python 脚本,旨在帮助用户自动化刷课过程,支持多个学习平台。该脚本通过模拟登录、获取课程信息、解析学习记录以及自动播放视频等方式来实现自动学习。支持验证码识别,适用于一些需要登录并观看课程视频的学习平台。 + +#### 警告: +- 本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。 +- 作者不对因使用本脚本造成的任何损失或后果承担责任。 + +### 功能 + +- 自动登录学习平台 +- 识别验证码进行登录 +- 获取用户的课程信息 +- 自动浏览课程内容,模拟学习行为 +- 自动获取学习记录,跳过已学课程 +- 定时保持在线状态 + +### 使用环境 + +- Python 3.x +- 安装以下依赖库: + - `requests` + - `ddddocr` + - `lxml` + - `python-dotenv` + - `lxml` + +可以使用 `pip` 安装: + +```bash +pip install requests ddddocr lxml python-dotenv lxml +``` \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..8d2b06b --- /dev/null +++ b/main.py @@ -0,0 +1,393 @@ +""" +-------------------------------------------- +文件名称: ckwk.py +作者: zhilv +邮箱: zhilv666@qq.com +版本: 1.0 +-------------------------------------------- +说明: +本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。 +作者不对因使用本脚本造成的任何损失或后果承担责任。 +-------------------------------------------- +""" + +import time +import os +import requests +import re +import json +import random +import ddddocr +from datetime import datetime +from dotenv import load_dotenv +from lxml import etree +from threading import Thread +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry +import warnings + +warnings.filterwarnings("ignore") + + +load_dotenv() + + +class CKWK: + def __init__(self, un: str, pw: str, host: str) -> None: + self.session = requests.Session() + # 定义重试策略 + retry_strategy = Retry( + total=5, # 最多重试 5 次 + backoff_factor=1, # 重试间隔:1s, 2s, 4s, 8s... + status_forcelist=[429, 500, 502, 503, 504], # 哪些状态码触发重试 + allowed_methods=["HEAD", "GET", "OPTIONS", "POST"], # 允许重试的方法 + ) + + # 挂载适配器 + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + # self.session.verify = False + # self.session.proxies.update( + # { + # "http": "http://127.0.0.1:9000", + # "https": "http://127.0.0.1:9000", + # } + # ) + self.username = un + self.host = host + self.password = pw + self.user = dict() + self.courses = [] + self.ocr = ddddocr.DdddOcr() + self.session.headers.update( + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0", + "Accept": "application/json, text/javascript, */*; q=0.01", + "Accept-Encoding": "gzip, deflate, br, zstd", + } + ) + + def coder(self) -> str: + resp = self.session.get( + f"https://{self.host}/service/code", + params={"r": f"{random.random()}"}, + ) + if resp.status_code == 200: + return f"{self.ocr.classification(resp.content)}" + return "" + + def login(self) -> bool: + yzm = self.coder() + resp = self.session.post( + f"https://{self.host}/user/login", + data={ + "username": self.username, + "password": self.password, + "code": yzm, + "redirect": "", + }, + ) + if "登录成功" in resp.text: + return True + self.log(resp.text) + return False + + def parse_user(self, html: str) -> None: + tree = etree.HTML(html) + try: + self.user = { + "name": tree.xpath('//div[@class="name"]/text()')[0], + "course_number": tree.xpath( + '//div[@class="intro"]/div[1]/div[2]/text()' + )[0], + "huping": tree.xpath('//div[@class="intro"]/div[2]/div[2]/text()')[0], + "lexueyuan": tree.xpath('//div[@class="intro"]/div[3]/div[2]/text()')[ + 0 + ], + "taolunzhuti": tree.xpath('//div[@class="intro"]/div[4]/div[2]/text()')[ + 0 + ], + "study_time": "".join( + tree.xpath('//div[@class="intro"]/div[5]/div[2]//text()') + ), + "courses": [ + { + "name": course.xpath('.//div[@class="name"]/a/text()')[0], + "href": course.xpath('.//div[@class="name"]/a/@href')[0], + "id": course.xpath('.//div[@class="name"]/a/@href')[0].split( + "=" + )[-1], + "progress": course.xpath( + './/div[@class="progress"]/div[3]/text()' + )[0], + } + for course in tree.xpath('//div[@class="user-course"]/div') + ], + } + # print(self.user) + except Exception as e: + self.log("[parse] 提取用户信息失败: ", e) + raise e + finally: + pass + + def parse_course(self, html: str) -> tuple[int, int]: + tree = etree.HTML(html) + video = 0 + state = 1 + try: + if self.courses == []: + courses = [ + { + "title": i.get("title", ""), + "href": i.get("href", ""), + "id": i.get("href", "").split("=")[-1], + "class": i.get("class", ""), + } + for i in tree.xpath( + # '//div[@class="item"]/a[contains(text(), "mp4")]' + '//div[@class="item"]/a' + # '//div[@class="item"]/a[not(contains(text(), "考试") or contains(text(), "课件") or contains(text(), "练习") or contains(text(), "实验"))]' + ) + ] + index = next( + (i for i, c in enumerate(courses) if c["class"] == "on"), None + ) + + if index is not None: + self.courses = courses[index:] + # print(self.courses) + video = int(tree.xpath('.//input[@id="video-duration"]/@value')[0]) + state = int(tree.xpath('.//input[@id="study-state"]/@value')[0]) + except Exception as e: + self.log("[parse] 提取课程列表失败: ", e) + # raise e + finally: + return video, state + + def get_course(self, _id: str) -> tuple[int, int]: + resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id)) + if resp.status_code == 200 and "错误提示" not in resp.text: + return self.parse_course(resp.text) + elif "错误提示" in resp.text: + msg = etree.HTML(resp.text).xpath('//div[@class="name"]/text()')[0] + self.log(msg) + return 0, 1 + self.log(resp.text) + return 0, 1 + + def get_course_id(self, _id): + resp = self.session.get( + "https://{}/user/course?courseId={}".format(self.host, _id) + ) + tree = etree.HTML(resp.text) + + self.get_course( + "".join( + tree.xpath('//div[@class="ncoursecon-intro"]/div/div[1]/a/@href') + ).split("=")[-1] + ) + + def get_study_record(self, course_id: str, page=1): + resp = self.session.get( + f"https://{self.host}/user/study_record.json", + params={ + "courseId": course_id, + "page": page, + "_": str(int(time.time() * 1000)), + }, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log(f'{resp.json().get("msg", "")} page -> {page}') + self.courses.extend(resp.json().get("list", [])) + if page <= resp.json().get("pageInfo", {}).get("pageCount", 0): + self.get_study_record(course_id, page + 1) + + def log(self, *args, **kwargs) -> None: + exit_flag = False + time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]") + prefix = f'[{self.user.get("name", "")}] {time_str}' + # 检查 args 中是否有 \r,若有则将前缀拼在 \r 之后 + if args and isinstance(args[0], str) and "\r" in args[0]: + # 将前缀插入到最后一个 \r 后面 + msg = args[0].rsplit("\r", 1) + log_content = ( + f"{msg[0]}\r{prefix} {msg[1]}" + if len(msg) > 1 + else f"{prefix} {args[0]}" + ) + else: + log_content = f'{prefix} {" ".join(map(str, args))}' + print(log_content, **kwargs) + if exit_flag: + os._exit(0) + + def online(self) -> bool: + resp = self.session.post(f"https://{self.host}/user/online") + if resp.status_code == 200 and resp.json().get("status", False): + return True + return False + + def study_start(self, node_id: str) -> None: + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + resp = self.session.post( + f"https://{self.host}/user/node/study", + data={ + "nodeId": node_id, + "studyId": 0, + "studyTime": 1, + "code": self.coder(), + }, + headers=headers, + ) + try: + if resp.status_code == 200 and resp.json().get("status", False): + self.log(f'{resp.json().get("msg", "")} --> 1', end="", flush=True) + self.study_id = resp.json().get("studyId", 0) + except: + if resp.status_code == 200: + data = json.loads( + re.findall(">var data =(.*?);", resp.text)[0] + ) + self.log(data.get("msg", "")) + self.study_id = data.get("studyId", 0) + + def study(self, node_id: str, study_time: str) -> None: + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + resp = self.session.post( + f"https://{self.host}/user/node/study", + data={ + "nodeId": node_id, + "studyId": self.study_id, + "studyTime": study_time, + }, + headers=headers, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log( + f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}', + end="", + flush=True, + ) + else: + self.study(node_id, study_time) + + def study_over(self, node_id, study_time: str) -> bool: + headers = self.session.headers + headers["x-requested-with"] = "XMLHttpRequest" + resp = self.session.post( + f"https://{self.host}/user/node/study", + data={ + "nodeId": node_id, + "studyId": self.study_id, + "studyTime": study_time, + "close": "1", + }, + headers=headers, + ) + if resp.status_code == 200 and resp.json().get("status", False): + self.log( + f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}', + flush=True, + ) + return True + return False + + def get_user(self) -> None: + resp = self.session.get(f"https://{self.host}/user") + if resp.status_code == 200: + self.parse_user(resp.text) + else: + self.log(resp.text, exit=True) + + def start_loop(self): + while True: + time.sleep(120) + self.online() + + def run(self) -> None: + while self.login(): + self.get_user() + thread = Thread(target=self.start_loop, daemon=True) + thread.start() + cs = self.user.get("courses", []) + if len(cs) > 1: + for i, _c in enumerate(cs): + print(f"{i+1}: {_c.get("name", "")} {_c.get("progress", "")}") + cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") + cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] + + cs_list = cs + # for c in self.user.get("courses", []): + for c in cs_list: + # if c.get("progress", "0%").split("%")[0] == 100: + # self.log(f"{c.get("name", "")} --> {c.get("id", "")} 已经完成") + # continue + self.log(f"{c.get("name", "")} --> {c.get("id", "")}") + # self.get_course_id(c.get("id", "")) + self.get_study_record(c.get("id", "")) + # print(self.courses) + # os._exit(0) + for l in self.courses: + if "已学" in l.get("state", "") : + self.log( + f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了' + ) + continue + # elif "1" == l.get("lock", "0"): + # self.log( + # f'{l.get("name", "")} --> {l.get("id", "")} --> 未解锁' + # ) + # continue + + node_id = l.get("id", "") + total_time = 0 + video_time, video_state = self.get_course(node_id) + self.log( + f'{l.get("name", "")} --> {l.get("id", "")} --> {video_time} --> {video_state}' + ) + # if video_state == 2: + # self.log( + # f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了' + # ) + # continue + if video_time <= 0: + continue + + self.study_start(node_id) + + while True: + elapsed = video_time - total_time + if elapsed <= 0: + self.study_over(node_id, f"{video_time}") + break + time.sleep(30) + total_time += 30 + self.study(node_id, f"{total_time}") + self.log("所有课程已经结束") + break + + +if __name__ == "__main__": + host_server = [ + ["cqcst.yuruixxkj.com", "御瑞科技(选修课)"], + ["cqcst.zjxkeji.com", "重庆城市科技学院(其他1)"], + ["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台(其他2)"], + ] + for i, h in enumerate(host_server): + print(f"{i+1}: {h[0]} {h[1]}") + c = int(input("请选择刷课平台: ")) - 1 + u = os.environ.get("YURU_ACCOUNT", "") + p = os.environ.get( + "YURU_PASSWORD", + ) + if u == "": + u = input("请输入账号: ") + if p == "": + p = input("请输入密码: ") + if u is None and p is None and c in [0, 1, 2]: + os._exit(0) + CKWK(f"{u}", f"{p}", host_server[c][0]).run()