Files
gpt-load/internal/syncer/cache_syncer.go
2025-07-10 23:20:52 +08:00

163 lines
4.0 KiB
Go

package syncer
import (
"fmt"
"sync"
"time"
"gpt-load/internal/store"
"github.com/sirupsen/logrus"
)
// LoaderFunc defines a generic function signature for loading data from the source of truth (e.g., database).
//
// The syncer calls it with no arguments and expects either a fresh value of
// type T or a non-nil error. When an error is returned, reload logs it and
// leaves the previously cached value in place.
type LoaderFunc[T any] func() (T, error)
// CacheSyncer is a generic service that manages in-memory caching and cross-instance synchronization.
//
// It keeps one value of type T in memory, refreshes it through a LoaderFunc,
// and uses a store channel to publish and receive reload notifications.
// Instances are created with NewCacheSyncer and shut down with Stop.
type CacheSyncer[T any] struct {
// mu guards cache: Get takes the read lock, reload takes the write lock.
mu sync.RWMutex
// cache holds the value produced by the most recent successful load.
cache T
// loader fetches a fresh value; it is invoked by reload.
loader LoaderFunc[T]
// store is used to publish and subscribe to invalidation messages.
// The background listener exits immediately when it is nil.
store store.Store
// channelName is the store channel used for both Publish and Subscribe.
channelName string
// logger receives all diagnostic output of the syncer.
logger *logrus.Entry
// stopChan is closed by Stop to tell the background listener to exit.
stopChan chan struct{}
// wg tracks the background listener goroutine so Stop can wait for it.
wg sync.WaitGroup
// afterReload, when non-nil, is called with the new value after every
// successful reload, outside of the cache lock.
afterReload func(newValue T)
}
// NewCacheSyncer builds a CacheSyncer, performs the first load synchronously,
// and then starts the background goroutine that listens for invalidation
// messages on channelName.
//
// The initial load goes through reload, so afterReload (if non-nil) is also
// invoked for the first value. If that load fails, no goroutine is started and
// the wrapped loader error is returned together with a nil syncer.
func NewCacheSyncer[T any](
	loader LoaderFunc[T],
	store store.Store,
	channelName string,
	logger *logrus.Entry,
	afterReload func(newValue T),
) (*CacheSyncer[T], error) {
	syncer := new(CacheSyncer[T])
	syncer.loader = loader
	syncer.store = store
	syncer.channelName = channelName
	syncer.logger = logger
	syncer.afterReload = afterReload
	syncer.stopChan = make(chan struct{})

	// Populate the cache before anyone can call Get on the returned value.
	loadErr := syncer.reload()
	if loadErr != nil {
		return nil, fmt.Errorf("initial load for %s failed: %w", channelName, loadErr)
	}

	// Register the listener with the WaitGroup before it starts so Stop can
	// always wait for it.
	syncer.wg.Add(1)
	go syncer.listenForUpdates()

	return syncer, nil
}
// Get returns the currently cached value. The read lock is held only while
// the value is copied out, so concurrent callers do not block each other.
func (s *CacheSyncer[T]) Get() T {
	s.mu.RLock()
	snapshot := s.cache
	s.mu.RUnlock()
	return snapshot
}
// Invalidate publishes a notification to all instances to reload their cache.
//
// It sends the payload "reload" on the syncer's channel and returns whatever
// error the store's Publish call reports. When no store is configured (the
// same condition under which the background listener exits), it returns an
// error instead of panicking on a nil interface call.
func (s *CacheSyncer[T]) Invalidate() error {
	if s.store == nil {
		return fmt.Errorf("cannot publish invalidation on %s: store is not configured", s.channelName)
	}
	s.logger.Debug("publishing invalidation notification")
	return s.store.Publish(s.channelName, []byte("reload"))
}
// Stop gracefully shuts down the syncer's background goroutine.
//
// It closes stopChan to signal the listener and then blocks until the
// listener has returned. Stop is idempotent: calling it again (sequentially or
// concurrently) does not panic; later calls simply wait for the listener,
// which has already exited or is exiting.
func (s *CacheSyncer[T]) Stop() {
	s.logger.Debug("stopping cache syncer...")

	// Closing an already-closed channel panics, so check first. The existing
	// mutex is held only around the check-and-close to make it atomic; it is
	// released before waiting so reload and Get are never blocked by Stop.
	s.mu.Lock()
	select {
	case <-s.stopChan:
		// Already closed by an earlier Stop call.
	default:
		close(s.stopChan)
	}
	s.mu.Unlock()

	s.wg.Wait()
	s.logger.Info("cache syncer stopped.")
}
// reload asks the loader for a fresh value and, on success, swaps it into the
// cache under the write lock and then runs the afterReload hook (if any) with
// that value. On loader failure the error is logged and returned and the
// cached value is left unchanged.
func (s *CacheSyncer[T]) reload() error {
	s.logger.Debug("reloading cache...")

	fresh, loadErr := s.loader()
	if loadErr != nil {
		s.logger.Errorf("failed to reload cache: %v", loadErr)
		return loadErr
	}

	// Hold the write lock only for the assignment itself.
	s.mu.Lock()
	s.cache = fresh
	s.mu.Unlock()
	s.logger.Info("cache reloaded successfully")

	// The hook runs outside the lock, after the new value is visible to Get.
	if hook := s.afterReload; hook != nil {
		s.logger.Debug("triggering afterReload hook")
		hook(fresh)
	}
	return nil
}
// listenForUpdates runs in the background, listening for invalidation messages.
//
// It subscribes to the syncer's channel and reloads the cache for every
// message received. If subscribing fails it retries after 5 seconds; if the
// subscription channel is closed it closes the subscription and re-subscribes
// after 2 seconds. The loop exits when stopChan is closed or when no store is
// configured. The WaitGroup is released on return so Stop can complete.
//
// After any interruption (failed subscribe or closed subscription channel)
// the cache is reloaded once the new subscription is established.
// NOTE(review): this assumes the store does not replay messages published
// while no subscription was active (typical pub/sub behaviour) — confirm
// against the store implementation.
func (s *CacheSyncer[T]) listenForUpdates() {
	defer s.wg.Done()

	// resync records that notifications may have been missed since the last
	// successful reload, so the cache must be refreshed after re-subscribing.
	resync := false

	for {
		// Non-blocking check so a stop request is honoured before each
		// (re-)subscription attempt.
		select {
		case <-s.stopChan:
			s.logger.Info("received stop signal, exiting listener loop.")
			return
		default:
		}

		if s.store == nil {
			s.logger.Warn("store is not configured, stopping subscription listener.")
			return
		}

		subscription, err := s.store.Subscribe(s.channelName)
		if err != nil {
			s.logger.Errorf("failed to subscribe, retrying in 5s: %v", err)
			resync = true
			select {
			case <-time.After(5 * time.Second):
				continue
			case <-s.stopChan:
				return
			}
		}
		s.logger.Debugf("subscribed to channel: %s", s.channelName)

		// Reload only after the subscription is active: anything published
		// from now on arrives on the channel, and this reload covers whatever
		// was published during the gap.
		if resync {
			s.logger.Info("re-subscribed after an interruption, reloading cache to catch up")
			if err := s.reload(); err != nil {
				s.logger.Errorf("failed to reload cache after re-subscribing: %v", err)
			} else {
				resync = false
			}
		}

	subscriberLoop:
		for {
			select {
			case msg, ok := <-subscription.Channel():
				if !ok {
					s.logger.Warn("subscription channel closed, attempting to re-subscribe...")
					break subscriberLoop
				}
				s.logger.Debugf("received invalidation notification, payload: %s", string(msg.Payload))
				if err := s.reload(); err != nil {
					s.logger.Errorf("failed to reload cache after notification: %v", err)
				}
			case <-s.stopChan:
				s.logger.Info("received stop signal, exiting subscriber loop.")
				if err := subscription.Close(); err != nil {
					s.logger.Errorf("failed to close subscription: %v", err)
				}
				return
			}
		}

		// The subscription channel was closed: notifications may be lost
		// until the next subscription is in place.
		resync = true

		// Before retrying, ensure the old subscription is closed.
		if err := subscription.Close(); err != nil {
			s.logger.Errorf("failed to close subscription before retrying: %v", err)
		}

		// Wait a moment before retrying to avoid tight loops on persistent errors.
		select {
		case <-time.After(2 * time.Second):
		case <-s.stopChan:
			return
		}
	}
}