Files
Agate/api.go

544 lines
18 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package agate
import (
"context"
"errors"
"fmt"
"io"
"log"
"os"
"path/filepath"
"sync"
"time"
"gitea.unprism.ru/KRBL/Agate/archive"
"gitea.unprism.ru/KRBL/Agate/interfaces"
"gitea.unprism.ru/KRBL/Agate/models"
"gitea.unprism.ru/KRBL/Agate/remote"
"gitea.unprism.ru/KRBL/Agate/store"
"gitea.unprism.ru/KRBL/Agate/stores"
)
// AgateOptions defines configuration options for the Agate library.
type AgateOptions struct {
// WorkDir is the directory where snapshots will be stored and managed.
WorkDir string
// OpenFunc is called after a snapshot is restored to initialize resources.
// The parameter is the directory where the snapshot was extracted.
OpenFunc func(dir string) error
// CloseFunc is called before a snapshot is created or restored to clean up resources.
CloseFunc func() error
// MetadataStore is the implementation of the metadata store to use.
// If nil, a default SQLite-based metadata store will be created automatically.
// Use the stores package to initialize a custom implementation:
// metadataStore, err := stores.NewDefaultMetadataStore(metadataDir)
MetadataStore store.MetadataStore
// BlobStore is the implementation of the blob store to use.
// If nil, a default filesystem-based blob store will be created automatically.
// Use the stores package to initialize a custom implementation:
// blobStore, err := stores.NewDefaultBlobStore(blobsDir)
BlobStore store.BlobStore
// CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot.
CleanOnRestore bool
// Logger is the logger to use for output. If nil, logging is disabled.
Logger *log.Logger
}
// Agate is the main entry point for the snapshot library.
type Agate struct {
mutex sync.Mutex
manager interfaces.SnapshotManager
options AgateOptions
metadataDir string
blobsDir string
currentSnapshotID string
currentIDFile string
}
// New initializes a new instance of the Agate library with the given options.
func New(options AgateOptions) (*Agate, error) {
if options.WorkDir == "" {
return nil, errors.New("work directory cannot be empty")
}
// Initialize logger if not provided
if options.Logger == nil {
options.Logger = log.New(io.Discard, "", 0)
}
// Create the work directory if it doesn't exist
if err := os.MkdirAll(options.WorkDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create work directory: %w", err)
}
// Create subdirectories for metadata and blobs
metadataDir := filepath.Join(options.WorkDir, "metadata")
blobsDir := filepath.Join(options.WorkDir, "blobs")
if err := os.MkdirAll(metadataDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create metadata directory: %w", err)
}
if err := os.MkdirAll(blobsDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create blobs directory: %w", err)
}
var metadataStore store.MetadataStore
var blobStore store.BlobStore
var err error
// Use provided stores or initialize default ones
if options.MetadataStore != nil && options.BlobStore != nil {
// Use the provided stores
metadataStore = options.MetadataStore
blobStore = options.BlobStore
} else if options.MetadataStore == nil && options.BlobStore == nil {
// Initialize both stores with default implementations
metadataStore, blobStore, err = stores.InitDefaultStores(options.WorkDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default stores: %w", err)
}
// Update options with the created stores
options.MetadataStore = metadataStore
options.BlobStore = blobStore
} else if options.MetadataStore == nil {
// Initialize only the metadata store
metadataStore, err = stores.NewDefaultMetadataStore(metadataDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default metadata store: %w", err)
}
blobStore = options.BlobStore
// Update options with the created metadata store
options.MetadataStore = metadataStore
} else {
// Initialize only the blob store
blobStore, err = stores.NewDefaultBlobStore(blobsDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize default blob store: %w", err)
}
metadataStore = options.MetadataStore
// Update options with the created blob store
options.BlobStore = blobStore
}
// Create the snapshot manager
manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger)
if err != nil {
return nil, fmt.Errorf("failed to create snapshot manager: %w", err)
}
// Create a file path for storing the current snapshot ID
currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id")
agate := &Agate{
manager: manager,
options: options,
metadataDir: metadataDir,
blobsDir: blobsDir,
currentIDFile: currentIDFile,
}
// Load the current snapshot ID if it exists
if _, err := os.Stat(currentIDFile); err == nil {
data, err := os.ReadFile(currentIDFile)
if err == nil && len(data) > 0 {
agate.currentSnapshotID = string(data)
}
}
// Call OpenFunc if provided to initialize resources in the active directory
if options.OpenFunc != nil {
if err := options.OpenFunc(blobStore.GetActiveDir()); err != nil {
return nil, fmt.Errorf("failed to open resources during initialization: %w", err)
}
}
return agate, nil
}
func (a *Agate) GetActiveDir() string {
return a.options.BlobStore.GetActiveDir()
}
func (a *Agate) GetMetadataDir() string {
return a.metadataDir
}
func (a *Agate) GetBlobsDir() string {
return a.blobsDir
}
// SaveSnapshot creates a new snapshot from the current state of the active directory.
// If parentID is provided, it will be set as the parent of the new snapshot.
// If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory.
// Returns the ID of the created snapshot.
func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Creating new snapshot with name: %s", name)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return "", fmt.Errorf("failed to close resources before snapshot: %w", err)
}
}
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err)
}
}
}()
// If parentID is not provided, use the current snapshot ID
if parentID == "" {
parentID = a.currentSnapshotID
}
effectiveParentID := parentID
// Create the snapshot
snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID)
if err != nil {
a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err)
return "", fmt.Errorf("failed to create snapshot: %w", err)
}
a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID)
// Update the current snapshot ID to the newly created snapshot
a.currentSnapshotID = snapshot.ID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return "", fmt.Errorf("failed to save current snapshot ID: %w", err)
}
return snapshot.ID, nil
}
// RestoreSnapshot extracts a snapshot to the active directory.
func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID)
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return fmt.Errorf("failed to close resources before restore: %w", err)
}
}
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil {
a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err)
return fmt.Errorf("failed to extract snapshot: %w", err)
}
a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID)
// Save the ID of the snapshot that was restored
a.currentSnapshotID = snapshotID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return fmt.Errorf("failed to save current snapshot ID: %w", err)
}
// Call OpenFunc if provided
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err)
return fmt.Errorf("failed to open resources after restore: %w", err)
}
}
return nil
}
// RestoreSnapshot extracts a snapshot to the directory.
func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error {
a.mutex.Lock()
defer a.mutex.Unlock()
// Call CloseFunc if provided
if a.options.CloseFunc != nil {
if err := a.options.CloseFunc(); err != nil {
return fmt.Errorf("failed to close resources before restore: %w", err)
}
}
defer func() {
if a.options.OpenFunc != nil {
if err := a.options.OpenFunc(dir); err != nil {
a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err)
}
}
}()
// Extract the snapshot
if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil {
return fmt.Errorf("failed to extract snapshot: %w", err)
}
// If restoring to the active directory, save the snapshot ID
if dir == a.options.BlobStore.GetActiveDir() {
a.currentSnapshotID = snapshotID
// Save the current snapshot ID to a file
if err := a.saveCurrentSnapshotID(); err != nil {
return fmt.Errorf("failed to save current snapshot ID: %w", err)
}
}
return nil
}
// ListSnapshots returns a list of all available snapshots.
func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) {
// Create empty ListOptions since we don't have filtering/pagination in this API yet
opts := store.ListOptions{}
return a.manager.ListSnapshots(ctx, opts)
}
// GetSnapshotDetails returns detailed information about a specific snapshot.
func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) {
return a.manager.GetSnapshotDetails(ctx, snapshotID)
}
// DeleteSnapshot removes a snapshot.
func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error {
return a.manager.DeleteSnapshot(ctx, snapshotID)
}
// saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir
func (a *Agate) saveCurrentSnapshotID() error {
if a.currentSnapshotID == "" {
// If there's no current snapshot ID, remove the file if it exists
if _, err := os.Stat(a.currentIDFile); err == nil {
if err := os.Remove(a.currentIDFile); err != nil {
return err
}
}
return nil
}
// Write the current snapshot ID to the file
return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644)
}
func (a *Agate) Open() error {
return a.options.OpenFunc(a.GetActiveDir())
}
// Close releases all resources used by the Agate instance.
func (a *Agate) Close() error {
if a.options.CloseFunc != nil {
return a.options.CloseFunc()
}
return nil
}
// StartServer starts a gRPC server to share snapshots.
func (a *Agate) StartServer(ctx context.Context, address string) error {
server := remote.NewServer(a.manager)
return server.Start(ctx, address)
}
// GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update.
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
client, err := remote.NewClient(address)
if err != nil {
return err
}
defer func() {
if err := client.Close(); err != nil {
a.options.Logger.Printf("ERROR: failed to close client: %v", err)
}
}()
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
if err != nil {
return fmt.Errorf("failed to get remote snapshot details: %w", err)
}
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
return fmt.Errorf("failed to create temp download dir: %w", err)
}
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
return fmt.Errorf("failed to create new snapshot directory: %w", err)
}
defer func() {
if err := os.RemoveAll(newSnapshotDir); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp dir: %v", err)
}
}()
if localParentID != "" {
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
} else {
localParentSnap, err := a.GetSnapshotDetails(ctx, localParentID)
if err != nil {
a.options.Logger.Printf("Warning: failed to get local parent details %s: %v", localParentID, err)
} else {
remoteFilesMap := make(map[string]struct{})
for _, f := range remoteSnapshot.Files {
remoteFilesMap[f.Path] = struct{}{}
}
for _, localFile := range localParentSnap.Files {
if _, exists := remoteFilesMap[localFile.Path]; !exists {
pathToDelete := filepath.Join(newSnapshotDir, localFile.Path)
if err := os.RemoveAll(pathToDelete); err != nil {
a.options.Logger.Printf("Warning: failed to delete file %s during diff apply: %v", pathToDelete, err)
}
}
}
}
}
}
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
diffPartPath := diffArchivePath + ".part"
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
return fmt.Errorf("failed to download snapshot diff: %w", err)
}
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
}
defer func() {
if err := os.Remove(diffArchivePath); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
}
}()
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
return fmt.Errorf("failed to extract diff archive: %w", err)
}
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
return fmt.Errorf("failed to create final snapshot archive: %w", err)
}
defer func() {
if err := os.Remove(finalArchivePath); err != nil {
a.options.Logger.Printf("ERROR: failed to remove temp file: %v", err)
}
}()
finalArchiveFile, err := os.Open(finalArchivePath)
if err != nil {
return fmt.Errorf("failed to open final archive: %w", err)
}
defer finalArchiveFile.Close()
if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil {
return fmt.Errorf("failed to store final blob: %w", err)
}
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
a.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}
a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID)
return nil
}
func (a *Agate) GetCurrentSnapshotID() string {
return a.currentSnapshotID
}
// RegisterLocalSnapshot регистрирует локальный файл как блоб снимка и создает
// соответствующую запись в метаданных. Если снимок с таким ID уже существует,
// метод ничего не делает и возвращает nil.
//
// - ctx: Контекст для выполнения операции.
// - snapshotID: ID регистрируемого снимка.
// - parentID: ID родительского снимка. Может быть пустым для полных снимков.
// - name: Описательное имя для снимка.
// - localPath: Абсолютный путь к локальному файлу снимка (полному или дифф-архиву).
func (ag *Agate) RegisterLocalSnapshot(ctx context.Context, snapshotID, parentID, name, localPath string) error {
// 1. Check if snapshot already exists
_, err := ag.manager.GetSnapshotDetails(ctx, snapshotID)
if err == nil {
ag.options.Logger.Printf("snapshot %s already exists, skipping registration", snapshotID)
return nil // Snapshot already exists
}
// We expect ErrNotFound, anything else is a real error.
if !errors.Is(err, models.ErrNotFound) {
return fmt.Errorf("failed to check for existing snapshot: %w", err)
}
// 2. Add the file to the blob store
localFile, err := os.Open(localPath)
if err != nil {
return fmt.Errorf("failed to open local snapshot file: %w", err)
}
defer func() {
if err := localFile.Close(); err != nil {
ag.options.Logger.Printf("ERROR: failed to close local file: %v", err)
}
}()
_, err = ag.options.BlobStore.StoreBlob(ctx, snapshotID, localFile)
if err != nil {
return fmt.Errorf("failed to store blob from local file: %w", err)
}
// 3. Create and save snapshot metadata
// We get the file list from the archive to create the metadata.
// Note: This method does not calculate file hashes, so the metadata will be incomplete.
// This is a limitation of the current implementation.
var files []store.FileInfo
archiveFiles, err := archive.ListArchiveContents(localPath)
if err != nil {
// If we can't list the contents, we can't create the metadata.
// We should clean up the blob we just stored.
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to list archive contents for metadata creation: %w", err)
}
for _, f := range archiveFiles {
files = append(files, store.FileInfo{
Path: f.Path,
Size: int64(f.Size),
IsDir: f.IsDir,
// SHA256 is intentionally left empty as we don't have it.
})
}
snapshot := store.Snapshot{
ID: snapshotID,
Name: name,
ParentID: parentID,
CreationTime: time.Now(),
Files: files,
}
if err := ag.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot); err != nil {
// Clean up the blob
_ = ag.options.BlobStore.DeleteBlob(ctx, snapshotID)
return fmt.Errorf("failed to save snapshot metadata: %w", err)
}
ag.options.Logger.Printf("Successfully registered local snapshot %s", snapshotID)
return nil
}