package ckwk import ( "ckwk/pkg/log" "context" "sync" "time" "github.com/google/uuid" "go.uber.org/zap" ) type SessionManager struct { mu sync.RWMutex sessions map[string]SessionItem userToSession map[string]string } type SessionItem struct { Instance *WK LastValue time.Time cancel context.CancelFunc } func NewSessionManager() *SessionManager { return &SessionManager{ sessions: make(map[string]SessionItem), userToSession: make(map[string]string), } } // removeSession 取消 context、删除双 map 条目、记日志 func (m *SessionManager) removeSession(sessionID string, item SessionItem) { if item.cancel != nil { item.cancel() } userKey := item.Instance.UserKey() delete(m.userToSession, userKey) delete(m.sessions, sessionID) log.Info("删除 Session", zap.String("id", sessionID)) } // Store: 保存 session 并返回 session id func (m *SessionManager) Store(wk *WK) string { m.mu.Lock() defer m.mu.Unlock() userKey := wk.UserKey() if oldID, exists := m.userToSession[userKey]; exists { item := m.sessions[oldID] if item.cancel != nil { item.cancel() } ctx, cancel := context.WithCancel(context.Background()) item.LastValue = time.Now() wk.bindSession(m, oldID) item.Instance = wk item.cancel = cancel m.sessions[oldID] = item log.Info("用户已存在,复用旧 Session", zap.String("id", oldID), zap.String("user", userKey), ) go m.KeepAlive(ctx, oldID) return oldID } sessionID := uuid.New().String() ctx, cancel := context.WithCancel(context.Background()) wk.bindSession(m, sessionID) m.userToSession[userKey] = sessionID m.sessions[sessionID] = SessionItem{ Instance: wk, LastValue: time.Now(), cancel: cancel, } log.Info("创建新 Session", zap.String("id", sessionID)) go m.KeepAlive(ctx, sessionID) return sessionID } // Get: 获取指定 session id 的 wk func (m *SessionManager) Get(sessionID string) (*WK, bool) { m.mu.RLock() item, ok := m.sessions[sessionID] if !ok { m.mu.RUnlock() return nil, false } m.mu.RUnlock() m.mu.Lock() item.LastValue = time.Now() m.sessions[sessionID] = item m.mu.Unlock() return item.Instance, true } func (m *SessionManager) Del(sessionID string) { m.mu.Lock() defer m.mu.Unlock() if item, ok := m.sessions[sessionID]; ok { m.removeSession(sessionID, item) } } func (m *SessionManager) KeepAlive(ctx context.Context, id string) { ticker := time.NewTicker(2 * time.Minute) defer ticker.Stop() log.Info("启动 KeepAlive", zap.String("id", id)) for { select { case <-ctx.Done(): log.Info("KeepAlive 已停止", zap.String("id", id)) return case <-ticker.C: m.mu.RLock() item, ok := m.sessions[id] m.mu.RUnlock() if !ok || item.Instance == nil { log.Info("Session 已不存在,停止 KeepAlive", zap.String("id", id)) return } _, err := item.Instance.Online() if err != nil { log.Error("自动保活请求失败", zap.Error(err)) } } } } func (m *SessionManager) ClearAll() { m.mu.Lock() defer m.mu.Unlock() for sessionID, item := range m.sessions { m.removeSession(sessionID, item) } log.Info("所有 Session 已清空") } func (m *SessionManager) ClearExpired(d time.Duration) { m.mu.Lock() defer m.mu.Unlock() now := time.Now() for sessionID, item := range m.sessions { if now.Sub(item.LastValue) > d { m.removeSession(sessionID, item) } } }