Introduced `GetActiveDir` and `CleanActiveDir` methods in the blob store to manage a dedicated directory for active snapshot operations. This ensures a clean working state before starting new operations and prevents conflicts. Updated related logic in snapshot creation and restoration to utilize the active directory.
144 lines
5.4 KiB
Go
144 lines
5.4 KiB
Go
package filesystem
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"gitea.unprism.ru/KRBL/Agate"
|
||
"gitea.unprism.ru/KRBL/Agate/store"
|
||
"io"
|
||
"os"
|
||
"path/filepath"
|
||
)
|
||
|
||
const blobExtension = ".zip"
|
||
|
||
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
|
||
type fileSystemStore struct {
|
||
baseDir string // Директория для хранения блобов (архивов)
|
||
activeDir string // Директория для активных операций (создание и восстановление)
|
||
}
|
||
|
||
// NewFileSystemStore создает новое хранилище блобов в указанной директории.
|
||
func NewFileSystemStore(baseDir string) (store.BlobStore, error) {
|
||
// Убедимся, что базовая директория существует
|
||
if err := os.MkdirAll(baseDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err)
|
||
}
|
||
|
||
// Создаем директорию для активных операций внутри базовой директории
|
||
activeDir := filepath.Join(baseDir, "active")
|
||
if err := os.MkdirAll(activeDir, 0755); err != nil {
|
||
return nil, fmt.Errorf("failed to create active directory %s for filesystem blob store: %w", activeDir, err)
|
||
}
|
||
|
||
return &fileSystemStore{
|
||
baseDir: baseDir,
|
||
activeDir: activeDir,
|
||
}, nil
|
||
}
|
||
|
||
// getBlobPath формирует полный путь к файлу блоба.
|
||
func (fs *fileSystemStore) getBlobPath(snapshotID string) string {
|
||
// Используем ID снапшота в качестве имени файла
|
||
return filepath.Join(fs.baseDir, snapshotID+blobExtension)
|
||
}
|
||
|
||
// StoreBlob сохраняет данные из reader в файл в baseDir.
|
||
func (fs *fileSystemStore) StoreBlob(ctx context.Context, snapshotID string, reader io.Reader) (string, error) {
|
||
blobPath := fs.getBlobPath(snapshotID)
|
||
|
||
// Создаем или перезаписываем файл
|
||
file, err := os.Create(blobPath)
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to create blob file %s: %w", blobPath, err)
|
||
}
|
||
defer file.Close() // Гарантируем закрытие файла
|
||
|
||
// Копируем данные из ридера в файл
|
||
_, err = io.Copy(file, reader)
|
||
if err != nil {
|
||
// Если произошла ошибка копирования, удаляем неполный файл
|
||
os.Remove(blobPath)
|
||
return "", fmt.Errorf("failed to write data to blob file %s: %w", blobPath, err)
|
||
}
|
||
|
||
// Возвращаем путь к созданному файлу
|
||
return blobPath, nil
|
||
}
|
||
|
||
// RetrieveBlob открывает файл блоба и возвращает его как io.ReadCloser.
|
||
func (fs *fileSystemStore) RetrieveBlob(ctx context.Context, snapshotID string) (io.ReadCloser, error) {
|
||
blobPath := fs.getBlobPath(snapshotID)
|
||
|
||
// Открываем файл для чтения
|
||
file, err := os.Open(blobPath)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
// Если файл не найден, возвращаем кастомную ошибку
|
||
return nil, agate.ErrNotFound
|
||
}
|
||
return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err)
|
||
}
|
||
|
||
// Возвращаем открытый файл (*os.File реализует io.ReadCloser)
|
||
return file, nil
|
||
}
|
||
|
||
// DeleteBlob удаляет файл блоба из файловой системы.
|
||
func (fs *fileSystemStore) DeleteBlob(ctx context.Context, snapshotID string) error {
|
||
blobPath := fs.getBlobPath(snapshotID)
|
||
|
||
// Удаляем файл
|
||
err := os.Remove(blobPath)
|
||
if err != nil {
|
||
if os.IsNotExist(err) {
|
||
// Если файл и так не существует, это не ошибка
|
||
return nil
|
||
}
|
||
// Если произошла другая ошибка при удалении
|
||
return fmt.Errorf("failed to delete blob file %s: %w", blobPath, err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// GetBlobPath возвращает путь к файлу блоба, если он существует.
|
||
func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (string, error) {
|
||
blobPath := fs.getBlobPath(snapshotID)
|
||
|
||
// Проверяем существование файла
|
||
if _, err := os.Stat(blobPath); err != nil {
|
||
if os.IsNotExist(err) {
|
||
return "", agate.ErrNotFound
|
||
}
|
||
return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err)
|
||
}
|
||
|
||
// Файл существует, возвращаем путь
|
||
return blobPath, nil
|
||
}
|
||
|
||
// GetActiveDir возвращает путь к директории для активных операций.
|
||
func (fs *fileSystemStore) GetActiveDir() string {
|
||
return fs.activeDir
|
||
}
|
||
|
||
// CleanActiveDir очищает директорию для активных операций.
|
||
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
|
||
func (fs *fileSystemStore) CleanActiveDir(ctx context.Context) error {
|
||
// Удаляем все файлы в активной директории, но сохраняем саму директорию
|
||
entries, err := os.ReadDir(fs.activeDir)
|
||
if err != nil {
|
||
return fmt.Errorf("failed to read active directory: %w", err)
|
||
}
|
||
|
||
for _, entry := range entries {
|
||
path := filepath.Join(fs.activeDir, entry.Name())
|
||
if err := os.RemoveAll(path); err != nil {
|
||
return fmt.Errorf("failed to remove %s from active directory: %w", path, err)
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|