Agate/remote/server.go
Alexander Lazarenko 13744f0500
Introduce gRPC-based snapshot client and server functionality.
Implemented gRPC client and server for snapshot management, enabling remote operations like listing, fetching, and downloading snapshots. Updated interfaces to support the new gRPC implementation and integrated server start functionality into the API.
2025-05-07 22:03:03 +03:00

156 lines
4.2 KiB
Go

package remote
import (
"context"
"fmt"
"io"
"net"
stdgrpc "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
agateGrpc "unprism.ru/KRBL/agate/grpc"
"unprism.ru/KRBL/agate/interfaces"
"unprism.ru/KRBL/agate/store"
)
// Server implements the gRPC snapshot service. Its RPC handlers delegate to
// an interfaces.SnapshotManager and translate the results into the generated
// agateGrpc message types.
type Server struct {
// Generated base type, embedded per the protoc-gen-go-grpc convention;
// presumably supplies fallback implementations for any service RPCs that
// Server does not define itself (verify against the generated code).
agateGrpc.UnimplementedSnapshotServiceServer
// manager is the backend that every RPC handler delegates to.
manager interfaces.SnapshotManager
// server is the underlying gRPC server. NewServer leaves it nil; Start
// assigns it, and Stop is a no-op while it is still nil.
server *stdgrpc.Server
}
// NewServer returns a Server whose RPC handlers delegate to manager.
// The returned Server is not listening yet; call Start to bind an address
// and begin serving.
func NewServer(manager interfaces.SnapshotManager) *Server {
	srv := new(Server)
	srv.manager = manager
	return srv
}
// Start binds a TCP listener on address, creates the underlying gRPC server,
// registers s as the snapshot service and begins serving in a background
// goroutine. It returns as soon as the listener is bound, so a nil result
// means "listening", not "finished serving".
//
// An error is returned only when the listener cannot be created. Errors
// reported later by Serve are printed to standard output rather than
// returned. ctx is currently not consulted.
func (s *Server) Start(ctx context.Context, address string) error {
	listener, listenErr := net.Listen("tcp", address)
	if listenErr != nil {
		return fmt.Errorf("failed to listen on %s: %w", address, listenErr)
	}

	s.server = stdgrpc.NewServer()
	agateGrpc.RegisterSnapshotServiceServer(s.server, s)

	// Serve blocks until the server is stopped, so it runs on its own
	// goroutine and Start can return to the caller immediately.
	go func() {
		serveErr := s.server.Serve(listener)
		if serveErr != nil {
			fmt.Printf("Server error: %v\n", serveErr)
		}
	}()

	fmt.Printf("Server started on %s\n", address)
	return nil
}
// Stop shuts the gRPC server down via GracefulStop and prints a confirmation
// line. If Start was never called (no underlying server exists) it does
// nothing. It always returns nil. ctx is currently not consulted, so the call
// lasts as long as GracefulStop takes to return.
func (s *Server) Stop(ctx context.Context) error {
	if s.server == nil {
		// Nothing was started, so there is nothing to stop.
		return nil
	}

	s.server.GracefulStop()
	fmt.Println("Server stopped")
	return nil
}
// ListSnapshots handles the ListSnapshots RPC. It asks the manager for all
// snapshots and converts each entry to its gRPC representation. The request
// message carries nothing that this handler reads.
//
// A manager failure is returned wrapped with "failed to list snapshots".
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
	infos, listErr := s.manager.ListSnapshots(ctx)
	if listErr != nil {
		return nil, fmt.Errorf("failed to list snapshots: %w", listErr)
	}

	// Capacity is known up front, so the slice never has to grow.
	converted := make([]*agateGrpc.SnapshotInfo, 0, len(infos))
	for _, info := range infos {
		converted = append(converted, convertToGrpcSnapshotInfo(info))
	}

	return &agateGrpc.ListSnapshotsResponse{Snapshots: converted}, nil
}
// GetSnapshotDetails handles the GetSnapshotDetails RPC. It looks up the
// snapshot named by req.SnapshotId through the manager and returns its
// summary info together with one FileInfo entry per file in the snapshot.
//
// A manager failure is returned wrapped with "failed to get snapshot details".
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
	snapshot, lookupErr := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
	if lookupErr != nil {
		return nil, fmt.Errorf("failed to get snapshot details: %w", lookupErr)
	}

	// Reuse the shared converter by first projecting the snapshot onto the
	// summary struct it accepts.
	summary := store.SnapshotInfo{
		ID:           snapshot.ID,
		Name:         snapshot.Name,
		ParentID:     snapshot.ParentID,
		CreationTime: snapshot.CreationTime,
	}

	files := make([]*agateGrpc.FileInfo, 0, len(snapshot.Files))
	for _, file := range snapshot.Files {
		entry := &agateGrpc.FileInfo{
			Path:       file.Path,
			SizeBytes:  file.Size,
			Sha256Hash: file.SHA256,
			IsDir:      file.IsDir,
		}
		files = append(files, entry)
	}

	return &agateGrpc.SnapshotDetails{
		Info:  convertToGrpcSnapshotInfo(summary),
		Files: files,
	}, nil
}
// DownloadFile handles the server-streaming DownloadFile RPC. It opens the
// file identified by req.SnapshotId and req.FilePath through the manager and
// streams its contents to the client in chunks of at most 64 KiB.
//
// The file is opened with the stream's own context, so client cancellation
// and deadlines are visible to the manager. It returns nil once the whole
// file has been sent; open, read and send failures are returned wrapped with
// a short description of the failing step.
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
	// Use the per-RPC context instead of context.Background() so that the
	// manager sees cancellation, deadlines and request-scoped values.
	fileReader, err := s.manager.OpenFile(stream.Context(), req.SnapshotId, req.FilePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer fileReader.Close()

	// NOTE(review): the buffer is reused across Send calls, as in the
	// original code. gRPC documents that a message should not be modified
	// after SendMsg; confirm no stats handler or interceptor retains it.
	buffer := make([]byte, 64*1024) // 64KB chunks
	for {
		n, readErr := fileReader.Read(buffer)

		// A reader may return data together with an error (including
		// io.EOF), so the bytes are sent before the error is inspected.
		// Empty reads are skipped so that no zero-length chunk is sent.
		if n > 0 {
			chunk := &agateGrpc.DownloadFileResponse{
				ChunkData: buffer[:n],
			}
			if sendErr := stream.Send(chunk); sendErr != nil {
				return fmt.Errorf("failed to send chunk: %w", sendErr)
			}
		}

		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return fmt.Errorf("failed to read file: %w", readErr)
		}
	}
}
// convertToGrpcSnapshotInfo maps a store.SnapshotInfo onto a newly allocated
// gRPC SnapshotInfo message, copying the ID, name and parent ID and converting
// the creation time to a protobuf timestamp.
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
	msg := new(agateGrpc.SnapshotInfo)
	msg.Id = info.ID
	msg.Name = info.Name
	msg.ParentId = info.ParentID
	msg.CreationTime = timestamppb.New(info.CreationTime)
	return msg
}
// RunServer builds a Server around manager and starts it on address in one
// step. On success it returns the running Server so the caller can later call
// Stop; if Start fails it returns nil and that error unchanged.
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
	srv := NewServer(manager)

	startErr := srv.Start(ctx, address)
	if startErr != nil {
		return nil, startErr
	}
	return srv, nil
}