diff --git a/api.go b/api.go index 0c59031..f42949b 100644 --- a/api.go +++ b/api.go @@ -227,6 +227,56 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) return snapshot.ID, nil } +// SnapshotAsync creates a new snapshot asynchronously. +// It returns the job ID (which is also the snapshot ID) immediately. +// The actual snapshot creation happens in a background goroutine. +// Use GetSnapshotStatus to check the progress. +func (a *Agate) SnapshotAsync(ctx context.Context, name string, parentID string) (string, error) { + a.options.Logger.Printf("Starting async snapshot creation with name: %s", name) + + // If parentID is not provided, use the current snapshot ID + if parentID == "" { + parentID = a.currentSnapshotID + } + + return a.manager.CreateSnapshotAsync(ctx, a.options.BlobStore.GetActiveDir(), name, parentID, + func() { + // onStart: Lock mutex and close resources + a.mutex.Lock() + if a.options.CloseFunc != nil { + if err := a.options.CloseFunc(); err != nil { + a.options.Logger.Printf("ERROR: failed to close resources before async snapshot: %v", err) + } + } + }, + func(id string, err error) { + // onFinish: Open resources, update state, and unlock mutex + defer a.mutex.Unlock() + + if a.options.OpenFunc != nil { + if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil { + a.options.Logger.Printf("ERROR: failed to open resources after async snapshot: %v", err) + } + } + + if err == nil { + a.currentSnapshotID = id + if err := a.saveCurrentSnapshotID(); err != nil { + a.options.Logger.Printf("ERROR: failed to save current snapshot ID: %v", err) + } + a.options.Logger.Printf("Async snapshot %s created successfully", id) + } else { + a.options.Logger.Printf("Async snapshot creation failed: %v", err) + } + }, + ) +} + +// GetSnapshotStatus returns the status of an asynchronous snapshot creation job. +func (a *Agate) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) { + return a.manager.GetSnapshotStatus(ctx, jobID) +} + // RestoreSnapshot extracts a snapshot to the active directory. func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error { a.mutex.Lock() diff --git a/archive/archive.go b/archive/archive.go index 3a8799d..1da8425 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -95,6 +95,104 @@ func CreateArchive(sourceDir, targetPath string) error { return nil } +// CreateArchiveWithProgress creates a ZIP archive with progress reporting. +// onProgress is called with the current number of bytes written and the total size. +func CreateArchiveWithProgress(sourceDir, targetPath string, onProgress func(current, total int64)) error { + info, err := os.Stat(sourceDir) + if err != nil { + return fmt.Errorf("failed to stat source directory %s: %w", sourceDir, err) + } + if !info.IsDir() { + return fmt.Errorf("source %s is not a directory", sourceDir) + } + + // Calculate total size + var totalSize int64 + err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + totalSize += info.Size() + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to calculate total size of %s: %w", sourceDir, err) + } + + // Create file for ZIP archive + outFile, err := os.Create(targetPath) + if err != nil { + return fmt.Errorf("failed to create target archive file %s: %w", targetPath, err) + } + defer outFile.Close() + + // Create zip.Writer + zipWriter := zip.NewWriter(outFile) + defer zipWriter.Close() + + var currentSize int64 + + // Recursively walk sourceDir + err = filepath.Walk(sourceDir, func(filePath string, fileInfo os.FileInfo, walkErr error) error { + if walkErr != nil { + return fmt.Errorf("error walking path %s: %w", filePath, walkErr) + } + + // Skip sourceDir itself + if filePath == sourceDir { + return nil + } + + // Create relative path + relativePath := strings.TrimPrefix(filePath, sourceDir+string(filepath.Separator)) + relativePath = filepath.ToSlash(relativePath) + + // Check if directory + if fileInfo.IsDir() { + _, err = zipWriter.Create(relativePath + "/") + if err != nil { + return fmt.Errorf("failed to create directory entry %s in archive: %w", relativePath, err) + } + return nil + } + + // Open file for reading + fileToArchive, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %s for archiving: %w", filePath, err) + } + defer fileToArchive.Close() + + // Create archive entry + zipEntryWriter, err := zipWriter.Create(relativePath) + if err != nil { + return fmt.Errorf("failed to create entry %s in archive: %w", relativePath, err) + } + + // Copy content + n, err := io.Copy(zipEntryWriter, fileToArchive) + if err != nil { + return fmt.Errorf("failed to copy file content %s to archive: %w", filePath, err) + } + + currentSize += n + if onProgress != nil { + onProgress(currentSize, totalSize) + } + + return nil + }) + + if err != nil { + os.Remove(targetPath) + return fmt.Errorf("failed during directory walk for archiving %s: %w", sourceDir, err) + } + + return nil +} + // ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом. func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) { // Открываем ZIP-архив diff --git a/async_test.go b/async_test.go new file mode 100644 index 0000000..51bef00 --- /dev/null +++ b/async_test.go @@ -0,0 +1,115 @@ +package agate + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +func TestSnapshotAsync(t *testing.T) { + // Setup temporary work directory + workDir, err := os.MkdirTemp("", "agate_async_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(workDir) + + // Initialize Agate + opts := AgateOptions{ + WorkDir: workDir, + Logger: nil, // Disable logging for test + } + + ag, err := New(opts) + if err != nil { + t.Fatalf("Failed to initialize Agate: %v", err) + } + defer ag.Close() + + // Get active directory and create some dummy files + activeDir := ag.GetActiveDir() + if err := os.MkdirAll(activeDir, 0755); err != nil { + t.Fatalf("Failed to create active dir: %v", err) + } + + // Create a large-ish file to ensure it takes some time (though still fast) + dummyFile := filepath.Join(activeDir, "data.bin") + data := make([]byte, 1024*1024) // 1MB + if err := os.WriteFile(dummyFile, data, 0644); err != nil { + t.Fatalf("Failed to create dummy file: %v", err) + } + + // Start async snapshot + ctx := context.Background() + snapshotID, err := ag.SnapshotAsync(ctx, "async-snap", "") + if err != nil { + t.Fatalf("SnapshotAsync failed: %v", err) + } + + if snapshotID == "" { + t.Fatal("SnapshotAsync returned empty ID") + } + + // Check status immediately + status, err := ag.GetSnapshotStatus(ctx, snapshotID) + if err != nil { + t.Fatalf("GetSnapshotStatus failed: %v", err) + } + + // Status should be pending or running + if status.Status != "pending" && status.Status != "running" { + t.Errorf("Initial status should be pending or running, got: %s", status.Status) + } + + // Poll for completion + timeout := time.After(5 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + done := false + for !done { + select { + case <-timeout: + t.Fatal("Timeout waiting for snapshot completion") + case <-ticker.C: + status, err := ag.GetSnapshotStatus(ctx, snapshotID) + if err != nil { + t.Fatalf("GetSnapshotStatus failed during polling: %v", err) + } + + if status.Status == "done" { + done = true + if status.Progress != 1.0 { + t.Errorf("Expected progress 1.0, got %f", status.Progress) + } + } else if status.Status == "failed" { + t.Fatalf("Snapshot creation failed: %s", status.Error) + } + } + } + + // Verify snapshot exists + snaps, err := ag.ListSnapshots(ctx) + if err != nil { + t.Fatalf("ListSnapshots failed: %v", err) + } + + found := false + for _, s := range snaps { + if s.ID == snapshotID { + found = true + break + } + } + + if !found { + t.Errorf("Snapshot %s not found in list", snapshotID) + } + + // Verify current snapshot ID is updated + if ag.GetCurrentSnapshotID() != snapshotID { + t.Errorf("Current snapshot ID not updated. Expected %s, got %s", snapshotID, ag.GetCurrentSnapshotID()) + } +} diff --git a/interfaces/snapshot.go b/interfaces/snapshot.go index b665eee..3bb24f4 100644 --- a/interfaces/snapshot.go +++ b/interfaces/snapshot.go @@ -13,6 +13,15 @@ type SnapshotManager interface { // Returns the created Snapshot with its metadata or an error if the process fails. CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error) + // CreateSnapshotAsync initiates a background process to create a snapshot. + // Returns the job ID (which is also the snapshot ID) or an error if the process couldn't start. + // onStart is called in the background goroutine before the snapshot creation starts. + // onFinish is called in the background goroutine after the snapshot creation finishes (successfully or with error). + CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) + + // GetSnapshotStatus retrieves the status of an asynchronous snapshot creation job. + GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) + // GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID. // Returns a Snapshot object containing metadata GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) diff --git a/manager.go b/manager.go index cf16515..59ae6dd 100644 --- a/manager.go +++ b/manager.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "gitea.unprism.ru/KRBL/Agate/models" @@ -26,6 +27,8 @@ type SnapshotManagerData struct { metadataStore store.MetadataStore blobStore store.BlobStore logger *log.Logger + jobs map[string]*store.SnapshotStatus + jobsMutex sync.RWMutex } func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) { @@ -42,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl metadataStore: metadataStore, blobStore: blobStore, logger: logger, + jobs: make(map[string]*store.SnapshotStatus), }, nil } @@ -71,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s // Generate a unique ID for the snapshot snapshotID := uuid.New().String() + return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil) +} + +func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) { + // Validate source directory + info, err := os.Stat(sourceDir) + if err != nil { + if os.IsNotExist(err) { + return "", models.ErrSourceNotFound + } + return "", fmt.Errorf("failed to access source directory: %w", err) + } + + if !info.IsDir() { + return "", models.ErrSourceNotDirectory + } + + // Check if parent exists if specified + if parentID != "" { + _, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID) + if err != nil { + fmt.Println("failed to check parent snapshot: %w", err) + parentID = "" + } + } + + snapshotID := uuid.New().String() + + data.jobsMutex.Lock() + data.jobs[snapshotID] = &store.SnapshotStatus{ + ID: snapshotID, + Status: "pending", + Progress: 0, + } + data.jobsMutex.Unlock() + + go func() { + if onStart != nil { + onStart() + } + + data.jobsMutex.Lock() + if job, ok := data.jobs[snapshotID]; ok { + job.Status = "running" + } + data.jobsMutex.Unlock() + + _, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) { + data.jobsMutex.Lock() + defer data.jobsMutex.Unlock() + if job, ok := data.jobs[snapshotID]; ok { + if total > 0 { + job.Progress = float64(current) / float64(total) + } + } + }) + + data.jobsMutex.Lock() + if job, ok := data.jobs[snapshotID]; ok { + if err != nil { + job.Status = "failed" + job.Error = err.Error() + } else { + job.Status = "done" + job.Progress = 1.0 + } + } + data.jobsMutex.Unlock() + + if onFinish != nil { + onFinish(snapshotID, err) + } + }() + + return snapshotID, nil +} + +func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) { + data.jobsMutex.RLock() + defer data.jobsMutex.RUnlock() + + job, ok := data.jobs[jobID] + if !ok { + return nil, models.ErrNotFound + } + + // Return a copy to avoid race conditions + statusCopy := *job + return &statusCopy, nil +} + +func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) { // Create a temporary file for the archive in the working directory tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip") tempFile, err := os.Create(tempFilePath) @@ -81,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s defer os.Remove(tempFilePath) // Clean up temp file after we're done // Create archive of the source directory - if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil { + if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil { return nil, fmt.Errorf("failed to create archive: %w", err) } diff --git a/store/store.go b/store/store.go index e0a322a..d7b5a57 100644 --- a/store/store.go +++ b/store/store.go @@ -37,6 +37,14 @@ type SnapshotInfo struct { CreationTime time.Time // Время создания } +// SnapshotStatus represents the status of an asynchronous snapshot creation process. +type SnapshotStatus struct { + ID string // Unique identifier of the job (usually same as Snapshot ID) + Status string // Current status: "pending", "running", "done", "failed" + Progress float64 // Progress from 0.0 to 1.0 + Error string // Error message if failed +} + // ListOptions provides options for filtering and paginating snapshot lists type ListOptions struct { FilterByName string // Filter snapshots by name (substring match)