diff --git a/api.go b/api.go index 0cae473..d09b744 100644 --- a/api.go +++ b/api.go @@ -5,15 +5,13 @@ import ( "errors" "fmt" "gitea.unprism.ru/KRBL/Agate/archive" - "gitea.unprism.ru/KRBL/Agate/grpc" - "gitea.unprism.ru/KRBL/Agate/hash" "gitea.unprism.ru/KRBL/Agate/interfaces" + "gitea.unprism.ru/KRBL/Agate/remote" "io" "log" "os" "path/filepath" "sync" - "time" "gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/stores" @@ -350,429 +348,81 @@ func (a *Agate) Close() error { // StartServer starts a gRPC server to share snapshots. func (a *Agate) StartServer(ctx context.Context, address string) error { - _, err := grpc.RunServer(ctx, a.manager, address) - if err != nil { - return fmt.Errorf("failed to start server: %w", err) - } - - // We don't store the server reference because we don't have a way to stop it yet - // In a future version, we could add a StopServer method - - return nil + // Использование нового remote.Server + server := remote.NewServer(a.manager) + return server.Start(ctx, address) } -// ConnectRemote connects to a remote snapshot server. -// Returns a client that can be used to interact with the remote server. -func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) { - client, err := grpc.ConnectToServer(address) - if err != nil { - return nil, fmt.Errorf("failed to connect to remote server: %w", err) - } - - return client, nil -} - -// GetRemoteSnapshotList retrieves a list of snapshots from a remote server. -func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) { - client, err := a.ConnectRemote(address) - if err != nil { - return nil, err - } - defer client.Close() - - return client.ListSnapshots(ctx) -} - -// RegisterLocalSnapshot registers a local snapshot from an archive path with a specified UUID. -// The archive must be a valid ZIP file containing the snapshot files. -// If the UUID already exists, an error will be returned. -func (a *Agate) RegisterLocalSnapshot(ctx context.Context, archivePath string, snapshotID string, name string) error { - a.mutex.Lock() - defer a.mutex.Unlock() - - a.options.Logger.Printf("Registering local snapshot from archive %s with ID %s", archivePath, snapshotID) - - // Check if the archive file exists - if _, err := os.Stat(archivePath); os.IsNotExist(err) { - return fmt.Errorf("archive file does not exist: %w", err) - } - - // Check if a snapshot with this ID already exists - _, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID) - if err == nil { - return fmt.Errorf("snapshot with ID %s already exists", snapshotID) - } else if !errors.Is(err, store.ErrNotFound) { - return fmt.Errorf("failed to check if snapshot exists: %w", err) - } - - // Create a temporary directory for extracting the archive - tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID) - if err := os.MkdirAll(tempDir, 0755); err != nil { - return fmt.Errorf("failed to create temporary directory: %w", err) - } - defer os.RemoveAll(tempDir) // Clean up when done - - // Extract the archive to the temporary directory to analyze its contents - if err := extractArchive(archivePath, tempDir); err != nil { - return fmt.Errorf("failed to extract archive: %w", err) - } - - // Get the list of files in the archive - var files []store.FileInfo - err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - // Skip the root directory - if path == tempDir { - return nil - } - - // Get the relative path - relPath, err := filepath.Rel(tempDir, path) - if err != nil { - return fmt.Errorf("failed to get relative path: %w", err) - } - - // Calculate SHA256 for files (not directories) - var sha256 string - if !info.IsDir() { - // Calculate the hash directly from the file path - sha256, err = hash.CalculateFileHash(path) - if err != nil { - return fmt.Errorf("failed to calculate file hash: %w", err) - } - } - - // Add file info to the list - files = append(files, store.FileInfo{ - Path: relPath, - Size: info.Size(), - IsDir: info.IsDir(), - SHA256: sha256, - }) - - return nil - }) - - if err != nil { - return fmt.Errorf("failed to analyze archive contents: %w", err) - } - - // Copy the archive to the blob store - archiveFile, err := os.Open(archivePath) - if err != nil { - return fmt.Errorf("failed to open archive file: %w", err) - } - defer archiveFile.Close() - - // Store the blob with the specified snapshot ID - _, err = a.options.BlobStore.StoreBlob(ctx, snapshotID, archiveFile) - if err != nil { - return fmt.Errorf("failed to store blob: %w", err) - } - - // Create and save the snapshot metadata - snapshot := store.Snapshot{ - ID: snapshotID, - Name: name, - ParentID: "", - CreationTime: time.Now(), - Files: files, - } - - err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot) - if err != nil { - return fmt.Errorf("failed to save snapshot metadata: %w", err) - } - - a.options.Logger.Printf("Successfully registered local snapshot with ID %s", snapshotID) - return nil -} - -// GetRemoteSnapshotMetadata downloads only the metadata of a snapshot from a remote server. -// If address is empty, it will try to restore the metadata from the local blob. -func (a *Agate) GetRemoteSnapshotMetadata(ctx context.Context, address string, snapshotID string) error { - a.mutex.Lock() - defer a.mutex.Unlock() - - // Check if the snapshot already exists locally - _, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID) - if err == nil { - a.options.Logger.Printf("Snapshot %s already exists locally", snapshotID) - return nil - } else if !errors.Is(err, store.ErrNotFound) { - return fmt.Errorf("failed to check if snapshot exists: %w", err) - } - - // If address is provided, download metadata from remote server - if address != "" { - a.options.Logger.Printf("Downloading metadata for snapshot %s from %s", snapshotID, address) - - client, err := a.ConnectRemote(address) - if err != nil { - return fmt.Errorf("failed to connect to remote server: %w", err) - } - defer client.Close() - - // Get the remote snapshot details - remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID) - if err != nil { - return fmt.Errorf("failed to get snapshot details: %w", err) - } - - // Save the remote snapshot metadata - err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot) - if err != nil { - return fmt.Errorf("failed to save snapshot metadata: %w", err) - } - - a.options.Logger.Printf("Successfully downloaded metadata for snapshot %s", snapshotID) - return nil - } - - // If no address is provided, try to restore metadata from the local blob - a.options.Logger.Printf("Trying to restore metadata for snapshot %s from local blob", snapshotID) - - // Check if the blob exists - blobPath, err := a.options.BlobStore.GetBlobPath(ctx, snapshotID) - if err != nil { - return fmt.Errorf("failed to get blob path: %w", err) - } - - if _, err := os.Stat(blobPath); os.IsNotExist(err) { - return fmt.Errorf("blob for snapshot %s does not exist", snapshotID) - } - - // Create a temporary directory for extracting the archive - tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID) - if err := os.MkdirAll(tempDir, 0755); err != nil { - return fmt.Errorf("failed to create temporary directory: %w", err) - } - defer os.RemoveAll(tempDir) // Clean up when done - - // Extract the archive to the temporary directory to analyze its contents - if err := extractArchive(blobPath, tempDir); err != nil { - return fmt.Errorf("failed to extract archive: %w", err) - } - - // Get the list of files in the archive - var files []store.FileInfo - err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - // Skip the root directory - if path == tempDir { - return nil - } - - // Get the relative path - relPath, err := filepath.Rel(tempDir, path) - if err != nil { - return fmt.Errorf("failed to get relative path: %w", err) - } - - // Calculate SHA256 for files (not directories) - var sha256 string - if !info.IsDir() { - // Calculate the hash directly from the file path - sha256, err = hash.CalculateFileHash(path) - if err != nil { - return fmt.Errorf("failed to calculate file hash: %w", err) - } - } - - // Add file info to the list - files = append(files, store.FileInfo{ - Path: relPath, - Size: info.Size(), - IsDir: info.IsDir(), - SHA256: sha256, - }) - - return nil - }) - - if err != nil { - return fmt.Errorf("failed to analyze archive contents: %w", err) - } - - // Create and save the snapshot metadata - snapshot := store.Snapshot{ - ID: snapshotID, - Name: snapshotID, // Use the ID as the name since we don't have a better name - ParentID: "", - CreationTime: time.Now(), - Files: files, - } - - err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot) - if err != nil { - return fmt.Errorf("failed to save snapshot metadata: %w", err) - } - - a.options.Logger.Printf("Successfully restored metadata for snapshot %s from local blob", snapshotID) - return nil -} - -// GetRemoteSnapshot downloads a snapshot from a remote server. -// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally. +// GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update. func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error { - client, err := a.ConnectRemote(address) + client, err := remote.NewClient(address) if err != nil { return err } defer client.Close() - // Get the remote snapshot details remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID) - if err != nil { - return fmt.Errorf("failed to get snapshot details: %w", err) - } - - // Create a temporary directory for downloading files - tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID) - if err := os.MkdirAll(tempDownloadDir, 0755); err != nil { - return fmt.Errorf("failed to create temporary download directory: %w", err) - } - defer os.RemoveAll(tempDownloadDir) // Clean up when done - - a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address) - - // If localParentID is provided, try to reuse files from the local parent snapshot - if localParentID != "" { - a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID) - - // Get the local parent snapshot details - localParent, err := a.GetSnapshotDetails(ctx, localParentID) - if err != nil { - a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err) - } else { - // Extract the local parent snapshot to a temporary directory - localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID) - if err := os.MkdirAll(localParentDir, 0755); err != nil { - a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err) - } else { - defer os.RemoveAll(localParentDir) // Clean up when done - - if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil { - a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err) - } else { - // Copy unchanged files from the local parent to the download directory - for _, file := range remoteSnapshot.Files { - // Skip directories, they'll be created as needed - if file.IsDir { - continue - } - - // Check if the file exists in the local parent with the same hash - var localFile *store.FileInfo - for _, lf := range localParent.Files { - if lf.Path == file.Path && lf.SHA256 == file.SHA256 { - localFile = &lf - break - } - } - - if localFile != nil { - // File exists in local parent with the same hash, copy it - srcPath := filepath.Join(localParentDir, localFile.Path) - dstPath := filepath.Join(tempDownloadDir, file.Path) - - // Ensure the destination directory exists - if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil { - a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err) - continue - } - - // Copy the file - srcFile, err := os.Open(srcPath) - if err != nil { - a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err) - continue - } - defer srcFile.Close() - - dstFile, err := os.Create(dstPath) - if err != nil { - a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err) - continue - } - - _, err = io.Copy(dstFile, srcFile) - dstFile.Close() - - if err != nil { - a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err) - // If copy fails, the file will be downloaded - } else { - a.options.Logger.Printf("Reusing file %s from local parent", file.Path) - } - } - } - } - } - } - } - - // Download the snapshot files - a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID) - - // Get snapshot details to know what files we need to download - remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID) if err != nil { return fmt.Errorf("failed to get remote snapshot details: %w", err) } - // Check which files we already have and which we need to download - for _, file := range remoteDetails.Files { - if file.IsDir { - continue // Skip directories - } + // 1. Подготовка + tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download") + if err := os.MkdirAll(tempDownloadDir, 0755); err != nil { + return fmt.Errorf("failed to create temp download dir: %w", err) + } + diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip") + diffPartPath := diffArchivePath + ".part" - filePath := filepath.Join(tempDownloadDir, file.Path) - if _, err := os.Stat(filePath); os.IsNotExist(err) { - // File doesn't exist yet, we'll need to download it - a.options.Logger.Printf("Downloading file %s", file.Path) + // 2. Скачивание дельты с докачкой + a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID) + if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil { + return fmt.Errorf("failed to download snapshot diff: %w", err) + } + if err := os.Rename(diffPartPath, diffArchivePath); err != nil { + return fmt.Errorf("failed to finalize downloaded diff: %w", err) + } + defer os.Remove(diffArchivePath) + + // 3. Атомарное применение + // Создаем новую директорию для снапшота + newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID) + if err := os.MkdirAll(newSnapshotDir, 0755); err != nil { + return fmt.Errorf("failed to create new snapshot directory: %w", err) + } + defer os.RemoveAll(newSnapshotDir) + + // Если есть родитель, извлекаем его содержимое + if localParentID != "" { + if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil { + a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err) } } - if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil { - return fmt.Errorf("failed to download snapshot: %w", err) + // Распаковываем дельта-архив поверх + if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil { + return fmt.Errorf("failed to extract diff archive: %w", err) } - a.options.Logger.Printf("Creating archive from downloaded files") - - // Create a zip archive from the downloaded files - zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip") - if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil { - return fmt.Errorf("failed to create zip archive: %w", err) + // 4. Создаем финальный архив и регистрируем снапшот + finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip") + if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil { + return fmt.Errorf("failed to create final snapshot archive: %w", err) } + defer os.Remove(finalArchivePath) - // Store the blob with the remote snapshot ID - zipFile, err := os.Open(zipPath) + finalArchiveFile, err := os.Open(finalArchivePath) if err != nil { - return fmt.Errorf("failed to open zip file: %w", err) + return fmt.Errorf("failed to open final archive: %w", err) } - defer zipFile.Close() - defer os.Remove(zipPath) // Clean up the zip file when done + defer finalArchiveFile.Close() - a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID) - - // Store the blob with the remote snapshot ID - _, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile) - if err != nil { - return fmt.Errorf("failed to store blob: %w", err) + if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil { + return fmt.Errorf("failed to store final blob: %w", err) } - a.options.Logger.Printf("Saving snapshot metadata") - - // Save the remote snapshot metadata - err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot) - if err != nil { + if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil { + a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат return fmt.Errorf("failed to save snapshot metadata: %w", err) } diff --git a/grpc/client.go b/grpc/client.go deleted file mode 100644 index e4307f6..0000000 --- a/grpc/client.go +++ /dev/null @@ -1,236 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "io" - "os" - "path/filepath" - - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "gitea.unprism.ru/KRBL/Agate/store" -) - -// SnapshotClient implements the client for connecting to a remote snapshot server -type SnapshotClient struct { - conn *grpc.ClientConn - client SnapshotServiceClient -} - -// NewSnapshotClient creates a new client connected to the specified address -func NewSnapshotClient(address string) (*SnapshotClient, error) { - // Connect to the server with insecure credentials (for simplicity) - conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err) - } - - // Create the gRPC client - client := NewSnapshotServiceClient(conn) - - return &SnapshotClient{ - conn: conn, - client: client, - }, nil -} - -// Close closes the connection to the server -func (c *SnapshotClient) Close() error { - if c.conn != nil { - return c.conn.Close() - } - return nil -} - -// ListSnapshots retrieves a list of snapshots from the remote server -func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { - response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{}) - if err != nil { - return nil, fmt.Errorf("failed to list snapshots: %w", err) - } - - // Convert gRPC snapshot info to store.SnapshotInfo - snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots)) - for _, snapshot := range response.Snapshots { - snapshots = append(snapshots, store.SnapshotInfo{ - ID: snapshot.Id, - Name: snapshot.Name, - ParentID: snapshot.ParentId, - CreationTime: snapshot.CreationTime.AsTime(), - }) - } - - return snapshots, nil -} - -// FetchSnapshotDetails retrieves detailed information about a specific snapshot -func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { - response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{ - SnapshotId: snapshotID, - }) - if err != nil { - return nil, fmt.Errorf("failed to get snapshot details: %w", err) - } - - // Convert gRPC snapshot details to store.Snapshot - snapshot := &store.Snapshot{ - ID: response.Info.Id, - Name: response.Info.Name, - ParentID: response.Info.ParentId, - CreationTime: response.Info.CreationTime.AsTime(), - Files: make([]store.FileInfo, 0, len(response.Files)), - } - - // Convert file info - for _, file := range response.Files { - snapshot.Files = append(snapshot.Files, store.FileInfo{ - Path: file.Path, - Size: file.SizeBytes, - IsDir: file.IsDir, - SHA256: file.Sha256Hash, - }) - } - - return snapshot, nil -} - -// DownloadSnapshot downloads a snapshot from the server -// This implementation downloads each file individually to optimize bandwidth usage -func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error { - // Get snapshot details - snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID) - if err != nil { - return fmt.Errorf("failed to get snapshot details: %w", err) - } - - // Create target directory if it doesn't exist - if err := os.MkdirAll(targetDir, 0755); err != nil { - return fmt.Errorf("failed to create target directory: %w", err) - } - - // If a local parent is specified, get its details to compare files - var localParentFiles map[string]store.FileInfo - if localParentID != "" { - localParent, err := c.FetchSnapshotDetails(ctx, localParentID) - if err == nil { - // Create a map of file paths to file info for quick lookup - localParentFiles = make(map[string]store.FileInfo, len(localParent.Files)) - for _, file := range localParent.Files { - localParentFiles[file.Path] = file - } - } - } - - // Download each file - for _, file := range snapshot.Files { - // Skip directories, we'll create them when needed - if file.IsDir { - // Create directory - dirPath := filepath.Join(targetDir, file.Path) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("failed to create directory %s: %w", dirPath, err) - } - continue - } - - // Check if we can skip downloading this file - if localParentFiles != nil { - if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 { - // File exists in parent with same hash, copy it instead of downloading - parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path) - targetFilePath := filepath.Join(targetDir, file.Path) - - // Ensure parent directory exists - if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil { - return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err) - } - - // Copy the file - if err := copyFile(parentFilePath, targetFilePath); err != nil { - // If copy fails, fall back to downloading - fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err) - } else { - // Skip to next file - continue - } - } - } - - // Download the file - if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil { - return fmt.Errorf("failed to download file %s: %w", file.Path, err) - } - } - - return nil -} - -// downloadFile downloads a single file from the server -func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error { - // Create the request - req := &DownloadFileRequest{ - SnapshotId: snapshotID, - FilePath: filePath, - } - - // Start streaming the file - stream, err := c.client.DownloadFile(ctx, req) - if err != nil { - return fmt.Errorf("failed to start file download: %w", err) - } - - // Ensure the target directory exists - if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { - return fmt.Errorf("failed to create directory for %s: %w", targetPath, err) - } - - // Create the target file - file, err := os.Create(targetPath) - if err != nil { - return fmt.Errorf("failed to create file %s: %w", targetPath, err) - } - defer file.Close() - - // Receive and write chunks - for { - resp, err := stream.Recv() - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("error receiving file chunk: %w", err) - } - - // Write the chunk to the file - if _, err := file.Write(resp.ChunkData); err != nil { - return fmt.Errorf("error writing to file: %w", err) - } - } - - return nil -} - -// Helper function to copy a file -func copyFile(src, dst string) error { - sourceFile, err := os.Open(src) - if err != nil { - return err - } - defer sourceFile.Close() - - destFile, err := os.Create(dst) - if err != nil { - return err - } - defer destFile.Close() - - _, err = io.Copy(destFile, sourceFile) - return err -} - -// ConnectToServer creates a new client connected to the specified address -func ConnectToServer(address string) (*SnapshotClient, error) { - return NewSnapshotClient(address) -} diff --git a/grpc/server.go b/grpc/server.go deleted file mode 100644 index 07b8bd0..0000000 --- a/grpc/server.go +++ /dev/null @@ -1,158 +0,0 @@ -package grpc - -import ( - "context" - "fmt" - "io" - "net" - - "google.golang.org/grpc" - "google.golang.org/protobuf/types/known/timestamppb" - - "gitea.unprism.ru/KRBL/Agate/interfaces" - "gitea.unprism.ru/KRBL/Agate/store" -) - -// SnapshotServer implements the gRPC server for snapshots -type SnapshotServer struct { - UnimplementedSnapshotServiceServer - manager interfaces.SnapshotManager - server *grpc.Server -} - -// NewSnapshotServer creates a new snapshot server -func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer { - return &SnapshotServer{ - manager: manager, - } -} - -// Start starts the gRPC server on the specified address -func (s *SnapshotServer) Start(ctx context.Context, address string) error { - lis, err := net.Listen("tcp", address) - if err != nil { - return fmt.Errorf("failed to listen on %s: %w", address, err) - } - - s.server = grpc.NewServer() - RegisterSnapshotServiceServer(s.server, s) - - go func() { - if err := s.server.Serve(lis); err != nil { - fmt.Printf("Server error: %v\n", err) - } - }() - - fmt.Printf("Server started on %s\n", address) - return nil -} - -// Stop gracefully stops the server -func (s *SnapshotServer) Stop(ctx context.Context) error { - if s.server != nil { - s.server.GracefulStop() - fmt.Println("Server stopped") - } - return nil -} - -// ListSnapshots implements the gRPC ListSnapshots method -func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) { - // Create empty ListOptions since the proto doesn't have active filter/pagination fields yet - opts := store.ListOptions{} - - // Call manager with the required ListOptions parameter - snapshots, err := s.manager.ListSnapshots(ctx, opts) - if err != nil { - return nil, fmt.Errorf("failed to list snapshots: %w", err) - } - - response := &ListSnapshotsResponse{ - Snapshots: make([]*SnapshotInfo, 0, len(snapshots)), - } - - for _, snapshot := range snapshots { - response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot)) - } - - return response, nil -} - -// GetSnapshotDetails implements the gRPC GetSnapshotDetails method -func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) { - snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) - if err != nil { - return nil, fmt.Errorf("failed to get snapshot details: %w", err) - } - - response := &SnapshotDetails{ - Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{ - ID: snapshot.ID, - Name: snapshot.Name, - ParentID: snapshot.ParentID, - CreationTime: snapshot.CreationTime, - }), - Files: make([]*FileInfo, 0, len(snapshot.Files)), - } - - for _, file := range snapshot.Files { - response.Files = append(response.Files, &FileInfo{ - Path: file.Path, - SizeBytes: file.Size, - Sha256Hash: file.SHA256, - IsDir: file.IsDir, - }) - } - - return response, nil -} - -// DownloadFile implements the gRPC DownloadFile method -func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error { - // Open the file from the snapshot - fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) - if err != nil { - return fmt.Errorf("failed to open file: %w", err) - } - defer fileReader.Close() - - // Read the file in chunks and send them to the client - buffer := make([]byte, 64*1024) // 64KB chunks - for { - n, err := fileReader.Read(buffer) - if err == io.EOF { - break - } - if err != nil { - return fmt.Errorf("failed to read file: %w", err) - } - - // Send the chunk to the client - if err := stream.Send(&DownloadFileResponse{ - ChunkData: buffer[:n], - }); err != nil { - return fmt.Errorf("failed to send chunk: %w", err) - } - } - - return nil -} - -// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo -func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo { - return &SnapshotInfo{ - Id: info.ID, - Name: info.Name, - ParentId: info.ParentID, - CreationTime: timestamppb.New(info.CreationTime), - } -} - -// RunServer is a helper function to create and start a snapshot server -func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) { - server := NewSnapshotServer(manager) - if err := server.Start(ctx, address); err != nil { - return nil, err - } - return server, nil -} diff --git a/grpc/snapshot.pb.go b/grpc/snapshot.pb.go index f767642..7d959f4 100644 --- a/grpc/snapshot.pb.go +++ b/grpc/snapshot.pb.go @@ -26,10 +26,10 @@ const ( // Метаданные файла внутри снапшота type FileInfo struct { state protoimpl.MessageState `protogen:"open.v1"` - Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` // Относительный путь файла внутри снапшота - SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` // Размер файла в байтах - Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"` // Хеш-сумма файла (SHA256) - IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"` // Является ли запись директорией + Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` + SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` + Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"` + IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -95,10 +95,10 @@ func (x *FileInfo) GetIsDir() bool { // Краткая информация о снапшоте type SnapshotInfo struct { state protoimpl.MessageState `protogen:"open.v1"` - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Уникальный ID снапшота (UUID) - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // Имя снапшота - ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"` // ID родительского снапшота (может быть пустым) - CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` // Время создания + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"` + CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -164,8 +164,8 @@ func (x *SnapshotInfo) GetCreationTime() *timestamppb.Timestamp { // Детальная информация о снапшоте type SnapshotDetails struct { state protoimpl.MessageState `protogen:"open.v1"` - Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` // Краткая информация - Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"` // Список файлов в снапшоте + Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` + Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -214,7 +214,7 @@ func (x *SnapshotDetails) GetFiles() []*FileInfo { return nil } -// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию) +// Запрос на получение списка снапшотов type ListSnapshotsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -254,7 +254,7 @@ func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) { // Ответ со списком снапшотов type ListSnapshotsResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"` // string next_page_token = 2; + Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -299,7 +299,7 @@ func (x *ListSnapshotsResponse) GetSnapshots() []*SnapshotInfo { // Запрос на получение деталей снапшота type GetSnapshotDetailsRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID нужного снапшота + SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -344,8 +344,8 @@ func (x *GetSnapshotDetailsRequest) GetSnapshotId() string { // Запрос на скачивание файла type DownloadFileRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID снапшота - FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"` // Путь к файлу внутри снапшота + SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` + FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -397,7 +397,7 @@ func (x *DownloadFileRequest) GetFilePath() string { // Ответ (часть файла) при скачивании type DownloadFileResponse struct { state protoimpl.MessageState `protogen:"open.v1"` - ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"` // Кусочек данных файла + ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -439,6 +439,67 @@ func (x *DownloadFileResponse) GetChunkData() []byte { return nil } +// Запрос на скачивание разницы между снапшотами +type DownloadSnapshotDiffRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID целевого снапшота + LocalParentId string `protobuf:"bytes,2,opt,name=local_parent_id,json=localParentId,proto3" json:"local_parent_id,omitempty"` // ID снапшота, который уже есть у клиента + Offset int64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` // Смещение в байтах для докачки + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DownloadSnapshotDiffRequest) Reset() { + *x = DownloadSnapshotDiffRequest{} + mi := &file_snapshot_proto_msgTypes[8] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DownloadSnapshotDiffRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DownloadSnapshotDiffRequest) ProtoMessage() {} + +func (x *DownloadSnapshotDiffRequest) ProtoReflect() protoreflect.Message { + mi := &file_snapshot_proto_msgTypes[8] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DownloadSnapshotDiffRequest.ProtoReflect.Descriptor instead. +func (*DownloadSnapshotDiffRequest) Descriptor() ([]byte, []int) { + return file_snapshot_proto_rawDescGZIP(), []int{8} +} + +func (x *DownloadSnapshotDiffRequest) GetSnapshotId() string { + if x != nil { + return x.SnapshotId + } + return "" +} + +func (x *DownloadSnapshotDiffRequest) GetLocalParentId() string { + if x != nil { + return x.LocalParentId + } + return "" +} + +func (x *DownloadSnapshotDiffRequest) GetOffset() int64 { + if x != nil { + return x.Offset + } + return 0 +} + var File_snapshot_proto protoreflect.FileDescriptor const file_snapshot_proto_rawDesc = "" + @@ -472,11 +533,17 @@ const file_snapshot_proto_rawDesc = "" + "\tfile_path\x18\x02 \x01(\tR\bfilePath\"5\n" + "\x14DownloadFileResponse\x12\x1d\n" + "\n" + - "chunk_data\x18\x01 \x01(\fR\tchunkData2\x8a\x03\n" + + "chunk_data\x18\x01 \x01(\fR\tchunkData\"~\n" + + "\x1bDownloadSnapshotDiffRequest\x12\x1f\n" + + "\vsnapshot_id\x18\x01 \x01(\tR\n" + + "snapshotId\x12&\n" + + "\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" + + "\x06offset\x18\x03 \x01(\x03R\x06offset2\xf1\x03\n" + "\x0fSnapshotService\x12k\n" + "\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" + "\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" + - "\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3" + "\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" + + "\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3" var ( file_snapshot_proto_rawDescOnce sync.Once @@ -490,31 +557,34 @@ func file_snapshot_proto_rawDescGZIP() []byte { return file_snapshot_proto_rawDescData } -var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 8) +var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 9) var file_snapshot_proto_goTypes = []any{ - (*FileInfo)(nil), // 0: agate.grpc.FileInfo - (*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo - (*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails - (*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest - (*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse - (*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest - (*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest - (*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse - (*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp + (*FileInfo)(nil), // 0: agate.grpc.FileInfo + (*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo + (*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails + (*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest + (*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse + (*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest + (*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest + (*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse + (*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest + (*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp } var file_snapshot_proto_depIdxs = []int32{ - 8, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp + 9, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp 1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo 0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo 1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo 3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest 5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest 6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest - 4, // 7: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse - 2, // 8: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails - 7, // 9: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse - 7, // [7:10] is the sub-list for method output_type - 4, // [4:7] is the sub-list for method input_type + 8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest + 4, // 8: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse + 2, // 9: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails + 7, // 10: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse + 7, // 11: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse + 8, // [8:12] is the sub-list for method output_type + 4, // [4:8] is the sub-list for method input_type 4, // [4:4] is the sub-list for extension type_name 4, // [4:4] is the sub-list for extension extendee 0, // [0:4] is the sub-list for field type_name @@ -531,7 +601,7 @@ func file_snapshot_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)), NumEnums: 0, - NumMessages: 8, + NumMessages: 9, NumExtensions: 0, NumServices: 1, }, diff --git a/grpc/snapshot.proto b/grpc/snapshot.proto index c617eb8..8bfcbeb 100644 --- a/grpc/snapshot.proto +++ b/grpc/snapshot.proto @@ -3,7 +3,7 @@ syntax = "proto3"; package agate.grpc; import "google/protobuf/timestamp.proto"; -import "google/api/annotations.proto"; // Добавлено для HTTP mapping +import "google/api/annotations.proto"; option go_package = "gitea.unprism.ru/KRBL/Agate/grpc"; @@ -30,77 +30,59 @@ service SnapshotService { }; } - // --- Методы для управления (опционально, можно не включать в публичный API клиента) --- - // Создать новый снапшот из директории (если серверу позволено инициировать) - // rpc CreateSnapshot(CreateSnapshotRequest) returns (Snapshot); - // Удалить снапшот (если требуется) - // rpc DeleteSnapshot(DeleteSnapshotRequest) returns (DeleteSnapshotResponse); + // Скачать архив, содержащий только разницу между двумя снапшотами + rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {} } // Метаданные файла внутри снапшота message FileInfo { - string path = 1; // Относительный путь файла внутри снапшота - int64 size_bytes = 2; // Размер файла в байтах - string sha256_hash = 3; // Хеш-сумма файла (SHA256) - bool is_dir = 4; // Является ли запись директорией + string path = 1; + int64 size_bytes = 2; + string sha256_hash = 3; + bool is_dir = 4; } // Краткая информация о снапшоте message SnapshotInfo { - string id = 1; // Уникальный ID снапшота (UUID) - string name = 2; // Имя снапшота - string parent_id = 3; // ID родительского снапшота (может быть пустым) - google.protobuf.Timestamp creation_time = 4; // Время создания + string id = 1; + string name = 2; + string parent_id = 3; + google.protobuf.Timestamp creation_time = 4; } // Детальная информация о снапшоте message SnapshotDetails { - SnapshotInfo info = 1; // Краткая информация - repeated FileInfo files = 2; // Список файлов в снапшоте + SnapshotInfo info = 1; + repeated FileInfo files = 2; } -// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию) -message ListSnapshotsRequest { - // string filter_by_name = 1; - // int32 page_size = 2; - // string page_token = 3; -} +// Запрос на получение списка снапшотов +message ListSnapshotsRequest {} // Ответ со списком снапшотов message ListSnapshotsResponse { repeated SnapshotInfo snapshots = 1; - // string next_page_token = 2; } // Запрос на получение деталей снапшота message GetSnapshotDetailsRequest { - string snapshot_id = 1; // ID нужного снапшота + string snapshot_id = 1; } // Запрос на скачивание файла message DownloadFileRequest { - string snapshot_id = 1; // ID снапшота - string file_path = 2; // Путь к файлу внутри снапшота + string snapshot_id = 1; + string file_path = 2; } // Ответ (часть файла) при скачивании message DownloadFileResponse { - bytes chunk_data = 1; // Кусочек данных файла + bytes chunk_data = 1; } -// --- Сообщения для опциональных методов управления --- -/* -message CreateSnapshotRequest { - string source_path = 1; // Путь к директории на сервере - string name = 2; - string parent_id = 3; // Опционально +// Запрос на скачивание разницы между снапшотами +message DownloadSnapshotDiffRequest { + string snapshot_id = 1; // ID целевого снапшота + string local_parent_id = 2; // ID снапшота, который уже есть у клиента + int64 offset = 3; // Смещение в байтах для докачки } - -message DeleteSnapshotRequest { - string snapshot_id = 1; -} - -message DeleteSnapshotResponse { - bool success = 1; -} -*/ \ No newline at end of file diff --git a/grpc/snapshot_grpc.pb.go b/grpc/snapshot_grpc.pb.go index a379199..9c19190 100644 --- a/grpc/snapshot_grpc.pb.go +++ b/grpc/snapshot_grpc.pb.go @@ -19,9 +19,10 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots" - SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails" - SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile" + SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots" + SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails" + SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile" + SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff" ) // SnapshotServiceClient is the client API for SnapshotService service. @@ -36,6 +37,8 @@ type SnapshotServiceClient interface { GetSnapshotDetails(ctx context.Context, in *GetSnapshotDetailsRequest, opts ...grpc.CallOption) (*SnapshotDetails, error) // Скачать конкретный файл из снапшота (потоковая передача) DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) + // Скачать архив, содержащий только разницу между двумя снапшотами + DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) } type snapshotServiceClient struct { @@ -85,6 +88,25 @@ func (c *snapshotServiceClient) DownloadFile(ctx context.Context, in *DownloadFi // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SnapshotService_DownloadFileClient = grpc.ServerStreamingClient[DownloadFileResponse] +func (c *snapshotServiceClient) DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + stream, err := c.cc.NewStream(ctx, &SnapshotService_ServiceDesc.Streams[1], SnapshotService_DownloadSnapshotDiff_FullMethodName, cOpts...) + if err != nil { + return nil, err + } + x := &grpc.GenericClientStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ClientStream: stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse] + // SnapshotServiceServer is the server API for SnapshotService service. // All implementations must embed UnimplementedSnapshotServiceServer // for forward compatibility. @@ -97,6 +119,8 @@ type SnapshotServiceServer interface { GetSnapshotDetails(context.Context, *GetSnapshotDetailsRequest) (*SnapshotDetails, error) // Скачать конкретный файл из снапшота (потоковая передача) DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error + // Скачать архив, содержащий только разницу между двумя снапшотами + DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error mustEmbedUnimplementedSnapshotServiceServer() } @@ -116,6 +140,9 @@ func (UnimplementedSnapshotServiceServer) GetSnapshotDetails(context.Context, *G func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error { return status.Errorf(codes.Unimplemented, "method DownloadFile not implemented") } +func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error { + return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented") +} func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {} func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {} @@ -184,6 +211,17 @@ func _SnapshotService_DownloadFile_Handler(srv interface{}, stream grpc.ServerSt // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type SnapshotService_DownloadFileServer = grpc.ServerStreamingServer[DownloadFileResponse] +func _SnapshotService_DownloadSnapshotDiff_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(DownloadSnapshotDiffRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SnapshotServiceServer).DownloadSnapshotDiff(m, &grpc.GenericServerStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ServerStream: stream}) +} + +// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. +type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse] + // SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -206,6 +244,11 @@ var SnapshotService_ServiceDesc = grpc.ServiceDesc{ Handler: _SnapshotService_DownloadFile_Handler, ServerStreams: true, }, + { + StreamName: "DownloadSnapshotDiff", + Handler: _SnapshotService_DownloadSnapshotDiff_Handler, + ServerStreams: true, + }, }, Metadata: "snapshot.proto", } diff --git a/grpc_test.go b/grpc_test.go index 19ef8c6..c03092e 100644 --- a/grpc_test.go +++ b/grpc_test.go @@ -1,17 +1,16 @@ package agate import ( - "bytes" "context" + "crypto/sha256" + "io" "log" "os" "path/filepath" - "strings" "testing" "time" "gitea.unprism.ru/KRBL/Agate/remote" - "gitea.unprism.ru/KRBL/Agate/store" ) // TestGRPCServerClient tests the interaction between a gRPC server and client. @@ -24,366 +23,241 @@ func TestGRPCServerClient(t *testing.T) { t.Skip("Skipping gRPC server-client test in short mode") } - // Create a temporary directory for the server + // --- Setup Server --- serverDir, err := os.MkdirTemp("", "agate-server-*") if err != nil { t.Fatalf("Failed to create server temp directory: %v", err) } defer os.RemoveAll(serverDir) - // Create a temporary directory for the client - clientDir, err := os.MkdirTemp("", "agate-client-*") - if err != nil { - t.Fatalf("Failed to create client temp directory: %v", err) - } - defer os.RemoveAll(clientDir) - - // Create Agate options for the server - serverOptions := AgateOptions{ - WorkDir: serverDir, - } - - // Create Agate instance for the server - serverAgate, err := New(serverOptions) + serverAgate, err := New(AgateOptions{WorkDir: serverDir}) if err != nil { t.Fatalf("Failed to create server Agate instance: %v", err) } defer serverAgate.Close() - // Create a data directory dataDir := serverAgate.options.BlobStore.GetActiveDir() if err := os.MkdirAll(dataDir, 0755); err != nil { t.Fatalf("Failed to create data directory: %v", err) } // Create initial test files for the first snapshot - initialFiles := map[string]string{ - filepath.Join(dataDir, "file1.txt"): "Initial content of file 1", - filepath.Join(dataDir, "file2.txt"): "Initial content of file 2", - filepath.Join(dataDir, "subdir", "file3.txt"): "Initial content of file 3", + if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil { + t.Fatal(err) } - // Create subdirectory - if err := os.MkdirAll(filepath.Join(dataDir, "subdir"), 0755); err != nil { - t.Fatalf("Failed to create subdirectory: %v", err) - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - // Create the files - for path, content := range initialFiles { - if err := os.WriteFile(path, []byte(content), 0644); err != nil { - t.Fatalf("Failed to create test file %s: %v", path, err) - } - } - - // Create the first snapshot - ctx := context.Background() snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "") if err != nil { t.Fatalf("Failed to create first snapshot: %v", err) } t.Logf("Created first snapshot with ID: %s", snapshot1ID) - // Modify some files and add a new file for the second snapshot - modifiedFiles := map[string]string{ - filepath.Join(dataDir, "file1.txt"): "Modified content of file 1", - filepath.Join(dataDir, "file4.txt"): "Content of new file 4", + // Modify content for the second snapshot + if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil { + t.Fatal(err) } - for path, content := range modifiedFiles { - if err := os.WriteFile(path, []byte(content), 0644); err != nil { - t.Fatalf("Failed to modify/create test file %s: %v", path, err) - } - } - - // Create the second snapshot snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID) if err != nil { t.Fatalf("Failed to create second snapshot: %v", err) } t.Logf("Created second snapshot with ID: %s", snapshot2ID) - // Delete a file and modify another for the third snapshot - if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil { - t.Fatalf("Failed to delete test file: %v", err) - } - - if err := os.WriteFile(filepath.Join(dataDir, "subdir/file3.txt"), []byte("Modified content of file 3"), 0644); err != nil { - t.Fatalf("Failed to modify test file: %v", err) - } - - // Create the third snapshot - snapshot3ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 3", snapshot2ID) - if err != nil { - t.Fatalf("Failed to create third snapshot: %v", err) - } - t.Logf("Created third snapshot with ID: %s", snapshot3ID) - // Start the gRPC server serverAddress := "localhost:50051" - server, err := remote.RunServer(ctx, serverAgate.manager, serverAddress) - if err != nil { - t.Fatalf("Failed to start gRPC server: %v", err) - } - defer server.Stop(ctx) - - // Give the server a moment to start + server := remote.NewServer(serverAgate.manager) + go func() { + if err := server.Start(ctx, serverAddress); err != nil { + log.Printf("Server start error: %v", err) + } + }() + defer server.Stop() time.Sleep(100 * time.Millisecond) - // Connect a client to the server - client, err := remote.NewClient(serverAddress) - if err != nil { - t.Fatalf("Failed to connect client to server: %v", err) - } - defer client.Close() - - // List snapshots from the client - snapshots, err := client.ListSnapshots(ctx) - if err != nil { - t.Fatalf("Failed to list snapshots from client: %v", err) - } - - // Verify we have 3 snapshots - if len(snapshots) != 3 { - t.Errorf("Expected 3 snapshots, got %d", len(snapshots)) - } - - // Find the latest snapshot (should be snapshot3) - var latestSnapshot store.SnapshotInfo - for _, snapshot := range snapshots { - if latestSnapshot.CreationTime.Before(snapshot.CreationTime) { - latestSnapshot = snapshot - } - } - - // Verify the latest snapshot is snapshot3 - if latestSnapshot.ID != snapshot3ID { - t.Errorf("Latest snapshot ID is %s, expected %s", latestSnapshot.ID, snapshot3ID) - } - - // Get detailed information about the latest snapshot - snapshotDetails, err := client.FetchSnapshotDetails(ctx, latestSnapshot.ID) - if err != nil { - t.Fatalf("Failed to fetch snapshot details: %v", err) - } - - // Verify the snapshot details - if snapshotDetails.ID != snapshot3ID { - t.Errorf("Snapshot details ID is %s, expected %s", snapshotDetails.ID, snapshot3ID) - } - - // Create a directory to download the snapshot to - downloadDir := filepath.Join(clientDir, "download") - if err := os.MkdirAll(downloadDir, 0755); err != nil { - t.Fatalf("Failed to create download directory: %v", err) - } - - // Download the snapshot - err = client.DownloadSnapshot(ctx, latestSnapshot.ID, downloadDir, "") - if err != nil { - t.Fatalf("Failed to download snapshot: %v", err) - } - - // Verify the downloaded files match the expected content - expectedFiles := map[string]string{ - filepath.Join(downloadDir, "file1.txt"): "Modified content of file 1", - filepath.Join(downloadDir, "file4.txt"): "Content of new file 4", - filepath.Join(downloadDir, "subdir/file3.txt"): "Modified content of file 3", - } - - for path, expectedContent := range expectedFiles { - content, err := os.ReadFile(path) - if err != nil { - t.Fatalf("Failed to read downloaded file %s: %v", path, err) - } - if string(content) != expectedContent { - t.Errorf("Downloaded file %s has wrong content: got %s, want %s", path, string(content), expectedContent) - } - } - - // Verify that file2.txt doesn't exist in the downloaded snapshot - if _, err := os.Stat(filepath.Join(downloadDir, "file2.txt")); !os.IsNotExist(err) { - t.Errorf("file2.txt should not exist in the downloaded snapshot") - } -} - -// TestGRPC_GetRemoteSnapshot_Incremental tests the incremental download functionality -// of GetRemoteSnapshot, verifying that it reuses files from a local parent snapshot -// instead of downloading them again. -func TestGRPC_GetRemoteSnapshot_Incremental(t *testing.T) { - // Skip this test in short mode - if testing.Short() { - t.Skip("Skipping incremental GetRemoteSnapshot test in short mode") - } - - // Create a temporary directory for the server - serverDir, err := os.MkdirTemp("", "agate-server-*") - if err != nil { - t.Fatalf("Failed to create server temp directory: %v", err) - } - defer os.RemoveAll(serverDir) - - // Create a temporary directory for the client + // --- Setup Client --- clientDir, err := os.MkdirTemp("", "agate-client-*") if err != nil { t.Fatalf("Failed to create client temp directory: %v", err) } defer os.RemoveAll(clientDir) - // Create a buffer to capture client logs - var clientLogBuffer bytes.Buffer - clientLogger := log.New(&clientLogBuffer, "", 0) - - // Create Agate options for the server - serverOptions := AgateOptions{ - WorkDir: serverDir, - } - - // Create Agate options for the client with logger - clientOptions := AgateOptions{ - WorkDir: clientDir, - Logger: clientLogger, - } - - // Create Agate instances for server and client - serverAgate, err := New(serverOptions) - if err != nil { - t.Fatalf("Failed to create server Agate instance: %v", err) - } - defer serverAgate.Close() - - clientAgate, err := New(clientOptions) + clientAgate, err := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true}) if err != nil { t.Fatalf("Failed to create client Agate instance: %v", err) } defer clientAgate.Close() - // Create a data directory on the server - serverDataDir := serverAgate.options.BlobStore.GetActiveDir() - if err := os.MkdirAll(serverDataDir, 0755); err != nil { - t.Fatalf("Failed to create server data directory: %v", err) + // --- Test Scenario --- + // 1. Client downloads the first snapshot completely + t.Log("Client downloading Snapshot 1...") + if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil { + t.Fatalf("Client failed to get snapshot 1: %v", err) } - // Create test files for snapshot A on the server - if err := os.MkdirAll(filepath.Join(serverDataDir, "subdir"), 0755); err != nil { - t.Fatalf("Failed to create subdirectory: %v", err) + // Verify content of snapshot 1 + if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil { + t.Fatalf("Failed to restore snapshot 1: %v", err) + } + verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1") + verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2") + + // 2. Client downloads the second snapshot incrementally + t.Log("Client downloading Snapshot 2 (incrementally)...") + if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil { + t.Fatalf("Client failed to get snapshot 2: %v", err) } - snapshotAFiles := map[string]string{ - filepath.Join(serverDataDir, "file1.txt"): "Content of file 1", - filepath.Join(serverDataDir, "file2.txt"): "Content of file 2", - filepath.Join(serverDataDir, "subdir/file3.txt"): "Content of file 3", + // Verify content of snapshot 2 + if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil { + t.Fatalf("Failed to restore snapshot 2: %v", err) } + verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1") + verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3") + // file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed. + // But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist. + verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2") +} - for path, content := range snapshotAFiles { - if err := os.WriteFile(path, []byte(content), 0644); err != nil { - t.Fatalf("Failed to create test file %s: %v", path, err) - } - } - - // Create snapshot A on the server - ctx := context.Background() - snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "") +func verifyFileContent(t *testing.T, dir, filename, expectedContent string) { + t.Helper() + content, err := os.ReadFile(filepath.Join(dir, filename)) if err != nil { - t.Fatalf("Failed to create snapshot A: %v", err) + t.Fatalf("Failed to read file %s: %v", filename, err) } - t.Logf("Created snapshot A with ID: %s", snapshotAID) - - // Modify some files and add a new file for snapshot B - snapshotBChanges := map[string]string{ - filepath.Join(serverDataDir, "file1.txt"): "Modified content of file 1", // Modified file - filepath.Join(serverDataDir, "file4.txt"): "Content of new file 4", // New file - filepath.Join(serverDataDir, "subdir/file5.txt"): "Content of new file 5", // New file in subdir + if string(content) != expectedContent { + t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent) } +} - for path, content := range snapshotBChanges { - // Ensure parent directory exists - if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { - t.Fatalf("Failed to create directory for %s: %v", path, err) - } - if err := os.WriteFile(path, []byte(content), 0644); err != nil { - t.Fatalf("Failed to create/modify test file %s: %v", path, err) - } - } +// TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified. +func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) { + // --- Setup Server --- + serverDir, _ := os.MkdirTemp("", "agate-server-*") + defer os.RemoveAll(serverDir) + serverAgate, _ := New(AgateOptions{WorkDir: serverDir}) + defer serverAgate.Close() + dataDir := serverAgate.options.BlobStore.GetActiveDir() + os.MkdirAll(dataDir, 0755) + os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644) - // Create snapshot B on the server (with A as parent) - snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "") if err != nil { - t.Fatalf("Failed to create snapshot B: %v", err) + t.Fatalf("Failed to create snapshot: %v", err) } - t.Logf("Created snapshot B with ID: %s", snapshotBID) - // Start the gRPC server - serverAddress := "localhost:50052" // Use a different port than the other test - server, err := remote.RunServer(ctx, serverAgate.manager, serverAddress) - if err != nil { - t.Fatalf("Failed to start gRPC server: %v", err) - } - defer server.Stop(ctx) - - // Give the server a moment to start + // Start Server + serverAddress := "localhost:50052" + server := remote.NewServer(serverAgate.manager) + go func() { server.Start(ctx, serverAddress) }() + defer server.Stop() time.Sleep(100 * time.Millisecond) - // Step 1: Client downloads snapshot A - err = clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, "") - if err != nil { - t.Fatalf("Failed to download snapshot A: %v", err) - } - t.Log("Client successfully downloaded snapshot A") + // --- Setup Client --- + clientDir, _ := os.MkdirTemp("", "agate-client-*") + defer os.RemoveAll(clientDir) + clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true}) + defer clientAgate.Close() - // Clear the log buffer to capture only logs from the incremental download - clientLogBuffer.Reset() - - // Step 2: Client downloads snapshot B, specifying A as the local parent - err = clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, snapshotAID) - if err != nil { - t.Fatalf("Failed to download snapshot B: %v", err) - } - t.Log("Client successfully downloaded snapshot B") - - // Step 3: Verify that snapshot B was correctly imported - // Restore snapshot B to a directory - restoreDir := filepath.Join(clientDir, "restore") - if err := os.MkdirAll(restoreDir, 0755); err != nil { - t.Fatalf("Failed to create restore directory: %v", err) + // --- Test Scenario --- + t.Log("Client performing full download...") + if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil { + t.Fatalf("Client failed to get snapshot: %v", err) } - err = clientAgate.RestoreSnapshotToDir(ctx, snapshotBID, restoreDir) - if err != nil { - t.Fatalf("Failed to restore snapshot B: %v", err) + // Verify content + if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil { + t.Fatalf("Failed to restore snapshot: %v", err) + } + verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download") +} + +// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic. +func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) { + // --- Setup Server --- + serverDir, _ := os.MkdirTemp("", "agate-server-*") + defer os.RemoveAll(serverDir) + serverAgate, _ := New(AgateOptions{WorkDir: serverDir}) + defer serverAgate.Close() + dataDir := serverAgate.options.BlobStore.GetActiveDir() + os.MkdirAll(dataDir, 0755) + os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Snap 1 + snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "") + // Snap 2 (with changes) + os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial + snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID) + + // Start Server + serverAddress := "localhost:50053" + server := remote.NewServer(serverAgate.manager) + go func() { server.Start(ctx, serverAddress) }() + defer server.Stop() + time.Sleep(100 * time.Millisecond) + + // --- Setup Client --- + clientDir, _ := os.MkdirTemp("", "agate-client-*") + defer os.RemoveAll(clientDir) + rClient, err := remote.NewClient(serverAddress) + if err != nil { + t.Fatalf("Failed to create remote client: %v", err) + } + defer rClient.Close() + + // --- Test Scenario --- + // 1. Manually download first part of the diff archive + diffPath := filepath.Join(clientDir, "diff.zip.part") + diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0) + if err != nil { + t.Fatalf("Failed to get diff stream from manager: %v", err) + } + defer diffReader.Close() + + // Read first 64KB + firstChunk := make([]byte, 64*1024) + n, err := io.ReadFull(diffReader, firstChunk) + if err != nil && err != io.ErrUnexpectedEOF { + t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n) + } + if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil { + t.Fatalf("Failed to write partial file: %v", err) + } + diffReader.Close() // Simulate connection drop + + // 2. Resume download using the client + t.Log("Resuming download...") + if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil { + t.Fatalf("Failed to resume download: %v", err) + } + + // 3. Verify final file + // Get the full diff from server for comparison + fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0) + defer fullDiffReader.Close() + fullDiffData, _ := io.ReadAll(fullDiffReader) + + resumedData, _ := os.ReadFile(diffPath) + + if len(resumedData) != len(fullDiffData) { + t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData)) + } + + if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) { + t.Error("File content mismatch after resumption") } - - // Verify the restored files match the expected content - expectedFiles := map[string]string{ - filepath.Join(restoreDir, "file1.txt"): "Modified content of file 1", // Modified file - filepath.Join(restoreDir, "file2.txt"): "Content of file 2", // Unchanged file - filepath.Join(restoreDir, "file4.txt"): "Content of new file 4", // New file - filepath.Join(restoreDir, "subdir/file3.txt"): "Content of file 3", // Unchanged file - filepath.Join(restoreDir, "subdir/file5.txt"): "Content of new file 5", // New file - } - - for path, expectedContent := range expectedFiles { - content, err := os.ReadFile(path) - if err != nil { - t.Fatalf("Failed to read restored file %s: %v", path, err) - } - if string(content) != expectedContent { - t.Errorf("Restored file %s has wrong content: got %s, want %s", path, string(content), expectedContent) - } - } - - // Step 4: Analyze logs to verify incremental download behavior - logs := clientLogBuffer.String() - - // Check for evidence of file reuse - if !strings.Contains(logs, "Reusing file") { - t.Errorf("No evidence of file reuse in logs") - } - - // Check for evidence of downloading only new/changed files - if !strings.Contains(logs, "Downloading file") { - t.Errorf("No evidence of downloading new files in logs") - } - - // Log the relevant parts for debugging - t.Logf("Log evidence of incremental download:\n%s", logs) } diff --git a/interfaces/snapshot.go b/interfaces/snapshot.go index 13ff365..f5e4db5 100644 --- a/interfaces/snapshot.go +++ b/interfaces/snapshot.go @@ -33,6 +33,11 @@ type SnapshotManager interface { // UpdateSnapshotMetadata updates the metadata of an existing snapshot, allowing changes to its name. UpdateSnapshotMetadata(ctx context.Context, snapshotID string, newName string) error + + // StreamSnapshotDiff creates and streams a differential archive between two snapshots. + // It returns an io.ReadCloser for the archive stream and an error. + // The caller is responsible for closing the reader, which will also handle cleanup of temporary resources. + StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) } // SnapshotServer defines the interface for a server that can share snapshots @@ -52,8 +57,8 @@ type SnapshotClient interface { // FetchSnapshotDetails retrieves detailed information about a specific snapshot FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) - // DownloadSnapshot downloads a snapshot from the server - DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error + // DownloadSnapshotDiff downloads a differential archive between two snapshots to a target directory + DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error // Close closes the connection to the server Close() error diff --git a/manager.go b/manager.go index 6b0e14d..4860612 100644 --- a/manager.go +++ b/manager.go @@ -2,6 +2,7 @@ package agate import ( "archive/zip" + "bytes" "context" "errors" "fmt" @@ -31,6 +32,11 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl return nil, errors.New("parameters can't be nil") } + // Ensure logger is never nil. + if logger == nil { + logger = log.New(io.Discard, "", 0) + } + return &SnapshotManagerData{ metadataStore: metadataStore, blobStore: blobStore, @@ -183,31 +189,24 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID return errors.New("snapshot ID cannot be empty") } - // First check if the snapshot exists and get its details snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) if err != nil { - if errors.Is(err, ErrNotFound) { - // If snapshot doesn't exist, return success (idempotent operation) + if errors.Is(err, store.ErrNotFound) { return nil } return fmt.Errorf("failed to check if snapshot exists: %w", err) } - // Get the parent ID of the snapshot being deleted parentID := snapshot.ParentID - // Find all snapshots that have the deleted snapshot as their parent - // We need to update their parent ID to maintain the chain opts := store.ListOptions{} allSnapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx, opts) if err != nil { return fmt.Errorf("failed to list snapshots: %w", err) } - // Update parent references for any snapshots that have this one as a parent for _, info := range allSnapshots { if info.ParentID == snapshotID { - // Используем новый, более надежный метод для обновления только parent_id if err := data.metadataStore.UpdateSnapshotParentID(ctx, info.ID, parentID); err != nil { data.logger.Printf("WARNING: failed to update parent reference for snapshot %s: %v", info.ID, err) } else { @@ -216,16 +215,11 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID } } - // Delete the metadata first if err := data.metadataStore.DeleteSnapshotMetadata(ctx, snapshotID); err != nil { return fmt.Errorf("failed to delete snapshot metadata: %w", err) } - // Then delete the blob if err := data.blobStore.DeleteBlob(ctx, snapshotID); err != nil { - // Note: We don't return here because we've already deleted the metadata - // and the blob store should handle the case where the blob doesn't exist - // Log the error instead data.logger.Printf("WARNING: failed to delete snapshot blob: %v", err) } @@ -392,22 +386,130 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna return errors.New("new name cannot be empty") } - // Get the current snapshot metadata snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) if err != nil { - if errors.Is(err, ErrNotFound) { + if errors.Is(err, store.ErrNotFound) { return ErrNotFound } return fmt.Errorf("failed to get snapshot metadata: %w", err) } - // Update the name snapshot.Name = newName - // Save the updated metadata if err := data.metadataStore.SaveSnapshotMetadata(ctx, *snapshot); err != nil { return fmt.Errorf("failed to update snapshot metadata: %w", err) } return nil } + +// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files. +type diffArchiveReader struct { + *os.File + tempArchive string + tempStaging string +} + +// Close closes the file and removes the temporary archive and staging directory. +func (r *diffArchiveReader) Close() error { + err := r.File.Close() + os.Remove(r.tempArchive) + os.RemoveAll(r.tempStaging) + return err +} + +func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) { + targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) + if err != nil { + return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err) + } + + parentFiles := make(map[string]string) + if parentID != "" { + parentSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID) + if err == nil { + for _, file := range parentSnap.Files { + if !file.IsDir { + parentFiles[file.Path] = file.SHA256 + } + } + } else { + data.logger.Printf("Warning: failed to get parent snapshot %s, creating full diff: %v", parentID, err) + } + } + + var filesToInclude []string + for _, file := range targetSnap.Files { + if file.IsDir { + continue + } + if parentHash, ok := parentFiles[file.Path]; !ok || parentHash != file.SHA256 { + filesToInclude = append(filesToInclude, file.Path) + } + } + + if len(filesToInclude) == 0 { + return io.NopCloser(bytes.NewReader(nil)), nil + } + + tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*") + if err != nil { + return nil, fmt.Errorf("failed to create temp staging directory: %w", err) + } + + targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID) + if err != nil { + os.RemoveAll(tempStagingDir) + return nil, err + } + + for _, filePath := range filesToInclude { + destPath := filepath.Join(tempStagingDir, filePath) + if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil { + os.RemoveAll(tempStagingDir) + return nil, fmt.Errorf("failed to create dir for diff file: %w", err) + } + + fileWriter, err := os.Create(destPath) + if err != nil { + os.RemoveAll(tempStagingDir) + return nil, err + } + + err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter) + fileWriter.Close() + if err != nil { + os.RemoveAll(tempStagingDir) + return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err) + } + } + + tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip") + if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil { + os.RemoveAll(tempStagingDir) + os.Remove(tempArchivePath) + return nil, fmt.Errorf("failed to create diff archive: %w", err) + } + + archiveFile, err := os.Open(tempArchivePath) + if err != nil { + os.RemoveAll(tempStagingDir) + os.Remove(tempArchivePath) + return nil, err + } + + if offset > 0 { + if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil { + archiveFile.Close() + os.RemoveAll(tempStagingDir) + os.Remove(tempArchivePath) + return nil, fmt.Errorf("failed to seek in diff archive: %w", err) + } + } + + return &diffArchiveReader{ + File: archiveFile, + tempArchive: tempArchivePath, + tempStaging: tempStagingDir, + }, nil +} diff --git a/manager_test.go b/manager_test.go index c1c86a4..c4c4691 100644 --- a/manager_test.go +++ b/manager_test.go @@ -554,3 +554,60 @@ func TestUpdateSnapshotMetadata(t *testing.T) { t.Fatalf("Expected error when updating non-existent snapshot, got nil") } } + +func TestStreamSnapshotDiff_EdgeCases(t *testing.T) { + tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t) + defer cleanup() + + sourceDir := filepath.Join(tempDir, "source") + os.MkdirAll(sourceDir, 0755) + createTestFiles(t, sourceDir) + + manager, err := CreateSnapshotManager(metadataStore, blobStore, nil) + if err != nil { + t.Fatalf("Failed to create snapshot manager: %v", err) + } + ctx := context.Background() + + // Create two identical snapshots + snap1, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap1", "") + snap2, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap2", snap1.ID) + + // Test 1: Diff between identical snapshots should be empty + reader, err := manager.StreamSnapshotDiff(ctx, snap2.ID, snap1.ID, 0) + if err != nil { + t.Fatalf("Expected no error for identical snapshots, got %v", err) + } + defer reader.Close() + data, _ := io.ReadAll(reader) + if len(data) != 0 { + t.Errorf("Expected empty diff for identical snapshots, got %d bytes", len(data)) + } + + // Test 2: Diff with a non-existent parent should be a full archive + reader, err = manager.StreamSnapshotDiff(ctx, snap1.ID, "non-existent-parent", 0) + if err != nil { + t.Fatalf("Expected no error for non-existent parent, got %v", err) + } + defer reader.Close() + data, _ = io.ReadAll(reader) + if len(data) == 0 { + t.Error("Expected full archive for non-existent parent, got empty diff") + } + + // Create an empty source dir + emptyDir := filepath.Join(tempDir, "empty_source") + os.MkdirAll(emptyDir, 0755) + emptySnap, _ := manager.CreateSnapshot(ctx, emptyDir, "EmptySnap", "") + + // Test 3: Diff of an empty snapshot should be empty + reader, err = manager.StreamSnapshotDiff(ctx, emptySnap.ID, "", 0) + if err != nil { + t.Fatalf("Expected no error for empty snapshot, got %v", err) + } + defer reader.Close() + data, _ = io.ReadAll(reader) + if len(data) != 0 { + t.Errorf("Expected empty diff for empty snapshot, got %d bytes", len(data)) + } +} diff --git a/remote/client.go b/remote/client.go index 4b626d8..f6bcb81 100644 --- a/remote/client.go +++ b/remote/client.go @@ -15,34 +15,26 @@ import ( "gitea.unprism.ru/KRBL/Agate/store" ) -// Client represents a client for connecting to a remote snapshot server -// It implements the interfaces.SnapshotClient interface +// Client представляет клиент для подключения к удаленному серверу снапшотов. type Client struct { conn *stdgrpc.ClientConn client agateGrpc.SnapshotServiceClient } -// Ensure Client implements interfaces.SnapshotClient +// Убедимся, что Client реализует интерфейс interfaces.SnapshotClient var _ interfaces.SnapshotClient = (*Client)(nil) -// NewClient creates a new client connected to the specified address +// NewClient создает нового клиента, подключенного к указанному адресу. func NewClient(address string) (*Client, error) { - // Connect to the server with insecure credentials (for simplicity) conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err) } - - // Create the gRPC client client := agateGrpc.NewSnapshotServiceClient(conn) - - return &Client{ - conn: conn, - client: client, - }, nil + return &Client{conn: conn, client: client}, nil } -// Close closes the connection to the server +// Close закрывает соединение с сервером. func (c *Client) Close() error { if c.conn != nil { return c.conn.Close() @@ -50,14 +42,13 @@ func (c *Client) Close() error { return nil } -// ListSnapshots retrieves a list of snapshots from the remote server +// ListSnapshots получает список снапшотов с удаленного сервера. func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{}) if err != nil { return nil, fmt.Errorf("failed to list snapshots: %w", err) } - // Convert gRPC snapshot info to store.SnapshotInfo snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots)) for _, snapshot := range response.Snapshots { snapshots = append(snapshots, store.SnapshotInfo{ @@ -67,11 +58,10 @@ func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error CreationTime: snapshot.CreationTime.AsTime(), }) } - return snapshots, nil } -// FetchSnapshotDetails retrieves detailed information about a specific snapshot +// FetchSnapshotDetails получает детальную информацию о конкретном снапшоте. func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{ SnapshotId: snapshotID, @@ -80,7 +70,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (* return nil, fmt.Errorf("failed to get snapshot details: %w", err) } - // Convert gRPC snapshot details to store.Snapshot snapshot := &store.Snapshot{ ID: response.Info.Id, Name: response.Info.Name, @@ -89,7 +78,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (* Files: make([]store.FileInfo, 0, len(response.Files)), } - // Convert file info for _, file := range response.Files { snapshot.Files = append(snapshot.Files, store.FileInfo{ Path: file.Path, @@ -98,118 +86,48 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (* SHA256: file.Sha256Hash, }) } - return snapshot, nil } -// DownloadSnapshot downloads a snapshot from the server -// This implementation downloads each file individually to optimize bandwidth usage -func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error { - // Get snapshot details - snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID) +// DownloadSnapshotDiff скачивает архив с разницей между снапшотами. +func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error { + var offset int64 + fileInfo, err := os.Stat(targetPath) + if err == nil { + offset = fileInfo.Size() + } else if !os.IsNotExist(err) { + return fmt.Errorf("failed to stat temporary file: %w", err) + } + + req := &agateGrpc.DownloadSnapshotDiffRequest{ + SnapshotId: snapshotID, + LocalParentId: localParentID, + Offset: offset, + } + + stream, err := c.client.DownloadSnapshotDiff(ctx, req) if err != nil { - return fmt.Errorf("failed to get snapshot details: %w", err) + return fmt.Errorf("failed to start snapshot diff download: %w", err) } - // Create target directory if it doesn't exist - if err := os.MkdirAll(targetDir, 0755); err != nil { - return fmt.Errorf("failed to create target directory: %w", err) - } - - // If a local parent is specified, get its details to compare files - var localParentFiles map[string]store.FileInfo - if localParentID != "" { - localParent, err := c.FetchSnapshotDetails(ctx, localParentID) - if err == nil { - // Create a map of file paths to file info for quick lookup - localParentFiles = make(map[string]store.FileInfo, len(localParent.Files)) - for _, file := range localParent.Files { - localParentFiles[file.Path] = file - } - } - } - - // Download each file - for _, file := range snapshot.Files { - // Skip directories, we'll create them when needed - if file.IsDir { - // Create directory - dirPath := filepath.Join(targetDir, file.Path) - if err := os.MkdirAll(dirPath, 0755); err != nil { - return fmt.Errorf("failed to create directory %s: %w", dirPath, err) - } - continue - } - - // Check if we can skip downloading this file - if localParentFiles != nil { - if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 { - // File exists in parent with same hash, copy it instead of downloading - parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path) - targetFilePath := filepath.Join(targetDir, file.Path) - - // Ensure parent directory exists - if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil { - return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err) - } - - // Copy the file - if err := copyFile(parentFilePath, targetFilePath); err != nil { - // If copy fails, fall back to downloading - fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err) - } else { - // Skip to next file - continue - } - } - } - - // Download the file - if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil { - return fmt.Errorf("failed to download file %s: %w", file.Path, err) - } - } - - return nil -} - -// downloadFile downloads a single file from the server -func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error { - // Create the request - req := &agateGrpc.DownloadFileRequest{ - SnapshotId: snapshotID, - FilePath: filePath, - } - - // Start streaming the file - stream, err := c.client.DownloadFile(ctx, req) - if err != nil { - return fmt.Errorf("failed to start file download: %w", err) - } - - // Ensure the target directory exists if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { return fmt.Errorf("failed to create directory for %s: %w", targetPath, err) } - // Create the target file - file, err := os.Create(targetPath) + file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { - return fmt.Errorf("failed to create file %s: %w", targetPath, err) + return fmt.Errorf("failed to open file %s: %w", targetPath, err) } defer file.Close() - // Receive and write chunks for { resp, err := stream.Recv() if err == io.EOF { break } if err != nil { - return fmt.Errorf("error receiving file chunk: %w", err) + return fmt.Errorf("error receiving diff chunk: %w", err) } - - // Write the chunk to the file if _, err := file.Write(resp.ChunkData); err != nil { return fmt.Errorf("error writing to file: %w", err) } @@ -217,26 +135,3 @@ func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetP return nil } - -// Helper function to copy a file -func copyFile(src, dst string) error { - sourceFile, err := os.Open(src) - if err != nil { - return err - } - defer sourceFile.Close() - - destFile, err := os.Create(dst) - if err != nil { - return err - } - defer destFile.Close() - - _, err = io.Copy(destFile, sourceFile) - return err -} - -// Connect creates a new client connected to the specified address -func Connect(address string) (*Client, error) { - return NewClient(address) -} diff --git a/remote/server.go b/remote/server.go index dffed9c..9a05c62 100644 --- a/remote/server.go +++ b/remote/server.go @@ -14,21 +14,21 @@ import ( "gitea.unprism.ru/KRBL/Agate/store" ) -// Server implements the gRPC server for snapshots +// Server реализует gRPC-сервер для снапшотов. type Server struct { agateGrpc.UnimplementedSnapshotServiceServer manager interfaces.SnapshotManager server *stdgrpc.Server } -// NewServer creates a new snapshot server +// NewServer создает новый сервер снапшотов. func NewServer(manager interfaces.SnapshotManager) *Server { return &Server{ manager: manager, } } -// Start starts the gRPC server on the specified address +// Start запускает gRPC-сервер на указанном адресе. func (s *Server) Start(ctx context.Context, address string) error { lis, err := net.Listen("tcp", address) if err != nil { @@ -45,23 +45,24 @@ func (s *Server) Start(ctx context.Context, address string) error { }() fmt.Printf("Server started on %s\n", address) + + // Ждем отмены контекста для остановки сервера + <-ctx.Done() + s.Stop() return nil } -// Stop gracefully stops the server -func (s *Server) Stop(ctx context.Context) error { +// Stop изящно останавливает сервер. +func (s *Server) Stop() { if s.server != nil { s.server.GracefulStop() fmt.Println("Server stopped") } - return nil } -// ListSnapshots implements the gRPC ListSnapshots method +// ListSnapshots реализует gRPC-метод ListSnapshots. func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) { - // Create empty ListOptions since the proto doesn't have active filter/pagination fields yet opts := store.ListOptions{} - snapshots, err := s.manager.ListSnapshots(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to list snapshots: %w", err) @@ -78,7 +79,7 @@ func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshots return response, nil } -// GetSnapshotDetails implements the gRPC GetSnapshotDetails method +// GetSnapshotDetails реализует gRPC-метод GetSnapshotDetails. func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) { snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) if err != nil { @@ -107,17 +108,15 @@ func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnaps return response, nil } -// DownloadFile implements the gRPC DownloadFile method +// DownloadFile реализует gRPC-метод DownloadFile. func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error { - // Open the file from the snapshot fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) if err != nil { return fmt.Errorf("failed to open file: %w", err) } defer fileReader.Close() - // Read the file in chunks and send them to the client - buffer := make([]byte, 64*1024) // 64KB chunks + buffer := make([]byte, 64*1024) for { n, err := fileReader.Read(buffer) if err == io.EOF { @@ -126,19 +125,40 @@ func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGr if err != nil { return fmt.Errorf("failed to read file: %w", err) } - - // Send the chunk to the client - if err := stream.Send(&agateGrpc.DownloadFileResponse{ - ChunkData: buffer[:n], - }); err != nil { + if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil { return fmt.Errorf("failed to send chunk: %w", err) } } - return nil } -// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo +// DownloadSnapshotDiff реализует gRPC-метод DownloadSnapshotDiff. +func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest, stream agateGrpc.SnapshotService_DownloadSnapshotDiffServer) error { + diffReader, err := s.manager.StreamSnapshotDiff(context.Background(), req.SnapshotId, req.LocalParentId, req.Offset) + if err != nil { + return fmt.Errorf("failed to stream snapshot diff: %w", err) + } + defer diffReader.Close() + + buffer := make([]byte, 64*1024) + for { + n, err := diffReader.Read(buffer) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read from diff stream: %w", err) + } + if n > 0 { + if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil { + return fmt.Errorf("failed to send diff chunk: %w", err) + } + } + } + return nil +} + +// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { return &agateGrpc.SnapshotInfo{ Id: info.ID, @@ -147,12 +167,3 @@ func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo CreationTime: timestamppb.New(info.CreationTime), } } - -// RunServer is a helper function to create and start a snapshot server -func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) { - server := NewServer(manager) - if err := server.Start(ctx, address); err != nil { - return nil, err - } - return server, nil -}