4 Commits

Author SHA1 Message Date
f7c1e461e6 Refactor snapshot methods to use active directory context
Updated `SaveSnapshot` and `RestoreSnapshot` methods to reference the active directory via `BlobStore`. Introduced `RestoreSnapshotToDir` for granular restore operations. Additionally, updated dependencies in `go.mod` to their latest versions for compatibility and maintenance.
2025-05-10 01:23:36 +03:00
b05058b5cd Add active directory management for snapshot operations
Introduced `GetActiveDir` and `CleanActiveDir` methods in the blob store to manage a dedicated directory for active snapshot operations. This ensures a clean working state before starting new operations and prevents conflicts. Updated related logic in snapshot creation and restoration to utilize the active directory.
2025-05-10 00:57:34 +03:00
9d04f43104 Updated package name 2025-05-08 11:32:21 +03:00
13744f0500 Introduce gRPC-based snapshot client and server functionality.
Implemented gRPC client and server for snapshot management, enabling remote operations like listing, fetching, and downloading snapshots. Updated interfaces to support the new gRPC implementation and integrated server start functionality into the API.
2025-05-07 22:03:03 +03:00
19 changed files with 1343 additions and 40 deletions

View File

@ -6,7 +6,7 @@ download-third-party:
mv ./grpc/third_party/googleapis/grafeas ./grpc mv ./grpc/third_party/googleapis/grafeas ./grpc
rm -rf ./grpc/third_party rm -rf ./grpc/third_party
gen-proto-geolocation: gen-proto:
mkdir -p ./grpc mkdir -p ./grpc
@protoc -I ./grpc \ @protoc -I ./grpc \

310
README.md Normal file
View File

@ -0,0 +1,310 @@
# Agate
Agate is a Go library for creating, managing, and sharing snapshots of directories. It provides functionality for creating incremental snapshots, storing them efficiently, and sharing them over a network using gRPC.
## Installation
```bash
go get gitea.unprism.ru/KRBL/Agate
```
## Features
- Create snapshots of directories
- Incremental snapshots (only store changes)
- Restore snapshots
- List and manage snapshots
- Share snapshots over a network using gRPC
- Connect to remote snapshot repositories
## Basic Usage
### Creating a Snapshot Repository
To create a snapshot repository, you need to initialize the Agate library with the appropriate options:
```go
package main
import (
"context"
"fmt"
"log"
"os"
"path/filepath"
"gitea.unprism.ru/KRBL/Agate"
"gitea.unprism.ru/KRBL/Agate/stores"
)
func main() {
// Create directories for your repository
workDir := "/path/to/your/repository"
if err := os.MkdirAll(workDir, 0755); err != nil {
log.Fatalf("Failed to create work directory: %v", err)
}
// Initialize the default stores
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
if err != nil {
log.Fatalf("Failed to initialize stores: %v", err)
}
defer metadataStore.Close()
// Initialize Agate
agateOptions := agate.AgateOptions{
WorkDir: workDir,
MetadataStore: metadataStore,
BlobStore: blobStore,
}
ag, err := agate.New(agateOptions)
if err != nil {
log.Fatalf("Failed to initialize Agate: %v", err)
}
defer ag.Close()
// Create a snapshot
ctx := context.Background()
snapshotID, err := ag.SaveSnapshot(ctx, "My First Snapshot", "")
if err != nil {
log.Fatalf("Failed to create snapshot: %v", err)
}
fmt.Printf("Created snapshot with ID: %s\n", snapshotID)
// List snapshots
snapshots, err := ag.ListSnapshots(ctx)
if err != nil {
log.Fatalf("Failed to list snapshots: %v", err)
}
fmt.Printf("Found %d snapshots:\n", len(snapshots))
for _, s := range snapshots {
fmt.Printf(" - %s: %s (created at %s)\n", s.ID, s.Name, s.CreationTime.Format("2006-01-02 15:04:05"))
}
}
```
### Hosting a Snapshot Repository
To host a snapshot repository and make it available over the network, you can use the `StartServer` method:
```go
package main
import (
"context"
"log"
"os"
"os/signal"
"syscall"
"gitea.unprism.ru/KRBL/Agate"
"gitea.unprism.ru/KRBL/Agate/stores"
)
func main() {
// Create directories for your repository
workDir := "/path/to/your/repository"
if err := os.MkdirAll(workDir, 0755); err != nil {
log.Fatalf("Failed to create work directory: %v", err)
}
// Initialize the default stores
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
if err != nil {
log.Fatalf("Failed to initialize stores: %v", err)
}
defer metadataStore.Close()
// Initialize Agate
agateOptions := agate.AgateOptions{
WorkDir: workDir,
MetadataStore: metadataStore,
BlobStore: blobStore,
}
ag, err := agate.New(agateOptions)
if err != nil {
log.Fatalf("Failed to initialize Agate: %v", err)
}
defer ag.Close()
// Start the gRPC server
ctx := context.Background()
address := "0.0.0.0:50051" // Listen on all interfaces, port 50051
if err := ag.StartServer(ctx, address); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
log.Printf("Server started on %s", address)
// Wait for termination signal
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("Shutting down...")
}
```
### Connecting to a Hosted Snapshot Repository
To connect to a hosted snapshot repository and retrieve snapshots:
```go
package main
import (
"context"
"fmt"
"log"
"os"
"gitea.unprism.ru/KRBL/Agate"
"gitea.unprism.ru/KRBL/Agate/stores"
)
func main() {
// Create directories for your local repository
workDir := "/path/to/your/local/repository"
if err := os.MkdirAll(workDir, 0755); err != nil {
log.Fatalf("Failed to create work directory: %v", err)
}
// Initialize the default stores
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
if err != nil {
log.Fatalf("Failed to initialize stores: %v", err)
}
defer metadataStore.Close()
// Initialize Agate
agateOptions := agate.AgateOptions{
WorkDir: workDir,
MetadataStore: metadataStore,
BlobStore: blobStore,
}
ag, err := agate.New(agateOptions)
if err != nil {
log.Fatalf("Failed to initialize Agate: %v", err)
}
defer ag.Close()
// Connect to a remote server
ctx := context.Background()
remoteAddress := "remote-server:50051"
// List snapshots from the remote server
snapshots, err := ag.GetRemoteSnapshotList(ctx, remoteAddress)
if err != nil {
log.Fatalf("Failed to list remote snapshots: %v", err)
}
fmt.Printf("Found %d remote snapshots:\n", len(snapshots))
for _, s := range snapshots {
fmt.Printf(" - %s: %s (created at %s)\n", s.ID, s.Name, s.CreationTime.Format("2006-01-02 15:04:05"))
}
// Download a specific snapshot
if len(snapshots) > 0 {
snapshotID := snapshots[0].ID
fmt.Printf("Downloading snapshot %s...\n", snapshotID)
// Download the snapshot (pass empty string as localParentID if this is the first download)
if err := ag.GetRemoteSnapshot(ctx, remoteAddress, snapshotID, ""); err != nil {
log.Fatalf("Failed to download snapshot: %v", err)
}
fmt.Printf("Successfully downloaded snapshot %s\n", snapshotID)
}
}
```
## Advanced Usage
### Creating Incremental Snapshots
You can create incremental snapshots by specifying a parent snapshot ID:
```go
// Create a first snapshot
snapshotID1, err := ag.SaveSnapshot(ctx, "First Snapshot", "")
if err != nil {
log.Fatalf("Failed to create first snapshot: %v", err)
}
// Make some changes to your files...
// Create a second snapshot with the first one as parent
snapshotID2, err := ag.SaveSnapshot(ctx, "Second Snapshot", snapshotID1)
if err != nil {
log.Fatalf("Failed to create second snapshot: %v", err)
}
```
### Restoring a Snapshot
To restore a snapshot:
```go
if err := ag.RestoreSnapshot(ctx, snapshotID); err != nil {
log.Fatalf("Failed to restore snapshot: %v", err)
}
```
### Getting Snapshot Details
To get detailed information about a snapshot:
```go
snapshot, err := ag.GetSnapshotDetails(ctx, snapshotID)
if err != nil {
log.Fatalf("Failed to get snapshot details: %v", err)
}
fmt.Printf("Snapshot: %s\n", snapshot.Name)
fmt.Printf("Created: %s\n", snapshot.CreationTime.Format("2006-01-02 15:04:05"))
fmt.Printf("Files: %d\n", len(snapshot.Files))
```
### Deleting a Snapshot
To delete a snapshot:
```go
if err := ag.DeleteSnapshot(ctx, snapshotID); err != nil {
log.Fatalf("Failed to delete snapshot: %v", err)
}
```
## API Reference
### Agate
The main entry point for the library.
- `New(options AgateOptions) (*Agate, error)` - Create a new Agate instance
- `SaveSnapshot(ctx context.Context, name string, parentID string) (string, error)` - Create a new snapshot
- `RestoreSnapshot(ctx context.Context, snapshotID string) error` - Restore a snapshot
- `ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)` - List all snapshots
- `GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)` - Get details of a snapshot
- `DeleteSnapshot(ctx context.Context, snapshotID string) error` - Delete a snapshot
- `StartServer(ctx context.Context, address string) error` - Start a gRPC server to share snapshots
- `ConnectRemote(address string) (*grpc.SnapshotClient, error)` - Connect to a remote server
- `GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error)` - List snapshots from a remote server
- `GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error` - Download a snapshot from a remote server
### AgateOptions
Configuration options for the Agate library.
- `WorkDir string` - Directory where snapshots will be stored and managed
- `OpenFunc func(dir string) error` - Called after a snapshot is restored
- `CloseFunc func() error` - Called before a snapshot is created or restored
- `MetadataStore store.MetadataStore` - Implementation of the metadata store
- `BlobStore store.BlobStore` - Implementation of the blob store
## License
[Add your license information here]

108
api.go
View File

@ -4,10 +4,11 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate/grpc"
"os" "os"
"path/filepath" "path/filepath"
"unprism.ru/KRBL/agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
// AgateOptions defines configuration options for the Agate library. // AgateOptions defines configuration options for the Agate library.
@ -97,7 +98,7 @@ func New(options AgateOptions) (*Agate, error) {
}, nil }, nil
} }
// SaveSnapshot creates a new snapshot from the current state of the work directory. // SaveSnapshot creates a new snapshot from the current state of the active directory.
// If parentID is provided, it will be set as the parent of the new snapshot. // If parentID is provided, it will be set as the parent of the new snapshot.
// Returns the ID of the created snapshot. // Returns the ID of the created snapshot.
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) { func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
@ -109,7 +110,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
} }
// Create the snapshot // Create the snapshot
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.WorkDir, name, parentID) snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, parentID)
if err != nil { if err != nil {
return "", fmt.Errorf("failed to create snapshot: %w", err) return "", fmt.Errorf("failed to create snapshot: %w", err)
} }
@ -124,7 +125,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
return snapshot.ID, nil return snapshot.ID, nil
} }
// RestoreSnapshot extracts a snapshot to the work directory. // RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error { func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
// Call CloseFunc if provided // Call CloseFunc if provided
if a.options.CloseFunc != nil { if a.options.CloseFunc != nil {
@ -134,13 +135,37 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
} }
// Extract the snapshot // Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.WorkDir); err != nil { if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir()); err != nil {
return fmt.Errorf("failed to extract snapshot: %w", err) return fmt.Errorf("failed to extract snapshot: %w", err)
} }
// Call OpenFunc if provided // Call OpenFunc if provided
if a.options.OpenFunc != nil { if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.WorkDir); err != nil { if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
return fmt.Errorf("failed to open resources after restore: %w", err)
}
}
return nil
}
// RestoreSnapshot extracts a snapshot to the directory.
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return fmt.Errorf("failed to close resources before restore: %w", err)
}
}
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir); err != nil {
return fmt.Errorf("failed to extract snapshot: %w", err)
}
// Call OpenFunc if provided
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(dir); err != nil {
return fmt.Errorf("failed to open resources after restore: %w", err) return fmt.Errorf("failed to open resources after restore: %w", err)
} }
} }
@ -171,7 +196,74 @@ func (a *Agate) Close() error {
} }
// StartServer starts a gRPC server to share snapshots. // StartServer starts a gRPC server to share snapshots.
// This is a placeholder for future implementation.
func (a *Agate) StartServer(ctx context.Context, address string) error { func (a *Agate) StartServer(ctx context.Context, address string) error {
return errors.New("server functionality not implemented yet") _, err := grpc.RunServer(ctx, a.manager, address)
if err != nil {
return fmt.Errorf("failed to start server: %w", err)
}
// We don't store the server reference because we don't have a way to stop it yet
// In a future version, we could add a StopServer method
return nil
}
// ConnectRemote connects to a remote snapshot server.
// Returns a client that can be used to interact with the remote server.
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
client, err := grpc.ConnectToServer(address)
if err != nil {
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
}
return client, nil
}
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
client, err := a.ConnectRemote(address)
if err != nil {
return nil, err
}
defer client.Close()
return client.ListSnapshots(ctx)
}
// GetRemoteSnapshot downloads a snapshot from a remote server.
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
client, err := a.ConnectRemote(address)
if err != nil {
return err
}
defer client.Close()
// Create a temporary directory for the downloaded snapshot
tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID)
if err := os.MkdirAll(tempDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary directory: %w", err)
}
// Download the snapshot
if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil {
return fmt.Errorf("failed to download snapshot: %w", err)
}
// Get the snapshot details to create a local copy
details, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create a local snapshot from the downloaded files
_, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID)
if err != nil {
return fmt.Errorf("failed to create local snapshot: %w", err)
}
// Clean up the temporary directory
os.RemoveAll(tempDir)
return nil
} }

View File

@ -7,8 +7,8 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"unprism.ru/KRBL/agate" "gitea.unprism.ru/KRBL/Agate"
"unprism.ru/KRBL/agate/stores" "gitea.unprism.ru/KRBL/Agate/stores"
) )
func main() { func main() {

12
go.mod
View File

@ -1,4 +1,4 @@
module unprism.ru/KRBL/agate module gitea.unprism.ru/KRBL/Agate
go 1.24.0 go 1.24.0
@ -6,14 +6,14 @@ require (
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3 github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
github.com/mattn/go-sqlite3 v1.14.28 github.com/mattn/go-sqlite3 v1.14.28
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2
google.golang.org/grpc v1.72.0 google.golang.org/grpc v1.72.0
google.golang.org/protobuf v1.36.6 google.golang.org/protobuf v1.36.6
) )
require ( require (
golang.org/x/net v0.39.0 // indirect golang.org/x/net v0.40.0 // indirect
golang.org/x/sys v0.32.0 // indirect golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.25.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
) )

10
go.sum
View File

@ -26,14 +26,24 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f h1:tjZsroqekhC63+WMqzmWyW5Twj/ZfR5HAlpd5YQ1Vs0= google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f h1:tjZsroqekhC63+WMqzmWyW5Twj/ZfR5HAlpd5YQ1Vs0=
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:Cd8IzgPo5Akum2c9R6FsXNaZbH3Jpa2gpHlW89FqlyQ= google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:Cd8IzgPo5Akum2c9R6FsXNaZbH3Jpa2gpHlW89FqlyQ=
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 h1:vPV0tzlsK6EzEDHNNH5sa7Hs9bd7iXR7B1tSiPepkV0=
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:pKLAc5OolXC3ViWGI62vvC0n10CpwAtRcTNCFwTKBEw=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4= google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 h1:IqsN8hx+lWLqlN+Sc3DoMy/watjofWiU8sRFgQ8fhKM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM= google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM= google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=

236
grpc/client.go Normal file
View File

@ -0,0 +1,236 @@
package grpc
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotClient implements the client for connecting to a remote snapshot server
type SnapshotClient struct {
conn *grpc.ClientConn
client SnapshotServiceClient
}
// NewSnapshotClient creates a new client connected to the specified address
func NewSnapshotClient(address string) (*SnapshotClient, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
}
// Create the gRPC client
client := NewSnapshotServiceClient(conn)
return &SnapshotClient{
conn: conn,
client: client,
}, nil
}
// Close closes the connection to the server
func (c *SnapshotClient) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// ListSnapshots retrieves a list of snapshots from the remote server
func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{
ID: snapshot.Id,
Name: snapshot.Name,
ParentID: snapshot.ParentId,
CreationTime: snapshot.CreationTime.AsTime(),
})
}
return snapshots, nil
}
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{
SnapshotId: snapshotID,
})
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{
ID: response.Info.Id,
Name: response.Info.Name,
ParentID: response.Info.ParentId,
CreationTime: response.Info.CreationTime.AsTime(),
Files: make([]store.FileInfo, 0, len(response.Files)),
}
// Convert file info
for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path,
Size: file.SizeBytes,
IsDir: file.IsDir,
SHA256: file.Sha256Hash,
})
}
return snapshot, nil
}
// DownloadSnapshot downloads a snapshot from the server
// This implementation downloads each file individually to optimize bandwidth usage
func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
// Get snapshot details
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
}
// Create the target file
file, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
}
defer file.Close()
// Receive and write chunks
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err)
}
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
}
return nil
}
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// ConnectToServer creates a new client connected to the specified address
func ConnectToServer(address string) (*SnapshotClient, error) {
return NewSnapshotClient(address)
}

154
grpc/server.go Normal file
View File

@ -0,0 +1,154 @@
package grpc
import (
"context"
"fmt"
"io"
"net"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotServer implements the gRPC server for snapshots
type SnapshotServer struct {
UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager
server *grpc.Server
}
// NewSnapshotServer creates a new snapshot server
func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer {
return &SnapshotServer{
manager: manager,
}
}
// Start starts the gRPC server on the specified address
func (s *SnapshotServer) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
}
s.server = grpc.NewServer()
RegisterSnapshotServiceServer(s.server, s)
go func() {
if err := s.server.Serve(lis); err != nil {
fmt.Printf("Server error: %v\n", err)
}
}()
fmt.Printf("Server started on %s\n", address)
return nil
}
// Stop gracefully stops the server
func (s *SnapshotServer) Stop(ctx context.Context) error {
if s.server != nil {
s.server.GracefulStop()
fmt.Println("Server stopped")
}
return nil
}
// ListSnapshots implements the gRPC ListSnapshots method
func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) {
snapshots, err := s.manager.ListSnapshots(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
response := &ListSnapshotsResponse{
Snapshots: make([]*SnapshotInfo, 0, len(snapshots)),
}
for _, snapshot := range snapshots {
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
}
return response, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
response := &SnapshotDetails{
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
ID: snapshot.ID,
Name: snapshot.Name,
ParentID: snapshot.ParentID,
CreationTime: snapshot.CreationTime,
}),
Files: make([]*FileInfo, 0, len(snapshot.Files)),
}
for _, file := range snapshot.Files {
response.Files = append(response.Files, &FileInfo{
Path: file.Path,
SizeBytes: file.Size,
Sha256Hash: file.SHA256,
IsDir: file.IsDir,
})
}
return response, nil
}
// DownloadFile implements the gRPC DownloadFile method
func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer fileReader.Close()
// Read the file in chunks and send them to the client
buffer := make([]byte, 64*1024) // 64KB chunks
for {
n, err := fileReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
// Send the chunk to the client
if err := stream.Send(&DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo {
return &SnapshotInfo{
Id: info.ID,
Name: info.Name,
ParentId: info.ParentID,
CreationTime: timestamppb.New(info.CreationTime),
}
}
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) {
server := NewSnapshotServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}

View File

@ -476,7 +476,7 @@ const file_snapshot_proto_rawDesc = "" +
"\x0fSnapshotService\x12k\n" + "\x0fSnapshotService\x12k\n" +
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" + "\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" + "\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\x1cZ\x1aunprism.ru/KRBL/agate/grpcb\x06proto3" "\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
var ( var (
file_snapshot_proto_rawDescOnce sync.Once file_snapshot_proto_rawDescOnce sync.Once

View File

@ -5,7 +5,7 @@ package agate.grpc;
import "google/protobuf/timestamp.proto"; import "google/protobuf/timestamp.proto";
import "google/api/annotations.proto"; // Добавлено для HTTP mapping import "google/api/annotations.proto"; // Добавлено для HTTP mapping
option go_package = "unprism.ru/KRBL/agate/grpc"; option go_package = "gitea.unprism.ru/KRBL/Agate/grpc";
// Сервис для управления снапшотами // Сервис для управления снапшотами
service SnapshotService { service SnapshotService {

47
interfaces/snapshot.go Normal file
View File

@ -0,0 +1,47 @@
package interfaces
import (
"context"
"io"
"gitea.unprism.ru/KRBL/Agate/store"
)
// SnapshotManager defines the interface that the server needs to interact with snapshots
type SnapshotManager interface {
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// ListSnapshots retrieves a list of all available snapshots
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
// OpenFile retrieves and opens a file from the specified snapshot
OpenFile(ctx context.Context, snapshotID string, filePath string) (io.ReadCloser, error)
// CreateSnapshot creates a new snapshot from the specified source directory
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
}
// SnapshotServer defines the interface for a server that can share snapshots
type SnapshotServer interface {
// Start initializes and begins the server's operation
Start(ctx context.Context) error
// Stop gracefully shuts down the server
Stop(ctx context.Context) error
}
// SnapshotClient defines the interface for a client that can connect to a server and download snapshots
type SnapshotClient interface {
// ListSnapshots retrieves a list of snapshots from the server
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
// DownloadSnapshot downloads a snapshot from the server
DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error
// Close closes the connection to the server
Close() error
}

View File

@ -13,9 +13,9 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"unprism.ru/KRBL/agate/archive" "gitea.unprism.ru/KRBL/Agate/archive"
"unprism.ru/KRBL/agate/hash" "gitea.unprism.ru/KRBL/Agate/hash"
"unprism.ru/KRBL/agate/store" "gitea.unprism.ru/KRBL/Agate/store"
) )
type SnapshotManagerData struct { type SnapshotManagerData struct {
@ -59,12 +59,20 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
// Generate a unique ID for the snapshot // Generate a unique ID for the snapshot
snapshotID := uuid.New().String() snapshotID := uuid.New().String()
// Create a temporary file for the archive // Clean the active directory to avoid conflicts
tempFile, err := os.CreateTemp("", "agate-snapshot-*.zip") if err := data.blobStore.CleanActiveDir(ctx); err != nil {
if err != nil { return nil, fmt.Errorf("failed to clean active directory: %w", err)
return nil, fmt.Errorf("failed to create temporary file: %w", err) }
// Get the active directory for operations
activeDir := data.blobStore.GetActiveDir()
// Create a temporary file for the archive in the active directory
tempFilePath := filepath.Join(activeDir, "temp-"+snapshotID+".zip")
tempFile, err := os.Create(tempFilePath)
if err != nil {
return nil, fmt.Errorf("failed to create temporary file in active directory: %w", err)
} }
tempFilePath := tempFile.Name()
tempFile.Close() // Close it as CreateArchive will reopen it tempFile.Close() // Close it as CreateArchive will reopen it
defer os.Remove(tempFilePath) // Clean up temp file after we're done defer os.Remove(tempFilePath) // Clean up temp file after we're done
@ -253,8 +261,15 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
if snapshotID == "" { if snapshotID == "" {
return errors.New("snapshot ID cannot be empty") return errors.New("snapshot ID cannot be empty")
} }
// If no specific path is provided, use the active directory
if path == "" { if path == "" {
return errors.New("target path cannot be empty") // Clean the active directory to avoid conflicts
if err := data.blobStore.CleanActiveDir(ctx); err != nil {
return fmt.Errorf("failed to clean active directory: %w", err)
}
path = filepath.Join(data.blobStore.GetActiveDir(), snapshotID)
} }
// First check if the snapshot exists // First check if the snapshot exists

242
remote/client.go Normal file
View File

@ -0,0 +1,242 @@
package remote
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
stdgrpc "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// Client represents a client for connecting to a remote snapshot server
// It implements the interfaces.SnapshotClient interface
type Client struct {
conn *stdgrpc.ClientConn
client agateGrpc.SnapshotServiceClient
}
// Ensure Client implements interfaces.SnapshotClient
var _ interfaces.SnapshotClient = (*Client)(nil)
// NewClient creates a new client connected to the specified address
func NewClient(address string) (*Client, error) {
// Connect to the server with insecure credentials (for simplicity)
conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
}
// Create the gRPC client
client := agateGrpc.NewSnapshotServiceClient(conn)
return &Client{
conn: conn,
client: client,
}, nil
}
// Close closes the connection to the server
func (c *Client) Close() error {
if c.conn != nil {
return c.conn.Close()
}
return nil
}
// ListSnapshots retrieves a list of snapshots from the remote server
func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{})
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
// Convert gRPC snapshot info to store.SnapshotInfo
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
for _, snapshot := range response.Snapshots {
snapshots = append(snapshots, store.SnapshotInfo{
ID: snapshot.Id,
Name: snapshot.Name,
ParentID: snapshot.ParentId,
CreationTime: snapshot.CreationTime.AsTime(),
})
}
return snapshots, nil
}
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{
SnapshotId: snapshotID,
})
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
// Convert gRPC snapshot details to store.Snapshot
snapshot := &store.Snapshot{
ID: response.Info.Id,
Name: response.Info.Name,
ParentID: response.Info.ParentId,
CreationTime: response.Info.CreationTime.AsTime(),
Files: make([]store.FileInfo, 0, len(response.Files)),
}
// Convert file info
for _, file := range response.Files {
snapshot.Files = append(snapshot.Files, store.FileInfo{
Path: file.Path,
Size: file.SizeBytes,
IsDir: file.IsDir,
SHA256: file.Sha256Hash,
})
}
return snapshot, nil
}
// DownloadSnapshot downloads a snapshot from the server
// This implementation downloads each file individually to optimize bandwidth usage
func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
// Get snapshot details
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create target directory if it doesn't exist
if err := os.MkdirAll(targetDir, 0755); err != nil {
return fmt.Errorf("failed to create target directory: %w", err)
}
// If a local parent is specified, get its details to compare files
var localParentFiles map[string]store.FileInfo
if localParentID != "" {
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
if err == nil {
// Create a map of file paths to file info for quick lookup
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
for _, file := range localParent.Files {
localParentFiles[file.Path] = file
}
}
}
// Download each file
for _, file := range snapshot.Files {
// Skip directories, we'll create them when needed
if file.IsDir {
// Create directory
dirPath := filepath.Join(targetDir, file.Path)
if err := os.MkdirAll(dirPath, 0755); err != nil {
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
}
continue
}
// Check if we can skip downloading this file
if localParentFiles != nil {
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
// File exists in parent with same hash, copy it instead of downloading
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
targetFilePath := filepath.Join(targetDir, file.Path)
// Ensure parent directory exists
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
}
// Copy the file
if err := copyFile(parentFilePath, targetFilePath); err != nil {
// If copy fails, fall back to downloading
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
} else {
// Skip to next file
continue
}
}
}
// Download the file
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
}
}
return nil
}
// downloadFile downloads a single file from the server
func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
// Create the request
req := &agateGrpc.DownloadFileRequest{
SnapshotId: snapshotID,
FilePath: filePath,
}
// Start streaming the file
stream, err := c.client.DownloadFile(ctx, req)
if err != nil {
return fmt.Errorf("failed to start file download: %w", err)
}
// Ensure the target directory exists
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
}
// Create the target file
file, err := os.Create(targetPath)
if err != nil {
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
}
defer file.Close()
// Receive and write chunks
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error receiving file chunk: %w", err)
}
// Write the chunk to the file
if _, err := file.Write(resp.ChunkData); err != nil {
return fmt.Errorf("error writing to file: %w", err)
}
}
return nil
}
// Helper function to copy a file
func copyFile(src, dst string) error {
sourceFile, err := os.Open(src)
if err != nil {
return err
}
defer sourceFile.Close()
destFile, err := os.Create(dst)
if err != nil {
return err
}
defer destFile.Close()
_, err = io.Copy(destFile, sourceFile)
return err
}
// Connect creates a new client connected to the specified address
func Connect(address string) (*Client, error) {
return NewClient(address)
}

155
remote/server.go Normal file
View File

@ -0,0 +1,155 @@
package remote
import (
"context"
"fmt"
"io"
"net"
stdgrpc "google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/store"
)
// Server implements the gRPC server for snapshots
type Server struct {
agateGrpc.UnimplementedSnapshotServiceServer
manager interfaces.SnapshotManager
server *stdgrpc.Server
}
// NewServer creates a new snapshot server
func NewServer(manager interfaces.SnapshotManager) *Server {
return &Server{
manager: manager,
}
}
// Start starts the gRPC server on the specified address
func (s *Server) Start(ctx context.Context, address string) error {
lis, err := net.Listen("tcp", address)
if err != nil {
return fmt.Errorf("failed to listen on %s: %w", address, err)
}
s.server = stdgrpc.NewServer()
agateGrpc.RegisterSnapshotServiceServer(s.server, s)
go func() {
if err := s.server.Serve(lis); err != nil {
fmt.Printf("Server error: %v\n", err)
}
}()
fmt.Printf("Server started on %s\n", address)
return nil
}
// Stop gracefully stops the server
func (s *Server) Stop(ctx context.Context) error {
if s.server != nil {
s.server.GracefulStop()
fmt.Println("Server stopped")
}
return nil
}
// ListSnapshots implements the gRPC ListSnapshots method
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
snapshots, err := s.manager.ListSnapshots(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list snapshots: %w", err)
}
response := &agateGrpc.ListSnapshotsResponse{
Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)),
}
for _, snapshot := range snapshots {
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
}
return response, nil
}
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
if err != nil {
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
}
response := &agateGrpc.SnapshotDetails{
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
ID: snapshot.ID,
Name: snapshot.Name,
ParentID: snapshot.ParentID,
CreationTime: snapshot.CreationTime,
}),
Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)),
}
for _, file := range snapshot.Files {
response.Files = append(response.Files, &agateGrpc.FileInfo{
Path: file.Path,
SizeBytes: file.Size,
Sha256Hash: file.SHA256,
IsDir: file.IsDir,
})
}
return response, nil
}
// DownloadFile implements the gRPC DownloadFile method
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
// Open the file from the snapshot
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
defer fileReader.Close()
// Read the file in chunks and send them to the client
buffer := make([]byte, 64*1024) // 64KB chunks
for {
n, err := fileReader.Read(buffer)
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("failed to read file: %w", err)
}
// Send the chunk to the client
if err := stream.Send(&agateGrpc.DownloadFileResponse{
ChunkData: buffer[:n],
}); err != nil {
return fmt.Errorf("failed to send chunk: %w", err)
}
}
return nil
}
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
return &agateGrpc.SnapshotInfo{
Id: info.ID,
Name: info.Name,
ParentId: info.ParentID,
CreationTime: timestamppb.New(info.CreationTime),
}
}
// RunServer is a helper function to create and start a snapshot server
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
server := NewServer(manager)
if err := server.Start(ctx, address); err != nil {
return nil, err
}
return server, nil
}

View File

@ -2,8 +2,8 @@ package agate
import ( import (
"context" "context"
"gitea.unprism.ru/KRBL/Agate/store"
"io" "io"
"unprism.ru/KRBL/agate/store"
) )
// SnapshotManager is an interface that defines operations for managing and interacting with snapshots. // SnapshotManager is an interface that defines operations for managing and interacting with snapshots.

View File

@ -3,11 +3,11 @@ package filesystem
import ( import (
"context" "context"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate"
"gitea.unprism.ru/KRBL/Agate/store"
"io" "io"
"os" "os"
"path/filepath" "path/filepath"
"unprism.ru/KRBL/agate"
"unprism.ru/KRBL/agate/store"
) )
const blobExtension = ".zip" const blobExtension = ".zip"
@ -15,15 +15,26 @@ const blobExtension = ".zip"
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы. // fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
type fileSystemStore struct { type fileSystemStore struct {
baseDir string // Директория для хранения блобов (архивов) baseDir string // Директория для хранения блобов (архивов)
activeDir string // Директория для активных операций (создание и восстановление)
} }
// NewFileSystemStore создает новое хранилище блобов в указанной директории. // NewFileSystemStore создает новое хранилище блобов в указанной директории.
func NewFileSystemStore(baseDir string) (store.BlobStore, error) { func NewFileSystemStore(baseDir string) (store.BlobStore, error) {
// Убедимся, что директория существует // Убедимся, что базовая директория существует
if err := os.MkdirAll(baseDir, 0755); err != nil { if err := os.MkdirAll(baseDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err) return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err)
} }
return &fileSystemStore{baseDir: baseDir}, nil
// Создаем директорию для активных операций внутри базовой директории
activeDir := filepath.Join(baseDir, "active")
if err := os.MkdirAll(activeDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create active directory %s for filesystem blob store: %w", activeDir, err)
}
return &fileSystemStore{
baseDir: baseDir,
activeDir: activeDir,
}, nil
} }
// getBlobPath формирует полный путь к файлу блоба. // getBlobPath формирует полный путь к файлу блоба.
@ -106,3 +117,27 @@ func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (
// Файл существует, возвращаем путь // Файл существует, возвращаем путь
return blobPath, nil return blobPath, nil
} }
// GetActiveDir возвращает путь к директории для активных операций.
func (fs *fileSystemStore) GetActiveDir() string {
return fs.activeDir
}
// CleanActiveDir очищает директорию для активных операций.
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
func (fs *fileSystemStore) CleanActiveDir(ctx context.Context) error {
// Удаляем все файлы в активной директории, но сохраняем саму директорию
entries, err := os.ReadDir(fs.activeDir)
if err != nil {
return fmt.Errorf("failed to read active directory: %w", err)
}
for _, entry := range entries {
path := filepath.Join(fs.activeDir, entry.Name())
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("failed to remove %s from active directory: %w", path, err)
}
}
return nil
}

View File

@ -6,12 +6,12 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"gitea.unprism.ru/KRBL/Agate"
"gitea.unprism.ru/KRBL/Agate/store"
_ "github.com/mattn/go-sqlite3" _ "github.com/mattn/go-sqlite3"
"os" "os"
"path/filepath" "path/filepath"
"time" "time"
"unprism.ru/KRBL/agate"
"unprism.ru/KRBL/agate/store"
) )
const ( const (

View File

@ -71,4 +71,11 @@ type BlobStore interface {
// Это может быть полезно для функций пакета archive, которые работают с путями. // Это может быть полезно для функций пакета archive, которые работают с путями.
// Возвращает agate.ErrNotFound, если блоб не найден. // Возвращает agate.ErrNotFound, если блоб не найден.
GetBlobPath(ctx context.Context, snapshotID string) (string, error) GetBlobPath(ctx context.Context, snapshotID string) (string, error)
// GetActiveDir возвращает путь к директории для активных операций (создание и восстановление).
GetActiveDir() string
// CleanActiveDir очищает директорию для активных операций.
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
CleanActiveDir(ctx context.Context) error
} }

View File

@ -4,9 +4,9 @@ import (
"fmt" "fmt"
"path/filepath" "path/filepath"
"unprism.ru/KRBL/agate/store" "gitea.unprism.ru/KRBL/Agate/store"
"unprism.ru/KRBL/agate/store/filesystem" "gitea.unprism.ru/KRBL/Agate/store/filesystem"
"unprism.ru/KRBL/agate/store/sqlite" "gitea.unprism.ru/KRBL/Agate/store/sqlite"
) )
// NewDefaultMetadataStore creates a new SQLite-based metadata store. // NewDefaultMetadataStore creates a new SQLite-based metadata store.