6 Commits

19 changed files with 893 additions and 1311 deletions

View File

@@ -41,10 +41,6 @@ test-coverage:
go test -v -coverprofile=coverage.out ./... go test -v -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html go tool cover -html=coverage.out -o coverage.html
# Run linter
lint:
golangci-lint run
# Run all checks (tests + linter) # Run all checks (tests + linter)
check: test lint check: test lint

174
api.go
View File

@@ -4,14 +4,17 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/remote"
"io" "io"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"sync" "sync"
"time"
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/models"
"gitea.unprism.ru/KRBL/Agate/remote"
"gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/store"
"gitea.unprism.ru/KRBL/Agate/stores" "gitea.unprism.ru/KRBL/Agate/stores"
@@ -224,6 +227,56 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
return snapshot.ID, nil return snapshot.ID, nil
} }
// SnapshotAsync creates a new snapshot asynchronously.
// It returns the job ID (which is also the snapshot ID) immediately.
// The actual snapshot creation happens in a background goroutine.
// Use GetSnapshotStatus to check the progress.
func (a *Agate) SnapshotAsync(ctx context.Context, name string, parentID string) (string, error) {
a.options.Logger.Printf("Starting async snapshot creation with name: %s", name)
// If parentID is not provided, use the current snapshot ID
if parentID == "" {
parentID = a.currentSnapshotID
}
return a.manager.CreateSnapshotAsync(ctx, a.options.BlobStore.GetActiveDir(), name, parentID,
func() {
// onStart: Lock mutex and close resources
a.mutex.Lock()
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
a.options.Logger.Printf("ERROR: failed to close resources before async snapshot: %v", err)
}
}
},
func(id string, err error) {
// onFinish: Open resources, update state, and unlock mutex
defer a.mutex.Unlock()
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after async snapshot: %v", err)
}
}
if err == nil {
a.currentSnapshotID = id
if err := a.saveCurrentSnapshotID(); err != nil {
a.options.Logger.Printf("ERROR: failed to save current snapshot ID: %v", err)
}
a.options.Logger.Printf("Async snapshot %s created successfully", id)
} else {
a.options.Logger.Printf("Async snapshot creation failed: %v", err)
}
},
)
}
// GetSnapshotStatus returns the status of an asynchronous snapshot creation job.
func (a *Agate) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
return a.manager.GetSnapshotStatus(ctx, jobID)
}
// RestoreSnapshot extracts a snapshot to the active directory. // RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error { func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
a.mutex.Lock() a.mutex.Lock()
@@ -325,7 +378,9 @@ func (a *Agate) saveCurrentSnapshotID() error {
if a.currentSnapshotID == "" { if a.currentSnapshotID == "" {
// If there's no current snapshot ID, remove the file if it exists // If there's no current snapshot ID, remove the file if it exists
if _, err := os.Stat(a.currentIDFile); err == nil { if _, err := os.Stat(a.currentIDFile); err == nil {
return os.Remove(a.currentIDFile) if err := os.Remove(a.currentIDFile); err != nil {
return err
}
} }
return nil return nil
} }
@@ -358,7 +413,11 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
if err != nil { if err != nil {
return err return err
} }
defer client.Close() defer func() {
if err := client.Close(); err != nil {
a.options.Logger.Printf("ERROR: failed to close client: %v", err)
}
}()
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID) remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil { if err != nil {
@@ -373,7 +432,11 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil { if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err) return fmt.Errorf("failed to create new snapshot directory: %w", err)
} }
defer os.RemoveAll(newSnapshotDir) defer func() {
if err := os.RemoveAll(newSnapshotDir); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp dir: %v", err)
}
}()
if localParentID != "" { if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil { if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
@@ -409,7 +472,11 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
if err := os.Rename(diffPartPath, diffArchivePath); err != nil { if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
return fmt.Errorf("failed to finalize downloaded diff: %w", err) return fmt.Errorf("failed to finalize downloaded diff: %w", err)
} }
defer os.Remove(diffArchivePath) defer func() {
if err := os.Remove(diffArchivePath); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
}
}()
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil { if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
return fmt.Errorf("failed to extract diff archive: %w", err) return fmt.Errorf("failed to extract diff archive: %w", err)
@@ -419,7 +486,11 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil { if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
return fmt.Errorf("failed to create final snapshot archive: %w", err) return fmt.Errorf("failed to create final snapshot archive: %w", err)
} }
defer os.Remove(finalArchivePath) defer func() {
if err := os.Remove(finalArchivePath); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
}
}()
finalArchiveFile, err := os.Open(finalArchivePath) finalArchiveFile, err := os.Open(finalArchivePath)
if err != nil { if err != nil {
@@ -443,3 +514,90 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
func (a *Agate) GetCurrentSnapshotID() string { func (a *Agate) GetCurrentSnapshotID() string {
return a.currentSnapshotID return a.currentSnapshotID
} }
// RegisterLocalSnapshot регистрирует локальный файл как блоб снимка и создает
// соответствующую запись в метаданных. Если снимок с таким ID уже существует,
// метод ничего не делает и возвращает nil.
//
// - ctx: Контекст для выполнения операции.
// - snapshotID: ID регистрируемого снимка.
// - parentID: ID родительского снимка. Может быть пустым для полных снимков.
// - name: Описательное имя для снимка.
// - localPath: Абсолютный путь к локальному файлу снимка (полному или дифф-архиву).
func (ag *Agate) RegisterLocalSnapshot(ctx context.Context, snapshotID, parentID, name, localPath string) error {
// 1. Check if snapshot already exists
_, err := ag.manager.GetSnapshotDetails(ctx, snapshotID)
if err == nil {
ag.options.Logger.Printf("snapshot %s already exists, skipping registration", snapshotID)
return nil // Snapshot already exists
}
// We expect ErrNotFound, anything else is a real error.
if !errors.Is(err, models.ErrNotFound) {
return fmt.Errorf("failed to check for existing snapshot: %w", err)
}
// 2. Add the file to the blob store
// Check if blob already exists. If so, we assume it's the correct one and skip overwriting.
// This is to prevent issues when registering a file that is already in the blob store.
_, err = ag.options.BlobStore.GetBlobPath(ctx, snapshotID)
if err == nil {
ag.options.Logger.Printf("blob for snapshot %s already exists, skipping storing it", snapshotID)
} else if errors.Is(err, models.ErrNotFound) {
// Blob does not exist, so we store it.
localFile, err := os.Open(localPath)
if err != nil {
return fmt.Errorf("failed to open local snapshot file: %w", err)
}
defer func() {
if err := localFile.Close(); err != nil {
ag.options.Logger.Printf("ERROR: failed to close local file: %v", err)
}
}()
if _, err = ag.options.BlobStore.StoreBlob(ctx, snapshotID, localFile); err != nil {
return fmt.Errorf("failed to store blob from local file: %w", err)
}
} else {
// Another error occurred when checking for the blob.
return fmt.Errorf("failed to check for existing blob: %w", err)
}
// 3. Create and save snapshot metadata
// We get the file list from the archive to create the metadata.
// Note: This method does not calculate file hashes, so the metadata will be incomplete.
// This is a limitation of the current implementation.
var files []store.FileInfo
archiveFiles, err := archive.ListArchiveContents(localPath)
if err != nil {
// If we can't list the contents, we can't create the metadata.
// We should clean up the blob we just stored.
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to list archive contents for metadata creation: %w", err)
}
for _, f := range archiveFiles {
files = append(files, store.FileInfo{
Path: f.Path,
Size: int64(f.Size),
IsDir: f.IsDir,
// SHA256 is intentionally left empty as we don't have it.
})
}
snapshot := store.Snapshot{
ID: snapshotID,
Name: name,
ParentID: parentID,
CreationTime: time.Now(),
Files: files,
}
if err := ag.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot); err != nil {
// Clean up the blob
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}
ag.options.Logger.Printf("Successfully registered local snapshot %s", snapshotID)
return nil
}

View File

@@ -95,6 +95,104 @@ func CreateArchive(sourceDir, targetPath string) error {
return nil return nil
} }
// CreateArchiveWithProgress creates a ZIP archive with progress reporting.
// onProgress is called with the current number of bytes written and the total size.
func CreateArchiveWithProgress(sourceDir, targetPath string, onProgress func(current, total int64)) error {
info, err := os.Stat(sourceDir)
if err != nil {
return fmt.Errorf("failed to stat source directory %s: %w", sourceDir, err)
}
if !info.IsDir() {
return fmt.Errorf("source %s is not a directory", sourceDir)
}
// Calculate total size
var totalSize int64
err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
totalSize += info.Size()
}
return nil
})
if err != nil {
return fmt.Errorf("failed to calculate total size of %s: %w", sourceDir, err)
}
// Create file for ZIP archive
outFile, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create target archive file %s: %w", targetPath, err)
}
defer outFile.Close()
// Create zip.Writer
zipWriter := zip.NewWriter(outFile)
defer zipWriter.Close()
var currentSize int64
// Recursively walk sourceDir
err = filepath.Walk(sourceDir, func(filePath string, fileInfo os.FileInfo, walkErr error) error {
if walkErr != nil {
return fmt.Errorf("error walking path %s: %w", filePath, walkErr)
}
// Skip sourceDir itself
if filePath == sourceDir {
return nil
}
// Create relative path
relativePath := strings.TrimPrefix(filePath, sourceDir+string(filepath.Separator))
relativePath = filepath.ToSlash(relativePath)
// Check if directory
if fileInfo.IsDir() {
_, err = zipWriter.Create(relativePath + "/")
if err != nil {
return fmt.Errorf("failed to create directory entry %s in archive: %w", relativePath, err)
}
return nil
}
// Open file for reading
fileToArchive, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("failed to open file %s for archiving: %w", filePath, err)
}
defer fileToArchive.Close()
// Create archive entry
zipEntryWriter, err := zipWriter.Create(relativePath)
if err != nil {
return fmt.Errorf("failed to create entry %s in archive: %w", relativePath, err)
}
// Copy content
n, err := io.Copy(zipEntryWriter, fileToArchive)
if err != nil {
return fmt.Errorf("failed to copy file content %s to archive: %w", filePath, err)
}
currentSize += n
if onProgress != nil {
onProgress(currentSize, totalSize)
}
return nil
})
if err != nil {
os.Remove(targetPath)
return fmt.Errorf("failed during directory walk for archiving %s: %w", sourceDir, err)
}
return nil
}
// ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом. // ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом.
func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) { func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) {
// Открываем ZIP-архив // Открываем ZIP-архив

115
async_test.go Normal file
View File

@@ -0,0 +1,115 @@
package agate
import (
"context"
"os"
"path/filepath"
"testing"
"time"
)
func TestSnapshotAsync(t *testing.T) {
// Setup temporary work directory
workDir, err := os.MkdirTemp("", "agate_async_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(workDir)
// Initialize Agate
opts := AgateOptions{
WorkDir: workDir,
Logger: nil, // Disable logging for test
}
ag, err := New(opts)
if err != nil {
t.Fatalf("Failed to initialize Agate: %v", err)
}
defer ag.Close()
// Get active directory and create some dummy files
activeDir := ag.GetActiveDir()
if err := os.MkdirAll(activeDir, 0755); err != nil {
t.Fatalf("Failed to create active dir: %v", err)
}
// Create a large-ish file to ensure it takes some time (though still fast)
dummyFile := filepath.Join(activeDir, "data.bin")
data := make([]byte, 1024*1024) // 1MB
if err := os.WriteFile(dummyFile, data, 0644); err != nil {
t.Fatalf("Failed to create dummy file: %v", err)
}
// Start async snapshot
ctx := context.Background()
snapshotID, err := ag.SnapshotAsync(ctx, "async-snap", "")
if err != nil {
t.Fatalf("SnapshotAsync failed: %v", err)
}
if snapshotID == "" {
t.Fatal("SnapshotAsync returned empty ID")
}
// Check status immediately
status, err := ag.GetSnapshotStatus(ctx, snapshotID)
if err != nil {
t.Fatalf("GetSnapshotStatus failed: %v", err)
}
// Status should be pending or running
if status.Status != "pending" && status.Status != "running" {
t.Errorf("Initial status should be pending or running, got: %s", status.Status)
}
// Poll for completion
timeout := time.After(5 * time.Second)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
done := false
for !done {
select {
case <-timeout:
t.Fatal("Timeout waiting for snapshot completion")
case <-ticker.C:
status, err := ag.GetSnapshotStatus(ctx, snapshotID)
if err != nil {
t.Fatalf("GetSnapshotStatus failed during polling: %v", err)
}
if status.Status == "done" {
done = true
if status.Progress != 1.0 {
t.Errorf("Expected progress 1.0, got %f", status.Progress)
}
} else if status.Status == "failed" {
t.Fatalf("Snapshot creation failed: %s", status.Error)
}
}
}
// Verify snapshot exists
snaps, err := ag.ListSnapshots(ctx)
if err != nil {
t.Fatalf("ListSnapshots failed: %v", err)
}
found := false
for _, s := range snaps {
if s.ID == snapshotID {
found = true
break
}
}
if !found {
t.Errorf("Snapshot %s not found in list", snapshotID)
}
// Verify current snapshot ID is updated
if ag.GetCurrentSnapshotID() != snapshotID {
t.Errorf("Current snapshot ID not updated. Expected %s, got %s", snapshotID, ag.GetCurrentSnapshotID())
}
}

215
go.mod
View File

@@ -2,215 +2,18 @@ module gitea.unprism.ru/KRBL/Agate
go 1.24.3 go 1.24.3
tool github.com/golangci/golangci-lint/v2/cmd/golangci-lint
require ( require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3
github.com/mattn/go-sqlite3 v1.14.28 github.com/mattn/go-sqlite3 v1.14.32
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 google.golang.org/genproto/googleapis/api v0.0.0-20251111163417-95abcf5c77ba
google.golang.org/grpc v1.72.0 google.golang.org/grpc v1.77.0
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.10
) )
require ( require (
4d63.com/gocheckcompilerdirectives v1.3.0 // indirect golang.org/x/net v0.47.0 // indirect
4d63.com/gochecknoglobals v0.2.2 // indirect golang.org/x/sys v0.38.0 // indirect
github.com/4meepo/tagalign v1.4.2 // indirect golang.org/x/text v0.31.0 // indirect
github.com/Abirdcfly/dupword v0.1.3 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect
github.com/Antonboom/errname v1.1.0 // indirect
github.com/Antonboom/nilnil v1.1.0 // indirect
github.com/Antonboom/testifylint v1.6.1 // indirect
github.com/BurntSushi/toml v1.5.0 // indirect
github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 // indirect
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.1 // indirect
github.com/Masterminds/semver/v3 v3.3.1 // indirect
github.com/OpenPeeDeeP/depguard/v2 v2.2.1 // indirect
github.com/alecthomas/chroma/v2 v2.17.2 // indirect
github.com/alecthomas/go-check-sumtype v0.3.1 // indirect
github.com/alexkohler/nakedret/v2 v2.0.6 // indirect
github.com/alexkohler/prealloc v1.0.0 // indirect
github.com/alingse/asasalint v0.0.11 // indirect
github.com/alingse/nilnesserr v0.2.0 // indirect
github.com/ashanbrown/forbidigo v1.6.0 // indirect
github.com/ashanbrown/makezero v1.2.0 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bkielbasa/cyclop v1.2.3 // indirect
github.com/blizzy78/varnamelen v0.8.0 // indirect
github.com/bombsimon/wsl/v4 v4.7.0 // indirect
github.com/breml/bidichk v0.3.3 // indirect
github.com/breml/errchkjson v0.4.1 // indirect
github.com/butuzov/ireturn v0.4.0 // indirect
github.com/butuzov/mirror v1.3.0 // indirect
github.com/catenacyber/perfsprint v0.9.1 // indirect
github.com/ccojocar/zxcvbn-go v1.0.2 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charithe/durationcheck v0.0.10 // indirect
github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect
github.com/charmbracelet/lipgloss v1.1.0 // indirect
github.com/charmbracelet/x/ansi v0.8.0 // indirect
github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd // indirect
github.com/charmbracelet/x/term v0.2.1 // indirect
github.com/chavacava/garif v0.1.0 // indirect
github.com/ckaznocha/intrange v0.3.1 // indirect
github.com/curioswitch/go-reassign v0.3.0 // indirect
github.com/daixiang0/gci v0.13.6 // indirect
github.com/dave/dst v0.27.3 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/denis-tingaikin/go-header v0.5.0 // indirect
github.com/dlclark/regexp2 v1.11.5 // indirect
github.com/ettle/strcase v0.2.0 // indirect
github.com/fatih/color v1.18.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/firefart/nonamedreturns v1.0.6 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
github.com/ghostiam/protogetter v0.3.15 // indirect
github.com/go-critic/go-critic v0.13.0 // indirect
github.com/go-toolsmith/astcast v1.1.0 // indirect
github.com/go-toolsmith/astcopy v1.1.0 // indirect
github.com/go-toolsmith/astequal v1.2.0 // indirect
github.com/go-toolsmith/astfmt v1.1.0 // indirect
github.com/go-toolsmith/astp v1.1.0 // indirect
github.com/go-toolsmith/strparse v1.1.0 // indirect
github.com/go-toolsmith/typep v1.1.0 // indirect
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
github.com/go-xmlfmt/xmlfmt v1.1.3 // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/gofrs/flock v0.12.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golangci/dupl v0.0.0-20250308024227-f665c8d69b32 // indirect
github.com/golangci/go-printf-func-name v0.1.0 // indirect
github.com/golangci/gofmt v0.0.0-20250106114630-d62b90e6713d // indirect
github.com/golangci/golangci-lint/v2 v2.1.6 // indirect
github.com/golangci/golines v0.0.0-20250217134842-442fd0091d95 // indirect
github.com/golangci/misspell v0.6.0 // indirect
github.com/golangci/plugin-module-register v0.1.1 // indirect
github.com/golangci/revgrep v0.8.0 // indirect
github.com/golangci/unconvert v0.0.0-20250410112200-a129a6e6413e // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
github.com/gostaticanalysis/comment v1.5.0 // indirect
github.com/gostaticanalysis/forcetypeassert v0.2.0 // indirect
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
github.com/hashicorp/go-immutable-radix/v2 v2.1.0 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jgautheron/goconst v1.8.1 // indirect
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
github.com/jjti/go-spancheck v0.6.4 // indirect
github.com/julz/importas v0.2.0 // indirect
github.com/karamaru-alpha/copyloopvar v1.2.1 // indirect
github.com/kisielk/errcheck v1.9.0 // indirect
github.com/kkHAIKE/contextcheck v1.1.6 // indirect
github.com/kulti/thelper v0.6.3 // indirect
github.com/kunwardeep/paralleltest v1.0.14 // indirect
github.com/lasiar/canonicalheader v1.1.2 // indirect
github.com/ldez/exptostd v0.4.3 // indirect
github.com/ldez/gomoddirectives v0.6.1 // indirect
github.com/ldez/grignotin v0.9.0 // indirect
github.com/ldez/tagliatelle v0.7.1 // indirect
github.com/ldez/usetesting v0.4.3 // indirect
github.com/leonklingele/grouper v1.1.2 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/macabu/inamedparam v0.2.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/manuelarte/funcorder v0.2.1 // indirect
github.com/maratori/testableexamples v1.0.0 // indirect
github.com/maratori/testpackage v1.1.1 // indirect
github.com/matoous/godox v1.1.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mattn/go-runewidth v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/mgechev/revive v1.9.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moricho/tparallel v0.3.2 // indirect
github.com/muesli/termenv v0.16.0 // indirect
github.com/nakabonne/nestif v0.3.1 // indirect
github.com/nishanths/exhaustive v0.12.0 // indirect
github.com/nishanths/predeclared v0.2.2 // indirect
github.com/nunnatsa/ginkgolinter v0.19.1 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polyfloyd/go-errorlint v1.8.0 // indirect
github.com/prometheus/client_golang v1.12.1 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/quasilyte/go-ruleguard v0.4.4 // indirect
github.com/quasilyte/go-ruleguard/dsl v0.3.22 // indirect
github.com/quasilyte/gogrep v0.5.0 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
github.com/raeperd/recvcheck v0.2.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rogpeppe/go-internal v1.14.1 // indirect
github.com/ryancurrah/gomodguard v1.4.1 // indirect
github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
github.com/sanposhiho/wastedassign/v2 v2.1.0 // indirect
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
github.com/sashamelentyev/interfacebloat v1.1.0 // indirect
github.com/sashamelentyev/usestdlibvars v1.28.0 // indirect
github.com/securego/gosec/v2 v2.22.3 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/sivchari/containedctx v1.0.3 // indirect
github.com/sonatard/noctx v0.1.0 // indirect
github.com/sourcegraph/go-diff v0.7.0 // indirect
github.com/spf13/afero v1.14.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/cobra v1.9.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.6 // indirect
github.com/spf13/viper v1.12.0 // indirect
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
github.com/stbenjam/no-sprintf-host-port v0.2.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/stretchr/testify v1.10.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/tdakkota/asciicheck v0.4.1 // indirect
github.com/tetafro/godot v1.5.1 // indirect
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 // indirect
github.com/timonwong/loggercheck v0.11.0 // indirect
github.com/tomarrell/wrapcheck/v2 v2.11.0 // indirect
github.com/tommy-muehle/go-mnd/v2 v2.5.1 // indirect
github.com/ultraware/funlen v0.2.0 // indirect
github.com/ultraware/whitespace v0.2.0 // indirect
github.com/uudashr/gocognit v1.2.0 // indirect
github.com/uudashr/iface v1.3.1 // indirect
github.com/xen0n/gosmopolitan v1.3.0 // indirect
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
github.com/yagipy/maintidx v1.0.0 // indirect
github.com/yeya24/promlinter v0.3.0 // indirect
github.com/ykadowak/zerologlint v0.1.5 // indirect
gitlab.com/bosi/decorder v0.4.2 // indirect
go-simpler.org/musttag v0.13.1 // indirect
go-simpler.org/sloglint v0.11.0 // indirect
go.augendre.info/fatcontext v0.8.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/automaxprocs v1.6.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect
golang.org/x/mod v0.24.0 // indirect
golang.org/x/net v0.40.0 // indirect
golang.org/x/sync v0.14.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.25.0 // indirect
golang.org/x/tools v0.32.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
honnef.co/go/tools v0.6.1 // indirect
mvdan.cc/gofumpt v0.8.0 // indirect
mvdan.cc/unparam v0.0.0-20250301125049-0df0534333a4 // indirect
) )

1055
go.sum

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT. // Code generated by protoc-gen-go. DO NOT EDIT.
// versions: // versions:
// protoc-gen-go v1.36.6 // protoc-gen-go v1.36.8
// protoc v4.25.3 // protoc v6.32.0
// source: snapshot.proto // source: snapshot.proto
package grpc package grpc
@@ -500,6 +500,112 @@ func (x *DownloadSnapshotDiffRequest) GetOffset() int64 {
return 0 return 0
} }
// Запрос на получение информации о дифе
type GetDiffInfoRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
LocalParentId string `protobuf:"bytes,2,opt,name=local_parent_id,json=localParentId,proto3" json:"local_parent_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *GetDiffInfoRequest) Reset() {
*x = GetDiffInfoRequest{}
mi := &file_snapshot_proto_msgTypes[9]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *GetDiffInfoRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*GetDiffInfoRequest) ProtoMessage() {}
func (x *GetDiffInfoRequest) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_proto_msgTypes[9]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use GetDiffInfoRequest.ProtoReflect.Descriptor instead.
func (*GetDiffInfoRequest) Descriptor() ([]byte, []int) {
return file_snapshot_proto_rawDescGZIP(), []int{9}
}
func (x *GetDiffInfoRequest) GetSnapshotId() string {
if x != nil {
return x.SnapshotId
}
return ""
}
func (x *GetDiffInfoRequest) GetLocalParentId() string {
if x != nil {
return x.LocalParentId
}
return ""
}
// Информация о дифе
type DiffInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
Sha256Hash string `protobuf:"bytes,1,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"`
SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DiffInfo) Reset() {
*x = DiffInfo{}
mi := &file_snapshot_proto_msgTypes[10]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DiffInfo) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DiffInfo) ProtoMessage() {}
func (x *DiffInfo) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_proto_msgTypes[10]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DiffInfo.ProtoReflect.Descriptor instead.
func (*DiffInfo) Descriptor() ([]byte, []int) {
return file_snapshot_proto_rawDescGZIP(), []int{10}
}
func (x *DiffInfo) GetSha256Hash() string {
if x != nil {
return x.Sha256Hash
}
return ""
}
func (x *DiffInfo) GetSizeBytes() int64 {
if x != nil {
return x.SizeBytes
}
return 0
}
var File_snapshot_proto protoreflect.FileDescriptor var File_snapshot_proto protoreflect.FileDescriptor
const file_snapshot_proto_rawDesc = "" + const file_snapshot_proto_rawDesc = "" +
@@ -538,12 +644,22 @@ const file_snapshot_proto_rawDesc = "" +
"\vsnapshot_id\x18\x01 \x01(\tR\n" + "\vsnapshot_id\x18\x01 \x01(\tR\n" +
"snapshotId\x12&\n" + "snapshotId\x12&\n" +
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" + "\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" +
"\x06offset\x18\x03 \x01(\x03R\x06offset2\xf1\x03\n" + "\x06offset\x18\x03 \x01(\x03R\x06offset\"]\n" +
"\x12GetDiffInfoRequest\x12\x1f\n" +
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
"snapshotId\x12&\n" +
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\"J\n" +
"\bDiffInfo\x12\x1f\n" +
"\vsha256_hash\x18\x01 \x01(\tR\n" +
"sha256Hash\x12\x1d\n" +
"\n" +
"size_bytes\x18\x02 \x01(\x03R\tsizeBytes2\xb8\x04\n" +
"\x0fSnapshotService\x12k\n" + "\x0fSnapshotService\x12k\n" +
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" + "\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" + "\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" + "\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" +
"\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3" "\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01\x12E\n" +
"\vGetDiffInfo\x12\x1e.agate.grpc.GetDiffInfoRequest\x1a\x14.agate.grpc.DiffInfo\"\x00B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
var ( var (
file_snapshot_proto_rawDescOnce sync.Once file_snapshot_proto_rawDescOnce sync.Once
@@ -557,7 +673,7 @@ func file_snapshot_proto_rawDescGZIP() []byte {
return file_snapshot_proto_rawDescData return file_snapshot_proto_rawDescData
} }
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
var file_snapshot_proto_goTypes = []any{ var file_snapshot_proto_goTypes = []any{
(*FileInfo)(nil), // 0: agate.grpc.FileInfo (*FileInfo)(nil), // 0: agate.grpc.FileInfo
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo (*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
@@ -568,10 +684,12 @@ var file_snapshot_proto_goTypes = []any{
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest (*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse (*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
(*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest (*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp (*GetDiffInfoRequest)(nil), // 9: agate.grpc.GetDiffInfoRequest
(*DiffInfo)(nil), // 10: agate.grpc.DiffInfo
(*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp
} }
var file_snapshot_proto_depIdxs = []int32{ var file_snapshot_proto_depIdxs = []int32{
9, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp 11, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo 1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo
0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo 0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo
1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo 1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo
@@ -579,12 +697,14 @@ var file_snapshot_proto_depIdxs = []int32{
5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest 5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest
6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest 6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest
8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest 8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest
4, // 8: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse 9, // 8: agate.grpc.SnapshotService.GetDiffInfo:input_type -> agate.grpc.GetDiffInfoRequest
2, // 9: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails 4, // 9: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
7, // 10: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse 2, // 10: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
7, // 11: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse 7, // 11: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
8, // [8:12] is the sub-list for method output_type 7, // 12: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse
4, // [4:8] is the sub-list for method input_type 10, // 13: agate.grpc.SnapshotService.GetDiffInfo:output_type -> agate.grpc.DiffInfo
9, // [9:14] is the sub-list for method output_type
4, // [4:9] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name 4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee 4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name 0, // [0:4] is the sub-list for field type_name
@@ -601,7 +721,7 @@ func file_snapshot_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)), RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)),
NumEnums: 0, NumEnums: 0,
NumMessages: 9, NumMessages: 11,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@@ -40,7 +40,9 @@ func request_SnapshotService_ListSnapshots_0(ctx context.Context, marshaler runt
protoReq ListSnapshotsRequest protoReq ListSnapshotsRequest
metadata runtime.ServerMetadata metadata runtime.ServerMetadata
) )
io.Copy(io.Discard, req.Body) if req.Body != nil {
_, _ = io.Copy(io.Discard, req.Body)
}
msg, err := client.ListSnapshots(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD)) msg, err := client.ListSnapshots(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return msg, metadata, err return msg, metadata, err
} }
@@ -60,7 +62,9 @@ func request_SnapshotService_GetSnapshotDetails_0(ctx context.Context, marshaler
metadata runtime.ServerMetadata metadata runtime.ServerMetadata
err error err error
) )
io.Copy(io.Discard, req.Body) if req.Body != nil {
_, _ = io.Copy(io.Discard, req.Body)
}
val, ok := pathParams["snapshot_id"] val, ok := pathParams["snapshot_id"]
if !ok { if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id") return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")
@@ -97,7 +101,9 @@ func request_SnapshotService_DownloadFile_0(ctx context.Context, marshaler runti
metadata runtime.ServerMetadata metadata runtime.ServerMetadata
err error err error
) )
io.Copy(io.Discard, req.Body) if req.Body != nil {
_, _ = io.Copy(io.Discard, req.Body)
}
val, ok := pathParams["snapshot_id"] val, ok := pathParams["snapshot_id"]
if !ok { if !ok {
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id") return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")

View File

@@ -32,6 +32,9 @@ service SnapshotService {
// Скачать архив, содержащий только разницу между двумя снапшотами // Скачать архив, содержащий только разницу между двумя снапшотами
rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {} rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {}
// Получить информацию о дифе (хеш и размер)
rpc GetDiffInfo(GetDiffInfoRequest) returns (DiffInfo) {}
} }
// Метаданные файла внутри снапшота // Метаданные файла внутри снапшота
@@ -86,3 +89,15 @@ message DownloadSnapshotDiffRequest {
string local_parent_id = 2; // ID снапшота, который уже есть у клиента string local_parent_id = 2; // ID снапшота, который уже есть у клиента
int64 offset = 3; // Смещение в байтах для докачки int64 offset = 3; // Смещение в байтах для докачки
} }
// Запрос на получение информации о дифе
message GetDiffInfoRequest {
string snapshot_id = 1;
string local_parent_id = 2;
}
// Информация о дифе
message DiffInfo {
string sha256_hash = 1;
int64 size_bytes = 2;
}

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT. // Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions: // versions:
// - protoc-gen-go-grpc v1.5.1 // - protoc-gen-go-grpc v1.5.1
// - protoc v4.25.3 // - protoc v6.32.0
// source: snapshot.proto // source: snapshot.proto
package grpc package grpc
@@ -23,6 +23,7 @@ const (
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails" SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile" SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff" SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff"
SnapshotService_GetDiffInfo_FullMethodName = "/agate.grpc.SnapshotService/GetDiffInfo"
) )
// SnapshotServiceClient is the client API for SnapshotService service. // SnapshotServiceClient is the client API for SnapshotService service.
@@ -39,6 +40,8 @@ type SnapshotServiceClient interface {
DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
// Скачать архив, содержащий только разницу между двумя снапшотами // Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
// Получить информацию о дифе (хеш и размер)
GetDiffInfo(ctx context.Context, in *GetDiffInfoRequest, opts ...grpc.CallOption) (*DiffInfo, error)
} }
type snapshotServiceClient struct { type snapshotServiceClient struct {
@@ -107,6 +110,16 @@ func (c *snapshotServiceClient) DownloadSnapshotDiff(ctx context.Context, in *Do
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse] type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse]
func (c *snapshotServiceClient) GetDiffInfo(ctx context.Context, in *GetDiffInfoRequest, opts ...grpc.CallOption) (*DiffInfo, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(DiffInfo)
err := c.cc.Invoke(ctx, SnapshotService_GetDiffInfo_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
// SnapshotServiceServer is the server API for SnapshotService service. // SnapshotServiceServer is the server API for SnapshotService service.
// All implementations must embed UnimplementedSnapshotServiceServer // All implementations must embed UnimplementedSnapshotServiceServer
// for forward compatibility. // for forward compatibility.
@@ -121,6 +134,8 @@ type SnapshotServiceServer interface {
DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
// Скачать архив, содержащий только разницу между двумя снапшотами // Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
// Получить информацию о дифе (хеш и размер)
GetDiffInfo(context.Context, *GetDiffInfoRequest) (*DiffInfo, error)
mustEmbedUnimplementedSnapshotServiceServer() mustEmbedUnimplementedSnapshotServiceServer()
} }
@@ -143,6 +158,9 @@ func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grp
func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error { func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented") return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented")
} }
func (UnimplementedSnapshotServiceServer) GetDiffInfo(context.Context, *GetDiffInfoRequest) (*DiffInfo, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetDiffInfo not implemented")
}
func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {} func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {}
func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {} func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {}
@@ -222,6 +240,24 @@ func _SnapshotService_DownloadSnapshotDiff_Handler(srv interface{}, stream grpc.
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse] type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse]
func _SnapshotService_GetDiffInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetDiffInfoRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(SnapshotServiceServer).GetDiffInfo(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: SnapshotService_GetDiffInfo_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(SnapshotServiceServer).GetDiffInfo(ctx, req.(*GetDiffInfoRequest))
}
return interceptor(ctx, in, info, handler)
}
// SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service. // SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@@ -237,6 +273,10 @@ var SnapshotService_ServiceDesc = grpc.ServiceDesc{
MethodName: "GetSnapshotDetails", MethodName: "GetSnapshotDetails",
Handler: _SnapshotService_GetSnapshotDetails_Handler, Handler: _SnapshotService_GetSnapshotDetails_Handler,
}, },
{
MethodName: "GetDiffInfo",
Handler: _SnapshotService_GetDiffInfo_Handler,
},
}, },
Streams: []grpc.StreamDesc{ Streams: []grpc.StreamDesc{
{ {

View File

@@ -13,6 +13,15 @@ type SnapshotManager interface {
// Returns the created Snapshot with its metadata or an error if the process fails. // Returns the created Snapshot with its metadata or an error if the process fails.
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error) CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
// CreateSnapshotAsync initiates a background process to create a snapshot.
// Returns the job ID (which is also the snapshot ID) or an error if the process couldn't start.
// onStart is called in the background goroutine before the snapshot creation starts.
// onFinish is called in the background goroutine after the snapshot creation finishes (successfully or with error).
CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error)
// GetSnapshotStatus retrieves the status of an asynchronous snapshot creation job.
GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error)
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID. // GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
// Returns a Snapshot object containing metadata // Returns a Snapshot object containing metadata
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
@@ -38,6 +47,9 @@ type SnapshotManager interface {
// It returns an io.ReadCloser for the archive stream and an error. // It returns an io.ReadCloser for the archive stream and an error.
// The caller is responsible for closing the reader, which will also handle cleanup of temporary resources. // The caller is responsible for closing the reader, which will also handle cleanup of temporary resources.
StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error)
// GetSnapshotDiffInfo calculates the hash and size of a differential archive between two snapshots.
GetSnapshotDiffInfo(ctx context.Context, snapshotID, parentID string) (*store.DiffInfo, error)
} }
// SnapshotServer defines the interface for a server that can share snapshots // SnapshotServer defines the interface for a server that can share snapshots

View File

@@ -11,8 +11,10 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"sync"
"time" "time"
"gitea.unprism.ru/KRBL/Agate/models"
"github.com/google/uuid" "github.com/google/uuid"
"gitea.unprism.ru/KRBL/Agate/archive" "gitea.unprism.ru/KRBL/Agate/archive"
@@ -25,6 +27,8 @@ type SnapshotManagerData struct {
metadataStore store.MetadataStore metadataStore store.MetadataStore
blobStore store.BlobStore blobStore store.BlobStore
logger *log.Logger logger *log.Logger
jobs map[string]*store.SnapshotStatus
jobsMutex sync.RWMutex
} }
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) { func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
@@ -41,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl
metadataStore: metadataStore, metadataStore: metadataStore,
blobStore: blobStore, blobStore: blobStore,
logger: logger, logger: logger,
jobs: make(map[string]*store.SnapshotStatus),
}, nil }, nil
} }
@@ -49,13 +54,13 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
info, err := os.Stat(sourceDir) info, err := os.Stat(sourceDir)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, ErrSourceNotFound return nil, models.ErrSourceNotFound
} }
return nil, fmt.Errorf("failed to access source directory: %w", err) return nil, fmt.Errorf("failed to access source directory: %w", err)
} }
if !info.IsDir() { if !info.IsDir() {
return nil, ErrSourceNotDirectory return nil, models.ErrSourceNotDirectory
} }
// Check if parent exists if specified // Check if parent exists if specified
@@ -70,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
// Generate a unique ID for the snapshot // Generate a unique ID for the snapshot
snapshotID := uuid.New().String() snapshotID := uuid.New().String()
return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil)
}
func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) {
// Validate source directory
info, err := os.Stat(sourceDir)
if err != nil {
if os.IsNotExist(err) {
return "", models.ErrSourceNotFound
}
return "", fmt.Errorf("failed to access source directory: %w", err)
}
if !info.IsDir() {
return "", models.ErrSourceNotDirectory
}
// Check if parent exists if specified
if parentID != "" {
_, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
if err != nil {
fmt.Println("failed to check parent snapshot: %w", err)
parentID = ""
}
}
snapshotID := uuid.New().String()
data.jobsMutex.Lock()
data.jobs[snapshotID] = &store.SnapshotStatus{
ID: snapshotID,
Status: "pending",
Progress: 0,
}
data.jobsMutex.Unlock()
go func() {
if onStart != nil {
onStart()
}
data.jobsMutex.Lock()
if job, ok := data.jobs[snapshotID]; ok {
job.Status = "running"
}
data.jobsMutex.Unlock()
_, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) {
data.jobsMutex.Lock()
defer data.jobsMutex.Unlock()
if job, ok := data.jobs[snapshotID]; ok {
if total > 0 {
job.Progress = float64(current) / float64(total)
}
}
})
data.jobsMutex.Lock()
if job, ok := data.jobs[snapshotID]; ok {
if err != nil {
job.Status = "failed"
job.Error = err.Error()
} else {
job.Status = "done"
job.Progress = 1.0
}
}
data.jobsMutex.Unlock()
if onFinish != nil {
onFinish(snapshotID, err)
}
}()
return snapshotID, nil
}
func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
data.jobsMutex.RLock()
defer data.jobsMutex.RUnlock()
job, ok := data.jobs[jobID]
if !ok {
return nil, models.ErrNotFound
}
// Return a copy to avoid race conditions
statusCopy := *job
return &statusCopy, nil
}
func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) {
// Create a temporary file for the archive in the working directory // Create a temporary file for the archive in the working directory
tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip") tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip")
tempFile, err := os.Create(tempFilePath) tempFile, err := os.Create(tempFilePath)
@@ -80,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
defer os.Remove(tempFilePath) // Clean up temp file after we're done defer os.Remove(tempFilePath) // Clean up temp file after we're done
// Create archive of the source directory // Create archive of the source directory
if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil { if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil {
return nil, fmt.Errorf("failed to create archive: %w", err) return nil, fmt.Errorf("failed to create archive: %w", err)
} }
@@ -165,8 +262,8 @@ func (data *SnapshotManagerData) GetSnapshotDetails(ctx context.Context, snapsho
// Retrieve snapshot metadata from the store // Retrieve snapshot metadata from the store
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, ErrNotFound) { if errors.Is(err, models.ErrNotFound) {
return nil, ErrNotFound return nil, models.ErrNotFound
} }
return nil, fmt.Errorf("failed to retrieve snapshot details: %w", err) return nil, fmt.Errorf("failed to retrieve snapshot details: %w", err)
} }
@@ -191,7 +288,7 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, store.ErrNotFound) { if errors.Is(err, models.ErrNotFound) {
return nil return nil
} }
return fmt.Errorf("failed to check if snapshot exists: %w", err) return fmt.Errorf("failed to check if snapshot exists: %w", err)
@@ -237,8 +334,8 @@ func (data *SnapshotManagerData) OpenFile(ctx context.Context, snapshotID string
// First check if the snapshot exists // First check if the snapshot exists
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) _, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, ErrNotFound) { if errors.Is(err, models.ErrNotFound) {
return nil, ErrNotFound return nil, models.ErrNotFound
} }
return nil, fmt.Errorf("failed to check if snapshot exists: %w", err) return nil, fmt.Errorf("failed to check if snapshot exists: %w", err)
} }
@@ -261,7 +358,7 @@ func (data *SnapshotManagerData) OpenFile(ctx context.Context, snapshotID string
err := archive.ExtractFileFromArchive(blobPath, filePath, pw) err := archive.ExtractFileFromArchive(blobPath, filePath, pw)
if err != nil { if err != nil {
if errors.Is(err, archive.ErrFileNotFoundInArchive) { if errors.Is(err, archive.ErrFileNotFoundInArchive) {
pw.CloseWithError(ErrFileNotFound) pw.CloseWithError(models.ErrFileNotFound)
return return
} }
pw.CloseWithError(fmt.Errorf("failed to extract file from archive: %w", err)) pw.CloseWithError(fmt.Errorf("failed to extract file from archive: %w", err))
@@ -284,8 +381,8 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
// First check if the snapshot exists // First check if the snapshot exists
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) _, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, ErrNotFound) { if errors.Is(err, models.ErrNotFound) {
return ErrNotFound return models.ErrNotFound
} }
return fmt.Errorf("failed to check if snapshot exists: %w", err) return fmt.Errorf("failed to check if snapshot exists: %w", err)
} }
@@ -388,8 +485,8 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, store.ErrNotFound) { if errors.Is(err, models.ErrNotFound) {
return ErrNotFound return models.ErrNotFound
} }
return fmt.Errorf("failed to get snapshot metadata: %w", err) return fmt.Errorf("failed to get snapshot metadata: %w", err)
} }
@@ -403,6 +500,37 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
return nil return nil
} }
func (data *SnapshotManagerData) GetSnapshotDiffInfo(ctx context.Context, snapshotID, parentID string) (*store.DiffInfo, error) {
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
if err != nil {
return nil, fmt.Errorf("failed to create diff archive for info: %w", err)
}
if tempArchivePath == "" {
return &store.DiffInfo{SHA256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", Size: 0}, nil // sha256 of empty string
}
defer os.Remove(tempArchivePath)
if tempStagingDir != "" {
defer os.RemoveAll(tempStagingDir)
}
hash, err := hash.CalculateFileHash(tempArchivePath)
if err != nil {
return nil, fmt.Errorf("failed to calculate hash for diff archive: %w", err)
}
stat, err := os.Stat(tempArchivePath)
if err != nil {
return nil, fmt.Errorf("failed to get size of diff archive: %w", err)
}
return &store.DiffInfo{
SHA256: hash,
Size: stat.Size(),
}, nil
}
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files. // diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
type diffArchiveReader struct { type diffArchiveReader struct {
*os.File *os.File
@@ -413,15 +541,15 @@ type diffArchiveReader struct {
// Close closes the file and removes the temporary archive and staging directory. // Close closes the file and removes the temporary archive and staging directory.
func (r *diffArchiveReader) Close() error { func (r *diffArchiveReader) Close() error {
err := r.File.Close() err := r.File.Close()
os.Remove(r.tempArchive) _ = os.Remove(r.tempArchive)
os.RemoveAll(r.tempStaging) _ = os.RemoveAll(r.tempStaging)
return err return err
} }
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) { func (data *SnapshotManagerData) createDiffArchive(ctx context.Context, snapshotID, parentID string) (string, string, error) {
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err) return "", "", fmt.Errorf("failed to get target snapshot metadata: %w", err)
} }
parentFiles := make(map[string]string) parentFiles := make(map[string]string)
@@ -449,51 +577,66 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
} }
if len(filesToInclude) == 0 { if len(filesToInclude) == 0 {
return io.NopCloser(bytes.NewReader(nil)), nil return "", "", nil
} }
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*") tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create temp staging directory: %w", err) return "", "", fmt.Errorf("failed to create temp staging directory: %w", err)
} }
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID) targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
if err != nil { if err != nil {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
return nil, err return "", "", err
} }
for _, filePath := range filesToInclude { for _, filePath := range filesToInclude {
destPath := filepath.Join(tempStagingDir, filePath) destPath := filepath.Join(tempStagingDir, filePath)
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil { if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to create dir for diff file: %w", err) return "", "", fmt.Errorf("failed to create dir for diff file: %w", err)
} }
fileWriter, err := os.Create(destPath) fileWriter, err := os.Create(destPath)
if err != nil { if err != nil {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
return nil, err return "", "", err
} }
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter) err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
fileWriter.Close() fileWriter.Close()
if err != nil { if err != nil {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err) return "", "", fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
} }
} }
tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip") tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip")
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil { if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
os.RemoveAll(tempStagingDir) _ = os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath) _ = os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to create diff archive: %w", err) return "", "", fmt.Errorf("failed to create diff archive: %w", err)
}
return tempArchivePath, tempStagingDir, nil
}
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
if err != nil {
return nil, fmt.Errorf("failed to create diff archive for streaming: %w", err)
}
if tempArchivePath == "" {
return io.NopCloser(bytes.NewReader(nil)), nil
} }
archiveFile, err := os.Open(tempArchivePath) archiveFile, err := os.Open(tempArchivePath)
if err != nil { if err != nil {
if tempStagingDir != "" {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
}
os.Remove(tempArchivePath) os.Remove(tempArchivePath)
return nil, err return nil, err
} }
@@ -501,7 +644,9 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
if offset > 0 { if offset > 0 {
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil { if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
archiveFile.Close() archiveFile.Close()
if tempStagingDir != "" {
os.RemoveAll(tempStagingDir) os.RemoveAll(tempStagingDir)
}
os.Remove(tempArchivePath) os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to seek in diff archive: %w", err) return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
} }

View File

@@ -1,4 +1,4 @@
package agate package models
import "errors" import "errors"

View File

@@ -11,6 +11,7 @@ import (
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc" agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/hash"
"gitea.unprism.ru/KRBL/Agate/interfaces" "gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
@@ -89,8 +90,43 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
return snapshot, nil return snapshot, nil
} }
// GetDiffInfo gets the hash and size of a differential archive.
func (c *Client) GetDiffInfo(ctx context.Context, snapshotID, localParentID string) (*store.DiffInfo, error) {
req := &agateGrpc.GetDiffInfoRequest{
SnapshotId: snapshotID,
LocalParentId: localParentID,
}
info, err := c.client.GetDiffInfo(ctx, req)
if err != nil {
return nil, fmt.Errorf("failed to get diff info: %w", err)
}
return &store.DiffInfo{
SHA256: info.Sha256Hash,
Size: info.SizeBytes,
}, nil
}
// DownloadSnapshotDiff скачивает архив с разницей между снапшотами. // DownloadSnapshotDiff скачивает архив с разницей между снапшотами.
func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error { func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error {
// Check for local file and validate it
if fileInfo, err := os.Stat(targetPath); err == nil {
remoteDiffInfo, err := c.GetDiffInfo(ctx, snapshotID, localParentID)
if err != nil {
// Log the error but proceed with download
fmt.Printf("could not get remote diff info: %v. proceeding with download.", err)
} else {
if fileInfo.Size() == remoteDiffInfo.Size {
localHash, err := hash.CalculateFileHash(targetPath)
if err == nil && localHash == remoteDiffInfo.SHA256 {
fmt.Printf("local snapshot archive %s is valid, skipping download.", targetPath)
return nil // File is valid, skip download
}
}
}
}
var offset int64 var offset int64
fileInfo, err := os.Stat(targetPath) fileInfo, err := os.Stat(targetPath)
if err == nil { if err == nil {
@@ -118,7 +154,11 @@ func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localPare
if err != nil { if err != nil {
return fmt.Errorf("failed to open file %s: %w", targetPath, err) return fmt.Errorf("failed to open file %s: %w", targetPath, err)
} }
defer file.Close() defer func() {
if err := file.Close(); err != nil {
fmt.Printf("failed to close file: %v", err)
}
}()
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()

View File

@@ -158,6 +158,19 @@ func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest
return nil return nil
} }
// GetDiffInfo реализует gRPC-метод GetDiffInfo.
func (s *Server) GetDiffInfo(ctx context.Context, req *agateGrpc.GetDiffInfoRequest) (*agateGrpc.DiffInfo, error) {
diffInfo, err := s.manager.GetSnapshotDiffInfo(ctx, req.SnapshotId, req.LocalParentId)
if err != nil {
return nil, fmt.Errorf("failed to get diff info: %w", err)
}
return &agateGrpc.DiffInfo{
Sha256Hash: diffInfo.SHA256,
SizeBytes: diffInfo.Size,
}, nil
}
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo // Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
return &agateGrpc.SnapshotInfo{ return &agateGrpc.SnapshotInfo{

View File

@@ -1,9 +0,0 @@
package store
import "errors"
// Common errors that can be used by store implementations
var (
// ErrNotFound means that a requested resource was not found
ErrNotFound = errors.New("resource not found")
)

View File

@@ -7,6 +7,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"gitea.unprism.ru/KRBL/Agate/models"
"gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
@@ -75,7 +76,7 @@ func (fs *fileSystemStore) RetrieveBlob(ctx context.Context, snapshotID string)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
// Если файл не найден, возвращаем кастомную ошибку // Если файл не найден, возвращаем кастомную ошибку
return nil, store.ErrNotFound return nil, models.ErrNotFound
} }
return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err) return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err)
} }
@@ -109,7 +110,7 @@ func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (
// Проверяем существование файла // Проверяем существование файла
if _, err := os.Stat(blobPath); err != nil { if _, err := os.Stat(blobPath); err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return "", store.ErrNotFound return "", models.ErrNotFound
} }
return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err) return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err)
} }

View File

@@ -6,11 +6,13 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate/store"
_ "github.com/mattn/go-sqlite3"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"gitea.unprism.ru/KRBL/Agate/models"
"gitea.unprism.ru/KRBL/Agate/store"
_ "github.com/mattn/go-sqlite3"
) )
const ( const (
@@ -130,7 +132,7 @@ func (s *sqliteStore) GetSnapshotMetadata(ctx context.Context, snapshotID string
if err != nil { if err != nil {
if errors.Is(err, sql.ErrNoRows) { if errors.Is(err, sql.ErrNoRows) {
// Если запись не найдена, возвращаем кастомную ошибку // Если запись не найдена, возвращаем кастомную ошибку
return nil, store.ErrNotFound return nil, models.ErrNotFound
} }
return nil, fmt.Errorf("failed to query snapshot %s: %w", snapshotID, err) return nil, fmt.Errorf("failed to query snapshot %s: %w", snapshotID, err)
} }

View File

@@ -6,6 +6,12 @@ import (
"time" "time"
) )
// DiffInfo represents metadata about a differential archive.
type DiffInfo struct {
SHA256 string
Size int64
}
// FileInfo represents metadata and attributes of a file or directory. // FileInfo represents metadata and attributes of a file or directory.
type FileInfo struct { type FileInfo struct {
Path string // Path represents the relative or absolute location of the file or directory in the filesystem. Path string // Path represents the relative or absolute location of the file or directory in the filesystem.
@@ -31,6 +37,14 @@ type SnapshotInfo struct {
CreationTime time.Time // Время создания CreationTime time.Time // Время создания
} }
// SnapshotStatus represents the status of an asynchronous snapshot creation process.
type SnapshotStatus struct {
ID string // Unique identifier of the job (usually same as Snapshot ID)
Status string // Current status: "pending", "running", "done", "failed"
Progress float64 // Progress from 0.0 to 1.0
Error string // Error message if failed
}
// ListOptions provides options for filtering and paginating snapshot lists // ListOptions provides options for filtering and paginating snapshot lists
type ListOptions struct { type ListOptions struct {
FilterByName string // Filter snapshots by name (substring match) FilterByName string // Filter snapshots by name (substring match)