6 Commits

20 changed files with 1405 additions and 1072 deletions

View File

@ -15,37 +15,37 @@ gen-proto:
--grpc-gateway_out=grpc --grpc-gateway_opt paths=source_relative \
./grpc/snapshot.proto
# Запуск всех тестов
# Run all tests
test:
go test -v ./...
# Запуск модульных тестов
# Run unit tests
test-unit:
go test -v ./store/... ./hash/... ./archive/...
# Запуск интеграционных тестов
# Run integration tests
test-integration:
go test -v -tags=integration ./...
# Запуск функциональных тестов
# Run functional tests
test-functional:
go test -v -run TestFull ./...
# Запуск тестов производительности
# Run performance tests
test-performance:
go test -v -run TestPerformanceMetrics ./...
go test -v -bench=. ./...
# Запуск тестов с покрытием кода
# Run tests with code coverage
test-coverage:
go test -v -coverprofile=coverage.out ./...
go tool cover -html=coverage.out -o coverage.html
# Запуск линтера
# Run linter
lint:
golangci-lint run
# Запуск всех проверок (тесты + линтер)
# Run all checks (tests + linter)
check: test lint
.PHONY: download-third-party gen-proto test test-unit test-integration test-functional test-performance test-coverage lint check

View File

@ -223,6 +223,41 @@ func main() {
## Advanced Usage
### Registering a Local Snapshot
You can register a local snapshot from an existing archive file with a specified UUID:
```go
// Register a local snapshot from an archive file
archivePath := "/path/to/your/archive.zip"
snapshotID := "custom-uuid-for-snapshot"
snapshotName := "My Local Snapshot"
if err := ag.RegisterLocalSnapshot(ctx, archivePath, snapshotID, snapshotName); err != nil {
log.Fatalf("Failed to register local snapshot: %v", err)
}
```
### Downloading Only Snapshot Metadata
You can download only the metadata of a snapshot from a remote server without downloading the actual files:
```go
// Download only the metadata of a snapshot from a remote server
remoteAddress := "remote-server:50051"
snapshotID := "snapshot-id-to-download"
if err := ag.GetRemoteSnapshotMetadata(ctx, remoteAddress, snapshotID); err != nil {
log.Fatalf("Failed to download snapshot metadata: %v", err)
}
// If you have a local blob but missing metadata, you can restore the metadata
// by passing an empty address
if err := ag.GetRemoteSnapshotMetadata(ctx, "", snapshotID); err != nil {
log.Fatalf("Failed to restore snapshot metadata: %v", err)
}
```
### Creating Incremental Snapshots
You can create incremental snapshots by specifying a parent snapshot ID:
@ -294,6 +329,8 @@ The main entry point for the library.
- `ConnectRemote(address string) (*grpc.SnapshotClient, error)` - Connect to a remote server
- `GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error)` - List snapshots from a remote server
- `GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error` - Download a snapshot from a remote server
- `RegisterLocalSnapshot(ctx context.Context, archivePath string, snapshotID string, name string) error` - Register a local snapshot from an archive path with a specified UUID
- `GetRemoteSnapshotMetadata(ctx context.Context, address string, snapshotID string) error` - Download only the metadata of a snapshot from a remote server
### AgateOptions
@ -304,7 +341,3 @@ Configuration options for the Agate library.
- `CloseFunc func() error` - Called before a snapshot is created or restored
- `MetadataStore store.MetadataStore` - Implementation of the metadata store
- `BlobStore store.BlobStore` - Implementation of the blob store
## License
[Add your license information here]

193
api.go
View File

@ -4,9 +4,14 @@ import (
"context"
"errors"
"fmt"
"gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/remote"
"io"
"log"
"os"
"path/filepath"
"sync"
"gitea.unprism.ru/KRBL/Agate/store"
"gitea.unprism.ru/KRBL/Agate/stores"
@ -35,11 +40,18 @@ type AgateOptions struct {
// Use the stores package to initialize a custom implementation:
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
BlobStore store.BlobStore
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
CleanOnRestore bool
// Logger is the logger to use for output. If nil, logging is disabled.
Logger *log.Logger
}
// Agate is the main entry point for the snapshot library.
type Agate struct {
manager SnapshotManager
mutex sync.Mutex
manager interfaces.SnapshotManager
options AgateOptions
metadataDir string
blobsDir string
@ -53,6 +65,11 @@ func New(options AgateOptions) (*Agate, error) {
return nil, errors.New("work directory cannot be empty")
}
// Initialize logger if not provided
if options.Logger == nil {
options.Logger = log.New(io.Discard, "", 0)
}
// Create the work directory if it doesn't exist
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create work directory: %w", err)
@ -109,7 +126,7 @@ func New(options AgateOptions) (*Agate, error) {
}
// Create the snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
}
@ -143,11 +160,28 @@ func New(options AgateOptions) (*Agate, error) {
return agate, nil
}
func (a *Agate) GetActiveDir() string {
return a.options.BlobStore.GetActiveDir()
}
func (a *Agate) GetMetadataDir() string {
return a.metadataDir
}
func (a *Agate) GetBlobsDir() string {
return a.blobsDir
}
// SaveSnapshot creates a new snapshot from the current state of the active directory.
// If parentID is provided, it will be set as the parent of the new snapshot.
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
// Returns the ID of the created snapshot.
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
@ -158,7 +192,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
fmt.Printf("Failed to open resources after snapshot: %v\n", err)
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
}
}
}()
@ -173,9 +207,12 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
// Create the snapshot
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
if err != nil {
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
return "", fmt.Errorf("failed to create snapshot: %w", err)
}
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
// Update the current snapshot ID to the newly created snapshot
a.currentSnapshotID = snapshot.ID
@ -189,6 +226,11 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
// RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
@ -197,10 +239,13 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
}
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir()); err != nil {
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
return fmt.Errorf("failed to extract snapshot: %w", err)
}
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
// Save the ID of the snapshot that was restored
a.currentSnapshotID = snapshotID
@ -212,6 +257,7 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
// Call OpenFunc if provided
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
return fmt.Errorf("failed to open resources after restore: %w", err)
}
}
@ -221,6 +267,9 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
// RestoreSnapshot extracts a snapshot to the directory.
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
@ -231,13 +280,13 @@ func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(dir); err != nil {
fmt.Printf("Failed to open resources after snapshot: %v\n", err)
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
}
}
}()
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir); err != nil {
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
return fmt.Errorf("failed to extract snapshot: %w", err)
}
@ -256,7 +305,9 @@ func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir
// ListSnapshots returns a list of all available snapshots.
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
return a.manager.ListSnapshots(ctx)
// Create empty ListOptions since we don't have filtering/pagination in this API yet
opts := store.ListOptions{}
return a.manager.ListSnapshots(ctx, opts)
}
// GetSnapshotDetails returns detailed information about a specific snapshot.
@ -283,82 +334,98 @@ func (a *Agate) saveCurrentSnapshotID() error {
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
}
func (a *Agate) Open() error {
return a.options.OpenFunc(a.GetActiveDir())
}
// Close releases all resources used by the Agate instance.
func (a *Agate) Close() error {
// Currently, we don't have a way to close the manager directly
// This would be a good addition in the future
if a.options.CloseFunc != nil {
return a.options.CloseFunc()
}
return nil
}
// StartServer starts a gRPC server to share snapshots.
func (a *Agate) StartServer(ctx context.Context, address string) error {
_, err := grpc.RunServer(ctx, a.manager, address)
if err != nil {
return fmt.Errorf("failed to start server: %w", err)
}
// We don't store the server reference because we don't have a way to stop it yet
// In a future version, we could add a StopServer method
return nil
// Использование нового remote.Server
server := remote.NewServer(a.manager)
return server.Start(ctx, address)
}
// ConnectRemote connects to a remote snapshot server.
// Returns a client that can be used to interact with the remote server.
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
client, err := grpc.ConnectToServer(address)
if err != nil {
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
}
return client, nil
}
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
client, err := a.ConnectRemote(address)
if err != nil {
return nil, err
}
defer client.Close()
return client.ListSnapshots(ctx)
}
// GetRemoteSnapshot downloads a snapshot from a remote server.
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
// GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update.
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
client, err := a.ConnectRemote(address)
client, err := remote.NewClient(address)
if err != nil {
return err
}
defer client.Close()
// Create a temporary directory for the downloaded snapshot
tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID)
if err := os.MkdirAll(tempDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary directory: %w", err)
}
// Download the snapshot
if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil {
return fmt.Errorf("failed to download snapshot: %w", err)
}
// Get the snapshot details to create a local copy
details, err := client.FetchSnapshotDetails(ctx, snapshotID)
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
return fmt.Errorf("failed to get remote snapshot details: %w", err)
}
// Create a local snapshot from the downloaded files
_, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID)
// 1. Подготовка
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
return fmt.Errorf("failed to create temp download dir: %w", err)
}
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
diffPartPath := diffArchivePath + ".part"
// 2. Скачивание дельты с докачкой
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
return fmt.Errorf("failed to download snapshot diff: %w", err)
}
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
}
defer os.Remove(diffArchivePath)
// 3. Атомарное применение
// Создаем новую директорию для снапшота
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err)
}
defer os.RemoveAll(newSnapshotDir)
// Если есть родитель, извлекаем его содержимое
if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
}
}
// Распаковываем дельта-архив поверх
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
return fmt.Errorf("failed to extract diff archive: %w", err)
}
// 4. Создаем финальный архив и регистрируем снапшот
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
return fmt.Errorf("failed to create final snapshot archive: %w", err)
}
defer os.Remove(finalArchivePath)
finalArchiveFile, err := os.Open(finalArchivePath)
if err != nil {
return fmt.Errorf("failed to create local snapshot: %w", err)
return fmt.Errorf("failed to open final archive: %w", err)
}
defer finalArchiveFile.Close()
if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil {
return fmt.Errorf("failed to store final blob: %w", err)
}
// Clean up the temporary directory
os.RemoveAll(tempDir)
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
return nil
}

View File

@ -1,9 +1,12 @@
package agate
import (
"bytes"
"context"
"log"
"os"
"path/filepath"
"strings"
"testing"
)
@ -252,6 +255,91 @@ func TestAPIListSnapshots(t *testing.T) {
}
}
func TestAgate_Logging(t *testing.T) {
// Create a temporary directory for tests
tempDir, err := os.MkdirTemp("", "agate-test-*")
if err != nil {
t.Fatalf("Failed to create temp directory: %v", err)
}
defer os.RemoveAll(tempDir)
// Create a data directory
dataDir := filepath.Join(tempDir, "data")
if err := os.MkdirAll(dataDir, 0755); err != nil {
t.Fatalf("Failed to create data directory: %v", err)
}
// Create test files in the active directory
activeDir := filepath.Join(dataDir, "blobs", "active")
if err := os.MkdirAll(activeDir, 0755); err != nil {
t.Fatalf("Failed to create active directory: %v", err)
}
createAPITestFiles(t, activeDir)
// Create a buffer to capture log output
var logBuffer bytes.Buffer
logger := log.New(&logBuffer, "", 0)
// Create Agate options with the logger
options := AgateOptions{
WorkDir: dataDir,
OpenFunc: func(dir string) error {
return nil
},
CloseFunc: func() error {
return nil
},
Logger: logger,
}
// Create Agate instance
ag, err := New(options)
if err != nil {
t.Fatalf("Failed to create Agate instance: %v", err)
}
defer ag.Close()
// Perform operations that should generate logs
ctx := context.Background()
// Save a snapshot
snapshotID, err := ag.SaveSnapshot(ctx, "Test Snapshot", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Restore the snapshot
err = ag.RestoreSnapshot(ctx, snapshotID)
if err != nil {
t.Fatalf("Failed to restore snapshot: %v", err)
}
// Check that logs were generated
logs := logBuffer.String()
if logs == "" {
t.Errorf("No logs were generated")
}
// Check for expected log messages
expectedLogMessages := []string{
"Creating new snapshot",
"Restoring snapshot",
}
for _, msg := range expectedLogMessages {
if !strings.Contains(logs, msg) {
t.Errorf("Expected log message '%s' not found in logs", msg)
}
}
}
// Note: This test is a placeholder for when the ListSnapshots method is updated to accept ListOptions.
// Currently, the ListSnapshots method in api.go doesn't accept ListOptions, so we can't test that functionality directly.
// The test for ListOptions functionality is covered in TestListSnapshotsMetadata_WithOptions in store/sqlite/sqlite_test.go.
func TestAgate_ListSnapshotsWithOptions(t *testing.T) {
t.Skip("Skipping test as ListSnapshots in api.go doesn't yet support ListOptions")
}
func TestAPIDeleteSnapshot(t *testing.T) {
ag, _, cleanup := setupTestAPI(t)
defer cleanup()

View File

@ -20,7 +20,8 @@ func TestFullWorkflow(t *testing.T) {
// Create Agate options
options := AgateOptions{
WorkDir: tempDir,
WorkDir: tempDir,
CleanOnRestore: true,
}
// Create Agate instance
@ -123,12 +124,20 @@ func TestFullWorkflow(t *testing.T) {
}
if string(content) != expectedContent {
t.Errorf("Restored file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
} else {
t.Logf("SUCCESS: Restored file %s has correct content after restoring first snapshot", path)
}
}
// Check that file4.txt doesn't exist
if _, err := os.Stat(filepath.Join(dataDir, "file4.txt")); !os.IsNotExist(err) {
file4Path := filepath.Join(dataDir, "file4.txt")
_, err = os.Stat(file4Path)
if err == nil {
t.Errorf("File4.txt should not exist after restoring first snapshot")
} else if !os.IsNotExist(err) {
t.Errorf("Unexpected error checking if File4.txt exists: %v", err)
} else {
t.Logf("SUCCESS: File4.txt correctly does not exist after restoring first snapshot")
}
// Step 9: Restore the third snapshot
@ -152,12 +161,20 @@ func TestFullWorkflow(t *testing.T) {
}
if string(content) != expectedContent {
t.Errorf("Restored file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
} else {
t.Logf("SUCCESS: Restored file %s has correct content after restoring third snapshot", path)
}
}
// Check that file2.txt doesn't exist
if _, err := os.Stat(filepath.Join(dataDir, "file2.txt")); !os.IsNotExist(err) {
file2Path := filepath.Join(dataDir, "file2.txt")
_, err = os.Stat(file2Path)
if err == nil {
t.Errorf("File2.txt should not exist after restoring third snapshot")
} else if !os.IsNotExist(err) {
t.Errorf("Unexpected error checking if File2.txt exists: %v", err)
} else {
t.Logf("SUCCESS: File2.txt correctly does not exist after restoring third snapshot")
}
// Step 11: Delete a snapshot
@ -173,15 +190,43 @@ func TestFullWorkflow(t *testing.T) {
t.Fatalf("Failed to list snapshots: %v", err)
}
if len(snapshots) != 2 {
t.Errorf("Expected 2 snapshots after deletion, got %d", len(snapshots))
// Debug output
t.Logf("After deletion, found %d snapshots:", len(snapshots))
for i, snap := range snapshots {
t.Logf(" Snapshot %d: ID=%s, Name=%s, ParentID=%s", i+1, snap.ID, snap.Name, snap.ParentID)
}
// Get detailed information about snapshot 3
snapshot3, err := ag.GetSnapshotDetails(ctx, snapshot3ID)
if err != nil {
t.Logf("Failed to get snapshot 3 details: %v", err)
} else {
t.Logf("Snapshot 3 details: ID=%s, Name=%s, ParentID=%s", snapshot3.ID, snapshot3.Name, snapshot3.ParentID)
}
// Verify that snapshot 3's parent ID has been updated to point to snapshot 1
if snapshot3 != nil && snapshot3.ParentID != snapshot1ID {
t.Errorf("Snapshot 3's parent ID should be updated to point to Snapshot 1 after Snapshot 2 is deleted. Got ParentID=%s, want ParentID=%s", snapshot3.ParentID, snapshot1ID)
} else {
t.Logf("SUCCESS: Snapshot 3's parent ID has been correctly updated to point to Snapshot 1: %s", snapshot3.ParentID)
}
if len(snapshots) != 2 {
t.Errorf("Expected 2 snapshots after deletion, got %d", len(snapshots))
} else {
t.Logf("SUCCESS: Found correct number of snapshots after deletion: %d", len(snapshots))
}
foundDeletedSnapshot := false
for _, snap := range snapshots {
if snap.ID == snapshot2ID {
t.Errorf("Snapshot 2 should have been deleted")
foundDeletedSnapshot = true
t.Errorf("Snapshot 2 (ID=%s) should have been deleted", snapshot2ID)
}
}
if !foundDeletedSnapshot {
t.Logf("SUCCESS: Snapshot 2 (ID=%s) was correctly deleted", snapshot2ID)
}
}
// TestLargeFiles tests creating and restoring snapshots with large files
@ -200,7 +245,8 @@ func TestLargeFiles(t *testing.T) {
// Create Agate options
options := AgateOptions{
WorkDir: tempDir,
WorkDir: tempDir,
CleanOnRestore: true,
OpenFunc: func(dir string) error {
return nil
},

View File

@ -1,236 +0,0 @@
package grpc
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotClient implements the client for connecting to a remote snapshot server
type SnapshotClient struct {
conn *grpc.ClientConn
client SnapshotServiceClient
}
// NewSnapshotClient creates a new client connected to the specified address
func NewSnapshotClient(address string) (*SnapshotClient, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
}
// Create the gRPC client
client := NewSnapshotServiceClient(conn)
return &SnapshotClient{
conn: conn,
client: client,
}, nil
}
// Close closes the connection to the server
func (c *SnapshotClient) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// ListSnapshots retrieves a list of snapshots from the remote server
func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{
ID: snapshot.Id,
Name: snapshot.Name,
ParentID: snapshot.ParentId,
CreationTime: snapshot.CreationTime.AsTime(),
})
}
return snapshots, nil
}
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{
SnapshotId: snapshotID,
})
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{
ID: response.Info.Id,
Name: response.Info.Name,
ParentID: response.Info.ParentId,
CreationTime: response.Info.CreationTime.AsTime(),
Files: make([]store.FileInfo, 0, len(response.Files)),
}
// Convert file info
for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path,
Size: file.SizeBytes,
IsDir: file.IsDir,
SHA256: file.Sha256Hash,
})
}
return snapshot, nil
}
// DownloadSnapshot downloads a snapshot from the server
// This implementation downloads each file individually to optimize bandwidth usage
func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
// Get snapshot details
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
}
// Create the target file
file, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
}
defer file.Close()
// Receive and write chunks
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err)
}
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
}
return nil
}
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// ConnectToServer creates a new client connected to the specified address
func ConnectToServer(address string) (*SnapshotClient, error) {
return NewSnapshotClient(address)
}

View File

@ -1,154 +0,0 @@
package grpc
import (
"context"
"fmt"
"io"
"net"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotServer implements the gRPC server for snapshots
type SnapshotServer struct {
UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager
server *grpc.Server
}
// NewSnapshotServer creates a new snapshot server
func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer {
return &SnapshotServer{
manager: manager,
}
}
// Start starts the gRPC server on the specified address
func (s *SnapshotServer) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
}
s.server = grpc.NewServer()
RegisterSnapshotServiceServer(s.server, s)
go func() {
if err := s.server.Serve(lis); err != nil {
fmt.Printf("Server error: %v\n", err)
}
}()
fmt.Printf("Server started on %s\n", address)
return nil
}
// Stop gracefully stops the server
func (s *SnapshotServer) Stop(ctx context.Context) error {
if s.server != nil {
s.server.GracefulStop()
fmt.Println("Server stopped")
}
return nil
}
// ListSnapshots implements the gRPC ListSnapshots method
func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) {
snapshots, err := s.manager.ListSnapshots(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
response := &ListSnapshotsResponse{
Snapshots: make([]*SnapshotInfo, 0, len(snapshots)),
}
for _, snapshot := range snapshots {
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
}
return response, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
response := &SnapshotDetails{
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
ID: snapshot.ID,
Name: snapshot.Name,
ParentID: snapshot.ParentID,
CreationTime: snapshot.CreationTime,
}),
Files: make([]*FileInfo, 0, len(snapshot.Files)),
}
for _, file := range snapshot.Files {
response.Files = append(response.Files, &FileInfo{
Path: file.Path,
SizeBytes: file.Size,
Sha256Hash: file.SHA256,
IsDir: file.IsDir,
})
}
return response, nil
}
// DownloadFile implements the gRPC DownloadFile method
func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer fileReader.Close()
// Read the file in chunks and send them to the client
buffer := make([]byte, 64*1024) // 64KB chunks
for {
n, err := fileReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
// Send the chunk to the client
if err := stream.Send(&DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo {
return &SnapshotInfo{
Id: info.ID,
Name: info.Name,
ParentId: info.ParentID,
CreationTime: timestamppb.New(info.CreationTime),
}
}
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) {
server := NewSnapshotServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}

View File

@ -26,10 +26,10 @@ const (
// Метаданные файла внутри снапшота
type FileInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` // Относительный путь файла внутри снапшота
SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` // Размер файла в байтах
Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"` // Хеш-сумма файла (SHA256)
IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"` // Является ли запись директорией
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"`
IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -95,10 +95,10 @@ func (x *FileInfo) GetIsDir() bool {
// Краткая информация о снапшоте
type SnapshotInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Уникальный ID снапшота (UUID)
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // Имя снапшота
ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"` // ID родительского снапшота (может быть пустым)
CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` // Время создания
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"`
CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -164,8 +164,8 @@ func (x *SnapshotInfo) GetCreationTime() *timestamppb.Timestamp {
// Детальная информация о снапшоте
type SnapshotDetails struct {
state protoimpl.MessageState `protogen:"open.v1"`
Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` // Краткая информация
Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"` // Список файлов в снапшоте
Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"`
Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -214,7 +214,7 @@ func (x *SnapshotDetails) GetFiles() []*FileInfo {
return nil
}
// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию)
// Запрос на получение списка снапшотов
type ListSnapshotsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields
@ -254,7 +254,7 @@ func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) {
// Ответ со списком снапшотов
type ListSnapshotsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"` // string next_page_token = 2;
Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -299,7 +299,7 @@ func (x *ListSnapshotsResponse) GetSnapshots() []*SnapshotInfo {
// Запрос на получение деталей снапшота
type GetSnapshotDetailsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID нужного снапшота
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -344,8 +344,8 @@ func (x *GetSnapshotDetailsRequest) GetSnapshotId() string {
// Запрос на скачивание файла
type DownloadFileRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID снапшота
FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"` // Путь к файлу внутри снапшота
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -397,7 +397,7 @@ func (x *DownloadFileRequest) GetFilePath() string {
// Ответ (часть файла) при скачивании
type DownloadFileResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"` // Кусочек данных файла
ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@ -439,6 +439,67 @@ func (x *DownloadFileResponse) GetChunkData() []byte {
return nil
}
// Запрос на скачивание разницы между снапшотами
type DownloadSnapshotDiffRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID целевого снапшота
LocalParentId string `protobuf:"bytes,2,opt,name=local_parent_id,json=localParentId,proto3" json:"local_parent_id,omitempty"` // ID снапшота, который уже есть у клиента
Offset int64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` // Смещение в байтах для докачки
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DownloadSnapshotDiffRequest) Reset() {
*x = DownloadSnapshotDiffRequest{}
mi := &file_snapshot_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DownloadSnapshotDiffRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DownloadSnapshotDiffRequest) ProtoMessage() {}
func (x *DownloadSnapshotDiffRequest) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DownloadSnapshotDiffRequest.ProtoReflect.Descriptor instead.
func (*DownloadSnapshotDiffRequest) Descriptor() ([]byte, []int) {
return file_snapshot_proto_rawDescGZIP(), []int{8}
}
func (x *DownloadSnapshotDiffRequest) GetSnapshotId() string {
if x != nil {
return x.SnapshotId
}
return ""
}
func (x *DownloadSnapshotDiffRequest) GetLocalParentId() string {
if x != nil {
return x.LocalParentId
}
return ""
}
func (x *DownloadSnapshotDiffRequest) GetOffset() int64 {
if x != nil {
return x.Offset
}
return 0
}
var File_snapshot_proto protoreflect.FileDescriptor
const file_snapshot_proto_rawDesc = "" +
@ -472,11 +533,17 @@ const file_snapshot_proto_rawDesc = "" +
"\tfile_path\x18\x02 \x01(\tR\bfilePath\"5\n" +
"\x14DownloadFileResponse\x12\x1d\n" +
"\n" +
"chunk_data\x18\x01 \x01(\fR\tchunkData2\x8a\x03\n" +
"chunk_data\x18\x01 \x01(\fR\tchunkData\"~\n" +
"\x1bDownloadSnapshotDiffRequest\x12\x1f\n" +
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
"snapshotId\x12&\n" +
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" +
"\x06offset\x18\x03 \x01(\x03R\x06offset2\xf1\x03\n" +
"\x0fSnapshotService\x12k\n" +
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" +
"\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
var (
file_snapshot_proto_rawDescOnce sync.Once
@ -490,31 +557,34 @@ func file_snapshot_proto_rawDescGZIP() []byte {
return file_snapshot_proto_rawDescData
}
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 8)
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_snapshot_proto_goTypes = []any{
(*FileInfo)(nil), // 0: agate.grpc.FileInfo
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
(*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails
(*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest
(*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse
(*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
(*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp
(*FileInfo)(nil), // 0: agate.grpc.FileInfo
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
(*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails
(*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest
(*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse
(*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
(*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
}
var file_snapshot_proto_depIdxs = []int32{
8, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
9, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo
0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo
1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo
3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest
5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest
6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest
4, // 7: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
2, // 8: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
7, // 9: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
7, // [7:10] is the sub-list for method output_type
4, // [4:7] is the sub-list for method input_type
8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest
4, // 8: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
2, // 9: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
7, // 10: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
7, // 11: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse
8, // [8:12] is the sub-list for method output_type
4, // [4:8] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name
@ -531,7 +601,7 @@ func file_snapshot_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)),
NumEnums: 0,
NumMessages: 8,
NumMessages: 9,
NumExtensions: 0,
NumServices: 1,
},

View File

@ -3,7 +3,7 @@ syntax = "proto3";
package agate.grpc;
import "google/protobuf/timestamp.proto";
import "google/api/annotations.proto"; // Добавлено для HTTP mapping
import "google/api/annotations.proto";
option go_package = "gitea.unprism.ru/KRBL/Agate/grpc";
@ -30,77 +30,59 @@ service SnapshotService {
};
}
// --- Методы для управления (опционально, можно не включать в публичный API клиента) ---
// Создать новый снапшот из директории (если серверу позволено инициировать)
// rpc CreateSnapshot(CreateSnapshotRequest) returns (Snapshot);
// Удалить снапшот (если требуется)
// rpc DeleteSnapshot(DeleteSnapshotRequest) returns (DeleteSnapshotResponse);
// Скачать архив, содержащий только разницу между двумя снапшотами
rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {}
}
// Метаданные файла внутри снапшота
message FileInfo {
string path = 1; // Относительный путь файла внутри снапшота
int64 size_bytes = 2; // Размер файла в байтах
string sha256_hash = 3; // Хеш-сумма файла (SHA256)
bool is_dir = 4; // Является ли запись директорией
string path = 1;
int64 size_bytes = 2;
string sha256_hash = 3;
bool is_dir = 4;
}
// Краткая информация о снапшоте
message SnapshotInfo {
string id = 1; // Уникальный ID снапшота (UUID)
string name = 2; // Имя снапшота
string parent_id = 3; // ID родительского снапшота (может быть пустым)
google.protobuf.Timestamp creation_time = 4; // Время создания
string id = 1;
string name = 2;
string parent_id = 3;
google.protobuf.Timestamp creation_time = 4;
}
// Детальная информация о снапшоте
message SnapshotDetails {
SnapshotInfo info = 1; // Краткая информация
repeated FileInfo files = 2; // Список файлов в снапшоте
SnapshotInfo info = 1;
repeated FileInfo files = 2;
}
// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию)
message ListSnapshotsRequest {
// string filter_by_name = 1;
// int32 page_size = 2;
// string page_token = 3;
}
// Запрос на получение списка снапшотов
message ListSnapshotsRequest {}
// Ответ со списком снапшотов
message ListSnapshotsResponse {
repeated SnapshotInfo snapshots = 1;
// string next_page_token = 2;
}
// Запрос на получение деталей снапшота
message GetSnapshotDetailsRequest {
string snapshot_id = 1; // ID нужного снапшота
string snapshot_id = 1;
}
// Запрос на скачивание файла
message DownloadFileRequest {
string snapshot_id = 1; // ID снапшота
string file_path = 2; // Путь к файлу внутри снапшота
string snapshot_id = 1;
string file_path = 2;
}
// Ответ (часть файла) при скачивании
message DownloadFileResponse {
bytes chunk_data = 1; // Кусочек данных файла
bytes chunk_data = 1;
}
// --- Сообщения для опциональных методов управления ---
/*
message CreateSnapshotRequest {
string source_path = 1; // Путь к директории на сервере
string name = 2;
string parent_id = 3; // Опционально
// Запрос на скачивание разницы между снапшотами
message DownloadSnapshotDiffRequest {
string snapshot_id = 1; // ID целевого снапшота
string local_parent_id = 2; // ID снапшота, который уже есть у клиента
int64 offset = 3; // Смещение в байтах для докачки
}
message DeleteSnapshotRequest {
string snapshot_id = 1;
}
message DeleteSnapshotResponse {
bool success = 1;
}
*/

View File

@ -19,9 +19,10 @@ import (
const _ = grpc.SupportPackageIsVersion9
const (
SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots"
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots"
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff"
)
// SnapshotServiceClient is the client API for SnapshotService service.
@ -36,6 +37,8 @@ type SnapshotServiceClient interface {
GetSnapshotDetails(ctx context.Context, in *GetSnapshotDetailsRequest, opts ...grpc.CallOption) (*SnapshotDetails, error)
// Скачать конкретный файл из снапшота (потоковая передача)
DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
// Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
}
type snapshotServiceClient struct {
@ -85,6 +88,25 @@ func (c *snapshotServiceClient) DownloadFile(ctx context.Context, in *DownloadFi
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadFileClient = grpc.ServerStreamingClient[DownloadFileResponse]
func (c *snapshotServiceClient) DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &SnapshotService_ServiceDesc.Streams[1], SnapshotService_DownloadSnapshotDiff_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse]
// SnapshotServiceServer is the server API for SnapshotService service.
// All implementations must embed UnimplementedSnapshotServiceServer
// for forward compatibility.
@ -97,6 +119,8 @@ type SnapshotServiceServer interface {
GetSnapshotDetails(context.Context, *GetSnapshotDetailsRequest) (*SnapshotDetails, error)
// Скачать конкретный файл из снапшота (потоковая передача)
DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
// Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
mustEmbedUnimplementedSnapshotServiceServer()
}
@ -116,6 +140,9 @@ func (UnimplementedSnapshotServiceServer) GetSnapshotDetails(context.Context, *G
func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method DownloadFile not implemented")
}
func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented")
}
func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {}
func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {}
@ -184,6 +211,17 @@ func _SnapshotService_DownloadFile_Handler(srv interface{}, stream grpc.ServerSt
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadFileServer = grpc.ServerStreamingServer[DownloadFileResponse]
func _SnapshotService_DownloadSnapshotDiff_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(DownloadSnapshotDiffRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SnapshotServiceServer).DownloadSnapshotDiff(m, &grpc.GenericServerStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse]
// SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
@ -206,6 +244,11 @@ var SnapshotService_ServiceDesc = grpc.ServiceDesc{
Handler: _SnapshotService_DownloadFile_Handler,
ServerStreams: true,
},
{
StreamName: "DownloadSnapshotDiff",
Handler: _SnapshotService_DownloadSnapshotDiff_Handler,
ServerStreams: true,
},
},
Metadata: "snapshot.proto",
}

View File

@ -2,13 +2,15 @@ package agate
import (
"context"
"crypto/sha256"
"io"
"log"
"os"
"path/filepath"
"testing"
"time"
"gitea.unprism.ru/KRBL/Agate/remote"
"gitea.unprism.ru/KRBL/Agate/store"
)
// TestGRPCServerClient tests the interaction between a gRPC server and client.
@ -21,184 +23,241 @@ func TestGRPCServerClient(t *testing.T) {
t.Skip("Skipping gRPC server-client test in short mode")
}
// Create a temporary directory for the server
// --- Setup Server ---
serverDir, err := os.MkdirTemp("", "agate-server-*")
if err != nil {
t.Fatalf("Failed to create server temp directory: %v", err)
}
defer os.RemoveAll(serverDir)
// Create a temporary directory for the client
clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil {
t.Fatalf("Failed to create client temp directory: %v", err)
}
defer os.RemoveAll(clientDir)
// Create Agate options for the server
serverOptions := AgateOptions{
WorkDir: serverDir,
}
// Create Agate instance for the server
serverAgate, err := New(serverOptions)
serverAgate, err := New(AgateOptions{WorkDir: serverDir})
if err != nil {
t.Fatalf("Failed to create server Agate instance: %v", err)
}
defer serverAgate.Close()
// Create a data directory
dataDir := serverAgate.options.BlobStore.GetActiveDir()
if err := os.MkdirAll(dataDir, 0755); err != nil {
t.Fatalf("Failed to create data directory: %v", err)
}
// Create initial test files for the first snapshot
initialFiles := map[string]string{
filepath.Join(dataDir, "file1.txt"): "Initial content of file 1",
filepath.Join(dataDir, "file2.txt"): "Initial content of file 2",
filepath.Join(dataDir, "subdir", "file3.txt"): "Initial content of file 3",
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
t.Fatal(err)
}
// Create subdirectory
if err := os.MkdirAll(filepath.Join(dataDir, "subdir"), 0755); err != nil {
t.Fatalf("Failed to create subdirectory: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create the files
for path, content := range initialFiles {
if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to create test file %s: %v", path, err)
}
}
// Create the first snapshot
ctx := context.Background()
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
if err != nil {
t.Fatalf("Failed to create first snapshot: %v", err)
}
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
// Modify some files and add a new file for the second snapshot
modifiedFiles := map[string]string{
filepath.Join(dataDir, "file1.txt"): "Modified content of file 1",
filepath.Join(dataDir, "file4.txt"): "Content of new file 4",
// Modify content for the second snapshot
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
t.Fatal(err)
}
for path, content := range modifiedFiles {
if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to modify/create test file %s: %v", path, err)
}
}
// Create the second snapshot
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
if err != nil {
t.Fatalf("Failed to create second snapshot: %v", err)
}
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
// Delete a file and modify another for the third snapshot
if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil {
t.Fatalf("Failed to delete test file: %v", err)
}
if err := os.WriteFile(filepath.Join(dataDir, "subdir/file3.txt"), []byte("Modified content of file 3"), 0644); err != nil {
t.Fatalf("Failed to modify test file: %v", err)
}
// Create the third snapshot
snapshot3ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 3", snapshot2ID)
if err != nil {
t.Fatalf("Failed to create third snapshot: %v", err)
}
t.Logf("Created third snapshot with ID: %s", snapshot3ID)
// Start the gRPC server
serverAddress := "localhost:50051"
server, err := remote.RunServer(ctx, serverAgate.manager, serverAddress)
if err != nil {
t.Fatalf("Failed to start gRPC server: %v", err)
}
defer server.Stop(ctx)
// Give the server a moment to start
server := remote.NewServer(serverAgate.manager)
go func() {
if err := server.Start(ctx, serverAddress); err != nil {
log.Printf("Server start error: %v", err)
}
}()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// Connect a client to the server
client, err := remote.NewClient(serverAddress)
// --- Setup Client ---
clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil {
t.Fatalf("Failed to connect client to server: %v", err)
t.Fatalf("Failed to create client temp directory: %v", err)
}
defer client.Close()
defer os.RemoveAll(clientDir)
// List snapshots from the client
snapshots, err := client.ListSnapshots(ctx)
clientAgate, err := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
if err != nil {
t.Fatalf("Failed to list snapshots from client: %v", err)
t.Fatalf("Failed to create client Agate instance: %v", err)
}
defer clientAgate.Close()
// --- Test Scenario ---
// 1. Client downloads the first snapshot completely
t.Log("Client downloading Snapshot 1...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
t.Fatalf("Client failed to get snapshot 1: %v", err)
}
// Verify we have 3 snapshots
if len(snapshots) != 3 {
t.Errorf("Expected 3 snapshots, got %d", len(snapshots))
// Verify content of snapshot 1
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
t.Fatalf("Failed to restore snapshot 1: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
// 2. Client downloads the second snapshot incrementally
t.Log("Client downloading Snapshot 2 (incrementally)...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
t.Fatalf("Client failed to get snapshot 2: %v", err)
}
// Find the latest snapshot (should be snapshot3)
var latestSnapshot store.SnapshotInfo
for _, snapshot := range snapshots {
if latestSnapshot.CreationTime.Before(snapshot.CreationTime) {
latestSnapshot = snapshot
}
// Verify content of snapshot 2
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
t.Fatalf("Failed to restore snapshot 2: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
}
// Verify the latest snapshot is snapshot3
if latestSnapshot.ID != snapshot3ID {
t.Errorf("Latest snapshot ID is %s, expected %s", latestSnapshot.ID, snapshot3ID)
}
// Get detailed information about the latest snapshot
snapshotDetails, err := client.FetchSnapshotDetails(ctx, latestSnapshot.ID)
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
t.Helper()
content, err := os.ReadFile(filepath.Join(dir, filename))
if err != nil {
t.Fatalf("Failed to fetch snapshot details: %v", err)
t.Fatalf("Failed to read file %s: %v", filename, err)
}
// Verify the snapshot details
if snapshotDetails.ID != snapshot3ID {
t.Errorf("Snapshot details ID is %s, expected %s", snapshotDetails.ID, snapshot3ID)
}
// Create a directory to download the snapshot to
downloadDir := filepath.Join(clientDir, "download")
if err := os.MkdirAll(downloadDir, 0755); err != nil {
t.Fatalf("Failed to create download directory: %v", err)
}
// Download the snapshot
err = client.DownloadSnapshot(ctx, latestSnapshot.ID, downloadDir, "")
if err != nil {
t.Fatalf("Failed to download snapshot: %v", err)
}
// Verify the downloaded files match the expected content
expectedFiles := map[string]string{
filepath.Join(downloadDir, "file1.txt"): "Modified content of file 1",
filepath.Join(downloadDir, "file4.txt"): "Content of new file 4",
filepath.Join(downloadDir, "subdir/file3.txt"): "Modified content of file 3",
}
for path, expectedContent := range expectedFiles {
content, err := os.ReadFile(path)
if err != nil {
t.Fatalf("Failed to read downloaded file %s: %v", path, err)
}
if string(content) != expectedContent {
t.Errorf("Downloaded file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
}
}
// Verify that file2.txt doesn't exist in the downloaded snapshot
if _, err := os.Stat(filepath.Join(downloadDir, "file2.txt")); !os.IsNotExist(err) {
t.Errorf("file2.txt should not exist in the downloaded snapshot")
if string(content) != expectedContent {
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
}
}
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Start Server
serverAddress := "localhost:50052"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
defer clientAgate.Close()
// --- Test Scenario ---
t.Log("Client performing full download...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
t.Fatalf("Client failed to get snapshot: %v", err)
}
// Verify content
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
t.Fatalf("Failed to restore snapshot: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
}
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Snap 1
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
// Snap 2 (with changes)
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
// Start Server
serverAddress := "localhost:50053"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
rClient, err := remote.NewClient(serverAddress)
if err != nil {
t.Fatalf("Failed to create remote client: %v", err)
}
defer rClient.Close()
// --- Test Scenario ---
// 1. Manually download first part of the diff archive
diffPath := filepath.Join(clientDir, "diff.zip.part")
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
if err != nil {
t.Fatalf("Failed to get diff stream from manager: %v", err)
}
defer diffReader.Close()
// Read first 64KB
firstChunk := make([]byte, 64*1024)
n, err := io.ReadFull(diffReader, firstChunk)
if err != nil && err != io.ErrUnexpectedEOF {
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
}
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
t.Fatalf("Failed to write partial file: %v", err)
}
diffReader.Close() // Simulate connection drop
// 2. Resume download using the client
t.Log("Resuming download...")
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
t.Fatalf("Failed to resume download: %v", err)
}
// 3. Verify final file
// Get the full diff from server for comparison
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
defer fullDiffReader.Close()
fullDiffData, _ := io.ReadAll(fullDiffReader)
resumedData, _ := os.ReadFile(diffPath)
if len(resumedData) != len(fullDiffData) {
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
}
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
t.Error("File content mismatch after resumption")
}
}

View File

@ -7,27 +7,45 @@ import (
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotManager defines the interface that the server needs to interact with snapshots
// SnapshotManager is an interface that defines operations for managing and interacting with snapshots.
type SnapshotManager interface {
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot
// CreateSnapshot creates a new snapshot from the specified source directory, associating it with a given name and parent ID.
// Returns the created Snapshot with its metadata or an error if the process fails.
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
// Returns a Snapshot object containing metadata
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// ListSnapshots retrieves a list of all available snapshots
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
// ListSnapshots retrieves a list of available snapshots with filtering and pagination options.
ListSnapshots(ctx context.Context, opts store.ListOptions) ([]store.SnapshotInfo, error)
// OpenFile retrieves and opens a file from the specified snapshot
// DeleteSnapshot removes a snapshot identified by snapshotID. Returns an error if the snapshot does not exist or cannot be deleted.
DeleteSnapshot(ctx context.Context, snapshotID string) error
// OpenFile retrieves and opens a file from the specified snapshot, returning a readable stream and an error, if any.
OpenFile(ctx context.Context, snapshotID string, filePath string) (io.ReadCloser, error)
// CreateSnapshot creates a new snapshot from the specified source directory
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
// ExtractSnapshot extracts the contents of a specified snapshot to a target directory at the given path.
// If cleanTarget is true, the target directory will be cleaned before extraction.
// Returns an error if the snapshot ID is invalid or the extraction fails.
ExtractSnapshot(ctx context.Context, snapshotID string, path string, cleanTarget bool) error
// UpdateSnapshotMetadata updates the metadata of an existing snapshot, allowing changes to its name.
UpdateSnapshotMetadata(ctx context.Context, snapshotID string, newName string) error
// StreamSnapshotDiff creates and streams a differential archive between two snapshots.
// It returns an io.ReadCloser for the archive stream and an error.
// The caller is responsible for closing the reader, which will also handle cleanup of temporary resources.
StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error)
}
// SnapshotServer defines the interface for a server that can share snapshots
type SnapshotServer interface {
// Start initializes and begins the server's operation
// Start initializes and begins the server's operation, handling incoming requests or processes within the provided context.
Start(ctx context.Context) error
// Stop gracefully shuts down the server
// Stop gracefully shuts down the server, releasing any allocated resources and ensuring all operations are completed.
Stop(ctx context.Context) error
}
@ -39,8 +57,8 @@ type SnapshotClient interface {
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// DownloadSnapshot downloads a snapshot from the server
DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error
// DownloadSnapshotDiff downloads a differential archive between two snapshots to a target directory
DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error
// Close closes the connection to the server
Close() error

View File

@ -2,13 +2,14 @@ package agate
import (
"archive/zip"
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sort"
"strings"
"time"
@ -16,20 +17,31 @@ import (
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/hash"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
type SnapshotManagerData struct {
metadataStore store.MetadataStore
blobStore store.BlobStore
logger *log.Logger
}
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore) (SnapshotManager, error) {
func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.BlobStore, logger *log.Logger) (interfaces.SnapshotManager, error) {
if metadataStore == nil || blobStore == nil {
return nil, errors.New("parameters can't be nil")
}
return &SnapshotManagerData{metadataStore, blobStore}, nil
// Ensure logger is never nil.
if logger == nil {
logger = log.New(io.Discard, "", 0)
}
return &SnapshotManagerData{
metadataStore: metadataStore,
blobStore: blobStore,
logger: logger,
}, nil
}
func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error) {
@ -162,9 +174,9 @@ func (data *SnapshotManagerData) GetSnapshotDetails(ctx context.Context, snapsho
return snapshot, nil
}
func (data *SnapshotManagerData) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
// Retrieve list of snapshots from the metadata store
snapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx)
func (data *SnapshotManagerData) ListSnapshots(ctx context.Context, opts store.ListOptions) ([]store.SnapshotInfo, error) {
// Retrieve list of snapshots from the metadata store with the provided options
snapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
@ -177,27 +189,38 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID
return errors.New("snapshot ID cannot be empty")
}
// First check if the snapshot exists
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil {
if errors.Is(err, ErrNotFound) {
// If snapshot doesn't exist, return success (idempotent operation)
if errors.Is(err, store.ErrNotFound) {
return nil
}
return fmt.Errorf("failed to check if snapshot exists: %w", err)
}
// Delete the metadata first
parentID := snapshot.ParentID
opts := store.ListOptions{}
allSnapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx, opts)
if err != nil {
return fmt.Errorf("failed to list snapshots: %w", err)
}
for _, info := range allSnapshots {
if info.ParentID == snapshotID {
if err := data.metadataStore.UpdateSnapshotParentID(ctx, info.ID, parentID); err != nil {
data.logger.Printf("WARNING: failed to update parent reference for snapshot %s: %v", info.ID, err)
} else {
data.logger.Printf("Updated parent reference for snapshot %s from %s to %s", info.ID, snapshotID, parentID)
}
}
}
if err := data.metadataStore.DeleteSnapshotMetadata(ctx, snapshotID); err != nil {
return fmt.Errorf("failed to delete snapshot metadata: %w", err)
}
// Then delete the blob
if err := data.blobStore.DeleteBlob(ctx, snapshotID); err != nil {
// Note: We don't return here because we've already deleted the metadata
// and the blob store should handle the case where the blob doesn't exist
// Log the error instead
fmt.Printf("Warning: failed to delete snapshot blob: %v\n", err)
data.logger.Printf("WARNING: failed to delete snapshot blob: %v", err)
}
return nil
@ -248,7 +271,7 @@ func (data *SnapshotManagerData) OpenFile(ctx context.Context, snapshotID string
return pr, nil
}
func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID string, path string) error {
func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID string, path string, cleanTarget bool) error {
if snapshotID == "" {
return errors.New("snapshot ID cannot be empty")
}
@ -258,8 +281,8 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
path = data.blobStore.GetActiveDir()
}
// First check if the snapshot exists and get its metadata
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
// First check if the snapshot exists
_, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil {
if errors.Is(err, ErrNotFound) {
return ErrNotFound
@ -273,9 +296,20 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
return fmt.Errorf("failed to get blob path: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
// If cleanTarget is true, clean the target directory before extraction
if cleanTarget {
// Remove the directory and recreate it
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("failed to clean target directory: %w", err)
}
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
} else {
// Just ensure the target directory exists
if err := os.MkdirAll(path, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
}
// Extract the archive to the target directory
@ -283,94 +317,6 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
return fmt.Errorf("failed to extract snapshot: %w", err)
}
// Create maps for files and directories in the snapshot for quick lookup
snapshotFiles := make(map[string]bool)
snapshotDirs := make(map[string]bool)
for _, file := range snapshot.Files {
if file.IsDir {
snapshotDirs[file.Path] = true
} else {
snapshotFiles[file.Path] = true
}
}
// First pass: Collect all files and directories in the target
var allPaths []string
err = filepath.Walk(path, func(filePath string, info os.FileInfo, err error) error {
if err != nil {
return err
}
// Skip the root directory itself
if filePath == path {
return nil
}
// Create relative path
relPath, err := filepath.Rel(path, filePath)
if err != nil {
return fmt.Errorf("failed to get relative path: %w", err)
}
relPath = filepath.ToSlash(relPath)
allPaths = append(allPaths, filePath)
return nil
})
if err != nil {
return fmt.Errorf("failed to scan target directory: %w", err)
}
// Sort paths by length in descending order to process deepest paths first
// This ensures we process files before their parent directories
sort.Slice(allPaths, func(i, j int) bool {
return len(allPaths[i]) > len(allPaths[j])
})
// Second pass: Remove files and directories that aren't in the snapshot
for _, filePath := range allPaths {
info, err := os.Stat(filePath)
if err != nil {
// Skip if file no longer exists (might have been in a directory we already removed)
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to stat file %s: %w", filePath, err)
}
// Create relative path
relPath, err := filepath.Rel(path, filePath)
if err != nil {
return fmt.Errorf("failed to get relative path: %w", err)
}
relPath = filepath.ToSlash(relPath)
if info.IsDir() {
// For directories, check if it's in the snapshot or if it's empty
if !snapshotDirs[relPath] {
// Check if directory is empty
entries, err := os.ReadDir(filePath)
if err != nil {
return fmt.Errorf("failed to read directory %s: %w", filePath, err)
}
// If directory is empty, remove it
if len(entries) == 0 {
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("failed to remove directory %s: %w", filePath, err)
}
}
}
} else {
// For files, remove if not in the snapshot
if !snapshotFiles[relPath] {
if err := os.Remove(filePath); err != nil {
return fmt.Errorf("failed to remove file %s: %w", filePath, err)
}
}
}
}
return nil
}
@ -440,22 +386,130 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
return errors.New("new name cannot be empty")
}
// Get the current snapshot metadata
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil {
if errors.Is(err, ErrNotFound) {
if errors.Is(err, store.ErrNotFound) {
return ErrNotFound
}
return fmt.Errorf("failed to get snapshot metadata: %w", err)
}
// Update the name
snapshot.Name = newName
// Save the updated metadata
if err := data.metadataStore.SaveSnapshotMetadata(ctx, *snapshot); err != nil {
return fmt.Errorf("failed to update snapshot metadata: %w", err)
}
return nil
}
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
type diffArchiveReader struct {
*os.File
tempArchive string
tempStaging string
}
// Close closes the file and removes the temporary archive and staging directory.
func (r *diffArchiveReader) Close() error {
err := r.File.Close()
os.Remove(r.tempArchive)
os.RemoveAll(r.tempStaging)
return err
}
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil {
return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err)
}
parentFiles := make(map[string]string)
if parentID != "" {
parentSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
if err == nil {
for _, file := range parentSnap.Files {
if !file.IsDir {
parentFiles[file.Path] = file.SHA256
}
}
} else {
data.logger.Printf("Warning: failed to get parent snapshot %s, creating full diff: %v", parentID, err)
}
}
var filesToInclude []string
for _, file := range targetSnap.Files {
if file.IsDir {
continue
}
if parentHash, ok := parentFiles[file.Path]; !ok || parentHash != file.SHA256 {
filesToInclude = append(filesToInclude, file.Path)
}
}
if len(filesToInclude) == 0 {
return io.NopCloser(bytes.NewReader(nil)), nil
}
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
if err != nil {
return nil, fmt.Errorf("failed to create temp staging directory: %w", err)
}
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, err
}
for _, filePath := range filesToInclude {
destPath := filepath.Join(tempStagingDir, filePath)
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to create dir for diff file: %w", err)
}
fileWriter, err := os.Create(destPath)
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, err
}
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
fileWriter.Close()
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
}
}
tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip")
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to create diff archive: %w", err)
}
archiveFile, err := os.Open(tempArchivePath)
if err != nil {
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, err
}
if offset > 0 {
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
archiveFile.Close()
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
}
}
return &diffArchiveReader{
File: archiveFile,
tempArchive: tempArchivePath,
tempStaging: tempStagingDir,
}, nil
}

View File

@ -90,8 +90,8 @@ func TestCreateAndGetSnapshot(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -142,8 +142,8 @@ func TestListSnapshots(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -165,8 +165,8 @@ func TestListSnapshots(t *testing.T) {
t.Fatalf("Failed to create snapshot: %v", err)
}
// List the snapshots
snapshots, err := manager.ListSnapshots(ctx)
// List the snapshots with empty options
snapshots, err := manager.ListSnapshots(ctx, store.ListOptions{})
if err != nil {
t.Fatalf("Failed to list snapshots: %v", err)
}
@ -209,8 +209,8 @@ func TestDeleteSnapshot(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -235,7 +235,7 @@ func TestDeleteSnapshot(t *testing.T) {
}
// List snapshots to confirm it's gone
snapshots, err := manager.ListSnapshots(ctx)
snapshots, err := manager.ListSnapshots(ctx, store.ListOptions{})
if err != nil {
t.Fatalf("Failed to list snapshots: %v", err)
}
@ -255,8 +255,8 @@ func TestOpenFile(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -309,8 +309,8 @@ func TestExtractSnapshot(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -328,8 +328,8 @@ func TestExtractSnapshot(t *testing.T) {
t.Fatalf("Failed to create target directory: %v", err)
}
// Extract the snapshot
err = manager.ExtractSnapshot(ctx, snapshot.ID, targetDir)
// Extract the snapshot with default behavior (cleanTarget=false)
err = manager.ExtractSnapshot(ctx, snapshot.ID, targetDir, false)
if err != nil {
t.Fatalf("Failed to extract snapshot: %v", err)
}
@ -353,12 +353,159 @@ func TestExtractSnapshot(t *testing.T) {
}
// Try to extract a non-existent snapshot
err = manager.ExtractSnapshot(ctx, "nonexistent-id", targetDir)
err = manager.ExtractSnapshot(ctx, "nonexistent-id", targetDir, false)
if err == nil {
t.Fatalf("Expected error when extracting non-existent snapshot, got nil")
}
}
// TestExtractSnapshot_SafeRestore tests that ExtractSnapshot with cleanTarget=false
// does not remove extra files in the target directory
func TestExtractSnapshot_SafeRestore(t *testing.T) {
tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t)
defer cleanup()
// Create a source directory with test files
sourceDir := filepath.Join(tempDir, "source")
if err := os.MkdirAll(sourceDir, 0755); err != nil {
t.Fatalf("Failed to create source directory: %v", err)
}
createTestFiles(t, sourceDir)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
// Create a snapshot (snapshot A)
ctx := context.Background()
snapshot, err := manager.CreateSnapshot(ctx, sourceDir, "Snapshot A", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Create a target directory and place an "extra" file in it
targetDir := filepath.Join(tempDir, "target")
if err := os.MkdirAll(targetDir, 0755); err != nil {
t.Fatalf("Failed to create target directory: %v", err)
}
extraFilePath := filepath.Join(targetDir, "extra.txt")
if err := os.WriteFile(extraFilePath, []byte("This is an extra file"), 0644); err != nil {
t.Fatalf("Failed to create extra file: %v", err)
}
// Extract the snapshot with cleanTarget=false
err = manager.ExtractSnapshot(ctx, snapshot.ID, targetDir, false)
if err != nil {
t.Fatalf("Failed to extract snapshot: %v", err)
}
// Check that all files from the snapshot were restored
testFiles := map[string]string{
filepath.Join(targetDir, "file1.txt"): "This is file 1",
filepath.Join(targetDir, "file2.txt"): "This is file 2",
filepath.Join(targetDir, "subdir/subfile1.txt"): "This is subfile 1",
filepath.Join(targetDir, "subdir/subfile2.txt"): "This is subfile 2",
}
for path, expectedContent := range testFiles {
content, err := os.ReadFile(path)
if err != nil {
t.Fatalf("Failed to read extracted file %s: %v", path, err)
}
if string(content) != expectedContent {
t.Errorf("Extracted file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
}
}
// Check that the extra file was NOT deleted
if _, err := os.Stat(extraFilePath); os.IsNotExist(err) {
t.Errorf("Extra file was deleted, but should have been preserved with cleanTarget=false")
} else if err != nil {
t.Fatalf("Failed to check if extra file exists: %v", err)
} else {
// Read the content to make sure it wasn't modified
content, err := os.ReadFile(extraFilePath)
if err != nil {
t.Fatalf("Failed to read extra file: %v", err)
}
if string(content) != "This is an extra file" {
t.Errorf("Extra file content was modified: got %s, want %s", string(content), "This is an extra file")
}
}
}
// TestExtractSnapshot_CleanRestore tests that ExtractSnapshot with cleanTarget=true
// completely cleans the target directory before restoration
func TestExtractSnapshot_CleanRestore(t *testing.T) {
tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t)
defer cleanup()
// Create a source directory with test files
sourceDir := filepath.Join(tempDir, "source")
if err := os.MkdirAll(sourceDir, 0755); err != nil {
t.Fatalf("Failed to create source directory: %v", err)
}
createTestFiles(t, sourceDir)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
// Create a snapshot (snapshot A)
ctx := context.Background()
snapshot, err := manager.CreateSnapshot(ctx, sourceDir, "Snapshot A", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Create a target directory and place an "extra" file in it
targetDir := filepath.Join(tempDir, "target")
if err := os.MkdirAll(targetDir, 0755); err != nil {
t.Fatalf("Failed to create target directory: %v", err)
}
extraFilePath := filepath.Join(targetDir, "extra.txt")
if err := os.WriteFile(extraFilePath, []byte("This is an extra file"), 0644); err != nil {
t.Fatalf("Failed to create extra file: %v", err)
}
// Extract the snapshot with cleanTarget=true
err = manager.ExtractSnapshot(ctx, snapshot.ID, targetDir, true)
if err != nil {
t.Fatalf("Failed to extract snapshot: %v", err)
}
// Check that all files from the snapshot were restored
testFiles := map[string]string{
filepath.Join(targetDir, "file1.txt"): "This is file 1",
filepath.Join(targetDir, "file2.txt"): "This is file 2",
filepath.Join(targetDir, "subdir/subfile1.txt"): "This is subfile 1",
filepath.Join(targetDir, "subdir/subfile2.txt"): "This is subfile 2",
}
for path, expectedContent := range testFiles {
content, err := os.ReadFile(path)
if err != nil {
t.Fatalf("Failed to read extracted file %s: %v", path, err)
}
if string(content) != expectedContent {
t.Errorf("Extracted file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
}
}
// Check that the extra file WAS deleted
if _, err := os.Stat(extraFilePath); os.IsNotExist(err) {
// This is the expected behavior
} else if err != nil {
t.Fatalf("Failed to check if extra file exists: %v", err)
} else {
t.Errorf("Extra file was not deleted, but should have been removed with cleanTarget=true")
}
}
func TestUpdateSnapshotMetadata(t *testing.T) {
tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t)
defer cleanup()
@ -370,8 +517,8 @@ func TestUpdateSnapshotMetadata(t *testing.T) {
}
createTestFiles(t, sourceDir)
// Create a snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore)
// Create a snapshot manager with nil logger
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
@ -407,3 +554,60 @@ func TestUpdateSnapshotMetadata(t *testing.T) {
t.Fatalf("Expected error when updating non-existent snapshot, got nil")
}
}
func TestStreamSnapshotDiff_EdgeCases(t *testing.T) {
tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t)
defer cleanup()
sourceDir := filepath.Join(tempDir, "source")
os.MkdirAll(sourceDir, 0755)
createTestFiles(t, sourceDir)
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
ctx := context.Background()
// Create two identical snapshots
snap1, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap1", "")
snap2, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap2", snap1.ID)
// Test 1: Diff between identical snapshots should be empty
reader, err := manager.StreamSnapshotDiff(ctx, snap2.ID, snap1.ID, 0)
if err != nil {
t.Fatalf("Expected no error for identical snapshots, got %v", err)
}
defer reader.Close()
data, _ := io.ReadAll(reader)
if len(data) != 0 {
t.Errorf("Expected empty diff for identical snapshots, got %d bytes", len(data))
}
// Test 2: Diff with a non-existent parent should be a full archive
reader, err = manager.StreamSnapshotDiff(ctx, snap1.ID, "non-existent-parent", 0)
if err != nil {
t.Fatalf("Expected no error for non-existent parent, got %v", err)
}
defer reader.Close()
data, _ = io.ReadAll(reader)
if len(data) == 0 {
t.Error("Expected full archive for non-existent parent, got empty diff")
}
// Create an empty source dir
emptyDir := filepath.Join(tempDir, "empty_source")
os.MkdirAll(emptyDir, 0755)
emptySnap, _ := manager.CreateSnapshot(ctx, emptyDir, "EmptySnap", "")
// Test 3: Diff of an empty snapshot should be empty
reader, err = manager.StreamSnapshotDiff(ctx, emptySnap.ID, "", 0)
if err != nil {
t.Fatalf("Expected no error for empty snapshot, got %v", err)
}
defer reader.Close()
data, _ = io.ReadAll(reader)
if len(data) != 0 {
t.Errorf("Expected empty diff for empty snapshot, got %d bytes", len(data))
}
}

View File

@ -15,34 +15,26 @@ import (
"gitea.unprism.ru/KRBL/Agate/store"
)
// Client represents a client for connecting to a remote snapshot server
// It implements the interfaces.SnapshotClient interface
// Client представляет клиент для подключения к удаленному серверу снапшотов.
type Client struct {
conn *stdgrpc.ClientConn
client agateGrpc.SnapshotServiceClient
}
// Ensure Client implements interfaces.SnapshotClient
// Убедимся, что Client реализует интерфейс interfaces.SnapshotClient
var _ interfaces.SnapshotClient = (*Client)(nil)
// NewClient creates a new client connected to the specified address
// NewClient создает нового клиента, подключенного к указанному адресу.
func NewClient(address string) (*Client, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
}
// Create the gRPC client
client := agateGrpc.NewSnapshotServiceClient(conn)
return &Client{
conn: conn,
client: client,
}, nil
return &Client{conn: conn, client: client}, nil
}
// Close closes the connection to the server
// Close закрывает соединение с сервером.
func (c *Client) Close() error {
if c.conn != nil {
return c.conn.Close()
@ -50,14 +42,13 @@ func (c *Client) Close() error {
return nil
}
// ListSnapshots retrieves a list of snapshots from the remote server
// ListSnapshots получает список снапшотов с удаленного сервера.
func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{
@ -67,11 +58,10 @@ func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error
CreationTime: snapshot.CreationTime.AsTime(),
})
}
return snapshots, nil
}
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
// FetchSnapshotDetails получает детальную информацию о конкретном снапшоте.
func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{
SnapshotId: snapshotID,
@ -80,7 +70,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{
ID: response.Info.Id,
Name: response.Info.Name,
@ -89,7 +78,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
Files: make([]store.FileInfo, 0, len(response.Files)),
}
// Convert file info
for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path,
@ -98,118 +86,48 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
SHA256: file.Sha256Hash,
})
}
return snapshot, nil
}
// DownloadSnapshot downloads a snapshot from the server
// This implementation downloads each file individually to optimize bandwidth usage
func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
// Get snapshot details
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
// DownloadSnapshotDiff скачивает архив с разницей между снапшотами.
func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error {
var offset int64
fileInfo, err := os.Stat(targetPath)
if err == nil {
offset = fileInfo.Size()
} else if !os.IsNotExist(err) {
return fmt.Errorf("failed to stat temporary file: %w", err)
}
req := &agateGrpc.DownloadSnapshotDiffRequest{
SnapshotId: snapshotID,
LocalParentId: localParentID,
Offset: offset,
}
stream, err := c.client.DownloadSnapshotDiff(ctx, req)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
return fmt.Errorf("failed to start snapshot diff download: %w", err)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &agateGrpc.DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
}
// Create the target file
file, err := os.Create(targetPath)
file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
return fmt.Errorf("failed to open file %s: %w", targetPath, err)
}
defer file.Close()
// Receive and write chunks
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err)
return fmt.Errorf("error receiving diff chunk: %w", err)
}
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
@ -217,26 +135,3 @@ func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetP
return nil
}
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// Connect creates a new client connected to the specified address
func Connect(address string) (*Client, error) {
return NewClient(address)
}

View File

@ -14,21 +14,21 @@ import (
"gitea.unprism.ru/KRBL/Agate/store"
)
// Server implements the gRPC server for snapshots
// Server реализует gRPC-сервер для снапшотов.
type Server struct {
agateGrpc.UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager
server *stdgrpc.Server
}
// NewServer creates a new snapshot server
// NewServer создает новый сервер снапшотов.
func NewServer(manager interfaces.SnapshotManager) *Server {
return &Server{
manager: manager,
}
}
// Start starts the gRPC server on the specified address
// Start запускает gRPC-сервер на указанном адресе.
func (s *Server) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address)
if err != nil {
@ -45,21 +45,25 @@ func (s *Server) Start(ctx context.Context, address string) error {
}()
fmt.Printf("Server started on %s\n", address)
// Ждем отмены контекста для остановки сервера
<-ctx.Done()
s.Stop()
return nil
}
// Stop gracefully stops the server
func (s *Server) Stop(ctx context.Context) error {
// Stop изящно останавливает сервер.
func (s *Server) Stop() {
if s.server != nil {
s.server.GracefulStop()
fmt.Println("Server stopped")
}
return nil
}
// ListSnapshots implements the gRPC ListSnapshots method
// ListSnapshots реализует gRPC-метод ListSnapshots.
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
snapshots, err := s.manager.ListSnapshots(ctx)
opts := store.ListOptions{}
snapshots, err := s.manager.ListSnapshots(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
@ -75,7 +79,7 @@ func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshots
return response, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
// GetSnapshotDetails реализует gRPC-метод GetSnapshotDetails.
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil {
@ -104,17 +108,15 @@ func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnaps
return response, nil
}
// DownloadFile implements the gRPC DownloadFile method
// DownloadFile реализует gRPC-метод DownloadFile.
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer fileReader.Close()
// Read the file in chunks and send them to the client
buffer := make([]byte, 64*1024) // 64KB chunks
buffer := make([]byte, 64*1024)
for {
n, err := fileReader.Read(buffer)
if err == io.EOF {
@ -123,19 +125,40 @@ func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGr
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
// Send the chunk to the client
if err := stream.Send(&agateGrpc.DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
// DownloadSnapshotDiff реализует gRPC-метод DownloadSnapshotDiff.
func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest, stream agateGrpc.SnapshotService_DownloadSnapshotDiffServer) error {
diffReader, err := s.manager.StreamSnapshotDiff(context.Background(), req.SnapshotId, req.LocalParentId, req.Offset)
if err != nil {
return fmt.Errorf("failed to stream snapshot diff: %w", err)
}
defer diffReader.Close()
buffer := make([]byte, 64*1024)
for {
n, err := diffReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read from diff stream: %w", err)
}
if n > 0 {
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
return fmt.Errorf("failed to send diff chunk: %w", err)
}
}
}
return nil
}
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
return &agateGrpc.SnapshotInfo{
Id: info.ID,
@ -144,12 +167,3 @@ func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo
CreationTime: timestamppb.New(info.CreationTime),
}
}
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
server := NewServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}

View File

@ -1,53 +0,0 @@
package agate
import (
"context"
"gitea.unprism.ru/KRBL/Agate/store"
"io"
)
// SnapshotManager is an interface that defines operations for managing and interacting with snapshots.
type SnapshotManager interface {
// CreateSnapshot creates a new snapshot from the specified source directory, associating it with a given name and parent ID.
// Returns the created Snapshot with its metadata or an error if the process fails.
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot identified by its unique snapshotID.
// Returns a Snapshot object containing metadata
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// ListSnapshots retrieves a list of all available snapshots, returning their basic information as SnapshotInfo.
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
// DeleteSnapshot removes a snapshot identified by snapshotID. Returns an error if the snapshot does not exist or cannot be deleted.
DeleteSnapshot(ctx context.Context, snapshotID string) error
// OpenFile retrieves and opens a file from the specified snapshot, returning a readable stream and an error, if any.
OpenFile(ctx context.Context, snapshotID string, filePath string) (io.ReadCloser, error)
// ExtractSnapshot extracts the contents of a specified snapshot to a target directory at the given path.
// Returns an error if the snapshot ID is invalid or the extraction fails.
ExtractSnapshot(ctx context.Context, snapshotID string, path string) error
// UpdateSnapshotMetadata updates the metadata of an existing snapshot, allowing changes to its name.
UpdateSnapshotMetadata(ctx context.Context, snapshotID string, newName string) error
}
type SnapshotServer interface {
// Start initializes and begins the server's operation, handling incoming requests or processes within the provided context.
Start(ctx context.Context) error
// Stop gracefully shuts down the server, releasing any allocated resources and ensuring all operations are completed.
Stop(ctx context.Context) error
}
type SnapshotClient interface {
// ListSnapshots retrieves a list of snapshots containing basic metadata, such as ID, name, parent ID, and creation time.
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
// FetchSnapshotDetails retrieves detailed metadata about a specific snapshot identified by snapshotID.
FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// DownloadSnapshot retrieves the snapshot content for the given snapshotID and returns it as an io.ReadCloser.
DownloadSnapshot(ctx context.Context, snapshotID string) (io.ReadCloser, error)
}

View File

@ -167,58 +167,83 @@ func (s *sqliteStore) GetSnapshotMetadata(ctx context.Context, snapshotID string
return &snap, nil
}
// ListSnapshotsMetadata извлекает краткую информацию обо всех снапшотах.
func (s *sqliteStore) ListSnapshotsMetadata(ctx context.Context) ([]store.SnapshotInfo, error) {
// Simplified implementation to debug the issue
fmt.Println("ListSnapshotsMetadata called")
// ListSnapshotsMetadata retrieves basic information about snapshots with filtering and pagination.
func (s *sqliteStore) ListSnapshotsMetadata(ctx context.Context, opts store.ListOptions) ([]store.SnapshotInfo, error) {
// Build the query with optional filtering
var query string
var args []interface{}
// Get all snapshot IDs first
query := `SELECT id FROM snapshots ORDER BY creation_time DESC;`
fmt.Println("Executing query:", query)
if opts.FilterByName != "" {
query = `SELECT id, name, parent_id, creation_time FROM snapshots WHERE name LIKE ? ORDER BY creation_time DESC`
args = append(args, "%"+opts.FilterByName+"%")
} else {
query = `SELECT id, name, parent_id, creation_time FROM snapshots ORDER BY creation_time DESC`
}
rows, err := s.db.QueryContext(ctx, query)
// Add pagination if specified
if opts.Limit > 0 {
query += " LIMIT ?"
args = append(args, opts.Limit)
if opts.Offset > 0 {
query += " OFFSET ?"
args = append(args, opts.Offset)
}
}
// Execute the query
rows, err := s.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, fmt.Errorf("failed to query snapshot IDs: %w", err)
return nil, fmt.Errorf("failed to query snapshots: %w", err)
}
defer rows.Close()
var snapshots []store.SnapshotInfo
// For each ID, get the full snapshot details
// Iterate through the results
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, fmt.Errorf("failed to scan snapshot ID: %w", err)
var info store.SnapshotInfo
var parentID sql.NullString
var creationTimeStr string
if err := rows.Scan(&info.ID, &info.Name, &parentID, &creationTimeStr); err != nil {
return nil, fmt.Errorf("failed to scan snapshot row: %w", err)
}
// Get the full snapshot details
snapshot, err := s.GetSnapshotMetadata(ctx, id)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details for ID %s: %w", id, err)
// Set parent ID if not NULL
if parentID.Valid {
info.ParentID = parentID.String
}
// Convert to SnapshotInfo
info := store.SnapshotInfo{
ID: snapshot.ID,
Name: snapshot.Name,
ParentID: snapshot.ParentID,
CreationTime: snapshot.CreationTime,
// Parse creation time
const sqliteLayout = "2006-01-02 15:04:05" // Standard SQLite DATETIME format without timezone
t, parseErr := time.Parse(sqliteLayout, creationTimeStr)
if parseErr != nil {
// Try format with milliseconds if the first one didn't work
const sqliteLayoutWithMs = "2006-01-02 15:04:05.999999999"
t, parseErr = time.Parse(sqliteLayoutWithMs, creationTimeStr)
if parseErr != nil {
// Try RFC3339 if saved as UTC().Format(time.RFC3339)
t, parseErr = time.Parse(time.RFC3339, creationTimeStr)
if parseErr != nil {
return nil, fmt.Errorf("failed to parse creation time '%s' for snapshot %s: %w", creationTimeStr, info.ID, parseErr)
}
}
}
info.CreationTime = t.UTC() // Store as UTC
snapshots = append(snapshots, info)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("error iterating snapshot IDs: %w", err)
return nil, fmt.Errorf("error iterating snapshot rows: %w", err)
}
// If no snapshots found, return an empty slice
if len(snapshots) == 0 {
fmt.Println("No snapshots found")
return []store.SnapshotInfo{}, nil
}
fmt.Printf("Found %d snapshots\n", len(snapshots))
return snapshots, nil
}
@ -240,3 +265,13 @@ func (s *sqliteStore) DeleteSnapshotMetadata(ctx context.Context, snapshotID str
return nil // Не возвращаем ошибку, если запись не найдена
}
// UpdateSnapshotParentID обновляет ParentID для указанного снапшота.
func (s *sqliteStore) UpdateSnapshotParentID(ctx context.Context, snapshotID, newParentID string) error {
query := `UPDATE snapshots SET parent_id = ? WHERE id = ?;`
_, err := s.db.ExecContext(ctx, query, newParentID, snapshotID)
if err != nil {
return fmt.Errorf("failed to update parent ID for snapshot %s: %w", snapshotID, err)
}
return nil
}

View File

@ -121,7 +121,7 @@ func TestListSnapshotsMetadata(t *testing.T) {
// Create test snapshots
ctx := context.Background()
now := time.Now().UTC().Truncate(time.Second)
testSnapshots := []store.Snapshot{
{
ID: "snapshot-1",
@ -154,8 +154,8 @@ func TestListSnapshotsMetadata(t *testing.T) {
}
}
// List the snapshots
snapshots, err := s.ListSnapshotsMetadata(ctx)
// List the snapshots with empty options
snapshots, err := s.ListSnapshotsMetadata(ctx, store.ListOptions{})
if err != nil {
t.Fatalf("Failed to list snapshots: %v", err)
}
@ -189,6 +189,164 @@ func TestListSnapshotsMetadata(t *testing.T) {
}
}
func TestListSnapshotsMetadata_WithOptions(t *testing.T) {
// Create a temporary directory for tests
tempDir, err := os.MkdirTemp("", "agate-test-*")
if err != nil {
t.Fatalf("Failed to create temp directory: %v", err)
}
defer os.RemoveAll(tempDir) // Clean up after test
// Create a new store
dbPath := filepath.Join(tempDir, "test.db")
s, err := NewSQLiteStore(dbPath)
if err != nil {
t.Fatalf("Failed to create SQLite store: %v", err)
}
defer s.Close()
// Create test snapshots with different names
ctx := context.Background()
now := time.Now().UTC().Truncate(time.Second)
testSnapshots := []store.Snapshot{
{
ID: "alpha-1",
Name: "alpha-1",
ParentID: "",
CreationTime: now.Add(-3 * time.Hour),
Files: []store.FileInfo{},
},
{
ID: "alpha-2",
Name: "alpha-2",
ParentID: "alpha-1",
CreationTime: now.Add(-2 * time.Hour),
Files: []store.FileInfo{},
},
{
ID: "beta-1",
Name: "beta-1",
ParentID: "",
CreationTime: now.Add(-1 * time.Hour),
Files: []store.FileInfo{},
},
}
// Save the snapshots
for _, snap := range testSnapshots {
err = s.SaveSnapshotMetadata(ctx, snap)
if err != nil {
t.Fatalf("Failed to save snapshot metadata: %v", err)
}
}
// Test different ListOptions scenarios
t.Run("FilterByName", func(t *testing.T) {
// Filter snapshots by name "alpha"
opts := store.ListOptions{
FilterByName: "alpha",
}
snapshots, err := s.ListSnapshotsMetadata(ctx, opts)
if err != nil {
t.Fatalf("Failed to list snapshots with filter: %v", err)
}
// Should return 2 snapshots (alpha-1 and alpha-2)
if len(snapshots) != 2 {
t.Errorf("Wrong number of snapshots returned: got %d, want %d", len(snapshots), 2)
}
// Check that only alpha snapshots are returned
for _, snap := range snapshots {
if snap.ID != "alpha-1" && snap.ID != "alpha-2" {
t.Errorf("Unexpected snapshot ID in filtered results: %s", snap.ID)
}
}
})
t.Run("Limit", func(t *testing.T) {
// Limit to 1 snapshot (should return the newest one)
opts := store.ListOptions{
Limit: 1,
}
snapshots, err := s.ListSnapshotsMetadata(ctx, opts)
if err != nil {
t.Fatalf("Failed to list snapshots with limit: %v", err)
}
// Should return 1 snapshot
if len(snapshots) != 1 {
t.Errorf("Wrong number of snapshots returned: got %d, want %d", len(snapshots), 1)
}
// The newest snapshot should be beta-1
if snapshots[0].ID != "beta-1" {
t.Errorf("Wrong snapshot returned with limit: got %s, want %s", snapshots[0].ID, "beta-1")
}
})
t.Run("Offset", func(t *testing.T) {
// Limit to 1 snapshot with offset 1 (should return the second newest)
opts := store.ListOptions{
Limit: 1,
Offset: 1,
}
snapshots, err := s.ListSnapshotsMetadata(ctx, opts)
if err != nil {
t.Fatalf("Failed to list snapshots with offset: %v", err)
}
// Should return 1 snapshot
if len(snapshots) != 1 {
t.Errorf("Wrong number of snapshots returned: got %d, want %d", len(snapshots), 1)
}
// The second newest snapshot should be alpha-2
if snapshots[0].ID != "alpha-2" {
t.Errorf("Wrong snapshot returned with offset: got %s, want %s", snapshots[0].ID, "alpha-2")
}
})
t.Run("FilterAndPagination", func(t *testing.T) {
// Filter by "alpha" with limit 1
opts := store.ListOptions{
FilterByName: "alpha",
Limit: 1,
}
snapshots, err := s.ListSnapshotsMetadata(ctx, opts)
if err != nil {
t.Fatalf("Failed to list snapshots with filter and pagination: %v", err)
}
// Should return 1 snapshot
if len(snapshots) != 1 {
t.Errorf("Wrong number of snapshots returned: got %d, want %d", len(snapshots), 1)
}
// The newest alpha snapshot should be alpha-2
if snapshots[0].ID != "alpha-2" {
t.Errorf("Wrong snapshot returned with filter and limit: got %s, want %s", snapshots[0].ID, "alpha-2")
}
})
t.Run("NoResults", func(t *testing.T) {
// Filter by a name that doesn't exist
opts := store.ListOptions{
FilterByName: "gamma",
}
snapshots, err := s.ListSnapshotsMetadata(ctx, opts)
if err != nil {
t.Fatalf("Failed to list snapshots with non-matching filter: %v", err)
}
// Should return 0 snapshots
if len(snapshots) != 0 {
t.Errorf("Expected 0 snapshots, got %d", len(snapshots))
}
})
}
func TestDeleteSnapshotMetadata(t *testing.T) {
// Create a temporary directory for tests
tempDir, err := os.MkdirTemp("", "agate-test-*")
@ -238,4 +396,4 @@ func TestDeleteSnapshotMetadata(t *testing.T) {
if err != nil {
t.Fatalf("DeleteSnapshotMetadata returned an error for non-existent snapshot: %v", err)
}
}
}

View File

@ -31,6 +31,13 @@ type SnapshotInfo struct {
CreationTime time.Time // Время создания
}
// ListOptions provides options for filtering and paginating snapshot lists
type ListOptions struct {
FilterByName string // Filter snapshots by name (substring match)
Limit int // Maximum number of snapshots to return
Offset int // Number of snapshots to skip
}
// MetadataStore определяет интерфейс для хранения и извлечения метаданных снапшотов.
type MetadataStore interface {
// SaveSnapshotMetadata сохраняет полные метаданные снапшота, включая список файлов.
@ -41,13 +48,16 @@ type MetadataStore interface {
// Возвращает agate.ErrNotFound, если снапшот не найден.
GetSnapshotMetadata(ctx context.Context, snapshotID string) (*Snapshot, error)
// ListSnapshotsMetadata извлекает краткую информацию обо всех снапшотах.
ListSnapshotsMetadata(ctx context.Context) ([]SnapshotInfo, error)
// ListSnapshotsMetadata извлекает краткую информацию о снапшотах с фильтрацией и пагинацией.
ListSnapshotsMetadata(ctx context.Context, opts ListOptions) ([]SnapshotInfo, error)
// DeleteSnapshotMetadata удаляет метаданные снапшота по его ID.
// Не должен возвращать ошибку, если снапшот не найден.
DeleteSnapshotMetadata(ctx context.Context, snapshotID string) error
// UpdateSnapshotParentID обновляет ParentID для указанного снапшота.
UpdateSnapshotParentID(ctx context.Context, snapshotID, newParentID string) error
// Close закрывает соединение с хранилищем метаданных.
Close() error
}