Compare commits
9 Commits
v0.3.1-alp
...
fe7a02cc18
| Author | SHA1 | Date | |
|---|---|---|---|
|
fe7a02cc18
|
|||
|
9754bf3f6c
|
|||
|
058aff4019
|
|||
|
6845219e94
|
|||
|
99764eb91f
|
|||
|
644a94656a
|
|||
|
cd98d1f4a2
|
|||
|
f34539c06b
|
|||
|
5192658607
|
4
Makefile
4
Makefile
@@ -41,10 +41,6 @@ test-coverage:
|
|||||||
go test -v -coverprofile=coverage.out ./...
|
go test -v -coverprofile=coverage.out ./...
|
||||||
go tool cover -html=coverage.out -o coverage.html
|
go tool cover -html=coverage.out -o coverage.html
|
||||||
|
|
||||||
# Run linter
|
|
||||||
lint:
|
|
||||||
golangci-lint run
|
|
||||||
|
|
||||||
# Run all checks (tests + linter)
|
# Run all checks (tests + linter)
|
||||||
check: test lint
|
check: test lint
|
||||||
|
|
||||||
|
|||||||
224
api.go
224
api.go
@@ -4,14 +4,17 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gitea.unprism.ru/KRBL/Agate/archive"
|
|
||||||
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
|
||||||
"gitea.unprism.ru/KRBL/Agate/remote"
|
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/archive"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/models"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/remote"
|
||||||
|
|
||||||
"gitea.unprism.ru/KRBL/Agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
"gitea.unprism.ru/KRBL/Agate/stores"
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||||
@@ -224,6 +227,56 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
|||||||
return snapshot.ID, nil
|
return snapshot.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapshotAsync creates a new snapshot asynchronously.
|
||||||
|
// It returns the job ID (which is also the snapshot ID) immediately.
|
||||||
|
// The actual snapshot creation happens in a background goroutine.
|
||||||
|
// Use GetSnapshotStatus to check the progress.
|
||||||
|
func (a *Agate) SnapshotAsync(ctx context.Context, name string, parentID string) (string, error) {
|
||||||
|
a.options.Logger.Printf("Starting async snapshot creation with name: %s", name)
|
||||||
|
|
||||||
|
// If parentID is not provided, use the current snapshot ID
|
||||||
|
if parentID == "" {
|
||||||
|
parentID = a.currentSnapshotID
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.manager.CreateSnapshotAsync(ctx, a.options.BlobStore.GetActiveDir(), name, parentID,
|
||||||
|
func() {
|
||||||
|
// onStart: Lock mutex and close resources
|
||||||
|
a.mutex.Lock()
|
||||||
|
if a.options.CloseFunc != nil {
|
||||||
|
if err := a.options.CloseFunc(); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to close resources before async snapshot: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
func(id string, err error) {
|
||||||
|
// onFinish: Open resources, update state, and unlock mutex
|
||||||
|
defer a.mutex.Unlock()
|
||||||
|
|
||||||
|
if a.options.OpenFunc != nil {
|
||||||
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to open resources after async snapshot: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err == nil {
|
||||||
|
a.currentSnapshotID = id
|
||||||
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to save current snapshot ID: %v", err)
|
||||||
|
}
|
||||||
|
a.options.Logger.Printf("Async snapshot %s created successfully", id)
|
||||||
|
} else {
|
||||||
|
a.options.Logger.Printf("Async snapshot creation failed: %v", err)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSnapshotStatus returns the status of an asynchronous snapshot creation job.
|
||||||
|
func (a *Agate) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
|
||||||
|
return a.manager.GetSnapshotStatus(ctx, jobID)
|
||||||
|
}
|
||||||
|
|
||||||
// RestoreSnapshot extracts a snapshot to the active directory.
|
// RestoreSnapshot extracts a snapshot to the active directory.
|
||||||
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||||
a.mutex.Lock()
|
a.mutex.Lock()
|
||||||
@@ -325,7 +378,9 @@ func (a *Agate) saveCurrentSnapshotID() error {
|
|||||||
if a.currentSnapshotID == "" {
|
if a.currentSnapshotID == "" {
|
||||||
// If there's no current snapshot ID, remove the file if it exists
|
// If there's no current snapshot ID, remove the file if it exists
|
||||||
if _, err := os.Stat(a.currentIDFile); err == nil {
|
if _, err := os.Stat(a.currentIDFile); err == nil {
|
||||||
return os.Remove(a.currentIDFile)
|
if err := os.Remove(a.currentIDFile); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -348,7 +403,6 @@ func (a *Agate) Close() error {
|
|||||||
|
|
||||||
// StartServer starts a gRPC server to share snapshots.
|
// StartServer starts a gRPC server to share snapshots.
|
||||||
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
||||||
// Использование нового remote.Server
|
|
||||||
server := remote.NewServer(a.manager)
|
server := remote.NewServer(a.manager)
|
||||||
return server.Start(ctx, address)
|
return server.Start(ctx, address)
|
||||||
}
|
}
|
||||||
@@ -359,22 +413,58 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer client.Close()
|
defer func() {
|
||||||
|
if err := client.Close(); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to close client: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 1. Подготовка
|
|
||||||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
|
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
|
||||||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||||||
return fmt.Errorf("failed to create temp download dir: %w", err)
|
return fmt.Errorf("failed to create temp download dir: %w", err)
|
||||||
}
|
}
|
||||||
|
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
||||||
|
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := os.RemoveAll(newSnapshotDir); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to remove temp dir: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if localParentID != "" {
|
||||||
|
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
||||||
|
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
||||||
|
} else {
|
||||||
|
localParentSnap, err := a.GetSnapshotDetails(ctx, localParentID)
|
||||||
|
if err != nil {
|
||||||
|
a.options.Logger.Printf("Warning: failed to get local parent details %s: %v", localParentID, err)
|
||||||
|
} else {
|
||||||
|
remoteFilesMap := make(map[string]struct{})
|
||||||
|
for _, f := range remoteSnapshot.Files {
|
||||||
|
remoteFilesMap[f.Path] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, localFile := range localParentSnap.Files {
|
||||||
|
if _, exists := remoteFilesMap[localFile.Path]; !exists {
|
||||||
|
pathToDelete := filepath.Join(newSnapshotDir, localFile.Path)
|
||||||
|
if err := os.RemoveAll(pathToDelete); err != nil {
|
||||||
|
a.options.Logger.Printf("Warning: failed to delete file %s during diff apply: %v", pathToDelete, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
|
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
|
||||||
diffPartPath := diffArchivePath + ".part"
|
diffPartPath := diffArchivePath + ".part"
|
||||||
|
|
||||||
// 2. Скачивание дельты с докачкой
|
|
||||||
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
|
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
|
||||||
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
|
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
|
||||||
return fmt.Errorf("failed to download snapshot diff: %w", err)
|
return fmt.Errorf("failed to download snapshot diff: %w", err)
|
||||||
@@ -382,34 +472,25 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
|||||||
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
|
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
|
||||||
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
|
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
|
||||||
}
|
}
|
||||||
defer os.Remove(diffArchivePath)
|
defer func() {
|
||||||
|
if err := os.Remove(diffArchivePath); err != nil {
|
||||||
// 3. Атомарное применение
|
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
|
||||||
// Создаем новую директорию для снапшота
|
|
||||||
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
|
||||||
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
|
||||||
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
|
||||||
}
|
|
||||||
defer os.RemoveAll(newSnapshotDir)
|
|
||||||
|
|
||||||
// Если есть родитель, извлекаем его содержимое
|
|
||||||
if localParentID != "" {
|
|
||||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
|
||||||
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
|
||||||
}
|
}
|
||||||
}
|
}()
|
||||||
|
|
||||||
// Распаковываем дельта-архив поверх
|
|
||||||
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
|
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
|
||||||
return fmt.Errorf("failed to extract diff archive: %w", err)
|
return fmt.Errorf("failed to extract diff archive: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Создаем финальный архив и регистрируем снапшот
|
|
||||||
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
|
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
|
||||||
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
|
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
|
||||||
return fmt.Errorf("failed to create final snapshot archive: %w", err)
|
return fmt.Errorf("failed to create final snapshot archive: %w", err)
|
||||||
}
|
}
|
||||||
defer os.Remove(finalArchivePath)
|
defer func() {
|
||||||
|
if err := os.Remove(finalArchivePath); err != nil {
|
||||||
|
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
finalArchiveFile, err := os.Open(finalArchivePath)
|
finalArchiveFile, err := os.Open(finalArchivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -422,7 +503,7 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
|
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
|
||||||
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
|
a.options.BlobStore.DeleteBlob(ctx, snapshotID)
|
||||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -433,3 +514,90 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
|||||||
func (a *Agate) GetCurrentSnapshotID() string {
|
func (a *Agate) GetCurrentSnapshotID() string {
|
||||||
return a.currentSnapshotID
|
return a.currentSnapshotID
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterLocalSnapshot регистрирует локальный файл как блоб снимка и создает
|
||||||
|
// соответствующую запись в метаданных. Если снимок с таким ID уже существует,
|
||||||
|
// метод ничего не делает и возвращает nil.
|
||||||
|
//
|
||||||
|
// - ctx: Контекст для выполнения операции.
|
||||||
|
// - snapshotID: ID регистрируемого снимка.
|
||||||
|
// - parentID: ID родительского снимка. Может быть пустым для полных снимков.
|
||||||
|
// - name: Описательное имя для снимка.
|
||||||
|
// - localPath: Абсолютный путь к локальному файлу снимка (полному или дифф-архиву).
|
||||||
|
func (ag *Agate) RegisterLocalSnapshot(ctx context.Context, snapshotID, parentID, name, localPath string) error {
|
||||||
|
// 1. Check if snapshot already exists
|
||||||
|
_, err := ag.manager.GetSnapshotDetails(ctx, snapshotID)
|
||||||
|
if err == nil {
|
||||||
|
ag.options.Logger.Printf("snapshot %s already exists, skipping registration", snapshotID)
|
||||||
|
return nil // Snapshot already exists
|
||||||
|
}
|
||||||
|
// We expect ErrNotFound, anything else is a real error.
|
||||||
|
if !errors.Is(err, models.ErrNotFound) {
|
||||||
|
return fmt.Errorf("failed to check for existing snapshot: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Add the file to the blob store
|
||||||
|
// Check if blob already exists. If so, we assume it's the correct one and skip overwriting.
|
||||||
|
// This is to prevent issues when registering a file that is already in the blob store.
|
||||||
|
_, err = ag.options.BlobStore.GetBlobPath(ctx, snapshotID)
|
||||||
|
if err == nil {
|
||||||
|
ag.options.Logger.Printf("blob for snapshot %s already exists, skipping storing it", snapshotID)
|
||||||
|
} else if errors.Is(err, models.ErrNotFound) {
|
||||||
|
// Blob does not exist, so we store it.
|
||||||
|
localFile, err := os.Open(localPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open local snapshot file: %w", err)
|
||||||
|
}
|
||||||
|
defer func() {
|
||||||
|
if err := localFile.Close(); err != nil {
|
||||||
|
ag.options.Logger.Printf("ERROR: failed to close local file: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
if _, err = ag.options.BlobStore.StoreBlob(ctx, snapshotID, localFile); err != nil {
|
||||||
|
return fmt.Errorf("failed to store blob from local file: %w", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Another error occurred when checking for the blob.
|
||||||
|
return fmt.Errorf("failed to check for existing blob: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Create and save snapshot metadata
|
||||||
|
// We get the file list from the archive to create the metadata.
|
||||||
|
// Note: This method does not calculate file hashes, so the metadata will be incomplete.
|
||||||
|
// This is a limitation of the current implementation.
|
||||||
|
var files []store.FileInfo
|
||||||
|
archiveFiles, err := archive.ListArchiveContents(localPath)
|
||||||
|
if err != nil {
|
||||||
|
// If we can't list the contents, we can't create the metadata.
|
||||||
|
// We should clean up the blob we just stored.
|
||||||
|
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
|
||||||
|
return fmt.Errorf("failed to list archive contents for metadata creation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, f := range archiveFiles {
|
||||||
|
files = append(files, store.FileInfo{
|
||||||
|
Path: f.Path,
|
||||||
|
Size: int64(f.Size),
|
||||||
|
IsDir: f.IsDir,
|
||||||
|
// SHA256 is intentionally left empty as we don't have it.
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshot := store.Snapshot{
|
||||||
|
ID: snapshotID,
|
||||||
|
Name: name,
|
||||||
|
ParentID: parentID,
|
||||||
|
CreationTime: time.Now(),
|
||||||
|
Files: files,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := ag.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot); err != nil {
|
||||||
|
// Clean up the blob
|
||||||
|
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
|
||||||
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ag.options.Logger.Printf("Successfully registered local snapshot %s", snapshotID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -95,6 +95,104 @@ func CreateArchive(sourceDir, targetPath string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CreateArchiveWithProgress creates a ZIP archive with progress reporting.
|
||||||
|
// onProgress is called with the current number of bytes written and the total size.
|
||||||
|
func CreateArchiveWithProgress(sourceDir, targetPath string, onProgress func(current, total int64)) error {
|
||||||
|
info, err := os.Stat(sourceDir)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to stat source directory %s: %w", sourceDir, err)
|
||||||
|
}
|
||||||
|
if !info.IsDir() {
|
||||||
|
return fmt.Errorf("source %s is not a directory", sourceDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate total size
|
||||||
|
var totalSize int64
|
||||||
|
err = filepath.Walk(sourceDir, func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !info.IsDir() {
|
||||||
|
totalSize += info.Size()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to calculate total size of %s: %w", sourceDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create file for ZIP archive
|
||||||
|
outFile, err := os.Create(targetPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create target archive file %s: %w", targetPath, err)
|
||||||
|
}
|
||||||
|
defer outFile.Close()
|
||||||
|
|
||||||
|
// Create zip.Writer
|
||||||
|
zipWriter := zip.NewWriter(outFile)
|
||||||
|
defer zipWriter.Close()
|
||||||
|
|
||||||
|
var currentSize int64
|
||||||
|
|
||||||
|
// Recursively walk sourceDir
|
||||||
|
err = filepath.Walk(sourceDir, func(filePath string, fileInfo os.FileInfo, walkErr error) error {
|
||||||
|
if walkErr != nil {
|
||||||
|
return fmt.Errorf("error walking path %s: %w", filePath, walkErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip sourceDir itself
|
||||||
|
if filePath == sourceDir {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create relative path
|
||||||
|
relativePath := strings.TrimPrefix(filePath, sourceDir+string(filepath.Separator))
|
||||||
|
relativePath = filepath.ToSlash(relativePath)
|
||||||
|
|
||||||
|
// Check if directory
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
_, err = zipWriter.Create(relativePath + "/")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory entry %s in archive: %w", relativePath, err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Open file for reading
|
||||||
|
fileToArchive, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open file %s for archiving: %w", filePath, err)
|
||||||
|
}
|
||||||
|
defer fileToArchive.Close()
|
||||||
|
|
||||||
|
// Create archive entry
|
||||||
|
zipEntryWriter, err := zipWriter.Create(relativePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create entry %s in archive: %w", relativePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy content
|
||||||
|
n, err := io.Copy(zipEntryWriter, fileToArchive)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to copy file content %s to archive: %w", filePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
currentSize += n
|
||||||
|
if onProgress != nil {
|
||||||
|
onProgress(currentSize, totalSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
os.Remove(targetPath)
|
||||||
|
return fmt.Errorf("failed during directory walk for archiving %s: %w", sourceDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом.
|
// ListArchiveContents читает ZIP-архив и возвращает информацию о его содержимом.
|
||||||
func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) {
|
func ListArchiveContents(archivePath string) ([]ArchiveEntryInfo, error) {
|
||||||
// Открываем ZIP-архив
|
// Открываем ZIP-архив
|
||||||
|
|||||||
115
async_test.go
Normal file
115
async_test.go
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
package agate
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestSnapshotAsync(t *testing.T) {
|
||||||
|
// Setup temporary work directory
|
||||||
|
workDir, err := os.MkdirTemp("", "agate_async_test")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(workDir)
|
||||||
|
|
||||||
|
// Initialize Agate
|
||||||
|
opts := AgateOptions{
|
||||||
|
WorkDir: workDir,
|
||||||
|
Logger: nil, // Disable logging for test
|
||||||
|
}
|
||||||
|
|
||||||
|
ag, err := New(opts)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to initialize Agate: %v", err)
|
||||||
|
}
|
||||||
|
defer ag.Close()
|
||||||
|
|
||||||
|
// Get active directory and create some dummy files
|
||||||
|
activeDir := ag.GetActiveDir()
|
||||||
|
if err := os.MkdirAll(activeDir, 0755); err != nil {
|
||||||
|
t.Fatalf("Failed to create active dir: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a large-ish file to ensure it takes some time (though still fast)
|
||||||
|
dummyFile := filepath.Join(activeDir, "data.bin")
|
||||||
|
data := make([]byte, 1024*1024) // 1MB
|
||||||
|
if err := os.WriteFile(dummyFile, data, 0644); err != nil {
|
||||||
|
t.Fatalf("Failed to create dummy file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start async snapshot
|
||||||
|
ctx := context.Background()
|
||||||
|
snapshotID, err := ag.SnapshotAsync(ctx, "async-snap", "")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("SnapshotAsync failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if snapshotID == "" {
|
||||||
|
t.Fatal("SnapshotAsync returned empty ID")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check status immediately
|
||||||
|
status, err := ag.GetSnapshotStatus(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetSnapshotStatus failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Status should be pending or running
|
||||||
|
if status.Status != "pending" && status.Status != "running" {
|
||||||
|
t.Errorf("Initial status should be pending or running, got: %s", status.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Poll for completion
|
||||||
|
timeout := time.After(5 * time.Second)
|
||||||
|
ticker := time.NewTicker(10 * time.Millisecond)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
done := false
|
||||||
|
for !done {
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatal("Timeout waiting for snapshot completion")
|
||||||
|
case <-ticker.C:
|
||||||
|
status, err := ag.GetSnapshotStatus(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetSnapshotStatus failed during polling: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.Status == "done" {
|
||||||
|
done = true
|
||||||
|
if status.Progress != 1.0 {
|
||||||
|
t.Errorf("Expected progress 1.0, got %f", status.Progress)
|
||||||
|
}
|
||||||
|
} else if status.Status == "failed" {
|
||||||
|
t.Fatalf("Snapshot creation failed: %s", status.Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify snapshot exists
|
||||||
|
snaps, err := ag.ListSnapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("ListSnapshots failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
found := false
|
||||||
|
for _, s := range snaps {
|
||||||
|
if s.ID == snapshotID {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
t.Errorf("Snapshot %s not found in list", snapshotID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify current snapshot ID is updated
|
||||||
|
if ag.GetCurrentSnapshotID() != snapshotID {
|
||||||
|
t.Errorf("Current snapshot ID not updated. Expected %s, got %s", snapshotID, ag.GetCurrentSnapshotID())
|
||||||
|
}
|
||||||
|
}
|
||||||
218
go.mod
218
go.mod
@@ -1,216 +1,20 @@
|
|||||||
module gitea.unprism.ru/KRBL/Agate
|
module gitea.unprism.ru/KRBL/Agate
|
||||||
|
|
||||||
go 1.24.3
|
go 1.25.5
|
||||||
|
|
||||||
tool github.com/golangci/golangci-lint/v2/cmd/golangci-lint
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3
|
||||||
github.com/mattn/go-sqlite3 v1.14.28
|
github.com/mattn/go-sqlite3 v1.14.32
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2
|
github.com/sirupsen/logrus v1.9.3
|
||||||
google.golang.org/grpc v1.72.0
|
google.golang.org/genproto/googleapis/api v0.0.0-20251222181119-0a764e51fe1b
|
||||||
google.golang.org/protobuf v1.36.6
|
google.golang.org/grpc v1.78.0
|
||||||
|
google.golang.org/protobuf v1.36.11
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
4d63.com/gocheckcompilerdirectives v1.3.0 // indirect
|
golang.org/x/net v0.48.0 // indirect
|
||||||
4d63.com/gochecknoglobals v0.2.2 // indirect
|
golang.org/x/sys v0.39.0 // indirect
|
||||||
github.com/4meepo/tagalign v1.4.2 // indirect
|
golang.org/x/text v0.32.0 // indirect
|
||||||
github.com/Abirdcfly/dupword v0.1.3 // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20251222181119-0a764e51fe1b // indirect
|
||||||
github.com/Antonboom/errname v1.1.0 // indirect
|
|
||||||
github.com/Antonboom/nilnil v1.1.0 // indirect
|
|
||||||
github.com/Antonboom/testifylint v1.6.1 // indirect
|
|
||||||
github.com/BurntSushi/toml v1.5.0 // indirect
|
|
||||||
github.com/Djarvur/go-err113 v0.0.0-20210108212216-aea10b59be24 // indirect
|
|
||||||
github.com/GaijinEntertainment/go-exhaustruct/v3 v3.3.1 // indirect
|
|
||||||
github.com/Masterminds/semver/v3 v3.3.1 // indirect
|
|
||||||
github.com/OpenPeeDeeP/depguard/v2 v2.2.1 // indirect
|
|
||||||
github.com/alecthomas/chroma/v2 v2.17.2 // indirect
|
|
||||||
github.com/alecthomas/go-check-sumtype v0.3.1 // indirect
|
|
||||||
github.com/alexkohler/nakedret/v2 v2.0.6 // indirect
|
|
||||||
github.com/alexkohler/prealloc v1.0.0 // indirect
|
|
||||||
github.com/alingse/asasalint v0.0.11 // indirect
|
|
||||||
github.com/alingse/nilnesserr v0.2.0 // indirect
|
|
||||||
github.com/ashanbrown/forbidigo v1.6.0 // indirect
|
|
||||||
github.com/ashanbrown/makezero v1.2.0 // indirect
|
|
||||||
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
|
|
||||||
github.com/beorn7/perks v1.0.1 // indirect
|
|
||||||
github.com/bkielbasa/cyclop v1.2.3 // indirect
|
|
||||||
github.com/blizzy78/varnamelen v0.8.0 // indirect
|
|
||||||
github.com/bombsimon/wsl/v4 v4.7.0 // indirect
|
|
||||||
github.com/breml/bidichk v0.3.3 // indirect
|
|
||||||
github.com/breml/errchkjson v0.4.1 // indirect
|
|
||||||
github.com/butuzov/ireturn v0.4.0 // indirect
|
|
||||||
github.com/butuzov/mirror v1.3.0 // indirect
|
|
||||||
github.com/catenacyber/perfsprint v0.9.1 // indirect
|
|
||||||
github.com/ccojocar/zxcvbn-go v1.0.2 // indirect
|
|
||||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
|
||||||
github.com/charithe/durationcheck v0.0.10 // indirect
|
|
||||||
github.com/charmbracelet/colorprofile v0.2.3-0.20250311203215-f60798e515dc // indirect
|
|
||||||
github.com/charmbracelet/lipgloss v1.1.0 // indirect
|
|
||||||
github.com/charmbracelet/x/ansi v0.8.0 // indirect
|
|
||||||
github.com/charmbracelet/x/cellbuf v0.0.13-0.20250311204145-2c3ea96c31dd // indirect
|
|
||||||
github.com/charmbracelet/x/term v0.2.1 // indirect
|
|
||||||
github.com/chavacava/garif v0.1.0 // indirect
|
|
||||||
github.com/ckaznocha/intrange v0.3.1 // indirect
|
|
||||||
github.com/curioswitch/go-reassign v0.3.0 // indirect
|
|
||||||
github.com/daixiang0/gci v0.13.6 // indirect
|
|
||||||
github.com/dave/dst v0.27.3 // indirect
|
|
||||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
|
||||||
github.com/denis-tingaikin/go-header v0.5.0 // indirect
|
|
||||||
github.com/dlclark/regexp2 v1.11.5 // indirect
|
|
||||||
github.com/ettle/strcase v0.2.0 // indirect
|
|
||||||
github.com/fatih/color v1.18.0 // indirect
|
|
||||||
github.com/fatih/structtag v1.2.0 // indirect
|
|
||||||
github.com/firefart/nonamedreturns v1.0.6 // indirect
|
|
||||||
github.com/fsnotify/fsnotify v1.5.4 // indirect
|
|
||||||
github.com/fzipp/gocyclo v0.6.0 // indirect
|
|
||||||
github.com/ghostiam/protogetter v0.3.15 // indirect
|
|
||||||
github.com/go-critic/go-critic v0.13.0 // indirect
|
|
||||||
github.com/go-toolsmith/astcast v1.1.0 // indirect
|
|
||||||
github.com/go-toolsmith/astcopy v1.1.0 // indirect
|
|
||||||
github.com/go-toolsmith/astequal v1.2.0 // indirect
|
|
||||||
github.com/go-toolsmith/astfmt v1.1.0 // indirect
|
|
||||||
github.com/go-toolsmith/astp v1.1.0 // indirect
|
|
||||||
github.com/go-toolsmith/strparse v1.1.0 // indirect
|
|
||||||
github.com/go-toolsmith/typep v1.1.0 // indirect
|
|
||||||
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
|
|
||||||
github.com/go-xmlfmt/xmlfmt v1.1.3 // indirect
|
|
||||||
github.com/gobwas/glob v0.2.3 // indirect
|
|
||||||
github.com/gofrs/flock v0.12.1 // indirect
|
|
||||||
github.com/golang/protobuf v1.5.4 // indirect
|
|
||||||
github.com/golangci/dupl v0.0.0-20250308024227-f665c8d69b32 // indirect
|
|
||||||
github.com/golangci/go-printf-func-name v0.1.0 // indirect
|
|
||||||
github.com/golangci/gofmt v0.0.0-20250106114630-d62b90e6713d // indirect
|
|
||||||
github.com/golangci/golangci-lint/v2 v2.1.6 // indirect
|
|
||||||
github.com/golangci/golines v0.0.0-20250217134842-442fd0091d95 // indirect
|
|
||||||
github.com/golangci/misspell v0.6.0 // indirect
|
|
||||||
github.com/golangci/plugin-module-register v0.1.1 // indirect
|
|
||||||
github.com/golangci/revgrep v0.8.0 // indirect
|
|
||||||
github.com/golangci/unconvert v0.0.0-20250410112200-a129a6e6413e // indirect
|
|
||||||
github.com/google/go-cmp v0.7.0 // indirect
|
|
||||||
github.com/gordonklaus/ineffassign v0.1.0 // indirect
|
|
||||||
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
|
|
||||||
github.com/gostaticanalysis/comment v1.5.0 // indirect
|
|
||||||
github.com/gostaticanalysis/forcetypeassert v0.2.0 // indirect
|
|
||||||
github.com/gostaticanalysis/nilerr v0.1.1 // indirect
|
|
||||||
github.com/hashicorp/go-immutable-radix/v2 v2.1.0 // indirect
|
|
||||||
github.com/hashicorp/go-version v1.7.0 // indirect
|
|
||||||
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
|
|
||||||
github.com/hashicorp/hcl v1.0.0 // indirect
|
|
||||||
github.com/hexops/gotextdiff v1.0.3 // indirect
|
|
||||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
|
||||||
github.com/jgautheron/goconst v1.8.1 // indirect
|
|
||||||
github.com/jingyugao/rowserrcheck v1.1.1 // indirect
|
|
||||||
github.com/jjti/go-spancheck v0.6.4 // indirect
|
|
||||||
github.com/julz/importas v0.2.0 // indirect
|
|
||||||
github.com/karamaru-alpha/copyloopvar v1.2.1 // indirect
|
|
||||||
github.com/kisielk/errcheck v1.9.0 // indirect
|
|
||||||
github.com/kkHAIKE/contextcheck v1.1.6 // indirect
|
|
||||||
github.com/kulti/thelper v0.6.3 // indirect
|
|
||||||
github.com/kunwardeep/paralleltest v1.0.14 // indirect
|
|
||||||
github.com/lasiar/canonicalheader v1.1.2 // indirect
|
|
||||||
github.com/ldez/exptostd v0.4.3 // indirect
|
|
||||||
github.com/ldez/gomoddirectives v0.6.1 // indirect
|
|
||||||
github.com/ldez/grignotin v0.9.0 // indirect
|
|
||||||
github.com/ldez/tagliatelle v0.7.1 // indirect
|
|
||||||
github.com/ldez/usetesting v0.4.3 // indirect
|
|
||||||
github.com/leonklingele/grouper v1.1.2 // indirect
|
|
||||||
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
|
||||||
github.com/macabu/inamedparam v0.2.0 // indirect
|
|
||||||
github.com/magiconair/properties v1.8.6 // indirect
|
|
||||||
github.com/manuelarte/funcorder v0.2.1 // indirect
|
|
||||||
github.com/maratori/testableexamples v1.0.0 // indirect
|
|
||||||
github.com/maratori/testpackage v1.1.1 // indirect
|
|
||||||
github.com/matoous/godox v1.1.0 // indirect
|
|
||||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
|
||||||
github.com/mattn/go-runewidth v0.0.16 // indirect
|
|
||||||
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
|
|
||||||
github.com/mgechev/revive v1.9.0 // indirect
|
|
||||||
github.com/mitchellh/go-homedir v1.1.0 // indirect
|
|
||||||
github.com/mitchellh/mapstructure v1.5.0 // indirect
|
|
||||||
github.com/moricho/tparallel v0.3.2 // indirect
|
|
||||||
github.com/muesli/termenv v0.16.0 // indirect
|
|
||||||
github.com/nakabonne/nestif v0.3.1 // indirect
|
|
||||||
github.com/nishanths/exhaustive v0.12.0 // indirect
|
|
||||||
github.com/nishanths/predeclared v0.2.2 // indirect
|
|
||||||
github.com/nunnatsa/ginkgolinter v0.19.1 // indirect
|
|
||||||
github.com/olekukonko/tablewriter v0.0.5 // indirect
|
|
||||||
github.com/pelletier/go-toml v1.9.5 // indirect
|
|
||||||
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
|
|
||||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
|
||||||
github.com/polyfloyd/go-errorlint v1.8.0 // indirect
|
|
||||||
github.com/prometheus/client_golang v1.12.1 // indirect
|
|
||||||
github.com/prometheus/client_model v0.2.0 // indirect
|
|
||||||
github.com/prometheus/common v0.32.1 // indirect
|
|
||||||
github.com/prometheus/procfs v0.7.3 // indirect
|
|
||||||
github.com/quasilyte/go-ruleguard v0.4.4 // indirect
|
|
||||||
github.com/quasilyte/go-ruleguard/dsl v0.3.22 // indirect
|
|
||||||
github.com/quasilyte/gogrep v0.5.0 // indirect
|
|
||||||
github.com/quasilyte/regex/syntax v0.0.0-20210819130434-b3f0c404a727 // indirect
|
|
||||||
github.com/quasilyte/stdinfo v0.0.0-20220114132959-f7386bf02567 // indirect
|
|
||||||
github.com/raeperd/recvcheck v0.2.0 // indirect
|
|
||||||
github.com/rivo/uniseg v0.4.7 // indirect
|
|
||||||
github.com/rogpeppe/go-internal v1.14.1 // indirect
|
|
||||||
github.com/ryancurrah/gomodguard v1.4.1 // indirect
|
|
||||||
github.com/ryanrolds/sqlclosecheck v0.5.1 // indirect
|
|
||||||
github.com/sanposhiho/wastedassign/v2 v2.1.0 // indirect
|
|
||||||
github.com/santhosh-tekuri/jsonschema/v6 v6.0.1 // indirect
|
|
||||||
github.com/sashamelentyev/interfacebloat v1.1.0 // indirect
|
|
||||||
github.com/sashamelentyev/usestdlibvars v1.28.0 // indirect
|
|
||||||
github.com/securego/gosec/v2 v2.22.3 // indirect
|
|
||||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
|
||||||
github.com/sivchari/containedctx v1.0.3 // indirect
|
|
||||||
github.com/sonatard/noctx v0.1.0 // indirect
|
|
||||||
github.com/sourcegraph/go-diff v0.7.0 // indirect
|
|
||||||
github.com/spf13/afero v1.14.0 // indirect
|
|
||||||
github.com/spf13/cast v1.5.0 // indirect
|
|
||||||
github.com/spf13/cobra v1.9.1 // indirect
|
|
||||||
github.com/spf13/jwalterweatherman v1.1.0 // indirect
|
|
||||||
github.com/spf13/pflag v1.0.6 // indirect
|
|
||||||
github.com/spf13/viper v1.12.0 // indirect
|
|
||||||
github.com/ssgreg/nlreturn/v2 v2.2.1 // indirect
|
|
||||||
github.com/stbenjam/no-sprintf-host-port v0.2.0 // indirect
|
|
||||||
github.com/stretchr/objx v0.5.2 // indirect
|
|
||||||
github.com/stretchr/testify v1.10.0 // indirect
|
|
||||||
github.com/subosito/gotenv v1.4.1 // indirect
|
|
||||||
github.com/tdakkota/asciicheck v0.4.1 // indirect
|
|
||||||
github.com/tetafro/godot v1.5.1 // indirect
|
|
||||||
github.com/timakin/bodyclose v0.0.0-20241222091800-1db5c5ca4d67 // indirect
|
|
||||||
github.com/timonwong/loggercheck v0.11.0 // indirect
|
|
||||||
github.com/tomarrell/wrapcheck/v2 v2.11.0 // indirect
|
|
||||||
github.com/tommy-muehle/go-mnd/v2 v2.5.1 // indirect
|
|
||||||
github.com/ultraware/funlen v0.2.0 // indirect
|
|
||||||
github.com/ultraware/whitespace v0.2.0 // indirect
|
|
||||||
github.com/uudashr/gocognit v1.2.0 // indirect
|
|
||||||
github.com/uudashr/iface v1.3.1 // indirect
|
|
||||||
github.com/xen0n/gosmopolitan v1.3.0 // indirect
|
|
||||||
github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect
|
|
||||||
github.com/yagipy/maintidx v1.0.0 // indirect
|
|
||||||
github.com/yeya24/promlinter v0.3.0 // indirect
|
|
||||||
github.com/ykadowak/zerologlint v0.1.5 // indirect
|
|
||||||
gitlab.com/bosi/decorder v0.4.2 // indirect
|
|
||||||
go-simpler.org/musttag v0.13.1 // indirect
|
|
||||||
go-simpler.org/sloglint v0.11.0 // indirect
|
|
||||||
go.augendre.info/fatcontext v0.8.0 // indirect
|
|
||||||
go.uber.org/atomic v1.7.0 // indirect
|
|
||||||
go.uber.org/automaxprocs v1.6.0 // indirect
|
|
||||||
go.uber.org/multierr v1.6.0 // indirect
|
|
||||||
go.uber.org/zap v1.24.0 // indirect
|
|
||||||
golang.org/x/exp/typeparams v0.0.0-20250210185358-939b2ce775ac // indirect
|
|
||||||
golang.org/x/mod v0.24.0 // indirect
|
|
||||||
golang.org/x/net v0.40.0 // indirect
|
|
||||||
golang.org/x/sync v0.14.0 // indirect
|
|
||||||
golang.org/x/sys v0.33.0 // indirect
|
|
||||||
golang.org/x/text v0.25.0 // indirect
|
|
||||||
golang.org/x/tools v0.32.0 // indirect
|
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
|
|
||||||
gopkg.in/ini.v1 v1.67.0 // indirect
|
|
||||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
|
||||||
honnef.co/go/tools v0.6.1 // indirect
|
|
||||||
mvdan.cc/gofumpt v0.8.0 // indirect
|
|
||||||
mvdan.cc/unparam v0.0.0-20250301125049-0df0534333a4 // indirect
|
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
// versions:
|
// versions:
|
||||||
// protoc-gen-go v1.36.6
|
// protoc-gen-go v1.36.8
|
||||||
// protoc v4.25.3
|
// protoc v6.32.0
|
||||||
// source: snapshot.proto
|
// source: snapshot.proto
|
||||||
|
|
||||||
package grpc
|
package grpc
|
||||||
@@ -500,6 +500,112 @@ func (x *DownloadSnapshotDiffRequest) GetOffset() int64 {
|
|||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Запрос на получение информации о дифе
|
||||||
|
type GetDiffInfoRequest struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
|
||||||
|
LocalParentId string `protobuf:"bytes,2,opt,name=local_parent_id,json=localParentId,proto3" json:"local_parent_id,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *GetDiffInfoRequest) Reset() {
|
||||||
|
*x = GetDiffInfoRequest{}
|
||||||
|
mi := &file_snapshot_proto_msgTypes[9]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *GetDiffInfoRequest) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*GetDiffInfoRequest) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *GetDiffInfoRequest) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_snapshot_proto_msgTypes[9]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use GetDiffInfoRequest.ProtoReflect.Descriptor instead.
|
||||||
|
func (*GetDiffInfoRequest) Descriptor() ([]byte, []int) {
|
||||||
|
return file_snapshot_proto_rawDescGZIP(), []int{9}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *GetDiffInfoRequest) GetSnapshotId() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.SnapshotId
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *GetDiffInfoRequest) GetLocalParentId() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.LocalParentId
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// Информация о дифе
|
||||||
|
type DiffInfo struct {
|
||||||
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
|
Sha256Hash string `protobuf:"bytes,1,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"`
|
||||||
|
SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
|
||||||
|
unknownFields protoimpl.UnknownFields
|
||||||
|
sizeCache protoimpl.SizeCache
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DiffInfo) Reset() {
|
||||||
|
*x = DiffInfo{}
|
||||||
|
mi := &file_snapshot_proto_msgTypes[10]
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DiffInfo) String() string {
|
||||||
|
return protoimpl.X.MessageStringOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*DiffInfo) ProtoMessage() {}
|
||||||
|
|
||||||
|
func (x *DiffInfo) ProtoReflect() protoreflect.Message {
|
||||||
|
mi := &file_snapshot_proto_msgTypes[10]
|
||||||
|
if x != nil {
|
||||||
|
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||||
|
if ms.LoadMessageInfo() == nil {
|
||||||
|
ms.StoreMessageInfo(mi)
|
||||||
|
}
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
return mi.MessageOf(x)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deprecated: Use DiffInfo.ProtoReflect.Descriptor instead.
|
||||||
|
func (*DiffInfo) Descriptor() ([]byte, []int) {
|
||||||
|
return file_snapshot_proto_rawDescGZIP(), []int{10}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DiffInfo) GetSha256Hash() string {
|
||||||
|
if x != nil {
|
||||||
|
return x.Sha256Hash
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *DiffInfo) GetSizeBytes() int64 {
|
||||||
|
if x != nil {
|
||||||
|
return x.SizeBytes
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
var File_snapshot_proto protoreflect.FileDescriptor
|
var File_snapshot_proto protoreflect.FileDescriptor
|
||||||
|
|
||||||
const file_snapshot_proto_rawDesc = "" +
|
const file_snapshot_proto_rawDesc = "" +
|
||||||
@@ -538,12 +644,22 @@ const file_snapshot_proto_rawDesc = "" +
|
|||||||
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
|
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
|
||||||
"snapshotId\x12&\n" +
|
"snapshotId\x12&\n" +
|
||||||
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" +
|
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" +
|
||||||
"\x06offset\x18\x03 \x01(\x03R\x06offset2\xf1\x03\n" +
|
"\x06offset\x18\x03 \x01(\x03R\x06offset\"]\n" +
|
||||||
|
"\x12GetDiffInfoRequest\x12\x1f\n" +
|
||||||
|
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
|
||||||
|
"snapshotId\x12&\n" +
|
||||||
|
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\"J\n" +
|
||||||
|
"\bDiffInfo\x12\x1f\n" +
|
||||||
|
"\vsha256_hash\x18\x01 \x01(\tR\n" +
|
||||||
|
"sha256Hash\x12\x1d\n" +
|
||||||
|
"\n" +
|
||||||
|
"size_bytes\x18\x02 \x01(\x03R\tsizeBytes2\xb8\x04\n" +
|
||||||
"\x0fSnapshotService\x12k\n" +
|
"\x0fSnapshotService\x12k\n" +
|
||||||
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
|
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
|
||||||
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
|
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
|
||||||
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" +
|
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" +
|
||||||
"\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
|
"\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01\x12E\n" +
|
||||||
|
"\vGetDiffInfo\x12\x1e.agate.grpc.GetDiffInfoRequest\x1a\x14.agate.grpc.DiffInfo\"\x00B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
file_snapshot_proto_rawDescOnce sync.Once
|
file_snapshot_proto_rawDescOnce sync.Once
|
||||||
@@ -557,7 +673,7 @@ func file_snapshot_proto_rawDescGZIP() []byte {
|
|||||||
return file_snapshot_proto_rawDescData
|
return file_snapshot_proto_rawDescData
|
||||||
}
|
}
|
||||||
|
|
||||||
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
|
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 11)
|
||||||
var file_snapshot_proto_goTypes = []any{
|
var file_snapshot_proto_goTypes = []any{
|
||||||
(*FileInfo)(nil), // 0: agate.grpc.FileInfo
|
(*FileInfo)(nil), // 0: agate.grpc.FileInfo
|
||||||
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
|
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
|
||||||
@@ -568,26 +684,30 @@ var file_snapshot_proto_goTypes = []any{
|
|||||||
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
|
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
|
||||||
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
|
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
|
||||||
(*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest
|
(*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest
|
||||||
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
|
(*GetDiffInfoRequest)(nil), // 9: agate.grpc.GetDiffInfoRequest
|
||||||
|
(*DiffInfo)(nil), // 10: agate.grpc.DiffInfo
|
||||||
|
(*timestamppb.Timestamp)(nil), // 11: google.protobuf.Timestamp
|
||||||
}
|
}
|
||||||
var file_snapshot_proto_depIdxs = []int32{
|
var file_snapshot_proto_depIdxs = []int32{
|
||||||
9, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
|
11, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
|
||||||
1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo
|
1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo
|
||||||
0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo
|
0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo
|
||||||
1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo
|
1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo
|
||||||
3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest
|
3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest
|
||||||
5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest
|
5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest
|
||||||
6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest
|
6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest
|
||||||
8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest
|
8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest
|
||||||
4, // 8: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
|
9, // 8: agate.grpc.SnapshotService.GetDiffInfo:input_type -> agate.grpc.GetDiffInfoRequest
|
||||||
2, // 9: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
|
4, // 9: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
|
||||||
7, // 10: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
|
2, // 10: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
|
||||||
7, // 11: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse
|
7, // 11: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
|
||||||
8, // [8:12] is the sub-list for method output_type
|
7, // 12: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse
|
||||||
4, // [4:8] is the sub-list for method input_type
|
10, // 13: agate.grpc.SnapshotService.GetDiffInfo:output_type -> agate.grpc.DiffInfo
|
||||||
4, // [4:4] is the sub-list for extension type_name
|
9, // [9:14] is the sub-list for method output_type
|
||||||
4, // [4:4] is the sub-list for extension extendee
|
4, // [4:9] is the sub-list for method input_type
|
||||||
0, // [0:4] is the sub-list for field type_name
|
4, // [4:4] is the sub-list for extension type_name
|
||||||
|
4, // [4:4] is the sub-list for extension extendee
|
||||||
|
0, // [0:4] is the sub-list for field type_name
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() { file_snapshot_proto_init() }
|
func init() { file_snapshot_proto_init() }
|
||||||
@@ -601,7 +721,7 @@ func file_snapshot_proto_init() {
|
|||||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||||
RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)),
|
RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)),
|
||||||
NumEnums: 0,
|
NumEnums: 0,
|
||||||
NumMessages: 9,
|
NumMessages: 11,
|
||||||
NumExtensions: 0,
|
NumExtensions: 0,
|
||||||
NumServices: 1,
|
NumServices: 1,
|
||||||
},
|
},
|
||||||
|
|||||||
@@ -40,7 +40,9 @@ func request_SnapshotService_ListSnapshots_0(ctx context.Context, marshaler runt
|
|||||||
protoReq ListSnapshotsRequest
|
protoReq ListSnapshotsRequest
|
||||||
metadata runtime.ServerMetadata
|
metadata runtime.ServerMetadata
|
||||||
)
|
)
|
||||||
io.Copy(io.Discard, req.Body)
|
if req.Body != nil {
|
||||||
|
_, _ = io.Copy(io.Discard, req.Body)
|
||||||
|
}
|
||||||
msg, err := client.ListSnapshots(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
|
msg, err := client.ListSnapshots(ctx, &protoReq, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
|
||||||
return msg, metadata, err
|
return msg, metadata, err
|
||||||
}
|
}
|
||||||
@@ -60,7 +62,9 @@ func request_SnapshotService_GetSnapshotDetails_0(ctx context.Context, marshaler
|
|||||||
metadata runtime.ServerMetadata
|
metadata runtime.ServerMetadata
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
io.Copy(io.Discard, req.Body)
|
if req.Body != nil {
|
||||||
|
_, _ = io.Copy(io.Discard, req.Body)
|
||||||
|
}
|
||||||
val, ok := pathParams["snapshot_id"]
|
val, ok := pathParams["snapshot_id"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")
|
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")
|
||||||
@@ -97,7 +101,9 @@ func request_SnapshotService_DownloadFile_0(ctx context.Context, marshaler runti
|
|||||||
metadata runtime.ServerMetadata
|
metadata runtime.ServerMetadata
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
io.Copy(io.Discard, req.Body)
|
if req.Body != nil {
|
||||||
|
_, _ = io.Copy(io.Discard, req.Body)
|
||||||
|
}
|
||||||
val, ok := pathParams["snapshot_id"]
|
val, ok := pathParams["snapshot_id"]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")
|
return nil, metadata, status.Errorf(codes.InvalidArgument, "missing parameter %s", "snapshot_id")
|
||||||
|
|||||||
@@ -32,6 +32,9 @@ service SnapshotService {
|
|||||||
|
|
||||||
// Скачать архив, содержащий только разницу между двумя снапшотами
|
// Скачать архив, содержащий только разницу между двумя снапшотами
|
||||||
rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {}
|
rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {}
|
||||||
|
|
||||||
|
// Получить информацию о дифе (хеш и размер)
|
||||||
|
rpc GetDiffInfo(GetDiffInfoRequest) returns (DiffInfo) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Метаданные файла внутри снапшота
|
// Метаданные файла внутри снапшота
|
||||||
@@ -86,3 +89,15 @@ message DownloadSnapshotDiffRequest {
|
|||||||
string local_parent_id = 2; // ID снапшота, который уже есть у клиента
|
string local_parent_id = 2; // ID снапшота, который уже есть у клиента
|
||||||
int64 offset = 3; // Смещение в байтах для докачки
|
int64 offset = 3; // Смещение в байтах для докачки
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Запрос на получение информации о дифе
|
||||||
|
message GetDiffInfoRequest {
|
||||||
|
string snapshot_id = 1;
|
||||||
|
string local_parent_id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Информация о дифе
|
||||||
|
message DiffInfo {
|
||||||
|
string sha256_hash = 1;
|
||||||
|
int64 size_bytes = 2;
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||||
// versions:
|
// versions:
|
||||||
// - protoc-gen-go-grpc v1.5.1
|
// - protoc-gen-go-grpc v1.5.1
|
||||||
// - protoc v4.25.3
|
// - protoc v6.32.0
|
||||||
// source: snapshot.proto
|
// source: snapshot.proto
|
||||||
|
|
||||||
package grpc
|
package grpc
|
||||||
@@ -23,6 +23,7 @@ const (
|
|||||||
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
|
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
|
||||||
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
|
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
|
||||||
SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff"
|
SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff"
|
||||||
|
SnapshotService_GetDiffInfo_FullMethodName = "/agate.grpc.SnapshotService/GetDiffInfo"
|
||||||
)
|
)
|
||||||
|
|
||||||
// SnapshotServiceClient is the client API for SnapshotService service.
|
// SnapshotServiceClient is the client API for SnapshotService service.
|
||||||
@@ -39,6 +40,8 @@ type SnapshotServiceClient interface {
|
|||||||
DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
|
DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
|
||||||
// Скачать архив, содержащий только разницу между двумя снапшотами
|
// Скачать архив, содержащий только разницу между двумя снапшотами
|
||||||
DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
|
DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
|
||||||
|
// Получить информацию о дифе (хеш и размер)
|
||||||
|
GetDiffInfo(ctx context.Context, in *GetDiffInfoRequest, opts ...grpc.CallOption) (*DiffInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type snapshotServiceClient struct {
|
type snapshotServiceClient struct {
|
||||||
@@ -107,6 +110,16 @@ func (c *snapshotServiceClient) DownloadSnapshotDiff(ctx context.Context, in *Do
|
|||||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse]
|
type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse]
|
||||||
|
|
||||||
|
func (c *snapshotServiceClient) GetDiffInfo(ctx context.Context, in *GetDiffInfoRequest, opts ...grpc.CallOption) (*DiffInfo, error) {
|
||||||
|
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||||
|
out := new(DiffInfo)
|
||||||
|
err := c.cc.Invoke(ctx, SnapshotService_GetDiffInfo_FullMethodName, in, out, cOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
// SnapshotServiceServer is the server API for SnapshotService service.
|
// SnapshotServiceServer is the server API for SnapshotService service.
|
||||||
// All implementations must embed UnimplementedSnapshotServiceServer
|
// All implementations must embed UnimplementedSnapshotServiceServer
|
||||||
// for forward compatibility.
|
// for forward compatibility.
|
||||||
@@ -121,6 +134,8 @@ type SnapshotServiceServer interface {
|
|||||||
DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
|
DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
|
||||||
// Скачать архив, содержащий только разницу между двумя снапшотами
|
// Скачать архив, содержащий только разницу между двумя снапшотами
|
||||||
DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
|
DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
|
||||||
|
// Получить информацию о дифе (хеш и размер)
|
||||||
|
GetDiffInfo(context.Context, *GetDiffInfoRequest) (*DiffInfo, error)
|
||||||
mustEmbedUnimplementedSnapshotServiceServer()
|
mustEmbedUnimplementedSnapshotServiceServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,6 +158,9 @@ func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grp
|
|||||||
func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
|
func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
|
||||||
return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented")
|
return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedSnapshotServiceServer) GetDiffInfo(context.Context, *GetDiffInfoRequest) (*DiffInfo, error) {
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method GetDiffInfo not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {}
|
func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {}
|
||||||
func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {}
|
func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {}
|
||||||
|
|
||||||
@@ -222,6 +240,24 @@ func _SnapshotService_DownloadSnapshotDiff_Handler(srv interface{}, stream grpc.
|
|||||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||||
type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse]
|
type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse]
|
||||||
|
|
||||||
|
func _SnapshotService_GetDiffInfo_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||||
|
in := new(GetDiffInfoRequest)
|
||||||
|
if err := dec(in); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if interceptor == nil {
|
||||||
|
return srv.(SnapshotServiceServer).GetDiffInfo(ctx, in)
|
||||||
|
}
|
||||||
|
info := &grpc.UnaryServerInfo{
|
||||||
|
Server: srv,
|
||||||
|
FullMethod: SnapshotService_GetDiffInfo_FullMethodName,
|
||||||
|
}
|
||||||
|
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||||
|
return srv.(SnapshotServiceServer).GetDiffInfo(ctx, req.(*GetDiffInfoRequest))
|
||||||
|
}
|
||||||
|
return interceptor(ctx, in, info, handler)
|
||||||
|
}
|
||||||
|
|
||||||
// SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service.
|
// SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
@@ -237,6 +273,10 @@ var SnapshotService_ServiceDesc = grpc.ServiceDesc{
|
|||||||
MethodName: "GetSnapshotDetails",
|
MethodName: "GetSnapshotDetails",
|
||||||
Handler: _SnapshotService_GetSnapshotDetails_Handler,
|
Handler: _SnapshotService_GetSnapshotDetails_Handler,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
MethodName: "GetDiffInfo",
|
||||||
|
Handler: _SnapshotService_GetDiffInfo_Handler,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Streams: []grpc.StreamDesc{
|
Streams: []grpc.StreamDesc{
|
||||||
{
|
{
|
||||||
|
|||||||
247
grpc_test.go
247
grpc_test.go
@@ -2,8 +2,6 @@ package agate
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
|
||||||
"io"
|
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
@@ -13,17 +11,13 @@ import (
|
|||||||
"gitea.unprism.ru/KRBL/Agate/remote"
|
"gitea.unprism.ru/KRBL/Agate/remote"
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestGRPCServerClient tests the interaction between a gRPC server and client.
|
// TestFullUpdateCycle tests a complete workflow: full download, then incremental update.
|
||||||
// It creates multiple snapshots with different content on the server,
|
func TestFullUpdateCycle(t *testing.T) {
|
||||||
// connects a client to the server, downloads the latest snapshot,
|
|
||||||
// and verifies the contents of the files.
|
|
||||||
func TestGRPCServerClient(t *testing.T) {
|
|
||||||
// Skip this test in short mode
|
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("Skipping gRPC server-client test in short mode")
|
t.Skip("Skipping full gRPC update cycle test in short mode")
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- Setup Server ---
|
// --- 1. Setup Server ---
|
||||||
serverDir, err := os.MkdirTemp("", "agate-server-*")
|
serverDir, err := os.MkdirTemp("", "agate-server-*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create server temp directory: %v", err)
|
t.Fatalf("Failed to create server temp directory: %v", err)
|
||||||
@@ -36,54 +30,36 @@ func TestGRPCServerClient(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer serverAgate.Close()
|
defer serverAgate.Close()
|
||||||
|
|
||||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
// --- 2. Create Initial Snapshot (A) ---
|
||||||
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
dataDir := serverAgate.GetActiveDir()
|
||||||
t.Fatalf("Failed to create data directory: %v", err)
|
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 1"), 0644); err != nil {
|
||||||
}
|
|
||||||
|
|
||||||
// Create initial test files for the first snapshot
|
|
||||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
|
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("Original Content"), 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
|
snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create first snapshot: %v", err)
|
t.Fatalf("Failed to create Snapshot A: %v", err)
|
||||||
}
|
}
|
||||||
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
|
t.Logf("Created Snapshot A with ID: %s", snapshotAID)
|
||||||
|
|
||||||
// Modify content for the second snapshot
|
// --- 3. Start Server ---
|
||||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create second snapshot: %v", err)
|
|
||||||
}
|
|
||||||
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
|
|
||||||
|
|
||||||
// Start the gRPC server
|
|
||||||
serverAddress := "localhost:50051"
|
serverAddress := "localhost:50051"
|
||||||
server := remote.NewServer(serverAgate.manager)
|
server := remote.NewServer(serverAgate.manager)
|
||||||
go func() {
|
go func() {
|
||||||
if err := server.Start(ctx, serverAddress); err != nil {
|
if err := server.Start(ctx, serverAddress); err != nil && err != context.Canceled {
|
||||||
log.Printf("Server start error: %v", err)
|
log.Printf("Server start error: %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
defer server.Stop()
|
defer server.Stop()
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
// --- Setup Client ---
|
// --- 4. Setup Client ---
|
||||||
clientDir, err := os.MkdirTemp("", "agate-client-*")
|
clientDir, err := os.MkdirTemp("", "agate-client-*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create client temp directory: %v", err)
|
t.Fatalf("Failed to create client temp directory: %v", err)
|
||||||
@@ -96,35 +72,62 @@ func TestGRPCServerClient(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer clientAgate.Close()
|
defer clientAgate.Close()
|
||||||
|
|
||||||
// --- Test Scenario ---
|
// --- 5. Client Performs Full Download of Snapshot A ---
|
||||||
// 1. Client downloads the first snapshot completely
|
t.Log("Client performing full download of Snapshot A...")
|
||||||
t.Log("Client downloading Snapshot 1...")
|
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, ""); err != nil {
|
||||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
|
t.Fatalf("Client failed to get Snapshot A: %v", err)
|
||||||
t.Fatalf("Client failed to get snapshot 1: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify content of snapshot 1
|
// Verify content of Snapshot A on client
|
||||||
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
|
if err := clientAgate.RestoreSnapshot(ctx, snapshotAID); err != nil {
|
||||||
t.Fatalf("Failed to restore snapshot 1: %v", err)
|
t.Fatalf("Failed to restore Snapshot A: %v", err)
|
||||||
}
|
}
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 1")
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "Original Content")
|
||||||
|
t.Log("Snapshot A verified on client.")
|
||||||
|
|
||||||
// 2. Client downloads the second snapshot incrementally
|
// --- 6. Server Creates Incremental Snapshot (B) ---
|
||||||
t.Log("Client downloading Snapshot 2 (incrementally)...")
|
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 2"), 0644); err != nil {
|
||||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
|
t.Fatal(err)
|
||||||
t.Fatalf("Client failed to get snapshot 2: %v", err)
|
}
|
||||||
|
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("New File"), 0644); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify content of snapshot 2
|
snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID)
|
||||||
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to restore snapshot 2: %v", err)
|
t.Fatalf("Failed to create Snapshot B: %v", err)
|
||||||
}
|
}
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
|
t.Logf("Created Snapshot B with ID: %s", snapshotBID)
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
|
|
||||||
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
|
// --- 7. Client Performs Incremental Download of Snapshot B ---
|
||||||
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
|
t.Log("Client performing incremental download of Snapshot B...")
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
parentIDOnClient := clientAgate.GetCurrentSnapshotID()
|
||||||
|
if parentIDOnClient != snapshotAID {
|
||||||
|
t.Fatalf("Client has incorrect current snapshot ID. Got %s, want %s", parentIDOnClient, snapshotAID)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, parentIDOnClient); err != nil {
|
||||||
|
t.Fatalf("Client failed to get Snapshot B: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- 8. Verify Final State on Client ---
|
||||||
|
if err := clientAgate.RestoreSnapshot(ctx, snapshotBID); err != nil {
|
||||||
|
t.Fatalf("Failed to restore Snapshot B: %v", err)
|
||||||
|
}
|
||||||
|
t.Log("Snapshot B restored on client. Verifying content...")
|
||||||
|
|
||||||
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 2")
|
||||||
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "New File")
|
||||||
|
|
||||||
|
// Verify that file2.txt was removed because CleanOnRestore is true
|
||||||
|
if _, err := os.Stat(filepath.Join(clientAgate.GetActiveDir(), "file2.txt")); !os.IsNotExist(err) {
|
||||||
|
t.Errorf("file2.txt should have been removed after restoring Snapshot B, but it still exists.")
|
||||||
|
}
|
||||||
|
t.Log("Final state verified successfully!")
|
||||||
}
|
}
|
||||||
|
|
||||||
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
|
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
|
||||||
@@ -137,127 +140,3 @@ func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
|
|||||||
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
|
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
|
|
||||||
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
|
|
||||||
// --- Setup Server ---
|
|
||||||
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
|
||||||
defer os.RemoveAll(serverDir)
|
|
||||||
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
|
||||||
defer serverAgate.Close()
|
|
||||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
|
||||||
os.MkdirAll(dataDir, 0755)
|
|
||||||
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create snapshot: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start Server
|
|
||||||
serverAddress := "localhost:50052"
|
|
||||||
server := remote.NewServer(serverAgate.manager)
|
|
||||||
go func() { server.Start(ctx, serverAddress) }()
|
|
||||||
defer server.Stop()
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// --- Setup Client ---
|
|
||||||
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
|
||||||
defer os.RemoveAll(clientDir)
|
|
||||||
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
|
|
||||||
defer clientAgate.Close()
|
|
||||||
|
|
||||||
// --- Test Scenario ---
|
|
||||||
t.Log("Client performing full download...")
|
|
||||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
|
|
||||||
t.Fatalf("Client failed to get snapshot: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify content
|
|
||||||
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
|
|
||||||
t.Fatalf("Failed to restore snapshot: %v", err)
|
|
||||||
}
|
|
||||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
|
|
||||||
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
|
|
||||||
// --- Setup Server ---
|
|
||||||
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
|
||||||
defer os.RemoveAll(serverDir)
|
|
||||||
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
|
||||||
defer serverAgate.Close()
|
|
||||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
|
||||||
os.MkdirAll(dataDir, 0755)
|
|
||||||
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
// Snap 1
|
|
||||||
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
|
|
||||||
// Snap 2 (with changes)
|
|
||||||
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
|
|
||||||
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
|
|
||||||
|
|
||||||
// Start Server
|
|
||||||
serverAddress := "localhost:50053"
|
|
||||||
server := remote.NewServer(serverAgate.manager)
|
|
||||||
go func() { server.Start(ctx, serverAddress) }()
|
|
||||||
defer server.Stop()
|
|
||||||
time.Sleep(100 * time.Millisecond)
|
|
||||||
|
|
||||||
// --- Setup Client ---
|
|
||||||
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
|
||||||
defer os.RemoveAll(clientDir)
|
|
||||||
rClient, err := remote.NewClient(serverAddress)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to create remote client: %v", err)
|
|
||||||
}
|
|
||||||
defer rClient.Close()
|
|
||||||
|
|
||||||
// --- Test Scenario ---
|
|
||||||
// 1. Manually download first part of the diff archive
|
|
||||||
diffPath := filepath.Join(clientDir, "diff.zip.part")
|
|
||||||
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get diff stream from manager: %v", err)
|
|
||||||
}
|
|
||||||
defer diffReader.Close()
|
|
||||||
|
|
||||||
// Read first 64KB
|
|
||||||
firstChunk := make([]byte, 64*1024)
|
|
||||||
n, err := io.ReadFull(diffReader, firstChunk)
|
|
||||||
if err != nil && err != io.ErrUnexpectedEOF {
|
|
||||||
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
|
|
||||||
}
|
|
||||||
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
|
|
||||||
t.Fatalf("Failed to write partial file: %v", err)
|
|
||||||
}
|
|
||||||
diffReader.Close() // Simulate connection drop
|
|
||||||
|
|
||||||
// 2. Resume download using the client
|
|
||||||
t.Log("Resuming download...")
|
|
||||||
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
|
|
||||||
t.Fatalf("Failed to resume download: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 3. Verify final file
|
|
||||||
// Get the full diff from server for comparison
|
|
||||||
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
|
||||||
defer fullDiffReader.Close()
|
|
||||||
fullDiffData, _ := io.ReadAll(fullDiffReader)
|
|
||||||
|
|
||||||
resumedData, _ := os.ReadFile(diffPath)
|
|
||||||
|
|
||||||
if len(resumedData) != len(fullDiffData) {
|
|
||||||
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
|
|
||||||
}
|
|
||||||
|
|
||||||
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
|
|
||||||
t.Error("File content mismatch after resumption")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -13,6 +13,15 @@ type SnapshotManager interface {
|
|||||||
// Returns the created Snapshot with its metadata or an error if the process fails.
|
// Returns the created Snapshot with its metadata or an error if the process fails.
|
||||||
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
|
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
|
||||||
|
|
||||||
|
// CreateSnapshotAsync initiates a background process to create a snapshot.
|
||||||
|
// Returns the job ID (which is also the snapshot ID) or an error if the process couldn't start.
|
||||||
|
// onStart is called in the background goroutine before the snapshot creation starts.
|
||||||
|
// onFinish is called in the background goroutine after the snapshot creation finishes (successfully or with error).
|
||||||
|
CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error)
|
||||||
|
|
||||||
|
// GetSnapshotStatus retrieves the status of an asynchronous snapshot creation job.
|
||||||
|
GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error)
|
||||||
|
|
||||||
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
|
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
|
||||||
// Returns a Snapshot object containing metadata
|
// Returns a Snapshot object containing metadata
|
||||||
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
|
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
|
||||||
@@ -38,6 +47,9 @@ type SnapshotManager interface {
|
|||||||
// It returns an io.ReadCloser for the archive stream and an error.
|
// It returns an io.ReadCloser for the archive stream and an error.
|
||||||
// The caller is responsible for closing the reader, which will also handle cleanup of temporary resources.
|
// The caller is responsible for closing the reader, which will also handle cleanup of temporary resources.
|
||||||
StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error)
|
StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error)
|
||||||
|
|
||||||
|
// GetSnapshotDiffInfo calculates the hash and size of a differential archive between two snapshots.
|
||||||
|
GetSnapshotDiffInfo(ctx context.Context, snapshotID, parentID string) (*store.DiffInfo, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// SnapshotServer defines the interface for a server that can share snapshots
|
// SnapshotServer defines the interface for a server that can share snapshots
|
||||||
|
|||||||
201
manager.go
201
manager.go
@@ -11,8 +11,10 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/models"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
||||||
"gitea.unprism.ru/KRBL/Agate/archive"
|
"gitea.unprism.ru/KRBL/Agate/archive"
|
||||||
@@ -25,6 +27,8 @@ type SnapshotManagerData struct {
|
|||||||
metadataStore store.MetadataStore
|
metadataStore store.MetadataStore
|
||||||
blobStore store.BlobStore
|
blobStore store.BlobStore
|
||||||
logger *log.Logger
|
logger *log.Logger
|
||||||
|
jobs map[string]*store.SnapshotStatus
|
||||||
|
jobsMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
|
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
|
||||||
@@ -41,6 +45,7 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl
|
|||||||
metadataStore: metadataStore,
|
metadataStore: metadataStore,
|
||||||
blobStore: blobStore,
|
blobStore: blobStore,
|
||||||
logger: logger,
|
logger: logger,
|
||||||
|
jobs: make(map[string]*store.SnapshotStatus),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -49,13 +54,13 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
|
|||||||
info, err := os.Stat(sourceDir)
|
info, err := os.Stat(sourceDir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return nil, ErrSourceNotFound
|
return nil, models.ErrSourceNotFound
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to access source directory: %w", err)
|
return nil, fmt.Errorf("failed to access source directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !info.IsDir() {
|
if !info.IsDir() {
|
||||||
return nil, ErrSourceNotDirectory
|
return nil, models.ErrSourceNotDirectory
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if parent exists if specified
|
// Check if parent exists if specified
|
||||||
@@ -70,6 +75,98 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
|
|||||||
// Generate a unique ID for the snapshot
|
// Generate a unique ID for the snapshot
|
||||||
snapshotID := uuid.New().String()
|
snapshotID := uuid.New().String()
|
||||||
|
|
||||||
|
return data.createSnapshotInternal(ctx, sourceDir, name, parentID, snapshotID, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *SnapshotManagerData) CreateSnapshotAsync(ctx context.Context, sourceDir string, name string, parentID string, onStart func(), onFinish func(string, error)) (string, error) {
|
||||||
|
// Validate source directory
|
||||||
|
info, err := os.Stat(sourceDir)
|
||||||
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return "", models.ErrSourceNotFound
|
||||||
|
}
|
||||||
|
return "", fmt.Errorf("failed to access source directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !info.IsDir() {
|
||||||
|
return "", models.ErrSourceNotDirectory
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if parent exists if specified
|
||||||
|
if parentID != "" {
|
||||||
|
_, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Println("failed to check parent snapshot: %w", err)
|
||||||
|
parentID = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
snapshotID := uuid.New().String()
|
||||||
|
|
||||||
|
data.jobsMutex.Lock()
|
||||||
|
data.jobs[snapshotID] = &store.SnapshotStatus{
|
||||||
|
ID: snapshotID,
|
||||||
|
Status: "pending",
|
||||||
|
Progress: 0,
|
||||||
|
}
|
||||||
|
data.jobsMutex.Unlock()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if onStart != nil {
|
||||||
|
onStart()
|
||||||
|
}
|
||||||
|
|
||||||
|
data.jobsMutex.Lock()
|
||||||
|
if job, ok := data.jobs[snapshotID]; ok {
|
||||||
|
job.Status = "running"
|
||||||
|
}
|
||||||
|
data.jobsMutex.Unlock()
|
||||||
|
|
||||||
|
_, err := data.createSnapshotInternal(context.Background(), sourceDir, name, parentID, snapshotID, func(current, total int64) {
|
||||||
|
data.jobsMutex.Lock()
|
||||||
|
defer data.jobsMutex.Unlock()
|
||||||
|
if job, ok := data.jobs[snapshotID]; ok {
|
||||||
|
if total > 0 {
|
||||||
|
job.Progress = float64(current) / float64(total)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
data.jobsMutex.Lock()
|
||||||
|
if job, ok := data.jobs[snapshotID]; ok {
|
||||||
|
if err != nil {
|
||||||
|
job.Status = "failed"
|
||||||
|
job.Error = err.Error()
|
||||||
|
} else {
|
||||||
|
job.Status = "done"
|
||||||
|
job.Progress = 1.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
data.jobsMutex.Unlock()
|
||||||
|
|
||||||
|
if onFinish != nil {
|
||||||
|
onFinish(snapshotID, err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return snapshotID, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *SnapshotManagerData) GetSnapshotStatus(ctx context.Context, jobID string) (*store.SnapshotStatus, error) {
|
||||||
|
data.jobsMutex.RLock()
|
||||||
|
defer data.jobsMutex.RUnlock()
|
||||||
|
|
||||||
|
job, ok := data.jobs[jobID]
|
||||||
|
if !ok {
|
||||||
|
return nil, models.ErrNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a copy to avoid race conditions
|
||||||
|
statusCopy := *job
|
||||||
|
return &statusCopy, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *SnapshotManagerData) createSnapshotInternal(ctx context.Context, sourceDir, name, parentID, snapshotID string, onProgress func(current, total int64)) (*store.Snapshot, error) {
|
||||||
// Create a temporary file for the archive in the working directory
|
// Create a temporary file for the archive in the working directory
|
||||||
tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip")
|
tempFilePath := filepath.Join(data.blobStore.GetBaseDir(), "temp-"+snapshotID+".zip")
|
||||||
tempFile, err := os.Create(tempFilePath)
|
tempFile, err := os.Create(tempFilePath)
|
||||||
@@ -80,7 +177,7 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
|
|||||||
defer os.Remove(tempFilePath) // Clean up temp file after we're done
|
defer os.Remove(tempFilePath) // Clean up temp file after we're done
|
||||||
|
|
||||||
// Create archive of the source directory
|
// Create archive of the source directory
|
||||||
if err := archive.CreateArchive(sourceDir, tempFilePath); err != nil {
|
if err := archive.CreateArchiveWithProgress(sourceDir, tempFilePath, onProgress); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create archive: %w", err)
|
return nil, fmt.Errorf("failed to create archive: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -165,8 +262,8 @@ func (data *SnapshotManagerData) GetSnapshotDetails(ctx context.Context, snapsho
|
|||||||
// Retrieve snapshot metadata from the store
|
// Retrieve snapshot metadata from the store
|
||||||
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNotFound) {
|
if errors.Is(err, models.ErrNotFound) {
|
||||||
return nil, ErrNotFound
|
return nil, models.ErrNotFound
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to retrieve snapshot details: %w", err)
|
return nil, fmt.Errorf("failed to retrieve snapshot details: %w", err)
|
||||||
}
|
}
|
||||||
@@ -191,7 +288,7 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID
|
|||||||
|
|
||||||
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, store.ErrNotFound) {
|
if errors.Is(err, models.ErrNotFound) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
||||||
@@ -237,8 +334,8 @@ func (data *SnapshotManagerData) OpenFile(ctx context.Context, snapshotID string
|
|||||||
// First check if the snapshot exists
|
// First check if the snapshot exists
|
||||||
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNotFound) {
|
if errors.Is(err, models.ErrNotFound) {
|
||||||
return nil, ErrNotFound
|
return nil, models.ErrNotFound
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to check if snapshot exists: %w", err)
|
return nil, fmt.Errorf("failed to check if snapshot exists: %w", err)
|
||||||
}
|
}
|
||||||
@@ -261,7 +358,7 @@ func (data *SnapshotManagerData) OpenFile(ctx context.Context, snapshotID string
|
|||||||
err := archive.ExtractFileFromArchive(blobPath, filePath, pw)
|
err := archive.ExtractFileFromArchive(blobPath, filePath, pw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, archive.ErrFileNotFoundInArchive) {
|
if errors.Is(err, archive.ErrFileNotFoundInArchive) {
|
||||||
pw.CloseWithError(ErrFileNotFound)
|
pw.CloseWithError(models.ErrFileNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
pw.CloseWithError(fmt.Errorf("failed to extract file from archive: %w", err))
|
pw.CloseWithError(fmt.Errorf("failed to extract file from archive: %w", err))
|
||||||
@@ -284,8 +381,8 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
|
|||||||
// First check if the snapshot exists
|
// First check if the snapshot exists
|
||||||
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, ErrNotFound) {
|
if errors.Is(err, models.ErrNotFound) {
|
||||||
return ErrNotFound
|
return models.ErrNotFound
|
||||||
}
|
}
|
||||||
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
||||||
}
|
}
|
||||||
@@ -388,8 +485,8 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
|
|||||||
|
|
||||||
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, store.ErrNotFound) {
|
if errors.Is(err, models.ErrNotFound) {
|
||||||
return ErrNotFound
|
return models.ErrNotFound
|
||||||
}
|
}
|
||||||
return fmt.Errorf("failed to get snapshot metadata: %w", err)
|
return fmt.Errorf("failed to get snapshot metadata: %w", err)
|
||||||
}
|
}
|
||||||
@@ -403,6 +500,37 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (data *SnapshotManagerData) GetSnapshotDiffInfo(ctx context.Context, snapshotID, parentID string) (*store.DiffInfo, error) {
|
||||||
|
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create diff archive for info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tempArchivePath == "" {
|
||||||
|
return &store.DiffInfo{SHA256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", Size: 0}, nil // sha256 of empty string
|
||||||
|
}
|
||||||
|
|
||||||
|
defer os.Remove(tempArchivePath)
|
||||||
|
if tempStagingDir != "" {
|
||||||
|
defer os.RemoveAll(tempStagingDir)
|
||||||
|
}
|
||||||
|
|
||||||
|
hash, err := hash.CalculateFileHash(tempArchivePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to calculate hash for diff archive: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
stat, err := os.Stat(tempArchivePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get size of diff archive: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &store.DiffInfo{
|
||||||
|
SHA256: hash,
|
||||||
|
Size: stat.Size(),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
|
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
|
||||||
type diffArchiveReader struct {
|
type diffArchiveReader struct {
|
||||||
*os.File
|
*os.File
|
||||||
@@ -413,15 +541,15 @@ type diffArchiveReader struct {
|
|||||||
// Close closes the file and removes the temporary archive and staging directory.
|
// Close closes the file and removes the temporary archive and staging directory.
|
||||||
func (r *diffArchiveReader) Close() error {
|
func (r *diffArchiveReader) Close() error {
|
||||||
err := r.File.Close()
|
err := r.File.Close()
|
||||||
os.Remove(r.tempArchive)
|
_ = os.Remove(r.tempArchive)
|
||||||
os.RemoveAll(r.tempStaging)
|
_ = os.RemoveAll(r.tempStaging)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
|
func (data *SnapshotManagerData) createDiffArchive(ctx context.Context, snapshotID, parentID string) (string, string, error) {
|
||||||
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err)
|
return "", "", fmt.Errorf("failed to get target snapshot metadata: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
parentFiles := make(map[string]string)
|
parentFiles := make(map[string]string)
|
||||||
@@ -449,51 +577,66 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(filesToInclude) == 0 {
|
if len(filesToInclude) == 0 {
|
||||||
return io.NopCloser(bytes.NewReader(nil)), nil
|
return "", "", nil
|
||||||
}
|
}
|
||||||
|
|
||||||
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
|
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create temp staging directory: %w", err)
|
return "", "", fmt.Errorf("failed to create temp staging directory: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
|
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
os.RemoveAll(tempStagingDir)
|
||||||
return nil, err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, filePath := range filesToInclude {
|
for _, filePath := range filesToInclude {
|
||||||
destPath := filepath.Join(tempStagingDir, filePath)
|
destPath := filepath.Join(tempStagingDir, filePath)
|
||||||
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
os.RemoveAll(tempStagingDir)
|
||||||
return nil, fmt.Errorf("failed to create dir for diff file: %w", err)
|
return "", "", fmt.Errorf("failed to create dir for diff file: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
fileWriter, err := os.Create(destPath)
|
fileWriter, err := os.Create(destPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
os.RemoveAll(tempStagingDir)
|
||||||
return nil, err
|
return "", "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
|
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
|
||||||
fileWriter.Close()
|
fileWriter.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
os.RemoveAll(tempStagingDir)
|
||||||
return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
|
return "", "", fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip")
|
tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip")
|
||||||
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
|
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
_ = os.RemoveAll(tempStagingDir)
|
||||||
os.Remove(tempArchivePath)
|
_ = os.Remove(tempArchivePath)
|
||||||
return nil, fmt.Errorf("failed to create diff archive: %w", err)
|
return "", "", fmt.Errorf("failed to create diff archive: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return tempArchivePath, tempStagingDir, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
|
||||||
|
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create diff archive for streaming: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if tempArchivePath == "" {
|
||||||
|
return io.NopCloser(bytes.NewReader(nil)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
archiveFile, err := os.Open(tempArchivePath)
|
archiveFile, err := os.Open(tempArchivePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
os.RemoveAll(tempStagingDir)
|
if tempStagingDir != "" {
|
||||||
|
os.RemoveAll(tempStagingDir)
|
||||||
|
}
|
||||||
os.Remove(tempArchivePath)
|
os.Remove(tempArchivePath)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -501,7 +644,9 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
|
|||||||
if offset > 0 {
|
if offset > 0 {
|
||||||
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
|
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
|
||||||
archiveFile.Close()
|
archiveFile.Close()
|
||||||
os.RemoveAll(tempStagingDir)
|
if tempStagingDir != "" {
|
||||||
|
os.RemoveAll(tempStagingDir)
|
||||||
|
}
|
||||||
os.Remove(tempArchivePath)
|
os.Remove(tempArchivePath)
|
||||||
return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
|
return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
package agate
|
package models
|
||||||
|
|
||||||
import "errors"
|
import "errors"
|
||||||
|
|
||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"google.golang.org/grpc/credentials/insecure"
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
|
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/hash"
|
||||||
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||||
"gitea.unprism.ru/KRBL/Agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
)
|
)
|
||||||
@@ -89,8 +90,43 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
|
|||||||
return snapshot, nil
|
return snapshot, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetDiffInfo gets the hash and size of a differential archive.
|
||||||
|
func (c *Client) GetDiffInfo(ctx context.Context, snapshotID, localParentID string) (*store.DiffInfo, error) {
|
||||||
|
req := &agateGrpc.GetDiffInfoRequest{
|
||||||
|
SnapshotId: snapshotID,
|
||||||
|
LocalParentId: localParentID,
|
||||||
|
}
|
||||||
|
|
||||||
|
info, err := c.client.GetDiffInfo(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get diff info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &store.DiffInfo{
|
||||||
|
SHA256: info.Sha256Hash,
|
||||||
|
Size: info.SizeBytes,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DownloadSnapshotDiff скачивает архив с разницей между снапшотами.
|
// DownloadSnapshotDiff скачивает архив с разницей между снапшотами.
|
||||||
func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error {
|
func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error {
|
||||||
|
// Check for local file and validate it
|
||||||
|
if fileInfo, err := os.Stat(targetPath); err == nil {
|
||||||
|
remoteDiffInfo, err := c.GetDiffInfo(ctx, snapshotID, localParentID)
|
||||||
|
if err != nil {
|
||||||
|
// Log the error but proceed with download
|
||||||
|
fmt.Printf("could not get remote diff info: %v. proceeding with download.", err)
|
||||||
|
} else {
|
||||||
|
if fileInfo.Size() == remoteDiffInfo.Size {
|
||||||
|
localHash, err := hash.CalculateFileHash(targetPath)
|
||||||
|
if err == nil && localHash == remoteDiffInfo.SHA256 {
|
||||||
|
fmt.Printf("local snapshot archive %s is valid, skipping download.", targetPath)
|
||||||
|
return nil // File is valid, skip download
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
var offset int64
|
var offset int64
|
||||||
fileInfo, err := os.Stat(targetPath)
|
fileInfo, err := os.Stat(targetPath)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@@ -118,7 +154,11 @@ func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localPare
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to open file %s: %w", targetPath, err)
|
return fmt.Errorf("failed to open file %s: %w", targetPath, err)
|
||||||
}
|
}
|
||||||
defer file.Close()
|
defer func() {
|
||||||
|
if err := file.Close(); err != nil {
|
||||||
|
fmt.Printf("failed to close file: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
resp, err := stream.Recv()
|
resp, err := stream.Recv()
|
||||||
|
|||||||
@@ -158,6 +158,19 @@ func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetDiffInfo реализует gRPC-метод GetDiffInfo.
|
||||||
|
func (s *Server) GetDiffInfo(ctx context.Context, req *agateGrpc.GetDiffInfoRequest) (*agateGrpc.DiffInfo, error) {
|
||||||
|
diffInfo, err := s.manager.GetSnapshotDiffInfo(ctx, req.SnapshotId, req.LocalParentId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get diff info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &agateGrpc.DiffInfo{
|
||||||
|
Sha256Hash: diffInfo.SHA256,
|
||||||
|
SizeBytes: diffInfo.Size,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
|
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
|
||||||
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
|
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
|
||||||
return &agateGrpc.SnapshotInfo{
|
return &agateGrpc.SnapshotInfo{
|
||||||
|
|||||||
@@ -1,9 +0,0 @@
|
|||||||
package store
|
|
||||||
|
|
||||||
import "errors"
|
|
||||||
|
|
||||||
// Common errors that can be used by store implementations
|
|
||||||
var (
|
|
||||||
// ErrNotFound means that a requested resource was not found
|
|
||||||
ErrNotFound = errors.New("resource not found")
|
|
||||||
)
|
|
||||||
@@ -6,12 +6,37 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/models"
|
||||||
"gitea.unprism.ru/KRBL/Agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const blobExtension = ".zip"
|
const blobExtension = ".zip"
|
||||||
|
|
||||||
|
// progressReader — обертка для логирования
|
||||||
|
type progressReader struct {
|
||||||
|
reader io.Reader
|
||||||
|
totalRead int64
|
||||||
|
lastLog time.Time
|
||||||
|
snapshotID string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (pr *progressReader) Read(p []byte) (int, error) {
|
||||||
|
n, err := pr.reader.Read(p)
|
||||||
|
pr.totalRead += int64(n)
|
||||||
|
|
||||||
|
// Логируем каждые 5 секунд
|
||||||
|
if time.Since(pr.lastLog) > 5*time.Second {
|
||||||
|
log.Printf("Snapshot %s download progress: %.2f MB downloaded",
|
||||||
|
pr.snapshotID, float64(pr.totalRead)/1024/1024)
|
||||||
|
pr.lastLog = time.Now()
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
|
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
|
||||||
type fileSystemStore struct {
|
type fileSystemStore struct {
|
||||||
baseDir string // Директория для хранения блобов (архивов)
|
baseDir string // Директория для хранения блобов (архивов)
|
||||||
@@ -46,23 +71,49 @@ func (fs *fileSystemStore) getBlobPath(snapshotID string) string {
|
|||||||
// StoreBlob сохраняет данные из reader в файл в baseDir.
|
// StoreBlob сохраняет данные из reader в файл в baseDir.
|
||||||
func (fs *fileSystemStore) StoreBlob(ctx context.Context, snapshotID string, reader io.Reader) (string, error) {
|
func (fs *fileSystemStore) StoreBlob(ctx context.Context, snapshotID string, reader io.Reader) (string, error) {
|
||||||
blobPath := fs.getBlobPath(snapshotID)
|
blobPath := fs.getBlobPath(snapshotID)
|
||||||
|
log.Printf("Starting to store blob for snapshot %s at %s", snapshotID, blobPath)
|
||||||
|
|
||||||
// Создаем или перезаписываем файл
|
// Используем временный файл, чтобы не повредить (возможно) существующий валидный файл
|
||||||
file, err := os.Create(blobPath)
|
// если загрузка упадет на середине.
|
||||||
|
tempPath := blobPath + ".tmp"
|
||||||
|
file, err := os.Create(tempPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to create blob file %s: %w", blobPath, err)
|
return "", fmt.Errorf("failed to create temp blob file %s: %w", tempPath, err)
|
||||||
}
|
}
|
||||||
defer file.Close() // Гарантируем закрытие файла
|
defer file.Close()
|
||||||
|
|
||||||
// Копируем данные из ридера в файл
|
// Оборачиваем reader для логирования
|
||||||
_, err = io.Copy(file, reader)
|
pr := &progressReader{
|
||||||
if err != nil {
|
reader: reader,
|
||||||
// Если произошла ошибка копирования, удаляем неполный файл
|
snapshotID: snapshotID,
|
||||||
os.Remove(blobPath)
|
lastLog: time.Now(),
|
||||||
return "", fmt.Errorf("failed to write data to blob file %s: %w", blobPath, err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Возвращаем путь к созданному файлу
|
// Копируем данные
|
||||||
|
written, err := io.Copy(file, pr)
|
||||||
|
if err != nil {
|
||||||
|
file.Close()
|
||||||
|
os.Remove(tempPath) // Удаляем мусор
|
||||||
|
return "", fmt.Errorf("failed to write data to blob file %s: %w", tempPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Важный момент: Sync, чтобы убедиться, что данные на диске
|
||||||
|
if err := file.Sync(); err != nil {
|
||||||
|
file.Close()
|
||||||
|
os.Remove(tempPath)
|
||||||
|
return "", fmt.Errorf("failed to sync file to disk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
file.Close() // Закрываем перед переименованием
|
||||||
|
|
||||||
|
// Переименовываем временный файл в финальный (атомарная операция)
|
||||||
|
if err := os.Rename(tempPath, blobPath); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to rename temp file to final blob: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Successfully stored blob for snapshot %s. Total size: %.2f MB",
|
||||||
|
snapshotID, float64(written)/1024/1024)
|
||||||
|
|
||||||
return blobPath, nil
|
return blobPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,7 +126,7 @@ func (fs *fileSystemStore) RetrieveBlob(ctx context.Context, snapshotID string)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
// Если файл не найден, возвращаем кастомную ошибку
|
// Если файл не найден, возвращаем кастомную ошибку
|
||||||
return nil, store.ErrNotFound
|
return nil, models.ErrNotFound
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err)
|
return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err)
|
||||||
}
|
}
|
||||||
@@ -109,7 +160,7 @@ func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (
|
|||||||
// Проверяем существование файла
|
// Проверяем существование файла
|
||||||
if _, err := os.Stat(blobPath); err != nil {
|
if _, err := os.Stat(blobPath); err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
return "", store.ErrNotFound
|
return "", models.ErrNotFound
|
||||||
}
|
}
|
||||||
return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err)
|
return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,11 +6,13 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"gitea.unprism.ru/KRBL/Agate/store"
|
|
||||||
_ "github.com/mattn/go-sqlite3"
|
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/models"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
_ "github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -130,7 +132,7 @@ func (s *sqliteStore) GetSnapshotMetadata(ctx context.Context, snapshotID string
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
// Если запись не найдена, возвращаем кастомную ошибку
|
// Если запись не найдена, возвращаем кастомную ошибку
|
||||||
return nil, store.ErrNotFound
|
return nil, models.ErrNotFound
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("failed to query snapshot %s: %w", snapshotID, err)
|
return nil, fmt.Errorf("failed to query snapshot %s: %w", snapshotID, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// DiffInfo represents metadata about a differential archive.
|
||||||
|
type DiffInfo struct {
|
||||||
|
SHA256 string
|
||||||
|
Size int64
|
||||||
|
}
|
||||||
|
|
||||||
// FileInfo represents metadata and attributes of a file or directory.
|
// FileInfo represents metadata and attributes of a file or directory.
|
||||||
type FileInfo struct {
|
type FileInfo struct {
|
||||||
Path string // Path represents the relative or absolute location of the file or directory in the filesystem.
|
Path string // Path represents the relative or absolute location of the file or directory in the filesystem.
|
||||||
@@ -31,6 +37,14 @@ type SnapshotInfo struct {
|
|||||||
CreationTime time.Time // Время создания
|
CreationTime time.Time // Время создания
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SnapshotStatus represents the status of an asynchronous snapshot creation process.
|
||||||
|
type SnapshotStatus struct {
|
||||||
|
ID string // Unique identifier of the job (usually same as Snapshot ID)
|
||||||
|
Status string // Current status: "pending", "running", "done", "failed"
|
||||||
|
Progress float64 // Progress from 0.0 to 1.0
|
||||||
|
Error string // Error message if failed
|
||||||
|
}
|
||||||
|
|
||||||
// ListOptions provides options for filtering and paginating snapshot lists
|
// ListOptions provides options for filtering and paginating snapshot lists
|
||||||
type ListOptions struct {
|
type ListOptions struct {
|
||||||
FilterByName string // Filter snapshots by name (substring match)
|
FilterByName string // Filter snapshots by name (substring match)
|
||||||
|
|||||||
Reference in New Issue
Block a user