
Nemo curator clean #51

Draft

avigyabb wants to merge 6 commits into anyscale:main from avigyabb:nemo-curator-clean


Conversation

@avigyabb (Contributor)

End-to-end example that downloads images from a HuggingFace parquet dataset, generates CLIP embeddings on GPUs using NeMo Curator, finds near-duplicates via K-means + DBSCAN, and writes a clean deduplicated dataset — all running as a distributed Anyscale job.
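
As a rough orientation, the stages described above can be wired together as a toy skeleton, where an exact-hash "embedding" and exact-match grouping stand in for the real CLIP + K-means/DBSCAN steps. All names here are illustrative, not the PR's actual API:

```python
import hashlib


def download_images(urls):
    # Stand-in for the distributed download stage: in the real example,
    # image bytes are fetched from URLs in a HuggingFace parquet dataset.
    return [u.encode() for u in urls]  # pretend the URL bytes are the image


def embed_images(images):
    # Stand-in for GPU CLIP embeddings via NeMo Curator: here, a hash digest.
    return [hashlib.sha256(img).hexdigest() for img in images]


def find_duplicates(embeddings):
    # Stand-in for K-means + DBSCAN near-duplicate detection:
    # exact-match grouping marks every repeat after the first as a duplicate.
    seen, dupes = set(), set()
    for i, e in enumerate(embeddings):
        if e in seen:
            dupes.add(i)
        seen.add(e)
    return dupes


def run_pipeline(urls):
    images = download_images(urls)
    dupes = find_duplicates(embed_images(images))
    # Write the clean dataset: keep only non-duplicate entries.
    return [u for i, u in enumerate(urls) if i not in dupes]
```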

from PIL import Image


def download_single_image(url: str, session: requests.Session) -> bytes | None:
Contributor

Can you try the Ray download expr and benchmark the perf? There is a major update to this feature, ray-project/ray#61735, that recently got merged that you should try.

pool_connections=100, pool_maxsize=100, max_retries=0,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
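
For context, the snippet above presumably comes from a session factory along these lines. This is a sketch: the pool sizes and `max_retries` mirror the diff, the rest is assumed:

```python
import requests
from requests.adapters import HTTPAdapter


def make_session() -> requests.Session:
    """Build a Session with a large connection pool for parallel downloads."""
    session = requests.Session()
    adapter = HTTPAdapter(
        pool_connections=100,  # number of distinct host pools to keep
        pool_maxsize=100,      # connections per pool
        max_retries=0,         # fail fast; the caller decides how to retry
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
```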
Contributor

Use the Ray download expr and compare with this.

file_extensions=["parquet"],
columns=["url", "caption"],
filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]),
concurrency=10,
@xyuzh (Contributor), Apr 6, 2026

Why is the concurrency 10? Don't you pass concurrency as a param?

memory: 32Gi
min_nodes: 0
max_nodes: 10
- name: a10g-gpu-workers
Contributor

Have you been using CPUs on the A10G workers as well?

Contributor Author

Yes, the CPUs should also be utilized.

"""
reader = ImageReaderStage(batch_size=config.batch_size, num_gpus_per_worker=0)
reader.resources.cpus = config.reader_cpus_per_task
reader.resources.gpus = 0.01
Contributor

Why is gpus set here?

Contributor

Is a placement group created on the NeMo side of the code?

@avigyabb (Contributor Author), Apr 7, 2026

Why is gpus set here?

I could probably be more organized with this. This is essentially a trick I use to make sure DALI gets scheduled on the GPU nodes and not the CPU nodes, which are meant for downloading the images.

Is a placement group created on the NeMo side of the code?

No, I don't believe they are used anywhere in the example.
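
The fractional-GPU trick described above can be sketched with stand-in stage objects. `Stage` and `Resources` here are hypothetical simplifications of NeMo Curator's stage API, not the real classes:

```python
from dataclasses import dataclass, field


@dataclass
class Resources:
    cpus: float = 1.0
    gpus: float = 0.0


@dataclass
class Stage:
    name: str
    resources: Resources = field(default_factory=Resources)


def pin_to_gpu_nodes(stage: Stage, gpu_fraction: float = 0.01) -> Stage:
    """Request a tiny GPU fraction so the scheduler places this otherwise
    CPU-bound stage on GPU nodes, keeping CPU-only nodes free for downloads."""
    stage.resources.gpus = gpu_fraction
    return stage
```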


if config.max_entries is not None:
ds = ds.limit(config.max_entries)
ds = ds.repartition(num_blocks=max(100, config.max_entries // 1000))
Contributor

Also, shouldn't num_blocks be related to the number of CPUs available?
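
A sketch of tying the block count to cluster size rather than a fixed floor of 100; `os.cpu_count()` stands in for the cluster's total CPUs, which on a Ray cluster would come from the cluster resources instead:

```python
import os


def choose_num_blocks(max_entries: int,
                      entries_per_block: int = 1000,
                      min_blocks_per_cpu: int = 2) -> int:
    """Pick a block count that keeps every CPU busy while capping block size."""
    total_cpus = os.cpu_count() or 1
    # At least a couple of blocks per CPU for load balancing, and enough
    # blocks that none holds more than entries_per_block rows.
    return max(min_blocks_per_cpu * total_cpus,
               max_entries // entries_per_block)
```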

logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)")

# Use executors that avoid scheduling on CPU-only head node
streaming_executor = RayDataExecutor(ignore_head_node=True)
Contributor

Does the head node get used by default? I remember Anyscale doesn't allow scheduling on the head node in the multi-node case.

Contributor Author

Yeah, I think we can remove these, mainly since I added some logic on the NeMo Curator side to handle this. For some reason I was running into errors there; let me take a closer look and let you know.

