Files
Agate/store/filesystem/filesystem.go

200 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package filesystem
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
"gitea.unprism.ru/KRBL/Agate/models"
"gitea.unprism.ru/KRBL/Agate/store"
log "github.com/sirupsen/logrus"
)
const blobExtension = ".zip"
// progressReader — обертка для логирования
type progressReader struct {
reader io.Reader
totalRead int64
lastLog time.Time
snapshotID string
}
func (pr *progressReader) Read(p []byte) (int, error) {
n, err := pr.reader.Read(p)
pr.totalRead += int64(n)
// Логируем каждые 5 секунд
if time.Since(pr.lastLog) > 5*time.Second {
log.Printf("Snapshot %s download progress: %.2f MB downloaded",
pr.snapshotID, float64(pr.totalRead)/1024/1024)
pr.lastLog = time.Now()
}
return n, err
}
// fileSystemStore реализует интерфейс store.BlobStore с использованием локальной файловой системы.
type fileSystemStore struct {
baseDir string // Директория для хранения блобов (архивов)
activeDir string // Директория для активных операций (создание и восстановление)
}
// NewFileSystemStore создает новое хранилище блобов в указанной директории.
func NewFileSystemStore(baseDir string) (store.BlobStore, error) {
// Убедимся, что базовая директория существует
if err := os.MkdirAll(baseDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create base directory %s for filesystem blob store: %w", baseDir, err)
}
// Создаем директорию для активных операций внутри базовой директории
activeDir := filepath.Join(baseDir, "active")
if err := os.MkdirAll(activeDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create active directory %s for filesystem blob store: %w", activeDir, err)
}
return &fileSystemStore{
baseDir: baseDir,
activeDir: activeDir,
}, nil
}
// getBlobPath формирует полный путь к файлу блоба.
func (fs *fileSystemStore) getBlobPath(snapshotID string) string {
// Используем ID снапшота в качестве имени файла
return filepath.Join(fs.baseDir, snapshotID+blobExtension)
}
// StoreBlob сохраняет данные из reader в файл в baseDir.
func (fs *fileSystemStore) StoreBlob(ctx context.Context, snapshotID string, reader io.Reader) (string, error) {
blobPath := fs.getBlobPath(snapshotID)
log.Printf("Starting to store blob for snapshot %s at %s", snapshotID, blobPath)
// Используем временный файл, чтобы не повредить (возможно) существующий валидный файл
// если загрузка упадет на середине.
tempPath := blobPath + ".tmp"
file, err := os.Create(tempPath)
if err != nil {
return "", fmt.Errorf("failed to create temp blob file %s: %w", tempPath, err)
}
defer file.Close()
// Оборачиваем reader для логирования
pr := &progressReader{
reader: reader,
snapshotID: snapshotID,
lastLog: time.Now(),
}
// Копируем данные
written, err := io.Copy(file, pr)
if err != nil {
file.Close()
os.Remove(tempPath) // Удаляем мусор
return "", fmt.Errorf("failed to write data to blob file %s: %w", tempPath, err)
}
// Важный момент: Sync, чтобы убедиться, что данные на диске
if err := file.Sync(); err != nil {
file.Close()
os.Remove(tempPath)
return "", fmt.Errorf("failed to sync file to disk: %w", err)
}
file.Close() // Закрываем перед переименованием
// Переименовываем временный файл в финальный (атомарная операция)
if err := os.Rename(tempPath, blobPath); err != nil {
return "", fmt.Errorf("failed to rename temp file to final blob: %w", err)
}
log.Printf("Successfully stored blob for snapshot %s. Total size: %.2f MB",
snapshotID, float64(written)/1024/1024)
return blobPath, nil
}
// RetrieveBlob открывает файл блоба и возвращает его как io.ReadCloser.
func (fs *fileSystemStore) RetrieveBlob(ctx context.Context, snapshotID string) (io.ReadCloser, error) {
blobPath := fs.getBlobPath(snapshotID)
// Открываем файл для чтения
file, err := os.Open(blobPath)
if err != nil {
if os.IsNotExist(err) {
// Если файл не найден, возвращаем кастомную ошибку
return nil, models.ErrNotFound
}
return nil, fmt.Errorf("failed to open blob file %s: %w", blobPath, err)
}
// Возвращаем открытый файл (*os.File реализует io.ReadCloser)
return file, nil
}
// DeleteBlob удаляет файл блоба из файловой системы.
func (fs *fileSystemStore) DeleteBlob(ctx context.Context, snapshotID string) error {
blobPath := fs.getBlobPath(snapshotID)
// Удаляем файл
err := os.Remove(blobPath)
if err != nil {
if os.IsNotExist(err) {
// Если файл и так не существует, это не ошибка
return nil
}
// Если произошла другая ошибка при удалении
return fmt.Errorf("failed to delete blob file %s: %w", blobPath, err)
}
return nil
}
// GetBlobPath возвращает путь к файлу блоба, если он существует.
func (fs *fileSystemStore) GetBlobPath(ctx context.Context, snapshotID string) (string, error) {
blobPath := fs.getBlobPath(snapshotID)
// Проверяем существование файла
if _, err := os.Stat(blobPath); err != nil {
if os.IsNotExist(err) {
return "", models.ErrNotFound
}
return "", fmt.Errorf("failed to stat blob file %s: %w", blobPath, err)
}
// Файл существует, возвращаем путь
return blobPath, nil
}
// GetActiveDir возвращает путь к директории для активных операций.
func (fs *fileSystemStore) GetBaseDir() string {
return fs.baseDir
}
// GetActiveDir возвращает путь к директории для активных операций.
func (fs *fileSystemStore) GetActiveDir() string {
return fs.activeDir
}
// CleanActiveDir очищает директорию для активных операций.
// Это полезно перед началом новых операций, чтобы избежать конфликтов.
func (fs *fileSystemStore) CleanActiveDir(ctx context.Context) error {
// Удаляем все файлы в активной директории, но сохраняем саму директорию
entries, err := os.ReadDir(fs.activeDir)
if err != nil {
return fmt.Errorf("failed to read active directory: %w", err)
}
for _, entry := range entries {
path := filepath.Join(fs.activeDir, entry.Name())
if err := os.RemoveAll(path); err != nil {
return fmt.Errorf("failed to remove %s from active directory: %w", path, err)
}
}
return nil
}