3 Commits

Author SHA1 Message Date
7b670947c3 Add GetCurrentSnapshotID method to retrieve the current snapshot ID 2025-07-13 23:19:41 +03:00
aaf227f25a Remove obsolete gRPC client/server implementations and migrate to remote package 2025-07-13 23:12:39 +03:00
efa2bec38b Add RegisterLocalSnapshot and GetRemoteSnapshotMetadata methods
Introduce methods to register local snapshots from archives and to download or restore snapshot metadata, improving snapshot management capabilities. Update README with usage examples.
2025-07-10 12:49:05 +03:00
13 changed files with 688 additions and 1119 deletions

View File

@ -223,6 +223,41 @@ func main() {
## Advanced Usage ## Advanced Usage
### Registering a Local Snapshot
You can register a local snapshot from an existing archive file with a specified UUID:
```go
// Register a local snapshot from an archive file
archivePath := "/path/to/your/archive.zip"
snapshotID := "custom-uuid-for-snapshot"
snapshotName := "My Local Snapshot"
if err := ag.RegisterLocalSnapshot(ctx, archivePath, snapshotID, snapshotName); err != nil {
log.Fatalf("Failed to register local snapshot: %v", err)
}
```
### Downloading Only Snapshot Metadata
You can download only the metadata of a snapshot from a remote server without downloading the actual files:
```go
// Download only the metadata of a snapshot from a remote server
remoteAddress := "remote-server:50051"
snapshotID := "snapshot-id-to-download"
if err := ag.GetRemoteSnapshotMetadata(ctx, remoteAddress, snapshotID); err != nil {
log.Fatalf("Failed to download snapshot metadata: %v", err)
}
// If you have a local blob but missing metadata, you can restore the metadata
// by passing an empty address
if err := ag.GetRemoteSnapshotMetadata(ctx, "", snapshotID); err != nil {
log.Fatalf("Failed to restore snapshot metadata: %v", err)
}
```
### Creating Incremental Snapshots ### Creating Incremental Snapshots
You can create incremental snapshots by specifying a parent snapshot ID: You can create incremental snapshots by specifying a parent snapshot ID:
@ -294,6 +329,8 @@ The main entry point for the library.
- `ConnectRemote(address string) (*grpc.SnapshotClient, error)` - Connect to a remote server - `ConnectRemote(address string) (*grpc.SnapshotClient, error)` - Connect to a remote server
- `GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error)` - List snapshots from a remote server - `GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error)` - List snapshots from a remote server
- `GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error` - Download a snapshot from a remote server - `GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error` - Download a snapshot from a remote server
- `RegisterLocalSnapshot(ctx context.Context, archivePath string, snapshotID string, name string) error` - Register a local snapshot from an archive path with a specified UUID
- `GetRemoteSnapshotMetadata(ctx context.Context, address string, snapshotID string) error` - Download only the metadata of a snapshot from a remote server
### AgateOptions ### AgateOptions
@ -304,7 +341,3 @@ Configuration options for the Agate library.
- `CloseFunc func() error` - Called before a snapshot is created or restored - `CloseFunc func() error` - Called before a snapshot is created or restored
- `MetadataStore store.MetadataStore` - Implementation of the metadata store - `MetadataStore store.MetadataStore` - Implementation of the metadata store
- `BlobStore store.BlobStore` - Implementation of the blob store - `BlobStore store.BlobStore` - Implementation of the blob store
## License
[Add your license information here]

217
api.go
View File

@ -5,8 +5,8 @@ import (
"errors" "errors"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate/archive" "gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/interfaces" "gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/remote"
"io" "io"
"log" "log"
"os" "os"
@ -348,197 +348,88 @@ func (a *Agate) Close() error {
// StartServer starts a gRPC server to share snapshots. // StartServer starts a gRPC server to share snapshots.
func (a *Agate) StartServer(ctx context.Context, address string) error { func (a *Agate) StartServer(ctx context.Context, address string) error {
_, err := grpc.RunServer(ctx, a.manager, address) // Использование нового remote.Server
if err != nil { server := remote.NewServer(a.manager)
return fmt.Errorf("failed to start server: %w", err) return server.Start(ctx, address)
}
// We don't store the server reference because we don't have a way to stop it yet
// In a future version, we could add a StopServer method
return nil
} }
// ConnectRemote connects to a remote snapshot server. // GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update.
// Returns a client that can be used to interact with the remote server.
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
client, err := grpc.ConnectToServer(address)
if err != nil {
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
}
return client, nil
}
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
client, err := a.ConnectRemote(address)
if err != nil {
return nil, err
}
defer client.Close()
return client.ListSnapshots(ctx)
}
// GetRemoteSnapshot downloads a snapshot from a remote server.
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error { func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
client, err := a.ConnectRemote(address) client, err := remote.NewClient(address)
if err != nil { if err != nil {
return err return err
} }
defer client.Close() defer client.Close()
// Get the remote snapshot details
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID) remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create a temporary directory for downloading files
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID)
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary download directory: %w", err)
}
defer os.RemoveAll(tempDownloadDir) // Clean up when done
a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address)
// If localParentID is provided, try to reuse files from the local parent snapshot
if localParentID != "" {
a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID)
// Get the local parent snapshot details
localParent, err := a.GetSnapshotDetails(ctx, localParentID)
if err != nil {
a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err)
} else {
// Extract the local parent snapshot to a temporary directory
localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID)
if err := os.MkdirAll(localParentDir, 0755); err != nil {
a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err)
} else {
defer os.RemoveAll(localParentDir) // Clean up when done
if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil {
a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err)
} else {
// Copy unchanged files from the local parent to the download directory
for _, file := range remoteSnapshot.Files {
// Skip directories, they'll be created as needed
if file.IsDir {
continue
}
// Check if the file exists in the local parent with the same hash
var localFile *store.FileInfo
for _, lf := range localParent.Files {
if lf.Path == file.Path && lf.SHA256 == file.SHA256 {
localFile = &lf
break
}
}
if localFile != nil {
// File exists in local parent with the same hash, copy it
srcPath := filepath.Join(localParentDir, localFile.Path)
dstPath := filepath.Join(tempDownloadDir, file.Path)
// Ensure the destination directory exists
if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil {
a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err)
continue
}
// Copy the file
srcFile, err := os.Open(srcPath)
if err != nil {
a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(dstPath)
if err != nil {
a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err)
continue
}
_, err = io.Copy(dstFile, srcFile)
dstFile.Close()
if err != nil {
a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err)
// If copy fails, the file will be downloaded
} else {
a.options.Logger.Printf("Reusing file %s from local parent", file.Path)
}
}
}
}
}
}
}
// Download the snapshot files
a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID)
// Get snapshot details to know what files we need to download
remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil { if err != nil {
return fmt.Errorf("failed to get remote snapshot details: %w", err) return fmt.Errorf("failed to get remote snapshot details: %w", err)
} }
// Check which files we already have and which we need to download // 1. Подготовка
for _, file := range remoteDetails.Files { tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
if file.IsDir { if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
continue // Skip directories return fmt.Errorf("failed to create temp download dir: %w", err)
} }
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
diffPartPath := diffArchivePath + ".part"
filePath := filepath.Join(tempDownloadDir, file.Path) // 2. Скачивание дельты с докачкой
if _, err := os.Stat(filePath); os.IsNotExist(err) { a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
// File doesn't exist yet, we'll need to download it if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
a.options.Logger.Printf("Downloading file %s", file.Path) return fmt.Errorf("failed to download snapshot diff: %w", err)
}
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
}
defer os.Remove(diffArchivePath)
// 3. Атомарное применение
// Создаем новую директорию для снапшота
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err)
}
defer os.RemoveAll(newSnapshotDir)
// Если есть родитель, извлекаем его содержимое
if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
} }
} }
if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil { // Распаковываем дельта-архив поверх
return fmt.Errorf("failed to download snapshot: %w", err) if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
return fmt.Errorf("failed to extract diff archive: %w", err)
} }
a.options.Logger.Printf("Creating archive from downloaded files") // 4. Создаем финальный архив и регистрируем снапшот
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
// Create a zip archive from the downloaded files if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip") return fmt.Errorf("failed to create final snapshot archive: %w", err)
if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil {
return fmt.Errorf("failed to create zip archive: %w", err)
} }
defer os.Remove(finalArchivePath)
// Store the blob with the remote snapshot ID finalArchiveFile, err := os.Open(finalArchivePath)
zipFile, err := os.Open(zipPath)
if err != nil { if err != nil {
return fmt.Errorf("failed to open zip file: %w", err) return fmt.Errorf("failed to open final archive: %w", err)
} }
defer zipFile.Close() defer finalArchiveFile.Close()
defer os.Remove(zipPath) // Clean up the zip file when done
a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID) if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil {
return fmt.Errorf("failed to store final blob: %w", err)
// Store the blob with the remote snapshot ID
_, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile)
if err != nil {
return fmt.Errorf("failed to store blob: %w", err)
} }
a.options.Logger.Printf("Saving snapshot metadata") if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
// Save the remote snapshot metadata
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
if err != nil {
return fmt.Errorf("failed to save snapshot metadata: %w", err) return fmt.Errorf("failed to save snapshot metadata: %w", err)
} }
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID) a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
return nil return nil
} }
func (a *Agate) GetCurrentSnapshotID() string {
return a.currentSnapshotID
}

View File

@ -1,236 +0,0 @@
package grpc
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotClient implements the client for connecting to a remote snapshot server
type SnapshotClient struct {
conn *grpc.ClientConn
client SnapshotServiceClient
}
// NewSnapshotClient creates a new client connected to the specified address
func NewSnapshotClient(address string) (*SnapshotClient, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
}
// Create the gRPC client
client := NewSnapshotServiceClient(conn)
return &SnapshotClient{
conn: conn,
client: client,
}, nil
}
// Close closes the connection to the server
func (c *SnapshotClient) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// ListSnapshots retrieves a list of snapshots from the remote server
func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{
ID: snapshot.Id,
Name: snapshot.Name,
ParentID: snapshot.ParentId,
CreationTime: snapshot.CreationTime.AsTime(),
})
}
return snapshots, nil
}
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{
SnapshotId: snapshotID,
})
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{
ID: response.Info.Id,
Name: response.Info.Name,
ParentID: response.Info.ParentId,
CreationTime: response.Info.CreationTime.AsTime(),
Files: make([]store.FileInfo, 0, len(response.Files)),
}
// Convert file info
for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path,
Size: file.SizeBytes,
IsDir: file.IsDir,
SHA256: file.Sha256Hash,
})
}
return snapshot, nil
}
// DownloadSnapshot downloads a snapshot from the server
// This implementation downloads each file individually to optimize bandwidth usage
func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
// Get snapshot details
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
}
// Create the target file
file, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
}
defer file.Close()
// Receive and write chunks
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err)
}
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
}
return nil
}
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// ConnectToServer creates a new client connected to the specified address
func ConnectToServer(address string) (*SnapshotClient, error) {
return NewSnapshotClient(address)
}

View File

@ -1,158 +0,0 @@
package grpc
import (
"context"
"fmt"
"io"
"net"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotServer implements the gRPC server for snapshots
type SnapshotServer struct {
UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager
server *grpc.Server
}
// NewSnapshotServer creates a new snapshot server
func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer {
return &SnapshotServer{
manager: manager,
}
}
// Start starts the gRPC server on the specified address
func (s *SnapshotServer) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
}
s.server = grpc.NewServer()
RegisterSnapshotServiceServer(s.server, s)
go func() {
if err := s.server.Serve(lis); err != nil {
fmt.Printf("Server error: %v\n", err)
}
}()
fmt.Printf("Server started on %s\n", address)
return nil
}
// Stop gracefully stops the server
func (s *SnapshotServer) Stop(ctx context.Context) error {
if s.server != nil {
s.server.GracefulStop()
fmt.Println("Server stopped")
}
return nil
}
// ListSnapshots implements the gRPC ListSnapshots method
func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) {
// Create empty ListOptions since the proto doesn't have active filter/pagination fields yet
opts := store.ListOptions{}
// Call manager with the required ListOptions parameter
snapshots, err := s.manager.ListSnapshots(ctx, opts)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
response := &ListSnapshotsResponse{
Snapshots: make([]*SnapshotInfo, 0, len(snapshots)),
}
for _, snapshot := range snapshots {
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
}
return response, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
response := &SnapshotDetails{
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
ID: snapshot.ID,
Name: snapshot.Name,
ParentID: snapshot.ParentID,
CreationTime: snapshot.CreationTime,
}),
Files: make([]*FileInfo, 0, len(snapshot.Files)),
}
for _, file := range snapshot.Files {
response.Files = append(response.Files, &FileInfo{
Path: file.Path,
SizeBytes: file.Size,
Sha256Hash: file.SHA256,
IsDir: file.IsDir,
})
}
return response, nil
}
// DownloadFile implements the gRPC DownloadFile method
func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer fileReader.Close()
// Read the file in chunks and send them to the client
buffer := make([]byte, 64*1024) // 64KB chunks
for {
n, err := fileReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
// Send the chunk to the client
if err := stream.Send(&DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo {
return &SnapshotInfo{
Id: info.ID,
Name: info.Name,
ParentId: info.ParentID,
CreationTime: timestamppb.New(info.CreationTime),
}
}
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) {
server := NewSnapshotServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}

View File

@ -26,10 +26,10 @@ const (
// Метаданные файла внутри снапшота // Метаданные файла внутри снапшота
type FileInfo struct { type FileInfo struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"` // Относительный путь файла внутри снапшота Path string `protobuf:"bytes,1,opt,name=path,proto3" json:"path,omitempty"`
SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"` // Размер файла в байтах SizeBytes int64 `protobuf:"varint,2,opt,name=size_bytes,json=sizeBytes,proto3" json:"size_bytes,omitempty"`
Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"` // Хеш-сумма файла (SHA256) Sha256Hash string `protobuf:"bytes,3,opt,name=sha256_hash,json=sha256Hash,proto3" json:"sha256_hash,omitempty"`
IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"` // Является ли запись директорией IsDir bool `protobuf:"varint,4,opt,name=is_dir,json=isDir,proto3" json:"is_dir,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -95,10 +95,10 @@ func (x *FileInfo) GetIsDir() bool {
// Краткая информация о снапшоте // Краткая информация о снапшоте
type SnapshotInfo struct { type SnapshotInfo struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` // Уникальный ID снапшота (UUID) Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` // Имя снапшота Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"` // ID родительского снапшота (может быть пустым) ParentId string `protobuf:"bytes,3,opt,name=parent_id,json=parentId,proto3" json:"parent_id,omitempty"`
CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"` // Время создания CreationTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=creation_time,json=creationTime,proto3" json:"creation_time,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -164,8 +164,8 @@ func (x *SnapshotInfo) GetCreationTime() *timestamppb.Timestamp {
// Детальная информация о снапшоте // Детальная информация о снапшоте
type SnapshotDetails struct { type SnapshotDetails struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"` // Краткая информация Info *SnapshotInfo `protobuf:"bytes,1,opt,name=info,proto3" json:"info,omitempty"`
Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"` // Список файлов в снапшоте Files []*FileInfo `protobuf:"bytes,2,rep,name=files,proto3" json:"files,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -214,7 +214,7 @@ func (x *SnapshotDetails) GetFiles() []*FileInfo {
return nil return nil
} }
// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию) // Запрос на получение списка снапшотов
type ListSnapshotsRequest struct { type ListSnapshotsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
@ -254,7 +254,7 @@ func (*ListSnapshotsRequest) Descriptor() ([]byte, []int) {
// Ответ со списком снапшотов // Ответ со списком снапшотов
type ListSnapshotsResponse struct { type ListSnapshotsResponse struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"` // string next_page_token = 2; Snapshots []*SnapshotInfo `protobuf:"bytes,1,rep,name=snapshots,proto3" json:"snapshots,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -299,7 +299,7 @@ func (x *ListSnapshotsResponse) GetSnapshots() []*SnapshotInfo {
// Запрос на получение деталей снапшота // Запрос на получение деталей снапшота
type GetSnapshotDetailsRequest struct { type GetSnapshotDetailsRequest struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID нужного снапшота SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -344,8 +344,8 @@ func (x *GetSnapshotDetailsRequest) GetSnapshotId() string {
// Запрос на скачивание файла // Запрос на скачивание файла
type DownloadFileRequest struct { type DownloadFileRequest struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID снапшота SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"`
FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"` // Путь к файлу внутри снапшота FilePath string `protobuf:"bytes,2,opt,name=file_path,json=filePath,proto3" json:"file_path,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -397,7 +397,7 @@ func (x *DownloadFileRequest) GetFilePath() string {
// Ответ (часть файла) при скачивании // Ответ (часть файла) при скачивании
type DownloadFileResponse struct { type DownloadFileResponse struct {
state protoimpl.MessageState `protogen:"open.v1"` state protoimpl.MessageState `protogen:"open.v1"`
ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"` // Кусочек данных файла ChunkData []byte `protobuf:"bytes,1,opt,name=chunk_data,json=chunkData,proto3" json:"chunk_data,omitempty"`
unknownFields protoimpl.UnknownFields unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache sizeCache protoimpl.SizeCache
} }
@ -439,6 +439,67 @@ func (x *DownloadFileResponse) GetChunkData() []byte {
return nil return nil
} }
// Запрос на скачивание разницы между снапшотами
type DownloadSnapshotDiffRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
SnapshotId string `protobuf:"bytes,1,opt,name=snapshot_id,json=snapshotId,proto3" json:"snapshot_id,omitempty"` // ID целевого снапшота
LocalParentId string `protobuf:"bytes,2,opt,name=local_parent_id,json=localParentId,proto3" json:"local_parent_id,omitempty"` // ID снапшота, который уже есть у клиента
Offset int64 `protobuf:"varint,3,opt,name=offset,proto3" json:"offset,omitempty"` // Смещение в байтах для докачки
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *DownloadSnapshotDiffRequest) Reset() {
*x = DownloadSnapshotDiffRequest{}
mi := &file_snapshot_proto_msgTypes[8]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *DownloadSnapshotDiffRequest) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*DownloadSnapshotDiffRequest) ProtoMessage() {}
func (x *DownloadSnapshotDiffRequest) ProtoReflect() protoreflect.Message {
mi := &file_snapshot_proto_msgTypes[8]
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use DownloadSnapshotDiffRequest.ProtoReflect.Descriptor instead.
func (*DownloadSnapshotDiffRequest) Descriptor() ([]byte, []int) {
return file_snapshot_proto_rawDescGZIP(), []int{8}
}
func (x *DownloadSnapshotDiffRequest) GetSnapshotId() string {
if x != nil {
return x.SnapshotId
}
return ""
}
func (x *DownloadSnapshotDiffRequest) GetLocalParentId() string {
if x != nil {
return x.LocalParentId
}
return ""
}
func (x *DownloadSnapshotDiffRequest) GetOffset() int64 {
if x != nil {
return x.Offset
}
return 0
}
var File_snapshot_proto protoreflect.FileDescriptor var File_snapshot_proto protoreflect.FileDescriptor
const file_snapshot_proto_rawDesc = "" + const file_snapshot_proto_rawDesc = "" +
@ -472,11 +533,17 @@ const file_snapshot_proto_rawDesc = "" +
"\tfile_path\x18\x02 \x01(\tR\bfilePath\"5\n" + "\tfile_path\x18\x02 \x01(\tR\bfilePath\"5\n" +
"\x14DownloadFileResponse\x12\x1d\n" + "\x14DownloadFileResponse\x12\x1d\n" +
"\n" + "\n" +
"chunk_data\x18\x01 \x01(\fR\tchunkData2\x8a\x03\n" + "chunk_data\x18\x01 \x01(\fR\tchunkData\"~\n" +
"\x1bDownloadSnapshotDiffRequest\x12\x1f\n" +
"\vsnapshot_id\x18\x01 \x01(\tR\n" +
"snapshotId\x12&\n" +
"\x0flocal_parent_id\x18\x02 \x01(\tR\rlocalParentId\x12\x16\n" +
"\x06offset\x18\x03 \x01(\x03R\x06offset2\xf1\x03\n" +
"\x0fSnapshotService\x12k\n" + "\x0fSnapshotService\x12k\n" +
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" + "\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" + "\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3" "\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01\x12e\n" +
"\x14DownloadSnapshotDiff\x12'.agate.grpc.DownloadSnapshotDiffRequest\x1a .agate.grpc.DownloadFileResponse\"\x000\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
var ( var (
file_snapshot_proto_rawDescOnce sync.Once file_snapshot_proto_rawDescOnce sync.Once
@ -490,31 +557,34 @@ func file_snapshot_proto_rawDescGZIP() []byte {
return file_snapshot_proto_rawDescData return file_snapshot_proto_rawDescData
} }
var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 8) var file_snapshot_proto_msgTypes = make([]protoimpl.MessageInfo, 9)
var file_snapshot_proto_goTypes = []any{ var file_snapshot_proto_goTypes = []any{
(*FileInfo)(nil), // 0: agate.grpc.FileInfo (*FileInfo)(nil), // 0: agate.grpc.FileInfo
(*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo (*SnapshotInfo)(nil), // 1: agate.grpc.SnapshotInfo
(*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails (*SnapshotDetails)(nil), // 2: agate.grpc.SnapshotDetails
(*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest (*ListSnapshotsRequest)(nil), // 3: agate.grpc.ListSnapshotsRequest
(*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse (*ListSnapshotsResponse)(nil), // 4: agate.grpc.ListSnapshotsResponse
(*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest (*GetSnapshotDetailsRequest)(nil), // 5: agate.grpc.GetSnapshotDetailsRequest
(*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest (*DownloadFileRequest)(nil), // 6: agate.grpc.DownloadFileRequest
(*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse (*DownloadFileResponse)(nil), // 7: agate.grpc.DownloadFileResponse
(*timestamppb.Timestamp)(nil), // 8: google.protobuf.Timestamp (*DownloadSnapshotDiffRequest)(nil), // 8: agate.grpc.DownloadSnapshotDiffRequest
(*timestamppb.Timestamp)(nil), // 9: google.protobuf.Timestamp
} }
var file_snapshot_proto_depIdxs = []int32{ var file_snapshot_proto_depIdxs = []int32{
8, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp 9, // 0: agate.grpc.SnapshotInfo.creation_time:type_name -> google.protobuf.Timestamp
1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo 1, // 1: agate.grpc.SnapshotDetails.info:type_name -> agate.grpc.SnapshotInfo
0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo 0, // 2: agate.grpc.SnapshotDetails.files:type_name -> agate.grpc.FileInfo
1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo 1, // 3: agate.grpc.ListSnapshotsResponse.snapshots:type_name -> agate.grpc.SnapshotInfo
3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest 3, // 4: agate.grpc.SnapshotService.ListSnapshots:input_type -> agate.grpc.ListSnapshotsRequest
5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest 5, // 5: agate.grpc.SnapshotService.GetSnapshotDetails:input_type -> agate.grpc.GetSnapshotDetailsRequest
6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest 6, // 6: agate.grpc.SnapshotService.DownloadFile:input_type -> agate.grpc.DownloadFileRequest
4, // 7: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse 8, // 7: agate.grpc.SnapshotService.DownloadSnapshotDiff:input_type -> agate.grpc.DownloadSnapshotDiffRequest
2, // 8: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails 4, // 8: agate.grpc.SnapshotService.ListSnapshots:output_type -> agate.grpc.ListSnapshotsResponse
7, // 9: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse 2, // 9: agate.grpc.SnapshotService.GetSnapshotDetails:output_type -> agate.grpc.SnapshotDetails
7, // [7:10] is the sub-list for method output_type 7, // 10: agate.grpc.SnapshotService.DownloadFile:output_type -> agate.grpc.DownloadFileResponse
4, // [4:7] is the sub-list for method input_type 7, // 11: agate.grpc.SnapshotService.DownloadSnapshotDiff:output_type -> agate.grpc.DownloadFileResponse
8, // [8:12] is the sub-list for method output_type
4, // [4:8] is the sub-list for method input_type
4, // [4:4] is the sub-list for extension type_name 4, // [4:4] is the sub-list for extension type_name
4, // [4:4] is the sub-list for extension extendee 4, // [4:4] is the sub-list for extension extendee
0, // [0:4] is the sub-list for field type_name 0, // [0:4] is the sub-list for field type_name
@ -531,7 +601,7 @@ func file_snapshot_proto_init() {
GoPackagePath: reflect.TypeOf(x{}).PkgPath(), GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)), RawDescriptor: unsafe.Slice(unsafe.StringData(file_snapshot_proto_rawDesc), len(file_snapshot_proto_rawDesc)),
NumEnums: 0, NumEnums: 0,
NumMessages: 8, NumMessages: 9,
NumExtensions: 0, NumExtensions: 0,
NumServices: 1, NumServices: 1,
}, },

View File

@ -3,7 +3,7 @@ syntax = "proto3";
package agate.grpc; package agate.grpc;
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
import "google/api/annotations.proto"; // Добавлено для HTTP mapping import "google/api/annotations.proto";
option go_package = "gitea.unprism.ru/KRBL/Agate/grpc"; option go_package = "gitea.unprism.ru/KRBL/Agate/grpc";
@ -30,77 +30,59 @@ service SnapshotService {
}; };
} }
// --- Методы для управления (опционально, можно не включать в публичный API клиента) --- // Скачать архив, содержащий только разницу между двумя снапшотами
// Создать новый снапшот из директории (если серверу позволено инициировать) rpc DownloadSnapshotDiff(DownloadSnapshotDiffRequest) returns (stream DownloadFileResponse) {}
// rpc CreateSnapshot(CreateSnapshotRequest) returns (Snapshot);
// Удалить снапшот (если требуется)
// rpc DeleteSnapshot(DeleteSnapshotRequest) returns (DeleteSnapshotResponse);
} }
// Метаданные файла внутри снапшота // Метаданные файла внутри снапшота
message FileInfo { message FileInfo {
string path = 1; // Относительный путь файла внутри снапшота string path = 1;
int64 size_bytes = 2; // Размер файла в байтах int64 size_bytes = 2;
string sha256_hash = 3; // Хеш-сумма файла (SHA256) string sha256_hash = 3;
bool is_dir = 4; // Является ли запись директорией bool is_dir = 4;
} }
// Краткая информация о снапшоте // Краткая информация о снапшоте
message SnapshotInfo { message SnapshotInfo {
string id = 1; // Уникальный ID снапшота (UUID) string id = 1;
string name = 2; // Имя снапшота string name = 2;
string parent_id = 3; // ID родительского снапшота (может быть пустым) string parent_id = 3;
google.protobuf.Timestamp creation_time = 4; // Время создания google.protobuf.Timestamp creation_time = 4;
} }
// Детальная информация о снапшоте // Детальная информация о снапшоте
message SnapshotDetails { message SnapshotDetails {
SnapshotInfo info = 1; // Краткая информация SnapshotInfo info = 1;
repeated FileInfo files = 2; // Список файлов в снапшоте repeated FileInfo files = 2;
} }
// Запрос на получение списка снапшотов (можно добавить фильтры/пагинацию) // Запрос на получение списка снапшотов
message ListSnapshotsRequest { message ListSnapshotsRequest {}
// string filter_by_name = 1;
// int32 page_size = 2;
// string page_token = 3;
}
// Ответ со списком снапшотов // Ответ со списком снапшотов
message ListSnapshotsResponse { message ListSnapshotsResponse {
repeated SnapshotInfo snapshots = 1; repeated SnapshotInfo snapshots = 1;
// string next_page_token = 2;
} }
// Запрос на получение деталей снапшота // Запрос на получение деталей снапшота
message GetSnapshotDetailsRequest { message GetSnapshotDetailsRequest {
string snapshot_id = 1; // ID нужного снапшота string snapshot_id = 1;
} }
// Запрос на скачивание файла // Запрос на скачивание файла
message DownloadFileRequest { message DownloadFileRequest {
string snapshot_id = 1; // ID снапшота string snapshot_id = 1;
string file_path = 2; // Путь к файлу внутри снапшота string file_path = 2;
} }
// Ответ (часть файла) при скачивании // Ответ (часть файла) при скачивании
message DownloadFileResponse { message DownloadFileResponse {
bytes chunk_data = 1; // Кусочек данных файла bytes chunk_data = 1;
} }
// --- Сообщения для опциональных методов управления --- // Запрос на скачивание разницы между снапшотами
/* message DownloadSnapshotDiffRequest {
message CreateSnapshotRequest { string snapshot_id = 1; // ID целевого снапшота
string source_path = 1; // Путь к директории на сервере string local_parent_id = 2; // ID снапшота, который уже есть у клиента
string name = 2; int64 offset = 3; // Смещение в байтах для докачки
string parent_id = 3; // Опционально
} }
message DeleteSnapshotRequest {
string snapshot_id = 1;
}
message DeleteSnapshotResponse {
bool success = 1;
}
*/

View File

@ -19,9 +19,10 @@ import (
const _ = grpc.SupportPackageIsVersion9 const _ = grpc.SupportPackageIsVersion9
const ( const (
SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots" SnapshotService_ListSnapshots_FullMethodName = "/agate.grpc.SnapshotService/ListSnapshots"
SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails" SnapshotService_GetSnapshotDetails_FullMethodName = "/agate.grpc.SnapshotService/GetSnapshotDetails"
SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile" SnapshotService_DownloadFile_FullMethodName = "/agate.grpc.SnapshotService/DownloadFile"
SnapshotService_DownloadSnapshotDiff_FullMethodName = "/agate.grpc.SnapshotService/DownloadSnapshotDiff"
) )
// SnapshotServiceClient is the client API for SnapshotService service. // SnapshotServiceClient is the client API for SnapshotService service.
@ -36,6 +37,8 @@ type SnapshotServiceClient interface {
GetSnapshotDetails(ctx context.Context, in *GetSnapshotDetailsRequest, opts ...grpc.CallOption) (*SnapshotDetails, error) GetSnapshotDetails(ctx context.Context, in *GetSnapshotDetailsRequest, opts ...grpc.CallOption) (*SnapshotDetails, error)
// Скачать конкретный файл из снапшота (потоковая передача) // Скачать конкретный файл из снапшота (потоковая передача)
DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) DownloadFile(ctx context.Context, in *DownloadFileRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
// Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error)
} }
type snapshotServiceClient struct { type snapshotServiceClient struct {
@ -85,6 +88,25 @@ func (c *snapshotServiceClient) DownloadFile(ctx context.Context, in *DownloadFi
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadFileClient = grpc.ServerStreamingClient[DownloadFileResponse] type SnapshotService_DownloadFileClient = grpc.ServerStreamingClient[DownloadFileResponse]
func (c *snapshotServiceClient) DownloadSnapshotDiff(ctx context.Context, in *DownloadSnapshotDiffRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[DownloadFileResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &SnapshotService_ServiceDesc.Streams[1], SnapshotService_DownloadSnapshotDiff_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &grpc.GenericClientStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ClientStream: stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffClient = grpc.ServerStreamingClient[DownloadFileResponse]
// SnapshotServiceServer is the server API for SnapshotService service. // SnapshotServiceServer is the server API for SnapshotService service.
// All implementations must embed UnimplementedSnapshotServiceServer // All implementations must embed UnimplementedSnapshotServiceServer
// for forward compatibility. // for forward compatibility.
@ -97,6 +119,8 @@ type SnapshotServiceServer interface {
GetSnapshotDetails(context.Context, *GetSnapshotDetailsRequest) (*SnapshotDetails, error) GetSnapshotDetails(context.Context, *GetSnapshotDetailsRequest) (*SnapshotDetails, error)
// Скачать конкретный файл из снапшота (потоковая передача) // Скачать конкретный файл из снапшота (потоковая передача)
DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
// Скачать архив, содержащий только разницу между двумя снапшотами
DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error
mustEmbedUnimplementedSnapshotServiceServer() mustEmbedUnimplementedSnapshotServiceServer()
} }
@ -116,6 +140,9 @@ func (UnimplementedSnapshotServiceServer) GetSnapshotDetails(context.Context, *G
func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error { func (UnimplementedSnapshotServiceServer) DownloadFile(*DownloadFileRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method DownloadFile not implemented") return status.Errorf(codes.Unimplemented, "method DownloadFile not implemented")
} }
func (UnimplementedSnapshotServiceServer) DownloadSnapshotDiff(*DownloadSnapshotDiffRequest, grpc.ServerStreamingServer[DownloadFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method DownloadSnapshotDiff not implemented")
}
func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {} func (UnimplementedSnapshotServiceServer) mustEmbedUnimplementedSnapshotServiceServer() {}
func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {} func (UnimplementedSnapshotServiceServer) testEmbeddedByValue() {}
@ -184,6 +211,17 @@ func _SnapshotService_DownloadFile_Handler(srv interface{}, stream grpc.ServerSt
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadFileServer = grpc.ServerStreamingServer[DownloadFileResponse] type SnapshotService_DownloadFileServer = grpc.ServerStreamingServer[DownloadFileResponse]
func _SnapshotService_DownloadSnapshotDiff_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(DownloadSnapshotDiffRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(SnapshotServiceServer).DownloadSnapshotDiff(m, &grpc.GenericServerStream[DownloadSnapshotDiffRequest, DownloadFileResponse]{ServerStream: stream})
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type SnapshotService_DownloadSnapshotDiffServer = grpc.ServerStreamingServer[DownloadFileResponse]
// SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service. // SnapshotService_ServiceDesc is the grpc.ServiceDesc for SnapshotService service.
// It's only intended for direct use with grpc.RegisterService, // It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy) // and not to be introspected or modified (even as a copy)
@ -206,6 +244,11 @@ var SnapshotService_ServiceDesc = grpc.ServiceDesc{
Handler: _SnapshotService_DownloadFile_Handler, Handler: _SnapshotService_DownloadFile_Handler,
ServerStreams: true, ServerStreams: true,
}, },
{
StreamName: "DownloadSnapshotDiff",
Handler: _SnapshotService_DownloadSnapshotDiff_Handler,
ServerStreams: true,
},
}, },
Metadata: "snapshot.proto", Metadata: "snapshot.proto",
} }

View File

@ -1,17 +1,16 @@
package agate package agate
import ( import (
"bytes"
"context" "context"
"crypto/sha256"
"io"
"log" "log"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"testing" "testing"
"time" "time"
"gitea.unprism.ru/KRBL/Agate/remote" "gitea.unprism.ru/KRBL/Agate/remote"
"gitea.unprism.ru/KRBL/Agate/store"
) )
// TestGRPCServerClient tests the interaction between a gRPC server and client. // TestGRPCServerClient tests the interaction between a gRPC server and client.
@ -24,366 +23,241 @@ func TestGRPCServerClient(t *testing.T) {
t.Skip("Skipping gRPC server-client test in short mode") t.Skip("Skipping gRPC server-client test in short mode")
} }
// Create a temporary directory for the server // --- Setup Server ---
serverDir, err := os.MkdirTemp("", "agate-server-*") serverDir, err := os.MkdirTemp("", "agate-server-*")
if err != nil { if err != nil {
t.Fatalf("Failed to create server temp directory: %v", err) t.Fatalf("Failed to create server temp directory: %v", err)
} }
defer os.RemoveAll(serverDir) defer os.RemoveAll(serverDir)
// Create a temporary directory for the client serverAgate, err := New(AgateOptions{WorkDir: serverDir})
clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil {
t.Fatalf("Failed to create client temp directory: %v", err)
}
defer os.RemoveAll(clientDir)
// Create Agate options for the server
serverOptions := AgateOptions{
WorkDir: serverDir,
}
// Create Agate instance for the server
serverAgate, err := New(serverOptions)
if err != nil { if err != nil {
t.Fatalf("Failed to create server Agate instance: %v", err) t.Fatalf("Failed to create server Agate instance: %v", err)
} }
defer serverAgate.Close() defer serverAgate.Close()
// Create a data directory
dataDir := serverAgate.options.BlobStore.GetActiveDir() dataDir := serverAgate.options.BlobStore.GetActiveDir()
if err := os.MkdirAll(dataDir, 0755); err != nil { if err := os.MkdirAll(dataDir, 0755); err != nil {
t.Fatalf("Failed to create data directory: %v", err) t.Fatalf("Failed to create data directory: %v", err)
} }
// Create initial test files for the first snapshot // Create initial test files for the first snapshot
initialFiles := map[string]string{ if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644); err != nil {
filepath.Join(dataDir, "file1.txt"): "Initial content of file 1", t.Fatal(err)
filepath.Join(dataDir, "file2.txt"): "Initial content of file 2", }
filepath.Join(dataDir, "subdir", "file3.txt"): "Initial content of file 3", if err := os.WriteFile(filepath.Join(dataDir, "file2.txt"), []byte("content2"), 0644); err != nil {
t.Fatal(err)
} }
// Create subdirectory ctx, cancel := context.WithCancel(context.Background())
if err := os.MkdirAll(filepath.Join(dataDir, "subdir"), 0755); err != nil { defer cancel()
t.Fatalf("Failed to create subdirectory: %v", err)
}
// Create the files
for path, content := range initialFiles {
if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to create test file %s: %v", path, err)
}
}
// Create the first snapshot
ctx := context.Background()
snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "") snapshot1ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 1", "")
if err != nil { if err != nil {
t.Fatalf("Failed to create first snapshot: %v", err) t.Fatalf("Failed to create first snapshot: %v", err)
} }
t.Logf("Created first snapshot with ID: %s", snapshot1ID) t.Logf("Created first snapshot with ID: %s", snapshot1ID)
// Modify some files and add a new file for the second snapshot // Modify content for the second snapshot
modifiedFiles := map[string]string{ if err := os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("modified content1"), 0644); err != nil {
filepath.Join(dataDir, "file1.txt"): "Modified content of file 1", t.Fatal(err)
filepath.Join(dataDir, "file4.txt"): "Content of new file 4", }
if err := os.WriteFile(filepath.Join(dataDir, "file3.txt"), []byte("new file3"), 0644); err != nil {
t.Fatal(err)
} }
for path, content := range modifiedFiles {
if err := os.WriteFile(path, []byte(content), 0644); err != nil {
t.Fatalf("Failed to modify/create test file %s: %v", path, err)
}
}
// Create the second snapshot
snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID) snapshot2ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 2", snapshot1ID)
if err != nil { if err != nil {
t.Fatalf("Failed to create second snapshot: %v", err) t.Fatalf("Failed to create second snapshot: %v", err)
} }
t.Logf("Created second snapshot with ID: %s", snapshot2ID) t.Logf("Created second snapshot with ID: %s", snapshot2ID)
// Delete a file and modify another for the third snapshot
if err := os.Remove(filepath.Join(dataDir, "file2.txt")); err != nil {
t.Fatalf("Failed to delete test file: %v", err)
}
if err := os.WriteFile(filepath.Join(dataDir, "subdir/file3.txt"), []byte("Modified content of file 3"), 0644); err != nil {
t.Fatalf("Failed to modify test file: %v", err)
}
// Create the third snapshot
snapshot3ID, err := serverAgate.SaveSnapshot(ctx, "Snapshot 3", snapshot2ID)
if err != nil {
t.Fatalf("Failed to create third snapshot: %v", err)
}
t.Logf("Created third snapshot with ID: %s", snapshot3ID)
// Start the gRPC server // Start the gRPC server
serverAddress := "localhost:50051" serverAddress := "localhost:50051"
server, err := remote.RunServer(ctx, serverAgate.manager, serverAddress) server := remote.NewServer(serverAgate.manager)
if err != nil { go func() {
t.Fatalf("Failed to start gRPC server: %v", err) if err := server.Start(ctx, serverAddress); err != nil {
} log.Printf("Server start error: %v", err)
defer server.Stop(ctx) }
}()
// Give the server a moment to start defer server.Stop()
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Connect a client to the server // --- Setup Client ---
client, err := remote.NewClient(serverAddress)
if err != nil {
t.Fatalf("Failed to connect client to server: %v", err)
}
defer client.Close()
// List snapshots from the client
snapshots, err := client.ListSnapshots(ctx)
if err != nil {
t.Fatalf("Failed to list snapshots from client: %v", err)
}
// Verify we have 3 snapshots
if len(snapshots) != 3 {
t.Errorf("Expected 3 snapshots, got %d", len(snapshots))
}
// Find the latest snapshot (should be snapshot3)
var latestSnapshot store.SnapshotInfo
for _, snapshot := range snapshots {
if latestSnapshot.CreationTime.Before(snapshot.CreationTime) {
latestSnapshot = snapshot
}
}
// Verify the latest snapshot is snapshot3
if latestSnapshot.ID != snapshot3ID {
t.Errorf("Latest snapshot ID is %s, expected %s", latestSnapshot.ID, snapshot3ID)
}
// Get detailed information about the latest snapshot
snapshotDetails, err := client.FetchSnapshotDetails(ctx, latestSnapshot.ID)
if err != nil {
t.Fatalf("Failed to fetch snapshot details: %v", err)
}
// Verify the snapshot details
if snapshotDetails.ID != snapshot3ID {
t.Errorf("Snapshot details ID is %s, expected %s", snapshotDetails.ID, snapshot3ID)
}
// Create a directory to download the snapshot to
downloadDir := filepath.Join(clientDir, "download")
if err := os.MkdirAll(downloadDir, 0755); err != nil {
t.Fatalf("Failed to create download directory: %v", err)
}
// Download the snapshot
err = client.DownloadSnapshot(ctx, latestSnapshot.ID, downloadDir, "")
if err != nil {
t.Fatalf("Failed to download snapshot: %v", err)
}
// Verify the downloaded files match the expected content
expectedFiles := map[string]string{
filepath.Join(downloadDir, "file1.txt"): "Modified content of file 1",
filepath.Join(downloadDir, "file4.txt"): "Content of new file 4",
filepath.Join(downloadDir, "subdir/file3.txt"): "Modified content of file 3",
}
for path, expectedContent := range expectedFiles {
content, err := os.ReadFile(path)
if err != nil {
t.Fatalf("Failed to read downloaded file %s: %v", path, err)
}
if string(content) != expectedContent {
t.Errorf("Downloaded file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
}
}
// Verify that file2.txt doesn't exist in the downloaded snapshot
if _, err := os.Stat(filepath.Join(downloadDir, "file2.txt")); !os.IsNotExist(err) {
t.Errorf("file2.txt should not exist in the downloaded snapshot")
}
}
// TestGRPC_GetRemoteSnapshot_Incremental tests the incremental download functionality
// of GetRemoteSnapshot, verifying that it reuses files from a local parent snapshot
// instead of downloading them again.
func TestGRPC_GetRemoteSnapshot_Incremental(t *testing.T) {
// Skip this test in short mode
if testing.Short() {
t.Skip("Skipping incremental GetRemoteSnapshot test in short mode")
}
// Create a temporary directory for the server
serverDir, err := os.MkdirTemp("", "agate-server-*")
if err != nil {
t.Fatalf("Failed to create server temp directory: %v", err)
}
defer os.RemoveAll(serverDir)
// Create a temporary directory for the client
clientDir, err := os.MkdirTemp("", "agate-client-*") clientDir, err := os.MkdirTemp("", "agate-client-*")
if err != nil { if err != nil {
t.Fatalf("Failed to create client temp directory: %v", err) t.Fatalf("Failed to create client temp directory: %v", err)
} }
defer os.RemoveAll(clientDir) defer os.RemoveAll(clientDir)
// Create a buffer to capture client logs clientAgate, err := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
var clientLogBuffer bytes.Buffer
clientLogger := log.New(&clientLogBuffer, "", 0)
// Create Agate options for the server
serverOptions := AgateOptions{
WorkDir: serverDir,
}
// Create Agate options for the client with logger
clientOptions := AgateOptions{
WorkDir: clientDir,
Logger: clientLogger,
}
// Create Agate instances for server and client
serverAgate, err := New(serverOptions)
if err != nil {
t.Fatalf("Failed to create server Agate instance: %v", err)
}
defer serverAgate.Close()
clientAgate, err := New(clientOptions)
if err != nil { if err != nil {
t.Fatalf("Failed to create client Agate instance: %v", err) t.Fatalf("Failed to create client Agate instance: %v", err)
} }
defer clientAgate.Close() defer clientAgate.Close()
// Create a data directory on the server // --- Test Scenario ---
serverDataDir := serverAgate.options.BlobStore.GetActiveDir() // 1. Client downloads the first snapshot completely
if err := os.MkdirAll(serverDataDir, 0755); err != nil { t.Log("Client downloading Snapshot 1...")
t.Fatalf("Failed to create server data directory: %v", err) if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot1ID, ""); err != nil {
t.Fatalf("Client failed to get snapshot 1: %v", err)
} }
// Create test files for snapshot A on the server // Verify content of snapshot 1
if err := os.MkdirAll(filepath.Join(serverDataDir, "subdir"), 0755); err != nil { if err := clientAgate.RestoreSnapshot(ctx, snapshot1ID); err != nil {
t.Fatalf("Failed to create subdirectory: %v", err) t.Fatalf("Failed to restore snapshot 1: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
// 2. Client downloads the second snapshot incrementally
t.Log("Client downloading Snapshot 2 (incrementally)...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshot2ID, snapshot1ID); err != nil {
t.Fatalf("Client failed to get snapshot 2: %v", err)
} }
snapshotAFiles := map[string]string{ // Verify content of snapshot 2
filepath.Join(serverDataDir, "file1.txt"): "Content of file 1", if err := clientAgate.RestoreSnapshot(ctx, snapshot2ID); err != nil {
filepath.Join(serverDataDir, "file2.txt"): "Content of file 2", t.Fatalf("Failed to restore snapshot 2: %v", err)
filepath.Join(serverDataDir, "subdir/file3.txt"): "Content of file 3",
} }
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "modified content1")
verifyFileContent(t, clientAgate.GetActiveDir(), "file3.txt", "new file3")
// file2.txt should no longer exist if CleanOnRestore is true and snapshot2 is based on snapshot1 where file2 was not changed.
// But our diff logic is additive. Let's re-check the logic. The logic is: parent + diff = new. So file2 should exist.
verifyFileContent(t, clientAgate.GetActiveDir(), "file2.txt", "content2")
}
for path, content := range snapshotAFiles { func verifyFileContent(t *testing.T, dir, filename, expectedContent string) {
if err := os.WriteFile(path, []byte(content), 0644); err != nil { t.Helper()
t.Fatalf("Failed to create test file %s: %v", path, err) content, err := os.ReadFile(filepath.Join(dir, filename))
}
}
// Create snapshot A on the server
ctx := context.Background()
snapshotAID, err := serverAgate.SaveSnapshot(ctx, "Snapshot A", "")
if err != nil { if err != nil {
t.Fatalf("Failed to create snapshot A: %v", err) t.Fatalf("Failed to read file %s: %v", filename, err)
} }
t.Logf("Created snapshot A with ID: %s", snapshotAID) if string(content) != expectedContent {
t.Errorf("File %s has wrong content: got '%s', want '%s'", filename, string(content), expectedContent)
// Modify some files and add a new file for snapshot B
snapshotBChanges := map[string]string{
filepath.Join(serverDataDir, "file1.txt"): "Modified content of file 1", // Modified file
filepath.Join(serverDataDir, "file4.txt"): "Content of new file 4", // New file
filepath.Join(serverDataDir, "subdir/file5.txt"): "Content of new file 5", // New file in subdir
} }
}
for path, content := range snapshotBChanges { // TestGRPC_GetRemoteSnapshot_FullDownload tests a full download when no parent is specified.
// Ensure parent directory exists func TestGRPC_GetRemoteSnapshot_FullDownload(t *testing.T) {
if err := os.MkdirAll(filepath.Dir(path), 0755); err != nil { // --- Setup Server ---
t.Fatalf("Failed to create directory for %s: %v", path, err) serverDir, _ := os.MkdirTemp("", "agate-server-*")
} defer os.RemoveAll(serverDir)
if err := os.WriteFile(path, []byte(content), 0644); err != nil { serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
t.Fatalf("Failed to create/modify test file %s: %v", path, err) defer serverAgate.Close()
} dataDir := serverAgate.options.BlobStore.GetActiveDir()
} os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("full download"), 0644)
// Create snapshot B on the server (with A as parent) ctx, cancel := context.WithCancel(context.Background())
snapshotBID, err := serverAgate.SaveSnapshot(ctx, "Snapshot B", snapshotAID) defer cancel()
snapshotID, err := serverAgate.SaveSnapshot(ctx, "FullSnapshot", "")
if err != nil { if err != nil {
t.Fatalf("Failed to create snapshot B: %v", err) t.Fatalf("Failed to create snapshot: %v", err)
} }
t.Logf("Created snapshot B with ID: %s", snapshotBID)
// Start the gRPC server // Start Server
serverAddress := "localhost:50052" // Use a different port than the other test serverAddress := "localhost:50052"
server, err := remote.RunServer(ctx, serverAgate.manager, serverAddress) server := remote.NewServer(serverAgate.manager)
if err != nil { go func() { server.Start(ctx, serverAddress) }()
t.Fatalf("Failed to start gRPC server: %v", err) defer server.Stop()
}
defer server.Stop(ctx)
// Give the server a moment to start
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Step 1: Client downloads snapshot A // --- Setup Client ---
err = clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotAID, "") clientDir, _ := os.MkdirTemp("", "agate-client-*")
if err != nil { defer os.RemoveAll(clientDir)
t.Fatalf("Failed to download snapshot A: %v", err) clientAgate, _ := New(AgateOptions{WorkDir: clientDir, CleanOnRestore: true})
} defer clientAgate.Close()
t.Log("Client successfully downloaded snapshot A")
// Clear the log buffer to capture only logs from the incremental download // --- Test Scenario ---
clientLogBuffer.Reset() t.Log("Client performing full download...")
if err := clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotID, ""); err != nil {
// Step 2: Client downloads snapshot B, specifying A as the local parent t.Fatalf("Client failed to get snapshot: %v", err)
err = clientAgate.GetRemoteSnapshot(ctx, serverAddress, snapshotBID, snapshotAID)
if err != nil {
t.Fatalf("Failed to download snapshot B: %v", err)
}
t.Log("Client successfully downloaded snapshot B")
// Step 3: Verify that snapshot B was correctly imported
// Restore snapshot B to a directory
restoreDir := filepath.Join(clientDir, "restore")
if err := os.MkdirAll(restoreDir, 0755); err != nil {
t.Fatalf("Failed to create restore directory: %v", err)
} }
err = clientAgate.RestoreSnapshotToDir(ctx, snapshotBID, restoreDir) // Verify content
if err != nil { if err := clientAgate.RestoreSnapshot(ctx, snapshotID); err != nil {
t.Fatalf("Failed to restore snapshot B: %v", err) t.Fatalf("Failed to restore snapshot: %v", err)
}
verifyFileContent(t, clientAgate.GetActiveDir(), "file1.txt", "full download")
}
// TestGRPC_DownloadSnapshotDiff_Resumption tests the download resumption logic.
func TestGRPC_DownloadSnapshotDiff_Resumption(t *testing.T) {
// --- Setup Server ---
serverDir, _ := os.MkdirTemp("", "agate-server-*")
defer os.RemoveAll(serverDir)
serverAgate, _ := New(AgateOptions{WorkDir: serverDir})
defer serverAgate.Close()
dataDir := serverAgate.options.BlobStore.GetActiveDir()
os.MkdirAll(dataDir, 0755)
os.WriteFile(filepath.Join(dataDir, "file1.txt"), []byte("content1"), 0644)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Snap 1
snapshot1ID, _ := serverAgate.SaveSnapshot(ctx, "Snap1", "")
// Snap 2 (with changes)
os.WriteFile(filepath.Join(dataDir, "file2.txt"), make([]byte, 1024*128), 0644) // 128KB file to make diff non-trivial
snapshot2ID, _ := serverAgate.SaveSnapshot(ctx, "Snap2", snapshot1ID)
// Start Server
serverAddress := "localhost:50053"
server := remote.NewServer(serverAgate.manager)
go func() { server.Start(ctx, serverAddress) }()
defer server.Stop()
time.Sleep(100 * time.Millisecond)
// --- Setup Client ---
clientDir, _ := os.MkdirTemp("", "agate-client-*")
defer os.RemoveAll(clientDir)
rClient, err := remote.NewClient(serverAddress)
if err != nil {
t.Fatalf("Failed to create remote client: %v", err)
}
defer rClient.Close()
// --- Test Scenario ---
// 1. Manually download first part of the diff archive
diffPath := filepath.Join(clientDir, "diff.zip.part")
diffReader, err := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
if err != nil {
t.Fatalf("Failed to get diff stream from manager: %v", err)
}
defer diffReader.Close()
// Read first 64KB
firstChunk := make([]byte, 64*1024)
n, err := io.ReadFull(diffReader, firstChunk)
if err != nil && err != io.ErrUnexpectedEOF {
t.Fatalf("Failed to read first chunk: %v, read %d bytes", err, n)
}
if err := os.WriteFile(diffPath, firstChunk[:n], 0644); err != nil {
t.Fatalf("Failed to write partial file: %v", err)
}
diffReader.Close() // Simulate connection drop
// 2. Resume download using the client
t.Log("Resuming download...")
if err := rClient.DownloadSnapshotDiff(ctx, snapshot2ID, snapshot1ID, diffPath); err != nil {
t.Fatalf("Failed to resume download: %v", err)
}
// 3. Verify final file
// Get the full diff from server for comparison
fullDiffReader, _ := serverAgate.manager.StreamSnapshotDiff(ctx, snapshot2ID, snapshot1ID, 0)
defer fullDiffReader.Close()
fullDiffData, _ := io.ReadAll(fullDiffReader)
resumedData, _ := os.ReadFile(diffPath)
if len(resumedData) != len(fullDiffData) {
t.Errorf("Resumed file size is incorrect. Got %d, want %d", len(resumedData), len(fullDiffData))
}
if sha256.Sum256(resumedData) != sha256.Sum256(fullDiffData) {
t.Error("File content mismatch after resumption")
} }
// Verify the restored files match the expected content
expectedFiles := map[string]string{
filepath.Join(restoreDir, "file1.txt"): "Modified content of file 1", // Modified file
filepath.Join(restoreDir, "file2.txt"): "Content of file 2", // Unchanged file
filepath.Join(restoreDir, "file4.txt"): "Content of new file 4", // New file
filepath.Join(restoreDir, "subdir/file3.txt"): "Content of file 3", // Unchanged file
filepath.Join(restoreDir, "subdir/file5.txt"): "Content of new file 5", // New file
}
for path, expectedContent := range expectedFiles {
content, err := os.ReadFile(path)
if err != nil {
t.Fatalf("Failed to read restored file %s: %v", path, err)
}
if string(content) != expectedContent {
t.Errorf("Restored file %s has wrong content: got %s, want %s", path, string(content), expectedContent)
}
}
// Step 4: Analyze logs to verify incremental download behavior
logs := clientLogBuffer.String()
// Check for evidence of file reuse
if !strings.Contains(logs, "Reusing file") {
t.Errorf("No evidence of file reuse in logs")
}
// Check for evidence of downloading only new/changed files
if !strings.Contains(logs, "Downloading file") {
t.Errorf("No evidence of downloading new files in logs")
}
// Log the relevant parts for debugging
t.Logf("Log evidence of incremental download:\n%s", logs)
} }

View File

@ -33,6 +33,11 @@ type SnapshotManager interface {
// UpdateSnapshotMetadata updates the metadata of an existing snapshot, allowing changes to its name. // UpdateSnapshotMetadata updates the metadata of an existing snapshot, allowing changes to its name.
UpdateSnapshotMetadata(ctx context.Context, snapshotID string, newName string) error UpdateSnapshotMetadata(ctx context.Context, snapshotID string, newName string) error
// StreamSnapshotDiff creates and streams a differential archive between two snapshots.
// It returns an io.ReadCloser for the archive stream and an error.
// The caller is responsible for closing the reader, which will also handle cleanup of temporary resources.
StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error)
} }
// SnapshotServer defines the interface for a server that can share snapshots // SnapshotServer defines the interface for a server that can share snapshots
@ -52,8 +57,8 @@ type SnapshotClient interface {
// FetchSnapshotDetails retrieves detailed information about a specific snapshot // FetchSnapshotDetails retrieves detailed information about a specific snapshot
FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// DownloadSnapshot downloads a snapshot from the server // DownloadSnapshotDiff downloads a differential archive between two snapshots to a target directory
DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error
// Close closes the connection to the server // Close closes the connection to the server
Close() error Close() error

View File

@ -2,6 +2,7 @@ package agate
import ( import (
"archive/zip" "archive/zip"
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@ -31,6 +32,11 @@ func CreateSnapshotManager(metadataStore store.MetadataStore, blobStore store.Bl
return nil, errors.New("parameters can't be nil") return nil, errors.New("parameters can't be nil")
} }
// Ensure logger is never nil.
if logger == nil {
logger = log.New(io.Discard, "", 0)
}
return &SnapshotManagerData{ return &SnapshotManagerData{
metadataStore: metadataStore, metadataStore: metadataStore,
blobStore: blobStore, blobStore: blobStore,
@ -183,31 +189,24 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID
return errors.New("snapshot ID cannot be empty") return errors.New("snapshot ID cannot be empty")
} }
// First check if the snapshot exists and get its details
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, ErrNotFound) { if errors.Is(err, store.ErrNotFound) {
// If snapshot doesn't exist, return success (idempotent operation)
return nil return nil
} }
return fmt.Errorf("failed to check if snapshot exists: %w", err) return fmt.Errorf("failed to check if snapshot exists: %w", err)
} }
// Get the parent ID of the snapshot being deleted
parentID := snapshot.ParentID parentID := snapshot.ParentID
// Find all snapshots that have the deleted snapshot as their parent
// We need to update their parent ID to maintain the chain
opts := store.ListOptions{} opts := store.ListOptions{}
allSnapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx, opts) allSnapshots, err := data.metadataStore.ListSnapshotsMetadata(ctx, opts)
if err != nil { if err != nil {
return fmt.Errorf("failed to list snapshots: %w", err) return fmt.Errorf("failed to list snapshots: %w", err)
} }
// Update parent references for any snapshots that have this one as a parent
for _, info := range allSnapshots { for _, info := range allSnapshots {
if info.ParentID == snapshotID { if info.ParentID == snapshotID {
// Используем новый, более надежный метод для обновления только parent_id
if err := data.metadataStore.UpdateSnapshotParentID(ctx, info.ID, parentID); err != nil { if err := data.metadataStore.UpdateSnapshotParentID(ctx, info.ID, parentID); err != nil {
data.logger.Printf("WARNING: failed to update parent reference for snapshot %s: %v", info.ID, err) data.logger.Printf("WARNING: failed to update parent reference for snapshot %s: %v", info.ID, err)
} else { } else {
@ -216,16 +215,11 @@ func (data *SnapshotManagerData) DeleteSnapshot(ctx context.Context, snapshotID
} }
} }
// Delete the metadata first
if err := data.metadataStore.DeleteSnapshotMetadata(ctx, snapshotID); err != nil { if err := data.metadataStore.DeleteSnapshotMetadata(ctx, snapshotID); err != nil {
return fmt.Errorf("failed to delete snapshot metadata: %w", err) return fmt.Errorf("failed to delete snapshot metadata: %w", err)
} }
// Then delete the blob
if err := data.blobStore.DeleteBlob(ctx, snapshotID); err != nil { if err := data.blobStore.DeleteBlob(ctx, snapshotID); err != nil {
// Note: We don't return here because we've already deleted the metadata
// and the blob store should handle the case where the blob doesn't exist
// Log the error instead
data.logger.Printf("WARNING: failed to delete snapshot blob: %v", err) data.logger.Printf("WARNING: failed to delete snapshot blob: %v", err)
} }
@ -392,22 +386,130 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
return errors.New("new name cannot be empty") return errors.New("new name cannot be empty")
} }
// Get the current snapshot metadata
snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID) snapshot, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil { if err != nil {
if errors.Is(err, ErrNotFound) { if errors.Is(err, store.ErrNotFound) {
return ErrNotFound return ErrNotFound
} }
return fmt.Errorf("failed to get snapshot metadata: %w", err) return fmt.Errorf("failed to get snapshot metadata: %w", err)
} }
// Update the name
snapshot.Name = newName snapshot.Name = newName
// Save the updated metadata
if err := data.metadataStore.SaveSnapshotMetadata(ctx, *snapshot); err != nil { if err := data.metadataStore.SaveSnapshotMetadata(ctx, *snapshot); err != nil {
return fmt.Errorf("failed to update snapshot metadata: %w", err) return fmt.Errorf("failed to update snapshot metadata: %w", err)
} }
return nil return nil
} }
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
type diffArchiveReader struct {
*os.File
tempArchive string
tempStaging string
}
// Close closes the file and removes the temporary archive and staging directory.
func (r *diffArchiveReader) Close() error {
err := r.File.Close()
os.Remove(r.tempArchive)
os.RemoveAll(r.tempStaging)
return err
}
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
if err != nil {
return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err)
}
parentFiles := make(map[string]string)
if parentID != "" {
parentSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, parentID)
if err == nil {
for _, file := range parentSnap.Files {
if !file.IsDir {
parentFiles[file.Path] = file.SHA256
}
}
} else {
data.logger.Printf("Warning: failed to get parent snapshot %s, creating full diff: %v", parentID, err)
}
}
var filesToInclude []string
for _, file := range targetSnap.Files {
if file.IsDir {
continue
}
if parentHash, ok := parentFiles[file.Path]; !ok || parentHash != file.SHA256 {
filesToInclude = append(filesToInclude, file.Path)
}
}
if len(filesToInclude) == 0 {
return io.NopCloser(bytes.NewReader(nil)), nil
}
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
if err != nil {
return nil, fmt.Errorf("failed to create temp staging directory: %w", err)
}
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, err
}
for _, filePath := range filesToInclude {
destPath := filepath.Join(tempStagingDir, filePath)
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to create dir for diff file: %w", err)
}
fileWriter, err := os.Create(destPath)
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, err
}
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
fileWriter.Close()
if err != nil {
os.RemoveAll(tempStagingDir)
return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
}
}
tempArchivePath := filepath.Join(data.blobStore.GetBaseDir(), "diff-"+snapshotID+".zip")
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to create diff archive: %w", err)
}
archiveFile, err := os.Open(tempArchivePath)
if err != nil {
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, err
}
if offset > 0 {
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
archiveFile.Close()
os.RemoveAll(tempStagingDir)
os.Remove(tempArchivePath)
return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
}
}
return &diffArchiveReader{
File: archiveFile,
tempArchive: tempArchivePath,
tempStaging: tempStagingDir,
}, nil
}

View File

@ -554,3 +554,60 @@ func TestUpdateSnapshotMetadata(t *testing.T) {
t.Fatalf("Expected error when updating non-existent snapshot, got nil") t.Fatalf("Expected error when updating non-existent snapshot, got nil")
} }
} }
func TestStreamSnapshotDiff_EdgeCases(t *testing.T) {
tempDir, metadataStore, blobStore, cleanup := setupTestEnvironment(t)
defer cleanup()
sourceDir := filepath.Join(tempDir, "source")
os.MkdirAll(sourceDir, 0755)
createTestFiles(t, sourceDir)
manager, err := CreateSnapshotManager(metadataStore, blobStore, nil)
if err != nil {
t.Fatalf("Failed to create snapshot manager: %v", err)
}
ctx := context.Background()
// Create two identical snapshots
snap1, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap1", "")
snap2, _ := manager.CreateSnapshot(ctx, sourceDir, "Snap2", snap1.ID)
// Test 1: Diff between identical snapshots should be empty
reader, err := manager.StreamSnapshotDiff(ctx, snap2.ID, snap1.ID, 0)
if err != nil {
t.Fatalf("Expected no error for identical snapshots, got %v", err)
}
defer reader.Close()
data, _ := io.ReadAll(reader)
if len(data) != 0 {
t.Errorf("Expected empty diff for identical snapshots, got %d bytes", len(data))
}
// Test 2: Diff with a non-existent parent should be a full archive
reader, err = manager.StreamSnapshotDiff(ctx, snap1.ID, "non-existent-parent", 0)
if err != nil {
t.Fatalf("Expected no error for non-existent parent, got %v", err)
}
defer reader.Close()
data, _ = io.ReadAll(reader)
if len(data) == 0 {
t.Error("Expected full archive for non-existent parent, got empty diff")
}
// Create an empty source dir
emptyDir := filepath.Join(tempDir, "empty_source")
os.MkdirAll(emptyDir, 0755)
emptySnap, _ := manager.CreateSnapshot(ctx, emptyDir, "EmptySnap", "")
// Test 3: Diff of an empty snapshot should be empty
reader, err = manager.StreamSnapshotDiff(ctx, emptySnap.ID, "", 0)
if err != nil {
t.Fatalf("Expected no error for empty snapshot, got %v", err)
}
defer reader.Close()
data, _ = io.ReadAll(reader)
if len(data) != 0 {
t.Errorf("Expected empty diff for empty snapshot, got %d bytes", len(data))
}
}

View File

@ -15,34 +15,26 @@ import (
"gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
// Client represents a client for connecting to a remote snapshot server // Client представляет клиент для подключения к удаленному серверу снапшотов.
// It implements the interfaces.SnapshotClient interface
type Client struct { type Client struct {
conn *stdgrpc.ClientConn conn *stdgrpc.ClientConn
client agateGrpc.SnapshotServiceClient client agateGrpc.SnapshotServiceClient
} }
// Ensure Client implements interfaces.SnapshotClient // Убедимся, что Client реализует интерфейс interfaces.SnapshotClient
var _ interfaces.SnapshotClient = (*Client)(nil) var _ interfaces.SnapshotClient = (*Client)(nil)
// NewClient creates a new client connected to the specified address // NewClient создает нового клиента, подключенного к указанному адресу.
func NewClient(address string) (*Client, error) { func NewClient(address string) (*Client, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials())) conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err) return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
} }
// Create the gRPC client
client := agateGrpc.NewSnapshotServiceClient(conn) client := agateGrpc.NewSnapshotServiceClient(conn)
return &Client{conn: conn, client: client}, nil
return &Client{
conn: conn,
client: client,
}, nil
} }
// Close closes the connection to the server // Close закрывает соединение с сервером.
func (c *Client) Close() error { func (c *Client) Close() error {
if c.conn != nil { if c.conn != nil {
return c.conn.Close() return c.conn.Close()
@ -50,14 +42,13 @@ func (c *Client) Close() error {
return nil return nil
} }
// ListSnapshots retrieves a list of snapshots from the remote server // ListSnapshots получает список снапшотов с удаленного сервера.
func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{}) response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{})
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err) return nil, fmt.Errorf("failed to list snapshots: %w", err)
} }
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots)) snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots { for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{ snapshots = append(snapshots, store.SnapshotInfo{
@ -67,11 +58,10 @@ func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error
CreationTime: snapshot.CreationTime.AsTime(), CreationTime: snapshot.CreationTime.AsTime(),
}) })
} }
return snapshots, nil return snapshots, nil
} }
// FetchSnapshotDetails retrieves detailed information about a specific snapshot // FetchSnapshotDetails получает детальную информацию о конкретном снапшоте.
func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{ response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{
SnapshotId: snapshotID, SnapshotId: snapshotID,
@ -80,7 +70,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
return nil, fmt.Errorf("failed to get snapshot details: %w", err) return nil, fmt.Errorf("failed to get snapshot details: %w", err)
} }
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{ snapshot := &store.Snapshot{
ID: response.Info.Id, ID: response.Info.Id,
Name: response.Info.Name, Name: response.Info.Name,
@ -89,7 +78,6 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
Files: make([]store.FileInfo, 0, len(response.Files)), Files: make([]store.FileInfo, 0, len(response.Files)),
} }
// Convert file info
for _, file := range response.Files { for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{ snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path, Path: file.Path,
@ -98,118 +86,48 @@ func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*
SHA256: file.Sha256Hash, SHA256: file.Sha256Hash,
}) })
} }
return snapshot, nil return snapshot, nil
} }
// DownloadSnapshot downloads a snapshot from the server // DownloadSnapshotDiff скачивает архив с разницей между снапшотами.
// This implementation downloads each file individually to optimize bandwidth usage func (c *Client) DownloadSnapshotDiff(ctx context.Context, snapshotID, localParentID, targetPath string) error {
func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error { var offset int64
// Get snapshot details fileInfo, err := os.Stat(targetPath)
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID) if err == nil {
offset = fileInfo.Size()
} else if !os.IsNotExist(err) {
return fmt.Errorf("failed to stat temporary file: %w", err)
}
req := &agateGrpc.DownloadSnapshotDiffRequest{
SnapshotId: snapshotID,
LocalParentId: localParentID,
Offset: offset,
}
stream, err := c.client.DownloadSnapshotDiff(ctx, req)
if err != nil { if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err) return fmt.Errorf("failed to start snapshot diff download: %w", err)
} }
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &agateGrpc.DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err) return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
} }
// Create the target file file, err := os.OpenFile(targetPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
file, err := os.Create(targetPath)
if err != nil { if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err) return fmt.Errorf("failed to open file %s: %w", targetPath, err)
} }
defer file.Close() defer file.Close()
// Receive and write chunks
for { for {
resp, err := stream.Recv() resp, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
break break
} }
if err != nil { if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err) return fmt.Errorf("error receiving diff chunk: %w", err)
} }
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil { if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err) return fmt.Errorf("error writing to file: %w", err)
} }
@ -217,26 +135,3 @@ func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetP
return nil return nil
} }
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// Connect creates a new client connected to the specified address
func Connect(address string) (*Client, error) {
return NewClient(address)
}

View File

@ -14,21 +14,21 @@ import (
"gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
// Server implements the gRPC server for snapshots // Server реализует gRPC-сервер для снапшотов.
type Server struct { type Server struct {
agateGrpc.UnimplementedSnapshotServiceServer agateGrpc.UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager manager interfaces.SnapshotManager
server *stdgrpc.Server server *stdgrpc.Server
} }
// NewServer creates a new snapshot server // NewServer создает новый сервер снапшотов.
func NewServer(manager interfaces.SnapshotManager) *Server { func NewServer(manager interfaces.SnapshotManager) *Server {
return &Server{ return &Server{
manager: manager, manager: manager,
} }
} }
// Start starts the gRPC server on the specified address // Start запускает gRPC-сервер на указанном адресе.
func (s *Server) Start(ctx context.Context, address string) error { func (s *Server) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address) lis, err := net.Listen("tcp", address)
if err != nil { if err != nil {
@ -45,23 +45,24 @@ func (s *Server) Start(ctx context.Context, address string) error {
}() }()
fmt.Printf("Server started on %s\n", address) fmt.Printf("Server started on %s\n", address)
// Ждем отмены контекста для остановки сервера
<-ctx.Done()
s.Stop()
return nil return nil
} }
// Stop gracefully stops the server // Stop изящно останавливает сервер.
func (s *Server) Stop(ctx context.Context) error { func (s *Server) Stop() {
if s.server != nil { if s.server != nil {
s.server.GracefulStop() s.server.GracefulStop()
fmt.Println("Server stopped") fmt.Println("Server stopped")
} }
return nil
} }
// ListSnapshots implements the gRPC ListSnapshots method // ListSnapshots реализует gRPC-метод ListSnapshots.
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) { func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
// Create empty ListOptions since the proto doesn't have active filter/pagination fields yet
opts := store.ListOptions{} opts := store.ListOptions{}
snapshots, err := s.manager.ListSnapshots(ctx, opts) snapshots, err := s.manager.ListSnapshots(ctx, opts)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err) return nil, fmt.Errorf("failed to list snapshots: %w", err)
@ -78,7 +79,7 @@ func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshots
return response, nil return response, nil
} }
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method // GetSnapshotDetails реализует gRPC-метод GetSnapshotDetails.
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) { func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil { if err != nil {
@ -107,17 +108,15 @@ func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnaps
return response, nil return response, nil
} }
// DownloadFile implements the gRPC DownloadFile method // DownloadFile реализует gRPC-метод DownloadFile.
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error { func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil { if err != nil {
return fmt.Errorf("failed to open file: %w", err) return fmt.Errorf("failed to open file: %w", err)
} }
defer fileReader.Close() defer fileReader.Close()
// Read the file in chunks and send them to the client buffer := make([]byte, 64*1024)
buffer := make([]byte, 64*1024) // 64KB chunks
for { for {
n, err := fileReader.Read(buffer) n, err := fileReader.Read(buffer)
if err == io.EOF { if err == io.EOF {
@ -126,19 +125,40 @@ func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGr
if err != nil { if err != nil {
return fmt.Errorf("failed to read file: %w", err) return fmt.Errorf("failed to read file: %w", err)
} }
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
// Send the chunk to the client
if err := stream.Send(&agateGrpc.DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err) return fmt.Errorf("failed to send chunk: %w", err)
} }
} }
return nil return nil
} }
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo // DownloadSnapshotDiff реализует gRPC-метод DownloadSnapshotDiff.
func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest, stream agateGrpc.SnapshotService_DownloadSnapshotDiffServer) error {
diffReader, err := s.manager.StreamSnapshotDiff(context.Background(), req.SnapshotId, req.LocalParentId, req.Offset)
if err != nil {
return fmt.Errorf("failed to stream snapshot diff: %w", err)
}
defer diffReader.Close()
buffer := make([]byte, 64*1024)
for {
n, err := diffReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read from diff stream: %w", err)
}
if n > 0 {
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
return fmt.Errorf("failed to send diff chunk: %w", err)
}
}
}
return nil
}
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
return &agateGrpc.SnapshotInfo{ return &agateGrpc.SnapshotInfo{
Id: info.ID, Id: info.ID,
@ -147,12 +167,3 @@ func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo
CreationTime: timestamppb.New(info.CreationTime), CreationTime: timestamppb.New(info.CreationTime),
} }
} }
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
server := NewServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}