Introduce `currentSnapshotID` to track the active snapshot and persist its state in a file. This change ensures the current snapshot ID is restored during initialization and maintained consistently across snapshot operations.
337 lines
11 KiB
Go
337 lines
11 KiB
Go
package agate
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"gitea.unprism.ru/KRBL/Agate/grpc"
|
|
"os"
|
|
"path/filepath"
|
|
|
|
"gitea.unprism.ru/KRBL/Agate/store"
|
|
)
|
|
|
|
// AgateOptions defines configuration options for the Agate library.
|
|
type AgateOptions struct {
|
|
// WorkDir is the directory where snapshots will be stored and managed.
|
|
WorkDir string
|
|
|
|
// OpenFunc is called after a snapshot is restored to initialize resources.
|
|
// The parameter is the directory where the snapshot was extracted.
|
|
OpenFunc func(dir string) error
|
|
|
|
// CloseFunc is called before a snapshot is created or restored to clean up resources.
|
|
CloseFunc func() error
|
|
|
|
// MetadataStore is the implementation of the metadata store to use.
|
|
// Use the stores package to initialize the default implementation:
|
|
// metadataStore, err := stores.NewDefaultMetadataStore(metadataDir)
|
|
MetadataStore store.MetadataStore
|
|
|
|
// BlobStore is the implementation of the blob store to use.
|
|
// Use the stores package to initialize the default implementation:
|
|
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
|
|
BlobStore store.BlobStore
|
|
}
|
|
|
|
// Agate is the main entry point for the snapshot library.
|
|
type Agate struct {
|
|
manager SnapshotManager
|
|
options AgateOptions
|
|
metadataDir string
|
|
blobsDir string
|
|
currentSnapshotID string
|
|
currentIDFile string
|
|
}
|
|
|
|
// New initializes a new instance of the Agate library with the given options.
|
|
func New(options AgateOptions) (*Agate, error) {
|
|
if options.WorkDir == "" {
|
|
return nil, errors.New("work directory cannot be empty")
|
|
}
|
|
|
|
// Create the work directory if it doesn't exist
|
|
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
|
|
// Create subdirectories for metadata and blobs
|
|
metadataDir := filepath.Join(options.WorkDir, "metadata")
|
|
blobsDir := filepath.Join(options.WorkDir, "blobs")
|
|
|
|
if err := os.MkdirAll(metadataDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create metadata directory: %w", err)
|
|
}
|
|
|
|
if err := os.MkdirAll(blobsDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create blobs directory: %w", err)
|
|
}
|
|
|
|
var metadataStore store.MetadataStore
|
|
var blobStore store.BlobStore
|
|
var err error
|
|
|
|
// Use provided stores or initialize default ones
|
|
if options.MetadataStore != nil {
|
|
metadataStore = options.MetadataStore
|
|
} else {
|
|
// For default implementation, the user needs to initialize and provide the stores
|
|
return nil, errors.New("metadata store must be provided")
|
|
}
|
|
|
|
if options.BlobStore != nil {
|
|
blobStore = options.BlobStore
|
|
} else {
|
|
// For default implementation, the user needs to initialize and provide the stores
|
|
return nil, errors.New("blob store must be provided")
|
|
}
|
|
|
|
// Create the snapshot manager
|
|
manager, err := CreateSnapshotManager(metadataStore, blobStore)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
|
|
}
|
|
|
|
// Create a file path for storing the current snapshot ID
|
|
currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id")
|
|
|
|
agate := &Agate{
|
|
manager: manager,
|
|
options: options,
|
|
metadataDir: metadataDir,
|
|
blobsDir: blobsDir,
|
|
currentIDFile: currentIDFile,
|
|
}
|
|
|
|
// Load the current snapshot ID if it exists
|
|
if _, err := os.Stat(currentIDFile); err == nil {
|
|
data, err := os.ReadFile(currentIDFile)
|
|
if err == nil && len(data) > 0 {
|
|
agate.currentSnapshotID = string(data)
|
|
}
|
|
}
|
|
|
|
// Call OpenFunc if provided to initialize resources in the active directory
|
|
if options.OpenFunc != nil {
|
|
if err := options.OpenFunc(options.BlobStore.GetActiveDir()); err != nil {
|
|
return nil, fmt.Errorf("failed to open resources during initialization: %w", err)
|
|
}
|
|
}
|
|
|
|
return agate, nil
|
|
}
|
|
|
|
// SaveSnapshot creates a new snapshot from the current state of the active directory.
|
|
// If parentID is provided, it will be set as the parent of the new snapshot.
|
|
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
|
|
// Returns the ID of the created snapshot.
|
|
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return "", fmt.Errorf("failed to close resources before snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
// If parentID is not provided, use the current snapshot ID
|
|
effectiveParentID := parentID
|
|
|
|
// Create the snapshot
|
|
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
|
}
|
|
|
|
// Update the current snapshot ID to the newly created snapshot
|
|
a.currentSnapshotID = snapshot.ID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return "", fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
|
|
// Call OpenFunc if provided
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
|
return "", fmt.Errorf("failed to open resources after snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
return snapshot.ID, nil
|
|
}
|
|
|
|
// RestoreSnapshot extracts a snapshot to the active directory.
|
|
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return fmt.Errorf("failed to close resources before restore: %w", err)
|
|
}
|
|
}
|
|
|
|
// Extract the snapshot
|
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir()); err != nil {
|
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
|
}
|
|
|
|
// Save the ID of the snapshot that was restored
|
|
a.currentSnapshotID = snapshotID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
|
|
// Call OpenFunc if provided
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
|
return fmt.Errorf("failed to open resources after restore: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RestoreSnapshot extracts a snapshot to the directory.
|
|
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return fmt.Errorf("failed to close resources before restore: %w", err)
|
|
}
|
|
}
|
|
|
|
// Extract the snapshot
|
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir); err != nil {
|
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
|
}
|
|
|
|
// If restoring to the active directory, save the snapshot ID
|
|
if dir == a.options.BlobStore.GetActiveDir() {
|
|
a.currentSnapshotID = snapshotID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
}
|
|
|
|
// Call OpenFunc if provided
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(dir); err != nil {
|
|
return fmt.Errorf("failed to open resources after restore: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListSnapshots returns a list of all available snapshots.
|
|
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
|
return a.manager.ListSnapshots(ctx)
|
|
}
|
|
|
|
// GetSnapshotDetails returns detailed information about a specific snapshot.
|
|
func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
|
|
return a.manager.GetSnapshotDetails(ctx, snapshotID)
|
|
}
|
|
|
|
// DeleteSnapshot removes a snapshot.
|
|
func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error {
|
|
return a.manager.DeleteSnapshot(ctx, snapshotID)
|
|
}
|
|
|
|
// saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir
|
|
func (a *Agate) saveCurrentSnapshotID() error {
|
|
if a.currentSnapshotID == "" {
|
|
// If there's no current snapshot ID, remove the file if it exists
|
|
if _, err := os.Stat(a.currentIDFile); err == nil {
|
|
return os.Remove(a.currentIDFile)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Write the current snapshot ID to the file
|
|
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
|
|
}
|
|
|
|
// Close releases all resources used by the Agate instance.
|
|
func (a *Agate) Close() error {
|
|
// Currently, we don't have a way to close the manager directly
|
|
// This would be a good addition in the future
|
|
return nil
|
|
}
|
|
|
|
// StartServer starts a gRPC server to share snapshots.
|
|
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
|
_, err := grpc.RunServer(ctx, a.manager, address)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start server: %w", err)
|
|
}
|
|
|
|
// We don't store the server reference because we don't have a way to stop it yet
|
|
// In a future version, we could add a StopServer method
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConnectRemote connects to a remote snapshot server.
|
|
// Returns a client that can be used to interact with the remote server.
|
|
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
|
|
client, err := grpc.ConnectToServer(address)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
|
|
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
|
|
client, err := a.ConnectRemote(address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer client.Close()
|
|
|
|
return client.ListSnapshots(ctx)
|
|
}
|
|
|
|
// GetRemoteSnapshot downloads a snapshot from a remote server.
|
|
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
|
|
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
|
|
client, err := a.ConnectRemote(address)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
// Create a temporary directory for the downloaded snapshot
|
|
tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID)
|
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create temporary directory: %w", err)
|
|
}
|
|
|
|
// Download the snapshot
|
|
if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil {
|
|
return fmt.Errorf("failed to download snapshot: %w", err)
|
|
}
|
|
|
|
// Get the snapshot details to create a local copy
|
|
details, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
|
}
|
|
|
|
// Create a local snapshot from the downloaded files
|
|
_, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create local snapshot: %w", err)
|
|
}
|
|
|
|
// Clean up the temporary directory
|
|
os.RemoveAll(tempDir)
|
|
|
|
return nil
|
|
}
|