Skip to content

Commit 5833503

Browse files
authored
Upload with xet by default (#1761)
Enable xet by default, when available, for uploads.

Use the new git xet transfer to remove the API call to the model/dataset/space info endpoint:

```
`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}?expand[]=xetEnabled`,
```

cc @Wauplin @hanouticelina for viz ⬆️

We could also save more API calls by using the token returned by the git xet transfer, and remove a hardcoded value for the write token URL. But I'm waiting until the git `xet` transfer protocol is stable before doing so - see also [this comment](huggingface-internal/moon-landing#14572 (comment)). For now, this only uses the presence/absence of the xet transfer in the response.

cc @seanses cc @jsulz @assafvayner for viz

---

For downloads, I would like to do some perf tuning / analysis before enabling xet by default.
1 parent 04132de commit 5833503

File tree

6 files changed

+204
-173
lines changed

6 files changed

+204
-173
lines changed

packages/hub/src/lib/commit.ts

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,13 @@ export type CommitParams = {
120120
*/
121121
fetch?: typeof fetch;
122122
abortSignal?: AbortSignal;
123-
// Credentials are optional due to custom fetch functions or cookie auth
123+
/**
124+
* @default true
125+
*
126+
* Use xet protocol: https://huggingface.co/blog/xet-on-the-hub to upload, rather than a basic S3 PUT
127+
*/
124128
useXet?: boolean;
129+
// Credentials are optional due to custom fetch functions or cookie auth
125130
} & Partial<CredentialsParams>;
126131

127132
export interface CommitOutput {
@@ -165,24 +170,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
165170
const repoId = toRepoId(params.repo);
166171
yield { event: "phase", phase: "preuploading" };
167172

168-
let useXet = params.useXet;
169-
if (useXet) {
170-
const info = await (params.fetch ?? fetch)(
171-
`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}?expand[]=xetEnabled`,
172-
{
173-
headers: {
174-
...(accessToken && { Authorization: `Bearer ${accessToken}` }),
175-
},
176-
}
177-
);
178-
179-
if (!info.ok) {
180-
throw await createApiError(info);
181-
}
182-
183-
const data = await info.json();
184-
useXet = !!data.xetEnabled;
185-
}
173+
let useXet = params.useXet ?? true;
186174

187175
const lfsShas = new Map<string, string | null>();
188176

@@ -206,10 +194,6 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
206194
const allOperations = (
207195
await Promise.all(
208196
params.operations.map(async (operation) => {
209-
if (operation.operation === "edit" && !useXet) {
210-
throw new Error("Edit operation is not supported when Xet is disabled");
211-
}
212-
213197
if (operation.operation === "edit") {
214198
// Convert EditFile operation to a file operation with SplicedBlob
215199
const splicedBlob = SplicedBlob.create(
@@ -325,7 +309,7 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
325309
const payload: ApiLfsBatchRequest = {
326310
operation: "upload",
327311
// multipart is a custom protocol for HF
328-
transfers: ["basic", "multipart"],
312+
transfers: ["basic", "multipart", ...(useXet ? ["xet" as const] : [])],
329313
hash_algo: "sha_256",
330314
...(!params.isPullRequest && {
331315
ref: {
@@ -363,6 +347,12 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
363347

364348
const shaToOperation = new Map(operations.map((op, i) => [shas[i], op]));
365349

350+
if (useXet && json.transfer !== "xet") {
351+
useXet = false;
352+
}
353+
let xetRefreshWriteTokenUrl: string | undefined;
354+
let xetSessionId: string | undefined;
355+
366356
if (useXet) {
367357
// First get all the files that are already uploaded out of the way
368358
for (const obj of json.objects) {
@@ -386,6 +376,17 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
386376
progress: 1,
387377
state: "uploading",
388378
};
379+
} else {
380+
xetRefreshWriteTokenUrl = obj.actions.upload.href;
381+
// Also, obj.actions.upload.header: {
382+
// X-Xet-Cas-Url: string;
383+
// X-Xet-Access-Token: string;
384+
// X-Xet-Token-Expiration: string;
385+
// X-Xet-Session-Id: string;
386+
// }
387+
const headers = new Headers(obj.actions.upload.header);
388+
xetSessionId = headers.get("X-Xet-Session-Id") ?? undefined;
389+
// todo: use other data, like x-xet-cas-url, ...
389390
}
390391
}
391392
const source = (async function* () {
@@ -395,43 +396,50 @@ export async function* commitIter(params: CommitParams): AsyncGenerator<CommitPr
395396
continue;
396397
}
397398
abortSignal?.throwIfAborted();
398-
399399
yield { content: op.content, path: op.path, sha256: obj.oid };
400400
}
401401
})();
402-
const sources = splitAsyncGenerator(source, 5);
403-
yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
404-
Promise.all(
405-
sources.map(async function (source) {
406-
for await (const event of uploadShards(source, {
407-
fetch: params.fetch,
408-
accessToken,
409-
hubUrl: params.hubUrl ?? HUB_URL,
410-
repo: repoId,
411-
// todo: maybe leave empty if PR?
412-
rev: params.branch ?? "main",
413-
isPullRequest: params.isPullRequest,
414-
yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
415-
})) {
416-
if (event.event === "file") {
417-
yieldCallback({
418-
event: "fileProgress" as const,
419-
path: event.path,
420-
progress: 1,
421-
state: "uploading" as const,
422-
});
423-
} else if (event.event === "fileProgress") {
424-
yieldCallback({
425-
event: "fileProgress" as const,
426-
path: event.path,
427-
progress: event.progress,
428-
state: "uploading" as const,
429-
});
402+
if (xetRefreshWriteTokenUrl) {
403+
const xetRefreshWriteTokenUrlFixed = xetRefreshWriteTokenUrl;
404+
const sources = splitAsyncGenerator(source, 5);
405+
yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
406+
Promise.all(
407+
sources.map(async function (source) {
408+
for await (const event of uploadShards(source, {
409+
fetch: params.fetch,
410+
accessToken,
411+
hubUrl: params.hubUrl ?? HUB_URL,
412+
repo: repoId,
413+
xetRefreshWriteTokenUrl: xetRefreshWriteTokenUrlFixed,
414+
xetSessionId,
415+
// todo: maybe leave empty if PR?
416+
rev: params.branch ?? "main",
417+
isPullRequest: params.isPullRequest,
418+
yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
419+
})) {
420+
if (event.event === "file") {
421+
// No need: uploading xorbs already sent a fileProgress event with progress 1
422+
// yieldCallback({
423+
// event: "fileProgress" as const,
424+
// path: event.path,
425+
// progress: 1,
426+
// state: "uploading" as const,
427+
// });
428+
} else if (event.event === "fileProgress") {
429+
yieldCallback({
430+
event: "fileProgress" as const,
431+
path: event.path,
432+
progress: event.progress,
433+
state: "uploading" as const,
434+
});
435+
}
430436
}
431-
}
432-
})
433-
).then(() => returnCallback(undefined), rejectCallback)
434-
);
437+
})
438+
).then(() => returnCallback(undefined), rejectCallback)
439+
);
440+
} else {
441+
// No LFS file to upload
442+
}
435443
} else {
436444
yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {
437445
return promisesQueueStreaming(

0 commit comments

Comments
 (0)