Xet download workflow #2875

Draft · wants to merge 19 commits into base: xet-integration
Changes from 15 commits
172 changes: 158 additions & 14 deletions src/huggingface_hub/file_download.py
@@ -38,6 +38,7 @@
OfflineModeIsEnabled,
SoftTemporaryDirectory,
WeakFileLock,
XetMetadata,
build_hf_headers,
get_fastai_version, # noqa: F401 # for backward compatibility
get_fastcore_version, # noqa: F401 # for backward compatibility
@@ -56,6 +57,8 @@
is_tf_available, # noqa: F401 # for backward compatibility
is_torch_available, # noqa: F401 # for backward compatibility
logging,
parse_xet_headers,
refresh_xet_metadata,
reset_sessions,
tqdm,
validate_hf_hub_args,
@@ -160,12 +163,15 @@ class HfFileMetadata:
size (`int`, *optional*):
Size of the file. In case of an LFS file, contains the size of the actual
LFS file, not the pointer.
xet_metadata (`XetMetadata`, *optional*):
Xet metadata for the file. This is only set if the file is stored using Xet storage.
"""

commit_hash: Optional[str]
etag: Optional[str]
location: str
size: Optional[int]
xet_metadata: Optional[XetMetadata] = None


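`XetMetadata` itself is defined in `src/huggingface_hub/utils/_xet.py`, which is not part of this diff. As rough orientation, here is a sketch of the fields this diff actually reads (inferred from attribute accesses such as `xet_metadata.endpoint` and `xet_metadata.file_hash`); the real definition may differ:

```python
# Sketch inferred from attribute accesses in this diff; the authoritative
# class lives in src/huggingface_hub/utils/_xet.py, which is not shown here.
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class XetMetadata:
    endpoint: str                    # CAS server URL passed to hf_xet.download_files
    access_token: str                # short-lived token for the Xet storage service
    expiration_unix_epoch: int       # token expiry, in seconds since the Unix epoch
    file_hash: Optional[str] = None  # content hash identifying the file's chunks
```
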
@validate_hf_hub_args
@@ -487,6 +493,118 @@ def http_get(
)


def xet_get(
incomplete_path: Path,
Contributor commented on `incomplete_path: Path,`:

Suggested change:
-    incomplete_path: Path,
+    *,
+    incomplete_path: Path,

(nit) let's force keyword arguments, easier to change things in the future

xet_metadata: XetMetadata,
headers: Dict[str, str],
expected_size: Optional[int] = None,
displayed_filename: Optional[str] = None,
_tqdm_bar: Optional[tqdm] = None,
) -> None:
"""
Download a file using Xet storage service.

Args:
incomplete_path (`Path`):
The path to the file to download.
xet_metadata (`XetMetadata`):
The metadata needed to make the request to the xet storage service.
headers (`Dict[str, str]`):
The headers to send to the xet storage service.
expected_size (`int`, *optional*):
The expected size of the file to download. If set, the download will raise an error if the size of the
received content is different from the expected one.
displayed_filename (`str`, *optional*):
The filename of the file that is being downloaded. Value is used only to display a nice progress bar. If
not set, the name of `incomplete_path` is used instead.

**How it works:**
The file download system uses Xet storage, which is a content-addressable storage system that breaks files into chunks
for efficient storage and transfer.

`hf_xet.download_files` manages downloading files by:
- Taking a list of files to download (each with its unique content hash)
- Connecting to a storage server (CAS server) that knows how files are chunked
- Using authentication to ensure secure access
- Providing progress updates during download

Authentication works by regularly refreshing access tokens through `refresh_xet_metadata` to maintain a valid
connection to the storage server.

The download process works like this:
1. Creates a local cache folder at `~/.cache/huggingface/xet/chunk-cache` to store reusable file chunks
2. Downloads files in parallel:
2.1. Prepares to write the file to disk
2.2. Asks the server "how is this file split into chunks?" using the file's unique hash
The server responds with:
- Which chunks make up the complete file
- Where each chunk can be downloaded from
2.3. For each needed chunk:
- Checks if we already have it in our local cache
- If not, downloads it from cloud storage (S3)
- Saves it to cache for future use
- Assembles the chunks in order to recreate the original file
Contributor commented on lines +535 to +546:

Suggested change (rewrite the steps above in the imperative mood):

1. Create a local cache folder at `~/.cache/huggingface/xet/chunk-cache` to store reusable file chunks
2. Download files in parallel:
2.1. Prepare to write the file to disk
2.2. Ask the server "how is this file split into chunks?" using the file's unique hash
The server responds with:
- Which chunks make up the complete file
- Where each chunk can be downloaded from
2.3. For each needed chunk:
- Check if we already have it in our local cache
- If not, download it from cloud storage (S3)
- Save it to cache for future use
- Assemble the chunks in order to recreate the original file

(personal preference for "instructions")

"""
try:
from hf_xet import PyPointerFile, download_files # type: ignore[no-redef]
except ImportError:
raise ValueError(
"To use optimized download using Xet storage, you need to install the hf_xet package. "
"Try `pip install hf_xet`."
)

def token_refresher() -> Tuple[str, int]:
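# Called by hf_xet whenever the short-lived access token expires; must return a fresh (token, expiration) pair.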
new_xet_metadata = refresh_xet_metadata(xet_metadata=xet_metadata, headers=headers)
if new_xet_metadata is None:
raise ValueError("Failed to refresh token using xet metadata.")
return new_xet_metadata.access_token, new_xet_metadata.expiration_unix_epoch

pointer_files = [
PyPointerFile(path=str(incomplete_path.absolute()), hash=xet_metadata.file_hash, filesize=expected_size)
]

if not displayed_filename:
displayed_filename = incomplete_path.name

# Truncate filename if too long to display
if len(displayed_filename) > 40:
displayed_filename = f"{displayed_filename[:40]}(…)"

# Set up the progress bar; hf_xet writes the downloaded bytes to `incomplete_path` itself
progress_cm: tqdm = (
tqdm( # type: ignore[assignment]
unit="B",
unit_scale=True,
total=expected_size,
initial=0,
desc=displayed_filename,
disable=True if (logger.getEffectiveLevel() == logging.NOTSET) else None,
# ^ set `disable=None` rather than `disable=False` by default to disable progress bar when no TTY attached
# see https://github.com/huggingface/huggingface_hub/pull/2000
name="huggingface_hub.xet_get",
)
if _tqdm_bar is None
else contextlib.nullcontext(_tqdm_bar)
# ^ `contextlib.nullcontext` mimics a context manager that does nothing
# Makes it easier to use the same code path for both cases but in the latter
# case, the progress bar is not closed when exiting the context manager.
)
Contributor commented on lines +574 to +592:

This could be abstracted into a utility somewhere (same as http_get)

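For illustration, the reviewer's suggestion could look like the sketch below; the helper name and exact signature are assumptions, not part of this PR:

```python
# Hypothetical shared helper factoring out the tqdm/nullcontext pattern
# used by both http_get and xet_get; name and location are assumptions.
import contextlib
from typing import ContextManager, Optional

from huggingface_hub.utils import tqdm


def _tqdm_or_passthrough(
    *,
    desc: str,
    total: Optional[int],
    name: str,
    _tqdm_bar: Optional[tqdm] = None,
) -> ContextManager:
    """Return a fresh progress bar, or wrap an existing one without closing it."""
    if _tqdm_bar is not None:
        # nullcontext yields the existing bar and leaves it open on exit
        return contextlib.nullcontext(_tqdm_bar)
    return tqdm(
        unit="B",
        unit_scale=True,
        total=total,
        initial=0,
        desc=desc,
        disable=None,  # simplified: the real code also disables when logging is NOTSET
        name=name,
    )
```
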

with progress_cm as progress:

def progress_updater(progress_bytes: float):
progress.update(progress_bytes)

download_files(
pointer_files,
endpoint=xet_metadata.endpoint,
token_info=(xet_metadata.access_token, xet_metadata.expiration_unix_epoch),
token_refresher=token_refresher,
progress_updater=[progress_updater],
)

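To make the flow concrete, here is how a caller might invoke `xet_get` directly. The metadata values are placeholders (real ones come from `parse_xet_headers` on a HEAD response), the `XetMetadata` constructor arguments are assumed from the fields used above, and in practice the function is reached through `_download_to_tmp_and_move`:

```python
# Illustrative call only; token, endpoint, and hash are placeholders.
from pathlib import Path

from huggingface_hub.file_download import xet_get
from huggingface_hub.utils import XetMetadata, build_hf_headers

metadata = XetMetadata(
    endpoint="https://cas.example.com",    # placeholder CAS server URL
    access_token="xet-token-placeholder",  # placeholder short-lived token
    expiration_unix_epoch=1_900_000_000,   # placeholder expiry timestamp
    file_hash="0123abcd...",               # placeholder content hash
)

xet_get(
    Path("model.safetensors.incomplete"),
    xet_metadata=metadata,
    headers=build_hf_headers(),            # standard HF auth/user-agent headers
    expected_size=4_000_000_000,
    displayed_filename="model.safetensors",
)
```
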

def _normalize_etag(etag: Optional[str]) -> Optional[str]:
"""Normalize ETag HTTP header, so it can be used to create nice filepaths.

@@ -922,7 +1040,7 @@ def _hf_hub_download_to_cache_dir(

# Try to get metadata (etag, commit_hash, url, size) from the server.
# If we can't, a HEAD request error is returned.
(url_to_download, etag, commit_hash, expected_size, head_call_error) = _get_metadata_or_catch_error(
(url_to_download, etag, commit_hash, expected_size, xet_metadata, head_call_error) = _get_metadata_or_catch_error(
repo_id=repo_id,
filename=filename,
repo_type=repo_type,
@@ -1017,6 +1135,8 @@ def _hf_hub_download_to_cache_dir(
expected_size=expected_size,
filename=filename,
force_download=force_download,
etag=etag,
xet_metadata=xet_metadata,
)
if not os.path.exists(pointer_path):
_create_symlink(blob_path, pointer_path, new_blob=True)
@@ -1067,7 +1187,7 @@ def _hf_hub_download_to_local_dir(
return str(paths.file_path)

# Local file doesn't exist or commit_hash doesn't match => we need the etag
(url_to_download, etag, commit_hash, expected_size, head_call_error) = _get_metadata_or_catch_error(
(url_to_download, etag, commit_hash, expected_size, xet_metadata, head_call_error) = _get_metadata_or_catch_error(
repo_id=repo_id,
filename=filename,
repo_type=repo_type,
@@ -1144,6 +1264,8 @@ def _hf_hub_download_to_local_dir(
expected_size=expected_size,
filename=filename,
force_download=force_download,
etag=etag,
xet_metadata=xet_metadata,
)

write_download_metadata(local_dir=local_dir, filename=filename, commit_hash=commit_hash, etag=etag)
@@ -1317,6 +1439,7 @@ def get_hf_file_metadata(
size=_int_or_none(
r.headers.get(constants.HUGGINGFACE_HEADER_X_LINKED_SIZE) or r.headers.get("Content-Length")
),
xet_metadata=parse_xet_headers(r.headers), # type: ignore
)


@@ -1336,10 +1459,10 @@ def _get_metadata_or_catch_error(
storage_folder: Optional[str] = None, # only used to store `.no_exists` in cache
) -> Union[
# Either an exception is caught and returned
Tuple[None, None, None, None, Exception],
Tuple[None, None, None, None, None, Exception],
# Or the metadata is returned as
# `(url_to_download, etag, commit_hash, expected_size, xet_metadata, None)`
Tuple[str, str, str, int, None],
Tuple[str, str, str, int, Optional[XetMetadata], None],
]:
"""Get metadata for a file on the Hub, safely handling network issues.

@@ -1356,6 +1479,7 @@
None,
None,
None,
None,
OfflineModeIsEnabled(
f"Cannot access file since 'local_files_only=True' as been set. (repo_id: {repo_id}, repo_type: {repo_type}, revision: {revision}, filename: {filename})"
),
@@ -1367,6 +1491,7 @@
commit_hash: Optional[str] = None
expected_size: Optional[int] = None
head_error_call: Optional[Exception] = None
xet_metadata: Optional[XetMetadata] = None

# Try to get metadata from the server.
# Do not raise yet if the file is not found or not accessible.
@@ -1414,13 +1539,15 @@
if expected_size is None:
raise FileMetadataError("Distant resource does not have a Content-Length.")

xet_metadata = metadata.xet_metadata

# In case of a redirect, save an extra redirect on the request.get call,
# and ensure we download the exact atomic version even if it changed
# between the HEAD and the GET (unlikely, but hey).
#
# If url domain is different => we are downloading from a CDN => url is signed => don't send auth
# If url domain is the same => redirect due to repo rename AND downloading a regular file => keep auth
if url != metadata.location:
if xet_metadata is not None and url != metadata.location:
url_to_download = metadata.location
if urlparse(url).netloc != urlparse(metadata.location).netloc:
# Remove authorization header when downloading a LFS blob
@@ -1458,7 +1585,7 @@
if not (local_files_only or etag is not None or head_error_call is not None):
raise RuntimeError("etag is empty due to uncovered problems")

return (url_to_download, etag, commit_hash, expected_size, head_error_call) # type: ignore [return-value]
return (url_to_download, etag, commit_hash, expected_size, xet_metadata, head_error_call) # type: ignore [return-value]


def _raise_on_head_call_error(head_call_error: Exception, force_download: bool, local_files_only: bool) -> NoReturn:
Expand Down Expand Up @@ -1502,6 +1629,8 @@ def _download_to_tmp_and_move(
expected_size: Optional[int],
filename: str,
force_download: bool,
etag: Optional[str],
xet_metadata: Optional[XetMetadata],
) -> None:
"""Download content from a URL to a destination path.

@@ -1544,14 +1673,29 @@
_check_disk_space(expected_size, incomplete_path.parent)
_check_disk_space(expected_size, destination_path.parent)

http_get(
url_to_download,
f,
proxies=proxies,
resume_size=resume_size,
headers=headers,
expected_size=expected_size,
)
if xet_metadata is not None and xet_metadata.file_hash is not None:
Contributor commented:

I think we should either:

  1. make hf_xet a default dependency
  2. or use the xet-path only if hf_xet is installed.

For now, I'd go with 2. so it's an opt-in process. The problem with the current workflow (AFAIU) is that if hf_xet is not installed and a repo becomes "xet-enabled", then users won't be able to download things, even though they haven't changed anything in their setup.

Feel free to ignore if I misunderstood it 😬

Collaborator @bpronan commented (Feb 28, 2025):

+1 for using the xet-path only if hf_xet is installed.

Even if a repo becomes "xet-enabled", users without hf_xet installed can still download the content: we have backwards compatibility built into our service to support these cases, and it streams the entire file for download if you hit that endpoint.
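A minimal sketch of the opt-in guard described above (option 2), assuming a hypothetical `is_xet_available` helper that this PR does not define:

```python
# Hypothetical guard for the opt-in behavior the reviewers suggest.
from importlib.util import find_spec


def is_xet_available() -> bool:
    """Return True if the optional `hf_xet` package is importable."""
    return find_spec("hf_xet") is not None


# The dispatch below would then become:
#   if xet_metadata is not None and xet_metadata.file_hash is not None and is_xet_available():
#       xet_get(...)   # chunk-based download via Xet storage
#   else:
#       http_get(...)  # server streams the whole file instead
```
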

logger.info("Xet Storage is enabled for this repo. Downloading file from Xet Storage..")
xet_get(
incomplete_path,
xet_metadata=xet_metadata,
headers=headers,
expected_size=expected_size,
displayed_filename=filename,
)

# TODO: xetpoc - the http_get path is building this out, so we're replicating that logic here
Collaborator commented:

I would prefer if we could either address this TODO or remove it if the logic fits in the greater download action.

parent_dir = destination_path.parent
if not parent_dir.exists():
parent_dir.mkdir(parents=True, exist_ok=True)
else:
http_get(
url_to_download,
f,
proxies=proxies,
resume_size=resume_size,
headers=headers,
expected_size=expected_size,
)

logger.info(f"Download complete. Moving file to {destination_path}")
_chmod_and_move(incomplete_path, destination_path)
5 changes: 5 additions & 0 deletions src/huggingface_hub/utils/__init__.py
@@ -107,4 +107,9 @@
from ._telemetry import send_telemetry
from ._typing import is_jsonable, is_simple_optional_type, unwrap_simple_optional_type
from ._validators import smoothly_deprecate_use_auth_token, validate_hf_hub_args, validate_repo_id
from ._xet import (
XetMetadata,
parse_xet_headers,
refresh_xet_metadata,
)
Contributor commented on lines +110 to +114:

Suggested change:
-from ._xet import (
-    XetMetadata,
-    parse_xet_headers,
-    refresh_xet_metadata,
-)
+from ._xet import XetMetadata, parse_xet_headers, refresh_xet_metadata

(nit)

from .tqdm import are_progress_bars_disabled, disable_progress_bars, enable_progress_bars, tqdm, tqdm_stream_file
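For context, `parse_xet_headers` (also defined in `_xet.py`, not shown in this diff) turns HEAD-response headers into an `XetMetadata`. The sketch below conveys the idea only; the `X-Xet-*` header names are invented placeholders, not the real protocol:

```python
# Sketch only: header names below are placeholders; the real parsing logic
# lives in src/huggingface_hub/utils/_xet.py, which this diff does not show.
from typing import Dict, Optional

from huggingface_hub.utils import XetMetadata  # exported by this PR


def parse_xet_headers_sketch(headers: Dict[str, str]) -> Optional[XetMetadata]:
    endpoint = headers.get("X-Xet-Endpoint")            # placeholder name
    token = headers.get("X-Xet-Access-Token")           # placeholder name
    expiration = headers.get("X-Xet-Token-Expiration")  # placeholder name
    if endpoint is None or token is None or expiration is None:
        return None  # not a Xet-backed file; caller falls back to http_get
    return XetMetadata(  # constructor arguments assumed, see sketch earlier
        endpoint=endpoint,
        access_token=token,
        expiration_unix_epoch=int(expiration),
        file_hash=headers.get("X-Xet-Hash"),            # placeholder name
    )
```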