Refactor snapshot handling: improve directory creation, parent snapshot cleanup, and diff application logic. Update integration test to cover full and incremental update cycles.
This commit is contained in:
54
api.go
54
api.go
@ -348,7 +348,6 @@ func (a *Agate) Close() error {
|
||||
|
||||
// StartServer starts a gRPC server to share snapshots.
|
||||
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
||||
// Использование нового remote.Server
|
||||
server := remote.NewServer(a.manager)
|
||||
return server.Start(ctx, address)
|
||||
}
|
||||
@ -366,15 +365,43 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
||||
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// 1. Подготовка
|
||||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
|
||||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temp download dir: %w", err)
|
||||
}
|
||||
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
||||
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(newSnapshotDir)
|
||||
|
||||
if localParentID != "" {
|
||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
||||
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
||||
} else {
|
||||
localParentSnap, err := a.GetSnapshotDetails(ctx, localParentID)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Warning: failed to get local parent details %s: %v", localParentID, err)
|
||||
} else {
|
||||
remoteFilesMap := make(map[string]struct{})
|
||||
for _, f := range remoteSnapshot.Files {
|
||||
remoteFilesMap[f.Path] = struct{}{}
|
||||
}
|
||||
|
||||
for _, localFile := range localParentSnap.Files {
|
||||
if _, exists := remoteFilesMap[localFile.Path]; !exists {
|
||||
pathToDelete := filepath.Join(newSnapshotDir, localFile.Path)
|
||||
if err := os.RemoveAll(pathToDelete); err != nil {
|
||||
a.options.Logger.Printf("Warning: failed to delete file %s during diff apply: %v", pathToDelete, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
|
||||
diffPartPath := diffArchivePath + ".part"
|
||||
|
||||
// 2. Скачивание дельты с докачкой
|
||||
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
|
||||
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
|
||||
return fmt.Errorf("failed to download snapshot diff: %w", err)
|
||||
@ -384,27 +411,10 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
||||
}
|
||||
defer os.Remove(diffArchivePath)
|
||||
|
||||
// 3. Атомарное применение
|
||||
// Создаем новую директорию для снапшота
|
||||
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
||||
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(newSnapshotDir)
|
||||
|
||||
// Если есть родитель, извлекаем его содержимое
|
||||
if localParentID != "" {
|
||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
||||
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Распаковываем дельта-архив поверх
|
||||
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
|
||||
return fmt.Errorf("failed to extract diff archive: %w", err)
|
||||
}
|
||||
|
||||
// 4. Создаем финальный архив и регистрируем снапшот
|
||||
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
|
||||
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
|
||||
return fmt.Errorf("failed to create final snapshot archive: %w", err)
|
||||
@ -422,7 +432,7 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
||||
}
|
||||
|
||||
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
|
||||
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
|
||||
a.options.BlobStore.DeleteBlob(ctx, snapshotID)
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
|
247
grpc_test.go
247
grpc_test.go
@ -2,8 +2,6 @@ package agate
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/sha256"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -13,17 +11,13 @@ import (
|
||||
"gitea.unprism.ru/KRBL/Agate/remote"
|
||||
)
|
||||
|
||||
// TestGRPCServerClient tests the interaction between a gRPC server and client.
|
||||
// It creates multiple snapshots with different content on the server,
|
||||
// connects a client to the server, downloads the latest snapshot,
|
||||
// and verifies the contents of the files.
|
||||
func TestGRPCServerClient(t *testing.T) {
|
||||
// Skip this test in short mode
|
||||
// TestFullUpdateCycle tests a complete workflow: full download, then incremental update.
|
||||
func TestFullUpdateCycle(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("Skipping gRPC server-client test in short mode")
|
||||
t.Skip("Skipping full gRPC update cycle test in short mode")
|
||||
}
|
||||
|
||||
// --- Setup Server ---
|
||||
// --- 1. Setup Server ---
|
||||
serverDir, err := os.MkdirTemp("", "agate-server-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create server temp directory: %v", err)
|
||||
@ -36,54 +30,36 @@ func TestGRPCServerClient(t *testing.T) {
|
||||
}
|
||||
defer serverAgate.Close()
|
||||
|
||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
||||
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
||||
t.Fatalf("Failed to create data directory: %v", err)
|
||||
}
|
||||
|
||||
// Create initial test files for the first snapshot
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
|
||||
// --- 2. Create Initial Snapshot (A) ---
|
||||
dataDir := serverAgate.GetActiveDir()
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 1"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("Original Content"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
|
||||
snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create first snapshot: %v", err)
|
||||
t.Fatalf("Failed to create Snapshot A: %v", err)
|
||||
}
|
||||
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
|
||||
t.Logf("Created Snapshot A with ID: %s", snapshotAID)
|
||||
|
||||
// Modify content for the second snapshot
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create second snapshot: %v", err)
|
||||
}
|
||||
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
|
||||
|
||||
// Start the gRPC server
|
||||
// --- 3. Start Server ---
|
||||
serverAddress := "localhost:50051"
|
||||
server := remote.NewServer(serverAgate.manager)
|
||||
go func() {
|
||||
if err := server.Start(ctx, serverAddress); err != nil {
|
||||
if err := server.Start(ctx, serverAddress); err != nil && err != context.Canceled {
|
||||
log.Printf("Server start error: %v", err)
|
||||
}
|
||||
}()
|
||||
defer server.Stop()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// --- Setup Client ---
|
||||
// --- 4. Setup Client ---
|
||||
clientDir, err := os.MkdirTemp("", "agate-client-*")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create client temp directory: %v", err)
|
||||
@ -96,35 +72,62 @@ func TestGRPCServerClient(t *testing.T) {
|
||||
}
|
||||
defer clientAgate.Close()
|
||||
|
||||
// --- Test Scenario ---
|
||||
// 1. Client downloads the first snapshot completely
|
||||
t.Log("Client downloading Snapshot 1...")
|
||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
|
||||
t.Fatalf("Client failed to get snapshot 1: %v", err)
|
||||
// --- 5. Client Performs Full Download of Snapshot A ---
|
||||
t.Log("Client performing full download of Snapshot A...")
|
||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, ""); err != nil {
|
||||
t.Fatalf("Client failed to get Snapshot A: %v", err)
|
||||
}
|
||||
|
||||
// Verify content of snapshot 1
|
||||
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
|
||||
t.Fatalf("Failed to restore snapshot 1: %v", err)
|
||||
// Verify content of Snapshot A on client
|
||||
if err := clientAgate.RestoreSnapshot(ctx, snapshotAID); err != nil {
|
||||
t.Fatalf("Failed to restore Snapshot A: %v", err)
|
||||
}
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 1")
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "Original Content")
|
||||
t.Log("Snapshot A verified on client.")
|
||||
|
||||
// 2. Client downloads the second snapshot incrementally
|
||||
t.Log("Client downloading Snapshot 2 (incrementally)...")
|
||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
|
||||
t.Fatalf("Client failed to get snapshot 2: %v", err)
|
||||
// --- 6. Server Creates Incremental Snapshot (B) ---
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 2"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("New File"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify content of snapshot 2
|
||||
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
|
||||
t.Fatalf("Failed to restore snapshot 2: %v", err)
|
||||
snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Snapshot B: %v", err)
|
||||
}
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
|
||||
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
|
||||
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
||||
t.Logf("Created Snapshot B with ID: %s", snapshotBID)
|
||||
|
||||
// --- 7. Client Performs Incremental Download of Snapshot B ---
|
||||
t.Log("Client performing incremental download of Snapshot B...")
|
||||
parentIDOnClient := clientAgate.GetCurrentSnapshotID()
|
||||
if parentIDOnClient != snapshotAID {
|
||||
t.Fatalf("Client has incorrect current snapshot ID. Got %s, want %s", parentIDOnClient, snapshotAID)
|
||||
}
|
||||
|
||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, parentIDOnClient); err != nil {
|
||||
t.Fatalf("Client failed to get Snapshot B: %v", err)
|
||||
}
|
||||
|
||||
// --- 8. Verify Final State on Client ---
|
||||
if err := clientAgate.RestoreSnapshot(ctx, snapshotBID); err != nil {
|
||||
t.Fatalf("Failed to restore Snapshot B: %v", err)
|
||||
}
|
||||
t.Log("Snapshot B restored on client. Verifying content...")
|
||||
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 2")
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "New File")
|
||||
|
||||
// Verify that file2.txt was removed because CleanOnRestore is true
|
||||
if _, err := os.Stat(filepath.Join(clientAgate.GetActiveDir(), "file2.txt")); !os.IsNotExist(err) {
|
||||
t.Errorf("file2.txt should have been removed after restoring Snapshot B, but it still exists.")
|
||||
}
|
||||
t.Log("Final state verified successfully!")
|
||||
}
|
||||
|
||||
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
|
||||
@ -137,127 +140,3 @@ func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
|
||||
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
|
||||
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
|
||||
// --- Setup Server ---
|
||||
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
||||
defer os.RemoveAll(serverDir)
|
||||
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
||||
defer serverAgate.Close()
|
||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
||||
os.MkdirAll(dataDir, 0755)
|
||||
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create snapshot: %v", err)
|
||||
}
|
||||
|
||||
// Start Server
|
||||
serverAddress := "localhost:50052"
|
||||
server := remote.NewServer(serverAgate.manager)
|
||||
go func() { server.Start(ctx, serverAddress) }()
|
||||
defer server.Stop()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// --- Setup Client ---
|
||||
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
||||
defer os.RemoveAll(clientDir)
|
||||
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
|
||||
defer clientAgate.Close()
|
||||
|
||||
// --- Test Scenario ---
|
||||
t.Log("Client performing full download...")
|
||||
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
|
||||
t.Fatalf("Client failed to get snapshot: %v", err)
|
||||
}
|
||||
|
||||
// Verify content
|
||||
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
|
||||
t.Fatalf("Failed to restore snapshot: %v", err)
|
||||
}
|
||||
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
|
||||
}
|
||||
|
||||
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
|
||||
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
|
||||
// --- Setup Server ---
|
||||
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
||||
defer os.RemoveAll(serverDir)
|
||||
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
||||
defer serverAgate.Close()
|
||||
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
||||
os.MkdirAll(dataDir, 0755)
|
||||
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
// Snap 1
|
||||
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
|
||||
// Snap 2 (with changes)
|
||||
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
|
||||
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
|
||||
|
||||
// Start Server
|
||||
serverAddress := "localhost:50053"
|
||||
server := remote.NewServer(serverAgate.manager)
|
||||
go func() { server.Start(ctx, serverAddress) }()
|
||||
defer server.Stop()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// --- Setup Client ---
|
||||
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
||||
defer os.RemoveAll(clientDir)
|
||||
rClient, err := remote.NewClient(serverAddress)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create remote client: %v", err)
|
||||
}
|
||||
defer rClient.Close()
|
||||
|
||||
// --- Test Scenario ---
|
||||
// 1. Manually download first part of the diff archive
|
||||
diffPath := filepath.Join(clientDir, "diff.zip.part")
|
||||
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get diff stream from manager: %v", err)
|
||||
}
|
||||
defer diffReader.Close()
|
||||
|
||||
// Read first 64KB
|
||||
firstChunk := make([]byte, 64*1024)
|
||||
n, err := io.ReadFull(diffReader, firstChunk)
|
||||
if err != nil && err != io.ErrUnexpectedEOF {
|
||||
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
|
||||
}
|
||||
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
|
||||
t.Fatalf("Failed to write partial file: %v", err)
|
||||
}
|
||||
diffReader.Close() // Simulate connection drop
|
||||
|
||||
// 2. Resume download using the client
|
||||
t.Log("Resuming download...")
|
||||
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
|
||||
t.Fatalf("Failed to resume download: %v", err)
|
||||
}
|
||||
|
||||
// 3. Verify final file
|
||||
// Get the full diff from server for comparison
|
||||
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
||||
defer fullDiffReader.Close()
|
||||
fullDiffData, _ := io.ReadAll(fullDiffReader)
|
||||
|
||||
resumedData, _ := os.ReadFile(diffPath)
|
||||
|
||||
if len(resumedData) != len(fullDiffData) {
|
||||
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
|
||||
}
|
||||
|
||||
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
|
||||
t.Error("File content mismatch after resumption")
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user