diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 13e12f0..6aa9898 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -8,6 +8,12 @@ on: branches: - main +# These permissions are needed to interact with AWS S3 via GitHub's OIDC Token endpoint +permissions: + id-token: write + contents: read + pull-requests: read + jobs: unit-tests: runs-on: ${{ matrix.os }} @@ -56,7 +62,33 @@ jobs: pip install setuptools - name: Install cdx_toolkit - run: pip install .[test] + run: pip install .[all] + + - name: Configure AWS credentials from OIDC (disabled for forks) + if: github.event.pull_request.head.repo.full_name == github.repository || github.event_name == 'push' + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: arn:aws:iam::837454214164:role/GitHubActions-Role + aws-region: us-east-1 + + - name: Disable S3 unit tests for Python 3.8 (botocore requires Python 3.9+) + if: ${{ startsWith(matrix.python-version, '3.8') }} + uses: actions/github-script@v7 + with: + script: | + core.exportVariable('CDXT_DISABLE_S3_TESTS', '1') + - name: Set environment variables for faster unit tests (requests are mocked) + uses: actions/github-script@v7 + with: + script: | + core.exportVariable('CDXT_MAX_ERRORS', '2') + core.exportVariable('CDXT_WARNING_AFTER_N_ERRORS', '2') + core.exportVariable('CDXT_DEFAULT_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_CC_DATA_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_IA_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('DISABLE_ATHENA_TESTS', '1') + core.exportVariable('LOGLEVEL', 'DEBUG') - name: Lint code run: | @@ -70,3 +102,48 @@ jobs: uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} + + unit-tests-minimal: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: true + matrix: + include: + - python-version: '3.9' + os: ubuntu-22.04 + - python-version: '3.14' + os: 
ubuntu-latest + + steps: + - name: checkout + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + + - name: Install setuptools on python 3.12+ + if: ${{ matrix.python-version >= '3.12' }} + run: | + pip install setuptools + + - name: Install cdx_toolkit (minimal) + run: pip install .[test] + + - name: Set environment variables for faster unit tests (requests are mocked) + uses: actions/github-script@v7 + with: + script: | + core.exportVariable('CDXT_MAX_ERRORS', '2') + core.exportVariable('CDXT_WARNING_AFTER_N_ERRORS', '2') + core.exportVariable('CDXT_DEFAULT_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_CC_DATA_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('CDXT_IA_MIN_RETRY_INTERVAL', '0.01') + core.exportVariable('DISABLE_ATHENA_TESTS', '1') + core.exportVariable('LOGLEVEL', 'DEBUG') + + - name: test minimal + run: | + make test diff --git a/.gitignore b/.gitignore index bdb73f6..ab1b7ef 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,6 @@ __pycache__ cdx_toolkit.egg-info .coverage .eggs/ -tmp/ \ No newline at end of file +tmp/ +.env +.vscode diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 80b0f3e..d7f991b 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -10,6 +10,18 @@ Clone the repository, setup a virtual environment, and run the following command make install ``` +For S3-related features or tests, install optional dependencies: + +```bash +pip install -e ".[s3]" +``` + +To install everything (dev/test/S3), use: + +```bash +pip install -e ".[all]" +``` + ## Tests To test code changes, please run our test suite before submitting pull requests: @@ -33,14 +45,14 @@ If the remote APIs change, new mock data can be semi-automatically collected by ```bash # set environment variable (DISABLE_MOCK_RESPONSES should not be set) export 
SAVE_MOCK_RESPONSES=./tmp/mock_responses - + # run the test for what mock data should be saved to $SAVE_MOCK_RESPONSES//.jsonl pytest tests/test_cli.py::test_basics ``` ## Code format & linting -Please following the definitions from `.editorconfig` and `.flake8`. +Please follow the definitions from `.editorconfig` and `.flake8`. To test the linting, run this command: @@ -54,4 +66,4 @@ You can also run the hooks manually on all files: ```bash pre-commit run --all-files -``` \ No newline at end of file +``` diff --git a/README.md b/README.md index 9d794ff..bab194d 100644 --- a/README.md +++ b/README.md @@ -24,6 +24,13 @@ $ pip install cdx_toolkit or clone this repo and use `pip install .` +Optional extras: + +``` +$ pip install cdx_toolkit[s3] # enable S3 and other remote filesystem support +$ pip install cdx_toolkit[all] # install all optional dependencies +``` + ## Command-line tools ``` @@ -275,7 +282,7 @@ cdx_toolkit has reached the beta-testing stage of development. ## Contributing -See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines on contributing +See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines on contributing and running tests. 
## License diff --git a/cdx_toolkit/cli.py b/cdx_toolkit/cli.py index bfe7e23..79cad66 100644 --- a/cdx_toolkit/cli.py +++ b/cdx_toolkit/cli.py @@ -1,4 +1,4 @@ -from argparse import ArgumentParser +from argparse import ArgumentParser, Namespace import logging import csv import sys @@ -6,7 +6,9 @@ import os import cdx_toolkit -from cdx_toolkit.commoncrawl import normalize_crawl + +from cdx_toolkit.utils import get_version, setup_cdx_fetcher_and_kwargs + LOGGER = logging.getLogger(__name__) @@ -135,7 +137,7 @@ def main(args=None): cmd.func(cmd, cmdline) -def set_loglevel(cmd): +def set_loglevel(cmd: Namespace): loglevel = os.getenv('LOGLEVEL') or 'WARNING' if cmd.verbose: if cmd.verbose > 0: @@ -151,50 +153,7 @@ def set_loglevel(cmd): LOGGER.info('set loglevel to %s', str(loglevel)) -def get_version(): - return cdx_toolkit.__version__ - - -def setup(cmd): - kwargs = {} - kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None - if kwargs['source'] is None: - raise ValueError('must specify --cc, --ia, or a --source') - if cmd.wb: - kwargs['wb'] = cmd.wb - if cmd.cc_mirror: - kwargs['cc_mirror'] = cmd.cc_mirror - if cmd.crawl: - kwargs['crawl'] = normalize_crawl([cmd.crawl]) # currently a string, not a list - if getattr(cmd, 'warc_download_prefix', None) is not None: - kwargs['warc_download_prefix'] = cmd.warc_download_prefix - - cdx = cdx_toolkit.CDXFetcher(**kwargs) - - kwargs = {} - if cmd.limit: - kwargs['limit'] = cmd.limit - if 'from' in vars(cmd) and vars(cmd)['from']: # python, uh, from is a reserved word - kwargs['from_ts'] = vars(cmd)['from'] - if cmd.to: - kwargs['to'] = cmd.to - if cmd.closest: - if not cmd.get: # pragma: no cover - LOGGER.info('note: --closest works best with --get') - kwargs['closest'] = cmd.closest - if cmd.filter: - kwargs['filter'] = cmd.filter - - if cmd.cmd == 'warc' and cmd.size: - kwargs['size'] = cmd.size - - if cmd.cmd == 'size' and cmd.details: - kwargs['details'] = cmd.details - - return cdx, kwargs - - 
-def winnow_fields(cmd, fields, obj): +def winnow_fields(cmd: Namespace, fields, obj): if cmd.all_fields: printme = obj else: @@ -202,7 +161,7 @@ def winnow_fields(cmd, fields, obj): return printme -def print_line(cmd, writer, printme): +def print_line(cmd: Namespace, writer, printme): if cmd.jsonl: print(json.dumps(printme, sort_keys=True)) elif writer: @@ -211,8 +170,8 @@ def print_line(cmd, writer, printme): print(', '.join([' '.join((k, printme[k])) for k in sorted(printme.keys())])) -def iterator(cmd, cmdline): - cdx, kwargs = setup(cmd) +def iterator(cmd: Namespace, cmdline): + cdx, kwargs = setup_cdx_fetcher_and_kwargs(cmd) fields = set(cmd.fields.split(',')) if cmd.csv: writer = csv.DictWriter(sys.stdout, fieldnames=sorted(list(fields))) @@ -232,8 +191,8 @@ def iterator(cmd, cmdline): print_line(cmd, writer, printme) -def warcer(cmd, cmdline): - cdx, kwargs = setup(cmd) +def warcer(cmd: Namespace, cmdline: str): + cdx, kwargs = setup_cdx_fetcher_and_kwargs(cmd) ispartof = cmd.prefix if cmd.subprefix: @@ -275,9 +234,15 @@ def warcer(cmd, cmdline): LOGGER.warning('revisit record being resolved for url %s %s', url, timestamp) writer.write_record(record) + writer.close() -def sizer(cmd, cmdline): - cdx, kwargs = setup(cmd) + +def sizer(cmd: Namespace, cmdline): + cdx, kwargs = setup_cdx_fetcher_and_kwargs(cmd) size = cdx.get_size_estimate(cmd.url, **kwargs) print(size) + + +if __name__ == "__main__": + main() diff --git a/cdx_toolkit/commoncrawl.py b/cdx_toolkit/commoncrawl.py index a604c41..c132db5 100644 --- a/cdx_toolkit/commoncrawl.py +++ b/cdx_toolkit/commoncrawl.py @@ -9,6 +9,8 @@ import json import logging +from cdx_toolkit.settings import CACHE_DIR, get_mock_time + from .myrequests import myrequests_get from .timeutils import ( time_to_timestamp, @@ -34,7 +36,7 @@ def normalize_crawl(crawl): def get_cache_names(cc_mirror): - cache = os.path.expanduser('~/.cache/cdx_toolkit/') + cache = os.path.expanduser(CACHE_DIR) filename = re.sub(r'[^\w]', '_', 
cc_mirror.replace('https://', '')) return cache, filename @@ -128,9 +130,13 @@ def apply_cc_defaults(params, crawl_present=False, now=None): LOGGER.info('to but no from_ts, setting from_ts=%s', params['from_ts']) else: if not now: - # now is passed in by tests. if not set, use actual now. - # XXX could be changed to mock - now = time.time() + # Check for test/override time first + mock_time = get_mock_time() + if mock_time: + now = mock_time + else: + # now is passed in by tests. if not set, use actual now. + now = time.time() params['from_ts'] = time_to_timestamp(now - year) LOGGER.info('no from or to, setting default 1 year ago from_ts=%s', params['from_ts']) else: diff --git a/cdx_toolkit/myrequests.py b/cdx_toolkit/myrequests.py index 408bebf..6e8d684 100644 --- a/cdx_toolkit/myrequests.py +++ b/cdx_toolkit/myrequests.py @@ -1,9 +1,18 @@ +from typing import Optional import requests import logging import time from urllib.parse import urlparse from . import __version__ +from .settings import ( + DEFAULT_MIN_RETRY_INTERVAL, + CC_DATA_MIN_RETRY_INTERVAL, + CC_INDEX_MIN_RETRY_INTERVAL, + IA_MIN_RETRY_INTERVAL, + MAX_ERRORS, + WARNING_AFTER_N_ERRORS, +) LOGGER = logging.getLogger(__name__) @@ -23,19 +32,19 @@ def dns_fatal(hostname): retry_info = { 'default': { 'next_fetch': 0, - 'minimum_interval': 3.0, + 'minimum_interval': DEFAULT_MIN_RETRY_INTERVAL, }, 'index.commoncrawl.org': { 'next_fetch': 0, - 'minimum_interval': 1.0, + 'minimum_interval': CC_INDEX_MIN_RETRY_INTERVAL, }, 'data.commoncrawl.org': { 'next_fetch': 0, - 'minimum_interval': 0.55, + 'minimum_interval': CC_DATA_MIN_RETRY_INTERVAL, }, 'web.archive.org': { 'next_fetch': 0, - 'minimum_interval': 6.0, + 'minimum_interval': IA_MIN_RETRY_INTERVAL, }, } @@ -60,12 +69,18 @@ def myrequests_get( headers=None, cdx=False, allow404=False, - raise_error_after_n_errors: int = 100, - raise_warning_after_n_errors: int = 10, + raise_error_after_n_errors: Optional[int] = None, + raise_warning_after_n_errors: 
Optional[int] = None, retry_max_sec: int = 60, ): t = time.time() + if raise_error_after_n_errors is None: + raise_error_after_n_errors = MAX_ERRORS + + if raise_warning_after_n_errors is None: + raise_warning_after_n_errors = WARNING_AFTER_N_ERRORS + hostname = urlparse(url).hostname next_fetch, minimum_interval = get_retries(hostname) diff --git a/cdx_toolkit/settings.py b/cdx_toolkit/settings.py new file mode 100644 index 0000000..ca09b58 --- /dev/null +++ b/cdx_toolkit/settings.py @@ -0,0 +1,17 @@ +import os + +CACHE_DIR = os.environ.get('CDXT_CACHE_DIR', '~/.cache/cdx_toolkit/') + +MAX_ERRORS = int(os.environ.get('CDXT_MAX_ERRORS', 100)) +WARNING_AFTER_N_ERRORS = int(os.environ.get('CDXT_WARNING_AFTER_N_ERRORS', 10)) + +DEFAULT_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_DEFAULT_MIN_RETRY_INTERVAL', 3.0)) +CC_INDEX_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_CC_INDEX_MIN_RETRY_INTERVAL', 1.0)) +CC_DATA_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_CC_DATA_MIN_RETRY_INTERVAL', 0.55)) +IA_MIN_RETRY_INTERVAL = float(os.environ.get('CDXT_IA_MIN_RETRY_INTERVAL', 6.0)) + + +def get_mock_time(): + """Get the mock time from environment variable, evaluated dynamically""" + mock_time = os.environ.get('CDXT_MOCK_TIME') + return float(mock_time) if mock_time else None diff --git a/cdx_toolkit/utils.py b/cdx_toolkit/utils.py new file mode 100644 index 0000000..aaf460c --- /dev/null +++ b/cdx_toolkit/utils.py @@ -0,0 +1,50 @@ +import cdx_toolkit +from cdx_toolkit.commoncrawl import normalize_crawl + +from argparse import Namespace +import logging + +LOGGER = logging.getLogger(__name__) + + +def get_version(): + return cdx_toolkit.__version__ + + +def setup_cdx_fetcher_and_kwargs(cmd: Namespace): + kwargs = {} + kwargs['source'] = 'cc' if cmd.crawl else cmd.cc or cmd.ia or cmd.source or None + if kwargs['source'] is None: + raise ValueError('must specify --cc, --ia, or a --source') + if cmd.wb: + kwargs['wb'] = cmd.wb + if cmd.cc_mirror: + kwargs['cc_mirror'] = 
cmd.cc_mirror + if cmd.crawl: + kwargs['crawl'] = normalize_crawl([cmd.crawl]) # currently a string, not a list + if getattr(cmd, 'warc_download_prefix', None) is not None: + kwargs['warc_download_prefix'] = cmd.warc_download_prefix + + cdx = cdx_toolkit.CDXFetcher(**kwargs) + + kwargs = {} + if cmd.limit: + kwargs['limit'] = cmd.limit + if 'from' in vars(cmd) and vars(cmd)['from']: # python, uh, from is a reserved word + kwargs['from_ts'] = vars(cmd)['from'] + if cmd.to: + kwargs['to'] = cmd.to + if cmd.closest: + if not cmd.get: # pragma: no cover + LOGGER.info('note: --closest works best with --get') + kwargs['closest'] = cmd.closest + if cmd.filter: + kwargs['filter'] = cmd.filter + + if cmd.cmd == 'warc' and cmd.size: + kwargs['size'] = cmd.size + + if cmd.cmd == 'size' and cmd.details: + kwargs['details'] = cmd.details + + return cdx, kwargs diff --git a/cdx_toolkit/warc.py b/cdx_toolkit/warc.py index e7e2bd5..3757ce4 100644 --- a/cdx_toolkit/warc.py +++ b/cdx_toolkit/warc.py @@ -1,10 +1,15 @@ from urllib.parse import quote from io import BytesIO -import os.path import datetime import logging -import sys - +import os + +try: + import fsspec + _HAS_FSSPEC = True +except ImportError: # pragma: no cover - exercised in minimal installs + fsspec = None + _HAS_FSSPEC = False from warcio import WARCWriter from warcio.recordloader import ArcWarcRecordLoader from warcio.bufferedreaders import DecompressingBufferedReader @@ -16,6 +21,35 @@ LOGGER = logging.getLogger(__name__) +def _is_s3_url(url): + return url.startswith('s3://') or url.startswith('s3:') + + +def _require_s3_deps(): + if not _HAS_FSSPEC: + raise RuntimeError( + 'Remote filesystem (S3) support requires optional dependencies. Install cdx_toolkit[s3].' 
+ ) + + +class _LocalFileSystem: + def open(self, filename, mode): + return open(filename, mode) + + def exists(self, filename): + return os.path.exists(filename) + + +def _url_to_fs(prefix): + if _HAS_FSSPEC: + return fsspec.url_to_fs(prefix) + + if _is_s3_url(prefix) or '://' in prefix: + _require_s3_deps() + + return _LocalFileSystem(), prefix + + def wb_redir_to_original(location): return 'http' + location.split('_/http', 1)[1] @@ -32,9 +66,9 @@ def wb_redir_to_original(location): def fake_wb_warc(url, wb_url, resp, capture): - ''' + """ Given a playback from a wayback, fake up a warc response record - ''' + """ status_code = resp.status_code status_reason = resp.reason @@ -42,19 +76,18 @@ def fake_wb_warc(url, wb_url, resp, capture): url = capture['url'] timestamp = capture['timestamp'] if status_code == 200 and capture['status'] == '-': - LOGGER.warning('revisit record vivified by wayback for %s %s', - url, timestamp) + LOGGER.warning('revisit record vivified by wayback for %s %s', url, timestamp) elif status_code == 200 and capture['status'].startswith('3'): - LOGGER.warning('redirect capture came back 200, same-surt same-timestamp capture? %s %s', - url, timestamp) + LOGGER.warning('redirect capture came back 200, same-surt same-timestamp capture? 
%s %s', url, timestamp) elif status_code == 302 and capture['status'].startswith('3'): # this is OK, wayback always sends a temporary redir status_code = int(capture['status']) if status_code != resp.status_code and status_code in http_status_text: status_reason = http_status_text[status_code] else: # pragma: no cover - LOGGER.warning('surprised that status code is now=%d orig=%s %s %s', - status_code, capture['status'], url, timestamp) + LOGGER.warning( + f'status code is now={status_code}, orig={capture["status"]}, {url}, {timestamp}' + ) http_headers = [] http_date = None @@ -76,7 +109,7 @@ def fake_wb_warc(url, wb_url, resp, capture): k = 'X-Archive-' + k http_headers.append((k, v)) - statusline = '{} {}'.format(status_code, status_reason) + statusline = f'{status_code} {status_reason}' http_headers = StatusAndHeaders(statusline, headers=http_headers, protocol='HTTP/1.1') warc_headers_dict = { @@ -89,16 +122,15 @@ def fake_wb_warc(url, wb_url, resp, capture): content_bytes = resp.content writer = WARCWriter(None) # needs warc_version here? 
- return writer.create_warc_record(url, 'response', - payload=BytesIO(content_bytes), - http_headers=http_headers, - warc_headers_dict=warc_headers_dict) + return writer.create_warc_record( + url, 'response', payload=BytesIO(content_bytes), http_headers=http_headers, warc_headers_dict=warc_headers_dict + ) def fetch_wb_warc(capture, wb, modifier='id_'): for field in ('url', 'timestamp', 'status'): if field not in capture: # pragma: no cover - raise ValueError('capture must contain '+field) + raise ValueError('capture must contain ' + field) if wb is None: # pragma: no cover raise ValueError('No wayback configured') @@ -106,7 +138,7 @@ def fetch_wb_warc(capture, wb, modifier='id_'): url = capture['url'] timestamp = capture['timestamp'] - wb_url = '{}/{}{}/{}'.format(wb, timestamp, modifier, quote(url)) + wb_url = f'{wb}/{timestamp}{modifier}/{quote(url)}' kwargs = {} status = capture['status'] @@ -123,7 +155,7 @@ def fetch_wb_warc(capture, wb, modifier='id_'): def fetch_warc_record(capture, warc_download_prefix): for field in ('url', 'filename', 'offset', 'length'): if field not in capture: # pragma: no cover - raise ValueError('capture must contain '+field) + raise ValueError('capture must contain ' + field) url = capture['url'] filename = capture['filename'] @@ -131,32 +163,43 @@ def fetch_warc_record(capture, warc_download_prefix): length = int(capture['length']) warc_url = warc_download_prefix + '/' + filename - headers = {'Range': 'bytes={}-{}'.format(offset, offset+length-1)} - resp = myrequests_get(warc_url, headers=headers) - record_bytes = resp.content + if _is_s3_url(warc_url): + # fetch from S3 + _require_s3_deps() + with fsspec.open(warc_url, 'rb') as f: + f.seek(offset) + record_bytes = f.read(length) + else: + # fetch over HTTP + headers = {'Range': f'bytes={offset}-{offset + length - 1}'} + + resp = myrequests_get(warc_url, headers=headers) + record_bytes = resp.content + stream = DecompressingBufferedReader(BytesIO(record_bytes)) record = 
ArcWarcRecordLoader().parse_record_stream(stream) - for header in ('WARC-Source-URI', 'WARC-Source-Range'): - if record.rec_headers.get_header(header): # pragma: no cover - print('Surprised that {} was already set in this WARC record'.format(header), file=sys.stderr) + for header_field in ('WARC-Source-URI', 'WARC-Source-Range'): + if record.rec_headers.get_header(header_field): # pragma: no cover + LOGGER.error(f'Header field {header_field} is already set in this WARC record') warc_target_uri = record.rec_headers.get_header('WARC-Target-URI') if url != warc_target_uri: # pragma: no cover - print( - "Surprised that WARC-Target-URI {} is not the capture url {}".format( - warc_target_uri, url - ), - file=sys.stderr, + LOGGER.error( + f'WARC-Target-URI {warc_target_uri} is not the capture url {url}', ) record.rec_headers.replace_header('WARC-Source-URI', warc_url) - record.rec_headers.replace_header('WARC-Source-Range', 'bytes={}-{}'.format(offset, offset+length-1)) + record.rec_headers.replace_header('WARC-Source-Range', f'bytes={offset}-{offset + length - 1}') return record class CDXToolkitWARCWriter: + """Writer for WARC files. + + The fsspec package is used for writing to local or remote file system, e.g., S3.""" + def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_version=None): self.prefix = prefix self.subprefix = subprefix @@ -166,6 +209,9 @@ def __init__(self, prefix, subprefix, info, size=1000000000, gzip=True, warc_ver self.warc_version = warc_version self.segment = 0 self.writer = None + self.file_handler = None + self.file_system, self.file_system_prefix = _url_to_fs(self.prefix) + self._file_context = None def write_record(self, *args, **kwargs): if self.writer is None: @@ -180,21 +226,21 @@ def write_record(self, *args, **kwargs): self.writer.write_record(*args, **kwargs) - fsize = os.fstat(self.fd.fileno()).st_size - if fsize > self.size: - self.fd.close() + # Compare file size of current segment with max. 
file size + if self.file_handler and self.file_handler.tell() > self.size: + self._close_current_file() self.writer = None self.segment += 1 def _unique_warc_filename(self): while True: - name = self.prefix + '-' + name = self.file_system_prefix + '-' if self.subprefix is not None: name += self.subprefix + '-' - name += '{:06d}'.format(self.segment) + '.extracted.warc' + name += f'{self.segment:06d}.extracted.warc' if self.gzip: name += '.gz' - if os.path.exists(name): + if self.file_system.exists(name): self.segment += 1 else: break @@ -202,12 +248,24 @@ def _unique_warc_filename(self): def _start_new_warc(self): self.filename = self._unique_warc_filename() - self.fd = open(self.filename, 'wb') + self._file_context = self.file_system.open(self.filename, 'wb') + self.file_handler = self._file_context.__enter__() LOGGER.info('opening new warc file %s', self.filename) - self.writer = WARCWriter(self.fd, gzip=self.gzip, warc_version=self.warc_version) + self.writer = WARCWriter(self.file_handler, gzip=self.gzip, warc_version=self.warc_version) warcinfo = self.writer.create_warcinfo_record(self.filename, self.info) self.writer.write_record(warcinfo) + def _close_current_file(self): + # Close the handler of the current file (needed for fsspec abstraction) + if self._file_context is not None: + self._file_context.__exit__(None, None, None) + self._file_context = None + self.file_handler = None + + def close(self): + # Close the WARC writer (this must be called at the end) + self._close_current_file() + def get_writer(prefix, subprefix, info, **kwargs): return CDXToolkitWARCWriter(prefix, subprefix, info, **kwargs) diff --git a/examples/iter-and-warc.py b/examples/iter-and-warc.py index 73ea3dd..61de9c0 100755 --- a/examples/iter-and-warc.py +++ b/examples/iter-and-warc.py @@ -32,3 +32,5 @@ writer.write_record(record) print(' wrote', url) + +writer.close() diff --git a/requirements.txt b/requirements.txt index f74bc4c..dba64ea 100644 --- a/requirements.txt +++ 
b/requirements.txt @@ -1,19 +1,23 @@ # Install with "python -m pip install -r requirements.txt". # must be kept in sync with setup.py -requests==2.25.1 +requests>=2.25.1 warcio==1.7.4 +# optional S3 dependencies (install via cdx_toolkit[s3]) +fsspec[s3] +botocore + # used by Makefile -pytest==6.2.4 -pytest-cov==2.12.1 -pytest-sugar==0.9.4 -coveralls==3.1.0 +pytest>=6.2.4 +pytest-cov>=2.12.1 +pytest-sugar>=0.9.4 +coveralls>=3.1.0 flake8>=7.3.0 responses==0.25.8 pre-commit>=4.3.0 # packaging -twine==3.4.1 -setuptools==57.0.0 -setuptools-scm==6.0.1 +twine>=3.4.1 +setuptools>=57.0.0 +setuptools-scm>=6.0.1 diff --git a/scripts/cdx_iter b/scripts/cdx_iter index 8b0c5a3..99445c0 100644 --- a/scripts/cdx_iter +++ b/scripts/cdx_iter @@ -143,6 +143,8 @@ elif args.warc: if obj.is_revisit(): LOGGER.warning('revisit record being resolved for url %s %s', url, timestamp) writer.write_record(record) + + writer.close() else: for obj in cdx.iter(args.url, **kwargs): printme = winnow_fields(obj) diff --git a/setup.py b/setup.py index 9a03208..b707eb6 100755 --- a/setup.py +++ b/setup.py @@ -2,26 +2,27 @@ from os import path -from setuptools import setup +from setuptools import setup, find_packages -packages = [ - 'cdx_toolkit', -] +packages = find_packages(include=['cdx_toolkit*']) # remember: keep requires synchronized with requirements.txt requires = ['requests', 'warcio'] test_requirements = ['pytest', 'pytest-cov', 'flake8', 'responses'] +optional_s3_requirements = ['fsspec[s3]', 'botocore'] package_requirements = ['twine', 'setuptools', 'setuptools-scm'] dev_requirements = ['pre-commit'] extras_require = { + 's3': optional_s3_requirements, 'test': test_requirements, # setup no longer tests, so make them an extra 'package': package_requirements, 'dev': package_requirements, + 'all': test_requirements + package_requirements + dev_requirements + optional_s3_requirements, } scripts = ['scripts/cdx_size', 'scripts/cdx_iter'] @@ -61,8 +62,8 @@ 'Natural Language :: English', 'License 
:: OSI Approved :: Apache Software License', 'Programming Language :: Python', - #'Programming Language :: Python :: 3.5', # setuptools-scm problem - #'Programming Language :: Python :: 3.6', # not offered in github actions + # 'Programming Language :: Python :: 3.5', # setuptools-scm problem + # 'Programming Language :: Python :: 3.6', # not offered in github actions 'Programming Language :: Python :: 3.9', 'Programming Language :: Python :: 3.10', 'Programming Language :: Python :: 3.11', diff --git a/tests/conftest.py b/tests/conftest.py index 1555c24..9f54f82 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,16 +1,97 @@ import json import os from pathlib import Path +import pytest + +from cdx_toolkit.settings import CACHE_DIR + +try: + import botocore.session + from botocore.config import Config + from botocore.exceptions import NoCredentialsError, ClientError, EndpointConnectionError + _HAS_BOTOCORE = True +except ImportError: # pragma: no cover - exercised in minimal installs + botocore = None + Config = None + NoCredentialsError = ClientError = EndpointConnectionError = Exception + _HAS_BOTOCORE = False + +try: + import fsspec # noqa: F401 + _HAS_FSSPEC = True +except ImportError: # pragma: no cover - exercised in minimal installs + _HAS_FSSPEC = False + import functools from typing import Dict, Optional import requests import responses import base64 +import shutil from unittest.mock import patch TEST_DATA_BASE_PATH = Path(__file__).parent / 'data' +TEST_S3_BUCKET = os.environ.get('CDXT_TEST_S3_BUCKET', 'commoncrawl-ci-temp') +DISABLE_S3_TESTS = bool(os.environ.get('CDXT_DISABLE_S3_TESTS', False)) + +# Cache for AWS access check to avoid repeated network calls +_aws_s3_access_cache = None + + +@pytest.fixture(scope='session', autouse=True) +def cleanup_cache(): + """Delete cache directory before each test to ensure clean state""" + cache_dir = os.path.expanduser(CACHE_DIR) + if os.path.exists(cache_dir): + shutil.rmtree(cache_dir) + + 
+@pytest.fixture(scope='session', autouse=True) +def set_mock_time(): + """Set CDXT_MOCK_TIME environment variable for consistent test results""" + # August 15, 2025 - ensures tests use CC-MAIN-2025-33 which exists in mock data + if 'CDXT_MOCK_TIME' not in os.environ: + os.environ['CDXT_MOCK_TIME'] = '1755259200' + + +def check_aws_s3_access(): + """Check if AWS S3 access is available (cached result).""" + global _aws_s3_access_cache + + if not _HAS_BOTOCORE: + return False + + if _aws_s3_access_cache is not None: + return _aws_s3_access_cache + + try: + config = Config(retries={'max_attempts': 1, 'mode': 'standard'}) + session = botocore.session.Session() + s3_client = session.create_client('s3', config=config) + + # Try list objects on test bucket + s3_client.list_objects_v2(Bucket=TEST_S3_BUCKET, MaxKeys=1) + _aws_s3_access_cache = True + except (NoCredentialsError, ClientError, ConnectionError, EndpointConnectionError): + _aws_s3_access_cache = False + + return _aws_s3_access_cache + + +def requires_aws_s3(func): + """Pytest decorator that skips test if AWS S3 access is not available or disabled.""" + if not _HAS_BOTOCORE or not _HAS_FSSPEC: + return pytest.mark.skipif( + True, reason='S3 dependencies are not installed; install cdx_toolkit[s3] to enable S3 tests.' 
+ )(func) + + return pytest.mark.skipif(DISABLE_S3_TESTS, reason='AWS S3 access is disabled via environment variable.')( + pytest.mark.skipif( + not check_aws_s3_access(), reason='AWS S3 access not available (no credentials or permissions)' + )(func) + ) def flexible_param_matcher(expected_params): diff --git a/tests/unit/test_warc.py b/tests/unit/test_warc.py index e5df43f..428e132 100644 --- a/tests/unit/test_warc.py +++ b/tests/unit/test_warc.py @@ -1,7 +1,278 @@ import cdx_toolkit.warc +from tests.conftest import requires_aws_s3 +from unittest.mock import Mock +import pytest def test_wb_redir_to_original(): location = 'https://web.archive.org/web/20110209062054id_/http://commoncrawl.org/' ret = 'http://commoncrawl.org/' assert cdx_toolkit.warc.wb_redir_to_original(location) == ret + + +@requires_aws_s3 +def test_fetch_warc_record_from_s3(): + record = cdx_toolkit.warc.fetch_warc_record( + capture={ + 'url': 'https://bibliotheque.missiondefrance.fr/index.php?lvl=bulletin_display&id=319', + 'filename': 'crawl-data/CC-MAIN-2024-30/segments/1720763514759.37/warc/CC-MAIN-20240716142214-20240716172214-00337.warc.gz', # noqa: E501 + 'offset': 111440525, + 'length': 9754, + }, + warc_download_prefix='s3://commoncrawl', + ) + record_content = record.content_stream().read().decode(errors='ignore') + + assert record.rec_type == 'response' + assert record.length == 75825 + assert 'Catalogue en ligne Mission de France' in record_content + + +def test_get_fake_wb_warc_status_code(caplog): + """Test status code handling in fake_wb_warc function.""" + + # Test case 1: Status codes match - no warnings + caplog.clear() + mock_resp = Mock() + mock_resp.status_code = 200 + mock_resp.reason = 'OK' + mock_resp.headers = {'Content-Type': 'text/html'} + mock_resp.content = b'test' + + capture = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '200'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + 
wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp, + capture=capture, + ) + + assert record is not None + assert record.rec_type == 'response' + assert 'revisit record vivified' not in caplog.text + assert 'redirect capture came back 200' not in caplog.text + assert 'status code is now' not in caplog.text + + # Test case 2: Revisit record vivified (200 response, '-' status in capture) + caplog.clear() + capture_revisit = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '-'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp, + capture=capture_revisit, + ) + + assert record is not None + assert 'revisit record vivified by wayback' in caplog.text + + # Test case 3: Redirect capture came back 200 (200 response, 3xx status in capture) + caplog.clear() + capture_redirect = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '301'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp, + capture=capture_redirect, + ) + + assert record is not None + assert 'redirect capture came back 200' in caplog.text + + # Test case 4: Wayback returns 302, capture has 3xx status - should use capture status + caplog.clear() + mock_resp_302 = Mock() + mock_resp_302.status_code = 302 + mock_resp_302.reason = 'Found' + mock_resp_302.headers = { + 'Content-Type': 'text/html', + 'Location': 'https://web.archive.org/web/20240101120000id_/http://example.com/new', + } + mock_resp_302.content = b'' + + capture_301 = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '301'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp_302, + capture=capture_301, + ) + + 
assert record is not None + # The status line should use 301 from capture, not 302 from response + assert '301' in record.http_headers.get_statuscode() + # No warnings for this case - it's expected behavior + assert 'revisit record vivified' not in caplog.text + assert 'redirect capture came back 200' not in caplog.text + assert 'status code is now' not in caplog.text + + # Test case 5: 302 response with 307 capture status - should use 307 + caplog.clear() + capture_307 = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '307'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp_302, + capture=capture_307, + ) + + assert record is not None + assert '307' in record.http_headers.get_statuscode() + # No warnings for this case either + assert 'revisit record vivified' not in caplog.text + assert 'redirect capture came back 200' not in caplog.text + assert 'status code is now' not in caplog.text + + # Test case 6: Mismatched status codes (not covered by special cases) + caplog.clear() + mock_resp_404 = Mock() + mock_resp_404.status_code = 404 + mock_resp_404.reason = 'Not Found' + mock_resp_404.headers = {'Content-Type': 'text/html'} + mock_resp_404.content = b'Not found' + + capture_200 = {'url': 'http://example.com', 'timestamp': '20240101120000', 'status': '200'} + + record = cdx_toolkit.warc.fake_wb_warc( + url='http://example.com', + wb_url='https://web.archive.org/web/20240101120000id_/http://example.com', + resp=mock_resp_404, + capture=capture_200, + ) + + assert record is not None + # This should trigger the "surprised" warning (else case) + assert 'status code is now' in caplog.text + + +def test_unique_warc_filename(): + """Test _unique_warc_filename method of CDXToolkitWARCWriter.""" + + # Test case 1: Basic filename generation with gzip and no subprefix + writer = cdx_toolkit.warc.CDXToolkitWARCWriter( + 
prefix='/tmp/test-prefix', subprefix=None, info='test info', gzip=True + ) + + filename = writer._unique_warc_filename() + # Use the actual file_system_prefix from the writer for cross-platform compatibility + expected = f'{writer.file_system_prefix}-000000.extracted.warc.gz' + assert filename == expected + assert writer.segment == 0 + + # Test case 2: Filename generation without gzip + writer_no_gzip = cdx_toolkit.warc.CDXToolkitWARCWriter( + prefix='/tmp/test-prefix', subprefix=None, info='test info', gzip=False + ) + + filename = writer_no_gzip._unique_warc_filename() + expected = f'{writer_no_gzip.file_system_prefix}-000000.extracted.warc' + assert filename == expected + assert not filename.endswith('.gz') + + # Test case 3: Filename generation with subprefix + writer_subprefix = cdx_toolkit.warc.CDXToolkitWARCWriter( + prefix='/tmp/test-prefix', subprefix='mysub', info='test info', gzip=True + ) + + filename = writer_subprefix._unique_warc_filename() + expected = f'{writer_subprefix.file_system_prefix}-mysub-000000.extracted.warc.gz' + assert filename == expected + assert 'mysub' in filename + + # Test case 4: Filename generation with subprefix and no gzip + writer_subprefix_no_gzip = cdx_toolkit.warc.CDXToolkitWARCWriter( + prefix='/tmp/test-prefix', subprefix='another', info='test info', gzip=False + ) + + filename = writer_subprefix_no_gzip._unique_warc_filename() + expected = f'{writer_subprefix_no_gzip.file_system_prefix}-another-000000.extracted.warc' + assert filename == expected + assert 'another' in filename + assert not filename.endswith('.gz') + + # Test case 5: Handling of existing files - should increment segment + writer_increment = cdx_toolkit.warc.CDXToolkitWARCWriter( + prefix='/tmp/test-increment', subprefix=None, info='test info', gzip=True + ) + + # Mock the file_system.exists to simulate existing files + original_exists = writer_increment.file_system.exists + call_count = [0] + + def mock_exists(path): + call_count[0] += 1 + # First two 
calls return True (files exist), third returns False + if call_count[0] <= 2: + return True + return original_exists(path) + + writer_increment.file_system.exists = mock_exists + + filename = writer_increment._unique_warc_filename() + # Should have incremented segment twice (0 and 1 existed, 2 is free) + expected = f'{writer_increment.file_system_prefix}-000002.extracted.warc.gz' + assert filename == expected + assert writer_increment.segment == 2 + + # Restore original + writer_increment.file_system.exists = original_exists + + # Test case 6: Multiple segments with subprefix + writer_multi = cdx_toolkit.warc.CDXToolkitWARCWriter( + prefix='/tmp/test-multi', subprefix='batch1', info='test info', gzip=True + ) + writer_multi.segment = 5 + + filename = writer_multi._unique_warc_filename() + expected = f'{writer_multi.file_system_prefix}-batch1-000005.extracted.warc.gz' + assert filename == expected + assert '000005' in filename + + +def test_is_s3_url(): + assert cdx_toolkit.warc._is_s3_url('s3://bucket/key') + assert cdx_toolkit.warc._is_s3_url('s3:bucket/key') + assert not cdx_toolkit.warc._is_s3_url('https://example.com/file.warc.gz') + + +def test_url_to_fs_local_without_fsspec(monkeypatch, tmp_path): + monkeypatch.setattr(cdx_toolkit.warc, '_HAS_FSSPEC', False) + monkeypatch.setattr(cdx_toolkit.warc, 'fsspec', None) + + fs, prefix = cdx_toolkit.warc._url_to_fs(str(tmp_path)) + + assert isinstance(fs, cdx_toolkit.warc._LocalFileSystem) + assert prefix == str(tmp_path) + + test_file = tmp_path / 'local.warc' + with fs.open(str(test_file), 'wb') as handle: + handle.write(b'test') + assert fs.exists(str(test_file)) + + +def test_url_to_fs_requires_deps_for_s3(monkeypatch): + monkeypatch.setattr(cdx_toolkit.warc, '_HAS_FSSPEC', False) + monkeypatch.setattr(cdx_toolkit.warc, 'fsspec', None) + + with pytest.raises(RuntimeError, match=r'cdx_toolkit\[s3\]'): + cdx_toolkit.warc._url_to_fs('s3://bucket/key') + + +def test_fetch_warc_record_requires_s3_deps(monkeypatch): + 
monkeypatch.setattr(cdx_toolkit.warc, '_HAS_FSSPEC', False) + monkeypatch.setattr(cdx_toolkit.warc, 'fsspec', None) + + capture = { + 'url': 'http://example.com', + 'filename': 'dummy.warc.gz', + 'offset': 0, + 'length': 1, + } + + with pytest.raises(RuntimeError, match=r'cdx_toolkit\[s3\]'): + cdx_toolkit.warc.fetch_warc_record(capture, warc_download_prefix='s3://bucket')