Introduce methods to register local snapshots from archives and to download or restore snapshot metadata, improving snapshot management capabilities. Update README with usage examples.
782 lines
25 KiB
Go
782 lines
25 KiB
Go
package agate
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"gitea.unprism.ru/KRBL/Agate/archive"
|
|
"gitea.unprism.ru/KRBL/Agate/grpc"
|
|
"gitea.unprism.ru/KRBL/Agate/hash"
|
|
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"gitea.unprism.ru/KRBL/Agate/store"
|
|
"gitea.unprism.ru/KRBL/Agate/stores"
|
|
)
|
|
|
|
// AgateOptions defines configuration options for the Agate library.
|
|
type AgateOptions struct {
|
|
// WorkDir is the directory where snapshots will be stored and managed.
|
|
WorkDir string
|
|
|
|
// OpenFunc is called after a snapshot is restored to initialize resources.
|
|
// The parameter is the directory where the snapshot was extracted.
|
|
OpenFunc func(dir string) error
|
|
|
|
// CloseFunc is called before a snapshot is created or restored to clean up resources.
|
|
CloseFunc func() error
|
|
|
|
// MetadataStore is the implementation of the metadata store to use.
|
|
// If nil, a default SQLite-based metadata store will be created automatically.
|
|
// Use the stores package to initialize a custom implementation:
|
|
// metadataStore, err := stores.NewDefaultMetadataStore(metadataDir)
|
|
MetadataStore store.MetadataStore
|
|
|
|
// BlobStore is the implementation of the blob store to use.
|
|
// If nil, a default filesystem-based blob store will be created automatically.
|
|
// Use the stores package to initialize a custom implementation:
|
|
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
|
|
BlobStore store.BlobStore
|
|
|
|
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
|
|
CleanOnRestore bool
|
|
|
|
// Logger is the logger to use for output. If nil, logging is disabled.
|
|
Logger *log.Logger
|
|
}
|
|
|
|
// Agate is the main entry point for the snapshot library.
|
|
type Agate struct {
|
|
mutex sync.Mutex
|
|
manager interfaces.SnapshotManager
|
|
options AgateOptions
|
|
metadataDir string
|
|
blobsDir string
|
|
currentSnapshotID string
|
|
currentIDFile string
|
|
}
|
|
|
|
// New initializes a new instance of the Agate library with the given options.
|
|
func New(options AgateOptions) (*Agate, error) {
|
|
if options.WorkDir == "" {
|
|
return nil, errors.New("work directory cannot be empty")
|
|
}
|
|
|
|
// Initialize logger if not provided
|
|
if options.Logger == nil {
|
|
options.Logger = log.New(io.Discard, "", 0)
|
|
}
|
|
|
|
// Create the work directory if it doesn't exist
|
|
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
|
|
// Create subdirectories for metadata and blobs
|
|
metadataDir := filepath.Join(options.WorkDir, "metadata")
|
|
blobsDir := filepath.Join(options.WorkDir, "blobs")
|
|
|
|
if err := os.MkdirAll(metadataDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create metadata directory: %w", err)
|
|
}
|
|
|
|
if err := os.MkdirAll(blobsDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create blobs directory: %w", err)
|
|
}
|
|
|
|
var metadataStore store.MetadataStore
|
|
var blobStore store.BlobStore
|
|
var err error
|
|
|
|
// Use provided stores or initialize default ones
|
|
if options.MetadataStore != nil && options.BlobStore != nil {
|
|
// Use the provided stores
|
|
metadataStore = options.MetadataStore
|
|
blobStore = options.BlobStore
|
|
} else if options.MetadataStore == nil && options.BlobStore == nil {
|
|
// Initialize both stores with default implementations
|
|
metadataStore, blobStore, err = stores.InitDefaultStores(options.WorkDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize default stores: %w", err)
|
|
}
|
|
// Update options with the created stores
|
|
options.MetadataStore = metadataStore
|
|
options.BlobStore = blobStore
|
|
} else if options.MetadataStore == nil {
|
|
// Initialize only the metadata store
|
|
metadataStore, err = stores.NewDefaultMetadataStore(metadataDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize default metadata store: %w", err)
|
|
}
|
|
blobStore = options.BlobStore
|
|
// Update options with the created metadata store
|
|
options.MetadataStore = metadataStore
|
|
} else {
|
|
// Initialize only the blob store
|
|
blobStore, err = stores.NewDefaultBlobStore(blobsDir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize default blob store: %w", err)
|
|
}
|
|
metadataStore = options.MetadataStore
|
|
// Update options with the created blob store
|
|
options.BlobStore = blobStore
|
|
}
|
|
|
|
// Create the snapshot manager
|
|
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
|
|
}
|
|
|
|
// Create a file path for storing the current snapshot ID
|
|
currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id")
|
|
|
|
agate := &Agate{
|
|
manager: manager,
|
|
options: options,
|
|
metadataDir: metadataDir,
|
|
blobsDir: blobsDir,
|
|
currentIDFile: currentIDFile,
|
|
}
|
|
|
|
// Load the current snapshot ID if it exists
|
|
if _, err := os.Stat(currentIDFile); err == nil {
|
|
data, err := os.ReadFile(currentIDFile)
|
|
if err == nil && len(data) > 0 {
|
|
agate.currentSnapshotID = string(data)
|
|
}
|
|
}
|
|
|
|
// Call OpenFunc if provided to initialize resources in the active directory
|
|
if options.OpenFunc != nil {
|
|
if err := options.OpenFunc(blobStore.GetActiveDir()); err != nil {
|
|
return nil, fmt.Errorf("failed to open resources during initialization: %w", err)
|
|
}
|
|
}
|
|
|
|
return agate, nil
|
|
}
|
|
|
|
func (a *Agate) GetActiveDir() string {
|
|
return a.options.BlobStore.GetActiveDir()
|
|
}
|
|
|
|
func (a *Agate) GetMetadataDir() string {
|
|
return a.metadataDir
|
|
}
|
|
|
|
func (a *Agate) GetBlobsDir() string {
|
|
return a.blobsDir
|
|
}
|
|
|
|
// SaveSnapshot creates a new snapshot from the current state of the active directory.
|
|
// If parentID is provided, it will be set as the parent of the new snapshot.
|
|
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
|
|
// Returns the ID of the created snapshot.
|
|
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
|
a.mutex.Lock()
|
|
defer a.mutex.Unlock()
|
|
|
|
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
|
|
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return "", fmt.Errorf("failed to close resources before snapshot: %w", err)
|
|
}
|
|
}
|
|
|
|
defer func() {
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
|
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// If parentID is not provided, use the current snapshot ID
|
|
if parentID == "" {
|
|
parentID = a.currentSnapshotID
|
|
}
|
|
|
|
effectiveParentID := parentID
|
|
|
|
// Create the snapshot
|
|
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
|
|
if err != nil {
|
|
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
|
|
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
|
|
|
|
// Update the current snapshot ID to the newly created snapshot
|
|
a.currentSnapshotID = snapshot.ID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return "", fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
|
|
return snapshot.ID, nil
|
|
}
|
|
|
|
// RestoreSnapshot extracts a snapshot to the active directory.
|
|
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
|
a.mutex.Lock()
|
|
defer a.mutex.Unlock()
|
|
|
|
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
|
|
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return fmt.Errorf("failed to close resources before restore: %w", err)
|
|
}
|
|
}
|
|
|
|
// Extract the snapshot
|
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
|
|
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
|
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
|
|
|
|
// Save the ID of the snapshot that was restored
|
|
a.currentSnapshotID = snapshotID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
|
|
// Call OpenFunc if provided
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
|
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
|
|
return fmt.Errorf("failed to open resources after restore: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RestoreSnapshot extracts a snapshot to the directory.
|
|
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
|
|
a.mutex.Lock()
|
|
defer a.mutex.Unlock()
|
|
|
|
// Call CloseFunc if provided
|
|
if a.options.CloseFunc != nil {
|
|
if err := a.options.CloseFunc(); err != nil {
|
|
return fmt.Errorf("failed to close resources before restore: %w", err)
|
|
}
|
|
}
|
|
|
|
defer func() {
|
|
if a.options.OpenFunc != nil {
|
|
if err := a.options.OpenFunc(dir); err != nil {
|
|
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Extract the snapshot
|
|
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
|
|
return fmt.Errorf("failed to extract snapshot: %w", err)
|
|
}
|
|
|
|
// If restoring to the active directory, save the snapshot ID
|
|
if dir == a.options.BlobStore.GetActiveDir() {
|
|
a.currentSnapshotID = snapshotID
|
|
|
|
// Save the current snapshot ID to a file
|
|
if err := a.saveCurrentSnapshotID(); err != nil {
|
|
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ListSnapshots returns a list of all available snapshots.
|
|
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
|
// Create empty ListOptions since we don't have filtering/pagination in this API yet
|
|
opts := store.ListOptions{}
|
|
return a.manager.ListSnapshots(ctx, opts)
|
|
}
|
|
|
|
// GetSnapshotDetails returns detailed information about a specific snapshot.
|
|
func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
|
|
return a.manager.GetSnapshotDetails(ctx, snapshotID)
|
|
}
|
|
|
|
// DeleteSnapshot removes a snapshot.
|
|
func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error {
|
|
return a.manager.DeleteSnapshot(ctx, snapshotID)
|
|
}
|
|
|
|
// saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir
|
|
func (a *Agate) saveCurrentSnapshotID() error {
|
|
if a.currentSnapshotID == "" {
|
|
// If there's no current snapshot ID, remove the file if it exists
|
|
if _, err := os.Stat(a.currentIDFile); err == nil {
|
|
return os.Remove(a.currentIDFile)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Write the current snapshot ID to the file
|
|
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
|
|
}
|
|
|
|
func (a *Agate) Open() error {
|
|
return a.options.OpenFunc(a.GetActiveDir())
|
|
}
|
|
|
|
// Close releases all resources used by the Agate instance.
|
|
func (a *Agate) Close() error {
|
|
if a.options.CloseFunc != nil {
|
|
return a.options.CloseFunc()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StartServer starts a gRPC server to share snapshots.
|
|
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
|
_, err := grpc.RunServer(ctx, a.manager, address)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to start server: %w", err)
|
|
}
|
|
|
|
// We don't store the server reference because we don't have a way to stop it yet
|
|
// In a future version, we could add a StopServer method
|
|
|
|
return nil
|
|
}
|
|
|
|
// ConnectRemote connects to a remote snapshot server.
|
|
// Returns a client that can be used to interact with the remote server.
|
|
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
|
|
client, err := grpc.ConnectToServer(address)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
|
|
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
|
|
client, err := a.ConnectRemote(address)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer client.Close()
|
|
|
|
return client.ListSnapshots(ctx)
|
|
}
|
|
|
|
// RegisterLocalSnapshot registers a local snapshot from an archive path with a specified UUID.
|
|
// The archive must be a valid ZIP file containing the snapshot files.
|
|
// If the UUID already exists, an error will be returned.
|
|
func (a *Agate) RegisterLocalSnapshot(ctx context.Context, archivePath string, snapshotID string, name string) error {
|
|
a.mutex.Lock()
|
|
defer a.mutex.Unlock()
|
|
|
|
a.options.Logger.Printf("Registering local snapshot from archive %s with ID %s", archivePath, snapshotID)
|
|
|
|
// Check if the archive file exists
|
|
if _, err := os.Stat(archivePath); os.IsNotExist(err) {
|
|
return fmt.Errorf("archive file does not exist: %w", err)
|
|
}
|
|
|
|
// Check if a snapshot with this ID already exists
|
|
_, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
|
if err == nil {
|
|
return fmt.Errorf("snapshot with ID %s already exists", snapshotID)
|
|
} else if !errors.Is(err, store.ErrNotFound) {
|
|
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
|
}
|
|
|
|
// Create a temporary directory for extracting the archive
|
|
tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID)
|
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create temporary directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(tempDir) // Clean up when done
|
|
|
|
// Extract the archive to the temporary directory to analyze its contents
|
|
if err := extractArchive(archivePath, tempDir); err != nil {
|
|
return fmt.Errorf("failed to extract archive: %w", err)
|
|
}
|
|
|
|
// Get the list of files in the archive
|
|
var files []store.FileInfo
|
|
err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip the root directory
|
|
if path == tempDir {
|
|
return nil
|
|
}
|
|
|
|
// Get the relative path
|
|
relPath, err := filepath.Rel(tempDir, path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get relative path: %w", err)
|
|
}
|
|
|
|
// Calculate SHA256 for files (not directories)
|
|
var sha256 string
|
|
if !info.IsDir() {
|
|
// Calculate the hash directly from the file path
|
|
sha256, err = hash.CalculateFileHash(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to calculate file hash: %w", err)
|
|
}
|
|
}
|
|
|
|
// Add file info to the list
|
|
files = append(files, store.FileInfo{
|
|
Path: relPath,
|
|
Size: info.Size(),
|
|
IsDir: info.IsDir(),
|
|
SHA256: sha256,
|
|
})
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to analyze archive contents: %w", err)
|
|
}
|
|
|
|
// Copy the archive to the blob store
|
|
archiveFile, err := os.Open(archivePath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open archive file: %w", err)
|
|
}
|
|
defer archiveFile.Close()
|
|
|
|
// Store the blob with the specified snapshot ID
|
|
_, err = a.options.BlobStore.StoreBlob(ctx, snapshotID, archiveFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store blob: %w", err)
|
|
}
|
|
|
|
// Create and save the snapshot metadata
|
|
snapshot := store.Snapshot{
|
|
ID: snapshotID,
|
|
Name: name,
|
|
ParentID: "",
|
|
CreationTime: time.Now(),
|
|
Files: files,
|
|
}
|
|
|
|
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully registered local snapshot with ID %s", snapshotID)
|
|
return nil
|
|
}
|
|
|
|
// GetRemoteSnapshotMetadata downloads only the metadata of a snapshot from a remote server.
|
|
// If address is empty, it will try to restore the metadata from the local blob.
|
|
func (a *Agate) GetRemoteSnapshotMetadata(ctx context.Context, address string, snapshotID string) error {
|
|
a.mutex.Lock()
|
|
defer a.mutex.Unlock()
|
|
|
|
// Check if the snapshot already exists locally
|
|
_, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
|
if err == nil {
|
|
a.options.Logger.Printf("Snapshot %s already exists locally", snapshotID)
|
|
return nil
|
|
} else if !errors.Is(err, store.ErrNotFound) {
|
|
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
|
}
|
|
|
|
// If address is provided, download metadata from remote server
|
|
if address != "" {
|
|
a.options.Logger.Printf("Downloading metadata for snapshot %s from %s", snapshotID, address)
|
|
|
|
client, err := a.ConnectRemote(address)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to remote server: %w", err)
|
|
}
|
|
defer client.Close()
|
|
|
|
// Get the remote snapshot details
|
|
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
|
}
|
|
|
|
// Save the remote snapshot metadata
|
|
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully downloaded metadata for snapshot %s", snapshotID)
|
|
return nil
|
|
}
|
|
|
|
// If no address is provided, try to restore metadata from the local blob
|
|
a.options.Logger.Printf("Trying to restore metadata for snapshot %s from local blob", snapshotID)
|
|
|
|
// Check if the blob exists
|
|
blobPath, err := a.options.BlobStore.GetBlobPath(ctx, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get blob path: %w", err)
|
|
}
|
|
|
|
if _, err := os.Stat(blobPath); os.IsNotExist(err) {
|
|
return fmt.Errorf("blob for snapshot %s does not exist", snapshotID)
|
|
}
|
|
|
|
// Create a temporary directory for extracting the archive
|
|
tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID)
|
|
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create temporary directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(tempDir) // Clean up when done
|
|
|
|
// Extract the archive to the temporary directory to analyze its contents
|
|
if err := extractArchive(blobPath, tempDir); err != nil {
|
|
return fmt.Errorf("failed to extract archive: %w", err)
|
|
}
|
|
|
|
// Get the list of files in the archive
|
|
var files []store.FileInfo
|
|
err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Skip the root directory
|
|
if path == tempDir {
|
|
return nil
|
|
}
|
|
|
|
// Get the relative path
|
|
relPath, err := filepath.Rel(tempDir, path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get relative path: %w", err)
|
|
}
|
|
|
|
// Calculate SHA256 for files (not directories)
|
|
var sha256 string
|
|
if !info.IsDir() {
|
|
// Calculate the hash directly from the file path
|
|
sha256, err = hash.CalculateFileHash(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to calculate file hash: %w", err)
|
|
}
|
|
}
|
|
|
|
// Add file info to the list
|
|
files = append(files, store.FileInfo{
|
|
Path: relPath,
|
|
Size: info.Size(),
|
|
IsDir: info.IsDir(),
|
|
SHA256: sha256,
|
|
})
|
|
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to analyze archive contents: %w", err)
|
|
}
|
|
|
|
// Create and save the snapshot metadata
|
|
snapshot := store.Snapshot{
|
|
ID: snapshotID,
|
|
Name: snapshotID, // Use the ID as the name since we don't have a better name
|
|
ParentID: "",
|
|
CreationTime: time.Now(),
|
|
Files: files,
|
|
}
|
|
|
|
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully restored metadata for snapshot %s from local blob", snapshotID)
|
|
return nil
|
|
}
|
|
|
|
// GetRemoteSnapshot downloads a snapshot from a remote server.
|
|
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
|
|
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
|
|
client, err := a.ConnectRemote(address)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer client.Close()
|
|
|
|
// Get the remote snapshot details
|
|
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get snapshot details: %w", err)
|
|
}
|
|
|
|
// Create a temporary directory for downloading files
|
|
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID)
|
|
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
|
return fmt.Errorf("failed to create temporary download directory: %w", err)
|
|
}
|
|
defer os.RemoveAll(tempDownloadDir) // Clean up when done
|
|
|
|
a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address)
|
|
|
|
// If localParentID is provided, try to reuse files from the local parent snapshot
|
|
if localParentID != "" {
|
|
a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID)
|
|
|
|
// Get the local parent snapshot details
|
|
localParent, err := a.GetSnapshotDetails(ctx, localParentID)
|
|
if err != nil {
|
|
a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err)
|
|
} else {
|
|
// Extract the local parent snapshot to a temporary directory
|
|
localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID)
|
|
if err := os.MkdirAll(localParentDir, 0755); err != nil {
|
|
a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err)
|
|
} else {
|
|
defer os.RemoveAll(localParentDir) // Clean up when done
|
|
|
|
if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil {
|
|
a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err)
|
|
} else {
|
|
// Copy unchanged files from the local parent to the download directory
|
|
for _, file := range remoteSnapshot.Files {
|
|
// Skip directories, they'll be created as needed
|
|
if file.IsDir {
|
|
continue
|
|
}
|
|
|
|
// Check if the file exists in the local parent with the same hash
|
|
var localFile *store.FileInfo
|
|
for _, lf := range localParent.Files {
|
|
if lf.Path == file.Path && lf.SHA256 == file.SHA256 {
|
|
localFile = &lf
|
|
break
|
|
}
|
|
}
|
|
|
|
if localFile != nil {
|
|
// File exists in local parent with the same hash, copy it
|
|
srcPath := filepath.Join(localParentDir, localFile.Path)
|
|
dstPath := filepath.Join(tempDownloadDir, file.Path)
|
|
|
|
// Ensure the destination directory exists
|
|
if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil {
|
|
a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err)
|
|
continue
|
|
}
|
|
|
|
// Copy the file
|
|
srcFile, err := os.Open(srcPath)
|
|
if err != nil {
|
|
a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err)
|
|
continue
|
|
}
|
|
defer srcFile.Close()
|
|
|
|
dstFile, err := os.Create(dstPath)
|
|
if err != nil {
|
|
a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err)
|
|
continue
|
|
}
|
|
|
|
_, err = io.Copy(dstFile, srcFile)
|
|
dstFile.Close()
|
|
|
|
if err != nil {
|
|
a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err)
|
|
// If copy fails, the file will be downloaded
|
|
} else {
|
|
a.options.Logger.Printf("Reusing file %s from local parent", file.Path)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Download the snapshot files
|
|
a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID)
|
|
|
|
// Get snapshot details to know what files we need to download
|
|
remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
|
}
|
|
|
|
// Check which files we already have and which we need to download
|
|
for _, file := range remoteDetails.Files {
|
|
if file.IsDir {
|
|
continue // Skip directories
|
|
}
|
|
|
|
filePath := filepath.Join(tempDownloadDir, file.Path)
|
|
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
|
// File doesn't exist yet, we'll need to download it
|
|
a.options.Logger.Printf("Downloading file %s", file.Path)
|
|
}
|
|
}
|
|
|
|
if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil {
|
|
return fmt.Errorf("failed to download snapshot: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Creating archive from downloaded files")
|
|
|
|
// Create a zip archive from the downloaded files
|
|
zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip")
|
|
if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil {
|
|
return fmt.Errorf("failed to create zip archive: %w", err)
|
|
}
|
|
|
|
// Store the blob with the remote snapshot ID
|
|
zipFile, err := os.Open(zipPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to open zip file: %w", err)
|
|
}
|
|
defer zipFile.Close()
|
|
defer os.Remove(zipPath) // Clean up the zip file when done
|
|
|
|
a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID)
|
|
|
|
// Store the blob with the remote snapshot ID
|
|
_, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to store blob: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Saving snapshot metadata")
|
|
|
|
// Save the remote snapshot metadata
|
|
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
|
}
|
|
|
|
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
|
|
return nil
|
|
}
|