Files
wk-backend/internal/ckwk/session_manager.go
2026-04-03 14:24:29 +08:00

179 lines
3.6 KiB
Go

package ckwk
import (
"ckwk/pkg/log"
"context"
"sync"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
type SessionManager struct {
mu sync.RWMutex
sessions map[string]SessionItem
userToSession map[string]string
}
type SessionItem struct {
Instance *WK
LastValue time.Time
cancel context.CancelFunc
}
func NewSessionManager() *SessionManager {
return &SessionManager{
sessions: make(map[string]SessionItem),
userToSession: make(map[string]string),
}
}
// Store: 保存 session 并返回 session id
func (m *SessionManager) Store(wk *WK) string {
m.mu.Lock()
defer m.mu.Unlock()
userKey := wk.Host + ":" + wk.Username
if oldID, exists := m.userToSession[userKey]; exists {
item := m.sessions[oldID]
if item.cancel != nil {
item.cancel()
}
ctx, cancel := context.WithCancel(context.Background())
item.LastValue = time.Now()
wk.bindSession(m, oldID)
item.Instance = wk
item.cancel = cancel
m.sessions[oldID] = item
log.Info("用户已存在,复用旧 Session",
zap.String("id", oldID),
zap.String("user", userKey),
)
go m.KeepAlive(ctx, oldID)
return oldID
}
sessionID := uuid.New().String()
ctx, cancel := context.WithCancel(context.Background())
wk.bindSession(m, sessionID)
m.userToSession[userKey] = sessionID
m.sessions[sessionID] = SessionItem{
Instance: wk,
LastValue: time.Now(),
cancel: cancel,
}
log.Info("创建新 Session", zap.String("id", sessionID))
go m.KeepAlive(ctx, sessionID)
return sessionID
}
// Get: 获取指定 session id 的 wk
func (m *SessionManager) Get(sessionID string) (*WK, bool) {
m.mu.Lock()
defer m.mu.Unlock()
item, ok := m.sessions[sessionID]
if ok {
item.LastValue = time.Now()
m.sessions[sessionID] = item
return item.Instance, true
}
return nil, false
}
func (m *SessionManager) Del(sessionID string) {
m.mu.Lock()
defer m.mu.Unlock()
if item, ok := m.sessions[sessionID]; ok {
userKey := item.Instance.Host + ":" + item.Instance.Username
if item.cancel != nil {
item.cancel()
}
delete(m.userToSession, userKey)
delete(m.sessions, sessionID)
log.Info("删除 Session", zap.String("id", sessionID))
}
}
func (m *SessionManager) KeepAlive(ctx context.Context, id string) {
ticker := time.NewTicker(2 * time.Minute)
defer ticker.Stop()
log.Info("启动 KeepAlive", zap.String("id", id))
for {
select {
case <-ctx.Done():
log.Info("KeepAlive 已停止", zap.String("id", id))
return
case <-ticker.C:
m.mu.RLock()
item, ok := m.sessions[id]
m.mu.RUnlock()
if !ok || item.Instance == nil {
log.Info("Session 已不存在,停止 KeepAlive", zap.String("id", id))
return
}
_, err := item.Instance.Online()
if err != nil {
log.Error("自动保活请求失败", zap.Error(err))
}
}
}
}
func (m *SessionManager) ClearAll() {
m.mu.Lock()
defer m.mu.Unlock()
for sessionID, item := range m.sessions {
// 停止 KeepAlive
if item.cancel != nil {
item.cancel()
}
userKey := item.Instance.Host + ":" + item.Instance.Username
delete(m.userToSession, userKey)
delete(m.sessions, sessionID)
log.Info("清理 Session", zap.String("id", sessionID))
}
log.Info("所有 Session 已清空")
}
func (m *SessionManager) ClearExpired(d time.Duration) {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now()
for sessionID, item := range m.sessions {
if now.Sub(item.LastValue) > d {
if item.cancel != nil {
item.cancel()
}
userKey := item.Instance.Host + ":" + item.Instance.Username
delete(m.userToSession, userKey)
delete(m.sessions, sessionID)
log.Info("清理过期 Session", zap.String("id", sessionID))
}
}
}