feat: add debug log stream support

This commit is contained in:
2026-04-02 23:27:46 +08:00
parent f1c16e89f0
commit 9ec25b94f1
15 changed files with 624 additions and 15 deletions

2
.gitmodules vendored
View File

@@ -1,4 +1,4 @@
[submodule "web/frontend"]
path = web/frontend
url = https://gitea.kmux.cn/zhilv/wk-frontend
url = https://gitea.kmux.cn/cqcst/wk-frontend
branch = main

View File

@@ -12,16 +12,49 @@
### 拉取代码
```shell
git clone --recurse-submodules https://gitea.kmux.cn/zhilv/wk-backend
git clone --recurse-submodules https://gitea.kmux.cn/cqcst/wk-backend
```
更新已有仓库时,建议带上 submodule 一起同步:
```shell
git pull --recurse-submodules
task fe:sync
```
如果前端仓库 `wk-frontend` 有新提交,需要把主仓库里的 submodule 指针更新到最新:
```shell
task fe:update
git status
```
### 代码构建
**推荐使用 [Taskfile](https://taskfile.dev/) 进行项目构建**
调试模式下可通过环境变量开启本地代理/跳过 SSL 校验:
```shell
CKWK_DEBUG_PROXY=http://127.0.0.1:9000
CKWK_DEBUG_SKIP_SSL_VERIFY=true
```
`release` 模式下会自动忽略这两个调试开关。
调试日志 WS 仅在 `debug` 模式开启,连接地址:
```shell
ws://127.0.0.1:8080/api/debug/ws/logs
```
服务端会保留最近 1000 条内存日志,并持续推送新的入站 HTTP、出站请求和应用日志。
- 支持命令
```
* fe:sync: 同步前端 submodule 🔁
* fe:update: 拉取前端 submodule 最新提交 ⬆️
* build: 构建前端 + 后端 📦
* dev: 同时启动前后端(开发模式)🔥
* rebuild: 清理并重建 🔁
@@ -39,7 +72,7 @@ git clone --recurse-submodules https://gitea.kmux.cn/zhilv/wk-backend
### 项目结构
- 目录
**前端项目地址: [wk-frontend](https://gitea.kmux.cn/zhilv/wk-frontend)**
**前端项目地址: [wk-frontend](https://gitea.kmux.cn/cqcst/wk-frontend)**
```
.
├── Taskfile.yml # taskfile 命令定义

View File

@@ -25,24 +25,41 @@ vars:
-X "ckwk/internal/conf.GitCommit={{.GIT_COMMIT}}"
tasks:
# ======================
# 🔁 Git / Submodule
# ======================
fe:sync:
desc: 同步前端 submodule 🔁
cmds:
- git submodule update --init --recursive {{.FRONTEND_DIR}}
fe:update:
desc: 拉取前端 submodule 最新提交 ⬆️
cmds:
- git submodule update --init --remote --recursive {{.FRONTEND_DIR}}
# ======================
# 🎨 前端
# ======================
fe:install:
desc: 安装前端依赖 📦
deps: [fe:sync]
dir: "{{.FRONTEND_DIR}}"
cmds:
- pnpm install
fe:dev:
desc: 启动前端开发服务器 🚀
deps: [fe:sync]
dir: "{{.FRONTEND_DIR}}"
cmds:
- pnpm dev
fe:build:
desc: 构建前端 🏗️
deps: [fe:sync]
dir: "{{.FRONTEND_DIR}}"
cmds:
- pnpm build

1
go.mod
View File

@@ -5,6 +5,7 @@ go 1.25.0
require (
github.com/antchfx/htmlquery v1.3.6
github.com/gin-gonic/gin v1.12.0
github.com/gorilla/websocket v1.5.3
github.com/google/uuid v1.6.0
go.uber.org/zap v1.27.1
gopkg.in/natefinch/lumberjack.v2 v2.2.1

2
go.sum
View File

@@ -41,6 +41,8 @@ github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=

View File

@@ -37,11 +37,17 @@ func NewWK(username, password, host string, cookies []*http.Cookie) *WK {
return nil
}
req := request.NewClient(&request.Config{
reqCfg := &request.Config{
UserAgent: "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0",
// Proxy: "http://127.0.0.1:9000",
// VerifySSL: false,
})
VerifySSL: true,
Debug: conf.IsDebugMode(),
}
if conf.IsDebugMode() {
reqCfg.Proxy = conf.DebugProxy
reqCfg.VerifySSL = !conf.DebugSkipSSLVerify
}
req := request.NewClient(reqCfg)
if len(cookies) > 0 {
req.SetCookies(cookies)
}

View File

@@ -37,8 +37,14 @@ func (m *SessionManager) Store(wk *WK) string {
userKey := wk.Host + ":" + wk.Username
if oldID, exists := m.userToSession[userKey]; exists {
item := m.sessions[oldID]
if item.cancel != nil {
item.cancel()
}
ctx, cancel := context.WithCancel(context.Background())
item.LastValue = time.Now()
item.Instance = wk
item.cancel = cancel
m.sessions[oldID] = item
log.Info("用户已存在,复用旧 Session",
@@ -46,6 +52,8 @@ func (m *SessionManager) Store(wk *WK) string {
zap.String("user", userKey),
)
go m.KeepAlive(ctx, oldID)
return oldID
}
@@ -61,18 +69,20 @@ func (m *SessionManager) Store(wk *WK) string {
log.Info("创建新 Session", zap.String("id", sessionID))
go m.KeepAlive(ctx, sessionID, wk)
go m.KeepAlive(ctx, sessionID)
return sessionID
}
// Get: 获取指定 session id 的 wk
func (m *SessionManager) Get(sessionID string) (*WK, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
m.mu.Lock()
defer m.mu.Unlock()
item, ok := m.sessions[sessionID]
if ok {
item.LastValue = time.Now()
m.sessions[sessionID] = item
return item.Instance, true
}
return nil, false
@@ -96,7 +106,7 @@ func (m *SessionManager) Del(sessionID string) {
}
}
func (m *SessionManager) KeepAlive(ctx context.Context, id string, wk *WK) {
func (m *SessionManager) KeepAlive(ctx context.Context, id string) {
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
@@ -108,7 +118,15 @@ func (m *SessionManager) KeepAlive(ctx context.Context, id string, wk *WK) {
log.Info("KeepAlive 已停止", zap.String("id", id))
return
case <-ticker.C:
_, err := wk.Online()
m.mu.RLock()
item, ok := m.sessions[id]
m.mu.RUnlock()
if !ok || item.Instance == nil {
log.Info("Session 已不存在,停止 KeepAlive", zap.String("id", id))
return
}
_, err := item.Instance.Online()
if err != nil {
log.Error("自动保活请求失败", zap.Error(err))
}

View File

@@ -1,5 +1,10 @@
package conf
import (
"os"
"strings"
)
// 构建信息
var (
Mode string = "debug"
@@ -9,4 +14,32 @@ var (
GitAuthor string = "unknown"
GitEmail string = "unknown"
GitCommit string = "unknown"
DebugProxy string = ""
DebugSkipSSLVerify bool = false
)
func init() {
if !IsDebugMode() {
return
}
if proxy := os.Getenv("CKWK_DEBUG_PROXY"); proxy != "" {
DebugProxy = proxy
}
DebugSkipSSLVerify = parseEnvBool("CKWK_DEBUG_SKIP_SSL_VERIFY")
}
func IsDebugMode() bool {
return !strings.EqualFold(Mode, "release")
}
func parseEnvBool(key string) bool {
value := strings.TrimSpace(os.Getenv(key))
switch strings.ToLower(value) {
case "1", "true", "yes", "on":
return true
default:
return false
}
}

View File

@@ -0,0 +1,65 @@
package handler
import (
"net/http"
"time"
"ckwk/pkg/log"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
)
var debugLogUpgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func DebugLogWS(ctx *gin.Context) {
conn, err := debugLogUpgrader.Upgrade(ctx.Writer, ctx.Request, nil)
if err != nil {
return
}
defer conn.Close()
subID, ch := log.Subscribe()
defer log.Unsubscribe(subID)
for _, entry := range log.Entries() {
if err := conn.WriteJSON(entry); err != nil {
return
}
}
done := make(chan struct{})
go func() {
defer close(done)
for {
if _, _, err := conn.ReadMessage(); err != nil {
return
}
}
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for {
select {
case <-done:
return
case entry, ok := <-ch:
if !ok {
return
}
if err := conn.WriteJSON(entry); err != nil {
return
}
case <-ticker.C:
if err := conn.WriteMessage(websocket.PingMessage, []byte("ping")); err != nil {
return
}
}
}
}

View File

@@ -0,0 +1,89 @@
package middleware
import (
"bytes"
"io"
"net/http"
"strings"
"time"
"ckwk/pkg/log"
"github.com/gin-gonic/gin"
"go.uber.org/zap/zapcore"
)
const maxDebugBodySize = 4 * 1024
type debugBodyWriter struct {
gin.ResponseWriter
body bytes.Buffer
}
func (w *debugBodyWriter) Write(data []byte) (int, error) {
w.body.Write(data)
return w.ResponseWriter.Write(data)
}
func (w *debugBodyWriter) WriteString(data string) (int, error) {
w.body.WriteString(data)
return w.ResponseWriter.WriteString(data)
}
func DebugHTTPLog() gin.HandlerFunc {
return func(ctx *gin.Context) {
if isDebugLogRoute(ctx.Request.URL.Path) {
ctx.Next()
return
}
startAt := time.Now()
requestBody := readRequestBody(ctx.Request)
writer := &debugBodyWriter{ResponseWriter: ctx.Writer}
ctx.Writer = writer
ctx.Next()
fields := map[string]any{
"method": ctx.Request.Method,
"path": ctx.Request.URL.Path,
"rawQuery": ctx.Request.URL.RawQuery,
"status": writer.Status(),
"durationMs": time.Since(startAt).Milliseconds(),
"clientIP": ctx.ClientIP(),
"requestHeader": log.SanitizeHeaders(ctx.Request.Header),
"requestBody": truncate(log.SanitizeBody(ctx.ContentType(), requestBody), maxDebugBodySize),
"responseHeader": log.SanitizeHeaders(http.Header(writer.Header().Clone())),
"responseBody": truncate(log.SanitizeBody(writer.Header().Get("Content-Type"), writer.body.String()), maxDebugBodySize),
"responseSize": writer.Size(),
"handler": ctx.HandlerName(),
"abortWithErrors": ctx.Errors.ByType(gin.ErrorTypeAny).String(),
}
log.Capture(zapcore.DebugLevel, "http", "incoming exchange", fields)
}
}
func readRequestBody(r *http.Request) string {
if r == nil || r.Body == nil {
return ""
}
body, err := io.ReadAll(r.Body)
if err != nil {
return ""
}
r.Body = io.NopCloser(bytes.NewBuffer(body))
return string(body)
}
func truncate(value string, limit int) string {
if len(value) <= limit {
return value
}
return value[:limit] + "...(truncated)"
}
func isDebugLogRoute(path string) bool {
return strings.HasPrefix(path, "/api/debug/ws/logs")
}

View File

@@ -48,6 +48,9 @@ func SetupRouter() *gin.Engine {
AllowCredentials: true,
MaxAge: 12 * time.Hour,
}))
if conf.IsDebugMode() {
r.Use(middleware.DebugHTTPLog())
}
wkHandler := handler.NewWKHandler()
sessionMiddleware := middleware.SessionMiddleware(wkHandler.Session)
// schedule.StartCron(wkHandler.Session)
@@ -65,6 +68,12 @@ func SetupRouter() *gin.Engine {
{
api.POST("/login", wkHandler.Login)
api.Any("/version", handler.Version)
if conf.IsDebugMode() {
debug := api.Group("/debug")
{
debug.GET("/ws/logs", handler.DebugLogWS)
}
}
v1 := api.Group("/v1")
{
v1.GET("/host", wkHandler.Host)

285
pkg/log/buffer.go Normal file
View File

@@ -0,0 +1,285 @@
package log
import (
"encoding/json"
"net/http"
"net/url"
"strings"
"sync"
"time"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
const (
DefaultBufferLimit = 1000
)
type Entry struct {
ID int64 `json:"id"`
Time string `json:"time"`
Level string `json:"level"`
Source string `json:"source"`
Message string `json:"message"`
Logger string `json:"logger,omitempty"`
Caller string `json:"caller,omitempty"`
Fields map[string]any `json:"fields,omitempty"`
}
type bufferHub struct {
mu sync.RWMutex
limit int
nextEntryID int64
nextSubID int
entries []Entry
subscribers map[int]chan Entry
}
type memoryCore struct {
level zapcore.LevelEnabler
fields []zap.Field
}
var defaultHub = newBufferHub(DefaultBufferLimit)
func newBufferHub(limit int) *bufferHub {
return &bufferHub{
limit: limit,
entries: make([]Entry, 0, limit),
subscribers: make(map[int]chan Entry),
}
}
func (h *bufferHub) append(entry Entry) Entry {
h.mu.Lock()
h.nextEntryID++
entry.ID = h.nextEntryID
if len(h.entries) >= h.limit {
h.entries = append(h.entries[1:], entry)
} else {
h.entries = append(h.entries, entry)
}
subscribers := make([]chan Entry, 0, len(h.subscribers))
for _, ch := range h.subscribers {
subscribers = append(subscribers, ch)
}
h.mu.Unlock()
for _, ch := range subscribers {
select {
case ch <- entry:
default:
}
}
return entry
}
func (h *bufferHub) snapshot() []Entry {
h.mu.RLock()
defer h.mu.RUnlock()
entries := make([]Entry, len(h.entries))
copy(entries, h.entries)
return entries
}
func (h *bufferHub) subscribe() (int, <-chan Entry) {
h.mu.Lock()
defer h.mu.Unlock()
h.nextSubID++
id := h.nextSubID
ch := make(chan Entry, 256)
h.subscribers[id] = ch
return id, ch
}
func (h *bufferHub) unsubscribe(id int) {
h.mu.Lock()
defer h.mu.Unlock()
ch, ok := h.subscribers[id]
if !ok {
return
}
delete(h.subscribers, id)
close(ch)
}
func Entries() []Entry {
return defaultHub.snapshot()
}
func Subscribe() (int, <-chan Entry) {
return defaultHub.subscribe()
}
func Unsubscribe(id int) {
defaultHub.unsubscribe(id)
}
func Capture(level zapcore.Level, source, message string, fields map[string]any) Entry {
return defaultHub.append(Entry{
Time: time.Now().Format(TimeFormatDateTime),
Level: strings.ToLower(level.String()),
Source: source,
Message: message,
Fields: cloneFields(fields),
})
}
func NewMemoryCore(level zapcore.LevelEnabler) zapcore.Core {
return &memoryCore{level: level}
}
func (c *memoryCore) Enabled(level zapcore.Level) bool {
return c.level.Enabled(level)
}
func (c *memoryCore) With(fields []zap.Field) zapcore.Core {
merged := make([]zap.Field, 0, len(c.fields)+len(fields))
merged = append(merged, c.fields...)
merged = append(merged, fields...)
return &memoryCore{
level: c.level,
fields: merged,
}
}
func (c *memoryCore) Check(entry zapcore.Entry, checked *zapcore.CheckedEntry) *zapcore.CheckedEntry {
if c.Enabled(entry.Level) {
return checked.AddCore(entry, c)
}
return checked
}
func (c *memoryCore) Write(entry zapcore.Entry, fields []zap.Field) error {
combined := make([]zap.Field, 0, len(c.fields)+len(fields))
combined = append(combined, c.fields...)
combined = append(combined, fields...)
defaultHub.append(Entry{
Time: entry.Time.Format(TimeFormatDateTime),
Level: strings.ToLower(entry.Level.String()),
Source: "app",
Message: entry.Message,
Logger: entry.LoggerName,
Caller: entry.Caller.TrimmedPath(),
Fields: fieldsToMap(combined),
})
return nil
}
func (c *memoryCore) Sync() error {
return nil
}
func fieldsToMap(fields []zap.Field) map[string]any {
if len(fields) == 0 {
return nil
}
encoder := zapcore.NewMapObjectEncoder()
for _, field := range fields {
field.AddTo(encoder)
}
if len(encoder.Fields) == 0 {
return nil
}
return cloneFields(encoder.Fields)
}
func cloneFields(fields map[string]any) map[string]any {
if len(fields) == 0 {
return nil
}
cloned := make(map[string]any, len(fields))
for key, value := range fields {
cloned[key] = value
}
return cloned
}
func SanitizeHeaders(headers http.Header) map[string][]string {
if len(headers) == 0 {
return nil
}
sanitized := make(map[string][]string, len(headers))
for key, values := range headers {
copied := append([]string(nil), values...)
if isSensitiveKey(key) {
for i := range copied {
copied[i] = maskValue(copied[i])
}
}
sanitized[key] = copied
}
return sanitized
}
func SanitizeBody(contentType, body string) string {
if body == "" {
return ""
}
switch {
case strings.Contains(contentType, "application/json"):
var payload any
if err := json.Unmarshal([]byte(body), &payload); err == nil {
maskValueRecursive(payload)
if b, err := json.Marshal(payload); err == nil {
return string(b)
}
}
case strings.Contains(contentType, "application/x-www-form-urlencoded"):
values, err := url.ParseQuery(body)
if err == nil {
for key := range values {
if isSensitiveKey(key) {
values.Set(key, maskValue(values.Get(key)))
}
}
return values.Encode()
}
}
return body
}
func maskValueRecursive(value any) {
switch typed := value.(type) {
case map[string]any:
for key, item := range typed {
if isSensitiveKey(key) {
typed[key] = maskValue("")
continue
}
maskValueRecursive(item)
}
case []any:
for _, item := range typed {
maskValueRecursive(item)
}
}
}
func isSensitiveKey(key string) bool {
key = strings.ToLower(strings.TrimSpace(key))
switch key {
case "authorization", "cookie", "set-cookie", "x-session-id", "password", "token", "code", "session_id":
return true
default:
return false
}
}
func maskValue(_ string) string {
return "******"
}

View File

@@ -36,6 +36,7 @@ func init() {
zapcore.AddSync(os.Stdout),
zap.DebugLevel,
)
core = zapcore.NewTee(core, NewMemoryCore(zap.DebugLevel))
logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
sugar = logger.Sugar()
@@ -96,7 +97,9 @@ func Init(cfg Config) {
core := zapcore.NewTee(
zapcore.NewCore(encoderJson, writeSyncer, zapLevel),
zapcore.NewCore(encoderConsole, zapcore.AddSync(os.Stdout), zapLevel))
zapcore.NewCore(encoderConsole, zapcore.AddSync(os.Stdout), zapLevel),
NewMemoryCore(zapLevel),
)
logger = zap.New(core, zap.AddCaller(), zap.AddCallerSkip(1))
sugar = logger.Sugar()

View File

@@ -2,9 +2,13 @@ package request
import (
"crypto/tls"
"encoding/json"
"net/http"
"time"
"ckwk/pkg/log"
"go.uber.org/zap/zapcore"
"resty.dev/v3"
)
@@ -16,6 +20,7 @@ var (
const (
DefaultUserAgent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36 Edg/140.0.0.0"
DefaultTimeout = 10 * time.Second
DefaultDebugBody = 4 * 1024
)
type Config struct {
@@ -37,8 +42,17 @@ func DefaultConfg() *Config {
// NewClient 创建一个标准的 Resty 客户端
func NewClient(cfg *Config) *resty.Client {
defaults := DefaultConfg()
if cfg == nil {
cfg = DefaultConfg()
cfg = defaults
} else {
// 合并零值,避免调用方只覆盖部分字段时丢失默认超时和 User-Agent。
if cfg.Timeout <= 0 {
cfg.Timeout = defaults.Timeout
}
if cfg.UserAgent == "" {
cfg.UserAgent = defaults.UserAgent
}
}
client := resty.New()
@@ -53,6 +67,40 @@ func NewClient(cfg *Config) *resty.Client {
if cfg.Proxy != "" {
client.SetProxy(cfg.Proxy)
}
if cfg.Debug {
client.SetDebug(true)
client.SetDebugBodyLimit(DefaultDebugBody)
client.OnDebugLog(func(debugLog *resty.DebugLog) {
fields := map[string]any{
"request": map[string]any{
"host": debugLog.Request.Host,
"uri": debugLog.Request.URI,
"method": debugLog.Request.Method,
"proto": debugLog.Request.Proto,
"header": log.SanitizeHeaders(debugLog.Request.Header),
"attempt": debugLog.Request.Attempt,
"body": log.SanitizeBody(debugLog.Request.Header.Get("Content-Type"), debugLog.Request.Body),
},
"response": map[string]any{
"statusCode": debugLog.Response.StatusCode,
"status": debugLog.Response.Status,
"proto": debugLog.Response.Proto,
"receivedAt": debugLog.Response.ReceivedAt.Format(time.RFC3339Nano),
"durationMs": debugLog.Response.Duration.Milliseconds(),
"size": debugLog.Response.Size,
"header": log.SanitizeHeaders(debugLog.Response.Header),
"body": log.SanitizeBody(debugLog.Response.Header.Get("Content-Type"), debugLog.Response.Body),
},
}
if debugLog.TraceInfo != nil {
if traceJSON, err := json.Marshal(debugLog.TraceInfo); err == nil {
fields["trace"] = json.RawMessage(traceJSON)
}
}
log.Capture(zapcore.DebugLevel, "resty", "outbound exchange", fields)
})
client.SetDebugLogFormatter(nil)
}
return client
}