432 lines
14 KiB
Go
432 lines
14 KiB
Go
package agate
|
||
|
||
import (
|
||
"context"
|
||
"errors"
|
||
"fmt"
|
||
"gitea.unprism.ru/KRBL/Agate/archive"
|
||
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||
"gitea.unprism.ru/KRBL/Agate/remote"
|
||
"io"
|
||
"log"
|
||
"os"
|
||
"path/filepath"
|
||
"sync"
|
||
|
||
"gitea.unprism.ru/KRBL/Agate/store"
|
||
"gitea.unprism.ru/KRBL/Agate/stores"
|
||
)
|
||
|
||
// AgateOptions defines configuration options for the Agate library.
|
||
type AgateOptions struct {
|
||
// WorkDir is the directory where snapshots will be stored and managed.
|
||
WorkDir string
|
||
|
||
// OpenFunc is called after a snapshot is restored to initialize resources.
|
||
// The parameter is the directory where the snapshot was extracted.
|
||
OpenFunc func(dir string) error
|
||
|
||
// CloseFunc is called before a snapshot is created or restored to clean up resources.
|
||
CloseFunc func() error
|
||
|
||
// MetadataStore is the implementation of the metadata store to use.
|
||
// If nil, a default SQLite-based metadata store will be created automatically.
|
||
// Use the stores package to initialize a custom implementation:
|
||
// metadataStore, err := stores.NewDefaultMetadataStore(metadataDir)
|
||
MetadataStore store.MetadataStore
|
||
|
||
// BlobStore is the implementation of the blob store to use.
|
||
// If nil, a default filesystem-based blob store will be created automatically.
|
||
// Use the stores package to initialize a custom implementation:
|
||
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
|
||
BlobStore store.BlobStore
|
||
|
||
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
|
||
CleanOnRestore bool
|
||
|
||
// Logger is the logger to use for output. If nil, logging is disabled.
|
||
Logger *log.Logger
|
||
}
|
||
|
||
// Agate is the main entry point for the snapshot library.
|
||
type Agate struct {
|
||
mutex sync.Mutex
|
||
manager interfaces.SnapshotManager
|
||
options AgateOptions
|
||
metadataDir string
|
||
blobsDir string
|
||
currentSnapshotID string
|
||
currentIDFile string
|
||
}
|
||
|
||
// New initializes a new instance of the Agate library with the given options.
|
||
func New(options AgateOptions) (*Agate, error) {
|
||
if options.WorkDir == "" {
|
||
return nil, errors.New("work directory cannot be empty")
|
||
}
|
||
|
||
// Initialize logger if not provided
|
||
if options.Logger == nil {
|
||
options.Logger = log.New(io.Discard, "", 0)
|
||
}
|
||
|
||
// Create the work directory if it doesn't exist
|
||
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
||
}
|
||
|
||
// Create subdirectories for metadata and blobs
|
||
metadataDir := filepath.Join(options.WorkDir, "metadata")
|
||
blobsDir := filepath.Join(options.WorkDir, "blobs")
|
||
|
||
if err := os.MkdirAll(metadataDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create metadata directory: %w", err)
|
||
}
|
||
|
||
if err := os.MkdirAll(blobsDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create blobs directory: %w", err)
|
||
}
|
||
|
||
var metadataStore store.MetadataStore
|
||
var blobStore store.BlobStore
|
||
var err error
|
||
|
||
// Use provided stores or initialize default ones
|
||
if options.MetadataStore != nil && options.BlobStore != nil {
|
||
// Use the provided stores
|
||
metadataStore = options.MetadataStore
|
||
blobStore = options.BlobStore
|
||
} else if options.MetadataStore == nil && options.BlobStore == nil {
|
||
// Initialize both stores with default implementations
|
||
metadataStore, blobStore, err = stores.InitDefaultStores(options.WorkDir)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to initialize default stores: %w", err)
|
||
}
|
||
// Update options with the created stores
|
||
options.MetadataStore = metadataStore
|
||
options.BlobStore = blobStore
|
||
} else if options.MetadataStore == nil {
|
||
// Initialize only the metadata store
|
||
metadataStore, err = stores.NewDefaultMetadataStore(metadataDir)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to initialize default metadata store: %w", err)
|
||
}
|
||
blobStore = options.BlobStore
|
||
// Update options with the created metadata store
|
||
options.MetadataStore = metadataStore
|
||
} else {
|
||
// Initialize only the blob store
|
||
blobStore, err = stores.NewDefaultBlobStore(blobsDir)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to initialize default blob store: %w", err)
|
||
}
|
||
metadataStore = options.MetadataStore
|
||
// Update options with the created blob store
|
||
options.BlobStore = blobStore
|
||
}
|
||
|
||
// Create the snapshot manager
|
||
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
|
||
}
|
||
|
||
// Create a file path for storing the current snapshot ID
|
||
currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id")
|
||
|
||
agate := &Agate{
|
||
manager: manager,
|
||
options: options,
|
||
metadataDir: metadataDir,
|
||
blobsDir: blobsDir,
|
||
currentIDFile: currentIDFile,
|
||
}
|
||
|
||
// Load the current snapshot ID if it exists
|
||
if _, err := os.Stat(currentIDFile); err == nil {
|
||
data, err := os.ReadFile(currentIDFile)
|
||
if err == nil && len(data) > 0 {
|
||
agate.currentSnapshotID = string(data)
|
||
}
|
||
}
|
||
|
||
// Call OpenFunc if provided to initialize resources in the active directory
|
||
if options.OpenFunc != nil {
|
||
if err := options.OpenFunc(blobStore.GetActiveDir()); err != nil {
|
||
return nil, fmt.Errorf("failed to open resources during initialization: %w", err)
|
||
}
|
||
}
|
||
|
||
return agate, nil
|
||
}
|
||
|
||
func (a *Agate) GetActiveDir() string {
|
||
return a.options.BlobStore.GetActiveDir()
|
||
}
|
||
|
||
func (a *Agate) GetMetadataDir() string {
|
||
return a.metadataDir
|
||
}
|
||
|
||
func (a *Agate) GetBlobsDir() string {
|
||
return a.blobsDir
|
||
}
|
||
|
||
// SaveSnapshot creates a new snapshot from the current state of the active directory.
|
||
// If parentID is provided, it will be set as the parent of the new snapshot.
|
||
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
|
||
// Returns the ID of the created snapshot.
|
||
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
|
||
a.mutex.Lock()
|
||
defer a.mutex.Unlock()
|
||
|
||
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
|
||
|
||
// Call CloseFunc if provided
|
||
if a.options.CloseFunc != nil {
|
||
if err := a.options.CloseFunc(); err != nil {
|
||
return "", fmt.Errorf("failed to close resources before snapshot: %w", err)
|
||
}
|
||
}
|
||
|
||
defer func() {
|
||
if a.options.OpenFunc != nil {
|
||
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// If parentID is not provided, use the current snapshot ID
|
||
if parentID == "" {
|
||
parentID = a.currentSnapshotID
|
||
}
|
||
|
||
effectiveParentID := parentID
|
||
|
||
// Create the snapshot
|
||
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
|
||
if err != nil {
|
||
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
|
||
return "", fmt.Errorf("failed to create snapshot: %w", err)
|
||
}
|
||
|
||
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
|
||
|
||
// Update the current snapshot ID to the newly created snapshot
|
||
a.currentSnapshotID = snapshot.ID
|
||
|
||
// Save the current snapshot ID to a file
|
||
if err := a.saveCurrentSnapshotID(); err != nil {
|
||
return "", fmt.Errorf("failed to save current snapshot ID: %w", err)
|
||
}
|
||
|
||
return snapshot.ID, nil
|
||
}
|
||
|
||
// RestoreSnapshot extracts a snapshot to the active directory.
|
||
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
|
||
a.mutex.Lock()
|
||
defer a.mutex.Unlock()
|
||
|
||
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
|
||
|
||
// Call CloseFunc if provided
|
||
if a.options.CloseFunc != nil {
|
||
if err := a.options.CloseFunc(); err != nil {
|
||
return fmt.Errorf("failed to close resources before restore: %w", err)
|
||
}
|
||
}
|
||
|
||
// Extract the snapshot
|
||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
|
||
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
|
||
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||
}
|
||
|
||
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
|
||
|
||
// Save the ID of the snapshot that was restored
|
||
a.currentSnapshotID = snapshotID
|
||
|
||
// Save the current snapshot ID to a file
|
||
if err := a.saveCurrentSnapshotID(); err != nil {
|
||
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
||
}
|
||
|
||
// Call OpenFunc if provided
|
||
if a.options.OpenFunc != nil {
|
||
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
|
||
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
|
||
return fmt.Errorf("failed to open resources after restore: %w", err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// RestoreSnapshot extracts a snapshot to the directory.
|
||
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
|
||
a.mutex.Lock()
|
||
defer a.mutex.Unlock()
|
||
|
||
// Call CloseFunc if provided
|
||
if a.options.CloseFunc != nil {
|
||
if err := a.options.CloseFunc(); err != nil {
|
||
return fmt.Errorf("failed to close resources before restore: %w", err)
|
||
}
|
||
}
|
||
|
||
defer func() {
|
||
if a.options.OpenFunc != nil {
|
||
if err := a.options.OpenFunc(dir); err != nil {
|
||
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// Extract the snapshot
|
||
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
|
||
return fmt.Errorf("failed to extract snapshot: %w", err)
|
||
}
|
||
|
||
// If restoring to the active directory, save the snapshot ID
|
||
if dir == a.options.BlobStore.GetActiveDir() {
|
||
a.currentSnapshotID = snapshotID
|
||
|
||
// Save the current snapshot ID to a file
|
||
if err := a.saveCurrentSnapshotID(); err != nil {
|
||
return fmt.Errorf("failed to save current snapshot ID: %w", err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ListSnapshots returns a list of all available snapshots.
|
||
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
|
||
// Create empty ListOptions since we don't have filtering/pagination in this API yet
|
||
opts := store.ListOptions{}
|
||
return a.manager.ListSnapshots(ctx, opts)
|
||
}
|
||
|
||
// GetSnapshotDetails returns detailed information about a specific snapshot.
|
||
func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
|
||
return a.manager.GetSnapshotDetails(ctx, snapshotID)
|
||
}
|
||
|
||
// DeleteSnapshot removes a snapshot.
|
||
func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error {
|
||
return a.manager.DeleteSnapshot(ctx, snapshotID)
|
||
}
|
||
|
||
// saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir
|
||
func (a *Agate) saveCurrentSnapshotID() error {
|
||
if a.currentSnapshotID == "" {
|
||
// If there's no current snapshot ID, remove the file if it exists
|
||
if _, err := os.Stat(a.currentIDFile); err == nil {
|
||
return os.Remove(a.currentIDFile)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Write the current snapshot ID to the file
|
||
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
|
||
}
|
||
|
||
func (a *Agate) Open() error {
|
||
return a.options.OpenFunc(a.GetActiveDir())
|
||
}
|
||
|
||
// Close releases all resources used by the Agate instance.
|
||
func (a *Agate) Close() error {
|
||
if a.options.CloseFunc != nil {
|
||
return a.options.CloseFunc()
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// StartServer starts a gRPC server to share snapshots.
|
||
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
||
// Использование нового remote.Server
|
||
server := remote.NewServer(a.manager)
|
||
return server.Start(ctx, address)
|
||
}
|
||
|
||
// GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update.
|
||
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
|
||
client, err := remote.NewClient(address)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer client.Close()
|
||
|
||
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
||
}
|
||
|
||
// 1. Подготовка
|
||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
|
||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||
return fmt.Errorf("failed to create temp download dir: %w", err)
|
||
}
|
||
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
|
||
diffPartPath := diffArchivePath + ".part"
|
||
|
||
// 2. Скачивание дельты с докачкой
|
||
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
|
||
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
|
||
return fmt.Errorf("failed to download snapshot diff: %w", err)
|
||
}
|
||
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
|
||
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
|
||
}
|
||
defer os.Remove(diffArchivePath)
|
||
|
||
// 3. Атомарное применение
|
||
// Создаем новую директорию для снапшота
|
||
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
||
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
||
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
||
}
|
||
defer os.RemoveAll(newSnapshotDir)
|
||
|
||
// Если есть родитель, извлекаем его содержимое
|
||
if localParentID != "" {
|
||
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
||
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
||
}
|
||
}
|
||
|
||
// Распаковываем дельта-архив поверх
|
||
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
|
||
return fmt.Errorf("failed to extract diff archive: %w", err)
|
||
}
|
||
|
||
// 4. Создаем финальный архив и регистрируем снапшот
|
||
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
|
||
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
|
||
return fmt.Errorf("failed to create final snapshot archive: %w", err)
|
||
}
|
||
defer os.Remove(finalArchivePath)
|
||
|
||
finalArchiveFile, err := os.Open(finalArchivePath)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to open final archive: %w", err)
|
||
}
|
||
defer finalArchiveFile.Close()
|
||
|
||
if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil {
|
||
return fmt.Errorf("failed to store final blob: %w", err)
|
||
}
|
||
|
||
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
|
||
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
|
||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||
}
|
||
|
||
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
|
||
return nil
|
||
}
|