Добавлена возможность асинхронного создания снапшотов

А также обновлены зависимости
This commit is contained in:
2025-11-21 14:46:16 +03:00
parent 6845219e94
commit 058aff4019
8 changed files with 420 additions and 42 deletions

View File

@@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"
"gitea.unprism.ru/KRBL/Agate/models"
@@ -26,6 +27,8 @@ type SnapshotManagerData struct {
metadataStore store.MetadataStore
blobStore store.BlobStore
logger *log.Logger
jobs map[string]*store.SnapshotStatus
jobsMutex sync.RWMutex
}
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
@@ -42,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl
metadataStore: metadataStore,
blobStore: blobStore,
logger: logger,
jobs: make(map[string]*store.SnapshotStatus),
}, nil
}
@@ -71,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
// Generate a unique ID for the snapshot
snapshotID := uuid.New().String()
return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil)
}
func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) {
// Validate source directory
info, err := os.Stat(sourceDir)
if err != nil {
if os.IsNotExist(err) {
return "", models.ErrSourceNotFound
}
return "", fmt.Errorf("failed to access source directory: %w", err)
}
if !info.IsDir() {
return "", models.ErrSourceNotDirectory
}
// Check if parent exists if specified
if parentID != "" {
_, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
if err != nil {
fmt.Println("failed to check parent snapshot: %w", err)
parentID = ""
}
}
snapshotID := uuid.New().String()
data.jobsMutex.Lock()
data.jobs[snapshotID] = &store.SnapshotStatus{
ID: snapshotID,
Status: "pending",
Progress: 0,
}
data.jobsMutex.Unlock()
go func() {
if onStart != nil {
onStart()
}
data.jobsMutex.Lock()
if job, ok := data.jobs[snapshotID]; ok {
job.Status = "running"
}
data.jobsMutex.Unlock()
_, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) {
data.jobsMutex.Lock()
defer data.jobsMutex.Unlock()
if job, ok := data.jobs[snapshotID]; ok {
if total > 0 {
job.Progress = float64(current) / float64(total)
}
}
})
data.jobsMutex.Lock()
if job, ok := data.jobs[snapshotID]; ok {
if err != nil {
job.Status = "failed"
job.Error = err.Error()
} else {
job.Status = "done"
job.Progress = 1.0
}
}
data.jobsMutex.Unlock()
if onFinish != nil {
onFinish(snapshotID, err)
}
}()
return snapshotID, nil
}
func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
data.jobsMutex.RLock()
defer data.jobsMutex.RUnlock()
job, ok := data.jobs[jobID]
if !ok {
return nil, models.ErrNotFound
}
// Return a copy to avoid race conditions
statusCopy := *job
return &statusCopy, nil
}
func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) {
// Create a temporary file for the archive in the working directory
tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip")
tempFile, err := os.Create(tempFilePath)
@@ -81,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
defer os.Remove(tempFilePath) // Clean up temp file after we're done
// Create archive of the source directory
if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil {
if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil {
return nil, fmt.Errorf("failed to create archive: %w", err)
}