Files
Agate/grpc_test.go

264 lines
9.0 KiB
Go

package agate
import (
"context"
"crypto/sha256"
"io"
"log"
"os"
"path/filepath"
"testing"
"time"
"gitea.unprism.ru/KRBL/Agate/remote"
)
// TestGRPCServerClient tests the interaction between a gRPC server and client.
// It creates multiple snapshots with different content on the server,
// connects a client to the server, downloads the latest snapshot,
// and verifies the contents of the files.
func TestGRPCServerClient(t *testing.T) {
// Skip this test in short mode
if testing.Short() {
t.Skip("Skipping gRPC server-client test in short mode")
}
// --- Setup Server ---
serverDir, err := os.MkdirTemp("", "agate-server-*")
if err != nil {
t.Fatalf("Failed to create server temp directory: %v", err)
}
defer os.RemoveAll(serverDir)
serverAgate, err := New(AgateOptions{WorkDir: serverDir})
if err != nil {
t.Fatalf("Failed to create server Agate instance: %v", err)
}
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
if err := os.MkdirAll(dataDir, 0755); err != nil {
t.Fatalf("Failed to create data directory: %v", err)
}
// Create initial test files for the first snapshot
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
t.Fatal(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
if err != nil {
t.Fatalf("Failed to create first snapshot: %v", err)
}
t.Logf("Created first snapshot with ID: %s", snapshot1ID)
// Modify content for the second snapshot
if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
t.Fatal(err)
}
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
t.Fatal(err)
}
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
if err != nil {
t.Fatalf("Failed to create second snapshot: %v", err)
}
t.Logf("Created second snapshot with ID: %s", snapshot2ID)
// Start the gRPC server
serverAddress := "localhost:50051"
server := remote.NewServer(serverAgate.manager)
go func() {
if err := server.Start(ctx, serverAddress); err != nil {
log.Printf("Server start error: %v", err)
}
}()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil {
t.Fatalf("Failed to create client temp directory: %v", err)
}
defer os.RemoveAll(clientDir)
clientAgate, err := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
if err != nil {
t.Fatalf("Failed to create client Agate instance: %v", err)
}
defer clientAgate.Close()
// --- Test Scenario ---
// 1. Client downloads the first snapshot completely
t.Log("Client downloading Snapshot 1...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
t.Fatalf("Client failed to get snapshot 1: %v", err)
}
// Verify content of snapshot 1
if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
t.Fatalf("Failed to restore snapshot 1: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
// 2. Client downloads the second snapshot incrementally
t.Log("Client downloading Snapshot 2 (incrementally)...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
t.Fatalf("Client failed to get snapshot 2: %v", err)
}
// Verify content of snapshot 2
if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
t.Fatalf("Failed to restore snapshot 2: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
}
func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
t.Helper()
content, err := os.ReadFile(filepath.Join(dir, filename))
if err != nil {
t.Fatalf("Failed to read file %s: %v", filename, err)
}
if string(content) != expectedContent {
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
}
}
// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
if err != nil {
t.Fatalf("Failed to create snapshot: %v", err)
}
// Start Server
serverAddress := "localhost:50052"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
defer clientAgate.Close()
// --- Test Scenario ---
t.Log("Client performing full download...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
t.Fatalf("Client failed to get snapshot: %v", err)
}
// Verify content
if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
t.Fatalf("Failed to restore snapshot: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
}
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Snap 1
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
// Snap 2 (with changes)
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
// Start Server
serverAddress := "localhost:50053"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
rClient, err := remote.NewClient(serverAddress)
if err != nil {
t.Fatalf("Failed to create remote client: %v", err)
}
defer rClient.Close()
// --- Test Scenario ---
// 1. Manually download first part of the diff archive
diffPath := filepath.Join(clientDir, "diff.zip.part")
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
if err != nil {
t.Fatalf("Failed to get diff stream from manager: %v", err)
}
defer diffReader.Close()
// Read first 64KB
firstChunk := make([]byte, 64*1024)
n, err := io.ReadFull(diffReader, firstChunk)
if err != nil && err != io.ErrUnexpectedEOF {
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
}
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
t.Fatalf("Failed to write partial file: %v", err)
}
diffReader.Close() // Simulate connection drop
// 2. Resume download using the client
t.Log("Resuming download...")
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
t.Fatalf("Failed to resume download: %v", err)
}
// 3. Verify final file
// Get the full diff from server for comparison
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
defer fullDiffReader.Close()
fullDiffData, _ := io.ReadAll(fullDiffReader)
resumedData, _ := os.ReadFile(diffPath)
if len(resumedData) != len(fullDiffData) {
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
}
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
t.Error("File content mismatch after resumption")
}
}