Compare commits
4 Commits
main
...
v0.1.3-alp
Author | SHA1 | Date | |
---|---|---|---|
f7c1e461e6
|
|||
b05058b5cd
|
|||
9d04f43104
|
|||
13744f0500
|
2
Makefile
2
Makefile
@ -6,7 +6,7 @@ download-third-party:
|
|||||||
mv ./grpc/third_party/googleapis/grafeas ./grpc
|
mv ./grpc/third_party/googleapis/grafeas ./grpc
|
||||||
rm -rf ./grpc/third_party
|
rm -rf ./grpc/third_party
|
||||||
|
|
||||||
gen-proto-geolocation:
|
gen-proto:
|
||||||
mkdir -p ./grpc
|
mkdir -p ./grpc
|
||||||
|
|
||||||
@protoc -I ./grpc \
|
@protoc -I ./grpc \
|
||||||
|
310
README.md
Normal file
310
README.md
Normal file
@ -0,0 +1,310 @@
|
|||||||
|
# Agate
|
||||||
|
|
||||||
|
Agate is a Go library for creating, managing, and sharing snapshots of directories. It provides functionality for creating incremental snapshots, storing them efficiently, and sharing them over a network using gRPC.
|
||||||
|
|
||||||
|
## Installation
|
||||||
|
|
||||||
|
```bash
|
||||||
|
go get gitea.unprism.ru/KRBL/Agate
|
||||||
|
```
|
||||||
|
|
||||||
|
## Features
|
||||||
|
|
||||||
|
- Create snapshots of directories
|
||||||
|
- Incremental snapshots (only store changes)
|
||||||
|
- Restore snapshots
|
||||||
|
- List and manage snapshots
|
||||||
|
- Share snapshots over a network using gRPC
|
||||||
|
- Connect to remote snapshot repositories
|
||||||
|
|
||||||
|
## Basic Usage
|
||||||
|
|
||||||
|
### Creating a Snapshot Repository
|
||||||
|
|
||||||
|
To create a snapshot repository, you need to initialize the Agate library with the appropriate options:
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Create directories for your repository
|
||||||
|
workDir := "/path/to/your/repository"
|
||||||
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
||||||
|
log.Fatalf("Failed to create work directory: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the default stores
|
||||||
|
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize stores: %v", err)
|
||||||
|
}
|
||||||
|
defer metadataStore.Close()
|
||||||
|
|
||||||
|
// Initialize Agate
|
||||||
|
agateOptions := agate.AgateOptions{
|
||||||
|
WorkDir: workDir,
|
||||||
|
MetadataStore: metadataStore,
|
||||||
|
BlobStore: blobStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
ag, err := agate.New(agateOptions)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize Agate: %v", err)
|
||||||
|
}
|
||||||
|
defer ag.Close()
|
||||||
|
|
||||||
|
// Create a snapshot
|
||||||
|
ctx := context.Background()
|
||||||
|
snapshotID, err := ag.SaveSnapshot(ctx, "My First Snapshot", "")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create snapshot: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Created snapshot with ID: %s\n", snapshotID)
|
||||||
|
|
||||||
|
// List snapshots
|
||||||
|
snapshots, err := ag.ListSnapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to list snapshots: %v", err)
|
||||||
|
}
|
||||||
|
fmt.Printf("Found %d snapshots:\n", len(snapshots))
|
||||||
|
for _, s := range snapshots {
|
||||||
|
fmt.Printf(" - %s: %s (created at %s)\n", s.ID, s.Name, s.CreationTime.Format("2006-01-02 15:04:05"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Hosting a Snapshot Repository
|
||||||
|
|
||||||
|
To host a snapshot repository and make it available over the network, you can use the `StartServer` method:
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"syscall"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Create directories for your repository
|
||||||
|
workDir := "/path/to/your/repository"
|
||||||
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
||||||
|
log.Fatalf("Failed to create work directory: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the default stores
|
||||||
|
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize stores: %v", err)
|
||||||
|
}
|
||||||
|
defer metadataStore.Close()
|
||||||
|
|
||||||
|
// Initialize Agate
|
||||||
|
agateOptions := agate.AgateOptions{
|
||||||
|
WorkDir: workDir,
|
||||||
|
MetadataStore: metadataStore,
|
||||||
|
BlobStore: blobStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
ag, err := agate.New(agateOptions)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize Agate: %v", err)
|
||||||
|
}
|
||||||
|
defer ag.Close()
|
||||||
|
|
||||||
|
// Start the gRPC server
|
||||||
|
ctx := context.Background()
|
||||||
|
address := "0.0.0.0:50051" // Listen on all interfaces, port 50051
|
||||||
|
if err := ag.StartServer(ctx, address); err != nil {
|
||||||
|
log.Fatalf("Failed to start server: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("Server started on %s", address)
|
||||||
|
|
||||||
|
// Wait for termination signal
|
||||||
|
sigCh := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
<-sigCh
|
||||||
|
|
||||||
|
log.Println("Shutting down...")
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Connecting to a Hosted Snapshot Repository
|
||||||
|
|
||||||
|
To connect to a hosted snapshot repository and retrieve snapshots:
|
||||||
|
|
||||||
|
```go
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
// Create directories for your local repository
|
||||||
|
workDir := "/path/to/your/local/repository"
|
||||||
|
if err := os.MkdirAll(workDir, 0755); err != nil {
|
||||||
|
log.Fatalf("Failed to create work directory: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize the default stores
|
||||||
|
metadataStore, blobStore, err := stores.InitDefaultStores(workDir)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize stores: %v", err)
|
||||||
|
}
|
||||||
|
defer metadataStore.Close()
|
||||||
|
|
||||||
|
// Initialize Agate
|
||||||
|
agateOptions := agate.AgateOptions{
|
||||||
|
WorkDir: workDir,
|
||||||
|
MetadataStore: metadataStore,
|
||||||
|
BlobStore: blobStore,
|
||||||
|
}
|
||||||
|
|
||||||
|
ag, err := agate.New(agateOptions)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize Agate: %v", err)
|
||||||
|
}
|
||||||
|
defer ag.Close()
|
||||||
|
|
||||||
|
// Connect to a remote server
|
||||||
|
ctx := context.Background()
|
||||||
|
remoteAddress := "remote-server:50051"
|
||||||
|
|
||||||
|
// List snapshots from the remote server
|
||||||
|
snapshots, err := ag.GetRemoteSnapshotList(ctx, remoteAddress)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to list remote snapshots: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Found %d remote snapshots:\n", len(snapshots))
|
||||||
|
for _, s := range snapshots {
|
||||||
|
fmt.Printf(" - %s: %s (created at %s)\n", s.ID, s.Name, s.CreationTime.Format("2006-01-02 15:04:05"))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download a specific snapshot
|
||||||
|
if len(snapshots) > 0 {
|
||||||
|
snapshotID := snapshots[0].ID
|
||||||
|
fmt.Printf("Downloading snapshot %s...\n", snapshotID)
|
||||||
|
|
||||||
|
// Download the snapshot (pass empty string as localParentID if this is the first download)
|
||||||
|
if err := ag.GetRemoteSnapshot(ctx, remoteAddress, snapshotID, ""); err != nil {
|
||||||
|
log.Fatalf("Failed to download snapshot: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Successfully downloaded snapshot %s\n", snapshotID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Advanced Usage
|
||||||
|
|
||||||
|
### Creating Incremental Snapshots
|
||||||
|
|
||||||
|
You can create incremental snapshots by specifying a parent snapshot ID:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Create a first snapshot
|
||||||
|
snapshotID1, err := ag.SaveSnapshot(ctx, "First Snapshot", "")
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create first snapshot: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make some changes to your files...
|
||||||
|
|
||||||
|
// Create a second snapshot with the first one as parent
|
||||||
|
snapshotID2, err := ag.SaveSnapshot(ctx, "Second Snapshot", snapshotID1)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to create second snapshot: %v", err)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Restoring a Snapshot
|
||||||
|
|
||||||
|
To restore a snapshot:
|
||||||
|
|
||||||
|
```go
|
||||||
|
if err := ag.RestoreSnapshot(ctx, snapshotID); err != nil {
|
||||||
|
log.Fatalf("Failed to restore snapshot: %v", err)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Getting Snapshot Details
|
||||||
|
|
||||||
|
To get detailed information about a snapshot:
|
||||||
|
|
||||||
|
```go
|
||||||
|
snapshot, err := ag.GetSnapshotDetails(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to get snapshot details: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("Snapshot: %s\n", snapshot.Name)
|
||||||
|
fmt.Printf("Created: %s\n", snapshot.CreationTime.Format("2006-01-02 15:04:05"))
|
||||||
|
fmt.Printf("Files: %d\n", len(snapshot.Files))
|
||||||
|
```
|
||||||
|
|
||||||
|
### Deleting a Snapshot
|
||||||
|
|
||||||
|
To delete a snapshot:
|
||||||
|
|
||||||
|
```go
|
||||||
|
if err := ag.DeleteSnapshot(ctx, snapshotID); err != nil {
|
||||||
|
log.Fatalf("Failed to delete snapshot: %v", err)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## API Reference
|
||||||
|
|
||||||
|
### Agate
|
||||||
|
|
||||||
|
The main entry point for the library.
|
||||||
|
|
||||||
|
- `New(options AgateOptions) (*Agate, error)` - Create a new Agate instance
|
||||||
|
- `SaveSnapshot(ctx context.Context, name string, parentID string) (string, error)` - Create a new snapshot
|
||||||
|
- `RestoreSnapshot(ctx context.Context, snapshotID string) error` - Restore a snapshot
|
||||||
|
- `ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)` - List all snapshots
|
||||||
|
- `GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)` - Get details of a snapshot
|
||||||
|
- `DeleteSnapshot(ctx context.Context, snapshotID string) error` - Delete a snapshot
|
||||||
|
- `StartServer(ctx context.Context, address string) error` - Start a gRPC server to share snapshots
|
||||||
|
- `ConnectRemote(address string) (*grpc.SnapshotClient, error)` - Connect to a remote server
|
||||||
|
- `GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error)` - List snapshots from a remote server
|
||||||
|
- `GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error` - Download a snapshot from a remote server
|
||||||
|
|
||||||
|
### AgateOptions
|
||||||
|
|
||||||
|
Configuration options for the Agate library.
|
||||||
|
|
||||||
|
- `WorkDir string` - Directory where snapshots will be stored and managed
|
||||||
|
- `OpenFunc func(dir string) error` - Called after a snapshot is restored
|
||||||
|
- `CloseFunc func() error` - Called before a snapshot is created or restored
|
||||||
|
- `MetadataStore store.MetadataStore` - Implementation of the metadata store
|
||||||
|
- `BlobStore store.BlobStore` - Implementation of the blob store
|
||||||
|
|
||||||
|
## License
|
||||||
|
|
||||||
|
[Add your license information here]
|
108
api.go
108
api.go
@ -4,10 +4,11 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/grpc"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"unprism.ru/KRBL/agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AgateOptions defines configuration options for the Agate library.
|
// AgateOptions defines configuration options for the Agate library.
|
||||||
@ -97,7 +98,7 @@ func New(options AgateOptions) (*Agate, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SaveSnapshot creates a new snapshot from the current state of the work directory.
|
// SaveSnapshot creates a new snapshot from the current state of the active directory.
|
||||||
// If parentID is provided, it will be set as the parent of the new snapshot.
|
// If parentID is provided, it will be set as the parent of the new snapshot.
|
||||||
// Returns the ID of the created snapshot.
|
// Returns the ID of the created snapshot.
|
||||||
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
||||||
@ -109,7 +110,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create the snapshot
|
// Create the snapshot
|
||||||
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.WorkDir, name, parentID)
|
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, parentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
||||||
}
|
}
|
||||||
@ -124,7 +125,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
|||||||
return snapshot.ID, nil
|
return snapshot.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// RestoreSnapshot extracts a snapshot to the work directory.
|
// RestoreSnapshot extracts a snapshot to the active directory.
|
||||||
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||||
// Call CloseFunc if provided
|
// Call CloseFunc if provided
|
||||||
if a.options.CloseFunc != nil {
|
if a.options.CloseFunc != nil {
|
||||||
@ -134,13 +135,37 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Extract the snapshot
|
// Extract the snapshot
|
||||||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.WorkDir); err != nil {
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir()); err != nil {
|
||||||
return fmt.Errorf("failed to extract snapshot: %w", err)
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call OpenFunc if provided
|
// Call OpenFunc if provided
|
||||||
if a.options.OpenFunc != nil {
|
if a.options.OpenFunc != nil {
|
||||||
if err := a.options.OpenFunc(a.options.WorkDir); err != nil {
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||||||
|
return fmt.Errorf("failed to open resources after restore: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RestoreSnapshot extracts a snapshot to the directory.
|
||||||
|
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
|
||||||
|
// Call CloseFunc if provided
|
||||||
|
if a.options.CloseFunc != nil {
|
||||||
|
if err := a.options.CloseFunc(); err != nil {
|
||||||
|
return fmt.Errorf("failed to close resources before restore: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the snapshot
|
||||||
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir); err != nil {
|
||||||
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call OpenFunc if provided
|
||||||
|
if a.options.OpenFunc != nil {
|
||||||
|
if err := a.options.OpenFunc(dir); err != nil {
|
||||||
return fmt.Errorf("failed to open resources after restore: %w", err)
|
return fmt.Errorf("failed to open resources after restore: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -171,7 +196,74 @@ func (a *Agate) Close() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StartServer starts a gRPC server to share snapshots.
|
// StartServer starts a gRPC server to share snapshots.
|
||||||
// This is a placeholder for future implementation.
|
|
||||||
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
||||||
return errors.New("server functionality not implemented yet")
|
_, err := grpc.RunServer(ctx, a.manager, address)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to start server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We don't store the server reference because we don't have a way to stop it yet
|
||||||
|
// In a future version, we could add a StopServer method
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectRemote connects to a remote snapshot server.
|
||||||
|
// Returns a client that can be used to interact with the remote server.
|
||||||
|
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
|
||||||
|
client, err := grpc.ConnectToServer(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
|
||||||
|
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
|
||||||
|
client, err := a.ConnectRemote(address)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
return client.ListSnapshots(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRemoteSnapshot downloads a snapshot from a remote server.
|
||||||
|
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
|
||||||
|
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
|
||||||
|
client, err := a.ConnectRemote(address)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Create a temporary directory for the downloaded snapshot
|
||||||
|
tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID)
|
||||||
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create temporary directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download the snapshot
|
||||||
|
if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil {
|
||||||
|
return fmt.Errorf("failed to download snapshot: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the snapshot details to create a local copy
|
||||||
|
details, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a local snapshot from the downloaded files
|
||||||
|
_, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create local snapshot: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up the temporary directory
|
||||||
|
os.RemoveAll(tempDir)
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -7,8 +7,8 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"unprism.ru/KRBL/agate"
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
"unprism.ru/KRBL/agate/stores"
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
12
go.mod
12
go.mod
@ -1,4 +1,4 @@
|
|||||||
module unprism.ru/KRBL/agate
|
module gitea.unprism.ru/KRBL/Agate
|
||||||
|
|
||||||
go 1.24.0
|
go 1.24.0
|
||||||
|
|
||||||
@ -6,14 +6,14 @@ require (
|
|||||||
github.com/google/uuid v1.6.0
|
github.com/google/uuid v1.6.0
|
||||||
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.3
|
||||||
github.com/mattn/go-sqlite3 v1.14.28
|
github.com/mattn/go-sqlite3 v1.14.28
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f
|
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2
|
||||||
google.golang.org/grpc v1.72.0
|
google.golang.org/grpc v1.72.0
|
||||||
google.golang.org/protobuf v1.36.6
|
google.golang.org/protobuf v1.36.6
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
golang.org/x/net v0.39.0 // indirect
|
golang.org/x/net v0.40.0 // indirect
|
||||||
golang.org/x/sys v0.32.0 // indirect
|
golang.org/x/sys v0.33.0 // indirect
|
||||||
golang.org/x/text v0.24.0 // indirect
|
golang.org/x/text v0.25.0 // indirect
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f // indirect
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 // indirect
|
||||||
)
|
)
|
||||||
|
10
go.sum
10
go.sum
@ -26,14 +26,24 @@ go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC
|
|||||||
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
|
||||||
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
|
||||||
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
|
||||||
|
golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY=
|
||||||
|
golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds=
|
||||||
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
|
||||||
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
|
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
|
||||||
|
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
|
||||||
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
|
||||||
|
golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4=
|
||||||
|
golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA=
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f h1:tjZsroqekhC63+WMqzmWyW5Twj/ZfR5HAlpd5YQ1Vs0=
|
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f h1:tjZsroqekhC63+WMqzmWyW5Twj/ZfR5HAlpd5YQ1Vs0=
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:Cd8IzgPo5Akum2c9R6FsXNaZbH3Jpa2gpHlW89FqlyQ=
|
google.golang.org/genproto/googleapis/api v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:Cd8IzgPo5Akum2c9R6FsXNaZbH3Jpa2gpHlW89FqlyQ=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2 h1:vPV0tzlsK6EzEDHNNH5sa7Hs9bd7iXR7B1tSiPepkV0=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:pKLAc5OolXC3ViWGI62vvC0n10CpwAtRcTNCFwTKBEw=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f h1:N/PrbTw4kdkqNRzVfWPrBekzLuarFREcbFOiOLkXon4=
|
||||||
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250422160041-2d3770c4ea7f/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2 h1:IqsN8hx+lWLqlN+Sc3DoMy/watjofWiU8sRFgQ8fhKM=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250505200425-f936aa4a68b2/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A=
|
||||||
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
|
google.golang.org/grpc v1.72.0 h1:S7UkcVa60b5AAQTaO6ZKamFp1zMZSU0fGDK2WZLbBnM=
|
||||||
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
google.golang.org/grpc v1.72.0/go.mod h1:wH5Aktxcg25y1I3w7H69nHfXdOG3UiadoBtjh3izSDM=
|
||||||
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
|
||||||
|
236
grpc/client.go
Normal file
236
grpc/client.go
Normal file
@ -0,0 +1,236 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SnapshotClient implements the client for connecting to a remote snapshot server
|
||||||
|
type SnapshotClient struct {
|
||||||
|
conn *grpc.ClientConn
|
||||||
|
client SnapshotServiceClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSnapshotClient creates a new client connected to the specified address
|
||||||
|
func NewSnapshotClient(address string) (*SnapshotClient, error) {
|
||||||
|
// Connect to the server with insecure credentials (for simplicity)
|
||||||
|
conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the gRPC client
|
||||||
|
client := NewSnapshotServiceClient(conn)
|
||||||
|
|
||||||
|
return &SnapshotClient{
|
||||||
|
conn: conn,
|
||||||
|
client: client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection to the server
|
||||||
|
func (c *SnapshotClient) Close() error {
|
||||||
|
if c.conn != nil {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSnapshots retrieves a list of snapshots from the remote server
|
||||||
|
func (c *SnapshotClient) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
||||||
|
response, err := c.client.ListSnapshots(ctx, &ListSnapshotsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list snapshots: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert gRPC snapshot info to store.SnapshotInfo
|
||||||
|
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
|
||||||
|
for _, snapshot := range response.Snapshots {
|
||||||
|
snapshots = append(snapshots, store.SnapshotInfo{
|
||||||
|
ID: snapshot.Id,
|
||||||
|
Name: snapshot.Name,
|
||||||
|
ParentID: snapshot.ParentId,
|
||||||
|
CreationTime: snapshot.CreationTime.AsTime(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshots, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
|
||||||
|
func (c *SnapshotClient) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
|
||||||
|
response, err := c.client.GetSnapshotDetails(ctx, &GetSnapshotDetailsRequest{
|
||||||
|
SnapshotId: snapshotID,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert gRPC snapshot details to store.Snapshot
|
||||||
|
snapshot := &store.Snapshot{
|
||||||
|
ID: response.Info.Id,
|
||||||
|
Name: response.Info.Name,
|
||||||
|
ParentID: response.Info.ParentId,
|
||||||
|
CreationTime: response.Info.CreationTime.AsTime(),
|
||||||
|
Files: make([]store.FileInfo, 0, len(response.Files)),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert file info
|
||||||
|
for _, file := range response.Files {
|
||||||
|
snapshot.Files = append(snapshot.Files, store.FileInfo{
|
||||||
|
Path: file.Path,
|
||||||
|
Size: file.SizeBytes,
|
||||||
|
IsDir: file.IsDir,
|
||||||
|
SHA256: file.Sha256Hash,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadSnapshot downloads a snapshot from the server
|
||||||
|
// This implementation downloads each file individually to optimize bandwidth usage
|
||||||
|
func (c *SnapshotClient) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
|
||||||
|
// Get snapshot details
|
||||||
|
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create target directory if it doesn't exist
|
||||||
|
if err := os.MkdirAll(targetDir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create target directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a local parent is specified, get its details to compare files
|
||||||
|
var localParentFiles map[string]store.FileInfo
|
||||||
|
if localParentID != "" {
|
||||||
|
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
|
||||||
|
if err == nil {
|
||||||
|
// Create a map of file paths to file info for quick lookup
|
||||||
|
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
|
||||||
|
for _, file := range localParent.Files {
|
||||||
|
localParentFiles[file.Path] = file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download each file
|
||||||
|
for _, file := range snapshot.Files {
|
||||||
|
// Skip directories, we'll create them when needed
|
||||||
|
if file.IsDir {
|
||||||
|
// Create directory
|
||||||
|
dirPath := filepath.Join(targetDir, file.Path)
|
||||||
|
if err := os.MkdirAll(dirPath, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we can skip downloading this file
|
||||||
|
if localParentFiles != nil {
|
||||||
|
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
|
||||||
|
// File exists in parent with same hash, copy it instead of downloading
|
||||||
|
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
|
||||||
|
targetFilePath := filepath.Join(targetDir, file.Path)
|
||||||
|
|
||||||
|
// Ensure parent directory exists
|
||||||
|
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the file
|
||||||
|
if err := copyFile(parentFilePath, targetFilePath); err != nil {
|
||||||
|
// If copy fails, fall back to downloading
|
||||||
|
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
|
||||||
|
} else {
|
||||||
|
// Skip to next file
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download the file
|
||||||
|
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
|
||||||
|
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// downloadFile downloads a single file from the server
|
||||||
|
func (c *SnapshotClient) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
|
||||||
|
// Create the request
|
||||||
|
req := &DownloadFileRequest{
|
||||||
|
SnapshotId: snapshotID,
|
||||||
|
FilePath: filePath,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start streaming the file
|
||||||
|
stream, err := c.client.DownloadFile(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to start file download: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the target directory exists
|
||||||
|
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the target file
|
||||||
|
file, err := os.Create(targetPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
// Receive and write chunks
|
||||||
|
for {
|
||||||
|
resp, err := stream.Recv()
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error receiving file chunk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the chunk to the file
|
||||||
|
if _, err := file.Write(resp.ChunkData); err != nil {
|
||||||
|
return fmt.Errorf("error writing to file: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to copy a file
|
||||||
|
func copyFile(src, dst string) error {
|
||||||
|
sourceFile, err := os.Open(src)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer sourceFile.Close()
|
||||||
|
|
||||||
|
destFile, err := os.Create(dst)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer destFile.Close()
|
||||||
|
|
||||||
|
_, err = io.Copy(destFile, sourceFile)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ConnectToServer creates a new client connected to the specified address
|
||||||
|
func ConnectToServer(address string) (*SnapshotClient, error) {
|
||||||
|
return NewSnapshotClient(address)
|
||||||
|
}
|
154
grpc/server.go
Normal file
154
grpc/server.go
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
package grpc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SnapshotServer implements the gRPC server for snapshots
|
||||||
|
type SnapshotServer struct {
|
||||||
|
UnimplementedSnapshotServiceServer
|
||||||
|
manager interfaces.SnapshotManager
|
||||||
|
server *grpc.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewSnapshotServer creates a new snapshot server
|
||||||
|
func NewSnapshotServer(manager interfaces.SnapshotManager) *SnapshotServer {
|
||||||
|
return &SnapshotServer{
|
||||||
|
manager: manager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the gRPC server on the specified address
|
||||||
|
func (s *SnapshotServer) Start(ctx context.Context, address string) error {
|
||||||
|
lis, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to listen on %s: %w", address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.server = grpc.NewServer()
|
||||||
|
RegisterSnapshotServiceServer(s.server, s)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := s.server.Serve(lis); err != nil {
|
||||||
|
fmt.Printf("Server error: %v\n", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
fmt.Printf("Server started on %s\n", address)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully stops the server
|
||||||
|
func (s *SnapshotServer) Stop(ctx context.Context) error {
|
||||||
|
if s.server != nil {
|
||||||
|
s.server.GracefulStop()
|
||||||
|
fmt.Println("Server stopped")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSnapshots implements the gRPC ListSnapshots method
|
||||||
|
func (s *SnapshotServer) ListSnapshots(ctx context.Context, req *ListSnapshotsRequest) (*ListSnapshotsResponse, error) {
|
||||||
|
snapshots, err := s.manager.ListSnapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list snapshots: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &ListSnapshotsResponse{
|
||||||
|
Snapshots: make([]*SnapshotInfo, 0, len(snapshots)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, snapshot := range snapshots {
|
||||||
|
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
|
||||||
|
func (s *SnapshotServer) GetSnapshotDetails(ctx context.Context, req *GetSnapshotDetailsRequest) (*SnapshotDetails, error) {
|
||||||
|
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &SnapshotDetails{
|
||||||
|
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
|
||||||
|
ID: snapshot.ID,
|
||||||
|
Name: snapshot.Name,
|
||||||
|
ParentID: snapshot.ParentID,
|
||||||
|
CreationTime: snapshot.CreationTime,
|
||||||
|
}),
|
||||||
|
Files: make([]*FileInfo, 0, len(snapshot.Files)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range snapshot.Files {
|
||||||
|
response.Files = append(response.Files, &FileInfo{
|
||||||
|
Path: file.Path,
|
||||||
|
SizeBytes: file.Size,
|
||||||
|
Sha256Hash: file.SHA256,
|
||||||
|
IsDir: file.IsDir,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadFile implements the gRPC DownloadFile method
|
||||||
|
func (s *SnapshotServer) DownloadFile(req *DownloadFileRequest, stream grpc.ServerStreamingServer[DownloadFileResponse]) error {
|
||||||
|
// Open the file from the snapshot
|
||||||
|
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open file: %w", err)
|
||||||
|
}
|
||||||
|
defer fileReader.Close()
|
||||||
|
|
||||||
|
// Read the file in chunks and send them to the client
|
||||||
|
buffer := make([]byte, 64*1024) // 64KB chunks
|
||||||
|
for {
|
||||||
|
n, err := fileReader.Read(buffer)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the chunk to the client
|
||||||
|
if err := stream.Send(&DownloadFileResponse{
|
||||||
|
ChunkData: buffer[:n],
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to send chunk: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
|
||||||
|
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *SnapshotInfo {
|
||||||
|
return &SnapshotInfo{
|
||||||
|
Id: info.ID,
|
||||||
|
Name: info.Name,
|
||||||
|
ParentId: info.ParentID,
|
||||||
|
CreationTime: timestamppb.New(info.CreationTime),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunServer is a helper function to create and start a snapshot server
|
||||||
|
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*SnapshotServer, error) {
|
||||||
|
server := NewSnapshotServer(manager)
|
||||||
|
if err := server.Start(ctx, address); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return server, nil
|
||||||
|
}
|
@ -476,7 +476,7 @@ const file_snapshot_proto_rawDesc = "" +
|
|||||||
"\x0fSnapshotService\x12k\n" +
|
"\x0fSnapshotService\x12k\n" +
|
||||||
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
|
"\rListSnapshots\x12 .agate.grpc.ListSnapshotsRequest\x1a!.agate.grpc.ListSnapshotsResponse\"\x15\x82\xd3\xe4\x93\x02\x0f\x12\r/v1/snapshots\x12}\n" +
|
||||||
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
|
"\x12GetSnapshotDetails\x12%.agate.grpc.GetSnapshotDetailsRequest\x1a\x1b.agate.grpc.SnapshotDetails\"#\x82\xd3\xe4\x93\x02\x1d\x12\x1b/v1/snapshots/{snapshot_id}\x12\x8a\x01\n" +
|
||||||
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\x1cZ\x1aunprism.ru/KRBL/agate/grpcb\x06proto3"
|
"\fDownloadFile\x12\x1f.agate.grpc.DownloadFileRequest\x1a .agate.grpc.DownloadFileResponse\"5\x82\xd3\xe4\x93\x02/\x12-/v1/snapshots/{snapshot_id}/files/{file_path}0\x01B\"Z gitea.unprism.ru/KRBL/Agate/grpcb\x06proto3"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
file_snapshot_proto_rawDescOnce sync.Once
|
file_snapshot_proto_rawDescOnce sync.Once
|
||||||
|
@ -5,7 +5,7 @@ package agate.grpc;
|
|||||||
import "google/protobuf/timestamp.proto";
|
import "google/protobuf/timestamp.proto";
|
||||||
import "google/api/annotations.proto"; // Добавлено для HTTP mapping
|
import "google/api/annotations.proto"; // Добавлено для HTTP mapping
|
||||||
|
|
||||||
option go_package = "unprism.ru/KRBL/agate/grpc";
|
option go_package = "gitea.unprism.ru/KRBL/Agate/grpc";
|
||||||
|
|
||||||
// Сервис для управления снапшотами
|
// Сервис для управления снапшотами
|
||||||
service SnapshotService {
|
service SnapshotService {
|
||||||
|
47
interfaces/snapshot.go
Normal file
47
interfaces/snapshot.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
package interfaces
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SnapshotManager defines the interface that the server needs to interact with snapshots
|
||||||
|
type SnapshotManager interface {
|
||||||
|
// GetSnapshotDetails retrieves detailed metadata for a specific snapshot
|
||||||
|
GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
|
||||||
|
|
||||||
|
// ListSnapshots retrieves a list of all available snapshots
|
||||||
|
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
|
||||||
|
|
||||||
|
// OpenFile retrieves and opens a file from the specified snapshot
|
||||||
|
OpenFile(ctx context.Context, snapshotID string, filePath string) (io.ReadCloser, error)
|
||||||
|
|
||||||
|
// CreateSnapshot creates a new snapshot from the specified source directory
|
||||||
|
CreateSnapshot(ctx context.Context, sourceDir string, name string, parentID string) (*store.Snapshot, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// SnapshotServer defines the interface for a server that can share snapshots
|
||||||
|
type SnapshotServer interface {
|
||||||
|
// Start initializes and begins the server's operation
|
||||||
|
Start(ctx context.Context) error
|
||||||
|
|
||||||
|
// Stop gracefully shuts down the server
|
||||||
|
Stop(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// SnapshotClient defines the interface for a client that can connect to a server and download snapshots
|
||||||
|
type SnapshotClient interface {
|
||||||
|
// ListSnapshots retrieves a list of snapshots from the server
|
||||||
|
ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error)
|
||||||
|
|
||||||
|
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
|
||||||
|
FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error)
|
||||||
|
|
||||||
|
// DownloadSnapshot downloads a snapshot from the server
|
||||||
|
DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error
|
||||||
|
|
||||||
|
// Close closes the connection to the server
|
||||||
|
Close() error
|
||||||
|
}
|
33
manager.go
33
manager.go
@ -13,9 +13,9 @@ import (
|
|||||||
|
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
|
|
||||||
"unprism.ru/KRBL/agate/archive"
|
"gitea.unprism.ru/KRBL/Agate/archive"
|
||||||
"unprism.ru/KRBL/agate/hash"
|
"gitea.unprism.ru/KRBL/Agate/hash"
|
||||||
"unprism.ru/KRBL/agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
type SnapshotManagerData struct {
|
type SnapshotManagerData struct {
|
||||||
@ -59,12 +59,20 @@ func (data *SnapshotManagerData) CreateSnapshot(ctx context.Context, sourceDir s
|
|||||||
// Generate a unique ID for the snapshot
|
// Generate a unique ID for the snapshot
|
||||||
snapshotID := uuid.New().String()
|
snapshotID := uuid.New().String()
|
||||||
|
|
||||||
// Create a temporary file for the archive
|
// Clean the active directory to avoid conflicts
|
||||||
tempFile, err := os.CreateTemp("", "agate-snapshot-*.zip")
|
if err := data.blobStore.CleanActiveDir(ctx); err != nil {
|
||||||
if err != nil {
|
return nil, fmt.Errorf("failed to clean active directory: %w", err)
|
||||||
return nil, fmt.Errorf("failed to create temporary file: %w", err)
|
}
|
||||||
|
|
||||||
|
// Get the active directory for operations
|
||||||
|
activeDir := data.blobStore.GetActiveDir()
|
||||||
|
|
||||||
|
// Create a temporary file for the archive in the active directory
|
||||||
|
tempFilePath := filepath.Join(activeDir, "temp-"+snapshotID+".zip")
|
||||||
|
tempFile, err := os.Create(tempFilePath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create temporary file in active directory: %w", err)
|
||||||
}
|
}
|
||||||
tempFilePath := tempFile.Name()
|
|
||||||
tempFile.Close() // Close it as CreateArchive will reopen it
|
tempFile.Close() // Close it as CreateArchive will reopen it
|
||||||
defer os.Remove(tempFilePath) // Clean up temp file after we're done
|
defer os.Remove(tempFilePath) // Clean up temp file after we're done
|
||||||
|
|
||||||
@ -253,8 +261,15 @@ func (data *SnapshotManagerData) ExtractSnapshot(ctx context.Context, snapshotID
|
|||||||
if snapshotID == "" {
|
if snapshotID == "" {
|
||||||
return errors.New("snapshot ID cannot be empty")
|
return errors.New("snapshot ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If no specific path is provided, use the active directory
|
||||||
if path == "" {
|
if path == "" {
|
||||||
return errors.New("target path cannot be empty")
|
// Clean the active directory to avoid conflicts
|
||||||
|
if err := data.blobStore.CleanActiveDir(ctx); err != nil {
|
||||||
|
return fmt.Errorf("failed to clean active directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
path = filepath.Join(data.blobStore.GetActiveDir(), snapshotID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// First check if the snapshot exists
|
// First check if the snapshot exists
|
||||||
|
242
remote/client.go
Normal file
242
remote/client.go
Normal file
@ -0,0 +1,242 @@
|
|||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
stdgrpc "google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/credentials/insecure"
|
||||||
|
|
||||||
|
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Client represents a client for connecting to a remote snapshot server
|
||||||
|
// It implements the interfaces.SnapshotClient interface
|
||||||
|
type Client struct {
|
||||||
|
conn *stdgrpc.ClientConn
|
||||||
|
client agateGrpc.SnapshotServiceClient
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure Client implements interfaces.SnapshotClient
|
||||||
|
var _ interfaces.SnapshotClient = (*Client)(nil)
|
||||||
|
|
||||||
|
// NewClient creates a new client connected to the specified address
|
||||||
|
func NewClient(address string) (*Client, error) {
|
||||||
|
// Connect to the server with insecure credentials (for simplicity)
|
||||||
|
conn, err := stdgrpc.Dial(address, stdgrpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to server at %s: %w", address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the gRPC client
|
||||||
|
client := agateGrpc.NewSnapshotServiceClient(conn)
|
||||||
|
|
||||||
|
return &Client{
|
||||||
|
conn: conn,
|
||||||
|
client: client,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Close closes the connection to the server
|
||||||
|
func (c *Client) Close() error {
|
||||||
|
if c.conn != nil {
|
||||||
|
return c.conn.Close()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSnapshots retrieves a list of snapshots from the remote server
|
||||||
|
func (c *Client) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
||||||
|
response, err := c.client.ListSnapshots(ctx, &agateGrpc.ListSnapshotsRequest{})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list snapshots: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert gRPC snapshot info to store.SnapshotInfo
|
||||||
|
snapshots := make([]store.SnapshotInfo, 0, len(response.Snapshots))
|
||||||
|
for _, snapshot := range response.Snapshots {
|
||||||
|
snapshots = append(snapshots, store.SnapshotInfo{
|
||||||
|
ID: snapshot.Id,
|
||||||
|
Name: snapshot.Name,
|
||||||
|
ParentID: snapshot.ParentId,
|
||||||
|
CreationTime: snapshot.CreationTime.AsTime(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshots, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FetchSnapshotDetails retrieves detailed information about a specific snapshot
|
||||||
|
func (c *Client) FetchSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
|
||||||
|
response, err := c.client.GetSnapshotDetails(ctx, &agateGrpc.GetSnapshotDetailsRequest{
|
||||||
|
SnapshotId: snapshotID,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert gRPC snapshot details to store.Snapshot
|
||||||
|
snapshot := &store.Snapshot{
|
||||||
|
ID: response.Info.Id,
|
||||||
|
Name: response.Info.Name,
|
||||||
|
ParentID: response.Info.ParentId,
|
||||||
|
CreationTime: response.Info.CreationTime.AsTime(),
|
||||||
|
Files: make([]store.FileInfo, 0, len(response.Files)),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Convert file info
|
||||||
|
for _, file := range response.Files {
|
||||||
|
snapshot.Files = append(snapshot.Files, store.FileInfo{
|
||||||
|
Path: file.Path,
|
||||||
|
Size: file.SizeBytes,
|
||||||
|
IsDir: file.IsDir,
|
||||||
|
SHA256: file.Sha256Hash,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return snapshot, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadSnapshot downloads a snapshot from the server
|
||||||
|
// This implementation downloads each file individually to optimize bandwidth usage
|
||||||
|
func (c *Client) DownloadSnapshot(ctx context.Context, snapshotID string, targetDir string, localParentID string) error {
|
||||||
|
// Get snapshot details
|
||||||
|
snapshot, err := c.FetchSnapshotDetails(ctx, snapshotID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create target directory if it doesn't exist
|
||||||
|
if err := os.MkdirAll(targetDir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create target directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If a local parent is specified, get its details to compare files
|
||||||
|
var localParentFiles map[string]store.FileInfo
|
||||||
|
if localParentID != "" {
|
||||||
|
localParent, err := c.FetchSnapshotDetails(ctx, localParentID)
|
||||||
|
if err == nil {
|
||||||
|
// Create a map of file paths to file info for quick lookup
|
||||||
|
localParentFiles = make(map[string]store.FileInfo, len(localParent.Files))
|
||||||
|
for _, file := range localParent.Files {
|
||||||
|
localParentFiles[file.Path] = file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download each file
|
||||||
|
for _, file := range snapshot.Files {
|
||||||
|
// Skip directories, we'll create them when needed
|
||||||
|
if file.IsDir {
|
||||||
|
// Create directory
|
||||||
|
dirPath := filepath.Join(targetDir, file.Path)
|
||||||
|
if err := os.MkdirAll(dirPath, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory %s: %w", dirPath, err)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we can skip downloading this file
|
||||||
|
if localParentFiles != nil {
|
||||||
|
if parentFile, exists := localParentFiles[file.Path]; exists && parentFile.SHA256 == file.SHA256 {
|
||||||
|
// File exists in parent with same hash, copy it instead of downloading
|
||||||
|
parentFilePath := filepath.Join(targetDir, "..", localParentID, file.Path)
|
||||||
|
targetFilePath := filepath.Join(targetDir, file.Path)
|
||||||
|
|
||||||
|
// Ensure parent directory exists
|
||||||
|
if err := os.MkdirAll(filepath.Dir(targetFilePath), 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory for %s: %w", targetFilePath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Copy the file
|
||||||
|
if err := copyFile(parentFilePath, targetFilePath); err != nil {
|
||||||
|
// If copy fails, fall back to downloading
|
||||||
|
fmt.Printf("Failed to copy file %s, will download instead: %v\n", file.Path, err)
|
||||||
|
} else {
|
||||||
|
// Skip to next file
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Download the file
|
||||||
|
if err := c.downloadFile(ctx, snapshotID, file.Path, filepath.Join(targetDir, file.Path)); err != nil {
|
||||||
|
return fmt.Errorf("failed to download file %s: %w", file.Path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// downloadFile downloads a single file from the server
|
||||||
|
func (c *Client) downloadFile(ctx context.Context, snapshotID, filePath, targetPath string) error {
|
||||||
|
// Create the request
|
||||||
|
req := &agateGrpc.DownloadFileRequest{
|
||||||
|
SnapshotId: snapshotID,
|
||||||
|
FilePath: filePath,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start streaming the file
|
||||||
|
stream, err := c.client.DownloadFile(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to start file download: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure the target directory exists
|
||||||
|
if err := os.MkdirAll(filepath.Dir(targetPath), 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create directory for %s: %w", targetPath, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the target file
|
||||||
|
file, err := os.Create(targetPath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create file %s: %w", targetPath, err)
|
||||||
|
}
|
||||||
|
defer file.Close()
|
||||||
|
|
||||||
|
// Receive and write chunks
|
||||||
|
for {
|
||||||
|
resp, err := stream.Recv()
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error receiving file chunk: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write the chunk to the file
|
||||||
|
if _, err := file.Write(resp.ChunkData); err != nil {
|
||||||
|
return fmt.Errorf("error writing to file: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to copy a file
|
||||||
|
func copyFile(src, dst string) error {
|
||||||
|
sourceFile, err := os.Open(src)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer sourceFile.Close()
|
||||||
|
|
||||||
|
destFile, err := os.Create(dst)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer destFile.Close()
|
||||||
|
|
||||||
|
_, err = io.Copy(destFile, sourceFile)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Connect creates a new client connected to the specified address
|
||||||
|
func Connect(address string) (*Client, error) {
|
||||||
|
return NewClient(address)
|
||||||
|
}
|
155
remote/server.go
Normal file
155
remote/server.go
Normal file
@ -0,0 +1,155 @@
|
|||||||
|
package remote
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
stdgrpc "google.golang.org/grpc"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
|
agateGrpc "gitea.unprism.ru/KRBL/Agate/grpc"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Server implements the gRPC server for snapshots
|
||||||
|
type Server struct {
|
||||||
|
agateGrpc.UnimplementedSnapshotServiceServer
|
||||||
|
manager interfaces.SnapshotManager
|
||||||
|
server *stdgrpc.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewServer creates a new snapshot server
|
||||||
|
func NewServer(manager interfaces.SnapshotManager) *Server {
|
||||||
|
return &Server{
|
||||||
|
manager: manager,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the gRPC server on the specified address
|
||||||
|
func (s *Server) Start(ctx context.Context, address string) error {
|
||||||
|
lis, err := net.Listen("tcp", address)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to listen on %s: %w", address, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.server = stdgrpc.NewServer()
|
||||||
|
agateGrpc.RegisterSnapshotServiceServer(s.server, s)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := s.server.Serve(lis); err != nil {
|
||||||
|
fmt.Printf("Server error: %v\n", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
fmt.Printf("Server started on %s\n", address)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully stops the server
|
||||||
|
func (s *Server) Stop(ctx context.Context) error {
|
||||||
|
if s.server != nil {
|
||||||
|
s.server.GracefulStop()
|
||||||
|
fmt.Println("Server stopped")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListSnapshots implements the gRPC ListSnapshots method
|
||||||
|
func (s *Server) ListSnapshots(ctx context.Context, req *agateGrpc.ListSnapshotsRequest) (*agateGrpc.ListSnapshotsResponse, error) {
|
||||||
|
snapshots, err := s.manager.ListSnapshots(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to list snapshots: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &agateGrpc.ListSnapshotsResponse{
|
||||||
|
Snapshots: make([]*agateGrpc.SnapshotInfo, 0, len(snapshots)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, snapshot := range snapshots {
|
||||||
|
response.Snapshots = append(response.Snapshots, convertToGrpcSnapshotInfo(snapshot))
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetSnapshotDetails implements the gRPC GetSnapshotDetails method
|
||||||
|
func (s *Server) GetSnapshotDetails(ctx context.Context, req *agateGrpc.GetSnapshotDetailsRequest) (*agateGrpc.SnapshotDetails, error) {
|
||||||
|
snapshot, err := s.manager.GetSnapshotDetails(ctx, req.SnapshotId)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get snapshot details: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &agateGrpc.SnapshotDetails{
|
||||||
|
Info: convertToGrpcSnapshotInfo(store.SnapshotInfo{
|
||||||
|
ID: snapshot.ID,
|
||||||
|
Name: snapshot.Name,
|
||||||
|
ParentID: snapshot.ParentID,
|
||||||
|
CreationTime: snapshot.CreationTime,
|
||||||
|
}),
|
||||||
|
Files: make([]*agateGrpc.FileInfo, 0, len(snapshot.Files)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, file := range snapshot.Files {
|
||||||
|
response.Files = append(response.Files, &agateGrpc.FileInfo{
|
||||||
|
Path: file.Path,
|
||||||
|
SizeBytes: file.Size,
|
||||||
|
Sha256Hash: file.SHA256,
|
||||||
|
IsDir: file.IsDir,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadFile implements the gRPC DownloadFile method
|
||||||
|
func (s *Server) DownloadFile(req *agateGrpc.DownloadFileRequest, stream agateGrpc.SnapshotService_DownloadFileServer) error {
|
||||||
|
// Open the file from the snapshot
|
||||||
|
fileReader, err := s.manager.OpenFile(context.Background(), req.SnapshotId, req.FilePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to open file: %w", err)
|
||||||
|
}
|
||||||
|
defer fileReader.Close()
|
||||||
|
|
||||||
|
// Read the file in chunks and send them to the client
|
||||||
|
buffer := make([]byte, 64*1024) // 64KB chunks
|
||||||
|
for {
|
||||||
|
n, err := fileReader.Read(buffer)
|
||||||
|
if err == io.EOF {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the chunk to the client
|
||||||
|
if err := stream.Send(&agateGrpc.DownloadFileResponse{
|
||||||
|
ChunkData: buffer[:n],
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("failed to send chunk: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function to convert store.SnapshotInfo to grpc.SnapshotInfo
|
||||||
|
func convertToGrpcSnapshotInfo(info store.SnapshotInfo) *agateGrpc.SnapshotInfo {
|
||||||
|
return &agateGrpc.SnapshotInfo{
|
||||||
|
Id: info.ID,
|
||||||
|
Name: info.Name,
|
||||||
|
ParentId: info.ParentID,
|
||||||
|
CreationTime: timestamppb.New(info.CreationTime),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunServer is a helper function to create and start a snapshot server
|
||||||
|
func RunServer(ctx context.Context, manager interfaces.SnapshotManager, address string) (*Server, error) {
|
||||||
|
server := NewServer(manager)
|
||||||
|
if err := server.Start(ctx, address); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return server, nil
|
||||||
|
}
|
@ -2,8 +2,8 @@ package agate
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
"io"
|
"io"
|
||||||
"unprism.ru/KRBL/agate/store"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// SnapshotManager is an interface that defines operations for managing and interacting with snapshots.
|
// SnapshotManager is an interface that defines operations for managing and interacting with snapshots.
|
||||||
|
@ -3,11 +3,11 @@ package filesystem
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
"io"
|
"io"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"unprism.ru/KRBL/agate"
|
|
||||||
"unprism.ru/KRBL/agate/store"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const blobExtension = ".zip"
|
const blobExtension = ".zip"
|
||||||
@ -15,15 +15,26 @@ const blobExtension = ".zip"
|
|||||||
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
|
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
|
||||||
type fileSystemStore struct {
|
type fileSystemStore struct {
|
||||||
baseDir string // Директория для хранения блобов (архивов)
|
baseDir string // Директория для хранения блобов (архивов)
|
||||||
|
activeDir string // Директория для активных операций (создание и восстановление)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFileSystemStore создает новое хранилище блобов в указанной директории.
|
// NewFileSystemStore создает новое хранилище блобов в указанной директории.
|
||||||
func NewFileSystemStore(baseDir string) (store.BlobStore, error) {
|
func NewFileSystemStore(baseDir string) (store.BlobStore, error) {
|
||||||
// Убедимся, что директория существует
|
// Убедимся, что базовая директория существует
|
||||||
if err := os.MkdirAll(baseDir, 0755); err != nil {
|
if err := os.MkdirAll(baseDir, 0755); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err)
|
return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err)
|
||||||
}
|
}
|
||||||
return &fileSystemStore{baseDir: baseDir}, nil
|
|
||||||
|
// Создаем директорию для активных операций внутри базовой директории
|
||||||
|
activeDir := filepath.Join(baseDir, "active")
|
||||||
|
if err := os.MkdirAll(activeDir, 0755); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create active directory %s for filesystem blob store: %w", activeDir, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &fileSystemStore{
|
||||||
|
baseDir: baseDir,
|
||||||
|
activeDir: activeDir,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBlobPath формирует полный путь к файлу блоба.
|
// getBlobPath формирует полный путь к файлу блоба.
|
||||||
@ -106,3 +117,27 @@ func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (
|
|||||||
// Файл существует, возвращаем путь
|
// Файл существует, возвращаем путь
|
||||||
return blobPath, nil
|
return blobPath, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetActiveDir возвращает путь к директории для активных операций.
|
||||||
|
func (fs *fileSystemStore) GetActiveDir() string {
|
||||||
|
return fs.activeDir
|
||||||
|
}
|
||||||
|
|
||||||
|
// CleanActiveDir очищает директорию для активных операций.
|
||||||
|
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
|
||||||
|
func (fs *fileSystemStore) CleanActiveDir(ctx context.Context) error {
|
||||||
|
// Удаляем все файлы в активной директории, но сохраняем саму директорию
|
||||||
|
entries, err := os.ReadDir(fs.activeDir)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read active directory: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, entry := range entries {
|
||||||
|
path := filepath.Join(fs.activeDir, entry.Name())
|
||||||
|
if err := os.RemoveAll(path); err != nil {
|
||||||
|
return fmt.Errorf("failed to remove %s from active directory: %w", path, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -6,12 +6,12 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate"
|
||||||
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
_ "github.com/mattn/go-sqlite3"
|
_ "github.com/mattn/go-sqlite3"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
"unprism.ru/KRBL/agate"
|
|
||||||
"unprism.ru/KRBL/agate/store"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -71,4 +71,11 @@ type BlobStore interface {
|
|||||||
// Это может быть полезно для функций пакета archive, которые работают с путями.
|
// Это может быть полезно для функций пакета archive, которые работают с путями.
|
||||||
// Возвращает agate.ErrNotFound, если блоб не найден.
|
// Возвращает agate.ErrNotFound, если блоб не найден.
|
||||||
GetBlobPath(ctx context.Context, snapshotID string) (string, error)
|
GetBlobPath(ctx context.Context, snapshotID string) (string, error)
|
||||||
|
|
||||||
|
// GetActiveDir возвращает путь к директории для активных операций (создание и восстановление).
|
||||||
|
GetActiveDir() string
|
||||||
|
|
||||||
|
// CleanActiveDir очищает директорию для активных операций.
|
||||||
|
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
|
||||||
|
CleanActiveDir(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
@ -4,9 +4,9 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
"unprism.ru/KRBL/agate/store"
|
"gitea.unprism.ru/KRBL/Agate/store"
|
||||||
"unprism.ru/KRBL/agate/store/filesystem"
|
"gitea.unprism.ru/KRBL/Agate/store/filesystem"
|
||||||
"unprism.ru/KRBL/agate/store/sqlite"
|
"gitea.unprism.ru/KRBL/Agate/store/sqlite"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewDefaultMetadataStore creates a new SQLite-based metadata store.
|
// NewDefaultMetadataStore creates a new SQLite-based metadata store.
|
||||||
|
Reference in New Issue
Block a user