diff --git a/async/Concurrent_Data_Fetching.ts b/async/Concurrent_Data_Fetching.ts
new file mode 100644
index 00000000..a5dd458b
--- /dev/null
+++ b/async/Concurrent_Data_Fetching.ts
@@ -0,0 +1,205 @@
+/**
+ * Professional Example: Concurrent Data Fetching in TypeScript
+ *
+ * This module demonstrates best practices for fetching multiple independent
+ * resources concurrently using `Promise.all` and `Promise.allSettled`.
+ * Concurrent fetching significantly improves performance compared to
+ * sequential fetching when requests do not depend on each other.
+ */
+
+import { Semaphore } from './Semaphore'
+
+// --- Type Definitions ---
+
+export interface User {
+  id: number
+  name: string
+  email: string
+}
+
+export interface Post {
+  id: number
+  userId: number
+  title: string
+  body: string
+}
+
+export interface Comment {
+  id: number
+  postId: number
+  name: string
+  body: string
+}
+
+// --- Mock API Service ---
+// In a real application, these would be actual fetch calls (e.g., using native fetch or axios).
+
+const api = {
+  async fetchUsers(): Promise<User[]> {
+    // Simulating network latency
+    await new Promise((resolve) => setTimeout(resolve, 500))
+    return [
+      { id: 1, name: 'Alice Smith', email: 'alice@example.com' },
+      { id: 2, name: 'Bob Jones', email: 'bob@example.com' }
+    ]
+  },
+
+  async fetchPosts(): Promise<Post[]> {
+    await new Promise((resolve) => setTimeout(resolve, 800))
+    return [
+      {
+        id: 101,
+        userId: 1,
+        title: 'TypeScript Tips',
+        body: 'Use strict mode.'
+      },
+      {
+        id: 102,
+        userId: 2,
+        title: 'Async/Await',
+        body: 'Makes promises easier to read.'
+      }
+    ]
+  },
+
+  async fetchComments(): Promise<Comment[]> {
+    await new Promise((resolve) => setTimeout(resolve, 300))
+    // Simulating a potential network error for demonstration purposes
+    if (Math.random() < 0.1) {
+      throw new Error('Network Error')
+    }
+    return [{ id: 1001, postId: 101, name: 'Charlie', body: 'Great tips!' }]
+  }
+}
+
+// --- Concurrent Fetching Implementations ---
+
+/**
+ * Example 1: Using Promise.all
+ *
+ * Best when you need ALL requests to succeed. If any single promise rejects,
+ * the entire Promise.all rejects immediately (fail-fast behavior).
+ */
+export async function fetchAllDataStrict(): Promise<{
+  users: User[]
+  posts: Post[]
+  comments: Comment[]
+}> {
+  try {
+    console.log('Starting concurrent fetch (Strict)...')
+    const startTime = Date.now()
+
+    // The requests are initiated concurrently.
+    // We await the resolution of all promises together.
+    const [users, posts, comments] = await Promise.all([
+      api.fetchUsers(),
+      api.fetchPosts(),
+      api.fetchComments()
+    ])
+
+    const duration = Date.now() - startTime
+    console.log(`Successfully fetched all data in ${duration}ms`)
+
+    return { users, posts, comments }
+  } catch (error) {
+    // If ANY of the fetches fail, we catch the error here.
+    console.error('Critical failure during concurrent data fetch:', error)
+    throw new Error('Failed to load application data. Please try again later.')
+  }
+}
+
+/**
+ * Example 2: Using Promise.allSettled
+ *
+ * Best when requests are independent and you want to handle successes and
+ * failures individually without failing the entire operation if one request fails.
+ */
+export async function fetchDataResiliently(): Promise<void> {
+  console.log('\nStarting concurrent fetch (Resilient)...')
+  const startTime = Date.now()
+
+  const results = await Promise.allSettled([
+    api.fetchUsers(),
+    api.fetchPosts(),
+    api.fetchComments()
+  ])
+
+  const duration = Date.now() - startTime
+  console.log(`Finished resilient fetch in ${duration}ms`)
+
+  // Process results safely using type narrowing
+  const [usersResult, postsResult, commentsResult] = results
+
+  if (usersResult.status === 'fulfilled') {
+    console.log(`✅ Loaded ${usersResult.value.length} users.`)
+  } else {
+    console.error(`❌ Failed to load users:`, usersResult.reason)
+  }
+
+  if (postsResult.status === 'fulfilled') {
+    console.log(`✅ Loaded ${postsResult.value.length} posts.`)
+  } else {
+    console.error(`❌ Failed to load posts:`, postsResult.reason)
+  }
+
+  if (commentsResult.status === 'fulfilled') {
+    console.log(`✅ Loaded ${commentsResult.value.length} comments.`)
+  } else {
+    console.warn(
+      `⚠️ Comments could not be loaded, continuing without them:`,
+      commentsResult.reason
+    )
+  }
+}
+
+/**
+ * Example 3: Rate-Limited Concurrent Fetching (Using Semaphore)
+ *
+ * Executes multiple async tasks with a limit on concurrency.
+ * This is CRITICAL for bulk fetching to avoid rate-limiting, network congestion,
+ * or overwhelming the server/database.
+ *
+ * @param tasks Array of functions that return Promises.
+ * @param limit The maximum number of concurrent executions.
+ */
+export async function concurrentFetch<T>(
+  tasks: (() => Promise<T>)[],
+  limit: number
+): Promise<T[]> {
+  const semaphore = new Semaphore(limit)
+  return Promise.all(tasks.map((task) => semaphore.run(task)))
+}
+
+/**
+ * Demonstrates bulk fetching with concurrency limits.
+ */
+export async function fetchBulkDataWithLimits(): Promise<void> {
+  console.log('\nStarting rate-limited bulk fetch (Max 3 concurrent)...')
+  const startTime = Date.now()
+
+  // Create 10 dummy tasks that take 200ms each
+  const tasks = Array.from({ length: 10 }, (_, i) => async () => {
+    await new Promise((resolve) => setTimeout(resolve, 200))
+    console.log(`Task ${i + 1} completed.`)
+    return i + 1
+  })
+
+  // Limit to 3 concurrent requests at any given time
+  await concurrentFetch(tasks, 3)
+
+  const duration = Date.now() - startTime
+  console.log(`Finished rate-limited fetch in ${duration}ms.`)
+}
+
+// --- Execution ---
+// If you want to test this file, you can uncomment the lines below:
+// async function runExamples() {
+//   try {
+//     await fetchAllDataStrict();
+//     await fetchDataResiliently();
+//     await fetchBulkDataWithLimits();
+//   } catch (err) {
+//     console.error("Application encountered a top-level error.", err);
+//   }
+// }
+// runExamples();
diff --git a/async/Semaphore.ts b/async/Semaphore.ts
new file mode 100644
index 00000000..ac22009e
--- /dev/null
+++ b/async/Semaphore.ts
@@ -0,0 +1,121 @@
+/**
+ * @class Semaphore
+ * @description A Semaphore is a synchronization primitive that limits the number
+ * of concurrent asynchronous operations. It maintains a set of permits.
+ * Each acquire() blocks if necessary until a permit is available, and then takes it.
+ * Each release() adds a permit, potentially releasing a blocking acquirer.
+ *
+ * @see https://en.wikipedia.org/wiki/Semaphore_(programming)
+ */
+export class Semaphore {
+  private queue: Array<{
+    resolve: () => void
+    reject: (reason?: unknown) => void
+    timeoutId?: NodeJS.Timeout
+  }> = []
+  private activeCount: number = 0
+
+  /**
+   * @param maxConcurrency The maximum number of concurrent operations allowed.
+   */
+  constructor(private readonly maxConcurrency: number) {
+    if (maxConcurrency <= 0) {
+      throw new Error('Max concurrency must be at least 1.')
+    }
+  }
+
+  /**
+   * Acquires a permit from the semaphore.
+   * If no permits are available, it returns a promise that resolves
+   * when a permit is released by another task.
+   *
+   * @param timeoutMs Optional. The maximum amount of time (in ms) to wait in the queue.
+   * @returns {Promise<void>} A promise that resolves when a permit is acquired.
+   */
+  public async acquire(timeoutMs?: number): Promise<void> {
+    if (this.activeCount < this.maxConcurrency) {
+      this.activeCount++
+      return Promise.resolve()
+    }
+
+    return new Promise((resolve, reject) => {
+      const queueItem: {
+        resolve: () => void
+        reject: (reason?: unknown) => void
+        timeoutId?: NodeJS.Timeout
+      } = { resolve, reject }
+
+      if (timeoutMs !== undefined) {
+        queueItem.timeoutId = setTimeout(() => {
+          // Remove from queue
+          const index = this.queue.indexOf(queueItem)
+          if (index !== -1) {
+            this.queue.splice(index, 1)
+          }
+          reject(
+            new Error(
+              `Timeout of ${timeoutMs}ms exceeded while waiting for Semaphore permit.`
+            )
+          )
+        }, timeoutMs)
+      }
+
+      this.queue.push(queueItem)
+    })
+  }
+
+  /**
+   * Releases a permit back to the semaphore.
+   * If there are tasks waiting in the queue, the first one is notified
+   * and allowed to proceed.
+   */
+  public release(): void {
+    const nextTask = this.queue.shift()
+    if (nextTask) {
+      // Clear the timeout if the task had one
+      if (nextTask.timeoutId) {
+        clearTimeout(nextTask.timeoutId)
+      }
+      // Pass the permit directly to the next waiting task
+      nextTask.resolve()
+    } else {
+      // No one is waiting, so just decrement the active count
+      this.activeCount--
+    }
+  }
+
+  /**
+   * A helper method that wraps an asynchronous task.
+   * It handles the acquisition and release of the permit automatically,
+   * even if the task fails.
+   *
+   * @param task A function that returns a Promise.
+   * @param queueTimeoutMs Optional. Throw an error if the task waits in the queue longer than this.
+   * @returns {Promise<T>} The result of the task.
+   */
+  public async run<T>(
+    task: () => Promise<T>,
+    queueTimeoutMs?: number
+  ): Promise<T> {
+    await this.acquire(queueTimeoutMs)
+    try {
+      return await task()
+    } finally {
+      this.release()
+    }
+  }
+
+  /**
+   * Returns the current number of active permits.
+   */
+  public getActiveCount(): number {
+    return this.activeCount
+  }
+
+  /**
+   * Returns the number of tasks currently waiting for a permit.
+   */
+  public getQueueLength(): number {
+    return this.queue.length
+  }
+}
diff --git a/async/test/Semaphores.test.ts b/async/test/Semaphores.test.ts
new file mode 100644
index 00000000..2525fadd
--- /dev/null
+++ b/async/test/Semaphores.test.ts
@@ -0,0 +1,101 @@
+import { Semaphore } from '../Semaphore'
+import { concurrentFetch } from '../Concurrent_Data_Fetching'
+
+describe('Semaphore', () => {
+  it('should limit concurrency to the specified amount', async () => {
+    const limit = 2
+    const semaphore = new Semaphore(limit)
+    let activeTasks = 0
+    let maxObservedActive = 0
+
+    const task = async () => {
+      activeTasks++
+      maxObservedActive = Math.max(maxObservedActive, activeTasks)
+      await new Promise((resolve) => setTimeout(resolve, 50))
+      activeTasks--
+    }
+
+    const tasks = Array.from({ length: 5 }, () => semaphore.run(task))
+    await Promise.all(tasks)
+
+    expect(maxObservedActive).toBeLessThanOrEqual(limit)
+  })
+
+  it('should handle task failures gracefully and release permits', async () => {
+    const semaphore = new Semaphore(1)
+
+    const failingTask = async () => {
+      throw new Error('Task failed')
+    }
+
+    const successfulTask = async () => {
+      return 'Success'
+    }
+
+    await expect(semaphore.run(failingTask)).rejects.toThrow('Task failed')
+
+    const result = await semaphore.run(successfulTask)
+    expect(result).toBe('Success')
+    expect(semaphore.getActiveCount()).toBe(0)
+  })
+
+  it('should timeout if waiting in the queue too long', async () => {
+    const semaphore = new Semaphore(1)
+
+    // Task 1 takes the only permit and holds it for 100ms
+    const longTask = async () =>
+      new Promise((resolve) => setTimeout(resolve, 100))
+    const p1 = semaphore.run(longTask)
+
+    // Task 2 tries to run, but will only wait in queue for 20ms
+    const impatientTask = async () => {
+      return 'Done'
+    }
+
+    await expect(semaphore.run(impatientTask, 20)).rejects.toThrow(
+      'Timeout of 20ms exceeded while waiting for Semaphore permit.'
+    )
+
+    await p1 // Wait for the first task to finish cleanly
+  })
+})
+
+describe('concurrentFetch', () => {
+  it('should return results in the correct order', async () => {
+    const task1 = async () => {
+      await new Promise((r) => setTimeout(r, 50))
+      return 'First'
+    }
+    const task2 = async () => {
+      await new Promise((r) => setTimeout(r, 10))
+      return 'Second'
+    }
+    const task3 = async () => {
+      await new Promise((r) => setTimeout(r, 30))
+      return 'Third'
+    }
+
+    const results = await concurrentFetch([task1, task2, task3], 2)
+
+    expect(results).toEqual(['First', 'Second', 'Third'])
+  })
+
+  it('should execute all tasks without exceeding the limit', async () => {
+    let active = 0
+    let maxActive = 0
+
+    const createTask = (id: number) => async () => {
+      active++
+      maxActive = Math.max(maxActive, active)
+      await new Promise((r) => setTimeout(r, 20))
+      active--
+      return id
+    }
+
+    const tasks = Array.from({ length: 10 }, (_, i) => createTask(i))
+    const results = await concurrentFetch(tasks, 3)
+
+    expect(maxActive).toBeLessThanOrEqual(3)
+    expect(results).toEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
+  })
+})