Compare commits

...

14 Commits

8 changed files with 626 additions and 44 deletions

8
.env.example Normal file
View File

@@ -0,0 +1,8 @@
# 刷课网站
YURU_ACCOUNT=xxx
YURU_PASSWORD=xxx
# AI 大模型的 API KEY
DEEPSEEK_API_KEY=sk-3bxxx1
DOUBAO_API_KEY=2xxx
KIMI_API_KEY=sk-SxxxP

4
.gitignore vendored
View File

@@ -1 +1,5 @@
*.html *.html
__pycache__/
*.pyc
*.pyo
*.pyd

View File

@@ -23,6 +23,7 @@
- 自动浏览课程内容,模拟学习行为 - 自动浏览课程内容,模拟学习行为
- 自动获取学习记录,跳过已学课程 - 自动获取学习记录,跳过已学课程
- 定时保持在线状态 - 定时保持在线状态
- 自动进行使用 **豆包** 或者 **Deepseek** 进行答题或者考试
### 使用环境 ### 使用环境
@@ -33,19 +34,24 @@
- `lxml` - `lxml`
- `python-dotenv` - `python-dotenv`
- `lxml` - `lxml`
- `openai`
可以使用 `pip` 安装: 可以使用 `pip` 安装:
```bash ```bash
pip install requests ddddocr lxml python-dotenv lxml pip install requests ddddocr lxml python-dotenv lxml openai
``` ```
### 免输入密码 ### 免输入密码
- 在main.py文件同级别目录下创建`.env`文件 - `.env.example`复制为".env"
- 在文件中添加如下内容 - 在文件中添加如下内容
```sh ```sh
YURU_ACCOUNT=你的用户名 YURU_ACCOUNT=你的用户名
YURU_PASSWORD=你的密码 YURU_PASSWORD=你的密码
DEEPSEEK_API_KEY=你的 deepseek api 密钥
DOUBAO_API_KEY=你的 豆包 api 密钥
KIMI_API_KEY=你的 KIMI api 密钥
``` ```
- 运行`main.py`文件 - 运行`main.py`文件
```sh ```sh

66
deepseek.py Normal file
View File

@@ -0,0 +1,66 @@
# Please install OpenAI SDK first: `pip3 install openai`
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
load_dotenv()
class DeepSeek:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
self.key = os.environ.get("DEEPSEEK_API_KEY")
if not self.key:
raise Exception("找不到 DEEPSEEK_API_KEY 请设置环境变量在运行代码")
self.client = OpenAI(
api_key=self.key,
base_url="https://api.deepseek.com",
)
def chat(self, question_json: dict) -> str:
response = self.client.chat.completions.create(
# model="deepseek-chat",
# model="deepseek-reasoner",
# model="deepseek-v4-pro",
model="deepseek-v4-flash",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": json.dumps(question_json)},
],
stream=False,
)
# print(response.choices[0].message.content)
return (
response.choices[0].message.content
if response.choices[0].message.content
else ""
)
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
DeepSeek().chat(question)

74
doubao.py Normal file
View File

@@ -0,0 +1,74 @@
# Please install OpenAI SDK first: `pip3 install openai`
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from httpx import Client, Proxy
load_dotenv()
class DOUBAO:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
self.key = os.environ.get("DOUBAO_API_KEY")
if not self.key:
raise Exception("找不到 DOUBAO_API_KEY 请设置环境变量在运行代码")
httpx_client = Client(
# proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址
timeout=30.0, # 可选:设置超时时间
# verify=False,
)
self.client = OpenAI(
api_key=self.key,
base_url="https://ark.cn-beijing.volces.com/api/v3",
http_client=httpx_client,
)
def chat(self, question_json: dict) -> str:
response = self.client.chat.completions.create(
# model="deepseek-chat",
model="doubao-seed-code-preview-251028",
# model="deepseek-reasoner",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": json.dumps(question_json)},
],
stream=False,
)
# print(response.choices[0].message.content)
return (
response.choices[0].message.content
if response.choices[0].message.content
else ""
)
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
DOUBAO().chat(question)

73
kimi.py Normal file
View File

@@ -0,0 +1,73 @@
# pip install openai python-dotenv
import os
import json
from openai import OpenAI
from dotenv import load_dotenv
from httpx import Client, Proxy
load_dotenv()
class KIMI:
system_prompt = (
"You are a professional QA answer assistant.\n"
"The user will provide a JSON object containing a question.\n"
"Your job:\n"
"1. Read the 'title' and the 'chooies'.\n"
"2. Determine the correct answer(s).\n"
"3. Output ONLY the 'num' of the correct choices.\n"
"4. If multiple answers exist, join them with English commas, e.g., 'A,C,D'.\n"
"5. If you cannot determine the answer confidently, return an empty string.\n"
"6. Do not output explanations or any extra text."
)
def __init__(self) -> None:
# 默认从环境变量读取,也可以手动填
self.key = os.getenv(
"KIMI_API_KEY", os.getenv("KIMI_API_KEY")
)
if not self.key:
raise RuntimeError("找不到 KIMI_API_KEY请设置环境变量后再运行")
httpx_client = Client(
# proxy=Proxy("socks5://127.0.0.1:9000"), # 传入SOCKS代理地址
timeout=30.0, # 可选:设置超时时间
# verify=False,
)
self.client = OpenAI(
api_key=self.key,
base_url="https://api.moonshot.cn/v1", # 已去掉多余空格
http_client=httpx_client,
)
def chat(self, question_json: dict) -> str:
resp = self.client.chat.completions.create(
model="kimi-k2-turbo-preview",
messages=[
{"role": "system", "content": self.system_prompt},
{
"role": "user",
"content": json.dumps(question_json, ensure_ascii=False),
},
],
temperature=0,
)
return resp.choices[0].message.content or ""
# 本地单条测试
if __name__ == "__main__":
question = {
"id": "25594265",
"index": "1",
"type": "单选",
"title": "下列哪个属于AI视觉识别应用",
"chooies": [
{"num": "A", "txt": "音乐合成"},
{"num": "B", "txt": "植物识别"},
{"num": "C", "txt": "文案创作"},
{"num": "D", "txt": "机器翻译"},
],
}
print(KIMI().chat(question)) # 期望输出B

430
main.py
View File

@@ -3,7 +3,7 @@
文件名称: ckwk.py 文件名称: ckwk.py
作者: zhilv 作者: zhilv
邮箱: zhilv666@qq.com 邮箱: zhilv666@qq.com
版本: 1.1 版本: 1.3
-------------------------------------------- --------------------------------------------
说明: 说明:
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。 本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
@@ -11,23 +11,28 @@
-------------------------------------------- --------------------------------------------
""" """
import time
import os
import requests
import re
import json import json
import os
import random import random
import ddddocr import re
from datetime import datetime import time
from dotenv import load_dotenv
from lxml import etree # type: ignore
from threading import Thread
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import warnings import warnings
from datetime import datetime
from threading import Thread
import ddddocr
# 修复 ANTIALIAS 错误 - 添加猴子补丁 # 修复 ANTIALIAS 错误 - 添加猴子补丁
import PIL.Image import PIL.Image
import requests
from dotenv import load_dotenv
from lxml import etree # type: ignore
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from deepseek import DeepSeek
from doubao import DOUBAO
from kimi import KIMI
if not hasattr(PIL.Image, "ANTIALIAS"): if not hasattr(PIL.Image, "ANTIALIAS"):
PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS # type: ignore PIL.Image.ANTIALIAS = PIL.Image.Resampling.LANCZOS # type: ignore
@@ -79,7 +84,22 @@ def replace_html_entities(text, replace_mode="keep_char"):
class CKWK: class CKWK:
def __init__(self, un: str, pw: str, host: str) -> None: def __init__(self, un: str, pw: str, host: str, ai_type="deepseek") -> None:
"""
un: 用户名
pw: 密码
host: 网站域名
ai_type: ai类别取值为 'doubao' 或者 'deepseek'
"""
if ai_type == "doubao":
self.ai = DOUBAO()
elif ai_type == "deepseek":
self.ai = DeepSeek()
elif ai_type == "kimi":
self.ai = KIMI()
else:
self.ai = DOUBAO()
self.session = requests.Session() self.session = requests.Session()
# 定义重试策略 # 定义重试策略
retry_strategy = Retry( retry_strategy = Retry(
@@ -105,6 +125,9 @@ class CKWK:
self.password = pw self.password = pw
self.user = dict() self.user = dict()
self.courses = [] self.courses = []
self.works = []
self.exams = []
self.qas = []
self.ocr = ddddocr.DdddOcr() self.ocr = ddddocr.DdddOcr()
self.session.headers.update( self.session.headers.update(
{ {
@@ -215,6 +238,197 @@ class CKWK:
finally: finally:
return video, state return video, state
def parse_q_a_work(self, html: str) -> None:
tree = etree.HTML(html)
qas = tree.xpath('//div[@class="topic-item"]/form')
for qa in qas:
items = {
"id": qa.xpath(
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
)[0],
"index": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
)[0],
"type": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
)[0],
"title": qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
)[0].strip(),
"chooies": [
{
"num": a.xpath('.//span[@class="num"]/text()')[0],
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
}
for a in qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
)
],
}
self.qas.append(items)
def parse_q_a_exam(self, html: str) -> None:
tree = etree.HTML(html)
qas = tree.xpath("//div/form")
for qa in qas:
items = {
"id": qa.xpath(
'./div[@class="courseexamcon-submit"]/input[@name="answerId"]/@value'
)[0],
"index": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="num"]/span/text()'
)[0],
"type": qa.xpath(
'./div[@class="courseexamcon-main"]/div/div[@class="type"]/text()'
)[0],
"title": qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="name"]/text()'
)[0].strip(),
"chooies": [
{
"num": a.xpath('.//span[@class="num"]/text()')[0],
"txt": a.xpath('.//span[@class="txt"]/text()')[0],
}
for a in qa.xpath(
'./div[@class="courseexamcon-main"]/div[@class="list"]//li'
)
],
}
self.qas.append(items)
def get_question_answer_work(self, courseID: str, nodeID: str, workID: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
"https://{}/user/work/start".format(self.host),
params={"courseId": courseID, "nodeId": nodeID, "workId": workID},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
qa_path = resp.json().get("url", "")
self.start_url = "https://{}{}".format(self.host, qa_path)
self.log(f"开始作业: {resp.json().get('msg', '')} url -> {self.start_url}")
resp2 = self.session.get(self.start_url)
self.qas = []
if resp.status_code == 200:
self.parse_q_a_work(resp2.text)
else:
self.log(f"开始作业: {resp.json().get('msg', '')}")
return False
return True
def get_question_answer_exam(self, courseID: str, nodeID: str, examId: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
"https://{}/user/exam/start".format(self.host),
params={"courseId": courseID, "nodeId": nodeID, "examId": examId},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
qa_path = resp.json().get("url", "")
self.start_url = "https://{}{}".format(self.host, qa_path)
self.log(f"开始考试: {resp.json().get('msg', '')} url -> {self.start_url}")
resp2 = self.session.get(self.start_url)
self.qas = []
if resp.status_code == 200:
self.parse_q_a_exam(resp2.text)
else:
self.log(f"开始考试: {resp.json().get('msg', '')}")
return False
return True
def work_submit(
self, answerID: str, workID: str, answer: str, final: bool, index: int
):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
data = {
"answerId": answerID,
"workId": workID,
"finish": "0",
}
if "," in answer:
answers = answer.split(",")
data["answer[]"] = answers
# for a in answers:
# data.setdefault("answer[]", []).append(a)
else:
if answer == "":
data["answer"] = "A"
data["answer"] = answer
resp = self.session.post(
f"https://{self.host}/user/work/submit",
data=data,
headers=headers,
)
if not final:
aa = 1
while aa == 1:
print(f"作业链接: {self.start_url}")
aa = input("现在是最后一道题,请你去检查后输入 0 后自动提交:")
aa = int(aa)
if aa == 0:
data["finish"] = "1"
resp = self.session.post(
f"https://{self.host}/user/work/submit",
data=data,
headers=headers,
)
break
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}"
)
def exam_submit(
self, answerID: str, workID: str, answer: str, final: bool, index: int
):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
data = {
"answerId": answerID,
"examId": workID,
"finish": "0",
}
if "," in answer:
answers = answer.split(",")
data["answer[]"] = answers
# for a in answers:
# data.setdefault("answer[]", []).append(a)
else:
if answer == "":
data["answer"] = "A"
data["answer"] = answer
resp = self.session.post(
f"https://{self.host}/user/exam/submit",
data=data,
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f"{index}/{len(self.qas)} 提交答案: {resp.json().get('msg', '')} {answerID} => {answer} {final}"
)
if not final:
aa = 1
while aa == 1:
print(f"考试链接: {self.start_url}")
aa = int(input("现在是最后一道,请你去检查后输入 0 后自动提交:"))
if aa == 0:
data["finish"] = "1"
resp = self.session.post(
f"https://{self.host}/user/exam/submit",
data=data,
headers=headers,
)
break
def get_course(self, _id: str) -> tuple[int, int]: def get_course(self, _id: str) -> tuple[int, int]:
resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id)) resp = self.session.get("https://{}/user/node?nodeId={}".format(self.host, _id))
if resp.status_code == 200 and "错误提示" not in resp.text: if resp.status_code == 200 and "错误提示" not in resp.text:
@@ -242,7 +456,7 @@ class CKWK:
) )
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log( self.log(
f'添加评论: {resp.json().get("msg", "")} content -> {replace_html_entities(content)}' f"添加评论: {resp.json().get('msg', '')} content -> {replace_html_entities(content)}"
) )
def delete_reply(self, courseId: str, nodeId: str, replyId: str): def delete_reply(self, courseId: str, nodeId: str, replyId: str):
@@ -254,7 +468,7 @@ class CKWK:
headers=headers, headers=headers,
) )
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'删除评论: {resp.json().get("msg", "")}') self.log(f"删除评论: {resp.json().get('msg', '')}")
def reply(self, courseId: str, nodeId: str): def reply(self, courseId: str, nodeId: str):
headers = self.session.headers headers = self.session.headers
@@ -304,15 +518,46 @@ class CKWK:
}, },
) )
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} page -> {page}') self.log(f"{resp.json().get('msg', '')} page -> {page}")
self.courses.extend(resp.json().get("list", [])) self.courses.extend(resp.json().get("list", []))
if page <= resp.json().get("pageInfo", {}).get("pageCount", 0): if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1) self.get_study_record(course_id, page + 1)
def get_work_record(self, course_id: str, user_id: str, page=1):
resp = self.session.get(
f"https://{self.host}/user/study_record/work.json",
params={
"courseId": course_id,
# "userId": user_id,
"page": page,
"_": str(int(time.time() * 1000)),
},
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f"{resp.json().get('msg', '')} page -> {page}")
self.works.extend(resp.json().get("list", []))
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_study_record(course_id, page + 1)
def get_exam_record(self, course_id: str, page=1):
resp = self.session.get(
f"https://{self.host}/user/study_record/exam.json",
params={
"courseId": course_id,
"page": page,
"_": str(int(time.time() * 1000)),
},
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f"{resp.json().get('msg', '')} page -> {page}")
self.exams.extend(resp.json().get("list", []))
if page < resp.json().get("pageInfo", {}).get("pageCount", 0):
self.get_exam_record(course_id, page + 1)
def log(self, *args, **kwargs) -> None: def log(self, *args, **kwargs) -> None:
exit_flag = False exit_flag = False
time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]") time_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
prefix = f'[{self.user.get("name", "")}] {time_str}' prefix = f"[{self.user.get('name', '')}] {time_str}"
# 检查 args 中是否有 \r若有则将前缀拼在 \r 之后 # 检查 args 中是否有 \r若有则将前缀拼在 \r 之后
if args and isinstance(args[0], str) and "\r" in args[0]: if args and isinstance(args[0], str) and "\r" in args[0]:
# 将前缀插入到最后一个 \r 后面 # 将前缀插入到最后一个 \r 后面
@@ -323,7 +568,7 @@ class CKWK:
else f"{prefix} {args[0]}" else f"{prefix} {args[0]}"
) )
else: else:
log_content = f'{prefix} {" ".join(map(str, args))}' log_content = f"{prefix} {' '.join(map(str, args))}"
print(log_content, **kwargs) print(log_content, **kwargs)
if exit_flag: if exit_flag:
os._exit(0) os._exit(0)
@@ -349,7 +594,7 @@ class CKWK:
) )
try: try:
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'{resp.json().get("msg", "")} --> 1', end="", flush=True) self.log(f"{resp.json().get('msg', '')} --> 1", end="", flush=True)
self.study_id = resp.json().get("studyId", 0) self.study_id = resp.json().get("studyId", 0)
except: except:
if resp.status_code == 200: if resp.status_code == 200:
@@ -373,7 +618,7 @@ class CKWK:
) )
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log( self.log(
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}', f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}",
end="", end="",
flush=True, flush=True,
) )
@@ -395,7 +640,7 @@ class CKWK:
) )
if resp.status_code == 200 and resp.json().get("status", False): if resp.status_code == 200 and resp.json().get("status", False):
self.log( self.log(
f'\r{" " * 100}\r{resp.json().get("msg", "")} --> {study_time}', f"\r{' ' * 100}\r{resp.json().get('msg', '')} --> {study_time}",
flush=True, flush=True,
) )
return True return True
@@ -414,23 +659,24 @@ class CKWK:
self.online() self.online()
def run(self) -> None: def run(self) -> None:
print("当前为刷课模式: ")
while self.login(): while self.login():
self.get_user() self.get_user()
thread = Thread(target=self.start_loop, daemon=True) thread = Thread(target=self.start_loop, daemon=True)
thread.start() thread.start()
cs = self.user.get("courses", []) cs = self.user.get("courses", [])
print( print(
f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}' f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
) )
if len(cs) > 1: if len(cs) > 1:
for i, _c in enumerate(cs): for i, _c in enumerate(cs):
print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}') print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ") cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")] cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs cs_list = cs
for c in cs_list: for c in cs_list:
print(f'课程名: {c.get("name", "")}\n\t进度: {c.get("progress", "")}') print(f"课程名: {c.get('name', '')}\n\t进度: {c.get('progress', '')}")
self.log(f'{c.get("name", "")} --> {c.get("id", "")}') self.log(f"{c.get('name', '')} --> {c.get('id', '')}")
self.get_study_record(c.get("id", "")) self.get_study_record(c.get("id", ""))
@@ -440,7 +686,7 @@ class CKWK:
# if isReply: # if isReply:
# self.reply(c.get("id", ""), l.get("id", "")) # self.reply(c.get("id", ""), l.get("id", ""))
self.log( self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了' f"{l.get('name', '')} --> {l.get('id', '')} --> 已经学习过了"
) )
continue continue
@@ -451,7 +697,7 @@ class CKWK:
total_time = 0 total_time = 0
video_time, video_state = self.get_course(node_id) video_time, video_state = self.get_course(node_id)
self.log( self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> {video_time} --> {video_state}' f"{l.get('name', '')} --> {l.get('id', '')} --> {video_time} --> {video_state}"
) )
if video_time <= 0: if video_time <= 0:
@@ -470,12 +716,106 @@ class CKWK:
self.log("所有课程已经结束") self.log("所有课程已经结束")
break break
def work(self):
print("当前为刷作业模式: ")
while self.login():
self.get_user()
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
print(
f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
)
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs
for c in cs_list:
self.get_work_record(c.get("id", ""), "")
# print(self.works)
for work in self.works:
self.qas = []
if "已阅" in work.get("state"):
self.log(
f"{work.get('title', '')} 已阅 {work.get('finalScore', '')}"
)
continue
if not self.get_question_answer_work(
c.get("id", ""), work.get("nodeId", ""), work.get("id", "")
):
continue
for q in self.qas:
# print(q)
chooise = self.ai.chat(q)
chooise = chooise if chooise else "C"
self.work_submit(
q.get("id", ""),
work.get("id", ""),
chooise,
int(q.get("index", 1)) < len(self.qas),
q.get("index", 1),
)
time.sleep(2)
# break
break
def exam(self):
while self.login():
self.get_user()
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
print(
f"姓名: {self.user.get('name', '')}\n\t互评: {self.user.get('huping', 0)}\n\t乐学园: {self.user.get('lexueyuan', 0)}\n\t讨论主题: {self.user.get('taolunzhuti', 0)}\n\t学习时长: {self.user.get('study_time', '')}"
)
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f"{i + 1}: {_c.get('name', '')} {_c.get('progress', '')}")
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
cs_list = cs
for c in cs_list:
self.get_exam_record(c.get("id", ""))
print(self.exams)
for exam in self.exams:
self.qas = []
if "线下阅" in exam.get("state"):
self.log(
f"{exam.get('title', '')} 已阅 {exam.get('finalScore', '')}"
)
continue
print(exam)
if not self.get_question_answer_exam(
c.get("id", ""), exam.get("nodeId", ""), exam.get("id", "")
):
continue
for q in self.qas:
# print(q)
chooise = self.ai.chat(q)
chooise = chooise if chooise else "A"
self.exam_submit(
q.get("id", ""),
exam.get("id", ""),
chooise,
int(q.get("index", 1)) < len(self.qas),
q.get("index", 1),
)
time.sleep(2)
# break
break
if __name__ == "__main__": if __name__ == "__main__":
host_server = [ host_server = [
["cqcst.yuruixxkj.com", "御瑞科技(选修课)"], # ["cqcst.yuruixxkj.com", "御瑞科技(选修课)"],
["cqcst.zjxkeji.com", "重庆城市科技学院其他1"], # ["cqcst.zjxkeji.com", "重庆城市科技学院其他1"],
["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台其他2"], # ["cqcst.yuncanjykeji.com", "重庆城市科技学院实训平台其他2"],
["cqcst.leykeji.com", "劳动课程测评考试平台"],
# ["cqcst.zjxkeji.com", "公益课程平台"],
["cqcst.suwankj.com", "在线课程测评考试平台"],
# ["cqcst.yuruixxkj.com", "在线测评考试平台"],
] ]
for i, h in enumerate(host_server): for i, h in enumerate(host_server):
print(f"{i + 1}: {h[0]} {h[1]}") print(f"{i + 1}: {h[0]} {h[1]}")
@@ -486,12 +826,22 @@ if __name__ == "__main__":
u = input("请输入账号: ") u = input("请输入账号: ")
if p == "": if p == "":
p = input("请输入密码: ") p = input("请输入密码: ")
r = input("是否需要评论(不需要[0],需要[1]): ")
if int(r) == 1: # 刷课程评论, 不需要用的时候可以直接注释
isReply = True # r = input("是否需要评论(不需要[0],需要[1]): ")
else: # if int(r) == 1:
isReply = False # isReply = True
print(bool(isReply)) # else:
if u is None and p is None and c in [0, 1, 2]: # isReply = False
os._exit(0) # print(bool(isReply))
CKWK(f"{u}", f"{p}", host_server[c][0]).run() # if u is None and p is None and c in [0, 1, 2]:
# os._exit(0)
# 刷视频,不需要用的时候可以直接注释
# CKWK(f"{u}", f"{p}", host_server[c][0]).run()
# 刷作业,不需要用的时候可以直接注释
# CKWK(f"{u}", f"{p}", host_server[c][0]).work()
# 考试,不需要用的时候可以直接注释
CKWK(f"{u}", f"{p}", host_server[c][0]).exam()

View File

@@ -8,5 +8,6 @@ dependencies = [
"ddddocr>=1.5.6", "ddddocr>=1.5.6",
"dotenv>=0.9.9", "dotenv>=0.9.9",
"lxml>=6.0.2", "lxml>=6.0.2",
"openai>=2.8.1",
"requests>=2.32.5", "requests>=2.32.5",
] ]