package remote import ( "context" "fmt" "io" "net" stdgrpc "google.golang.org/grpc" "google.golang.org/protobuf/types/known/timestamppb" agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc" "gitea.unprism.ru/KRBL/Agate/interfaces" "gitea.unprism.ru/KRBL/Agate/store" ) // Server реализует gRPC-сервер для снапшотов. type Server struct { agateGrpc.UnimplementedSnapshotServiceServer manager interfaces.SnapshotManager server *stdgrpc.Server } // NewServer создает новый сервер снапшотов. func NewServer(manager interfaces.SnapshotManager) *Server { return &Server{ manager: manager, } } // Start запускает gRPC-сервер на указанном адресе. func (s *Server) Start(ctx context.Context, address string) error { lis, err := net.Listen("tcp", address) if err != nil { return fmt.Errorf("failed to listen on %s: %w", address, err) } s.server = stdgrpc.NewServer() agateGrpc.RegisterSnapshotServiceServer(s.server, s) go func() { if err := s.server.Serve(lis); err != nil { fmt.Printf("Server error: %v\n", err) } }() fmt.Printf("Server started on %s\n", address) // Ждем отмены контекста для остановки сервера <-ctx.Done() s.Stop() return nil } // Stop изящно останавливает сервер. func (s *Server) Stop() { if s.server != nil { s.server.GracefulStop() fmt.Println("Server stopped") } } // ListSnapshots реализует gRPC-метод ListSnapshots. func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) { opts := store.ListOptions{} snapshots, err := s.manager.ListSnapshots(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to list snapshots: %w", err) } response := &agateGrpc.ListSnapshotsResponse{ Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)), } for _, snapshot := range snapshots { response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot)) } return response, nil } // GetSnapshotDetails реализует gRPC-метод GetSnapshotDetails. func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) { snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId) if err != nil { return nil, fmt.Errorf("failed to get snapshot details: %w", err) } response := &agateGrpc.SnapshotDetails{ Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{ ID: snapshot.ID, Name: snapshot.Name, ParentID: snapshot.ParentID, CreationTime: snapshot.CreationTime, }), Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)), } for _, file := range snapshot.Files { response.Files = append(response.Files, &agateGrpc.FileInfo{ Path: file.Path, SizeBytes: file.Size, Sha256Hash: file.SHA256, IsDir: file.IsDir, }) } return response, nil } // DownloadFile реализует gRPC-метод DownloadFile. func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error { fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath) if err != nil { return fmt.Errorf("failed to open file: %w", err) } defer fileReader.Close() buffer := make([]byte, 64*1024) for { n, err := fileReader.Read(buffer) if err == io.EOF { break } if err != nil { return fmt.Errorf("failed to read file: %w", err) } if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil { return fmt.Errorf("failed to send chunk: %w", err) } } return nil } // DownloadSnapshotDiff реализует gRPC-метод DownloadSnapshotDiff. func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest, stream agateGrpc.SnapshotService_DownloadSnapshotDiffServer) error { diffReader, err := s.manager.StreamSnapshotDiff(context.Background(), req.SnapshotId, req.LocalParentId, req.Offset) if err != nil { return fmt.Errorf("failed to stream snapshot diff: %w", err) } defer diffReader.Close() buffer := make([]byte, 64*1024) for { n, err := diffReader.Read(buffer) if err == io.EOF { break } if err != nil { return fmt.Errorf("failed to read from diff stream: %w", err) } if n > 0 { if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil { return fmt.Errorf("failed to send diff chunk: %w", err) } } } return nil } // Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo { return &agateGrpc.SnapshotInfo{ Id: info.ID, Name: info.Name, ParentId: info.ParentID, CreationTime: timestamppb.New(info.CreationTime), } }