Conversation
```python
from PIL import Image


def download_single_image(url: str, session: requests.Session) -> bytes | None:
```
Can you try the Ray download expr and benchmark the perf? There is a major update to this feature, ray-project/ray#61735, that recently got merged that you should try.
```python
    pool_connections=100, pool_maxsize=100, max_retries=0,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
```
Use the Ray download expr and compare it with this.
```python
file_extensions=["parquet"],
columns=["url", "caption"],
filesystem=HfFileSystem(token=os.environ["HF_TOKEN"]),
concurrency=10,
```
Why is the concurrency 10? Don't you pass concurrency as a param?
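One way to address this, sketched with hypothetical names rather than the example's actual config, is to thread the value through the job's config object instead of hardcoding the literal at the read site:

```python
from dataclasses import dataclass


@dataclass
class DownloadConfig:
    # Hypothetical config object; the real example may structure this differently.
    batch_size: int = 256
    # Value ultimately handed to the dataset read (e.g. concurrency=...).
    download_concurrency: int = 10


def read_concurrency(config: DownloadConfig) -> int:
    # Resolve the value from config rather than inlining the literal 10.
    return config.download_concurrency
```

With this shape, tuning concurrency per cluster becomes a config change rather than a code edit.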
```yaml
memory: 32Gi
min_nodes: 0
max_nodes: 10
- name: a10g-gpu-workers
```
Have you been using CPUs in the a10g workers as well?
Yes, the CPUs should also be utilized.
```python
"""
reader = ImageReaderStage(batch_size=config.batch_size, num_gpus_per_worker=0)
reader.resources.cpus = config.reader_cpus_per_task
reader.resources.gpus = 0.01
```
Is the placement group created on the NeMo side of the code?
Why is gpus set here?
I could probably be more organized with this. This is essentially a trick I use to make sure DALI gets scheduled on the GPU nodes and not the CPU nodes, which are meant for downloading the images.
> Is the placement group created on the NeMo side of the code?
No, I don't believe they are used anywhere in the example.
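The fractional-GPU trick described above can be captured in one helper. A minimal sketch — the helper name and dict shape are assumptions, mirroring Ray's `num_cpus`/`num_gpus` resource keys rather than NeMo Curator's actual API:

```python
def reader_stage_resources(cpus_per_task: float) -> dict:
    # Request a tiny GPU fraction so the scheduler co-locates this stage with
    # GPU nodes (where DALI runs) without reserving a whole GPU; the CPU-only
    # nodes stay free for the download stage.
    return {"num_cpus": cpus_per_task, "num_gpus": 0.01}
```

The 0.01 is small enough that many such tasks can share one GPU's capacity while still excluding CPU-only nodes from placement.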
```python
if config.max_entries is not None:
    ds = ds.limit(config.max_entries)
    ds = ds.repartition(num_blocks=max(100, config.max_entries // 1000))
```
Also, shouldn't num_blocks be related to the number of CPUs available?
```python
logger.info(f"Download complete: {total_success} images in {num_shards} shards ({success_rate:.1f}% success rate)")
```

```python
# Use executors that avoid scheduling on CPU-only head node
streaming_executor = RayDataExecutor(ignore_head_node=True)
```
Does the head node get used by default? I remember Anyscale doesn't allow scheduling on the head node in the multi-node case.
Yeah, I think we can remove these, mainly since I added some logic on the NeMo Curator side to handle this. For some reason I was running into errors there; let me take a closer look and let you know.
End-to-end example that downloads images from a HuggingFace parquet dataset, generates CLIP embeddings on GPUs using NeMo Curator, finds near-duplicates via K-means + DBSCAN, and writes a clean deduplicated dataset — all running as a distributed Anyscale job.