package syncer

import (
	"fmt"
	"sync"
	"time"

	"gpt-load/internal/store"

	"github.com/sirupsen/logrus"
)

// LoaderFunc defines a generic function signature for loading data from the source of truth (e.g., database).
type LoaderFunc[T any] func() (T, error)

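// A loader is usually a small closure over the data access layer. A minimal
// sketch (the map type and the loadSettingsFromDB helper are illustrative
// assumptions, not part of this package):
//
//	loader := func() (map[string]string, error) {
//		return loadSettingsFromDB()
//	}
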
// CacheSyncer is a generic service that manages in-memory caching and cross-instance synchronization.
type CacheSyncer[T any] struct {
	mu          sync.RWMutex
	cache       T
	loader      LoaderFunc[T]
	store       store.Store
	channelName string
	logger      *logrus.Entry
	stopChan    chan struct{}
	wg          sync.WaitGroup
}

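// Concurrency model, as implemented below: Get takes the read lock, reload
// swaps the whole cached value under the write lock, and the background
// listener goroutine started by NewCacheSyncer is stopped via stopChan and
// waited on through wg.
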
// NewCacheSyncer creates and initializes a new CacheSyncer.
func NewCacheSyncer[T any](
	loader LoaderFunc[T],
	store store.Store,
	channelName string,
	logger *logrus.Entry,
) (*CacheSyncer[T], error) {
	s := &CacheSyncer[T]{
		loader:      loader,
		store:       store,
		channelName: channelName,
		logger:      logger,
		stopChan:    make(chan struct{}),
	}

	if err := s.reload(); err != nil {
		return nil, fmt.Errorf("initial load for %s failed: %w", channelName, err)
	}

	s.wg.Add(1)
	go s.listenForUpdates()

	return s, nil
}

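// Typical usage, as a minimal sketch (the loader, redisStore, logger, and the
// channel name are assumptions supplied by the caller, not part of this file):
//
//	syncer, err := NewCacheSyncer(loader, redisStore, "settings:invalidate", logger)
//	if err != nil {
//		return err
//	}
//	defer syncer.Stop()
//
//	settings := syncer.Get() // lock-protected read of the cached value
//	_ = syncer.Invalidate()  // after a write: reload locally and notify other instances
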
// Get safely returns the cached data.
func (s *CacheSyncer[T]) Get() T {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.cache
}

// Invalidate reloads the local cache and publishes a notification so that all
// other instances reload theirs.
func (s *CacheSyncer[T]) Invalidate() error {
	s.logger.Debug("publishing invalidation notification")
	// Refresh the local cache first; reload logs its own errors.
	_ = s.reload()
	if s.store == nil {
		return nil // no store configured (single-instance mode), nothing to publish to
	}
	return s.store.Publish(s.channelName, []byte("reload"))
}

// Stop gracefully shuts down the syncer's background goroutine.
func (s *CacheSyncer[T]) Stop() {
	s.logger.Debug("stopping cache syncer...")
	close(s.stopChan)
	s.wg.Wait()
	s.logger.Info("cache syncer stopped.")
}

// reload fetches the latest data using the loader function and updates the cache.
func (s *CacheSyncer[T]) reload() error {
	s.logger.Debug("reloading cache...")

	// Run the loader outside the lock so a slow load does not block readers.
	newData, err := s.loader()
	if err != nil {
		s.logger.Errorf("failed to reload cache: %v", err)
		return err
	}

	s.mu.Lock()
	s.cache = newData
	s.mu.Unlock()

	s.logger.Info("cache reloaded successfully")
	return nil
}

// listenForUpdates runs in the background, listening for invalidation messages.
// It re-subscribes whenever the subscription fails or its channel closes, with a
// short backoff, and exits once Stop closes stopChan.
func (s *CacheSyncer[T]) listenForUpdates() {
	defer s.wg.Done()

	for {
		select {
		case <-s.stopChan:
			s.logger.Info("received stop signal, exiting listener loop.")
			return
		default:
		}

		if s.store == nil {
			s.logger.Warn("store is not configured, stopping subscription listener.")
			return
		}

		subscription, err := s.store.Subscribe(s.channelName)
		if err != nil {
			s.logger.Errorf("failed to subscribe, retrying in 5s: %v", err)
			select {
			case <-time.After(5 * time.Second):
				continue
			case <-s.stopChan:
				return
			}
		}

		s.logger.Debugf("subscribed to channel: %s", s.channelName)

	subscriberLoop:
		for {
			select {
			case msg, ok := <-subscription.Channel():
				if !ok {
					s.logger.Warn("subscription channel closed, attempting to re-subscribe...")
					break subscriberLoop // This will lead to closing the current subscription and retrying.
				}
				s.logger.Debugf("received invalidation notification, payload: %s", string(msg.Payload))
				if err := s.reload(); err != nil {
					s.logger.Errorf("failed to reload cache after notification: %v", err)
				}
			case <-s.stopChan:
				s.logger.Info("received stop signal, exiting subscriber loop.")
				if err := subscription.Close(); err != nil {
					s.logger.Errorf("failed to close subscription: %v", err)
				}
				return
			}
		}

		// Before retrying, ensure the old subscription is closed.
		if err := subscription.Close(); err != nil {
			s.logger.Errorf("failed to close subscription before retrying: %v", err)
		}

		// Wait a moment before retrying to avoid tight loops on persistent errors.
		select {
		case <-time.After(2 * time.Second):
		case <-s.stopChan:
			return
		}
	}
}
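
// The store.Store dependency is exercised here only through its pub/sub
// surface. A sketch of the contract as used above (assumed from these calls,
// not the actual interface definition in gpt-load/internal/store):
//
//	Publish(channel string, message []byte) error
//	Subscribe(channel string) (subscription, error) // subscription exposes Channel() and Close()
//	msg.Payload                                     // the published bytes delivered to subscribers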