Добавлена локальная загрузка снапшота
This commit is contained in:
70
manager.go
70
manager.go
@@ -403,6 +403,37 @@ func (data *SnapshotManagerData) UpdateSnapshotMetadata(ctx context.Context, sna
|
||||
return nil
|
||||
}
|
||||
|
||||
func (data *SnapshotManagerData) GetSnapshotDiffInfo(ctx context.Context, snapshotID, parentID string) (*store.DiffInfo, error) {
|
||||
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create diff archive for info: %w", err)
|
||||
}
|
||||
|
||||
if tempArchivePath == "" {
|
||||
return &store.DiffInfo{SHA256: "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855", Size: 0}, nil // sha256 of empty string
|
||||
}
|
||||
|
||||
defer os.Remove(tempArchivePath)
|
||||
if tempStagingDir != "" {
|
||||
defer os.RemoveAll(tempStagingDir)
|
||||
}
|
||||
|
||||
hash, err := hash.CalculateFileHash(tempArchivePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to calculate hash for diff archive: %w", err)
|
||||
}
|
||||
|
||||
stat, err := os.Stat(tempArchivePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get size of diff archive: %w", err)
|
||||
}
|
||||
|
||||
return &store.DiffInfo{
|
||||
SHA256: hash,
|
||||
Size: stat.Size(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// diffArchiveReader is a wrapper around an *os.File that handles cleanup of temporary files.
|
||||
type diffArchiveReader struct {
|
||||
*os.File
|
||||
@@ -418,10 +449,10 @@ func (r *diffArchiveReader) Close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
|
||||
func (data *SnapshotManagerData) createDiffArchive(ctx context.Context, snapshotID, parentID string) (string, string, error) {
|
||||
targetSnap, err := data.metadataStore.GetSnapshotMetadata(ctx, snapshotID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get target snapshot metadata: %w", err)
|
||||
return "", "", fmt.Errorf("failed to get target snapshot metadata: %w", err)
|
||||
}
|
||||
|
||||
parentFiles := make(map[string]string)
|
||||
@@ -449,38 +480,38 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
|
||||
}
|
||||
|
||||
if len(filesToInclude) == 0 {
|
||||
return io.NopCloser(bytes.NewReader(nil)), nil
|
||||
return "", "", nil
|
||||
}
|
||||
|
||||
tempStagingDir, err := os.MkdirTemp(data.blobStore.GetBaseDir(), "diff-staging-*")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create temp staging directory: %w", err)
|
||||
return "", "", fmt.Errorf("failed to create temp staging directory: %w", err)
|
||||
}
|
||||
|
||||
targetBlobPath, err := data.blobStore.GetBlobPath(ctx, snapshotID)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
return nil, err
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
for _, filePath := range filesToInclude {
|
||||
destPath := filepath.Join(tempStagingDir, filePath)
|
||||
if err := os.MkdirAll(filepath.Dir(destPath), 0755); err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
return nil, fmt.Errorf("failed to create dir for diff file: %w", err)
|
||||
return "", "", fmt.Errorf("failed to create dir for diff file: %w", err)
|
||||
}
|
||||
|
||||
fileWriter, err := os.Create(destPath)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
return nil, err
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
err = archive.ExtractFileFromArchive(targetBlobPath, filePath, fileWriter)
|
||||
fileWriter.Close()
|
||||
if err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
return nil, fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
|
||||
return "", "", fmt.Errorf("failed to extract file %s for diff: %w", filePath, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -488,12 +519,27 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
|
||||
if err := archive.CreateArchive(tempStagingDir, tempArchivePath); err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
os.Remove(tempArchivePath)
|
||||
return nil, fmt.Errorf("failed to create diff archive: %w", err)
|
||||
return "", "", fmt.Errorf("failed to create diff archive: %w", err)
|
||||
}
|
||||
|
||||
return tempArchivePath, tempStagingDir, nil
|
||||
}
|
||||
|
||||
func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapshotID, parentID string, offset int64) (io.ReadCloser, error) {
|
||||
tempArchivePath, tempStagingDir, err := data.createDiffArchive(ctx, snapshotID, parentID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create diff archive for streaming: %w", err)
|
||||
}
|
||||
|
||||
if tempArchivePath == "" {
|
||||
return io.NopCloser(bytes.NewReader(nil)), nil
|
||||
}
|
||||
|
||||
archiveFile, err := os.Open(tempArchivePath)
|
||||
if err != nil {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
if tempStagingDir != "" {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
}
|
||||
os.Remove(tempArchivePath)
|
||||
return nil, err
|
||||
}
|
||||
@@ -501,7 +547,9 @@ func (data *SnapshotManagerData) StreamSnapshotDiff(ctx context.Context, snapsho
|
||||
if offset > 0 {
|
||||
if _, err := archiveFile.Seek(offset, io.SeekStart); err != nil {
|
||||
archiveFile.Close()
|
||||
os.RemoveAll(tempStagingDir)
|
||||
if tempStagingDir != "" {
|
||||
os.RemoveAll(tempStagingDir)
|
||||
}
|
||||
os.Remove(tempArchivePath)
|
||||
return nil, fmt.Errorf("failed to seek in diff archive: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user