From b96ea087f522df5cfbe314a7485fb24fe5bb3bfc Mon Sep 17 00:00:00 2001 From: Nikolay Edigaryev Date: Thu, 19 Dec 2024 21:21:33 +0400 Subject: [PATCH] tart pull: re-try disk layer downloads by specifying "Range" header (#980) --- Sources/tart/OCI/Layerizer/DiskV2.swift | 25 ++++++++++++++++++++----- Sources/tart/OCI/Registry.swift | 19 ++++++++++++++++--- Sources/tart/VMStorageOCI.swift | 5 ++--- 3 files changed, 38 insertions(+), 11 deletions(-) diff --git a/Sources/tart/OCI/Layerizer/DiskV2.swift b/Sources/tart/OCI/Layerizer/DiskV2.swift index b926a429..78180a64 100644 --- a/Sources/tart/OCI/Layerizer/DiskV2.swift +++ b/Sources/tart/OCI/Layerizer/DiskV2.swift @@ -44,7 +44,7 @@ class DiskV2: Disk { let compressedData = try (data as NSData).compressed(using: .lz4) as Data let compressedDataDigest = Digest.hash(compressedData) - try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) { + try await retry(maxAttempts: 5) { if try await !registry.blobExists(compressedDataDigest) { _ = try await registry.pushBlob(fromData: compressedData, chunkSizeMb: chunkSizeMb, digest: compressedDataDigest) } @@ -208,11 +208,26 @@ class DiskV2: Disk { diskWritingOffset = try zeroSkippingWrite(disk, rdisk, fsBlockSize, diskWritingOffset, data) } - try await registry.pullBlob(diskLayer.digest) { data in - try filter.write(data) + var rangeStart: Int64 = 0 - // Update the progress - progress.completedUnitCount += Int64(data.count) + try await retry(maxAttempts: 5) { + try await registry.pullBlob(diskLayer.digest, rangeStart: rangeStart) { data in + try filter.write(data) + + // Update the progress + progress.completedUnitCount += Int64(data.count) + + // Update the current range start + rangeStart += Int64(data.count) + } + } recoverFromFailure: { error in + if error is URLError { + print("Error pulling disk layer \(index + 1): \"\(error.localizedDescription)\", attempting to re-try...") + + return .retry + } + + return .throw } try filter.finalize() diff --git a/Sources/tart/OCI/Registry.swift b/Sources/tart/OCI/Registry.swift index a9acbe5a..e4635364 100644 --- a/Sources/tart/OCI/Registry.swift +++ b/Sources/tart/OCI/Registry.swift @@ -20,6 +20,7 @@ enum HTTPCode: Int { case Ok = 200 case Created = 201 case Accepted = 202 + case PartialContent = 206 case Unauthorized = 401 case NotFound = 404 } @@ -263,9 +264,21 @@ class Registry { } } - public func pullBlob(_ digest: String, handler: (Data) async throws -> Void) async throws { - let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), viaFile: true) - if response.statusCode != HTTPCode.Ok.rawValue { + public func pullBlob(_ digest: String, rangeStart: Int64 = 0, handler: (Data) async throws -> Void) async throws { + var expectedStatusCode = HTTPCode.Ok + var headers: [String: String] = [:] + + // Send Range header and expect HTTP 206 in return + // + // However, do not send Range header at all when rangeStart is 0, + // because it makes no sense and we might get HTTP 200 in return + if rangeStart != 0 { + expectedStatusCode = HTTPCode.PartialContent + headers["Range"] = "bytes=\(rangeStart)-" + } + + let (channel, response) = try await channelRequest(.GET, endpointURL("\(namespace)/blobs/\(digest)"), headers: headers, viaFile: true) + if response.statusCode != expectedStatusCode.rawValue { let body = try await channel.asData().asText() throw RegistryError.UnexpectedHTTPStatusCode(when: "pulling blob", code: response.statusCode, details: body) diff --git a/Sources/tart/VMStorageOCI.swift b/Sources/tart/VMStorageOCI.swift index d3edee60..5cd51f70 100644 --- a/Sources/tart/VMStorageOCI.swift +++ b/Sources/tart/VMStorageOCI.swift @@ -196,7 +196,7 @@ class VMStorageOCI: PrunableStorage { } try await withTaskCancellationHandler(operation: { - try await retry(maxAttempts: 5, backoff: .exponentialWithFullJitter(baseDelay: .seconds(5), maxDelay: .seconds(60))) { + try await retry(maxAttempts: 5) { // Choose the best base image which has the most deduplication ratio let localLayerCache = try await chooseLocalLayerCache(name, manifest, registry) @@ -213,8 +213,7 @@ class VMStorageOCI: PrunableStorage { try await tmpVMDir.pullFromRegistry(registry: registry, manifest: manifest, concurrency: concurrency, localLayerCache: localLayerCache, deduplicate: deduplicate) } recoverFromFailure: { error in if error is URLError { - print("Error: \(error.localizedDescription)") - print("Attempting to re-try...") + print("Error pulling image: \"\(error.localizedDescription)\", attempting to re-try...") return .retry }