package remote import ( "context" "fmt" "io" "net" stdgrpc "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc" "gitea.unprism.ru/KRBL/Agate/interfaces" "gitea.unprism.ru/KRBL/Agate/store" ) // Server implements the gRPC server for snapshots type Server struct { agateGrpc.UnimplementedSnapshotServiceServer manager interfaces.SnapshotManager server *stdgrpc.Server } // NewServer creates a new snapshot server func NewServer(manager interfaces.SnapshotManager) *Server { return &Server{ manager: manager, } } // Start starts the gRPC server on the specified address func (s *Server) Start(ctx context.Context, address string) error { lis, err := net.Listen("tcp", address) if err != nil { return fmt.Errorf("failed to listen on %s: %w", address, err) } s.server = stdgrpc.NewServer() agateGrpc.RegisterSnapshotServiceServer(s.server, s) go func() { if err := s.server.Serve(lis); err != nil { fmt.Printf("Server error: %v\n", err) } }() fmt.Printf("Server started on %s\n", address) return nil } // Stop gracefully stops the server func (s *Server) Stop(ctx context.Context) error { if s.server != nil { s.server.GracefulStop() fmt.Println("Server stopped") } return nil } // ListSnapshots implements the gRPC ListSnapshots method func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) { snapshots, err := s.manager.ListSnapshots(ctx) if err != nil { return nil, fmt.Errorf("failed to list snapshots: %w", err) } response := &agateGrpc.ListSnapshotsResponse{ Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)), } for _, snapshot := range snapshots { response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot)) } return response, nil } // GetSnapshotDetails implements the gRPC GetSnapshotDetails method func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) { snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) if err != nil { return nil, fmt.Errorf("failed to get snapshot details: %w", err) } response := &agateGrpc.SnapshotDetails{ Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{ ID: snapshot.ID, Name: snapshot.Name, ParentID: snapshot.ParentID, CreationTime: snapshot.CreationTime, }), Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)), } for _, file := range snapshot.Files { response.Files = append(response.Files, &agateGrpc.FileInfo{ Path: file.Path, SizeBytes: file.Size, Sha256Hash: file.SHA256, IsDir: file.IsDir, }) } return response, nil } // DownloadFile implements the gRPC DownloadFile method func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error { // Open the file from the snapshot fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) if err != nil { return fmt.Errorf("failed to open file: %w", err) } defer fileReader.Close() // Read the file in chunks and send them to the client buffer := make([]byte, 64*1024) // 64KB chunks for { n, err := fileReader.Read(buffer) if err == io.EOF { break } if err != nil { return fmt.Errorf("failed to read file: %w", err) } // Send the chunk to the client if err := stream.Send(&agateGrpc.DownloadFileResponse{ ChunkData: buffer[:n], }); err != nil { return fmt.Errorf("failed to send chunk: %w", err) } } return nil } // Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { return &agateGrpc.SnapshotInfo{ Id: info.ID, Name: info.Name, ParentId: info.ParentID, CreationTime: timestamppb.New(info.CreationTime), } } // RunServer is a helper function to create and start a snapshot server func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) { server := NewServer(manager) if err := server.Start(ctx, address); err != nil { return nil, err } return server, nil }