diff --git a/api.go b/api.go index 9e6616b..021ee37 100644 --- a/api.go +++ b/api.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path/filepath" + "unprism.ru/KRBL/agate/grpc" "unprism.ru/KRBL/agate/store" ) @@ -171,7 +172,74 @@ func (a *Agate) Close() error { } // StartServer starts a gRPC server to share snapshots. -// This is a placeholder for future implementation. func (a *Agate) StartServer(ctx context.Context, address string) error { - return errors.New("server functionality not implemented yet") + _, err := grpc.RunServer(ctx, a.manager, address) + if err != nil { + return fmt.Errorf("failed to start server: %w", err) + } + + // We don't store the server reference because we don't have a way to stop it yet + // In a future version, we could add a StopServer method + + return nil +} + +// ConnectRemote connects to a remote snapshot server. +// Returns a client that can be used to interact with the remote server. +func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) { + client, err := grpc.ConnectToServer(address) + if err != nil { + return nil, fmt.Errorf("failed to connect to remote server: %w", err) + } + + return client, nil +} + +// GetRemoteSnapshotList retrieves a list of snapshots from a remote server. +func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) { + client, err := a.ConnectRemote(address) + if err != nil { + return nil, err + } + defer client.Close() + + return client.ListSnapshots(ctx) +} + +// GetRemoteSnapshot downloads a snapshot from a remote server. +// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally. +func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error { + client, err := a.ConnectRemote(address) + if err != nil { + return err + } + defer client.Close() + + // Create a temporary directory for the downloaded snapshot + tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID) + if err := os.MkdirAll(tempDir, 0755); err != nil { + return fmt.Errorf("failed to create temporary directory: %w", err) + } + + // Download the snapshot + if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil { + return fmt.Errorf("failed to download snapshot: %w", err) + } + + // Get the snapshot details to create a local copy + details, err := client.FetchSnapshotDetails(ctx, snapshotID) + if err != nil { + return fmt.Errorf("failed to get snapshot details: %w", err) + } + + // Create a local snapshot from the downloaded files + _, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID) + if err != nil { + return fmt.Errorf("failed to create local snapshot: %w", err) + } + + // Clean up the temporary directory + os.RemoveAll(tempDir) + + return nil } diff --git a/grpc/client.go b/grpc/client.go new file mode 100644 index 0000000..6cee3df --- /dev/null +++ b/grpc/client.go @@ -0,0 +1,236 @@ +package grpc + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "unprism.ru/KRBL/agate/store" +) + +// SnapshotClient implements the client for connecting to a remote snapshot server +type SnapshotClient struct { + conn *grpc.ClientConn + client SnapshotServiceClient +} + +// NewSnapshotClient creates a new client connected to the specified address +func NewSnapshotClient(address string) (*SnapshotClient, error) { + // Connect to the server with insecure credentials (for simplicity) + conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err) + } + + // Create the gRPC client + client := NewSnapshotServiceClient(conn) + + return &SnapshotClient{ + conn: conn, + client: client, + }, nil +} + +// Close closes the connection to the server +func (c *SnapshotClient) Close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// ListSnapshots retrieves a list of snapshots from the remote server +func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { + response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + // Convert gRPC snapshot info to store.SnapshotInfo + snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots)) + for _, snapshot := range response.Snapshots { + snapshots = append(snapshots, store.SnapshotInfo{ + ID: snapshot.Id, + Name: snapshot.Name, + ParentID: snapshot.ParentId, + CreationTime: snapshot.CreationTime.AsTime(), + }) + } + + return snapshots, nil +} + +// FetchSnapshotDetails retrieves detailed information about a specific snapshot +func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { + response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{ + SnapshotId: snapshotID, + }) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot details: %w", err) + } + + // Convert gRPC snapshot details to store.Snapshot + snapshot := &store.Snapshot{ + ID: response.Info.Id, + Name: response.Info.Name, + ParentID: response.Info.ParentId, + CreationTime: response.Info.CreationTime.AsTime(), + Files: make([]store.FileInfo, 0, len(response.Files)), + } + + // Convert file info + for _, file := range response.Files { + snapshot.Files = append(snapshot.Files, store.FileInfo{ + Path: file.Path, + Size: file.SizeBytes, + IsDir: file.IsDir, + SHA256: file.Sha256Hash, + }) + } + + return snapshot, nil +} + +// DownloadSnapshot downloads a snapshot from the server +// This implementation downloads each file individually to optimize bandwidth usage +func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error { + // Get snapshot details + snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID) + if err != nil { + return fmt.Errorf("failed to get snapshot details: %w", err) + } + + // Create target directory if it doesn't exist + if err := os.MkdirAll(targetDir, 0755); err != nil { + return fmt.Errorf("failed to create target directory: %w", err) + } + + // If a local parent is specified, get its details to compare files + var localParentFiles map[string]store.FileInfo + if localParentID != "" { + localParent, err := c.FetchSnapshotDetails(ctx, localParentID) + if err == nil { + // Create a map of file paths to file info for quick lookup + localParentFiles = make(map[string]store.FileInfo, len(localParent.Files)) + for _, file := range localParent.Files { + localParentFiles[file.Path] = file + } + } + } + + // Download each file + for _, file := range snapshot.Files { + // Skip directories, we'll create them when needed + if file.IsDir { + // Create directory + dirPath := filepath.Join(targetDir, file.Path) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %w", dirPath, err) + } + continue + } + + // Check if we can skip downloading this file + if localParentFiles != nil { + if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 { + // File exists in parent with same hash, copy it instead of downloading + parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path) + targetFilePath := filepath.Join(targetDir, file.Path) + + // Ensure parent directory exists + if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil { + return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err) + } + + // Copy the file + if err := copyFile(parentFilePath, targetFilePath); err != nil { + // If copy fails, fall back to downloading + fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err) + } else { + // Skip to next file + continue + } + } + } + + // Download the file + if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil { + return fmt.Errorf("failed to download file %s: %w", file.Path, err) + } + } + + return nil +} + +// downloadFile downloads a single file from the server +func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error { + // Create the request + req := &DownloadFileRequest{ + SnapshotId: snapshotID, + FilePath: filePath, + } + + // Start streaming the file + stream, err := c.client.DownloadFile(ctx, req) + if err != nil { + return fmt.Errorf("failed to start file download: %w", err) + } + + // Ensure the target directory exists + if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { + return fmt.Errorf("failed to create directory for %s: %w", targetPath, err) + } + + // Create the target file + file, err := os.Create(targetPath) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", targetPath, err) + } + defer file.Close() + + // Receive and write chunks + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("error receiving file chunk: %w", err) + } + + // Write the chunk to the file + if _, err := file.Write(resp.ChunkData); err != nil { + return fmt.Errorf("error writing to file: %w", err) + } + } + + return nil +} + +// Helper function to copy a file +func copyFile(src, dst string) error { + sourceFile, err := os.Open(src) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(dst) + if err != nil { + return err + } + defer destFile.Close() + + _, err = io.Copy(destFile, sourceFile) + return err +} + +// ConnectToServer creates a new client connected to the specified address +func ConnectToServer(address string) (*SnapshotClient, error) { + return NewSnapshotClient(address) +} diff --git a/grpc/server.go b/grpc/server.go new file mode 100644 index 0000000..b310769 --- /dev/null +++ b/grpc/server.go @@ -0,0 +1,154 @@ +package grpc + +import ( + "context" + "fmt" + "io" + "net" + + "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + "unprism.ru/KRBL/agate/interfaces" + "unprism.ru/KRBL/agate/store" +) + +// SnapshotServer implements the gRPC server for snapshots +type SnapshotServer struct { + UnimplementedSnapshotServiceServer + manager interfaces.SnapshotManager + server *grpc.Server +} + +// NewSnapshotServer creates a new snapshot server +func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer { + return &SnapshotServer{ + manager: manager, + } +} + +// Start starts the gRPC server on the specified address +func (s *SnapshotServer) Start(ctx context.Context, address string) error { + lis, err := net.Listen("tcp", address) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", address, err) + } + + s.server = grpc.NewServer() + RegisterSnapshotServiceServer(s.server, s) + + go func() { + if err := s.server.Serve(lis); err != nil { + fmt.Printf("Server error: %v\n", err) + } + }() + + fmt.Printf("Server started on %s\n", address) + return nil +} + +// Stop gracefully stops the server +func (s *SnapshotServer) Stop(ctx context.Context) error { + if s.server != nil { + s.server.GracefulStop() + fmt.Println("Server stopped") + } + return nil +} + +// ListSnapshots implements the gRPC ListSnapshots method +func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) { + snapshots, err := s.manager.ListSnapshots(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + response := &ListSnapshotsResponse{ + Snapshots: make([]*SnapshotInfo, 0, len(snapshots)), + } + + for _, snapshot := range snapshots { + response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot)) + } + + return response, nil +} + +// GetSnapshotDetails implements the gRPC GetSnapshotDetails method +func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) { + snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot details: %w", err) + } + + response := &SnapshotDetails{ + Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{ + ID: snapshot.ID, + Name: snapshot.Name, + ParentID: snapshot.ParentID, + CreationTime: snapshot.CreationTime, + }), + Files: make([]*FileInfo, 0, len(snapshot.Files)), + } + + for _, file := range snapshot.Files { + response.Files = append(response.Files, &FileInfo{ + Path: file.Path, + SizeBytes: file.Size, + Sha256Hash: file.SHA256, + IsDir: file.IsDir, + }) + } + + return response, nil +} + +// DownloadFile implements the gRPC DownloadFile method +func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error { + // Open the file from the snapshot + fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer fileReader.Close() + + // Read the file in chunks and send them to the client + buffer := make([]byte, 64*1024) // 64KB chunks + for { + n, err := fileReader.Read(buffer) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + // Send the chunk to the client + if err := stream.Send(&DownloadFileResponse{ + ChunkData: buffer[:n], + }); err != nil { + return fmt.Errorf("failed to send chunk: %w", err) + } + } + + return nil +} + +// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo +func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo { + return &SnapshotInfo{ + Id: info.ID, + Name: info.Name, + ParentId: info.ParentID, + CreationTime: timestamppb.New(info.CreationTime), + } +} + +// RunServer is a helper function to create and start a snapshot server +func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) { + server := NewSnapshotServer(manager) + if err := server.Start(ctx, address); err != nil { + return nil, err + } + return server, nil +} diff --git a/interfaces/snapshot.go b/interfaces/snapshot.go new file mode 100644 index 0000000..849c027 --- /dev/null +++ b/interfaces/snapshot.go @@ -0,0 +1,47 @@ +package interfaces + +import ( + "context" + "io" + + "unprism.ru/KRBL/agate/store" +) + +// SnapshotManager defines the interface that the server needs to interact with snapshots +type SnapshotManager interface { + // GetSnapshotDetails retrieves detailed metadata for a specific snapshot + GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) + + // ListSnapshots retrieves a list of all available snapshots + ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) + + // OpenFile retrieves and opens a file from the specified snapshot + OpenFile(ctx context.Context, snapshotID string, filePath string) (io.ReadCloser, error) + + // CreateSnapshot creates a new snapshot from the specified source directory + CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error) +} + +// SnapshotServer defines the interface for a server that can share snapshots +type SnapshotServer interface { + // Start initializes and begins the server's operation + Start(ctx context.Context) error + + // Stop gracefully shuts down the server + Stop(ctx context.Context) error +} + +// SnapshotClient defines the interface for a client that can connect to a server and download snapshots +type SnapshotClient interface { + // ListSnapshots retrieves a list of snapshots from the server + ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) + + // FetchSnapshotDetails retrieves detailed information about a specific snapshot + FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) + + // DownloadSnapshot downloads a snapshot from the server + DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error + + // Close closes the connection to the server + Close() error +} diff --git a/remote/client.go b/remote/client.go new file mode 100644 index 0000000..186134d --- /dev/null +++ b/remote/client.go @@ -0,0 +1,242 @@ +package remote + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + + stdgrpc "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + agateGrpc "unprism.ru/KRBL/agate/grpc" + "unprism.ru/KRBL/agate/interfaces" + "unprism.ru/KRBL/agate/store" +) + +// Client represents a client for connecting to a remote snapshot server +// It implements the interfaces.SnapshotClient interface +type Client struct { + conn *stdgrpc.ClientConn + client agateGrpc.SnapshotServiceClient +} + +// Ensure Client implements interfaces.SnapshotClient +var _ interfaces.SnapshotClient = (*Client)(nil) + +// NewClient creates a new client connected to the specified address +func NewClient(address string) (*Client, error) { + // Connect to the server with insecure credentials (for simplicity) + conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err) + } + + // Create the gRPC client + client := agateGrpc.NewSnapshotServiceClient(conn) + + return &Client{ + conn: conn, + client: client, + }, nil +} + +// Close closes the connection to the server +func (c *Client) Close() error { + if c.conn != nil { + return c.conn.Close() + } + return nil +} + +// ListSnapshots retrieves a list of snapshots from the remote server +func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { + response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{}) + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + // Convert gRPC snapshot info to store.SnapshotInfo + snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots)) + for _, snapshot := range response.Snapshots { + snapshots = append(snapshots, store.SnapshotInfo{ + ID: snapshot.Id, + Name: snapshot.Name, + ParentID: snapshot.ParentId, + CreationTime: snapshot.CreationTime.AsTime(), + }) + } + + return snapshots, nil +} + +// FetchSnapshotDetails retrieves detailed information about a specific snapshot +func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { + response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{ + SnapshotId: snapshotID, + }) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot details: %w", err) + } + + // Convert gRPC snapshot details to store.Snapshot + snapshot := &store.Snapshot{ + ID: response.Info.Id, + Name: response.Info.Name, + ParentID: response.Info.ParentId, + CreationTime: response.Info.CreationTime.AsTime(), + Files: make([]store.FileInfo, 0, len(response.Files)), + } + + // Convert file info + for _, file := range response.Files { + snapshot.Files = append(snapshot.Files, store.FileInfo{ + Path: file.Path, + Size: file.SizeBytes, + IsDir: file.IsDir, + SHA256: file.Sha256Hash, + }) + } + + return snapshot, nil +} + +// DownloadSnapshot downloads a snapshot from the server +// This implementation downloads each file individually to optimize bandwidth usage +func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error { + // Get snapshot details + snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID) + if err != nil { + return fmt.Errorf("failed to get snapshot details: %w", err) + } + + // Create target directory if it doesn't exist + if err := os.MkdirAll(targetDir, 0755); err != nil { + return fmt.Errorf("failed to create target directory: %w", err) + } + + // If a local parent is specified, get its details to compare files + var localParentFiles map[string]store.FileInfo + if localParentID != "" { + localParent, err := c.FetchSnapshotDetails(ctx, localParentID) + if err == nil { + // Create a map of file paths to file info for quick lookup + localParentFiles = make(map[string]store.FileInfo, len(localParent.Files)) + for _, file := range localParent.Files { + localParentFiles[file.Path] = file + } + } + } + + // Download each file + for _, file := range snapshot.Files { + // Skip directories, we'll create them when needed + if file.IsDir { + // Create directory + dirPath := filepath.Join(targetDir, file.Path) + if err := os.MkdirAll(dirPath, 0755); err != nil { + return fmt.Errorf("failed to create directory %s: %w", dirPath, err) + } + continue + } + + // Check if we can skip downloading this file + if localParentFiles != nil { + if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 { + // File exists in parent with same hash, copy it instead of downloading + parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path) + targetFilePath := filepath.Join(targetDir, file.Path) + + // Ensure parent directory exists + if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil { + return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err) + } + + // Copy the file + if err := copyFile(parentFilePath, targetFilePath); err != nil { + // If copy fails, fall back to downloading + fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err) + } else { + // Skip to next file + continue + } + } + } + + // Download the file + if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil { + return fmt.Errorf("failed to download file %s: %w", file.Path, err) + } + } + + return nil +} + +// downloadFile downloads a single file from the server +func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error { + // Create the request + req := &agateGrpc.DownloadFileRequest{ + SnapshotId: snapshotID, + FilePath: filePath, + } + + // Start streaming the file + stream, err := c.client.DownloadFile(ctx, req) + if err != nil { + return fmt.Errorf("failed to start file download: %w", err) + } + + // Ensure the target directory exists + if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil { + return fmt.Errorf("failed to create directory for %s: %w", targetPath, err) + } + + // Create the target file + file, err := os.Create(targetPath) + if err != nil { + return fmt.Errorf("failed to create file %s: %w", targetPath, err) + } + defer file.Close() + + // Receive and write chunks + for { + resp, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("error receiving file chunk: %w", err) + } + + // Write the chunk to the file + if _, err := file.Write(resp.ChunkData); err != nil { + return fmt.Errorf("error writing to file: %w", err) + } + } + + return nil +} + +// Helper function to copy a file +func copyFile(src, dst string) error { + sourceFile, err := os.Open(src) + if err != nil { + return err + } + defer sourceFile.Close() + + destFile, err := os.Create(dst) + if err != nil { + return err + } + defer destFile.Close() + + _, err = io.Copy(destFile, sourceFile) + return err +} + +// Connect creates a new client connected to the specified address +func Connect(address string) (*Client, error) { + return NewClient(address) +} diff --git a/remote/server.go b/remote/server.go new file mode 100644 index 0000000..45e1c76 --- /dev/null +++ b/remote/server.go @@ -0,0 +1,155 @@ +package remote + +import ( + "context" + "fmt" + "io" + "net" + + stdgrpc "google.golang.org/grpc" + "google.golang.org/protobuf/types/known/timestamppb" + + agateGrpc "unprism.ru/KRBL/agate/grpc" + "unprism.ru/KRBL/agate/interfaces" + "unprism.ru/KRBL/agate/store" +) + +// Server implements the gRPC server for snapshots +type Server struct { + agateGrpc.UnimplementedSnapshotServiceServer + manager interfaces.SnapshotManager + server *stdgrpc.Server +} + +// NewServer creates a new snapshot server +func NewServer(manager interfaces.SnapshotManager) *Server { + return &Server{ + manager: manager, + } +} + +// Start starts the gRPC server on the specified address +func (s *Server) Start(ctx context.Context, address string) error { + lis, err := net.Listen("tcp", address) + if err != nil { + return fmt.Errorf("failed to listen on %s: %w", address, err) + } + + s.server = stdgrpc.NewServer() + agateGrpc.RegisterSnapshotServiceServer(s.server, s) + + go func() { + if err := s.server.Serve(lis); err != nil { + fmt.Printf("Server error: %v\n", err) + } + }() + + fmt.Printf("Server started on %s\n", address) + return nil +} + +// Stop gracefully stops the server +func (s *Server) Stop(ctx context.Context) error { + if s.server != nil { + s.server.GracefulStop() + fmt.Println("Server stopped") + } + return nil +} + +// ListSnapshots implements the gRPC ListSnapshots method +func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) { + snapshots, err := s.manager.ListSnapshots(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list snapshots: %w", err) + } + + response := &agateGrpc.ListSnapshotsResponse{ + Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)), + } + + for _, snapshot := range snapshots { + response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot)) + } + + return response, nil +} + +// GetSnapshotDetails implements the gRPC GetSnapshotDetails method +func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) { + snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) + if err != nil { + return nil, fmt.Errorf("failed to get snapshot details: %w", err) + } + + response := &agateGrpc.SnapshotDetails{ + Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{ + ID: snapshot.ID, + Name: snapshot.Name, + ParentID: snapshot.ParentID, + CreationTime: snapshot.CreationTime, + }), + Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)), + } + + for _, file := range snapshot.Files { + response.Files = append(response.Files, &agateGrpc.FileInfo{ + Path: file.Path, + SizeBytes: file.Size, + Sha256Hash: file.SHA256, + IsDir: file.IsDir, + }) + } + + return response, nil +} + +// DownloadFile implements the gRPC DownloadFile method +func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error { + // Open the file from the snapshot + fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) + if err != nil { + return fmt.Errorf("failed to open file: %w", err) + } + defer fileReader.Close() + + // Read the file in chunks and send them to the client + buffer := make([]byte, 64*1024) // 64KB chunks + for { + n, err := fileReader.Read(buffer) + if err == io.EOF { + break + } + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + // Send the chunk to the client + if err := stream.Send(&agateGrpc.DownloadFileResponse{ + ChunkData: buffer[:n], + }); err != nil { + return fmt.Errorf("failed to send chunk: %w", err) + } + } + + return nil +} + +// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo +func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { + return &agateGrpc.SnapshotInfo{ + Id: info.ID, + Name: info.Name, + ParentId: info.ParentID, + CreationTime: timestamppb.New(info.CreationTime), + } +} + +// RunServer is a helper function to create and start a snapshot server +func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) { + server := NewServer(manager) + if err := server.Start(ctx, address); err != nil { + return nil, err + } + return server, nil +}