Agate/remote/server.go

156 lines
4.2 KiB
Go

package remote
import (
"context"
"fmt"
"io"
"net"
stdgrpc "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// Server implements the gRPC server for snapshots.
//
// Its RPC handlers (ListSnapshots, GetSnapshotDetails, DownloadFile) call
// into the configured SnapshotManager and convert the results into the
// generated protobuf message types.
type Server struct {
// Embedded generated base type.
// NOTE(review): presumably supplies default implementations for any RPCs
// this type does not define (the usual grpc-go pattern) — confirm against
// the generated code.
agateGrpc.UnimplementedSnapshotServiceServer
// manager is the snapshot backend that the RPC handlers call into.
manager interfaces.SnapshotManager
// server is the underlying gRPC server. NewServer leaves it nil; Start
// assigns it, and Stop is a no-op while it is still nil.
server *stdgrpc.Server
}
// NewServer returns a Server whose RPC handlers are backed by manager.
//
// The returned value is not listening yet: the underlying gRPC server is
// only created when Start is called.
func NewServer(manager interfaces.SnapshotManager) *Server {
	srv := new(Server)
	srv.manager = manager
	return srv
}
// Start binds a TCP listener on address, registers the snapshot service on a
// fresh gRPC server and begins serving in a background goroutine.
//
// It returns as soon as the listener is bound; an error is returned only when
// listening fails. Errors reported later by Serve are printed to stdout, not
// returned. The ctx argument is currently not consulted.
func (s *Server) Start(ctx context.Context, address string) error {
	listener, listenErr := net.Listen("tcp", address)
	if listenErr != nil {
		return fmt.Errorf("failed to listen on %s: %w", address, listenErr)
	}

	s.server = stdgrpc.NewServer()
	agateGrpc.RegisterSnapshotServiceServer(s.server, s)

	// Serve blocks until the server is stopped, so it runs on its own
	// goroutine and Start can return to the caller immediately.
	serve := func() {
		serveErr := s.server.Serve(listener)
		if serveErr == nil {
			return
		}
		fmt.Printf("Server error: %v\n", serveErr)
	}
	go serve()

	fmt.Printf("Server started on %s\n", address)
	return nil
}
// Stop gracefully stops the underlying gRPC server, if Start has created one.
//
// It is a no-op when the server was never started. The ctx argument is
// currently not consulted, and the returned error is always nil.
func (s *Server) Stop(ctx context.Context) error {
	if s.server == nil {
		// Start was never called (or failed before creating the server).
		return nil
	}

	s.server.GracefulStop()
	fmt.Println("Server stopped")
	return nil
}
// ListSnapshots implements the gRPC ListSnapshots method.
//
// It asks the manager for all snapshots and converts each entry into its
// protobuf representation. The request message carries no data used here.
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
	stored, err := s.manager.ListSnapshots(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list snapshots: %w", err)
	}

	// Pre-size the result so the conversion loop never reallocates.
	converted := make([]*agateGrpc.SnapshotInfo, 0, len(stored))
	for _, entry := range stored {
		converted = append(converted, convertToGrpcSnapshotInfo(entry))
	}

	return &agateGrpc.ListSnapshotsResponse{Snapshots: converted}, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method.
//
// It looks up the snapshot identified by req.SnapshotId through the manager
// and returns its summary information together with one FileInfo entry per
// file recorded in the snapshot.
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
	details, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
	if err != nil {
		return nil, fmt.Errorf("failed to get snapshot details: %w", err)
	}

	// Project the summary fields into a store.SnapshotInfo so the shared
	// conversion helper can be reused.
	summary := store.SnapshotInfo{
		ID:           details.ID,
		Name:         details.Name,
		ParentID:     details.ParentID,
		CreationTime: details.CreationTime,
	}

	files := make([]*agateGrpc.FileInfo, 0, len(details.Files))
	for _, f := range details.Files {
		entry := &agateGrpc.FileInfo{
			Path:       f.Path,
			SizeBytes:  f.Size,
			Sha256Hash: f.SHA256,
			IsDir:      f.IsDir,
		}
		files = append(files, entry)
	}

	return &agateGrpc.SnapshotDetails{
		Info:  convertToGrpcSnapshotInfo(summary),
		Files: files,
	}, nil
}
// DownloadFile implements the gRPC DownloadFile method.
//
// It opens req.FilePath inside the snapshot req.SnapshotId through the
// manager and streams its contents to the client in chunks of at most 64 KiB.
//
// The file is opened with the stream's context, so a client cancellation or
// deadline is visible to the manager. An error is returned if the file cannot
// be opened, a read fails, or a chunk cannot be sent.
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
	// Use the per-RPC context rather than context.Background() so the open
	// call is tied to the lifetime of this request.
	fileReader, err := s.manager.OpenFile(stream.Context(), req.SnapshotId, req.FilePath)
	if err != nil {
		return fmt.Errorf("failed to open file: %w", err)
	}
	defer fileReader.Close()

	buffer := make([]byte, 64*1024) // 64KB chunks
	for {
		n, readErr := fileReader.Read(buffer)

		// Handle the bytes before looking at the error: a reader may return
		// n > 0 together with io.EOF, and those final bytes must still be
		// sent. Skipping n == 0 also avoids emitting empty chunks.
		if n > 0 {
			chunk := &agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}
			if sendErr := stream.Send(chunk); sendErr != nil {
				return fmt.Errorf("failed to send chunk: %w", sendErr)
			}
		}

		if readErr == io.EOF {
			return nil
		}
		if readErr != nil {
			return fmt.Errorf("failed to read file: %w", readErr)
		}
	}
}
// convertToGrpcSnapshotInfo maps a store.SnapshotInfo onto the generated
// protobuf SnapshotInfo message, copying the ID, name and parent ID and
// wrapping the creation time in a protobuf timestamp.
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
	msg := new(agateGrpc.SnapshotInfo)
	msg.Id = info.ID
	msg.Name = info.Name
	msg.ParentId = info.ParentID
	msg.CreationTime = timestamppb.New(info.CreationTime)
	return msg
}
// RunServer is a convenience wrapper that creates a snapshot server for
// manager and starts it on address.
//
// On success the running server is returned so the caller can stop it later.
// If starting fails, the error from Start is returned unchanged and the
// server is nil.
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
	srv := NewServer(manager)

	startErr := srv.Start(ctx, address)
	if startErr != nil {
		return nil, startErr
	}

	return srv, nil
}