Merge remote-tracking branch 'origin/alpha' into alpha
# Conflicts:
#	go.mod
#	go.sum
api.go (50 lines changed)
@@ -227,6 +227,56 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
	return snapshot.ID, nil
}

// SnapshotAsync creates a new snapshot asynchronously.
// It returns the job ID (which is also the snapshot ID) immediately.
// The actual snapshot creation happens in a background goroutine.
// Use GetSnapshotStatus to check the progress.
func (a *Agate) SnapshotAsync(ctx context.Context, name string, parentID string) (string, error) {
	a.options.Logger.Printf("Starting async snapshot creation with name: %s", name)

	// If parentID is not provided, use the current snapshot ID
	if parentID == "" {
		parentID = a.currentSnapshotID
	}

	return a.manager.CreateSnapshotAsync(ctx, a.options.BlobStore.GetActiveDir(), name, parentID,
		func() {
			// onStart: lock the mutex and close resources
			a.mutex.Lock()
			if a.options.CloseFunc != nil {
				if err := a.options.CloseFunc(); err != nil {
					a.options.Logger.Printf("ERROR: failed to close resources before async snapshot: %v", err)
				}
			}
		},
		func(id string, err error) {
			// onFinish: open resources, update state, and unlock the mutex
			defer a.mutex.Unlock()

			if a.options.OpenFunc != nil {
				if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
					a.options.Logger.Printf("ERROR: failed to open resources after async snapshot: %v", err)
				}
			}

			if err == nil {
				a.currentSnapshotID = id
				if err := a.saveCurrentSnapshotID(); err != nil {
					a.options.Logger.Printf("ERROR: failed to save current snapshot ID: %v", err)
				}
				a.options.Logger.Printf("Async snapshot %s created successfully", id)
			} else {
				a.options.Logger.Printf("Async snapshot creation failed: %v", err)
			}
		},
	)
}

// GetSnapshotStatus returns the status of an asynchronous snapshot creation job.
func (a *Agate) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
	return a.manager.GetSnapshotStatus(ctx, jobID)
}

// RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
	a.mutex.Lock()
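Note (illustration, not part of the diff): the intended call pattern for the new API is start-then-poll. A minimal sketch, assuming the same package as async_test.go and standard imports (context, fmt, time); waitForSnapshot is a hypothetical helper and the polling interval is arbitrary:

// Hypothetical helper: start an async snapshot and block until it finishes.
func waitForSnapshot(ctx context.Context, ag *Agate, name string) (string, error) {
	id, err := ag.SnapshotAsync(ctx, name, "")
	if err != nil {
		return "", err
	}
	for {
		st, err := ag.GetSnapshotStatus(ctx, id)
		if err != nil {
			return "", err
		}
		switch st.Status {
		case "done":
			return id, nil
		case "failed":
			return "", fmt.Errorf("snapshot failed: %s", st.Error)
		}
		time.Sleep(50 * time.Millisecond) // arbitrary polling interval
	}
}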
@@ -95,6 +95,104 @@ func CreateArchive(sourceDir, targetPath string) error {
	return nil
}

// CreateArchiveWithProgress creates a ZIP archive with progress reporting.
// onProgress is called with the current number of bytes written and the total size.
func CreateArchiveWithProgress(sourceDir, targetPath string, onProgress func(current, total int64)) error {
	info, err := os.Stat(sourceDir)
	if err != nil {
		return fmt.Errorf("failed to stat source directory %s: %w", sourceDir, err)
	}
	if !info.IsDir() {
		return fmt.Errorf("source %s is not a directory", sourceDir)
	}

	// Calculate total size
	var totalSize int64
	err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			totalSize += info.Size()
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to calculate total size of %s: %w", sourceDir, err)
	}

	// Create file for ZIP archive
	outFile, err := os.Create(targetPath)
	if err != nil {
		return fmt.Errorf("failed to create target archive file %s: %w", targetPath, err)
	}
	defer outFile.Close()

	// Create zip.Writer
	zipWriter := zip.NewWriter(outFile)
	defer zipWriter.Close()

	var currentSize int64

	// Recursively walk sourceDir
	err = filepath.Walk(sourceDir, func(filePath string, fileInfo os.FileInfo, walkErr error) error {
		if walkErr != nil {
			return fmt.Errorf("error walking path %s: %w", filePath, walkErr)
		}

		// Skip sourceDir itself
		if filePath == sourceDir {
			return nil
		}

		// Create relative path
		relativePath := strings.TrimPrefix(filePath, sourceDir+string(filepath.Separator))
		relativePath = filepath.ToSlash(relativePath)

		// Check if directory
		if fileInfo.IsDir() {
			_, err = zipWriter.Create(relativePath + "/")
			if err != nil {
				return fmt.Errorf("failed to create directory entry %s in archive: %w", relativePath, err)
			}
			return nil
		}

		// Open file for reading
		fileToArchive, err := os.Open(filePath)
		if err != nil {
			return fmt.Errorf("failed to open file %s for archiving: %w", filePath, err)
		}
		defer fileToArchive.Close()

		// Create archive entry
		zipEntryWriter, err := zipWriter.Create(relativePath)
		if err != nil {
			return fmt.Errorf("failed to create entry %s in archive: %w", relativePath, err)
		}

		// Copy content
		n, err := io.Copy(zipEntryWriter, fileToArchive)
		if err != nil {
			return fmt.Errorf("failed to copy file content %s to archive: %w", filePath, err)
		}

		currentSize += n
		if onProgress != nil {
			onProgress(currentSize, totalSize)
		}

		return nil
	})

	if err != nil {
		os.Remove(targetPath)
		return fmt.Errorf("failed during directory walk for archiving %s: %w", sourceDir, err)
	}

	return nil
}
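Note (illustration, not part of the diff): the new onProgress hook can be wired to any reporting mechanism. A minimal sketch in the same package, assuming a standard "log" import; archiveWithLogging and the percentage format are hypothetical:

// Hypothetical wrapper: archive a directory and log percentage progress.
func archiveWithLogging(sourceDir, targetPath string) error {
	return CreateArchiveWithProgress(sourceDir, targetPath, func(current, total int64) {
		if total > 0 {
			log.Printf("archiving %s: %.1f%%", sourceDir, float64(current)/float64(total)*100)
		}
	})
}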

// ListArchiveContents reads a ZIP archive and returns information about its contents.
func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) {
	// Open the ZIP archive
async_test.go (new file, 115 lines)
@@ -0,0 +1,115 @@
package agate

import (
	"context"
	"os"
	"path/filepath"
	"testing"
	"time"
)

func TestSnapshotAsync(t *testing.T) {
	// Setup temporary work directory
	workDir, err := os.MkdirTemp("", "agate_async_test")
	if err != nil {
		t.Fatal(err)
	}
	defer os.RemoveAll(workDir)

	// Initialize Agate
	opts := AgateOptions{
		WorkDir: workDir,
		Logger:  nil, // Disable logging for test
	}

	ag, err := New(opts)
	if err != nil {
		t.Fatalf("Failed to initialize Agate: %v", err)
	}
	defer ag.Close()

	// Get active directory and create some dummy files
	activeDir := ag.GetActiveDir()
	if err := os.MkdirAll(activeDir, 0755); err != nil {
		t.Fatalf("Failed to create active dir: %v", err)
	}

	// Create a large-ish file to ensure it takes some time (though still fast)
	dummyFile := filepath.Join(activeDir, "data.bin")
	data := make([]byte, 1024*1024) // 1MB
	if err := os.WriteFile(dummyFile, data, 0644); err != nil {
		t.Fatalf("Failed to create dummy file: %v", err)
	}

	// Start async snapshot
	ctx := context.Background()
	snapshotID, err := ag.SnapshotAsync(ctx, "async-snap", "")
	if err != nil {
		t.Fatalf("SnapshotAsync failed: %v", err)
	}

	if snapshotID == "" {
		t.Fatal("SnapshotAsync returned empty ID")
	}

	// Check status immediately
	status, err := ag.GetSnapshotStatus(ctx, snapshotID)
	if err != nil {
		t.Fatalf("GetSnapshotStatus failed: %v", err)
	}

	// Status should be pending or running
	if status.Status != "pending" && status.Status != "running" {
		t.Errorf("Initial status should be pending or running, got: %s", status.Status)
	}

	// Poll for completion
	timeout := time.After(5 * time.Second)
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	done := false
	for !done {
		select {
		case <-timeout:
			t.Fatal("Timeout waiting for snapshot completion")
		case <-ticker.C:
			status, err := ag.GetSnapshotStatus(ctx, snapshotID)
			if err != nil {
				t.Fatalf("GetSnapshotStatus failed during polling: %v", err)
			}

			if status.Status == "done" {
				done = true
				if status.Progress != 1.0 {
					t.Errorf("Expected progress 1.0, got %f", status.Progress)
				}
			} else if status.Status == "failed" {
				t.Fatalf("Snapshot creation failed: %s", status.Error)
			}
		}
	}

	// Verify snapshot exists
	snaps, err := ag.ListSnapshots(ctx)
	if err != nil {
		t.Fatalf("ListSnapshots failed: %v", err)
	}

	found := false
	for _, s := range snaps {
		if s.ID == snapshotID {
			found = true
			break
		}
	}

	if !found {
		t.Errorf("Snapshot %s not found in list", snapshotID)
	}

	// Verify current snapshot ID is updated
	if ag.GetCurrentSnapshotID() != snapshotID {
		t.Errorf("Current snapshot ID not updated. Expected %s, got %s", snapshotID, ag.GetCurrentSnapshotID())
	}
}
@@ -13,6 +13,15 @@ type SnapshotManager interface {
	// Returns the created Snapshot with its metadata or an error if the process fails.
	CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)

	// CreateSnapshotAsync initiates a background process to create a snapshot.
	// Returns the job ID (which is also the snapshot ID) or an error if the process couldn't start.
	// onStart is called in the background goroutine before the snapshot creation starts.
	// onFinish is called in the background goroutine after the snapshot creation finishes (successfully or with an error).
	CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error)

	// GetSnapshotStatus retrieves the status of an asynchronous snapshot creation job.
	GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error)

	// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
	// Returns a Snapshot object containing metadata.
	GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
manager.go (98 lines changed)
@@ -11,6 +11,7 @@ import (
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"gitea.unprism.ru/KRBL/Agate/models"
@@ -26,6 +27,8 @@ type SnapshotManagerData struct {
	metadataStore store.MetadataStore
	blobStore     store.BlobStore
	logger        *log.Logger
	jobs          map[string]*store.SnapshotStatus
	jobsMutex     sync.RWMutex
}

func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
@@ -42,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl
		metadataStore: metadataStore,
		blobStore:     blobStore,
		logger:        logger,
		jobs:          make(map[string]*store.SnapshotStatus),
	}, nil
}
@@ -71,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
	// Generate a unique ID for the snapshot
	snapshotID := uuid.New().String()

	return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil)
}

func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) {
	// Validate source directory
	info, err := os.Stat(sourceDir)
	if err != nil {
		if os.IsNotExist(err) {
			return "", models.ErrSourceNotFound
		}
		return "", fmt.Errorf("failed to access source directory: %w", err)
	}

	if !info.IsDir() {
		return "", models.ErrSourceNotDirectory
	}

	// Check if parent exists if specified
	if parentID != "" {
		_, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
		if err != nil {
			fmt.Printf("failed to check parent snapshot: %v\n", err)
			parentID = ""
		}
	}

	snapshotID := uuid.New().String()

	data.jobsMutex.Lock()
	data.jobs[snapshotID] = &store.SnapshotStatus{
		ID:       snapshotID,
		Status:   "pending",
		Progress: 0,
	}
	data.jobsMutex.Unlock()

	go func() {
		if onStart != nil {
			onStart()
		}

		data.jobsMutex.Lock()
		if job, ok := data.jobs[snapshotID]; ok {
			job.Status = "running"
		}
		data.jobsMutex.Unlock()

		_, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) {
			data.jobsMutex.Lock()
			defer data.jobsMutex.Unlock()
			if job, ok := data.jobs[snapshotID]; ok {
				if total > 0 {
					job.Progress = float64(current) / float64(total)
				}
			}
		})

		data.jobsMutex.Lock()
		if job, ok := data.jobs[snapshotID]; ok {
			if err != nil {
				job.Status = "failed"
				job.Error = err.Error()
			} else {
				job.Status = "done"
				job.Progress = 1.0
			}
		}
		data.jobsMutex.Unlock()

		if onFinish != nil {
			onFinish(snapshotID, err)
		}
	}()

	return snapshotID, nil
}

func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
	data.jobsMutex.RLock()
	defer data.jobsMutex.RUnlock()

	job, ok := data.jobs[jobID]
	if !ok {
		return nil, models.ErrNotFound
	}

	// Return a copy to avoid race conditions
	statusCopy := *job
	return &statusCopy, nil
}

func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) {
	// Create a temporary file for the archive in the working directory
	tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip")
	tempFile, err := os.Create(tempFilePath)
@@ -81,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
	defer os.Remove(tempFilePath) // Clean up temp file after we're done

	// Create archive of the source directory
-	if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil {
+	if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil {
		return nil, fmt.Errorf("failed to create archive: %w", err)
	}
@@ -37,6 +37,14 @@ type SnapshotInfo struct {
	CreationTime time.Time // Creation time
}

// SnapshotStatus represents the status of an asynchronous snapshot creation process.
type SnapshotStatus struct {
	ID       string  // Unique identifier of the job (usually same as Snapshot ID)
	Status   string  // Current status: "pending", "running", "done", "failed"
	Progress float64 // Progress from 0.0 to 1.0
	Error    string  // Error message if failed
}
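Note (illustration, not part of the diff): callers can render a SnapshotStatus for display. A minimal sketch in the same package, assuming a standard "fmt" import; formatStatus is a hypothetical helper:

// Hypothetical helper: human-readable one-line summary of a job's status.
func formatStatus(s *SnapshotStatus) string {
	switch s.Status {
	case "failed":
		return fmt.Sprintf("%s: failed (%s)", s.ID, s.Error)
	case "done":
		return fmt.Sprintf("%s: done", s.ID)
	default:
		return fmt.Sprintf("%s: %s %.0f%%", s.ID, s.Status, s.Progress*100)
	}
}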

// ListOptions provides options for filtering and paginating snapshot lists
type ListOptions struct {
	FilterByName string // Filter snapshots by name (substring match)