1 Commits

2 changed files with 95 additions and 206 deletions

54
api.go
View File

@ -348,7 +348,6 @@ func (a *Agate) Close() error {
// StartServer starts a gRPC server to share snapshots.
func (a *Agate) StartServer(ctx context.Context, address string) error {
// Использование нового remote.Server
server := remote.NewServer(a.manager)
return server.Start(ctx, address)
}
@ -366,15 +365,43 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
return fmt.Errorf("failed to get remote snapshot details: %w", err)
}
// 1. Подготовка
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
return fmt.Errorf("failed to create temp download dir: %w", err)
}
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err)
}
defer os.RemoveAll(newSnapshotDir)
if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
} else {
localParentSnap, err := a.GetSnapshotDetails(ctx, localParentID)
if err != nil {
a.options.Logger.Printf("Warning: failed to get local parent details %s: %v", localParentID, err)
} else {
remoteFilesMap := make(map[string]struct{})
for _, f := range remoteSnapshot.Files {
remoteFilesMap[f.Path] = struct{}{}
}
for _, localFile := range localParentSnap.Files {
if _, exists := remoteFilesMap[localFile.Path]; !exists {
pathToDelete := filepath.Join(newSnapshotDir, localFile.Path)
if err := os.RemoveAll(pathToDelete); err != nil {
a.options.Logger.Printf("Warning: failed to delete file %s during diff apply: %v", pathToDelete, err)
}
}
}
}
}
}
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
diffPartPath := diffArchivePath + ".part"
// 2. Скачивание дельты с докачкой
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
return fmt.Errorf("failed to download snapshot diff: %w", err)
@ -384,27 +411,10 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
}
defer os.Remove(diffArchivePath)
// 3. Атомарное применение
// Создаем новую директорию для снапшота
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err)
}
defer os.RemoveAll(newSnapshotDir)
// Если есть родитель, извлекаем его содержимое
if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
}
}
// Распаковываем дельта-архив поверх
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
return fmt.Errorf("failed to extract diff archive: %w", err)
}
// 4. Создаем финальный архив и регистрируем снапшот
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
return fmt.Errorf("failed to create final snapshot archive: %w", err)
@ -422,7 +432,7 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
}
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
a.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}

View File

@ -2,8 +2,6 @@ package agate
import (
"context"
"crypto/sha256"
"io"
"log"
"os"
"path/filepath"
@ -13,17 +11,13 @@ import (
"gitea.unprism.ru/KRBL/Agate/remote"
)
// TestGRPCServerClient tests the interaction between a gRPC server and client.
// It creates multiple snapshots with different content on the server,
// connects a client to the server, downloads the latest snapshot,
// and verifies the contents of the files.
func TestGRPCServerClient(t *testing.T) {
// Skip this test in short mode
// TestFullUpdateCycle tests a complete workflow: full download, then incremental update.
func TestFullUpdateCycle(t *testing.T) {
if testing.Short() {
t.Skip("Skipping gRPC server-client test in short mode")
t.Skip("Skipping full gRPC update cycle test in short mode")
}
// --- Setup Server ---
// --- 1. Setup Server ---
serverDir, err := os.MkdirTemp("", "agate-server-*")
if err != nil {
t.Fatalf("Failed to create server temp directory: %v", err)
@ -36,54 +30,36 @@ func TestGRPCServerClient(t *testing.T) {
}
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
if err := os.MkdirAll(dataDir, 0755); err != nil {
t.Fatalf("Failed to create data directory: %v", err)
}
// Create initial test files for the first snapshot
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
// --- 2. Create Initial Snapshot (A) ---
dataDir := serverAgate.GetActiveDir()
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("Original Content"), 0644); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "")
if err != nil {
t.Fatalf("Failed to create first snapshot: %v", err)
t.Fatalf("Failed to create Snapshot A: %v", err)
}
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
t.Logf("Created Snapshot A with ID: %s", snapshotAID)
// Modify content for the second snapshot
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
t.Fatal(err)
}
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
if err != nil {
t.Fatalf("Failed to create second snapshot: %v", err)
}
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
// Start the gRPC server
// --- 3. Start Server ---
serverAddress := "localhost:50051"
server := remote.NewServer(serverAgate.manager)
go func() {
if err := server.Start(ctx, serverAddress); err != nil {
if err := server.Start(ctx, serverAddress); err != nil && err != context.Canceled {
log.Printf("Server start error: %v", err)
}
}()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
// --- 4. Setup Client ---
clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil {
t.Fatalf("Failed to create client temp directory: %v", err)
@ -96,35 +72,62 @@ func TestGRPCServerClient(t *testing.T) {
}
defer clientAgate.Close()
// --- Test Scenario ---
// 1. Client downloads the first snapshot completely
t.Log("Client downloading Snapshot 1...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
t.Fatalf("Client failed to get snapshot 1: %v", err)
// --- 5. Client Performs Full Download of Snapshot A ---
t.Log("Client performing full download of Snapshot A...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, ""); err != nil {
t.Fatalf("Client failed to get Snapshot A: %v", err)
}
// Verify content of snapshot 1
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
t.Fatalf("Failed to restore snapshot 1: %v", err)
// Verify content of Snapshot A on client
if err := clientAgate.RestoreSnapshot(ctx, snapshotAID); err != nil {
t.Fatalf("Failed to restore Snapshot A: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "Original Content")
t.Log("Snapshot A verified on client.")
// 2. Client downloads the second snapshot incrementally
t.Log("Client downloading Snapshot 2 (incrementally)...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
t.Fatalf("Client failed to get snapshot 2: %v", err)
// --- 6. Server Creates Incremental Snapshot (B) ---
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("Version 2"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("New File"), 0644); err != nil {
t.Fatal(err)
}
if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil {
t.Fatal(err)
}
// Verify content of snapshot 2
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
t.Fatalf("Failed to restore snapshot 2: %v", err)
snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID)
if err != nil {
t.Fatalf("Failed to create Snapshot B: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
t.Logf("Created Snapshot B with ID: %s", snapshotBID)
// --- 7. Client Performs Incremental Download of Snapshot B ---
t.Log("Client performing incremental download of Snapshot B...")
parentIDOnClient := clientAgate.GetCurrentSnapshotID()
if parentIDOnClient != snapshotAID {
t.Fatalf("Client has incorrect current snapshot ID. Got %s, want %s", parentIDOnClient, snapshotAID)
}
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, parentIDOnClient); err != nil {
t.Fatalf("Client failed to get Snapshot B: %v", err)
}
// --- 8. Verify Final State on Client ---
if err := clientAgate.RestoreSnapshot(ctx, snapshotBID); err != nil {
t.Fatalf("Failed to restore Snapshot B: %v", err)
}
t.Log("Snapshot B restored on client. Verifying content...")
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "Version 2")
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "New File")
// Verify that file2.txt was removed because CleanOnRestore is true
if _, err := os.Stat(filepath.Join(clientAgate.GetActiveDir(), "file2.txt")); !os.IsNotExist(err) {
t.Errorf("file2.txt should have been removed after restoring Snapshot B, but it still exists.")
}
t.Log("Final state verified successfully!")
}
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
@ -137,127 +140,3 @@ func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
}
}
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Start Server
serverAddress := "localhost:50052"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
defer clientAgate.Close()
// --- Test Scenario ---
t.Log("Client performing full download...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
t.Fatalf("Client failed to get snapshot: %v", err)
}
// Verify content
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
t.Fatalf("Failed to restore snapshot: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
}
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Snap 1
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
// Snap 2 (with changes)
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
// Start Server
serverAddress := "localhost:50053"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
rClient, err := remote.NewClient(serverAddress)
if err != nil {
t.Fatalf("Failed to create remote client: %v", err)
}
defer rClient.Close()
// --- Test Scenario ---
// 1. Manually download first part of the diff archive
diffPath := filepath.Join(clientDir, "diff.zip.part")
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
if err != nil {
t.Fatalf("Failed to get diff stream from manager: %v", err)
}
defer diffReader.Close()
// Read first 64KB
firstChunk := make([]byte, 64*1024)
n, err := io.ReadFull(diffReader, firstChunk)
if err != nil && err != io.ErrUnexpectedEOF {
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
}
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
t.Fatalf("Failed to write partial file: %v", err)
}
diffReader.Close() // Simulate connection drop
// 2. Resume download using the client
t.Log("Resuming download...")
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
t.Fatalf("Failed to resume download: %v", err)
}
// 3. Verify final file
// Get the full diff from server for comparison
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
defer fullDiffReader.Close()
fullDiffData, _ := io.ReadAll(fullDiffReader)
resumedData, _ := os.ReadFile(diffPath)
if len(resumedData) != len(fullDiffData) {
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
}
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
t.Error("File content mismatch after resumption")
}
}