feat(reply): 添加自动评论功能,补评论需要将代码中有一段代码解除注释(运行后,记得注释回去)

This commit is contained in:
2025-11-21 14:14:35 +08:00
parent a7014dcaa0
commit 2957e88770

144
main.py
View File

@@ -3,7 +3,7 @@
文件名称: ckwk.py
作者: zhilv
邮箱: zhilv666@qq.com
版本: 1.0
版本: 1.1
--------------------------------------------
说明:
本脚本仅用于学习与技术研究,禁止将其用于任何非法用途。
@@ -37,6 +37,46 @@ warnings.filterwarnings("ignore")
load_dotenv()
isReply = False
def replace_html_entities(text, replace_mode="keep_char"):
"""
替换网页中的 HTML 实体字符(如 “、” 等)
:param text: 原始网页文本
:param replace_mode: 替换模式:
- "keep_char": 替换为对应的普通字符(默认,如 “ → 「)
- "remove": 直接去除实体字符(如 “ → 空字符串)
:return: 处理后的文本
"""
# 常见 HTML 实体映射表(可根据需求扩展)
html_entities = {
# 引号类(用户示例中的核心场景)
"“": "", # 左双引号
"”": "", # 右双引号
"‘": "", # 左单引号
"’": "", # 右单引号
# 其他常见实体(可选保留/扩展)
"&": "&", # 和号
"&lt;": "<", # 小于号
"&gt;": ">", # 大于号
"&nbsp;": " ", # 非换行空格
"&copy;": "©", # 版权符号
"&reg;": "®", # 注册商标符号
}
# 根据模式调整替换目标
target_map = (
html_entities if replace_mode == "keep_char" else {k: "" for k in html_entities}
)
# 批量替换
result = text
for entity, target in target_map.items():
result = result.replace(entity, target)
return result
class CKWK:
def __init__(self, un: str, pw: str, host: str) -> None:
@@ -96,7 +136,10 @@ class CKWK:
)
if "登录成功" in resp.text:
return True
self.log(resp.text)
try:
data = json.loads(re.findall(" =(.*?);", resp.text)[0])
self.log(data.get("msg", ""))
finally:
return False
def parse_user(self, html: str) -> None:
@@ -183,6 +226,62 @@ class CKWK:
self.log(resp.text)
return 0, 1
def add_reply(self, content: str, courseId: str, nodeId: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.post(
f"https://{self.host}/user/node_discuss/add_reply",
data={
"content": replace_html_entities(content),
"images": "",
"files": "",
"courseId": courseId,
"nodeId": nodeId,
},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(
f'添加评论: {resp.json().get("msg", "")} content -> {replace_html_entities(content)}'
)
def delete_reply(self, courseId: str, nodeId: str, replyId: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
f"https://{self.host}/user/node_discuss/delete_reply",
params={"courseId": courseId, "nodeId": nodeId, "replyId": replyId},
headers=headers,
)
if resp.status_code == 200 and resp.json().get("status", False):
self.log(f'删除评论: {resp.json().get("msg", "")}')
def reply(self, courseId: str, nodeId: str):
headers = self.session.headers
headers["x-requested-with"] = "XMLHttpRequest"
resp = self.session.get(
f"https://{self.host}/user/node_discuss/reply",
params={
"courseId": courseId,
"nodeId": nodeId,
"_": str(int(time.time() * 1000)),
},
headers=headers,
)
if (
resp.status_code == 200
and resp.json().get("status", False)
and resp.json().get("pageInfo", {}).get("recordsCount", 0) > 0
):
self.add_reply(
resp.json().get("list", [{}])[0].get("content", ""), courseId, nodeId
)
# self.delete_reply(
# courseId, nodeId, resp.json().get("list", [{}])[0].get("id", "")
# )
# return resp.json().get("list", [{}])[0]
# return {}
def get_course_id(self, _id):
resp = self.session.get(
"https://{}/user/course?courseId={}".format(self.host, _id)
@@ -320,34 +419,33 @@ class CKWK:
thread = Thread(target=self.start_loop, daemon=True)
thread.start()
cs = self.user.get("courses", [])
print(
f'姓名: {self.user.get("name", "")}\n\t互评: {self.user.get("huping", 0)}\n\t乐学园: {self.user.get("lexueyuan", 0)}\n\t讨论主题: {self.user.get("taolunzhuti", 0)}\n\t学习时长: {self.user.get("study_time", "")}'
)
if len(cs) > 1:
for i, _c in enumerate(cs):
print(f"{i+1}: {_c.get("name", "")} {_c.get("progress", "")}")
print(f'{i+1}: {_c.get("name", "")} {_c.get("progress", "")}')
cs_order = input("请输入刷课顺序(用英文逗号分隔,不需要耍的不用加): ")
cs_list = [cs[int(_c) - 1] for _c in cs_order.split(",")]
print(f"{cs[0].get("name", "")} {cs[0].get("progress", "")}")
cs_list = cs
# for c in self.user.get("courses", []):
for c in cs_list:
# if c.get("progress", "0%").split("%")[0] == 100:
# self.log(f"{c.get("name", "")} --> {c.get("id", "")} 已经完成")
# continue
self.log(f"{c.get("name", "")} --> {c.get("id", "")}")
# self.get_course_id(c.get("id", ""))
print(f'课程名: {c.get("name", "")}\n\t进度: {c.get("progress", "")}')
self.log(f'{c.get("name", "")} --> {c.get("id", "")}')
self.get_study_record(c.get("id", ""))
# print(self.courses)
# os._exit(0)
for l in self.courses:
if "已学" in l.get("state", ""):
# 用于将之前已学过的视频进行补评论,请自行解除注释
# if isReply:
# self.reply(c.get("id", ""), l.get("id", ""))
self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
)
continue
# elif "1" == l.get("lock", "0"):
# self.log(
# f'{l.get("name", "")} --> {l.get("id", "")} --> 未解锁'
# )
# continue
if isReply:
self.reply(c.get("id", ""), l.get("id", ""))
node_id = l.get("id", "")
total_time = 0
@@ -355,11 +453,7 @@ class CKWK:
self.log(
f'{l.get("name", "")} --> {l.get("id", "")} --> {video_time} --> {video_state}'
)
# if video_state == 2:
# self.log(
# f'{l.get("name", "")} --> {l.get("id", "")} --> 已经学习过了'
# )
# continue
if video_time <= 0:
continue
@@ -392,6 +486,12 @@ if __name__ == "__main__":
u = input("请输入账号: ")
if p == "":
p = input("请输入密码: ")
r = input("是否需要评论(不需要[0],需要[1]): ")
if int(r) == 1:
isReply = True
else:
isReply = False
print(bool(isReply))
if u is None and p is None and c in [0, 1, 2]:
os._exit(0)
CKWK(f"{u}", f"{p}", host_server[c][0]).run()