Files
Agate/api.go

545 lines
18 KiB
Go

package agate
import (
"context"
"errors"
"fmt"
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/grpc"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"io"
"log"
"os"
"path/filepath"
"sync"
"gitea.unprism.ru/KRBL/Agate/store"
"gitea.unprism.ru/KRBL/Agate/stores"
)
// AgateOptions defines configuration options for the Agate library.
type AgateOptions struct {
// WorkDir is the directory where snapshots will be stored and managed.
WorkDir string
// OpenFunc is called after a snapshot is restored to initialize resources.
// The parameter is the directory where the snapshot was extracted.
OpenFunc func(dir string) error
// CloseFunc is called before a snapshot is created or restored to clean up resources.
CloseFunc func() error
// MetadataStore is the implementation of the metadata store to use.
// If nil, a default SQLite-based metadata store will be created automatically.
// Use the stores package to initialize a custom implementation:
// metadataStore, err := stores.NewDefaultMetadataStore(metadataDir)
MetadataStore store.MetadataStore
// BlobStore is the implementation of the blob store to use.
// If nil, a default filesystem-based blob store will be created automatically.
// Use the stores package to initialize a custom implementation:
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
BlobStore store.BlobStore
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
CleanOnRestore bool
// Logger is the logger to use for output. If nil, logging is disabled.
Logger *log.Logger
}
// Agate is the main entry point for the snapshot library.
type Agate struct {
mutex sync.Mutex
manager interfaces.SnapshotManager
options AgateOptions
metadataDir string
blobsDir string
currentSnapshotID string
currentIDFile string
}
// New initializes a new instance of the Agate library with the given options.
func New(options AgateOptions) (*Agate, error) {
if options.WorkDir == "" {
return nil, errors.New("work directory cannot be empty")
}
// Initialize logger if not provided
if options.Logger == nil {
options.Logger = log.New(io.Discard, "", 0)
}
// Create the work directory if it doesn't exist
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create work directory: %w", err)
}
// Create subdirectories for metadata and blobs
metadataDir := filepath.Join(options.WorkDir, "metadata")
blobsDir := filepath.Join(options.WorkDir, "blobs")
if err := os.MkdirAll(metadataDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create metadata directory: %w", err)
}
if err := os.MkdirAll(blobsDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create blobs directory: %w", err)
}
var metadataStore store.MetadataStore
var blobStore store.BlobStore
var err error
// Use provided stores or initialize default ones
if options.MetadataStore != nil && options.BlobStore != nil {
// Use the provided stores
metadataStore = options.MetadataStore
blobStore = options.BlobStore
} else if options.MetadataStore == nil && options.BlobStore == nil {
// Initialize both stores with default implementations
metadataStore, blobStore, err = stores.InitDefaultStores(options.WorkDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default stores: %w", err)
}
// Update options with the created stores
options.MetadataStore = metadataStore
options.BlobStore = blobStore
} else if options.MetadataStore == nil {
// Initialize only the metadata store
metadataStore, err = stores.NewDefaultMetadataStore(metadataDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default metadata store: %w", err)
}
blobStore = options.BlobStore
// Update options with the created metadata store
options.MetadataStore = metadataStore
} else {
// Initialize only the blob store
blobStore, err = stores.NewDefaultBlobStore(blobsDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default blob store: %w", err)
}
metadataStore = options.MetadataStore
// Update options with the created blob store
options.BlobStore = blobStore
}
// Create the snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
}
// Create a file path for storing the current snapshot ID
currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id")
agate := &Agate{
manager: manager,
options: options,
metadataDir: metadataDir,
blobsDir: blobsDir,
currentIDFile: currentIDFile,
}
// Load the current snapshot ID if it exists
if _, err := os.Stat(currentIDFile); err == nil {
data, err := os.ReadFile(currentIDFile)
if err == nil && len(data) > 0 {
agate.currentSnapshotID = string(data)
}
}
// Call OpenFunc if provided to initialize resources in the active directory
if options.OpenFunc != nil {
if err := options.OpenFunc(blobStore.GetActiveDir()); err != nil {
return nil, fmt.Errorf("failed to open resources during initialization: %w", err)
}
}
return agate, nil
}
func (a *Agate) GetActiveDir() string {
return a.options.BlobStore.GetActiveDir()
}
func (a *Agate) GetMetadataDir() string {
return a.metadataDir
}
func (a *Agate) GetBlobsDir() string {
return a.blobsDir
}
// SaveSnapshot creates a new snapshot from the current state of the active directory.
// If parentID is provided, it will be set as the parent of the new snapshot.
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
// Returns the ID of the created snapshot.
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return "", fmt.Errorf("failed to close resources before snapshot: %w", err)
}
}
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
}
}
}()
// If parentID is not provided, use the current snapshot ID
if parentID == "" {
parentID = a.currentSnapshotID
}
effectiveParentID := parentID
// Create the snapshot
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
if err != nil {
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
return "", fmt.Errorf("failed to create snapshot: %w", err)
}
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
// Update the current snapshot ID to the newly created snapshot
a.currentSnapshotID = snapshot.ID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return "", fmt.Errorf("failed to save current snapshot ID: %w", err)
}
return snapshot.ID, nil
}
// RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return fmt.Errorf("failed to close resources before restore: %w", err)
}
}
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
return fmt.Errorf("failed to extract snapshot: %w", err)
}
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
// Save the ID of the snapshot that was restored
a.currentSnapshotID = snapshotID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return fmt.Errorf("failed to save current snapshot ID: %w", err)
}
// Call OpenFunc if provided
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
return fmt.Errorf("failed to open resources after restore: %w", err)
}
}
return nil
}
// RestoreSnapshot extracts a snapshot to the directory.
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return fmt.Errorf("failed to close resources before restore: %w", err)
}
}
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(dir); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
}
}
}()
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
return fmt.Errorf("failed to extract snapshot: %w", err)
}
// If restoring to the active directory, save the snapshot ID
if dir == a.options.BlobStore.GetActiveDir() {
a.currentSnapshotID = snapshotID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return fmt.Errorf("failed to save current snapshot ID: %w", err)
}
}
return nil
}
// ListSnapshots returns a list of all available snapshots.
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
// Create empty ListOptions since we don't have filtering/pagination in this API yet
opts := store.ListOptions{}
return a.manager.ListSnapshots(ctx, opts)
}
// GetSnapshotDetails returns detailed information about a specific snapshot.
func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
return a.manager.GetSnapshotDetails(ctx, snapshotID)
}
// DeleteSnapshot removes a snapshot.
func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error {
return a.manager.DeleteSnapshot(ctx, snapshotID)
}
// saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir
func (a *Agate) saveCurrentSnapshotID() error {
if a.currentSnapshotID == "" {
// If there's no current snapshot ID, remove the file if it exists
if _, err := os.Stat(a.currentIDFile); err == nil {
return os.Remove(a.currentIDFile)
}
return nil
}
// Write the current snapshot ID to the file
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
}
func (a *Agate) Open() error {
return a.options.OpenFunc(a.GetActiveDir())
}
// Close releases all resources used by the Agate instance.
func (a *Agate) Close() error {
if a.options.CloseFunc != nil {
return a.options.CloseFunc()
}
return nil
}
// StartServer starts a gRPC server to share snapshots.
func (a *Agate) StartServer(ctx context.Context, address string) error {
_, err := grpc.RunServer(ctx, a.manager, address)
if err != nil {
return fmt.Errorf("failed to start server: %w", err)
}
// We don't store the server reference because we don't have a way to stop it yet
// In a future version, we could add a StopServer method
return nil
}
// ConnectRemote connects to a remote snapshot server.
// Returns a client that can be used to interact with the remote server.
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
client, err := grpc.ConnectToServer(address)
if err != nil {
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
}
return client, nil
}
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
client, err := a.ConnectRemote(address)
if err != nil {
return nil, err
}
defer client.Close()
return client.ListSnapshots(ctx)
}
// GetRemoteSnapshot downloads a snapshot from a remote server.
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
client, err := a.ConnectRemote(address)
if err != nil {
return err
}
defer client.Close()
// Get the remote snapshot details
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get snapshot details: %w", err)
}
// Create a temporary directory for downloading files
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID)
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
return fmt.Errorf("failed to create temporary download directory: %w", err)
}
defer os.RemoveAll(tempDownloadDir) // Clean up when done
a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address)
// If localParentID is provided, try to reuse files from the local parent snapshot
if localParentID != "" {
a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID)
// Get the local parent snapshot details
localParent, err := a.GetSnapshotDetails(ctx, localParentID)
if err != nil {
a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err)
} else {
// Extract the local parent snapshot to a temporary directory
localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID)
if err := os.MkdirAll(localParentDir, 0755); err != nil {
a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err)
} else {
defer os.RemoveAll(localParentDir) // Clean up when done
if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil {
a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err)
} else {
// Copy unchanged files from the local parent to the download directory
for _, file := range remoteSnapshot.Files {
// Skip directories, they'll be created as needed
if file.IsDir {
continue
}
// Check if the file exists in the local parent with the same hash
var localFile *store.FileInfo
for _, lf := range localParent.Files {
if lf.Path == file.Path && lf.SHA256 == file.SHA256 {
localFile = &lf
break
}
}
if localFile != nil {
// File exists in local parent with the same hash, copy it
srcPath := filepath.Join(localParentDir, localFile.Path)
dstPath := filepath.Join(tempDownloadDir, file.Path)
// Ensure the destination directory exists
if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil {
a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err)
continue
}
// Copy the file
srcFile, err := os.Open(srcPath)
if err != nil {
a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err)
continue
}
defer srcFile.Close()
dstFile, err := os.Create(dstPath)
if err != nil {
a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err)
continue
}
_, err = io.Copy(dstFile, srcFile)
dstFile.Close()
if err != nil {
a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err)
// If copy fails, the file will be downloaded
} else {
a.options.Logger.Printf("Reusing file %s from local parent", file.Path)
}
}
}
}
}
}
}
// Download the snapshot files
a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID)
// Get snapshot details to know what files we need to download
remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get remote snapshot details: %w", err)
}
// Check which files we already have and which we need to download
for _, file := range remoteDetails.Files {
if file.IsDir {
continue // Skip directories
}
filePath := filepath.Join(tempDownloadDir, file.Path)
if _, err := os.Stat(filePath); os.IsNotExist(err) {
// File doesn't exist yet, we'll need to download it
a.options.Logger.Printf("Downloading file %s", file.Path)
}
}
if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil {
return fmt.Errorf("failed to download snapshot: %w", err)
}
a.options.Logger.Printf("Creating archive from downloaded files")
// Create a zip archive from the downloaded files
zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip")
if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil {
return fmt.Errorf("failed to create zip archive: %w", err)
}
// Store the blob with the remote snapshot ID
zipFile, err := os.Open(zipPath)
if err != nil {
return fmt.Errorf("failed to open zip file: %w", err)
}
defer zipFile.Close()
defer os.Remove(zipPath) // Clean up the zip file when done
a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID)
// Store the blob with the remote snapshot ID
_, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile)
if err != nil {
return fmt.Errorf("failed to store blob: %w", err)
}
a.options.Logger.Printf("Saving snapshot metadata")
// Save the remote snapshot metadata
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
if err != nil {
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
return nil
}