Files
gpt-load/internal/services/task_service.go

126 lines
3.0 KiB
Go

package services
import (
"errors"
"sync"
"time"
)
// TaskStatus represents the status of a long-running task.
type TaskStatus struct {
IsRunning bool `json:"is_running"`
GroupName string `json:"group_name,omitempty"`
Processed int `json:"processed,omitempty"`
Total int `json:"total,omitempty"`
TaskID string `json:"task_id,omitempty"`
ExpiresAt time.Time `json:"-"` // Internal field to handle zombie tasks
lastUpdated time.Time
}
// TaskService manages the state of a single, global, long-running task.
type TaskService struct {
mu sync.Mutex
status TaskStatus
resultsCache map[string]any
cacheOrder []string
maxCacheSize int
}
// NewTaskService creates a new TaskService.
func NewTaskService() *TaskService {
return &TaskService{
resultsCache: make(map[string]any),
cacheOrder: make([]string, 0),
maxCacheSize: 100,
}
}
// StartTask attempts to start a new task. It returns an error if a task is already running.
func (s *TaskService) StartTask(taskID, groupName string, total int, timeout time.Duration) (*TaskStatus, error) {
s.mu.Lock()
defer s.mu.Unlock()
// Zombie task check
if s.status.IsRunning && time.Now().After(s.status.ExpiresAt) {
// The previous task is considered a zombie, reset it.
s.status = TaskStatus{}
}
if s.status.IsRunning {
return nil, errors.New("a task is already running")
}
s.status = TaskStatus{
IsRunning: true,
TaskID: taskID,
GroupName: groupName,
Total: total,
Processed: 0,
ExpiresAt: time.Now().Add(timeout),
lastUpdated: time.Now(),
}
return &s.status, nil
}
// GetTaskStatus returns the current status of the task.
func (s *TaskService) GetTaskStatus() *TaskStatus {
s.mu.Lock()
defer s.mu.Unlock()
// Zombie task check
if s.status.IsRunning && time.Now().After(s.status.ExpiresAt) {
s.status = TaskStatus{} // Reset if expired
}
// Return a copy to prevent race conditions on the caller's side
statusCopy := s.status
return &statusCopy
}
// UpdateProgress updates the progress of the current task.
func (s *TaskService) UpdateProgress(processed int) {
s.mu.Lock()
defer s.mu.Unlock()
if !s.status.IsRunning {
return
}
s.status.Processed = processed
s.status.lastUpdated = time.Now()
}
// EndTask marks the current task as finished.
func (s *TaskService) EndTask() {
s.mu.Lock()
defer s.mu.Unlock()
s.status.IsRunning = false
}
// StoreResult stores the result of a finished task.
func (s *TaskService) StoreResult(taskID string, result any) {
s.mu.Lock()
defer s.mu.Unlock()
if _, exists := s.resultsCache[taskID]; !exists {
if len(s.cacheOrder) >= s.maxCacheSize {
oldestTaskID := s.cacheOrder[0]
delete(s.resultsCache, oldestTaskID)
s.cacheOrder = s.cacheOrder[1:]
}
s.cacheOrder = append(s.cacheOrder, taskID)
}
s.resultsCache[taskID] = result
}
// GetResult retrieves the result of a finished task.
func (s *TaskService) GetResult(taskID string) (any, bool) {
s.mu.Lock()
defer s.mu.Unlock()
result, found := s.resultsCache[taskID]
return result, found
}