Remove obsolete gRPC client/server implementations and migrate to remote package
This commit is contained in:
450
api.go
450
api.go
@ -5,15 +5,13 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"gitea.unprism.ru/KRBL/Agate/archive"
|
||||
"gitea.unprism.ru/KRBL/Agate/grpc"
|
||||
"gitea.unprism.ru/KRBL/Agate/hash"
|
||||
"gitea.unprism.ru/KRBL/Agate/interfaces"
|
||||
"gitea.unprism.ru/KRBL/Agate/remote"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.unprism.ru/KRBL/Agate/store"
|
||||
"gitea.unprism.ru/KRBL/Agate/stores"
|
||||
@ -350,429 +348,81 @@ func (a *Agate) Close() error {
|
||||
|
||||
// StartServer starts a gRPC server to share snapshots.
|
||||
func (a *Agate) StartServer(ctx context.Context, address string) error {
|
||||
_, err := grpc.RunServer(ctx, a.manager, address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to start server: %w", err)
|
||||
}
|
||||
|
||||
// We don't store the server reference because we don't have a way to stop it yet
|
||||
// In a future version, we could add a StopServer method
|
||||
|
||||
return nil
|
||||
// Использование нового remote.Server
|
||||
server := remote.NewServer(a.manager)
|
||||
return server.Start(ctx, address)
|
||||
}
|
||||
|
||||
// ConnectRemote connects to a remote snapshot server.
|
||||
// Returns a client that can be used to interact with the remote server.
|
||||
func (a *Agate) ConnectRemote(address string) (*grpc.SnapshotClient, error) {
|
||||
client, err := grpc.ConnectToServer(address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to connect to remote server: %w", err)
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// GetRemoteSnapshotList retrieves a list of snapshots from a remote server.
|
||||
func (a *Agate) GetRemoteSnapshotList(ctx context.Context, address string) ([]store.SnapshotInfo, error) {
|
||||
client, err := a.ConnectRemote(address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
return client.ListSnapshots(ctx)
|
||||
}
|
||||
|
||||
// RegisterLocalSnapshot registers a local snapshot from an archive path with a specified UUID.
|
||||
// The archive must be a valid ZIP file containing the snapshot files.
|
||||
// If the UUID already exists, an error will be returned.
|
||||
func (a *Agate) RegisterLocalSnapshot(ctx context.Context, archivePath string, snapshotID string, name string) error {
|
||||
a.mutex.Lock()
|
||||
defer a.mutex.Unlock()
|
||||
|
||||
a.options.Logger.Printf("Registering local snapshot from archive %s with ID %s", archivePath, snapshotID)
|
||||
|
||||
// Check if the archive file exists
|
||||
if _, err := os.Stat(archivePath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("archive file does not exist: %w", err)
|
||||
}
|
||||
|
||||
// Check if a snapshot with this ID already exists
|
||||
_, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||
if err == nil {
|
||||
return fmt.Errorf("snapshot with ID %s already exists", snapshotID)
|
||||
} else if !errors.Is(err, store.ErrNotFound) {
|
||||
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
||||
}
|
||||
|
||||
// Create a temporary directory for extracting the archive
|
||||
tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID)
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir) // Clean up when done
|
||||
|
||||
// Extract the archive to the temporary directory to analyze its contents
|
||||
if err := extractArchive(archivePath, tempDir); err != nil {
|
||||
return fmt.Errorf("failed to extract archive: %w", err)
|
||||
}
|
||||
|
||||
// Get the list of files in the archive
|
||||
var files []store.FileInfo
|
||||
err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Skip the root directory
|
||||
if path == tempDir {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the relative path
|
||||
relPath, err := filepath.Rel(tempDir, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get relative path: %w", err)
|
||||
}
|
||||
|
||||
// Calculate SHA256 for files (not directories)
|
||||
var sha256 string
|
||||
if !info.IsDir() {
|
||||
// Calculate the hash directly from the file path
|
||||
sha256, err = hash.CalculateFileHash(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate file hash: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add file info to the list
|
||||
files = append(files, store.FileInfo{
|
||||
Path: relPath,
|
||||
Size: info.Size(),
|
||||
IsDir: info.IsDir(),
|
||||
SHA256: sha256,
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to analyze archive contents: %w", err)
|
||||
}
|
||||
|
||||
// Copy the archive to the blob store
|
||||
archiveFile, err := os.Open(archivePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open archive file: %w", err)
|
||||
}
|
||||
defer archiveFile.Close()
|
||||
|
||||
// Store the blob with the specified snapshot ID
|
||||
_, err = a.options.BlobStore.StoreBlob(ctx, snapshotID, archiveFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store blob: %w", err)
|
||||
}
|
||||
|
||||
// Create and save the snapshot metadata
|
||||
snapshot := store.Snapshot{
|
||||
ID: snapshotID,
|
||||
Name: name,
|
||||
ParentID: "",
|
||||
CreationTime: time.Now(),
|
||||
Files: files,
|
||||
}
|
||||
|
||||
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully registered local snapshot with ID %s", snapshotID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRemoteSnapshotMetadata downloads only the metadata of a snapshot from a remote server.
|
||||
// If address is empty, it will try to restore the metadata from the local blob.
|
||||
func (a *Agate) GetRemoteSnapshotMetadata(ctx context.Context, address string, snapshotID string) error {
|
||||
a.mutex.Lock()
|
||||
defer a.mutex.Unlock()
|
||||
|
||||
// Check if the snapshot already exists locally
|
||||
_, err := a.options.MetadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||
if err == nil {
|
||||
a.options.Logger.Printf("Snapshot %s already exists locally", snapshotID)
|
||||
return nil
|
||||
} else if !errors.Is(err, store.ErrNotFound) {
|
||||
return fmt.Errorf("failed to check if snapshot exists: %w", err)
|
||||
}
|
||||
|
||||
// If address is provided, download metadata from remote server
|
||||
if address != "" {
|
||||
a.options.Logger.Printf("Downloading metadata for snapshot %s from %s", snapshotID, address)
|
||||
|
||||
client, err := a.ConnectRemote(address)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to remote server: %w", err)
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Get the remote snapshot details
|
||||
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// Save the remote snapshot metadata
|
||||
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully downloaded metadata for snapshot %s", snapshotID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// If no address is provided, try to restore metadata from the local blob
|
||||
a.options.Logger.Printf("Trying to restore metadata for snapshot %s from local blob", snapshotID)
|
||||
|
||||
// Check if the blob exists
|
||||
blobPath, err := a.options.BlobStore.GetBlobPath(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get blob path: %w", err)
|
||||
}
|
||||
|
||||
if _, err := os.Stat(blobPath); os.IsNotExist(err) {
|
||||
return fmt.Errorf("blob for snapshot %s does not exist", snapshotID)
|
||||
}
|
||||
|
||||
// Create a temporary directory for extracting the archive
|
||||
tempDir := filepath.Join(a.options.WorkDir, "temp_extract", snapshotID)
|
||||
if err := os.MkdirAll(tempDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temporary directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDir) // Clean up when done
|
||||
|
||||
// Extract the archive to the temporary directory to analyze its contents
|
||||
if err := extractArchive(blobPath, tempDir); err != nil {
|
||||
return fmt.Errorf("failed to extract archive: %w", err)
|
||||
}
|
||||
|
||||
// Get the list of files in the archive
|
||||
var files []store.FileInfo
|
||||
err = filepath.Walk(tempDir, func(path string, info os.FileInfo, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Skip the root directory
|
||||
if path == tempDir {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the relative path
|
||||
relPath, err := filepath.Rel(tempDir, path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get relative path: %w", err)
|
||||
}
|
||||
|
||||
// Calculate SHA256 for files (not directories)
|
||||
var sha256 string
|
||||
if !info.IsDir() {
|
||||
// Calculate the hash directly from the file path
|
||||
sha256, err = hash.CalculateFileHash(path)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to calculate file hash: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Add file info to the list
|
||||
files = append(files, store.FileInfo{
|
||||
Path: relPath,
|
||||
Size: info.Size(),
|
||||
IsDir: info.IsDir(),
|
||||
SHA256: sha256,
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to analyze archive contents: %w", err)
|
||||
}
|
||||
|
||||
// Create and save the snapshot metadata
|
||||
snapshot := store.Snapshot{
|
||||
ID: snapshotID,
|
||||
Name: snapshotID, // Use the ID as the name since we don't have a better name
|
||||
ParentID: "",
|
||||
CreationTime: time.Now(),
|
||||
Files: files,
|
||||
}
|
||||
|
||||
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, snapshot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Successfully restored metadata for snapshot %s from local blob", snapshotID)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetRemoteSnapshot downloads a snapshot from a remote server.
|
||||
// If localParentID is provided, it will be used to optimize the download by skipping files that already exist locally.
|
||||
// GetRemoteSnapshot downloads a snapshot from a remote server, using an efficient differential update.
|
||||
func (a *Agate) GetRemoteSnapshot(ctx context.Context, address string, snapshotID string, localParentID string) error {
|
||||
client, err := a.ConnectRemote(address)
|
||||
client, err := remote.NewClient(address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Get the remote snapshot details
|
||||
remoteSnapshot, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// Create a temporary directory for downloading files
|
||||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download", snapshotID)
|
||||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temporary download directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(tempDownloadDir) // Clean up when done
|
||||
|
||||
a.options.Logger.Printf("Downloading snapshot %s from %s", snapshotID, address)
|
||||
|
||||
// If localParentID is provided, try to reuse files from the local parent snapshot
|
||||
if localParentID != "" {
|
||||
a.options.Logger.Printf("Using local parent snapshot %s for incremental download", localParentID)
|
||||
|
||||
// Get the local parent snapshot details
|
||||
localParent, err := a.GetSnapshotDetails(ctx, localParentID)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to get local parent snapshot details: %v", err)
|
||||
} else {
|
||||
// Extract the local parent snapshot to a temporary directory
|
||||
localParentDir := filepath.Join(a.options.WorkDir, "temp_download", localParentID)
|
||||
if err := os.MkdirAll(localParentDir, 0755); err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to create temporary directory for local parent: %v", err)
|
||||
} else {
|
||||
defer os.RemoveAll(localParentDir) // Clean up when done
|
||||
|
||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, localParentDir, true); err != nil {
|
||||
a.options.Logger.Printf("Warning: Failed to extract local parent snapshot: %v", err)
|
||||
} else {
|
||||
// Copy unchanged files from the local parent to the download directory
|
||||
for _, file := range remoteSnapshot.Files {
|
||||
// Skip directories, they'll be created as needed
|
||||
if file.IsDir {
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if the file exists in the local parent with the same hash
|
||||
var localFile *store.FileInfo
|
||||
for _, lf := range localParent.Files {
|
||||
if lf.Path == file.Path && lf.SHA256 == file.SHA256 {
|
||||
localFile = &lf
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if localFile != nil {
|
||||
// File exists in local parent with the same hash, copy it
|
||||
srcPath := filepath.Join(localParentDir, localFile.Path)
|
||||
dstPath := filepath.Join(tempDownloadDir, file.Path)
|
||||
|
||||
// Ensure the destination directory exists
|
||||
if err := os.MkdirAll(filepath.Dir(dstPath), 0755); err != nil {
|
||||
a.options.Logger.Printf("Failed to create directory for %s: %v", dstPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Copy the file
|
||||
srcFile, err := os.Open(srcPath)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to copy file %s, will download instead: %v", file.Path, err)
|
||||
continue
|
||||
}
|
||||
defer srcFile.Close()
|
||||
|
||||
dstFile, err := os.Create(dstPath)
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to create destination file %s: %v", dstPath, err)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = io.Copy(dstFile, srcFile)
|
||||
dstFile.Close()
|
||||
|
||||
if err != nil {
|
||||
a.options.Logger.Printf("Failed to copy file data for %s: %v", file.Path, err)
|
||||
// If copy fails, the file will be downloaded
|
||||
} else {
|
||||
a.options.Logger.Printf("Reusing file %s from local parent", file.Path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Download the snapshot files
|
||||
a.options.Logger.Printf("Downloading files for snapshot %s", snapshotID)
|
||||
|
||||
// Get snapshot details to know what files we need to download
|
||||
remoteDetails, err := client.FetchSnapshotDetails(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get remote snapshot details: %w", err)
|
||||
}
|
||||
|
||||
// Check which files we already have and which we need to download
|
||||
for _, file := range remoteDetails.Files {
|
||||
if file.IsDir {
|
||||
continue // Skip directories
|
||||
}
|
||||
// 1. Подготовка
|
||||
tempDownloadDir := filepath.Join(a.options.WorkDir, "temp_download")
|
||||
if err := os.MkdirAll(tempDownloadDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create temp download dir: %w", err)
|
||||
}
|
||||
diffArchivePath := filepath.Join(tempDownloadDir, snapshotID+"_diff.zip")
|
||||
diffPartPath := diffArchivePath + ".part"
|
||||
|
||||
filePath := filepath.Join(tempDownloadDir, file.Path)
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
// File doesn't exist yet, we'll need to download it
|
||||
a.options.Logger.Printf("Downloading file %s", file.Path)
|
||||
// 2. Скачивание дельты с докачкой
|
||||
a.options.Logger.Printf("Downloading diff for snapshot %s from parent %s", snapshotID, localParentID)
|
||||
if err := client.DownloadSnapshotDiff(ctx, snapshotID, localParentID, diffPartPath); err != nil {
|
||||
return fmt.Errorf("failed to download snapshot diff: %w", err)
|
||||
}
|
||||
if err := os.Rename(diffPartPath, diffArchivePath); err != nil {
|
||||
return fmt.Errorf("failed to finalize downloaded diff: %w", err)
|
||||
}
|
||||
defer os.Remove(diffArchivePath)
|
||||
|
||||
// 3. Атомарное применение
|
||||
// Создаем новую директорию для снапшота
|
||||
newSnapshotDir := filepath.Join(tempDownloadDir, "new_content_"+snapshotID)
|
||||
if err := os.MkdirAll(newSnapshotDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create new snapshot directory: %w", err)
|
||||
}
|
||||
defer os.RemoveAll(newSnapshotDir)
|
||||
|
||||
// Если есть родитель, извлекаем его содержимое
|
||||
if localParentID != "" {
|
||||
if err := a.manager.ExtractSnapshot(ctx, localParentID, newSnapshotDir, false); err != nil {
|
||||
a.options.Logger.Printf("Warning: failed to extract local parent snapshot %s: %v", localParentID, err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := client.DownloadSnapshot(ctx, snapshotID, tempDownloadDir, localParentID); err != nil {
|
||||
return fmt.Errorf("failed to download snapshot: %w", err)
|
||||
// Распаковываем дельта-архив поверх
|
||||
if err := extractArchive(diffArchivePath, newSnapshotDir); err != nil {
|
||||
return fmt.Errorf("failed to extract diff archive: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Creating archive from downloaded files")
|
||||
|
||||
// Create a zip archive from the downloaded files
|
||||
zipPath := filepath.Join(a.options.WorkDir, "temp_download", snapshotID+".zip")
|
||||
if err := archive.CreateArchive(tempDownloadDir, zipPath); err != nil {
|
||||
return fmt.Errorf("failed to create zip archive: %w", err)
|
||||
// 4. Создаем финальный архив и регистрируем снапшот
|
||||
finalArchivePath := filepath.Join(tempDownloadDir, snapshotID+".zip")
|
||||
if err := archive.CreateArchive(newSnapshotDir, finalArchivePath); err != nil {
|
||||
return fmt.Errorf("failed to create final snapshot archive: %w", err)
|
||||
}
|
||||
defer os.Remove(finalArchivePath)
|
||||
|
||||
// Store the blob with the remote snapshot ID
|
||||
zipFile, err := os.Open(zipPath)
|
||||
finalArchiveFile, err := os.Open(finalArchivePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open zip file: %w", err)
|
||||
return fmt.Errorf("failed to open final archive: %w", err)
|
||||
}
|
||||
defer zipFile.Close()
|
||||
defer os.Remove(zipPath) // Clean up the zip file when done
|
||||
defer finalArchiveFile.Close()
|
||||
|
||||
a.options.Logger.Printf("Storing blob with ID %s", remoteSnapshot.ID)
|
||||
|
||||
// Store the blob with the remote snapshot ID
|
||||
_, err = a.options.BlobStore.StoreBlob(ctx, remoteSnapshot.ID, zipFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to store blob: %w", err)
|
||||
if _, err := a.options.BlobStore.StoreBlob(ctx, snapshotID, finalArchiveFile); err != nil {
|
||||
return fmt.Errorf("failed to store final blob: %w", err)
|
||||
}
|
||||
|
||||
a.options.Logger.Printf("Saving snapshot metadata")
|
||||
|
||||
// Save the remote snapshot metadata
|
||||
err = a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot)
|
||||
if err != nil {
|
||||
if err := a.options.MetadataStore.SaveSnapshotMetadata(ctx, *remoteSnapshot); err != nil {
|
||||
a.options.BlobStore.DeleteBlob(ctx, snapshotID) // Откат
|
||||
return fmt.Errorf("failed to save snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user