From 058aff40192e77beb3514a2857b1425ac13d60a5 Mon Sep 17 00:00:00 2001 From: Alexander Lazarenko Date: Fri, 21 Nov 2025 14:46:16 +0300 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=BB=D0=B5?= =?UTF-8?q?=D0=BD=D0=B0=20=D0=B2=D0=BE=D0=B7=D0=BC=D0=BE=D0=B6=D0=BD=D0=BE?= =?UTF-8?q?=D1=81=D1=82=D1=8C=20=D0=B0=D1=81=D0=B8=D0=BD=D1=85=D1=80=D0=BE?= =?UTF-8?q?=D0=BD=D0=BD=D0=BE=D0=B3=D0=BE=20=D1=81=D0=BE=D0=B7=D0=B4=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D1=8F=20=D1=81=D0=BD=D0=B0=D0=BF=D1=88=D0=BE=D1=82?= =?UTF-8?q?=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit А также обновлены зависимости --- api.go | 50 ++++++++++++++++++ archive/archive.go | 98 +++++++++++++++++++++++++++++++++++ async_test.go | 115 +++++++++++++++++++++++++++++++++++++++++ go.mod | 18 +++---- go.sum | 66 +++++++++++------------ interfaces/snapshot.go | 9 ++++ manager.go | 98 ++++++++++++++++++++++++++++++++++- store/store.go | 8 +++ 8 files changed, 420 insertions(+), 42 deletions(-) create mode 100644 async_test.go diff --git a/api.go b/api.go index 0c59031..f42949b 100644 --- a/api.go +++ b/api.go @@ -227,6 +227,56 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) return snapshot.ID, nil } +// SnapshotAsync creates a new snapshot asynchronously. +// It returns the job ID (which is also the snapshot ID) immediately. +// The actual snapshot creation happens in a background goroutine. +// Use GetSnapshotStatus to check the progress. +func (a *Agate) SnapshotAsync(ctx context.Context, name string, parentID string) (string, error) { + a.options.Logger.Printf("Starting async snapshot creation with name: %s", name) + + // If parentID is not provided, use the current snapshot ID + if parentID == "" { + parentID = a.currentSnapshotID + } + + return a.manager.CreateSnapshotAsync(ctx, a.options.BlobStore.GetActiveDir(), name, parentID, + func() { + // onStart: Lock mutex and close resources + a.mutex.Lock() + if a.options.CloseFunc != nil { + if err := a.options.CloseFunc(); err != nil { + a.options.Logger.Printf("ERROR: failed to close resources before async snapshot: %v", err) + } + } + }, + func(id string, err error) { + // onFinish: Open resources, update state, and unlock mutex + defer a.mutex.Unlock() + + if a.options.OpenFunc != nil { + if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil { + a.options.Logger.Printf("ERROR: failed to open resources after async snapshot: %v", err) + } + } + + if err == nil { + a.currentSnapshotID = id + if err := a.saveCurrentSnapshotID(); err != nil { + a.options.Logger.Printf("ERROR: failed to save current snapshot ID: %v", err) + } + a.options.Logger.Printf("Async snapshot %s created successfully", id) + } else { + a.options.Logger.Printf("Async snapshot creation failed: %v", err) + } + }, + ) +} + +// GetSnapshotStatus returns the status of an asynchronous snapshot creation job. +func (a *Agate) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) { + return a.manager.GetSnapshotStatus(ctx, jobID) +} + // RestoreSnapshot extracts a snapshot to the active directory. func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error { a.mutex.Lock() diff --git a/archive/archive.go b/archive/archive.go index 3a8799d..1da8425 100644 --- a/archive/archive.go +++ b/archive/archive.go @@ -95,6 +95,104 @@ func CreateArchive(sourceDir, targetPath string) error { return nil } +// CreateArchiveWithProgress creates a ZIP archive with progress reporting. +// onProgress is called with the current number of bytes written and the total size. +func CreateArchiveWithProgress(sourceDir, targetPath string, onProgress func(current, total int64)) error { + info, err := os.Stat(sourceDir) + if err != nil { + return fmt.Errorf("failed to stat source directory %s: %w", sourceDir, err) + } + if !info.IsDir() { + return fmt.Errorf("source %s is not a directory", sourceDir) + } + + // Calculate total size + var totalSize int64 + err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + totalSize += info.Size() + } + return nil + }) + if err != nil { + return fmt.Errorf("failed to calculate total size of %s: %w", sourceDir, err) + } + + // Create file for ZIP archive + outFile, err := os.Create(targetPath) + if err != nil { + return fmt.Errorf("failed to create target archive file %s: %w", targetPath, err) + } + defer outFile.Close() + + // Create zip.Writer + zipWriter := zip.NewWriter(outFile) + defer zipWriter.Close() + + var currentSize int64 + + // Recursively walk sourceDir + err = filepath.Walk(sourceDir, func(filePath string, fileInfo os.FileInfo, walkErr error) error { + if walkErr != nil { + return fmt.Errorf("error walking path %s: %w", filePath, walkErr) + } + + // Skip sourceDir itself + if filePath == sourceDir { + return nil + } + + // Create relative path + relativePath := strings.TrimPrefix(filePath, sourceDir+string(filepath.Separator)) + relativePath = filepath.ToSlash(relativePath) + + // Check if directory + if fileInfo.IsDir() { + _, err = zipWriter.Create(relativePath + "/") + if err != nil { + return fmt.Errorf("failed to create directory entry %s in archive: %w", relativePath, err) + } + return nil + } + + // Open file for reading + fileToArchive, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("failed to open file %s for archiving: %w", filePath, err) + } + defer fileToArchive.Close() + + // Create archive entry + zipEntryWriter, err := zipWriter.Create(relativePath) + if err != nil { + return fmt.Errorf("failed to create entry %s in archive: %w", relativePath, err) + } + + // Copy content + n, err := io.Copy(zipEntryWriter, fileToArchive) + if err != nil { + return fmt.Errorf("failed to copy file content %s to archive: %w", filePath, err) + } + + currentSize += n + if onProgress != nil { + onProgress(currentSize, totalSize) + } + + return nil + }) + + if err != nil { + os.Remove(targetPath) + return fmt.Errorf("failed during directory walk for archiving %s: %w", sourceDir, err) + } + + return nil +} + // ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом. func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) { // Открываем ZIP-архив diff --git a/async_test.go b/async_test.go new file mode 100644 index 0000000..51bef00 --- /dev/null +++ b/async_test.go @@ -0,0 +1,115 @@ +package agate + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +func TestSnapshotAsync(t *testing.T) { + // Setup temporary work directory + workDir, err := os.MkdirTemp("", "agate_async_test") + if err != nil { + t.Fatal(err) + } + defer os.RemoveAll(workDir) + + // Initialize Agate + opts := AgateOptions{ + WorkDir: workDir, + Logger: nil, // Disable logging for test + } + + ag, err := New(opts) + if err != nil { + t.Fatalf("Failed to initialize Agate: %v", err) + } + defer ag.Close() + + // Get active directory and create some dummy files + activeDir := ag.GetActiveDir() + if err := os.MkdirAll(activeDir, 0755); err != nil { + t.Fatalf("Failed to create active dir: %v", err) + } + + // Create a large-ish file to ensure it takes some time (though still fast) + dummyFile := filepath.Join(activeDir, "data.bin") + data := make([]byte, 1024*1024) // 1MB + if err := os.WriteFile(dummyFile, data, 0644); err != nil { + t.Fatalf("Failed to create dummy file: %v", err) + } + + // Start async snapshot + ctx := context.Background() + snapshotID, err := ag.SnapshotAsync(ctx, "async-snap", "") + if err != nil { + t.Fatalf("SnapshotAsync failed: %v", err) + } + + if snapshotID == "" { + t.Fatal("SnapshotAsync returned empty ID") + } + + // Check status immediately + status, err := ag.GetSnapshotStatus(ctx, snapshotID) + if err != nil { + t.Fatalf("GetSnapshotStatus failed: %v", err) + } + + // Status should be pending or running + if status.Status != "pending" && status.Status != "running" { + t.Errorf("Initial status should be pending or running, got: %s", status.Status) + } + + // Poll for completion + timeout := time.After(5 * time.Second) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + done := false + for !done { + select { + case <-timeout: + t.Fatal("Timeout waiting for snapshot completion") + case <-ticker.C: + status, err := ag.GetSnapshotStatus(ctx, snapshotID) + if err != nil { + t.Fatalf("GetSnapshotStatus failed during polling: %v", err) + } + + if status.Status == "done" { + done = true + if status.Progress != 1.0 { + t.Errorf("Expected progress 1.0, got %f", status.Progress) + } + } else if status.Status == "failed" { + t.Fatalf("Snapshot creation failed: %s", status.Error) + } + } + } + + // Verify snapshot exists + snaps, err := ag.ListSnapshots(ctx) + if err != nil { + t.Fatalf("ListSnapshots failed: %v", err) + } + + found := false + for _, s := range snaps { + if s.ID == snapshotID { + found = true + break + } + } + + if !found { + t.Errorf("Snapshot %s not found in list", snapshotID) + } + + // Verify current snapshot ID is updated + if ag.GetCurrentSnapshotID() != snapshotID { + t.Errorf("Current snapshot ID not updated. Expected %s, got %s", snapshotID, ag.GetCurrentSnapshotID()) + } +} diff --git a/go.mod b/go.mod index 3adc18d..6431184 100644 --- a/go.mod +++ b/go.mod @@ -4,16 +4,16 @@ go 1.24.3 require ( github.com/google/uuid v1.6.0 - github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 - github.com/mattn/go-sqlite3 v1.14.28 - google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 - google.golang.org/grpc v1.72.0 - google.golang.org/protobuf v1.36.6 + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 + github.com/mattn/go-sqlite3 v1.14.32 + google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba + google.golang.org/grpc v1.77.0 + google.golang.org/protobuf v1.36.10 ) require ( - golang.org/x/net v0.40.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.25.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect + golang.org/x/net v0.47.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/text v0.31.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect ) diff --git a/go.sum b/go.sum index 59bf6d7..14c90e7 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= -github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= @@ -8,33 +8,35 @@ github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 h1:5ZPtiqj0JL5oKWmcsq4VMaAW5ukBEgSGXEN89zeH1Jo= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3/go.mod h1:ndYquD05frm2vACXE1nsccT4oJzjhw2arTS2cpUD1PI= -github.com/mattn/go-sqlite3 v1.14.28 h1:ThEiQrnbtumT+QMknw63Befp/ce/nUPgBPMlRFEum7A= -github.com/mattn/go-sqlite3 v1.14.28/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= -go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= -go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= -go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= -go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= -go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= -go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= -go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= -go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= -go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= -golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= -golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= -golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= -google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 h1:vPV0tzlsK6EzEDHNNH5sa7Hs9bd7iXR7B1tSiPepkV0= -google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:pKLAc5OolXC3ViWGI62vvC0n10CpwAtRcTNCFwTKBEw= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 h1:IqsN8hx+lWLqlN+Sc3DoMy/watjofWiU8sRFgQ8fhKM= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= -google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= -google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= -google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= -google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4= +github.com/mattn/go-sqlite3 v1.14.32 h1:JD12Ag3oLy1zQA+BNn74xRgaBbdhbNIDYvQUEuuErjs= +github.com/mattn/go-sqlite3 v1.14.32/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= +go.opentelemetry.io/otel v1.38.0 h1:RkfdswUDRimDg0m2Az18RKOsnI8UDzppJAtj01/Ymk8= +go.opentelemetry.io/otel v1.38.0/go.mod h1:zcmtmQ1+YmQM9wrNsTGV/q/uyusom3P8RxwExxkZhjM= +go.opentelemetry.io/otel/metric v1.38.0 h1:Kl6lzIYGAh5M159u9NgiRkmoMKjvbsKtYRwgfrA6WpA= +go.opentelemetry.io/otel/metric v1.38.0/go.mod h1:kB5n/QoRM8YwmUahxvI3bO34eVtQf2i4utNVLr9gEmI= +go.opentelemetry.io/otel/sdk v1.38.0 h1:l48sr5YbNf2hpCUj/FoGhW9yDkl+Ma+LrVl8qaM5b+E= +go.opentelemetry.io/otel/sdk v1.38.0/go.mod h1:ghmNdGlVemJI3+ZB5iDEuk4bWA3GkTpW+DOoZMYBVVg= +go.opentelemetry.io/otel/sdk/metric v1.38.0 h1:aSH66iL0aZqo//xXzQLYozmWrXxyFkBJ6qT5wthqPoM= +go.opentelemetry.io/otel/sdk/metric v1.38.0/go.mod h1:dg9PBnW9XdQ1Hd6ZnRz689CbtrUp0wMMs9iPcgT9EZA= +go.opentelemetry.io/otel/trace v1.38.0 h1:Fxk5bKrDZJUH+AMyyIXGcFAPah0oRcT+LuNtJrmcNLE= +go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42sO++GHkg4wwhs= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= +google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba h1:B14OtaXuMaCQsl2deSvNkyPKIzq3BjfxQp8d00QyWx4= +google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:G5IanEx8/PgI9w6CFcYQf7jMtHQhZruvfM1i3qOqk5U= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba h1:UKgtfRM7Yh93Sya0Fo8ZzhDP4qBckrrxEr2oF5UIVb8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= +google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= +google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= diff --git a/interfaces/snapshot.go b/interfaces/snapshot.go index b665eee..3bb24f4 100644 --- a/interfaces/snapshot.go +++ b/interfaces/snapshot.go @@ -13,6 +13,15 @@ type SnapshotManager interface { // Returns the created Snapshot with its metadata or an error if the process fails. CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error) + // CreateSnapshotAsync initiates a background process to create a snapshot. + // Returns the job ID (which is also the snapshot ID) or an error if the process couldn't start. + // onStart is called in the background goroutine before the snapshot creation starts. + // onFinish is called in the background goroutine after the snapshot creation finishes (successfully or with error). + CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) + + // GetSnapshotStatus retrieves the status of an asynchronous snapshot creation job. + GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) + // GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID. // Returns a Snapshot object containing metadata GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) diff --git a/manager.go b/manager.go index cf16515..59ae6dd 100644 --- a/manager.go +++ b/manager.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "gitea.unprism.ru/KRBL/Agate/models" @@ -26,6 +27,8 @@ type SnapshotManagerData struct { metadataStore store.MetadataStore blobStore store.BlobStore logger *log.Logger + jobs map[string]*store.SnapshotStatus + jobsMutex sync.RWMutex } func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) { @@ -42,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl metadataStore: metadataStore, blobStore: blobStore, logger: logger, + jobs: make(map[string]*store.SnapshotStatus), }, nil } @@ -71,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s // Generate a unique ID for the snapshot snapshotID := uuid.New().String() + return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil) +} + +func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) { + // Validate source directory + info, err := os.Stat(sourceDir) + if err != nil { + if os.IsNotExist(err) { + return "", models.ErrSourceNotFound + } + return "", fmt.Errorf("failed to access source directory: %w", err) + } + + if !info.IsDir() { + return "", models.ErrSourceNotDirectory + } + + // Check if parent exists if specified + if parentID != "" { + _, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID) + if err != nil { + fmt.Println("failed to check parent snapshot: %w", err) + parentID = "" + } + } + + snapshotID := uuid.New().String() + + data.jobsMutex.Lock() + data.jobs[snapshotID] = &store.SnapshotStatus{ + ID: snapshotID, + Status: "pending", + Progress: 0, + } + data.jobsMutex.Unlock() + + go func() { + if onStart != nil { + onStart() + } + + data.jobsMutex.Lock() + if job, ok := data.jobs[snapshotID]; ok { + job.Status = "running" + } + data.jobsMutex.Unlock() + + _, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) { + data.jobsMutex.Lock() + defer data.jobsMutex.Unlock() + if job, ok := data.jobs[snapshotID]; ok { + if total > 0 { + job.Progress = float64(current) / float64(total) + } + } + }) + + data.jobsMutex.Lock() + if job, ok := data.jobs[snapshotID]; ok { + if err != nil { + job.Status = "failed" + job.Error = err.Error() + } else { + job.Status = "done" + job.Progress = 1.0 + } + } + data.jobsMutex.Unlock() + + if onFinish != nil { + onFinish(snapshotID, err) + } + }() + + return snapshotID, nil +} + +func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) { + data.jobsMutex.RLock() + defer data.jobsMutex.RUnlock() + + job, ok := data.jobs[jobID] + if !ok { + return nil, models.ErrNotFound + } + + // Return a copy to avoid race conditions + statusCopy := *job + return &statusCopy, nil +} + +func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) { // Create a temporary file for the archive in the working directory tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip") tempFile, err := os.Create(tempFilePath) @@ -81,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s defer os.Remove(tempFilePath) // Clean up temp file after we're done // Create archive of the source directory - if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil { + if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil { return nil, fmt.Errorf("failed to create archive: %w", err) } diff --git a/store/store.go b/store/store.go index e0a322a..d7b5a57 100644 --- a/store/store.go +++ b/store/store.go @@ -37,6 +37,14 @@ type SnapshotInfo struct { CreationTime time.Time // Время создания } +// SnapshotStatus represents the status of an asynchronous snapshot creation process. +type SnapshotStatus struct { + ID string // Unique identifier of the job (usually same as Snapshot ID) + Status string // Current status: "pending", "running", "done", "failed" + Progress float64 // Progress from 0.0 to 1.0 + Error string // Error message if failed +} + // ListOptions provides options for filtering and paginating snapshot lists type ListOptions struct { FilterByName string // Filter snapshots by name (substring match)