Refactor snapshot management: integrate logging, enhance concurrency with mutex, add clean extraction option, and update gRPC ListSnapshots with ListOptions.
This commit is contained in:
220
api.go
220
api.go
@ -4,9 +4,14 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.unprism.ru/KRBL/Agate/archive"
|
||||
"gitea.unprism.ru/KRBL/Agate/grpc"
|
||||
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"gitea.unprism.ru/KRBL/Agate/store"
|
||||
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||
@ -35,11 +40,18 @@ type AgateOptions struct {
|
||||
// Use the stores package to initialize a custom implementation:
|
||||
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
|
||||
BlobStore store.BlobStore
|
||||
|
||||
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
|
||||
CleanOnRestore bool
|
||||
|
||||
// Logger is the logger to use for output. If nil, logging is disabled.
|
||||
Logger *log.Logger
|
||||
}
|
||||
|
||||
// Agate is the main entry point for the snapshot library.
|
||||
type Agate struct {
|
||||
manager SnapshotManager
|
||||
mutex sync.Mutex
|
||||
manager interfaces.SnapshotManager
|
||||
options AgateOptions
|
||||
metadataDir string
|
||||
blobsDir string
|
||||
@ -53,6 +65,11 @@ func New(options AgateOptions) (*Agate, error) {
|
||||
return nil, errors.New("work directory cannot be empty")
|
||||
}
|
||||
|
||||
// Initialize logger if not provided
|
||||
if options.Logger == nil {
|
||||
options.Logger = log.New(io.Discard, "", 0)
|
||||
}
|
||||
|
||||
// Create the work directory if it doesn't exist
|
||||
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
|
||||
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
||||
@ -109,7 +126,7 @@ func New(options AgateOptions) (*Agate, error) {
|
||||
}
|
||||
|
||||
// Create the snapshot manager
|
||||
manager, err := CreateSnapshotManager(metadataStore, blobStore)
|
||||
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
|
||||
}
|
||||
@ -160,6 +177,11 @@ func (a *Agate) GetBlobsDir() string {
|
||||
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
|
||||
// Returns the ID of the created snapshot.
|
||||
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
||||
a.mutex.Lock()
|
||||
defer a.mutex.Unlock()
|
||||
|
||||
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
|
||||
|
||||
// Call CloseFunc if provided
|
||||
if a.options.CloseFunc != nil {
|
||||
if err := a.options.CloseFunc(); err != nil {
|
||||
@ -170,7 +192,7 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
||||
defer func() {
|
||||
if a.options.OpenFunc != nil {
|
||||
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||||
fmt.Printf("Failed to open resources after snapshot: %v\n", err)
|
||||
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -185,9 +207,12 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
||||
// Create the snapshot
|
||||
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
|
||||
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
|
||||
|
||||
// Update the current snapshot ID to the newly created snapshot
|
||||
a.currentSnapshotID = snapshot.ID
|
||||
|
||||
@ -201,6 +226,11 @@ func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string)
|
||||
|
||||
// RestoreSnapshot extracts a snapshot to the active directory.
|
||||
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||
a.mutex.Lock()
|
||||
defer a.mutex.Unlock()
|
||||
|
||||
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
|
||||
|
||||
// Call CloseFunc if provided
|
||||
if a.options.CloseFunc != nil {
|
||||
if err := a.options.CloseFunc(); err != nil {
|
||||
@ -209,10 +239,13 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||
}
|
||||
|
||||
// Extract the snapshot
|
||||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir()); err != nil {
|
||||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
|
||||
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
|
||||
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
|
||||
|
||||
// Save the ID of the snapshot that was restored
|
||||
a.currentSnapshotID = snapshotID
|
||||
|
||||
@ -224,6 +257,7 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||
// Call OpenFunc if provided
|
||||
if a.options.OpenFunc != nil {
|
||||
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||||
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
|
||||
return fmt.Errorf("failed to open resources after restore: %w", err)
|
||||
}
|
||||
}
|
||||
@ -233,6 +267,9 @@ func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||||
|
||||
// RestoreSnapshot extracts a snapshot to the directory.
|
||||
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
|
||||
a.mutex.Lock()
|
||||
defer a.mutex.Unlock()
|
||||
|
||||
// Call CloseFunc if provided
|
||||
if a.options.CloseFunc != nil {
|
||||
if err := a.options.CloseFunc(); err != nil {
|
||||
@ -243,13 +280,13 @@ func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir
|
||||
defer func() {
|
||||
if a.options.OpenFunc != nil {
|
||||
if err := a.options.OpenFunc(dir); err != nil {
|
||||
fmt.Printf("Failed to open resources after snapshot: %v\n", err)
|
||||
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Extract the snapshot
|
||||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir); err != nil {
|
||||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
|
||||
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||||
}
|
||||
|
||||
@ -268,7 +305,9 @@ func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir
|
||||
|
||||
// ListSnapshots returns a list of all available snapshots.
|
||||
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
||||
return a.manager.ListSnapshots(ctx)
|
||||
// Create empty ListOptions since we don't have filtering/pagination in this API yet
|
||||
opts := store.ListOptions{}
|
||||
return a.manager.ListSnapshots(ctx, opts)
|
||||
}
|
||||
|
||||
// GetSnapshotDetails returns detailed information about a specific snapshot.
|
||||
@ -301,7 +340,10 @@ func (a *Agate) Open() error {
|
||||
|
||||
// Close releases all resources used by the Agate instance.
|
||||
func (a *Agate) Close() error {
|
||||
return a.options.CloseFunc()
|
||||
if a.options.CloseFunc != nil {
|
||||
return a.options.CloseFunc()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartServer starts a gRPC server to share snapshots.
|
||||
@ -348,31 +390,155 @@ func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotI
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Create a temporary directory for the downloaded snapshot
|
||||
tempDir := filepath.Join(a.options.WorkDir, "temp", snapshotID)
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
|
||||
// Download the snapshot
|
||||
if err := client.DownloadSnapshot(ctx, snapshotID, tempDir, localParentID); err != nil {
|
||||
return fmt.Errorf("failed to download snapshot: %w", err)
|
||||
}
|
||||
|
||||
// Get the snapshot details to create a local copy
|
||||
details, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
// Get the remote snapshot details
|
||||
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// Create a local snapshot from the downloaded files
|
||||
_, err = a.manager.CreateSnapshot(ctx, tempDir, details.Name, localParentID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create local snapshot: %w", err)
|
||||
// Create a temporary directory for downloading files
|
||||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID)
|
||||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temporary download directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDownloadDir) // Clean up when done
|
||||
|
||||
a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address)
|
||||
|
||||
// If localParentID is provided, try to reuse files from the local parent snapshot
|
||||
if localParentID != "" {
|
||||
a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID)
|
||||
|
||||
// Get the local parent snapshot details
|
||||
localParent, err := a.GetSnapshotDetails(ctx, localParentID)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err)
|
||||
} else {
|
||||
// Extract the local parent snapshot to a temporary directory
|
||||
localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID)
|
||||
if err := os.MkdirAll(localParentDir, 0755); err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err)
|
||||
} else {
|
||||
defer os.RemoveAll(localParentDir) // Clean up when done
|
||||
|
||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err)
|
||||
} else {
|
||||
// Copy unchanged files from the local parent to the download directory
|
||||
for _, file := range remoteSnapshot.Files {
|
||||
// Skip directories, they'll be created as needed
|
||||
if file.IsDir {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the file exists in the local parent with the same hash
|
||||
var localFile *store.FileInfo
|
||||
for _, lf := range localParent.Files {
|
||||
if lf.Path == file.Path && lf.SHA256 == file.SHA256 {
|
||||
localFile = &lf
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if localFile != nil {
|
||||
// File exists in local parent with the same hash, copy it
|
||||
srcPath := filepath.Join(localParentDir, localFile.Path)
|
||||
dstPath := filepath.Join(tempDownloadDir, file.Path)
|
||||
|
||||
// Ensure the destination directory exists
|
||||
if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil {
|
||||
a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Copy the file
|
||||
srcFile, err := os.Open(srcPath)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err)
|
||||
continue
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
dstFile, err := os.Create(dstPath)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = io.Copy(dstFile, srcFile)
|
||||
dstFile.Close()
|
||||
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err)
|
||||
// If copy fails, the file will be downloaded
|
||||
} else {
|
||||
a.options.Logger.Printf("Reusing file %s from local parent", file.Path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up the temporary directory
|
||||
os.RemoveAll(tempDir)
|
||||
// Download the snapshot files
|
||||
a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID)
|
||||
|
||||
// Get snapshot details to know what files we need to download
|
||||
remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// Check which files we already have and which we need to download
|
||||
for _, file := range remoteDetails.Files {
|
||||
if file.IsDir {
|
||||
continue // Skip directories
|
||||
}
|
||||
|
||||
filePath := filepath.Join(tempDownloadDir, file.Path)
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
// File doesn't exist yet, we'll need to download it
|
||||
a.options.Logger.Printf("Downloading file %s", file.Path)
|
||||
}
|
||||
}
|
||||
|
||||
if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil {
|
||||
return fmt.Errorf("failed to download snapshot: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Creating archive from downloaded files")
|
||||
|
||||
// Create a zip archive from the downloaded files
|
||||
zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip")
|
||||
if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil {
|
||||
return fmt.Errorf("failed to create zip archive: %w", err)
|
||||
}
|
||||
|
||||
// Store the blob with the remote snapshot ID
|
||||
zipFile, err := os.Open(zipPath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open zip file: %w", err)
|
||||
}
|
||||
defer zipFile.Close()
|
||||
defer os.Remove(zipPath) // Clean up the zip file when done
|
||||
|
||||
a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID)
|
||||
|
||||
// Store the blob with the remote snapshot ID
|
||||
_, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store blob: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Saving snapshot metadata")
|
||||
|
||||
// Save the remote snapshot metadata
|
||||
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
|
||||
return nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user