264 lines
9.0 KiB
Go
264 lines
9.0 KiB
Go
package agate
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"testing"
|
|
"time"
|
|
|
|
"gitea.unprism.ru/KRBL/Agate/remote"
|
|
)
|
|
|
|
// TestGRPCServerClient tests the interaction between a gRPC server and client.
|
|
// It creates multiple snapshots with different content on the server,
|
|
// connects a client to the server, downloads the latest snapshot,
|
|
// and verifies the contents of the files.
|
|
func TestGRPCServerClient(t *testing.T) {
|
|
// Skip this test in short mode
|
|
if testing.Short() {
|
|
t.Skip("Skipping gRPC server-client test in short mode")
|
|
}
|
|
|
|
// --- Setup Server ---
|
|
serverDir, err := os.MkdirTemp("", "agate-server-*")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create server temp directory: %v", err)
|
|
}
|
|
defer os.RemoveAll(serverDir)
|
|
|
|
serverAgate, err := New(AgateOptions{WorkDir: serverDir})
|
|
if err != nil {
|
|
t.Fatalf("Failed to create server Agate instance: %v", err)
|
|
}
|
|
defer serverAgate.Close()
|
|
|
|
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
|
if err := os.MkdirAll(dataDir, 0755); err != nil {
|
|
t.Fatalf("Failed to create data directory: %v", err)
|
|
}
|
|
|
|
// Create initial test files for the first snapshot
|
|
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create first snapshot: %v", err)
|
|
}
|
|
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
|
|
|
|
// Modify content for the second snapshot
|
|
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
|
|
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create second snapshot: %v", err)
|
|
}
|
|
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
|
|
|
|
// Start the gRPC server
|
|
serverAddress := "localhost:50051"
|
|
server := remote.NewServer(serverAgate.manager)
|
|
go func() {
|
|
if err := server.Start(ctx, serverAddress); err != nil {
|
|
log.Printf("Server start error: %v", err)
|
|
}
|
|
}()
|
|
defer server.Stop()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// --- Setup Client ---
|
|
clientDir, err := os.MkdirTemp("", "agate-client-*")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client temp directory: %v", err)
|
|
}
|
|
defer os.RemoveAll(clientDir)
|
|
|
|
clientAgate, err := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
|
|
if err != nil {
|
|
t.Fatalf("Failed to create client Agate instance: %v", err)
|
|
}
|
|
defer clientAgate.Close()
|
|
|
|
// --- Test Scenario ---
|
|
// 1. Client downloads the first snapshot completely
|
|
t.Log("Client downloading Snapshot 1...")
|
|
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
|
|
t.Fatalf("Client failed to get snapshot 1: %v", err)
|
|
}
|
|
|
|
// Verify content of snapshot 1
|
|
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
|
|
t.Fatalf("Failed to restore snapshot 1: %v", err)
|
|
}
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
|
|
|
// 2. Client downloads the second snapshot incrementally
|
|
t.Log("Client downloading Snapshot 2 (incrementally)...")
|
|
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
|
|
t.Fatalf("Client failed to get snapshot 2: %v", err)
|
|
}
|
|
|
|
// Verify content of snapshot 2
|
|
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
|
|
t.Fatalf("Failed to restore snapshot 2: %v", err)
|
|
}
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
|
|
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
|
|
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
|
|
}
|
|
|
|
// verifyFileContent reads dir/filename and compares it with expectedContent.
// A read failure aborts the calling test; a content mismatch is reported as a
// non-fatal error so that later checks still run.
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
	t.Helper()

	raw, readErr := os.ReadFile(filepath.Join(dir, filename))
	if readErr != nil {
		t.Fatalf("Failed to read file %s: %v", filename, readErr)
	}

	got := string(raw)
	if got == expectedContent {
		return
	}
	t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, got, expectedContent)
}
|
|
|
|
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
|
|
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
|
|
// --- Setup Server ---
|
|
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
|
defer os.RemoveAll(serverDir)
|
|
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
|
defer serverAgate.Close()
|
|
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
|
os.MkdirAll(dataDir, 0755)
|
|
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
|
|
if err != nil {
|
|
t.Fatalf("Failed to create snapshot: %v", err)
|
|
}
|
|
|
|
// Start Server
|
|
serverAddress := "localhost:50052"
|
|
server := remote.NewServer(serverAgate.manager)
|
|
go func() { server.Start(ctx, serverAddress) }()
|
|
defer server.Stop()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// --- Setup Client ---
|
|
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
|
defer os.RemoveAll(clientDir)
|
|
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
|
|
defer clientAgate.Close()
|
|
|
|
// --- Test Scenario ---
|
|
t.Log("Client performing full download...")
|
|
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
|
|
t.Fatalf("Client failed to get snapshot: %v", err)
|
|
}
|
|
|
|
// Verify content
|
|
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
|
|
t.Fatalf("Failed to restore snapshot: %v", err)
|
|
}
|
|
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
|
|
}
|
|
|
|
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
|
|
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
|
|
// --- Setup Server ---
|
|
serverDir, _ := os.MkdirTemp("", "agate-server-*")
|
|
defer os.RemoveAll(serverDir)
|
|
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
|
|
defer serverAgate.Close()
|
|
dataDir := serverAgate.options.BlobStore.GetActiveDir()
|
|
os.MkdirAll(dataDir, 0755)
|
|
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Snap 1
|
|
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
|
|
// Snap 2 (with changes)
|
|
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
|
|
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
|
|
|
|
// Start Server
|
|
serverAddress := "localhost:50053"
|
|
server := remote.NewServer(serverAgate.manager)
|
|
go func() { server.Start(ctx, serverAddress) }()
|
|
defer server.Stop()
|
|
time.Sleep(100 * time.Millisecond)
|
|
|
|
// --- Setup Client ---
|
|
clientDir, _ := os.MkdirTemp("", "agate-client-*")
|
|
defer os.RemoveAll(clientDir)
|
|
rClient, err := remote.NewClient(serverAddress)
|
|
if err != nil {
|
|
t.Fatalf("Failed to create remote client: %v", err)
|
|
}
|
|
defer rClient.Close()
|
|
|
|
// --- Test Scenario ---
|
|
// 1. Manually download first part of the diff archive
|
|
diffPath := filepath.Join(clientDir, "diff.zip.part")
|
|
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
|
if err != nil {
|
|
t.Fatalf("Failed to get diff stream from manager: %v", err)
|
|
}
|
|
defer diffReader.Close()
|
|
|
|
// Read first 64KB
|
|
firstChunk := make([]byte, 64*1024)
|
|
n, err := io.ReadFull(diffReader, firstChunk)
|
|
if err != nil && err != io.ErrUnexpectedEOF {
|
|
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
|
|
}
|
|
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
|
|
t.Fatalf("Failed to write partial file: %v", err)
|
|
}
|
|
diffReader.Close() // Simulate connection drop
|
|
|
|
// 2. Resume download using the client
|
|
t.Log("Resuming download...")
|
|
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
|
|
t.Fatalf("Failed to resume download: %v", err)
|
|
}
|
|
|
|
// 3. Verify final file
|
|
// Get the full diff from server for comparison
|
|
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
|
|
defer fullDiffReader.Close()
|
|
fullDiffData, _ := io.ReadAll(fullDiffReader)
|
|
|
|
resumedData, _ := os.ReadFile(diffPath)
|
|
|
|
if len(resumedData) != len(fullDiffData) {
|
|
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
|
|
}
|
|
|
|
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
|
|
t.Error("File content mismatch after resumption")
|
|
}
|
|
}
|