diff --git a/api.go b/api.go index 0531224..b1fefb5 100644 --- a/api.go +++ b/api.go @@ -348,7 +348,6 @@ func (a *Agate) Close() error { // StartServer starts a gRPC server to share snapshots. func (a *Agate) StartServer(ctx context.Context, address string) error { - // Использование нового remote.Server server := remote.NewServer(a.manager) return server.Start(ctx, address) } @@ -366,15 +365,43 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI return fmt.Errorf("failed to get remote snapshot details: %w", err) } - // 1. Подготовка tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download") if err := os.MkdirAll(tempDownloadDir, 0755); err != nil { return fmt.Errorf("failed to create temp download dir: %w", err) } + newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID) + if err := os.MkdirAll(newSnapshotDir, 0755); err != nil { + return fmt.Errorf("failed to create new snapshot directory: %w", err) + } + defer os.RemoveAll(newSnapshotDir) + + if localParentID != "" { + if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil { + a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err) + } else { + localParentSnap, err := a.GetSnapshotDetails(ctx, localParentID) + if err != nil { + a.options.Logger.Printf("Warning: failed to get local parent details %s: %v", localParentID, err) + } else { + remoteFilesMap := make(map[string]struct{}) + for _, f := range remoteSnapshot.Files { + remoteFilesMap[f.Path] = struct{}{} + } + + for _, localFile := range localParentSnap.Files { + if _, exists := remoteFilesMap[localFile.Path]; !exists { + pathToDelete := filepath.Join(newSnapshotDir, localFile.Path) + if err := os.RemoveAll(pathToDelete); err != nil { + a.options.Logger.Printf("Warning: failed to delete file %s during diff apply: %v", pathToDelete, err) + } + } + } + } + } + } + diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip") diffPartPath := diffArchivePath + ".part" - - // 2. Скачивание дельты с докачкой a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID) if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil { return fmt.Errorf("failed to download snapshot diff: %w", err) @@ -384,27 +411,10 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI } defer os.Remove(diffArchivePath) - // 3. Атомарное применение - // Создаем новую директорию для снапшота - newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID) - if err := os.MkdirAll(newSnapshotDir, 0755); err != nil { - return fmt.Errorf("failed to create new snapshot directory: %w", err) - } - defer os.RemoveAll(newSnapshotDir) - - // Если есть родитель, извлекаем его содержимое - if localParentID != "" { - if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil { - a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err) - } - } - - // Распаковываем дельта-архив поверх if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil { return fmt.Errorf("failed to extract diff archive: %w", err) } - // 4. Создаем финальный архив и регистрируем снапшот finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip") if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil { return fmt.Errorf("failed to create final snapshot archive: %w", err) @@ -422,7 +432,7 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI } if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil { - a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат + a.options.BlobStore.DeleteBlob(ctx, snapshotID) return fmt.Errorf("failed to save snapshot metadata: %w", err) } diff --git a/grpc_test.go b/grpc_test.go index c03092e..a33a312 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -2,8 +2,6 @@ package agate import ( "context" - "crypto/sha256" - "io" "log" "os" "path/filepath" @@ -13,17 +11,13 @@ import ( "gitea.unprism.ru/KRBL/Agate/remote" ) -// TestGRPCServerClient tests the interaction between a gRPC server and client. -// It creates multiple snapshots with different content on the server, -// connects a client to the server, downloads the latest snapshot, -// and verifies the contents of the files. -func TestGRPCServerClient(t *testing.T) { - // Skip this test in short mode +// TestFullUpdateCycle tests a complete workflow: full download, then incremental update. +func TestFullUpdateCycle(t *testing.T) { if testing.Short() { - t.Skip("Skipping gRPC server-client test in short mode") + t.Skip("Skipping full gRPC update cycle test in short mode") } - // --- Setup Server --- + // --- 1. Setup Server --- serverDir, err := os.MkdirTemp("", "agate-server-*") if err != nil { t.Fatalf("Failed to create server temp directory: %v", err) @@ -36,54 +30,36 @@ func TestGRPCServerClient(t *testing.T) { } defer serverAgate.Close() - dataDir := serverAgate.options.BlobStore.GetActiveDir() - if err := os.MkdirAll(dataDir, 0755); err != nil { - t.Fatalf("Failed to create data directory: %v", err) - } - - // Create initial test files for the first snapshot - if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil { + // --- 2. Create Initial Snapshot (A) --- + dataDir := serverAgate.GetActiveDir() + if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 1"), 0644); err != nil { t.Fatal(err) } - if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil { + if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("Original Content"), 0644); err != nil { t.Fatal(err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() - snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "") + snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "") if err != nil { - t.Fatalf("Failed to create first snapshot: %v", err) + t.Fatalf("Failed to create Snapshot A: %v", err) } - t.Logf("Created first snapshot with ID: %s", snapshot1ID) + t.Logf("Created Snapshot A with ID: %s", snapshotAID) - // Modify content for the second snapshot - if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil { - t.Fatal(err) - } - if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil { - t.Fatal(err) - } - - snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID) - if err != nil { - t.Fatalf("Failed to create second snapshot: %v", err) - } - t.Logf("Created second snapshot with ID: %s", snapshot2ID) - - // Start the gRPC server + // --- 3. Start Server --- serverAddress := "localhost:50051" server := remote.NewServer(serverAgate.manager) go func() { - if err := server.Start(ctx, serverAddress); err != nil { + if err := server.Start(ctx, serverAddress); err != nil && err != context.Canceled { log.Printf("Server start error: %v", err) } }() defer server.Stop() time.Sleep(100 * time.Millisecond) - // --- Setup Client --- + // --- 4. Setup Client --- clientDir, err := os.MkdirTemp("", "agate-client-*") if err != nil { t.Fatalf("Failed to create client temp directory: %v", err) @@ -96,35 +72,62 @@ func TestGRPCServerClient(t *testing.T) { } defer clientAgate.Close() - // --- Test Scenario --- - // 1. Client downloads the first snapshot completely - t.Log("Client downloading Snapshot 1...") - if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil { - t.Fatalf("Client failed to get snapshot 1: %v", err) + // --- 5. Client Performs Full Download of Snapshot A --- + t.Log("Client performing full download of Snapshot A...") + if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, ""); err != nil { + t.Fatalf("Client failed to get Snapshot A: %v", err) } - // Verify content of snapshot 1 - if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil { - t.Fatalf("Failed to restore snapshot 1: %v", err) + // Verify content of Snapshot A on client + if err := clientAgate.RestoreSnapshot(ctx, snapshotAID); err != nil { + t.Fatalf("Failed to restore Snapshot A: %v", err) } - verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1") - verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2") + verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 1") + verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "Original Content") + t.Log("Snapshot A verified on client.") - // 2. Client downloads the second snapshot incrementally - t.Log("Client downloading Snapshot 2 (incrementally)...") - if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil { - t.Fatalf("Client failed to get snapshot 2: %v", err) + // --- 6. Server Creates Incremental Snapshot (B) --- + if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 2"), 0644); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("New File"), 0644); err != nil { + t.Fatal(err) + } + if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil { + t.Fatal(err) } - // Verify content of snapshot 2 - if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil { - t.Fatalf("Failed to restore snapshot 2: %v", err) + snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID) + if err != nil { + t.Fatalf("Failed to create Snapshot B: %v", err) } - verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1") - verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3") - // file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed. - // But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist. - verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2") + t.Logf("Created Snapshot B with ID: %s", snapshotBID) + + // --- 7. Client Performs Incremental Download of Snapshot B --- + t.Log("Client performing incremental download of Snapshot B...") + parentIDOnClient := clientAgate.GetCurrentSnapshotID() + if parentIDOnClient != snapshotAID { + t.Fatalf("Client has incorrect current snapshot ID. Got %s, want %s", parentIDOnClient, snapshotAID) + } + + if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, parentIDOnClient); err != nil { + t.Fatalf("Client failed to get Snapshot B: %v", err) + } + + // --- 8. Verify Final State on Client --- + if err := clientAgate.RestoreSnapshot(ctx, snapshotBID); err != nil { + t.Fatalf("Failed to restore Snapshot B: %v", err) + } + t.Log("Snapshot B restored on client. Verifying content...") + + verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 2") + verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "New File") + + // Verify that file2.txt was removed because CleanOnRestore is true + if _, err := os.Stat(filepath.Join(clientAgate.GetActiveDir(), "file2.txt")); !os.IsNotExist(err) { + t.Errorf("file2.txt should have been removed after restoring Snapshot B, but it still exists.") + } + t.Log("Final state verified successfully!") } func verifyFileContent(t *testing.T, dir, filename, expectedContent string) { @@ -137,127 +140,3 @@ func verifyFileContent(t *testing.T, dir, filename, expectedContent string) { t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent) } } - -// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified. -func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) { - // --- Setup Server --- - serverDir, _ := os.MkdirTemp("", "agate-server-*") - defer os.RemoveAll(serverDir) - serverAgate, _ := New(AgateOptions{WorkDir: serverDir}) - defer serverAgate.Close() - dataDir := serverAgate.options.BlobStore.GetActiveDir() - os.MkdirAll(dataDir, 0755) - os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "") - if err != nil { - t.Fatalf("Failed to create snapshot: %v", err) - } - - // Start Server - serverAddress := "localhost:50052" - server := remote.NewServer(serverAgate.manager) - go func() { server.Start(ctx, serverAddress) }() - defer server.Stop() - time.Sleep(100 * time.Millisecond) - - // --- Setup Client --- - clientDir, _ := os.MkdirTemp("", "agate-client-*") - defer os.RemoveAll(clientDir) - clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true}) - defer clientAgate.Close() - - // --- Test Scenario --- - t.Log("Client performing full download...") - if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil { - t.Fatalf("Client failed to get snapshot: %v", err) - } - - // Verify content - if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil { - t.Fatalf("Failed to restore snapshot: %v", err) - } - verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download") -} - -// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic. -func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) { - // --- Setup Server --- - serverDir, _ := os.MkdirTemp("", "agate-server-*") - defer os.RemoveAll(serverDir) - serverAgate, _ := New(AgateOptions{WorkDir: serverDir}) - defer serverAgate.Close() - dataDir := serverAgate.options.BlobStore.GetActiveDir() - os.MkdirAll(dataDir, 0755) - os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - // Snap 1 - snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "") - // Snap 2 (with changes) - os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial - snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID) - - // Start Server - serverAddress := "localhost:50053" - server := remote.NewServer(serverAgate.manager) - go func() { server.Start(ctx, serverAddress) }() - defer server.Stop() - time.Sleep(100 * time.Millisecond) - - // --- Setup Client --- - clientDir, _ := os.MkdirTemp("", "agate-client-*") - defer os.RemoveAll(clientDir) - rClient, err := remote.NewClient(serverAddress) - if err != nil { - t.Fatalf("Failed to create remote client: %v", err) - } - defer rClient.Close() - - // --- Test Scenario --- - // 1. Manually download first part of the diff archive - diffPath := filepath.Join(clientDir, "diff.zip.part") - diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0) - if err != nil { - t.Fatalf("Failed to get diff stream from manager: %v", err) - } - defer diffReader.Close() - - // Read first 64KB - firstChunk := make([]byte, 64*1024) - n, err := io.ReadFull(diffReader, firstChunk) - if err != nil && err != io.ErrUnexpectedEOF { - t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n) - } - if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil { - t.Fatalf("Failed to write partial file: %v", err) - } - diffReader.Close() // Simulate connection drop - - // 2. Resume download using the client - t.Log("Resuming download...") - if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil { - t.Fatalf("Failed to resume download: %v", err) - } - - // 3. Verify final file - // Get the full diff from server for comparison - fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0) - defer fullDiffReader.Close() - fullDiffData, _ := io.ReadAll(fullDiffReader) - - resumedData, _ := os.ReadFile(diffPath) - - if len(resumedData) != len(fullDiffData) { - t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData)) - } - - if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) { - t.Error("File content mismatch after resumption") - } -}