479 lines
12 KiB
Go
479 lines
12 KiB
Go
package store
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"filefast/backend/internal/model"
|
|
)
|
|
|
|
type Persistence interface {
|
|
PersistDevice(context.Context, model.Device) error
|
|
PersistRoom(context.Context, model.Room) error
|
|
PersistTransfer(context.Context, model.Transfer) error
|
|
PersistFallbackObject(context.Context, model.FallbackObject) error
|
|
PersistRuntimeConfig(context.Context, model.RuntimeConfig) error
|
|
}
|
|
|
|
type Snapshot struct {
|
|
Devices []model.Device
|
|
Rooms []model.Room
|
|
Transfers []model.Transfer
|
|
FallbackObjects []model.FallbackObject
|
|
Runtime *model.RuntimeConfig
|
|
}
|
|
|
|
type MemoryStore struct {
|
|
mu sync.RWMutex
|
|
devices map[string]model.Device
|
|
rooms map[string]model.Room
|
|
transfers map[string]model.Transfer
|
|
fallbackObjects map[string]model.FallbackObject
|
|
adminSessions map[string]model.AdminSession
|
|
deviceSessions map[string]model.DeviceSession
|
|
runtime model.RuntimeConfig
|
|
persistence Persistence
|
|
persistTimeout time.Duration
|
|
onPersistError func(kind, id string, err error)
|
|
}
|
|
|
|
const (
	// activeDeviceWindow is how recently a device must have been seen for
	// isDeviceActive to report it as active.
	activeDeviceWindow = 2 * time.Minute
	// activeTransferWindow is how recently a non-terminal transfer must have
	// been updated for isTransferVisible to report it.
	activeTransferWindow = 30 * time.Minute
	// recentTerminalTransferWind is how recently a terminal transfer must
	// have been updated for isTransferVisible to report it.
	recentTerminalTransferWind = 24 * time.Hour
)
|
|
|
|
func NewMemoryStore(runtime model.RuntimeConfig) *MemoryStore {
|
|
return &MemoryStore{
|
|
devices: make(map[string]model.Device),
|
|
rooms: make(map[string]model.Room),
|
|
transfers: make(map[string]model.Transfer),
|
|
fallbackObjects: make(map[string]model.FallbackObject),
|
|
adminSessions: make(map[string]model.AdminSession),
|
|
deviceSessions: make(map[string]model.DeviceSession),
|
|
runtime: runtime,
|
|
}
|
|
}
|
|
|
|
func (s *MemoryStore) UpsertDevice(device model.Device) model.Device {
|
|
s.mu.Lock()
|
|
s.devices[device.ID] = device
|
|
s.mu.Unlock()
|
|
s.persistDevice(device)
|
|
return device
|
|
}
|
|
|
|
func (s *MemoryStore) GetDevice(id string) (model.Device, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
device, ok := s.devices[id]
|
|
return device, ok
|
|
}
|
|
|
|
func (s *MemoryStore) ListDevices() []model.Device {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
devices := make([]model.Device, 0, len(s.devices))
|
|
for _, device := range s.devices {
|
|
devices = append(devices, device)
|
|
}
|
|
|
|
return devices
|
|
}
|
|
|
|
func (s *MemoryStore) UpsertRoom(room model.Room) model.Room {
|
|
s.mu.Lock()
|
|
s.rooms[room.Code] = room
|
|
s.mu.Unlock()
|
|
s.persistRoom(room)
|
|
return room
|
|
}
|
|
|
|
func (s *MemoryStore) GetRoom(code string) (model.Room, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
room, ok := s.rooms[code]
|
|
return room, ok
|
|
}
|
|
|
|
func (s *MemoryStore) UpsertTransfer(transfer model.Transfer) model.Transfer {
|
|
s.mu.Lock()
|
|
s.transfers[transfer.ID] = transfer
|
|
s.mu.Unlock()
|
|
s.persistTransfer(transfer)
|
|
return transfer
|
|
}
|
|
|
|
func (s *MemoryStore) GetTransfer(id string) (model.Transfer, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
transfer, ok := s.transfers[id]
|
|
return transfer, ok
|
|
}
|
|
|
|
func (s *MemoryStore) ListRecentTransfers(limit int) []model.Transfer {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
now := time.Now()
|
|
transfers := make([]model.Transfer, 0, len(s.transfers))
|
|
for _, transfer := range s.transfers {
|
|
if !isTransferVisible(transfer, now) {
|
|
continue
|
|
}
|
|
transfers = append(transfers, transfer)
|
|
}
|
|
|
|
sort.Slice(transfers, func(i, j int) bool {
|
|
return transfers[i].UpdatedAt.After(transfers[j].UpdatedAt)
|
|
})
|
|
|
|
if limit > 0 && len(transfers) > limit {
|
|
return transfers[:limit]
|
|
}
|
|
|
|
return transfers
|
|
}
|
|
|
|
func (s *MemoryStore) ListPendingFallbackDownloads(receiverDeviceID string, limit int) []model.PendingFallbackDownload {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
if receiverDeviceID == "" {
|
|
return nil
|
|
}
|
|
|
|
now := time.Now()
|
|
downloads := make([]model.PendingFallbackDownload, 0, len(s.transfers))
|
|
|
|
for _, transfer := range s.transfers {
|
|
if transfer.ReceiverDeviceID != receiverDeviceID || transfer.ObjectKey == "" || transfer.FinalStatus != model.TransferCompleted {
|
|
continue
|
|
}
|
|
if transfer.ExpiresAt == nil || !transfer.ExpiresAt.After(now) {
|
|
continue
|
|
}
|
|
|
|
object, ok := s.fallbackObjects[transfer.ID]
|
|
if !ok || object.CleanedAt != nil || object.ExpiresAt.Before(now) || object.CleanupState != "ready" {
|
|
continue
|
|
}
|
|
|
|
downloads = append(downloads, model.PendingFallbackDownload{
|
|
TransferID: transfer.ID,
|
|
Name: transfer.Name,
|
|
SizeBytes: transfer.SizeBytes,
|
|
CreatedAt: transfer.CreatedAt,
|
|
ExpiresAt: object.ExpiresAt,
|
|
DownloadPath: "/api/transfers/" + transfer.ID + "/fallback/download",
|
|
SenderDeviceID: transfer.SenderDeviceID,
|
|
})
|
|
}
|
|
|
|
sort.Slice(downloads, func(i, j int) bool {
|
|
return downloads[i].CreatedAt.After(downloads[j].CreatedAt)
|
|
})
|
|
|
|
if limit > 0 && len(downloads) > limit {
|
|
return downloads[:limit]
|
|
}
|
|
|
|
return downloads
|
|
}
|
|
|
|
func (s *MemoryStore) SaveFallbackObject(object model.FallbackObject) model.FallbackObject {
|
|
s.mu.Lock()
|
|
s.fallbackObjects[object.TransferID] = object
|
|
s.mu.Unlock()
|
|
s.persistFallbackObject(object)
|
|
return object
|
|
}
|
|
|
|
func (s *MemoryStore) GetFallbackObject(transferID string) (model.FallbackObject, bool) {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
object, ok := s.fallbackObjects[transferID]
|
|
return object, ok
|
|
}
|
|
|
|
func (s *MemoryStore) ListExpiredFallbackObjects(now time.Time) []model.FallbackObject {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
var objects []model.FallbackObject
|
|
for _, object := range s.fallbackObjects {
|
|
if object.CleanedAt == nil && !object.ExpiresAt.After(now) {
|
|
objects = append(objects, object)
|
|
}
|
|
}
|
|
|
|
return objects
|
|
}
|
|
|
|
func (s *MemoryStore) SaveAdminSession(session model.AdminSession) model.AdminSession {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.adminSessions[session.Token] = session
|
|
return session
|
|
}
|
|
|
|
func (s *MemoryStore) SaveDeviceSession(session model.DeviceSession) model.DeviceSession {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
s.deviceSessions[session.DeviceID] = session
|
|
return session
|
|
}
|
|
|
|
func (s *MemoryStore) HasAdminSession(token string) bool {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
_, ok := s.adminSessions[token]
|
|
return ok
|
|
}
|
|
|
|
func (s *MemoryStore) ValidateDeviceSession(deviceID, token string) bool {
|
|
s.mu.RLock()
|
|
session, ok := s.deviceSessions[deviceID]
|
|
s.mu.RUnlock()
|
|
if !ok {
|
|
return false
|
|
}
|
|
if session.ExpiresAt.Before(time.Now()) {
|
|
s.mu.Lock()
|
|
delete(s.deviceSessions, deviceID)
|
|
s.mu.Unlock()
|
|
return false
|
|
}
|
|
return session.Token == token
|
|
}
|
|
|
|
func (s *MemoryStore) RuntimeConfig() model.RuntimeConfig {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
return s.runtime
|
|
}
|
|
|
|
func (s *MemoryStore) UpdateRuntimeConfig(runtime model.RuntimeConfig) model.RuntimeConfig {
|
|
s.mu.Lock()
|
|
s.runtime = runtime
|
|
s.mu.Unlock()
|
|
s.persistRuntimeConfig(runtime)
|
|
return runtime
|
|
}
|
|
|
|
func (s *MemoryStore) SnapshotStats() map[string]int {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
now := time.Now()
|
|
onlineDevices := 0
|
|
activeDevices := 0
|
|
for _, device := range s.devices {
|
|
if isDeviceActive(device, now) {
|
|
activeDevices++
|
|
}
|
|
if device.IsOnline && isDeviceActive(device, now) {
|
|
onlineDevices++
|
|
}
|
|
}
|
|
|
|
waitingRooms := 0
|
|
for _, room := range s.rooms {
|
|
if room.Status == model.RoomStatusWaiting && room.ExpiresAt.After(now) {
|
|
waitingRooms++
|
|
}
|
|
}
|
|
|
|
fallbackPending := 0
|
|
for _, object := range s.fallbackObjects {
|
|
if object.CleanedAt == nil && object.CleanupState == "ready" {
|
|
fallbackPending++
|
|
}
|
|
}
|
|
|
|
validTransfers := 0
|
|
for _, transfer := range s.transfers {
|
|
if isTransferVisible(transfer, now) {
|
|
validTransfers++
|
|
}
|
|
}
|
|
|
|
return map[string]int{
|
|
"devices_total": activeDevices,
|
|
"devices_online": onlineDevices,
|
|
"rooms_waiting": waitingRooms,
|
|
"transfers_total": validTransfers,
|
|
"transfers_cumulative": len(s.transfers),
|
|
"fallback_pending": fallbackPending,
|
|
"admin_sessions": len(s.adminSessions),
|
|
}
|
|
}
|
|
|
|
func (s *MemoryStore) SnapshotMinIOStorage() model.MinIOStorageOverview {
|
|
s.mu.RLock()
|
|
defer s.mu.RUnlock()
|
|
|
|
now := time.Now()
|
|
usedBytes := int64(0)
|
|
objectCount := 0
|
|
|
|
for _, object := range s.fallbackObjects {
|
|
if object.CleanedAt != nil || !object.ExpiresAt.After(now) || object.CleanupState != "ready" {
|
|
continue
|
|
}
|
|
|
|
usedBytes += object.SizeBytes
|
|
objectCount++
|
|
}
|
|
|
|
capacityBytes := s.runtime.MinIOCapacityBytes
|
|
if capacityBytes < 0 {
|
|
capacityBytes = 0
|
|
}
|
|
|
|
remainingBytes := capacityBytes - usedBytes
|
|
if remainingBytes < 0 {
|
|
remainingBytes = 0
|
|
}
|
|
|
|
usagePercent := 0
|
|
if capacityBytes > 0 {
|
|
usagePercent = int((usedBytes * 100) / capacityBytes)
|
|
if usagePercent > 100 {
|
|
usagePercent = 100
|
|
}
|
|
}
|
|
|
|
return model.MinIOStorageOverview{
|
|
Enabled: s.runtime.MinIOFallbackEnabled,
|
|
UsedBytes: usedBytes,
|
|
CapacityBytes: capacityBytes,
|
|
RemainingBytes: remainingBytes,
|
|
UsagePercent: usagePercent,
|
|
ObjectCount: objectCount,
|
|
}
|
|
}
|
|
|
|
func isDeviceActive(device model.Device, now time.Time) bool {
|
|
if device.LastSeenAt.IsZero() {
|
|
return false
|
|
}
|
|
|
|
return device.LastSeenAt.After(now.Add(-activeDeviceWindow))
|
|
}
|
|
|
|
func isTransferVisible(transfer model.Transfer, now time.Time) bool {
|
|
if transfer.UpdatedAt.IsZero() {
|
|
transfer.UpdatedAt = transfer.CreatedAt
|
|
}
|
|
|
|
if isTerminalTransferStatus(transfer.FinalStatus) {
|
|
return transfer.UpdatedAt.After(now.Add(-recentTerminalTransferWind))
|
|
}
|
|
|
|
return transfer.UpdatedAt.After(now.Add(-activeTransferWindow))
|
|
}
|
|
|
|
func isTerminalTransferStatus(status string) bool {
|
|
switch status {
|
|
case model.TransferCompleted, model.TransferFailed, model.TransferCancelled:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (s *MemoryStore) SetPersistence(persistence Persistence, timeout time.Duration, onError func(kind, id string, err error)) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.persistence = persistence
|
|
s.persistTimeout = timeout
|
|
s.onPersistError = onError
|
|
}
|
|
|
|
func (s *MemoryStore) LoadSnapshot(snapshot Snapshot) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
s.devices = make(map[string]model.Device, len(snapshot.Devices))
|
|
for _, device := range snapshot.Devices {
|
|
s.devices[device.ID] = device
|
|
}
|
|
|
|
s.rooms = make(map[string]model.Room, len(snapshot.Rooms))
|
|
for _, room := range snapshot.Rooms {
|
|
s.rooms[room.Code] = room
|
|
}
|
|
|
|
s.transfers = make(map[string]model.Transfer, len(snapshot.Transfers))
|
|
for _, transfer := range snapshot.Transfers {
|
|
s.transfers[transfer.ID] = transfer
|
|
}
|
|
|
|
s.fallbackObjects = make(map[string]model.FallbackObject, len(snapshot.FallbackObjects))
|
|
for _, object := range snapshot.FallbackObjects {
|
|
s.fallbackObjects[object.TransferID] = object
|
|
}
|
|
|
|
if snapshot.Runtime != nil {
|
|
s.runtime = *snapshot.Runtime
|
|
}
|
|
}
|
|
|
|
func (s *MemoryStore) persistDevice(device model.Device) {
|
|
s.persist(device.ID, "device", func(ctx context.Context, persistence Persistence) error {
|
|
return persistence.PersistDevice(ctx, device)
|
|
})
|
|
}
|
|
|
|
func (s *MemoryStore) persistRoom(room model.Room) {
|
|
s.persist(room.Code, "room", func(ctx context.Context, persistence Persistence) error {
|
|
return persistence.PersistRoom(ctx, room)
|
|
})
|
|
}
|
|
|
|
func (s *MemoryStore) persistTransfer(transfer model.Transfer) {
|
|
s.persist(transfer.ID, "transfer", func(ctx context.Context, persistence Persistence) error {
|
|
return persistence.PersistTransfer(ctx, transfer)
|
|
})
|
|
}
|
|
|
|
func (s *MemoryStore) persistFallbackObject(object model.FallbackObject) {
|
|
s.persist(object.TransferID, "fallback_object", func(ctx context.Context, persistence Persistence) error {
|
|
return persistence.PersistFallbackObject(ctx, object)
|
|
})
|
|
}
|
|
|
|
func (s *MemoryStore) persistRuntimeConfig(runtime model.RuntimeConfig) {
|
|
s.persist("transfer_policy", "runtime_config", func(ctx context.Context, persistence Persistence) error {
|
|
return persistence.PersistRuntimeConfig(ctx, runtime)
|
|
})
|
|
}
|
|
|
|
func (s *MemoryStore) persist(id, kind string, fn func(context.Context, Persistence) error) {
|
|
s.mu.RLock()
|
|
persistence := s.persistence
|
|
timeout := s.persistTimeout
|
|
onError := s.onPersistError
|
|
s.mu.RUnlock()
|
|
|
|
if persistence == nil {
|
|
return
|
|
}
|
|
|
|
if timeout <= 0 {
|
|
timeout = 5 * time.Second
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
if err := fn(ctx, persistence); err != nil && onError != nil {
|
|
onError(kind, id, err)
|
|
}
|
|
}
|