170 lines
4.9 KiB
Go
170 lines
4.9 KiB
Go
package remote
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
|
|
stdgrpc "google.golang.org/grpc"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
|
|
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
|
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
|
"gitea.unprism.ru/KRBL/Agate/store"
|
|
)
|
|
|
|
// Server реализует gRPC-сервер для снапшотов.
|
|
type Server struct {
|
|
agateGrpc.UnimplementedSnapshotServiceServer
|
|
manager interfaces.SnapshotManager
|
|
server *stdgrpc.Server
|
|
}
|
|
|
|
// NewServer создает новый сервер снапшотов.
|
|
func NewServer(manager interfaces.SnapshotManager) *Server {
|
|
return &Server{
|
|
manager: manager,
|
|
}
|
|
}
|
|
|
|
// Start запускает gRPC-сервер на указанном адресе.
|
|
func (s *Server) Start(ctx context.Context, address string) error {
|
|
lis, err := net.Listen("tcp", address)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to listen on %s: %w", address, err)
|
|
}
|
|
|
|
s.server = stdgrpc.NewServer()
|
|
agateGrpc.RegisterSnapshotServiceServer(s.server, s)
|
|
|
|
go func() {
|
|
if err := s.server.Serve(lis); err != nil {
|
|
fmt.Printf("Server error: %v\n", err)
|
|
}
|
|
}()
|
|
|
|
fmt.Printf("Server started on %s\n", address)
|
|
|
|
// Ждем отмены контекста для остановки сервера
|
|
<-ctx.Done()
|
|
s.Stop()
|
|
return nil
|
|
}
|
|
|
|
// Stop изящно останавливает сервер.
|
|
func (s *Server) Stop() {
|
|
if s.server != nil {
|
|
s.server.GracefulStop()
|
|
fmt.Println("Server stopped")
|
|
}
|
|
}
|
|
|
|
// ListSnapshots реализует gRPC-метод ListSnapshots.
|
|
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
|
|
opts := store.ListOptions{}
|
|
snapshots, err := s.manager.ListSnapshots(ctx, opts)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to list snapshots: %w", err)
|
|
}
|
|
|
|
response := &agateGrpc.ListSnapshotsResponse{
|
|
Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)),
|
|
}
|
|
|
|
for _, snapshot := range snapshots {
|
|
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// GetSnapshotDetails реализует gRPC-метод GetSnapshotDetails.
|
|
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
|
|
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
|
|
}
|
|
|
|
response := &agateGrpc.SnapshotDetails{
|
|
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
|
|
ID: snapshot.ID,
|
|
Name: snapshot.Name,
|
|
ParentID: snapshot.ParentID,
|
|
CreationTime: snapshot.CreationTime,
|
|
}),
|
|
Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)),
|
|
}
|
|
|
|
for _, file := range snapshot.Files {
|
|
response.Files = append(response.Files, &agateGrpc.FileInfo{
|
|
Path: file.Path,
|
|
SizeBytes: file.Size,
|
|
Sha256Hash: file.SHA256,
|
|
IsDir: file.IsDir,
|
|
})
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// DownloadFile реализует gRPC-метод DownloadFile.
|
|
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
|
|
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open file: %w", err)
|
|
}
|
|
defer fileReader.Close()
|
|
|
|
buffer := make([]byte, 64*1024)
|
|
for {
|
|
n, err := fileReader.Read(buffer)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read file: %w", err)
|
|
}
|
|
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
|
|
return fmt.Errorf("failed to send chunk: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DownloadSnapshotDiff реализует gRPC-метод DownloadSnapshotDiff.
|
|
func (s *Server) DownloadSnapshotDiff(req *agateGrpc.DownloadSnapshotDiffRequest, stream agateGrpc.SnapshotService_DownloadSnapshotDiffServer) error {
|
|
diffReader, err := s.manager.StreamSnapshotDiff(context.Background(), req.SnapshotId, req.LocalParentId, req.Offset)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to stream snapshot diff: %w", err)
|
|
}
|
|
defer diffReader.Close()
|
|
|
|
buffer := make([]byte, 64*1024)
|
|
for {
|
|
n, err := diffReader.Read(buffer)
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read from diff stream: %w", err)
|
|
}
|
|
if n > 0 {
|
|
if err := stream.Send(&agateGrpc.DownloadFileResponse{ChunkData: buffer[:n]}); err != nil {
|
|
return fmt.Errorf("failed to send diff chunk: %w", err)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Вспомогательная функция для конвертации store.SnapshotInfo в grpc.SnapshotInfo
|
|
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
|
|
return &agateGrpc.SnapshotInfo{
|
|
Id: info.ID,
|
|
Name: info.Name,
|
|
ParentId: info.ParentID,
|
|
CreationTime: timestamppb.New(info.CreationTime),
|
|
}
|
|
}
|