package grpc

import (
	"context"
	"fmt"
	"io"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/types/known/timestamppb"

	"gitea.unprism.ru/KRBL/Agate/interfaces"
	"gitea.unprism.ru/KRBL/Agate/store"
)

// SnapshotServer implements the gRPC server for snapshots.
//
// It delegates every request to the configured interfaces.SnapshotManager and
// owns the underlying *grpc.Server once Start has been called.
type SnapshotServer struct {
	UnimplementedSnapshotServiceServer
	manager interfaces.SnapshotManager
	server  *grpc.Server
}

// NewSnapshotServer creates a new snapshot server backed by manager.
// The returned server is not listening until Start is called.
func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer {
	return &SnapshotServer{
		manager: manager,
	}
}

// Start starts the gRPC server on the specified TCP address.
//
// It returns an error only if the listener cannot be created. Serving happens
// in a background goroutine; an error returned by Serve is printed to stdout
// and not reported to the caller. ctx is currently unused.
func (s *SnapshotServer) Start(ctx context.Context, address string) error {
	lis, err := net.Listen("tcp", address)
	if err != nil {
		return fmt.Errorf("failed to listen on %s: %w", address, err)
	}

	s.server = grpc.NewServer()
	RegisterSnapshotServiceServer(s.server, s)

	go func() {
		if err := s.server.Serve(lis); err != nil {
			fmt.Printf("Server error: %v\n", err)
		}
	}()

	fmt.Printf("Server started on %s\n", address)
	return nil
}

// Stop gracefully stops the server.
//
// It is a no-op when the server was never started. It always returns nil;
// ctx is currently unused, so the call is not bounded by the context.
func (s *SnapshotServer) Stop(ctx context.Context) error {
	if s.server != nil {
		s.server.GracefulStop()
		fmt.Println("Server stopped")
	}
	return nil
}

// ListSnapshots implements the gRPC ListSnapshots method.
//
// It returns every snapshot reported by the manager, converted to its gRPC
// representation, or a wrapped error if the manager call fails.
func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) {
	snapshots, err := s.manager.ListSnapshots(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list snapshots: %w", err)
	}

	response := &ListSnapshotsResponse{
		Snapshots: make([]*SnapshotInfo, 0, len(snapshots)),
	}
	for _, snapshot := range snapshots {
		response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
	}
	return response, nil
}

// GetSnapshotDetails implements the gRPC GetSnapshotDetails method.
//
// It looks up the snapshot identified by req.SnapshotId and returns its
// metadata together with the list of files it contains, or a wrapped error if
// the manager call fails.
func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) {
	snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot details: %w", err)
	}

	response := &SnapshotDetails{
		Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
			ID:           snapshot.ID,
			Name:         snapshot.Name,
			ParentID:     snapshot.ParentID,
			CreationTime: snapshot.CreationTime,
		}),
		Files: make([]*FileInfo, 0, len(snapshot.Files)),
	}
	for _, file := range snapshot.Files {
		response.Files = append(response.Files, &FileInfo{
			Path:       file.Path,
			SizeBytes:  file.Size,
			Sha256Hash: file.SHA256,
			IsDir:      file.IsDir,
		})
	}
	return response, nil
}

// DownloadFile implements the gRPC DownloadFile method.
//
// It opens req.FilePath inside the snapshot req.SnapshotId and streams its
// contents to the client in chunks of at most 64 KiB. It returns a wrapped
// error if the file cannot be opened, a read fails, or a chunk cannot be sent.
func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error {
	const chunkSize = 64 * 1024 // 64KB chunks

	// Use the stream's context so that client cancellation and deadlines
	// reach the manager instead of being cut off by a background context.
	fileReader, err := s.manager.OpenFile(stream.Context(), req.SnapshotId, req.FilePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer fileReader.Close()

	// NOTE(review): grpc-go documents that a message should not be modified
	// after it is sent; the buffer is reused across iterations as before.
	// Confirm no stats handler or tracer retains the chunk lazily.
	buffer := make([]byte, chunkSize)
	for {
		n, readErr := fileReader.Read(buffer)

		// A reader may return data together with an error (including
		// io.EOF), so the bytes are forwarded before the error is examined.
		// Zero-length reads produce no chunk.
		if n > 0 {
			chunk := &DownloadFileResponse{ChunkData: buffer[:n]}
			if err := stream.Send(chunk); err != nil {
				return fmt.Errorf("failed to send chunk: %w", err)
			}
		}

		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return fmt.Errorf("failed to read file: %w", readErr)
		}
	}
}

// convertToGrpcSnapshotInfo converts a store.SnapshotInfo into the gRPC
// SnapshotInfo message, mapping the creation time to a protobuf timestamp.
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo {
	return &SnapshotInfo{
		Id:           info.ID,
		Name:         info.Name,
		ParentId:     info.ParentID,
		CreationTime: timestamppb.New(info.CreationTime),
	}
}

// RunServer is a helper function that creates a snapshot server for manager
// and starts it on address. It returns the running server, or the error from
// Start if the server could not be started.
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) {
	server := NewSnapshotServer(manager)
	if err := server.Start(ctx, address); err != nil {
		return nil, err
	}
	return server, nil
}