package agate import ( "context" "errors" "fmt" "gitea.unprism.ru/KRBL/Agate/archive" "gitea.unprism.ru/KRBL/Agate/grpc" "gitea.unprism.ru/KRBL/Agate/interfaces" "io" "log" "os" "path/filepath" "sync" "gitea.unprism.ru/KRBL/Agate/store" "gitea.unprism.ru/KRBL/Agate/stores" ) // AgateOptions defines configuration options for the Agate library. type AgateOptions struct { // WorkDir is the directory where snapshots will be stored and managed. WorkDir string // OpenFunc is called after a snapshot is restored to initialize resources. // The parameter is the directory where the snapshot was extracted. OpenFunc func(dir string) error // CloseFunc is called before a snapshot is created or restored to clean up resources. CloseFunc func() error // MetadataStore is the implementation of the metadata store to use. // If nil, a default SQLite-based metadata store will be created automatically. // Use the stores package to initialize a custom implementation: // metadataStore, err := stores.NewDefaultMetadataStore(metadataDir) MetadataStore store.MetadataStore // BlobStore is the implementation of the blob store to use. // If nil, a default filesystem-based blob store will be created automatically. // Use the stores package to initialize a custom implementation: // blobStore, err := stores.NewDefaultBlobStore(blobsDir) BlobStore store.BlobStore // CleanOnRestore specifies whether the target directory should be cleaned before restoring a snapshot. CleanOnRestore bool // Logger is the logger to use for output. If nil, logging is disabled. Logger *log.Logger } // Agate is the main entry point for the snapshot library. type Agate struct { mutex sync.Mutex manager interfaces.SnapshotManager options AgateOptions metadataDir string blobsDir string currentSnapshotID string currentIDFile string } // New initializes a new instance of the Agate library with the given options. func New(options AgateOptions) (*Agate, error) { if options.WorkDir == "" { return nil, errors.New("work directory cannot be empty") } // Initialize logger if not provided if options.Logger == nil { options.Logger = log.New(io.Discard, "", 0) } // Create the work directory if it doesn't exist if err := os.MkdirAll(options.WorkDir, 0755); err != nil { return nil, fmt.Errorf("failed to create work directory: %w", err) } // Create subdirectories for metadata and blobs metadataDir := filepath.Join(options.WorkDir, "metadata") blobsDir := filepath.Join(options.WorkDir, "blobs") if err := os.MkdirAll(metadataDir, 0755); err != nil { return nil, fmt.Errorf("failed to create metadata directory: %w", err) } if err := os.MkdirAll(blobsDir, 0755); err != nil { return nil, fmt.Errorf("failed to create blobs directory: %w", err) } var metadataStore store.MetadataStore var blobStore store.BlobStore var err error // Use provided stores or initialize default ones if options.MetadataStore != nil && options.BlobStore != nil { // Use the provided stores metadataStore = options.MetadataStore blobStore = options.BlobStore } else if options.MetadataStore == nil && options.BlobStore == nil { // Initialize both stores with default implementations metadataStore, blobStore, err = stores.InitDefaultStores(options.WorkDir) if err != nil { return nil, fmt.Errorf("failed to initialize default stores: %w", err) } // Update options with the created stores options.MetadataStore = metadataStore options.BlobStore = blobStore } else if options.MetadataStore == nil { // Initialize only the metadata store metadataStore, err = stores.NewDefaultMetadataStore(metadataDir) if err != nil { return nil, fmt.Errorf("failed to initialize default metadata store: %w", err) } blobStore = options.BlobStore // Update options with the created metadata store options.MetadataStore = metadataStore } else { // Initialize only the blob store blobStore, err = stores.NewDefaultBlobStore(blobsDir) if err != nil { return nil, fmt.Errorf("failed to initialize default blob store: %w", err) } metadataStore = options.MetadataStore // Update options with the created blob store options.BlobStore = blobStore } // Create the snapshot manager manager, err := CreateSnapshotManager(metadataStore, blobStore, options.Logger) if err != nil { return nil, fmt.Errorf("failed to create snapshot manager: %w", err) } // Create a file path for storing the current snapshot ID currentIDFile := filepath.Join(options.WorkDir, "current_snapshot_id") agate := &Agate{ manager: manager, options: options, metadataDir: metadataDir, blobsDir: blobsDir, currentIDFile: currentIDFile, } // Load the current snapshot ID if it exists if _, err := os.Stat(currentIDFile); err == nil { data, err := os.ReadFile(currentIDFile) if err == nil && len(data) > 0 { agate.currentSnapshotID = string(data) } } // Call OpenFunc if provided to initialize resources in the active directory if options.OpenFunc != nil { if err := options.OpenFunc(blobStore.GetActiveDir()); err != nil { return nil, fmt.Errorf("failed to open resources during initialization: %w", err) } } return agate, nil } func (a *Agate) GetActiveDir() string { return a.options.BlobStore.GetActiveDir() } func (a *Agate) GetMetadataDir() string { return a.metadataDir } func (a *Agate) GetBlobsDir() string { return a.blobsDir } // SaveSnapshot creates a new snapshot from the current state of the active directory. // If parentID is provided, it will be set as the parent of the new snapshot. // If parentID is empty, it will use the ID of the snapshot currently loaded in the active directory. // Returns the ID of the created snapshot. func (a *Agate) SaveSnapshot(ctx context.Context, name string, parentID string) (string, error) { a.mutex.Lock() defer a.mutex.Unlock() a.options.Logger.Printf("Creating new snapshot with name: %s", name) // Call CloseFunc if provided if a.options.CloseFunc != nil { if err := a.options.CloseFunc(); err != nil { return "", fmt.Errorf("failed to close resources before snapshot: %w", err) } } defer func() { if a.options.OpenFunc != nil { if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil { a.options.Logger.Printf("ERROR: failed to open resources after snapshot creation: %v", err) } } }() // If parentID is not provided, use the current snapshot ID if parentID == "" { parentID = a.currentSnapshotID } effectiveParentID := parentID // Create the snapshot snapshot, err := a.manager.CreateSnapshot(ctx, a.options.BlobStore.GetActiveDir(), name, effectiveParentID) if err != nil { a.options.Logger.Printf("ERROR: failed to create snapshot: %v", err) return "", fmt.Errorf("failed to create snapshot: %w", err) } a.options.Logger.Printf("Successfully created snapshot with ID: %s", snapshot.ID) // Update the current snapshot ID to the newly created snapshot a.currentSnapshotID = snapshot.ID // Save the current snapshot ID to a file if err := a.saveCurrentSnapshotID(); err != nil { return "", fmt.Errorf("failed to save current snapshot ID: %w", err) } return snapshot.ID, nil } // RestoreSnapshot extracts a snapshot to the active directory. func (a *Agate) RestoreSnapshot(ctx context.Context, snapshotID string) error { a.mutex.Lock() defer a.mutex.Unlock() a.options.Logger.Printf("Restoring snapshot with ID: %s", snapshotID) // Call CloseFunc if provided if a.options.CloseFunc != nil { if err := a.options.CloseFunc(); err != nil { return fmt.Errorf("failed to close resources before restore: %w", err) } } // Extract the snapshot if err := a.manager.ExtractSnapshot(ctx, snapshotID, a.options.BlobStore.GetActiveDir(), a.options.CleanOnRestore); err != nil { a.options.Logger.Printf("ERROR: failed to extract snapshot: %v", err) return fmt.Errorf("failed to extract snapshot: %w", err) } a.options.Logger.Printf("Successfully restored snapshot with ID: %s", snapshotID) // Save the ID of the snapshot that was restored a.currentSnapshotID = snapshotID // Save the current snapshot ID to a file if err := a.saveCurrentSnapshotID(); err != nil { return fmt.Errorf("failed to save current snapshot ID: %w", err) } // Call OpenFunc if provided if a.options.OpenFunc != nil { if err := a.options.OpenFunc(a.options.BlobStore.GetActiveDir()); err != nil { a.options.Logger.Printf("ERROR: failed to open resources after restore: %v", err) return fmt.Errorf("failed to open resources after restore: %w", err) } } return nil } // RestoreSnapshot extracts a snapshot to the directory. func (a *Agate) RestoreSnapshotToDir(ctx context.Context, snapshotID string, dir string) error { a.mutex.Lock() defer a.mutex.Unlock() // Call CloseFunc if provided if a.options.CloseFunc != nil { if err := a.options.CloseFunc(); err != nil { return fmt.Errorf("failed to close resources before restore: %w", err) } } defer func() { if a.options.OpenFunc != nil { if err := a.options.OpenFunc(dir); err != nil { a.options.Logger.Printf("ERROR: failed to open resources after snapshot restore: %v", err) } } }() // Extract the snapshot if err := a.manager.ExtractSnapshot(ctx, snapshotID, dir, a.options.CleanOnRestore); err != nil { return fmt.Errorf("failed to extract snapshot: %w", err) } // If restoring to the active directory, save the snapshot ID if dir == a.options.BlobStore.GetActiveDir() { a.currentSnapshotID = snapshotID // Save the current snapshot ID to a file if err := a.saveCurrentSnapshotID(); err != nil { return fmt.Errorf("failed to save current snapshot ID: %w", err) } } return nil } // ListSnapshots returns a list of all available snapshots. func (a *Agate) ListSnapshots(ctx context.Context) ([]store.SnapshotInfo, error) { // Create empty ListOptions since we don't have filtering/pagination in this API yet opts := store.ListOptions{} return a.manager.ListSnapshots(ctx, opts) } // GetSnapshotDetails returns detailed information about a specific snapshot. func (a *Agate) GetSnapshotDetails(ctx context.Context, snapshotID string) (*store.Snapshot, error) { return a.manager.GetSnapshotDetails(ctx, snapshotID) } // DeleteSnapshot removes a snapshot. func (a *Agate) DeleteSnapshot(ctx context.Context, snapshotID string) error { return a.manager.DeleteSnapshot(ctx, snapshotID) } // saveCurrentSnapshotID saves the current snapshot ID to a file in the WorkDir func (a *Agate) saveCurrentSnapshotID() error { if a.currentSnapshotID == "" { // If there's no current snapshot ID, remove the file if it exists if _, err := os.Stat(a.currentIDFile); err == nil { return os.Remove(a.currentIDFile) } return nil } // Write the current snapshot ID to the file return os.WriteFile(a.currentIDFile, []byte(a.currentSnapshotID), 0644) } func (a *Agate) Open() error { return a.options.OpenFunc(a.GetActiveDir()) } // Close releases all resources used by the Agate instance. func (a *Agate) Close() error { if a.options.CloseFunc != nil { return a.options.CloseFunc() } return nil } // StartServer starts a gRPC server to share snapshots. func (a *Agate) StartServer(ctx context.Context, address string) error { _, err := grpc.RunServer(ctx, a.manager, address) if err != nil { return fmt.Errorf("failed to start server: %w", err) } // We don't store the server reference because we don't have a way to stop it yet // In a future version, we could add a StopServer method return nil } // ConnectRemote connects to a remote snapshot server. // Returns a client that can be used to interact with the remote server. func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) { client, err := grpc.ConnectToServer(address) if err != nil { return nil, fmt.Errorf("failed to connect to remote server: %w", err) } return client, nil } // GetRemoteSnapshotList retrieves a list of snapshots from a remote server. func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) { client, err := a.ConnectRemote(address) if err != nil { return nil, err } defer client.Close() return client.ListSnapshots(ctx) } // GetRemoteSnapshot downloads a snapshot from a remote server. // If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally. func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error { client, err := a.ConnectRemote(address) if err != nil { return err } defer client.Close() // Get the remote snapshot details remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID) if err != nil { return fmt.Errorf("failed to get snapshot details: %w", err) } // Create a temporary directory for downloading files tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID) if err := os.MkdirAll(tempDownloadDir, 0755); err != nil { return fmt.Errorf("failed to create temporary download directory: %w", err) } defer os.RemoveAll(tempDownloadDir) // Clean up when done a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address) // If localParentID is provided, try to reuse files from the local parent snapshot if localParentID != "" { a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID) // Get the local parent snapshot details localParent, err := a.GetSnapshotDetails(ctx, localParentID) if err != nil { a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err) } else { // Extract the local parent snapshot to a temporary directory localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID) if err := os.MkdirAll(localParentDir, 0755); err != nil { a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err) } else { defer os.RemoveAll(localParentDir) // Clean up when done if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil { a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err) } else { // Copy unchanged files from the local parent to the download directory for _, file := range remoteSnapshot.Files { // Skip directories, they'll be created as needed if file.IsDir { continue } // Check if the file exists in the local parent with the same hash var localFile *store.FileInfo for _, lf := range localParent.Files { if lf.Path == file.Path && lf.SHA256 == file.SHA256 { localFile = &lf break } } if localFile != nil { // File exists in local parent with the same hash, copy it srcPath := filepath.Join(localParentDir, localFile.Path) dstPath := filepath.Join(tempDownloadDir, file.Path) // Ensure the destination directory exists if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil { a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err) continue } // Copy the file srcFile, err := os.Open(srcPath) if err != nil { a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err) continue } defer srcFile.Close() dstFile, err := os.Create(dstPath) if err != nil { a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err) continue } _, err = io.Copy(dstFile, srcFile) dstFile.Close() if err != nil { a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err) // If copy fails, the file will be downloaded } else { a.options.Logger.Printf("Reusing file %s from local parent", file.Path) } } } } } } } // Download the snapshot files a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID) // Get snapshot details to know what files we need to download remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID) if err != nil { return fmt.Errorf("failed to get remote snapshot details: %w", err) } // Check which files we already have and which we need to download for _, file := range remoteDetails.Files { if file.IsDir { continue // Skip directories } filePath := filepath.Join(tempDownloadDir, file.Path) if _, err := os.Stat(filePath); os.IsNotExist(err) { // File doesn't exist yet, we'll need to download it a.options.Logger.Printf("Downloading file %s", file.Path) } } if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil { return fmt.Errorf("failed to download snapshot: %w", err) } a.options.Logger.Printf("Creating archive from downloaded files") // Create a zip archive from the downloaded files zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip") if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil { return fmt.Errorf("failed to create zip archive: %w", err) } // Store the blob with the remote snapshot ID zipFile, err := os.Open(zipPath) if err != nil { return fmt.Errorf("failed to open zip file: %w", err) } defer zipFile.Close() defer os.Remove(zipPath) // Clean up the zip file when done a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID) // Store the blob with the remote snapshot ID _, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile) if err != nil { return fmt.Errorf("failed to store blob: %w", err) } a.options.Logger.Printf("Saving snapshot metadata") // Save the remote snapshot metadata err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot) if err != nil { return fmt.Errorf("failed to save snapshot metadata: %w", err) } a.options.Logger.Printf("Successfully imported remote snapshot %s", snapshotID) return nil }