1 change: 1 addition & 0 deletions CHANGES/7218.bugfix
@@ -0,0 +1 @@
+Implement atomic hashing and chunking in PulpExport
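In other words, the export now hashes data in the same pass that writes it. A minimal standalone sketch of the idea (illustrative names only, not the pulpcore API):

import hashlib

def write_and_hash(path, blocks, hasher_cls=hashlib.sha256):
    # Single pass: each block goes to the file and the hasher together,
    # so the recorded digest matches exactly the bytes written to disk.
    hasher = hasher_cls()
    with open(path, "wb") as f:
        for block in blocks:
            f.write(block)
            hasher.update(block)
    return hasher.hexdigest()

digest = write_and_hash("export.tar", [b"header", b"payload"])

This removes both the separate hashing pass over the finished file and the external split process that the previous implementation relied on, as the diff below shows.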
42 changes: 42 additions & 0 deletions pulp_file/tests/functional/api/test_pulp_export.py
@@ -1,4 +1,6 @@
+import json
+from pathlib import Path
+
 import pytest
 import uuid
 
@@ -420,3 +422,43 @@ def test_export_with_meta(pulpcore_bindings, pulp_export_factory, full_pulp_expo
assert meta_json.get("purpose") == "export"
# overridden field check
assert meta_json.get("checksum_type") == "crc32"


+@pytest.mark.parallel
+def test_export_chunk_ordering_and_naming(
+    pulp_exporter_factory,
+    pulp_export_factory,
+    three_synced_repositories,
+):
+    exporter = pulp_exporter_factory(repositories=[three_synced_repositories[0]])
+    chunk_size_bytes = 100
+    body = {"chunk_size": f"{chunk_size_bytes}B"}
+    export = pulp_export_factory(exporter, body)
+
+    all_paths = [Path(p) for p in export.output_file_info.keys()]
+    tar_chunks = sorted(p for p in all_paths if ".tar." in p.name)  # zero-padded suffixes sort in chunk order
+
+    assert len(tar_chunks) > 1, f"Expected multiple chunks for {chunk_size_bytes}B limit."
+
+    for index, path in enumerate(tar_chunks):
+        expected_suffix = f"{index:04d}"
+
+        assert path.name.endswith(expected_suffix), f"Chunk {path} missing suffix {expected_suffix}"
+        assert path.exists(), f"Chunk file {path} was not found on disk."
+
+        if index < len(tar_chunks) - 1:
+            assert path.stat().st_size == chunk_size_bytes
+
+    toc_path = Path(export.toc_info["file"])
+    with toc_path.open("r", encoding="utf-8") as f:
+        toc_data = json.load(f)
+
+    toc_filenames = list(toc_data["files"].keys())
+    expected_filenames = [p.name for p in tar_chunks]
+
+    assert (
+        toc_filenames == expected_filenames
+    ), f"TOC order mismatch.\nExpected: {expected_filenames}\nActual: {toc_filenames}"
+
+    assert toc_data["meta"]["chunk_size"] == chunk_size_bytes
+    assert toc_data["meta"]["checksum_type"] == "crc32"
68 changes: 19 additions & 49 deletions pulpcore/app/tasks/export.py
@@ -2,7 +2,6 @@
 import logging
 import os
 import os.path
-import subprocess
 import tarfile
 
 from distutils.util import strtobool
@@ -28,7 +27,7 @@
 from pulpcore.app.models.content import ContentArtifact
 from pulpcore.app.serializers import PulpExportSerializer
 
-from pulpcore.app.util import compute_file_hash, Crc32Hasher
+from pulpcore.app.util import compute_file_hash, Crc32Hasher, HashingFileWriter
 from pulpcore.app.importexport import (
     export_versions,
     export_artifacts,
@@ -394,55 +393,26 @@ def pulp_export(exporter_pk, params):
     if not path.is_dir():
         path.mkdir(mode=0o775, parents=True)
 
-    rslts = {}
-    if the_export.validated_chunk_size:
-        # write it into chunks
-        with subprocess.Popen(
-            [
-                "split",
-                "-a",
-                "4",
-                "-b",
-                str(the_export.validated_chunk_size),
-                "-d",
-                "-",
-                tarfile_fp + ".",
-            ],
-            stdin=subprocess.PIPE,
-        ) as split_process:
-            try:
-                with tarfile.open(
-                    tarfile_fp,
-                    "w|",
-                    fileobj=split_process.stdin,
-                ) as tar:
-                    _do_export(pulp_exporter, tar, the_export)
-            except Exception:
-                # no matter what went wrong, we can't trust the files we (may have) created.
-                # Delete the ones we can find and pass the problem up.
-                for pathname in glob(tarfile_fp + ".*"):
-                    os.remove(pathname)
-                raise
-        # compute the hashes
-        paths = sorted([str(Path(p)) for p in glob(tarfile_fp + ".*")])
-        for a_file in paths:
-            a_hash = compute_file_hash(a_file, hasher=hasher())
-            rslts[a_file] = a_hash
+    writer = HashingFileWriter(
+        base_path=tarfile_fp,
+        chunk_size=the_export.validated_chunk_size or 0,
+        hasher_cls=hasher,  # type: ignore
+    )
 
-    else:
-        # write into the file
-        try:
-            with tarfile.open(tarfile_fp, "w") as tar:
-                _do_export(pulp_exporter, tar, the_export)
+    try:
+        with writer:
+            with tarfile.open(fileobj=writer, mode="w|") as tar:
+                _do_export(pulp_exporter, tar, the_export)
-        except Exception:
-            # no matter what went wrong, we can't trust the file we created.
-            # Delete it if it exists and pass the problem up.
-            if os.path.exists(tarfile_fp):
-                os.remove(tarfile_fp)
-            raise
-        # compute the hash
-        tarfile_hash = compute_file_hash(tarfile_fp, hasher=hasher())
-        rslts[tarfile_fp] = tarfile_hash
+    except Exception:
+        # no matter what went wrong, we can't trust the files we (may have) created.
+        # Delete the ones we can find (including the unsplit file) and pass the problem up.
+        if os.path.exists(tarfile_fp):
+            os.remove(tarfile_fp)
+        for pathname in glob(tarfile_fp + ".*"):
+            os.remove(pathname)
+        raise
 
+    rslts = writer.results
 
     # store the outputfile/hash info
     the_export.output_file_info = rslts
156 changes: 156 additions & 0 deletions pulpcore/app/util.py
@@ -3,6 +3,12 @@
 import os
 import socket
 import tempfile
+from _hashlib import HASH
+from io import RawIOBase
+from pathlib import Path
+from types import TracebackType
+from typing import Self, IO, Callable
+
 import gnupg
 
 from functools import lru_cache
@@ -649,3 +655,153 @@ def normalize_http_status(status):
return "5xx"
else:
return ""


+class HashingFileWriter(RawIOBase):
+    """
+    A file-like object that handles writing data to disk with simultaneous
+    hashing. It supports both single-file writing and chunk-based splitting.
+    """
+
+    def __init__(
+        self,
+        base_path: str | Path,
+        hasher_cls: Callable[[], HASH],
+        chunk_size: int = 0,
+        suffix_length: int = 4,
+        dir_mode: int = 0o775,
+    ) -> None:
+        """
+        Args:
+            base_path: The target file path.
+            hasher_cls: The hashing class to use (e.g., hashlib.sha256).
+            chunk_size: Max bytes per file. 0 (or less) disables splitting.
+            suffix_length: Length of the numeric suffix for split files (e.g., .0001).
+            dir_mode: The octal file permissions to use when creating parent
+                directories (e.g., 0o775).
+        """
+        super().__init__()
+
+        self.base_path = Path(base_path)
+        self.chunk_size = max(0, chunk_size)
+        self.hasher_cls = hasher_cls
+        self.suffix_length = suffix_length
+        self.dir_mode = dir_mode
+
+        self._file_index: int = 0
+        self._current_file: IO[bytes] | None = None
+        self._current_hasher: HASH | None = None
+        self._current_file_path: Path | None = None
+
+        self._current_chunk_written: int = 0
+        self._total_written: int = 0
+
+        # Maps file_path string -> hash digest
+        self.results: dict[str, str] = {}
+
+    @property
+    def is_splitting(self) -> bool:
+        return self.chunk_size > 0
+
+    def _get_filename(self) -> Path:
+        """Determines the filename based on splitting mode."""
+        if not self.is_splitting:
+            return self.base_path
+
+        suffix = f".{self._file_index:0{self.suffix_length}d}"
+        return self.base_path.with_name(f"{self.base_path.name}{suffix}")
+
+    def _rotate_file(self) -> None:
+        """Closes the current file and opens the next one."""
+        self._close_current_file()
+
+        self._current_file_path = self._get_filename()
+        self._current_file_path.parent.mkdir(parents=True, exist_ok=True, mode=self.dir_mode)
+
+        self._current_file = self._current_file_path.open("wb")
+        self._current_hasher = self.hasher_cls()
+        self._current_chunk_written = 0
+
+        if self.is_splitting:
+            self._file_index += 1
+
+    def _close_current_file(self) -> None:
+        """Finalizes the current file and stores its hash."""
+        if self._current_file:
+            self._current_file.close()
+
+        if self._current_hasher and self._current_file_path:
+            self.results[str(self._current_file_path)] = self._current_hasher.hexdigest()
+
+        self._current_file = None
+        self._current_hasher = None
+
+    def write(self, data: bytes) -> int:
+        # Early exit for empty writes to prevent creating empty files
+        if not data:
+            return 0
+
+        if not self._current_file:
+            self._rotate_file()
+
+        # If splitting is disabled, strict write without rotation logic
+        if not self.is_splitting:
+            assert self._current_file is not None
+            assert self._current_hasher is not None
+            self._current_file.write(data)
+            self._current_hasher.update(data)
+            self._total_written += len(data)
+            return len(data)
+
+        # Splitting logic
+        buffer = memoryview(data)
+        bytes_remaining = len(buffer)
+        cursor = 0
+
+        while bytes_remaining > 0:
+            assert self._current_file is not None
+            assert self._current_hasher is not None
+
+            space_left = self.chunk_size - self._current_chunk_written
+
+            # If the current file is full, rotate immediately before writing more
+            if space_left <= 0:
+                self._rotate_file()
+                space_left = self.chunk_size
+
+            to_write = min(bytes_remaining, space_left)
+            chunk = buffer[cursor : cursor + to_write]
+
+            self._current_file.write(chunk)
+            self._current_hasher.update(chunk)
+
+            self._current_chunk_written += to_write
+            self._total_written += to_write
+            cursor += to_write
+            bytes_remaining -= to_write
+
+        return len(data)
+
+    def writable(self) -> bool:
+        return True
+
+    def tell(self) -> int:
+        return self._total_written
+
+    def flush(self) -> None:
+        if self._current_file:
+            self._current_file.flush()
+
+    def close(self) -> None:
+        self._close_current_file()
+
+    def __enter__(self) -> Self:
+        return self
+
+    def __exit__(
+        self,
+        exc_type: type[BaseException] | None,
+        exc_val: BaseException | None,
+        exc_tb: TracebackType | None,
+    ) -> None:
+        self.close()
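A minimal usage sketch for HashingFileWriter as defined above, assuming hashlib.sha256 as the hasher (any zero-argument callable returning a hashlib-style object fits the hasher_cls parameter):

import hashlib

# Split output into 1 MiB chunks named payload.bin.0000, payload.bin.0001, ...
with HashingFileWriter("payload.bin", hasher_cls=hashlib.sha256, chunk_size=1024 * 1024) as writer:
    writer.write(b"x" * (3 * 1024 * 1024))  # fills exactly three chunks

# After close(), results maps each chunk path to its hex digest, in write order.
for path, digest in writer.results.items():
    print(path, digest)

With chunk_size=0 the same calls produce a single payload.bin and one digest entry, which is how pulp_export uses the writer when no chunking was requested.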