diff --git a/nemo_curator_semantic_dedup/Dockerfile b/nemo_curator_semantic_dedup/Dockerfile new file mode 100644 index 0000000..1e2d845 --- /dev/null +++ b/nemo_curator_semantic_dedup/Dockerfile @@ -0,0 +1,72 @@ +# NeMo Curator Image Deduplication Example +# Uses CUDA 12.8 for GPU-accelerated processing +FROM anyscale/ray:2.54.0-slim-py312-cu128 + +# Note: Cache busting for git clone is done via CURATOR_CACHE_BUST arg below + +# Install system dependencies +RUN sudo apt-get update && \ + sudo apt-get install -y --no-install-recommends \ + build-essential \ + unzip \ + wget \ + curl \ + git && \ + sudo apt-get clean && \ + sudo rm -rf /var/lib/apt/lists/* + +# Install uv for fast package management +RUN curl -LsSf https://astral.sh/uv/install.sh | sh + +# Install Python dependencies +# Use uv pip install --system to install into the base anaconda environment +# so all Ray workers (not just the driver) have these packages +RUN python -m pip install --upgrade pip setuptools wheel + +# IMPORTANT: Uninstall any pre-existing RAPIDS/cuML packages from the base image +# The base image may have incompatible versions that conflict with scikit-learn +RUN python -m pip uninstall -y cuml-cu12 cudf-cu12 cugraph-cu12 pylibraft-cu12 raft-dask-cu12 rmm-cu12 || true && \ + echo "Cleaned up pre-existing RAPIDS packages" + +# Install NeMo-Curator from the fork branch with CUDA dependencies (DALI, cuML, RAFT, etc.) +ARG CURATOR_REPO=https://github.com/avigyabb/Curator.git +ARG CURATOR_REF=avi-test +RUN git clone --depth 1 -b ${CURATOR_REF} ${CURATOR_REPO} /home/ray/NeMo-Curator && \ + uv pip install --system /home/ray/NeMo-Curator[image_cuda12] + +# Re-upgrade scikit-learn AFTER nemo-curator in case it was downgraded +# cuML 25.6.* needs sklearn >= 1.5 (has _get_default_requests) +RUN uv pip install --system "scikit-learn>=1.5,<1.6" && \ + python -c "import sklearn; print(f'Final scikit-learn version: {sklearn.__version__}')" + +# Additional dependencies for image downloading and processing +RUN uv pip install --system \ + loguru \ + Pillow \ + aiohttp \ + tqdm \ + pandas \ + pyarrow \ + huggingface_hub \ + transformers \ + webdataset \ + pydantic-settings + +# Set environment variable for model directory +ENV MODEL_DIR=/home/ray/model_weights + +# NCCL diagnostic output — baked into the image so every worker node has it. +# (job.yaml env_vars only reach the driver; these ENV lines reach all nodes.) +ENV NCCL_DEBUG=WARN + +# Pre-download CLIP model weights into the image so workers never race on download +RUN python -c "\ +from nemo_curator.models.clip import CLIPImageEmbeddings; \ +CLIPImageEmbeddings.download_weights_on_node('/home/ray/model_weights')" + +# Create output directories +RUN mkdir -p /home/ray/data/webdataset \ + /home/ray/data/results \ + /home/ray/data/embeddings \ + /home/ray/data/removal_ids + diff --git a/nemo_curator_semantic_dedup/README.md b/nemo_curator_semantic_dedup/README.md new file mode 100644 index 0000000..79abd0a --- /dev/null +++ b/nemo_curator_semantic_dedup/README.md @@ -0,0 +1,124 @@ +# Image Semantic Deduplication with NeMo Curator + +This example uses [NVIDIA NeMo Curator](https://github.com/NVIDIA-NeMo/Curator) to perform GPU-accelerated semantic deduplication on image datasets. It reads image URLs from a HuggingFace parquet dataset, downloads them into [WebDataset](https://github.com/webdataset/webdataset) tar shards, generates CLIP embeddings on GPUs, clusters them with K-means + DBSCAN to find near-duplicates, and writes a clean deduplicated dataset. 
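+
+Each downloaded image ends up as one sample in those shards: the JPEG bytes, the caption, and a small JSON record with the source URL (see `write_tar_batch` in [helper.py](./helper.py)). As a minimal sketch, assuming the default `INPUT_WDS_DIR` from [job.yaml](./job.yaml), you can peek inside a shard with the `webdataset` library:
+
+```python
+import glob
+
+import webdataset as wds
+
+# Shard location assumes the default INPUT_WDS_DIR; adjust if you override it.
+shards = sorted(glob.glob("/mnt/cluster_storage/nemo_curator/webdataset/*.tar"))
+for sample in wds.WebDataset(shards):
+    print(sample["__key__"])          # e.g. "ab12cd34_deadbeef_000042"
+    jpeg_bytes = sample["jpg"]        # raw JPEG bytes, as written by write_tar_batch
+    caption = sample["txt"].decode()  # caption text
+    break
+```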
+ +## Install the Anyscale CLI + +(required version = 0.26.82+) + +```bash +pip install -U anyscale +anyscale login +``` + +## Run the job + +Clone the example from GitHub. + +```bash +git clone https://github.com/anyscale/examples.git +cd examples/nemo_curator_semantic_dedup +``` + +Submit the job. Use `--env` to forward your HuggingFace token for dataset access. + +```bash +anyscale job submit -f job.yaml --env HF_TOKEN=$HF_TOKEN +``` + +## How it works + +The entrypoint defined in [job.yaml](./job.yaml) runs [image_dedup_example.py](./image_dedup_example.py), which executes four steps: + +``` +HuggingFace parquet (URLs + captions) + │ + ▼ +Step 1: Download images → WebDataset tar shards + │ + ▼ +Step 2: Generate CLIP embeddings (GPU) + │ + ▼ +Step 3: Semantic deduplication (K-means + DBSCAN) + │ + ▼ +Step 4: Write deduplicated dataset (new tar shards without duplicates) +``` + +### Step 1: Parquet to WebDataset + +`main()` calls `parquet_to_webdataset_ray()` in [helper.py](./helper.py), which builds a Ray Data pipeline to download images and pack them into WebDataset tar shards: + +``` +read_parquet (HF) → repartition → map_batches(download) → flat_map(validate) → map_batches(write_tar) +``` + +| Ray Data operator | Function | What it does | +|-------------------|----------|-------------| +| `read_parquet` | — | Lazily reads `url` and `caption` columns from HuggingFace via `HfFileSystem`. | +| `repartition` | — | Splits large parquet blocks (~millions of rows) into ~1000-row blocks so Ray can parallelize downstream work across the cluster. | +| `map_batches` | `image_download_batch` | Downloads images in batches of 100. Within each Ray task, a `ThreadPoolExecutor` with 100 threads downloads URLs concurrently — so you get parallelism at two levels: Ray distributes batches across cluster CPUs, and threads parallelize I/O within each batch. | +| `flat_map` | `process_image` | Validates each image with Pillow (`.verify()`), converts to RGB JPEG, and drops failures by returning `[]`. Handles all image modes (RGBA, palette, grayscale, CMYK) by compositing onto a white background. | +| `map_batches` | `write_tar_batch` | Packs `ENTRIES_PER_TAR` images (default 500) into a single `.tar` shard on cluster storage. Each shard gets a unique name based on node ID + UUID to avoid collisions when multiple nodes write simultaneously. | + +The entire pipeline streams end-to-end — Ray Data handles backpressure so fast stages don't overwhelm slow ones, and data flows through without loading the full dataset into memory. `.take_all()` at the end triggers execution. + +### Step 2: Image embedding pipeline + +`create_image_embedding_pipeline()` builds a NeMo Curator `Pipeline` with five stages: + +| Stage | What it does | +|-------|-------------| +| `FilePartitioningStage` | Discovers `.tar` files and groups them into partitions (`tar_files_per_partition=1`). Not a bottleneck at this scale — GPU throughput on the embedding stage is. | +| `ImageReaderStage` | Reads images from tar shards. DALI decodes JPEGs on the GPU. | +| `ImageEmbeddingStage` | Runs OpenAI CLIP ViT-L/14 to produce 768-dimensional embeddings for each image. | +| `ConvertImageBatchToDocumentBatchStage` | Converts from `ImageBatch` (numpy pixel arrays) to `DocumentBatch` (DataFrames), keeping only `image_id` and `embedding`. | +| `ParquetWriter` | Saves embeddings to parquet files on cluster storage. 
| + +This pipeline runs on a `RayDataExecutor`, which treats the stages as a streaming dataflow — data flows through one batch at a time in a producer-consumer pattern. It's well suited for stages that process data independently without coordination between workers. + +### Step 3: Semantic deduplication workflow + +`SemanticDeduplicationWorkflow` is where the actual deduplication happens: + +1. **K-means clustering** (`n_clusters=100`) — groups similar embeddings together using RAFT/NCCL across all GPU actors. +2. **DBSCAN within clusters** (`eps=0.01`) — finds near-duplicate pairs based on cosine distance within each cluster. + +This step runs on a `RayActorPoolExecutor`, which creates a fixed pool of long-lived Ray actors, each holding a GPU. This is needed because K-means is iterative: each actor holds a shard of the embeddings plus the current 100 cluster centroids in GPU memory, and on every iteration all actors reassign points, compute partial centroid sums, then synchronize via NCCL to update the shared centroids. That state (centroids + assignments) must persist across iterations until convergence — a stateless streaming executor, which processes each batch independently and forgets it, can't do that. + +### Step 4: Write deduplicated dataset + +`create_image_deduplication_pipeline()` re-reads the original tar shards, filters out images whose IDs appear in the removal list, and writes new tar shards. You can't selectively remove files from a tar archive — tar is a flat concatenation of file records with no editable index — so writing new shards without the duplicates is the only option. `ImageWriterStage` re-encodes each image's numpy pixel array back to JPEG with Pillow, packs them into new `.tar` files (up to 1000 images per tar), and writes a companion `.parquet` with metadata (`image_id`, `tar_file`, `member_name`, `original_path`). + +## Cluster storage + +All intermediate and output data lives under `/mnt/cluster_storage/`, a shared network filesystem (backed by S3) that is automatically mounted on every node in the cluster. This is necessary because Steps 2, 3, and 4 run as separate pipeline executions with different executors (`RayDataExecutor` for streaming stages, `RayActorPoolExecutor` for K-means). When one step finishes, its data is no longer in memory — the next step reads it back from disk. A shared filesystem ensures any node can read what any other node wrote, regardless of which step produced it. + +| Step | Directory | Contents | +|------|-----------|----------| +| 1 | `webdataset/` | WebDataset `.tar` files (downloaded images) | +| 2 | `embeddings/` | Parquet files with `image_id` + embedding vectors | +| 3 | `removal_ids/` | K-means results, pairwise results, and `duplicates/` parquet listing IDs to remove | +| 4 | `results/` | Final deduplicated WebDataset `.tar` files | + +## Configuration + +All configuration is done through environment variables in [job.yaml](./job.yaml). Override them at submit time: + +```bash +anyscale job submit -f job.yaml \ + --env HF_TOKEN=$HF_TOKEN +``` + +The [Dockerfile](./Dockerfile) builds a custom image with NeMo Curator's CUDA dependencies (`nemo-curator[image_cuda12]`), including DALI, cuML, and RAFT. The NeMo Curator Python package itself is overridden at runtime via `PYTHONPATH=Curator` in job.yaml, which points to a local `Curator/` directory uploaded with the working directory. + +## View the job + +View the job in the [jobs tab](https://console.anyscale.com/jobs) of the Anyscale console. 
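+
+## Inspect the output
+
+Once the job finishes, you can sanity-check the results from any node or workspace that mounts `/mnt/cluster_storage`. A minimal sketch, assuming the default `REMOVAL_DIR` and `OUTPUT_DIR` from [job.yaml](./job.yaml) and that the duplicate-ID parquet files sit directly under `removal_ids/duplicates/`:
+
+```python
+import glob
+
+import pandas as pd
+
+# Count how many images were flagged as duplicates.
+dup_files = glob.glob("/mnt/cluster_storage/nemo_curator/removal_ids/duplicates/*.parquet")
+removal_ids = pd.concat(pd.read_parquet(f) for f in dup_files)
+print(f"{len(removal_ids)} images flagged for removal")
+
+# Count the deduplicated output shards written by ImageWriterStage.
+result_shards = glob.glob("/mnt/cluster_storage/nemo_curator/results/*.tar")
+print(f"{len(result_shards)} deduplicated output shards")
+```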
+ +## Learn more + +- [NeMo Curator Documentation](https://docs.nvidia.com/nemo/curator/latest/) +- [NeMo Curator Image Tutorials](https://github.com/NVIDIA-NeMo/Curator/tree/main/tutorials/image/getting-started) +- [Anyscale Jobs Documentation](https://docs.anyscale.com/platform/jobs/) diff --git a/nemo_curator_semantic_dedup/helper.py b/nemo_curator_semantic_dedup/helper.py new file mode 100644 index 0000000..bc5b041 --- /dev/null +++ b/nemo_curator_semantic_dedup/helper.py @@ -0,0 +1,172 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Helper functions for downloading and preparing image datasets as WebDataset tar files.""" + +from __future__ import annotations + +import io +import os +import uuid +from concurrent.futures import ThreadPoolExecutor +from typing import Any +import requests +from loguru import logger +from PIL import Image +import webdataset as wds + + +def download_single_image(url: str, session: requests.Session) -> bytes | None: + """Download a single image, returning bytes or None on failure.""" + try: + response = session.get(url, timeout=5, stream=True) + return response.content if response.status_code == 200 else None + except Exception: + return None + + +def image_download_batch(batch: dict[str, Any]) -> dict[str, Any]: + """Download a batch of images using ThreadPoolExecutor for parallelism.""" + session = requests.Session() + adapter = requests.adapters.HTTPAdapter( + pool_connections=100, pool_maxsize=100, max_retries=0, + ) + session.mount("http://", adapter) + session.mount("https://", adapter) + + # Use ThreadPoolExecutor for parallel downloads within this batch + # 50 threads means 50 concurrent downloads per Ray task + with ThreadPoolExecutor(max_workers=100) as executor: + batch["bytes"] = list(executor.map(lambda url: download_single_image(url, session), batch["url"])) + + return batch + + +def process_image(row: dict[str, Any]) -> list[dict[str, Any]]: + """Validate downloaded image bytes, convert to JPEG, and drop failures. + + Returns a single-element list on success or an empty list to drop the row. + Designed for use with Ray Data's flat_map. + """ + image_bytes = row.get("bytes") + if not image_bytes: + return [] + + try: + img = Image.open(io.BytesIO(image_bytes)) + img.verify() + img = Image.open(io.BytesIO(image_bytes)) + + # Robust RGB conversion for ALL image modes (L, LA, P, PA, RGBA, CMYK, etc.) 
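+        # Palette (P) images are expanded to RGBA first so any transparency survives the white-background compositing below.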
+ # This ensures CLIP gets 3-channel images + if img.mode != "RGB": + if img.mode == "P": + img = img.convert("RGBA") + # For any mode with alpha, composite onto white background + if img.mode in ("RGBA", "LA", "PA"): + background = Image.new("RGB", img.size, (255, 255, 255)) + # Use alpha channel as mask + if img.mode == "LA": + img = img.convert("RGBA") + background.paste(img, mask=img.split()[-1]) + img = background + else: + img = img.convert("RGB") + + if img.mode != "RGB" or img.size[0] < 3 or img.size[1] < 3: + return [] + + jpeg_buffer = io.BytesIO() + img.save(jpeg_buffer, format="JPEG", quality=95) + row["jpeg_bytes"] = jpeg_buffer.getvalue() + return [row] + except Exception: + return [] + + +def write_tar_batch(batch: dict[str, Any], output_dir: str) -> dict[str, Any]: + """Write a batch of images to a WebDataset tar shard.""" + import ray + + node_id = ray.get_runtime_context().get_node_id()[:8] + shard_id = f"{node_id}_{uuid.uuid4().hex[:8]}" + tar_path = os.path.join(output_dir, f"{shard_id}.tar") + + urls = batch["url"] + captions = batch["caption"] + jpeg_list = batch["jpeg_bytes"] + num_images = len(urls) + + with wds.TarWriter(tar_path) as sink: + for i in range(num_images): + sink.write({ + "__key__": f"{shard_id}_{i:06d}", + "jpg": jpeg_list[i], + "txt": str(captions[i]), + "json": {"url": urls[i], "caption": captions[i]}, + }) + + return {"shard_id": [shard_id], "success_count": [num_images], "total_count": [num_images]} + + +def parquet_to_webdataset_ray( + hf_dataset_path: str, + output_dir: str, + entries_per_tar: int = 1000, + max_entries: int | None = None, + concurrency: int | None = None, +) -> dict[str, int]: + """Convert HuggingFace parquet dataset to WebDataset tar files using Ray Data.""" + import ray + import ray.data + from functools import partial + from huggingface_hub import HfFileSystem + + os.makedirs(output_dir, exist_ok=True) + + ds = ray.data.read_parquet( + hf_dataset_path, + file_extensions=["parquet"], + columns=["url", "caption"], + filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]), + concurrency=10, + ) + + if max_entries is not None: + ds = ds.limit(max_entries) + ds = ds.repartition(num_blocks=max(100, max_entries // 1000)) + + if concurrency is None: + cluster_resources = ray.cluster_resources() + concurrency = max(4, int(cluster_resources.get("CPU", 4))) + + # Download images, validate, convert to JPEG + ds = ds.map_batches(image_download_batch, batch_size=100, batch_format="numpy") + ds = ds.flat_map(process_image) + + # Write tar shards + results = ds.map_batches( + partial(write_tar_batch, output_dir=output_dir), + batch_size=entries_per_tar, + batch_format="numpy", + concurrency=concurrency, + ).take_all() + + total_success = sum(r["success_count"] for r in results) + num_shards = len(results) + total_attempted = max_entries if max_entries is not None else total_success + success_rate = (total_success / total_attempted * 100) if total_attempted > 0 else 0 + logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)") + + return {"total_success": total_success, "total_attempted": total_attempted, "num_shards": num_shards} diff --git a/nemo_curator_semantic_dedup/image_dedup_example.py b/nemo_curator_semantic_dedup/image_dedup_example.py new file mode 100644 index 0000000..1140e0e --- /dev/null +++ b/nemo_curator_semantic_dedup/image_dedup_example.py @@ -0,0 +1,209 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import shutil +from functools import partial + +from loguru import logger +from pydantic_settings import BaseSettings, SettingsConfigDict + +from helper import image_download_batch, process_image, write_tar_batch + +from nemo_curator.backends.experimental.ray_actor_pool import RayActorPoolExecutor +from nemo_curator.backends.experimental.ray_data import RayDataExecutor +from nemo_curator.pipeline import Pipeline +from nemo_curator.stages.deduplication.semantic import SemanticDeduplicationWorkflow +from nemo_curator.stages.file_partitioning import FilePartitioningStage +from nemo_curator.stages.image.deduplication.removal import ImageDuplicatesRemovalStage +from nemo_curator.stages.image.embedders.clip_embedder import ImageEmbeddingStage +from nemo_curator.stages.image.io.convert import ConvertImageBatchToDocumentBatchStage +from nemo_curator.stages.image.io.image_reader import ImageReaderStage +from nemo_curator.stages.image.io.image_writer import ImageWriterStage +from nemo_curator.stages.text.io.writer.parquet import ParquetWriter + + +class Config(BaseSettings): + """Configuration loaded from environment variables.""" + + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8") + + input_parquet: str + input_wds_dir: str + output_dir: str + embeddings_dir: str + removal_dir: str + model_dir: str = "/home/ray/model_weights" + entries_per_tar: int = 1000 + max_entries: int | None = None + tar_files_per_partition: int = 1 + batch_size: int = 100 + embedding_batch_size: int = 32 + reader_cpus_per_task: int = 8 + + +def _make_reader(config: Config) -> ImageReaderStage: + """Create an ImageReaderStage with capped concurrency to prevent OOM. + + DALI reader tasks consume 10-55 GB each (full-resolution decode buffers). + By default each task requests only 1 CPU, so Ray schedules ~48 per node, + far exceeding the 184 GB node memory. Requesting more CPUs per task + limits concurrency (e.g. 16 CPUs → 3 readers per 48-CPU node). This + also prevents readers and embedding actors from running simultaneously, + avoiding GPU-VRAM exhaustion from reader CUDA contexts. 
+ """ + reader = ImageReaderStage(batch_size=config.batch_size, num_gpus_per_worker=0) + reader.resources.cpus = config.reader_cpus_per_task + # Request a tiny GPU fraction to force placement on GPU nodes + # (without this, readers land on small cpu-downloader nodes) + reader.resources.gpus = 0.01 + return reader + + +def create_image_embedding_pipeline(config: Config) -> Pipeline: + """Create pipeline: read images -> generate CLIP embeddings -> save to parquet.""" + pipeline = Pipeline(name="image_embedding") + + pipeline.add_stage(FilePartitioningStage( + file_paths=config.input_wds_dir, + files_per_partition=config.tar_files_per_partition, + file_extensions=[".tar"], + )) + + pipeline.add_stage(_make_reader(config)) + + pipeline.add_stage(ImageEmbeddingStage( + model_dir=config.model_dir, + model_inference_batch_size=config.embedding_batch_size, + remove_image_data=True, + )) + + pipeline.add_stage(ConvertImageBatchToDocumentBatchStage(fields=["image_id", "embedding"])) + + pipeline.add_stage(ParquetWriter(path=config.embeddings_dir)) + + return pipeline + + +def create_embedding_deduplication_workflow(config: Config) -> Pipeline: + """Create semantic deduplication workflow using K-means + DBSCAN.""" + return SemanticDeduplicationWorkflow( + input_path=config.embeddings_dir, + output_path=config.removal_dir, + # Must match column names written by ConvertImageBatchToDocumentBatchStage + id_field="image_id", + embedding_field="embedding", + # Number of K-means clusters — controls the size of each pairwise + # comparison group. More clusters = smaller groups = faster pairwise + # but coarser semantic grouping. 100 is a reasonable default for ~10M images. + n_clusters=100, + # Cosine similarity threshold for marking duplicates. Pairs with + # similarity >= (1 - eps) are considered duplicates. 0.01 means + # images must be >= 99% similar to be flagged. 
+ eps=0.01, + ) + + +def create_image_deduplication_pipeline(config: Config) -> Pipeline: + """Create pipeline: read images -> filter duplicates -> write deduplicated dataset.""" + pipeline = Pipeline(name="image_deduplication") + + pipeline.add_stage(FilePartitioningStage( + file_paths=config.input_wds_dir, + files_per_partition=config.tar_files_per_partition, + file_extensions=[".tar"], + )) + + pipeline.add_stage(_make_reader(config)) + + pipeline.add_stage(ImageDuplicatesRemovalStage( + removal_parquets_dir=config.removal_dir + "/duplicates", + duplicate_id_field="id", + )) + + pipeline.add_stage(ImageWriterStage( + output_dir=config.output_dir, + remove_image_data=True, + )) + + return pipeline + + +def main(config: Config) -> None: + """Main execution function for image semantic deduplication pipeline.""" + # Clean output directories from previous runs to avoid processing stale data + for d in [config.input_wds_dir, config.embeddings_dir, + config.removal_dir, config.output_dir]: + if os.path.exists(d): + shutil.rmtree(d) + + # Step 1: Download images and create WebDataset tar files + import ray + import ray.data + from huggingface_hub import HfFileSystem + + os.makedirs(config.input_wds_dir, exist_ok=True) + + ds = ray.data.read_parquet( + config.input_parquet, + file_extensions=["parquet"], + columns=["url", "caption"], + filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]), + concurrency=10, + ) + + if config.max_entries is not None: + ds = ds.limit(config.max_entries) + ds = ds.repartition(num_blocks=max(100, config.max_entries // 1000)) + + cluster_cpus = int(ray.cluster_resources().get("CPU", 4)) + concurrency = max(4, cluster_cpus) + + ds = ds.map_batches(image_download_batch, batch_size=100, batch_format="numpy") + ds = ds.flat_map(process_image) + + # Write tar shards — these become input for the embedding pipeline below + results = ds.map_batches( + partial(write_tar_batch, output_dir=config.input_wds_dir), + batch_size=config.entries_per_tar, + batch_format="numpy", + concurrency=concurrency, + ).take_all() + + total_success = sum(r["success_count"] for r in results) + num_shards = len(results) + total_attempted = config.max_entries if config.max_entries is not None else total_success + success_rate = (total_success / total_attempted * 100) if total_attempted > 0 else 0 + logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)") + + # Use executors that avoid scheduling on CPU-only head node + streaming_executor = RayDataExecutor(ignore_head_node=True) + actor_executor = RayActorPoolExecutor(ignore_head_node=True) + + # Step 2: Generate CLIP embeddings + pipeline = create_image_embedding_pipeline(config) + pipeline.run(executor=streaming_executor) + + # Step 3: Find semantic duplicates using K-means + DBSCAN + workflow = create_embedding_deduplication_workflow(config) + workflow.run(kmeans_executor=actor_executor, pairwise_executor=actor_executor) + + # Step 4: Write deduplicated dataset + pipeline = create_image_deduplication_pipeline(config) + pipeline.run(executor=streaming_executor) + + +if __name__ == "__main__": + config = Config() + main(config) \ No newline at end of file diff --git a/nemo_curator_semantic_dedup/job.yaml b/nemo_curator_semantic_dedup/job.yaml new file mode 100644 index 0000000..2f62179 --- /dev/null +++ b/nemo_curator_semantic_dedup/job.yaml @@ -0,0 +1,102 @@ +# NeMo Curator Image Semantic Deduplication Job +# View the docs: https://docs.anyscale.com/reference/job-api#jobconfig +# +# This 
job runs a two-phase pipeline: +# Phase 1: Convert parquet (URLs) → WebDataset tar files (using Ray Data, distributed) +# Phase 2: Run NeMo Curator image deduplication (CLIP embeddings → semantic dedup) +# +# The parquet → tar conversion uses Ray Data to distribute image downloads +# across all nodes in the cluster, providing much better scalability than +# single-node processing. + +name: nemo-curator-image-dedup + +# Build custom image with NeMo Curator CUDA dependencies +containerfile: ./Dockerfile + +# Declarative compute config — portable across clouds and Kubernetes +# CPU-only head node + GPU worker nodes (using ignore_head_node=True in executors) +# See: https://docs.anyscale.com/configuration/compute/declarative +compute_config: + head_node: + required_resources: + CPU: 8 + memory: 32Gi + resources: + CPU: 0 # Prevent any task scheduling on head node + worker_nodes: + - name: cpu-downloaders + required_resources: + CPU: 16 + memory: 32Gi + min_nodes: 0 + max_nodes: 10 + - name: a10g-gpu-workers + required_resources: + CPU: 48 + memory: 192Gi + GPU: 4 + required_labels: + ray.io/accelerator-type: A10G + min_nodes: 1 + max_nodes: 10 + +# Working directory - use the repo root (absolute) so Curator/ is included +working_dir: /home/ray/default + +# Environment variables for job configuration +# Override these when submitting to use your own data paths +env_vars: + # Input parquet files with image URLs (url and caption columns) + # Read directly from HuggingFace + INPUT_PARQUET: "hf://datasets/laion/relaion400m/" + MAX_ENTRIES: "10000000" + + # Directory for WebDataset tar files (created from parquet) + # Use /mnt/cluster_storage for persistence, or /home/ray/data for ephemeral + INPUT_WDS_DIR: "/mnt/cluster_storage/nemo_curator/webdataset" + + # Output directory for deduplicated images + OUTPUT_DIR: "/mnt/cluster_storage/nemo_curator/results" + + # Directory to store CLIP embeddings + EMBEDDINGS_DIR: "/mnt/cluster_storage/nemo_curator/embeddings" + + # Directory for duplicate removal parquets + REMOVAL_DIR: "/mnt/cluster_storage/nemo_curator/removal_ids" + + # Model weights directory (pre-downloaded in Docker image) + MODEL_DIR: "/home/ray/model_weights" + + # Processing settings + BATCH_SIZE: "64" + EMBEDDING_BATCH_SIZE: "64" + TAR_FILES_PER_PARTITION: "1" + ENTRIES_PER_TAR: "500" + # CPUs per reader task — controls max concurrent readers per node. + # 48 CPUs / 16 = 3 readers max. This also prevents readers and embedding + # actors from running simultaneously on the same node, avoiding both + # system-RAM OOM (readers ~10-55 GB each) and GPU-VRAM OOM (reader + # processes create CUDA contexts that compete with CLIP model loading). + READER_CPUS_PER_TASK: "16" + + # Don't hide GPUs from tasks that request num_gpus=0 (needed for DALI) + RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO: "0" + + # Disable Python output buffering for real-time logs + PYTHONUNBUFFERED: "1" + + # NCCL diagnostic output (also set as Dockerfile ENV for worker nodes) + NCCL_DEBUG: "WARN" + + +# The entrypoint script (-u for unbuffered output) +entrypoint: python -u examples/nemo_curator_semantic_dedup/image_dedup_example.py + +# Don't retry on failure - easier to debug +max_retries: 0 + +# Kill after 48 hours for full dataset (adjust based on dataset size) +# Full relaion400m (~361M images) will take many hours +timeout_s: 172800 +
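+
+# Example: override any env var above at submit time, e.g. a smaller MAX_ENTRIES for a smoke test
+# (pass one --env flag per variable):
+#   anyscale job submit -f job.yaml --env HF_TOKEN=$HF_TOKEN --env MAX_ENTRIES=100000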