Files
2026-03-28 15:43:18 +08:00

479 lines
12 KiB
Go

package store
import (
"context"
"sort"
"sync"
"time"
"filefast/backend/internal/model"
)
type Persistence interface {
PersistDevice(context.Context, model.Device) error
PersistRoom(context.Context, model.Room) error
PersistTransfer(context.Context, model.Transfer) error
PersistFallbackObject(context.Context, model.FallbackObject) error
PersistRuntimeConfig(context.Context, model.RuntimeConfig) error
}
type Snapshot struct {
Devices []model.Device
Rooms []model.Room
Transfers []model.Transfer
FallbackObjects []model.FallbackObject
Runtime *model.RuntimeConfig
}
type MemoryStore struct {
mu sync.RWMutex
devices map[string]model.Device
rooms map[string]model.Room
transfers map[string]model.Transfer
fallbackObjects map[string]model.FallbackObject
adminSessions map[string]model.AdminSession
deviceSessions map[string]model.DeviceSession
runtime model.RuntimeConfig
persistence Persistence
persistTimeout time.Duration
onPersistError func(kind, id string, err error)
}
const (
activeDeviceWindow = 2 * time.Minute
activeTransferWindow = 30 * time.Minute
recentTerminalTransferWind = 24 * time.Hour
)
func NewMemoryStore(runtime model.RuntimeConfig) *MemoryStore {
return &MemoryStore{
devices: make(map[string]model.Device),
rooms: make(map[string]model.Room),
transfers: make(map[string]model.Transfer),
fallbackObjects: make(map[string]model.FallbackObject),
adminSessions: make(map[string]model.AdminSession),
deviceSessions: make(map[string]model.DeviceSession),
runtime: runtime,
}
}
func (s *MemoryStore) UpsertDevice(device model.Device) model.Device {
s.mu.Lock()
s.devices[device.ID] = device
s.mu.Unlock()
s.persistDevice(device)
return device
}
func (s *MemoryStore) GetDevice(id string) (model.Device, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
device, ok := s.devices[id]
return device, ok
}
func (s *MemoryStore) ListDevices() []model.Device {
s.mu.RLock()
defer s.mu.RUnlock()
devices := make([]model.Device, 0, len(s.devices))
for _, device := range s.devices {
devices = append(devices, device)
}
return devices
}
func (s *MemoryStore) UpsertRoom(room model.Room) model.Room {
s.mu.Lock()
s.rooms[room.Code] = room
s.mu.Unlock()
s.persistRoom(room)
return room
}
func (s *MemoryStore) GetRoom(code string) (model.Room, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
room, ok := s.rooms[code]
return room, ok
}
func (s *MemoryStore) UpsertTransfer(transfer model.Transfer) model.Transfer {
s.mu.Lock()
s.transfers[transfer.ID] = transfer
s.mu.Unlock()
s.persistTransfer(transfer)
return transfer
}
func (s *MemoryStore) GetTransfer(id string) (model.Transfer, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
transfer, ok := s.transfers[id]
return transfer, ok
}
func (s *MemoryStore) ListRecentTransfers(limit int) []model.Transfer {
s.mu.RLock()
defer s.mu.RUnlock()
now := time.Now()
transfers := make([]model.Transfer, 0, len(s.transfers))
for _, transfer := range s.transfers {
if !isTransferVisible(transfer, now) {
continue
}
transfers = append(transfers, transfer)
}
sort.Slice(transfers, func(i, j int) bool {
return transfers[i].UpdatedAt.After(transfers[j].UpdatedAt)
})
if limit > 0 && len(transfers) > limit {
return transfers[:limit]
}
return transfers
}
func (s *MemoryStore) ListPendingFallbackDownloads(receiverDeviceID string, limit int) []model.PendingFallbackDownload {
s.mu.RLock()
defer s.mu.RUnlock()
if receiverDeviceID == "" {
return nil
}
now := time.Now()
downloads := make([]model.PendingFallbackDownload, 0, len(s.transfers))
for _, transfer := range s.transfers {
if transfer.ReceiverDeviceID != receiverDeviceID || transfer.ObjectKey == "" || transfer.FinalStatus != model.TransferCompleted {
continue
}
if transfer.ExpiresAt == nil || !transfer.ExpiresAt.After(now) {
continue
}
object, ok := s.fallbackObjects[transfer.ID]
if !ok || object.CleanedAt != nil || object.ExpiresAt.Before(now) || object.CleanupState != "ready" {
continue
}
downloads = append(downloads, model.PendingFallbackDownload{
TransferID: transfer.ID,
Name: transfer.Name,
SizeBytes: transfer.SizeBytes,
CreatedAt: transfer.CreatedAt,
ExpiresAt: object.ExpiresAt,
DownloadPath: "/api/transfers/" + transfer.ID + "/fallback/download",
SenderDeviceID: transfer.SenderDeviceID,
})
}
sort.Slice(downloads, func(i, j int) bool {
return downloads[i].CreatedAt.After(downloads[j].CreatedAt)
})
if limit > 0 && len(downloads) > limit {
return downloads[:limit]
}
return downloads
}
func (s *MemoryStore) SaveFallbackObject(object model.FallbackObject) model.FallbackObject {
s.mu.Lock()
s.fallbackObjects[object.TransferID] = object
s.mu.Unlock()
s.persistFallbackObject(object)
return object
}
func (s *MemoryStore) GetFallbackObject(transferID string) (model.FallbackObject, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
object, ok := s.fallbackObjects[transferID]
return object, ok
}
func (s *MemoryStore) ListExpiredFallbackObjects(now time.Time) []model.FallbackObject {
s.mu.RLock()
defer s.mu.RUnlock()
var objects []model.FallbackObject
for _, object := range s.fallbackObjects {
if object.CleanedAt == nil && !object.ExpiresAt.After(now) {
objects = append(objects, object)
}
}
return objects
}
func (s *MemoryStore) SaveAdminSession(session model.AdminSession) model.AdminSession {
s.mu.Lock()
defer s.mu.Unlock()
s.adminSessions[session.Token] = session
return session
}
func (s *MemoryStore) SaveDeviceSession(session model.DeviceSession) model.DeviceSession {
s.mu.Lock()
defer s.mu.Unlock()
s.deviceSessions[session.DeviceID] = session
return session
}
func (s *MemoryStore) HasAdminSession(token string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok := s.adminSessions[token]
return ok
}
func (s *MemoryStore) ValidateDeviceSession(deviceID, token string) bool {
s.mu.RLock()
session, ok := s.deviceSessions[deviceID]
s.mu.RUnlock()
if !ok {
return false
}
if session.ExpiresAt.Before(time.Now()) {
s.mu.Lock()
delete(s.deviceSessions, deviceID)
s.mu.Unlock()
return false
}
return session.Token == token
}
func (s *MemoryStore) RuntimeConfig() model.RuntimeConfig {
s.mu.RLock()
defer s.mu.RUnlock()
return s.runtime
}
func (s *MemoryStore) UpdateRuntimeConfig(runtime model.RuntimeConfig) model.RuntimeConfig {
s.mu.Lock()
s.runtime = runtime
s.mu.Unlock()
s.persistRuntimeConfig(runtime)
return runtime
}
func (s *MemoryStore) SnapshotStats() map[string]int {
s.mu.RLock()
defer s.mu.RUnlock()
now := time.Now()
onlineDevices := 0
activeDevices := 0
for _, device := range s.devices {
if isDeviceActive(device, now) {
activeDevices++
}
if device.IsOnline && isDeviceActive(device, now) {
onlineDevices++
}
}
waitingRooms := 0
for _, room := range s.rooms {
if room.Status == model.RoomStatusWaiting && room.ExpiresAt.After(now) {
waitingRooms++
}
}
fallbackPending := 0
for _, object := range s.fallbackObjects {
if object.CleanedAt == nil && object.CleanupState == "ready" {
fallbackPending++
}
}
validTransfers := 0
for _, transfer := range s.transfers {
if isTransferVisible(transfer, now) {
validTransfers++
}
}
return map[string]int{
"devices_total": activeDevices,
"devices_online": onlineDevices,
"rooms_waiting": waitingRooms,
"transfers_total": validTransfers,
"transfers_cumulative": len(s.transfers),
"fallback_pending": fallbackPending,
"admin_sessions": len(s.adminSessions),
}
}
func (s *MemoryStore) SnapshotMinIOStorage() model.MinIOStorageOverview {
s.mu.RLock()
defer s.mu.RUnlock()
now := time.Now()
usedBytes := int64(0)
objectCount := 0
for _, object := range s.fallbackObjects {
if object.CleanedAt != nil || !object.ExpiresAt.After(now) || object.CleanupState != "ready" {
continue
}
usedBytes += object.SizeBytes
objectCount++
}
capacityBytes := s.runtime.MinIOCapacityBytes
if capacityBytes < 0 {
capacityBytes = 0
}
remainingBytes := capacityBytes - usedBytes
if remainingBytes < 0 {
remainingBytes = 0
}
usagePercent := 0
if capacityBytes > 0 {
usagePercent = int((usedBytes * 100) / capacityBytes)
if usagePercent > 100 {
usagePercent = 100
}
}
return model.MinIOStorageOverview{
Enabled: s.runtime.MinIOFallbackEnabled,
UsedBytes: usedBytes,
CapacityBytes: capacityBytes,
RemainingBytes: remainingBytes,
UsagePercent: usagePercent,
ObjectCount: objectCount,
}
}
func isDeviceActive(device model.Device, now time.Time) bool {
if device.LastSeenAt.IsZero() {
return false
}
return device.LastSeenAt.After(now.Add(-activeDeviceWindow))
}
func isTransferVisible(transfer model.Transfer, now time.Time) bool {
if transfer.UpdatedAt.IsZero() {
transfer.UpdatedAt = transfer.CreatedAt
}
if isTerminalTransferStatus(transfer.FinalStatus) {
return transfer.UpdatedAt.After(now.Add(-recentTerminalTransferWind))
}
return transfer.UpdatedAt.After(now.Add(-activeTransferWindow))
}
func isTerminalTransferStatus(status string) bool {
switch status {
case model.TransferCompleted, model.TransferFailed, model.TransferCancelled:
return true
default:
return false
}
}
func (s *MemoryStore) SetPersistence(persistence Persistence, timeout time.Duration, onError func(kind, id string, err error)) {
s.mu.Lock()
defer s.mu.Unlock()
s.persistence = persistence
s.persistTimeout = timeout
s.onPersistError = onError
}
func (s *MemoryStore) LoadSnapshot(snapshot Snapshot) {
s.mu.Lock()
defer s.mu.Unlock()
s.devices = make(map[string]model.Device, len(snapshot.Devices))
for _, device := range snapshot.Devices {
s.devices[device.ID] = device
}
s.rooms = make(map[string]model.Room, len(snapshot.Rooms))
for _, room := range snapshot.Rooms {
s.rooms[room.Code] = room
}
s.transfers = make(map[string]model.Transfer, len(snapshot.Transfers))
for _, transfer := range snapshot.Transfers {
s.transfers[transfer.ID] = transfer
}
s.fallbackObjects = make(map[string]model.FallbackObject, len(snapshot.FallbackObjects))
for _, object := range snapshot.FallbackObjects {
s.fallbackObjects[object.TransferID] = object
}
if snapshot.Runtime != nil {
s.runtime = *snapshot.Runtime
}
}
func (s *MemoryStore) persistDevice(device model.Device) {
s.persist(device.ID, "device", func(ctx context.Context, persistence Persistence) error {
return persistence.PersistDevice(ctx, device)
})
}
func (s *MemoryStore) persistRoom(room model.Room) {
s.persist(room.Code, "room", func(ctx context.Context, persistence Persistence) error {
return persistence.PersistRoom(ctx, room)
})
}
func (s *MemoryStore) persistTransfer(transfer model.Transfer) {
s.persist(transfer.ID, "transfer", func(ctx context.Context, persistence Persistence) error {
return persistence.PersistTransfer(ctx, transfer)
})
}
func (s *MemoryStore) persistFallbackObject(object model.FallbackObject) {
s.persist(object.TransferID, "fallback_object", func(ctx context.Context, persistence Persistence) error {
return persistence.PersistFallbackObject(ctx, object)
})
}
func (s *MemoryStore) persistRuntimeConfig(runtime model.RuntimeConfig) {
s.persist("transfer_policy", "runtime_config", func(ctx context.Context, persistence Persistence) error {
return persistence.PersistRuntimeConfig(ctx, runtime)
})
}
func (s *MemoryStore) persist(id, kind string, fn func(context.Context, Persistence) error) {
s.mu.RLock()
persistence := s.persistence
timeout := s.persistTimeout
onError := s.onPersistError
s.mu.RUnlock()
if persistence == nil {
return
}
if timeout <= 0 {
timeout = 5 * time.Second
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if err := fn(ctx, persistence); err != nil && onError != nil {
onError(kind, id, err)
}
}