diff --git a/src/caching/caching.service.spec.ts b/src/caching/caching.service.spec.ts index 454d1395..3c76e0ce 100644 --- a/src/caching/caching.service.spec.ts +++ b/src/caching/caching.service.spec.ts @@ -1,6 +1,38 @@ -import { CachingService } from './caching.service'; +import { CachingService, deriveCacheType, buildCounterKeys } from './caching.service'; import { MetricsCollectionService } from '../monitoring/metrics/metrics-collection.service'; +// ── Helpers ────────────────────────────────────────────────────────────────── + +/** + * Build a jest-mocked ioredis-like client with just the methods the + * CachingService now uses (incr, mget, scan, del). We intentionally keep this + * minimal so the surface area that the tests exercise matches production. + */ +function createMockRedis() { + const store = new Map(); + const mget = jest.fn(async (...keys: string[]) => + keys.map((k) => (store.has(k) ? String(store.get(k)) : null)), + ); + const incr = jest.fn(async (key: string) => { + const next = (store.get(key) ?? 
0) + 1; + store.set(key, next); + return next; + }); + const del = jest.fn(async (...keys: string[]) => { + let removed = 0; + for (const k of keys) { + if (store.delete(k)) removed += 1; + } + return removed; + }); + const scan = jest.fn(async (_cursor: string, _match: string, pattern: string) => { + const re = new RegExp(`^${pattern.replace(/\*/g, '.*')}$`); + const matches = Array.from(store.keys()).filter((k) => re.test(k)); + return ['0', matches] as [string, string[]]; + }); + return { store, incr, mget, scan, del }; +} + describe('CachingService', () => { let service: CachingService; let cacheManager: { @@ -10,6 +42,7 @@ describe('CachingService', () => { clear: jest.Mock; }; let metrics: { updateCacheHitRate: jest.Mock }; + let redis: ReturnType; beforeEach(() => { cacheManager = { @@ -22,12 +55,46 @@ describe('CachingService', () => { (cacheManager as any).store = { keys: jest.fn().mockResolvedValue(['cache:test:1', 'cache:test:2']), }; + redis = createMockRedis(); + service = new CachingService( cacheManager as never, metrics as unknown as MetricsCollectionService, + undefined, + redis as never, ); }); + // ── deriveCacheType / buildCounterKeys ────────────────────────────────────── + + describe('deriveCacheType', () => { + it('returns the second segment for cache:{type}:... 
keys', () => { + expect(deriveCacheType('cache:test:1')).toBe('test'); + expect(deriveCacheType('cache:user:42')).toBe('user'); + expect(deriveCacheType('cache:course:popular')).toBe('course'); + }); + + it('returns "default" for keys with no cache: prefix', () => { + expect(deriveCacheType('hit-key')).toBe('default'); + expect(deriveCacheType('foo:bar')).toBe('default'); + }); + + it('returns "default" for empty / invalid input', () => { + expect(deriveCacheType('')).toBe('default'); + }); + }); + + describe('buildCounterKeys', () => { + it('produces namespaced hit/miss keys', () => { + expect(buildCounterKeys('application')).toEqual({ + hits: 'cache:hits:application', + misses: 'cache:misses:application', + }); + }); + }); + + // ── getOrSet ─────────────────────────────────────────────────────────────── + describe('getOrSet', () => { it('returns cached value without calling factory on hit', async () => { cacheManager.get.mockResolvedValue({ id: '1' }); @@ -37,8 +104,13 @@ describe('CachingService', () => { expect(result).toEqual({ id: '1' }); expect(factory).not.toHaveBeenCalled(); - expect(service.getStats().hits).toBe(1); - expect(service.getStats().misses).toBe(0); + + const stats = await service.getStats('test'); + expect(stats.hits).toBe(1); + expect(stats.misses).toBe(0); + + // INCR must have been called against the correct cluster-wide key + expect(redis.incr).toHaveBeenCalledWith('cache:hits:test'); }); it('populates cache from factory on miss', async () => { @@ -50,10 +122,16 @@ describe('CachingService', () => { expect(result).toEqual({ id: '2' }); expect(factory).toHaveBeenCalledTimes(1); expect(cacheManager.set).toHaveBeenCalledWith('cache:test:2', { id: '2' }, 120000); - expect(service.getStats().misses).toBe(1); + expect(redis.incr).toHaveBeenCalledWith('cache:misses:test'); + + const stats = await service.getStats('test'); + expect(stats.hits).toBe(0); + expect(stats.misses).toBe(1); }); }); + // ── deleteByPattern 
──────────────────────────────────────────────────────── + describe('deleteByPattern', () => { it('uses store.keys to delete matching keys when client scan is unavailable', async () => { await service.deleteByPattern('cache:test:*'); @@ -63,16 +141,150 @@ describe('CachingService', () => { }); }); - describe('hit rate metrics', () => { - it('calculates hit rate and publishes to metrics', async () => { + // ── Cluster-wide hit rate (Issue #811) ───────────────────────────────────── + + describe('distributed hit rate metrics', () => { + it('publishes aggregated cluster-wide hit rate to Prometheus', async () => { + // Simulate three pods: each has independently INCRemented the shared + // Redis counter. The reported hit rate must reflect what Redis holds, + // NOT just what this service instance has seen locally. + redis.store.set('cache:hits:application', 7); + redis.store.set('cache:misses:application', 3); + + await service.publishHitRateMetrics('application'); + + expect(metrics.updateCacheHitRate).toHaveBeenCalledWith('application', 70); + + // The read path must use MGET against the shared Redis keys, not local state. 
+ expect(redis.mget).toHaveBeenCalledWith('cache:hits:application', 'cache:misses:application'); + }); + + it('uses literal cache:hits:{type} / cache:misses:{type} keys (issue #811)', async () => { cacheManager.get.mockResolvedValueOnce('cached').mockResolvedValueOnce(undefined); await service.get('hit-key'); await service.get('miss-key'); - service.publishHitRateMetrics('application'); + // Issue spec: keys must be exactly cache:hits:{type} and cache:misses:{type} + expect(redis.incr).toHaveBeenCalledWith('cache:hits:default'); + expect(redis.incr).toHaveBeenCalledWith('cache:misses:default'); + }); + + it('aggregates hits/misses per cache type independently', async () => { + redis.store.set('cache:hits:test', 8); + redis.store.set('cache:misses:test', 2); + redis.store.set('cache:hits:course', 1); + redis.store.set('cache:misses:course', 4); + + const testStats = await service.getStats('test'); + expect(testStats.hits).toBe(8); + expect(testStats.misses).toBe(2); + expect(testStats.hitRate).toBe(80); + + const courseStats = await service.getStats('course'); + expect(courseStats.hits).toBe(1); + expect(courseStats.misses).toBe(4); + expect(courseStats.hitRate).toBe(20); + }); + + it('returns zero hit rate when no counters have been recorded', async () => { + const stats = await service.getStats('application'); + expect(stats.hits).toBe(0); + expect(stats.misses).toBe(0); + expect(stats.hitRate).toBe(0); + }); + + it('returns aggregate stats across every type', async () => { + redis.store.set('cache:hits:test', 5); + redis.store.set('cache:misses:test', 5); + redis.store.set('cache:hits:course', 2); + redis.store.set('cache:misses:course', 8); + + const aggregate = await service.getAggregateStats(); + expect(aggregate.hits).toBe(7); + expect(aggregate.misses).toBe(13); + // 7 / 20 = 35 + expect(aggregate.hitRate).toBeCloseTo(35, 1); + }); + }); + + // ── Counter reset mechanism ───────────────────────────────────────────────── + + describe('resetStats', () => { + 
it('deletes cluster-wide counter keys for a single type', async () => { + redis.store.set('cache:hits:test', 10); + redis.store.set('cache:misses:test', 4); + + await service.resetStats('test'); + + expect(redis.del).toHaveBeenCalledWith('cache:hits:test', 'cache:misses:test'); + + const stats = await service.getStats('test'); + expect(stats.hits).toBe(0); + expect(stats.misses).toBe(0); + }); + + it('deletes all cluster-wide counter keys when called without an argument', async () => { + redis.store.set('cache:hits:test', 1); + redis.store.set('cache:misses:test', 2); + redis.store.set('cache:hits:course', 3); + redis.store.set('cache:misses:course', 4); + + await service.resetStats(); + + expect(redis.scan).toHaveBeenCalled(); + expect(redis.del).toHaveBeenCalled(); + + const aggregate = await service.getAggregateStats(); + expect(aggregate.hits).toBe(0); + expect(aggregate.misses).toBe(0); + }); + }); + + // ── Graceful degradation ─────────────────────────────────────────────────── + + describe('fallback behaviour when Redis is unavailable', () => { + it('falls back to local counters and still reports stats', async () => { + // Simulate a broken Redis by throwing on every read. + const brokenMget = jest.fn().mockRejectedValue(new Error('ECONNREFUSED')); + const brokenIncr = jest.fn().mockRejectedValue(new Error('ECONNREFUSED')); + + // Direct construction: opt in to local fallback explicitly (default is + // already enabled in production but we make it explicit here). + const localOnly = new CachingService( + cacheManager as never, + metrics as unknown as MetricsCollectionService, + { + get: (key: string, fallback?: any) => + key === 'CACHE_COUNTER_FALLBACK_LOCAL' ? 
true : fallback, + } as any, + { incr: brokenIncr, mget: brokenMget, scan: jest.fn(), del: jest.fn() } as never, + ); + + cacheManager.get.mockResolvedValueOnce(undefined).mockResolvedValue({ id: 'x' }); + await localOnly.get('miss-key'); + await localOnly.get('hit-key'); + + const stats = await localOnly.getStats(); + expect(stats.hits).toBe(1); + expect(stats.misses).toBe(1); + expect(stats.hitRate).toBe(50); + }); + + it('publishes zero hit rate when Redis is unavailable and fallback disabled', async () => { + const brokenMget = jest.fn().mockRejectedValue(new Error('ECONNREFUSED')); + const brokenIncr = jest.fn().mockRejectedValue(new Error('ECONNREFUSED')); + + const noFallback = new CachingService( + cacheManager as never, + metrics as unknown as MetricsCollectionService, + { + get: (key: string, fallback?: any) => + key === 'CACHE_COUNTER_FALLBACK_LOCAL' ? false : fallback, + } as any, + { incr: brokenIncr, mget: brokenMget, scan: jest.fn(), del: jest.fn() } as never, + ); - expect(service.getStats().hitRate).toBe(50); - expect(metrics.updateCacheHitRate).toHaveBeenCalledWith('application', 50); + await expect(noFallback.getStats('application')).rejects.toThrow('ECONNREFUSED'); }); }); }); diff --git a/src/caching/caching.service.ts b/src/caching/caching.service.ts index 5ab0062b..7a2ed32a 100644 --- a/src/caching/caching.service.ts +++ b/src/caching/caching.service.ts @@ -1,7 +1,11 @@ import { Inject, Injectable, Logger, Optional } from '@nestjs/common'; +import { Cron, CronExpression } from '@nestjs/schedule'; +import { ConfigService } from '@nestjs/config'; import { CACHE_MANAGER } from '@nestjs/cache-manager'; import { Cache } from 'cache-manager'; +import Redis from 'ioredis'; import { MetricsCollectionService } from '../monitoring/metrics/metrics-collection.service'; +import { getSharedRedisClient } from '../config/cache.config'; export interface CacheStats { hits: number; @@ -9,24 +13,112 @@ export interface CacheStats { hitRate: number; } +/** + * 
Per-key type derivation: + * + * - `cache:test:1` → type `test` + * - `cache:user:42` → type `user` + * - `cache:course:popular` → type `course` + * - `hit-key` → type `default` + * + * This provides a stable, coarse-grained namespace that aggregates traffic + * across pods without blowing up the cardinality of the counter key space. + */ +const CACHE_TYPE_FALLBACK = 'default'; + +/** + * Extracts the cache-type segment from a cache key. + * + * Cache keys follow a `cache:{type}:...` convention throughout the codebase. + * For keys that don't match this pattern we fall back to a single `default` + * bucket so that all such counters roll up under one key. + */ +export const deriveCacheType = (key: string): string => { + if (typeof key !== 'string' || key.length === 0) { + return CACHE_TYPE_FALLBACK; + } + + const parts = key.split(':'); + if (parts.length >= 2 && parts[0] === 'cache') { + return parts[1] || CACHE_TYPE_FALLBACK; + } + return CACHE_TYPE_FALLBACK; +}; + +/** + * Builds the Redis counter keys for a given cache type. + * + * Keys are namespaced by cache type (per the issue spec) and live in the + * `cache:hits:*` / `cache:misses:*` namespaces so they cannot collide with + * cached application data. + * + * Daily reset is enforced by {@link CachingService.dailyReset} which runs + * nightly via a NestJS cron job. + */ +export const buildCounterKeys = (cacheType: string): { hits: string; misses: string } => ({ + hits: `cache:hits:${cacheType}`, + misses: `cache:misses:${cacheType}`, +}); + @Injectable() export class CachingService { private readonly logger = new Logger(CachingService.name); - private hits = 0; - private misses = 0; + private readonly redis: Redis | undefined; + private readonly fallbackLocal: boolean; + + // Local fallback counters — only used when Redis is unavailable and + // CACHE_COUNTER_FALLBACK_LOCAL is enabled (the default). 
These intentionally + // mirror the old instance variable behaviour so single-instance tests / + // dev environments still report something useful instead of silently + // returning zero hits. + private localHits = 0; + private localMisses = 0; constructor( @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, @Optional() private readonly metrics?: MetricsCollectionService, - ) {} + @Optional() private readonly configService?: ConfigService, + @Optional() redis?: Redis, + ) { + // Prefer an explicitly injected client (used by tests / module overrides), + // then fall back to the configured shared singleton, then to local-only. + this.redis = redis ?? this.resolveRedisFromConfig(); + this.fallbackLocal = this.configService?.get( + 'CACHE_COUNTER_FALLBACK_LOCAL', + true, + ) ?? true; + } + + private resolveRedisFromConfig(): Redis | undefined { + if (process.env.NODE_ENV === 'test') { + // Never open a real Redis connection during tests that go through DI. + // Tests that need to exercise Redis-backed behaviour inject a mock + // client directly via the constructor. + return undefined; + } + + const enabled = this.configService?.get('CACHE_COUNTER_ENABLE_REDIS', true); + if (enabled === false) { + return undefined; + } + try { + return getSharedRedisClient(this.configService); + } catch (err) { + this.logger.warn( + `Could not acquire shared Redis client for hit/miss counters — ` + + `falling back to local in-process counters. ${(err as Error)?.message ?? 
''}`, + ); + return undefined; + } + } async get(key: string): Promise { const value = await this.cacheManager.get(key); if (value === undefined || value === null) { - this.recordMiss(key); + await this.recordMiss(deriveCacheType(key), key); return undefined; } - this.recordHit(key); + await this.recordHit(deriveCacheType(key), key); return value; } @@ -93,33 +185,235 @@ export class CachingService { } } - getStats(): CacheStats { - const total = this.hits + this.misses; + // ── Cluster-wide stats ───────────────────────────────────────────────────── + + /** + * Returns the cluster-wide hit/miss stats for a given cache type. + * + * Reads from the shared Redis keys `cache:hits:{type}` and + * `cache:misses:{type}` so the returned numbers reflect the aggregate + * traffic across all pods rather than just the local instance. + * + * @param cacheType Cache type to query (defaults to `default`). + */ + async getStats(cacheType: string = CACHE_TYPE_FALLBACK): Promise { + const resolvedType = cacheType || CACHE_TYPE_FALLBACK; + const { hits: hitsKey, misses: missesKey } = buildCounterKeys(resolvedType); + + let hits = 0; + let misses = 0; + + if (this.redis) { + try { + const results = await this.redis.mget(hitsKey, missesKey); + hits = this.parseCounter(results[0]); + misses = this.parseCounter(results[1]); + } catch (err) { + if (!this.fallbackLocal) { + this.logger.error( + `Failed to read distributed cache counters for type "${resolvedType}": ${(err as Error).message}`, + ); + throw err; + } + this.logger.warn( + `Failed to read distributed cache counters for type "${resolvedType}", ` + + `falling back to local counters. ${(err as Error).message}`, + ); + hits = this.localHits; + misses = this.localMisses; + } + } else if (this.fallbackLocal) { + hits = this.localHits; + misses = this.localMisses; + } + + const total = hits + misses; return { - hits: this.hits, - misses: this.misses, - hitRate: total === 0 ? 
0 : (this.hits / total) * 100, + hits, + misses, + hitRate: total === 0 ? 0 : (hits / total) * 100, }; } - publishHitRateMetrics(cacheType = 'application'): void { - const { hitRate } = this.getStats(); - this.metrics?.updateCacheHitRate(cacheType, hitRate); - this.logger.debug(`Cache hit rate (${cacheType}): ${hitRate.toFixed(1)}%`); + /** + * Returns aggregate stats across every known cache type. Useful for top-level + * dashboards that want a single cluster-wide hit-rate number rather than a + * per-type breakdown. + * + * Each MATCH pattern is scanned in its own complete SCAN loop because Redis + * SCAN cursors are per-iteration — sharing one cursor between two parallel + * MATCH scans would terminate early whenever one of them reaches `'0'` and + * silently drop keys from the other. + */ + async getAggregateStats(): Promise { + if (!this.redis) { + if (!this.fallbackLocal) { + return { hits: 0, misses: 0, hitRate: 0 }; + } + const total = this.localHits + this.localMisses; + return { + hits: this.localHits, + misses: this.localMisses, + hitRate: total === 0 ? 0 : (this.localHits / total) * 100, + }; + } + + const hitsKeys = await this.scanKeys('cache:hits:*'); + const missesKeys = await this.scanKeys('cache:misses:*'); + + if (hitsKeys.length === 0 && missesKeys.length === 0) { + return { hits: 0, misses: 0, hitRate: 0 }; + } + + const hits = hitsKeys.length === 0 ? 0 : this.sumCounters(await this.redis.mget(...hitsKeys)); + const misses = + missesKeys.length === 0 ? 0 : this.sumCounters(await this.redis.mget(...missesKeys)); + const total = hits + misses; + return { + hits, + misses, + hitRate: total === 0 ? 0 : (hits / total) * 100, + }; + } + + /** + * Publishes the cluster-wide cache hit rate to Prometheus. + * + * Now reads from Redis so the reported value reflects traffic from every + * pod, not just this instance. 
+ */ + async publishHitRateMetrics(cacheType: string = 'application'): Promise { + const stats = await this.getStats(cacheType); + this.metrics?.updateCacheHitRate(cacheType, stats.hitRate); + this.logger.debug( + `Cache hit rate (${cacheType}): ${stats.hitRate.toFixed(1)}% ` + + `(hits=${stats.hits}, misses=${stats.misses})`, + ); + } + + /** + * Resets the distributed counters for a given cache type (or every known + * type when called without an argument). + * + * Note: this only resets the distributed counters, NOT the underlying + * cached application data. + */ + async resetStats(cacheType?: string): Promise { + if (cacheType) { + const { hits, misses } = buildCounterKeys(cacheType); + await this.resetKeys([hits, misses]); + } else if (this.redis) { + const [hitsKeys, missesKeys] = await Promise.all([ + this.scanKeys('cache:hits:*'), + this.scanKeys('cache:misses:*'), + ]); + await this.resetKeys([...hitsKeys, ...missesKeys]); + } + + // Always reset the local in-process fallback so test runs are clean. + this.localHits = 0; + this.localMisses = 0; } - resetStats(): void { - this.hits = 0; - this.misses = 0; + /** Independently completes a SCAN over a single MATCH pattern. */ + private async scanKeys(pattern: string): Promise { + if (!this.redis) { + return []; + } + const out: string[] = []; + let cursor = '0'; + do { + const [next, batch] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 200); + cursor = next; + out.push(...(batch as string[])); + } while (cursor !== '0'); + return out; + } + + /** Sums the integer values stored in the supplied raw Redis values, defaulting to 0. */ + private sumCounters(rawValues: Array): number { + return rawValues.reduce((acc, v) => acc + this.parseCounter(v), 0); + } + + /** + * Daily cron: zeros all distributed hit/miss counters. Runs at midnight UTC + * so dashboards see a clean daily slate without manual intervention. 
+ */ + @Cron(CronExpression.EVERY_DAY_AT_MIDNIGHT, { timeZone: 'UTC' }) + async dailyReset(): Promise { + this.logger.log('Performing daily reset of distributed cache hit/miss counters'); + await this.resetStats(); + } + + // ── Internal: counter increments ────────────────────────────────────────── + + private parseCounter(value: string | null | undefined): number { + if (value === null || value === undefined) { + return 0; + } + const parsed = parseInt(value, 10); + return Number.isFinite(parsed) ? parsed : 0; } - private recordHit(key: string): void { - this.hits += 1; + private async recordHit(cacheType: string, key: string): Promise { + const { hits } = buildCounterKeys(cacheType); + await this.incrOrFallback(hits, () => { + this.localHits += 1; + }); this.logger.debug(`Cache hit: ${key}`); } - private recordMiss(key: string): void { - this.misses += 1; + private async recordMiss(cacheType: string, key: string): Promise { + const { misses } = buildCounterKeys(cacheType); + await this.incrOrFallback(misses, () => { + this.localMisses += 1; + }); this.logger.debug(`Cache miss: ${key}`); } + + /** + * Atomically increments `redisKey` against Redis. If Redis is unavailable + * (not configured, disabled, or errored) and local fallback is enabled, + * `onFallback` is invoked so the in-process counter still moves forward. + * If both Redis-writing and fallback are disabled the increment is silently + * dropped — this matches the previous behaviour of relying on Redis to be + * the source of truth. 
+ */ + private async incrOrFallback(redisKey: string, onFallback: () => void): Promise { + if (!this.redis) { + if (this.fallbackLocal) { + onFallback(); + } + return; + } + try { + await this.redis.incr(redisKey); + } catch (err) { + if (!this.fallbackLocal) { + this.logger.error( + `Failed to INCR ${redisKey} and local fallback is disabled: ${(err as Error).message}`, + (err as Error).stack, + ); + return; + } + onFallback(); + this.logger.debug( + `Failed to INCR ${redisKey} — using local fallback counter. ${(err as Error).message}`, + ); + } + } + + private async resetKeys(keys: string[]): Promise { + if (!this.redis || keys.length === 0) { + return; + } + try { + await this.redis.del(...keys); + } catch (err) { + this.logger.error( + `Failed to reset distributed cache counter keys: ${(err as Error).message}`, + (err as Error).stack, + ); + } + } } diff --git a/src/workers/base/base.worker.ts b/src/workers/base/base.worker.ts index e4f58cec..70f16101 100644 --- a/src/workers/base/base.worker.ts +++ b/src/workers/base/base.worker.ts @@ -1,4 +1,4 @@ -import { Logger, Inject } from '@nestjs/common'; +import { Logger } from '@nestjs/common'; import { Job } from 'bull'; import Redis from 'ioredis'; import { getSharedRedisClient } from '../../config/cache.config'; diff --git a/src/workers/orchestration/worker-orchestration.service.ts b/src/workers/orchestration/worker-orchestration.service.ts index 91275f5d..59eb3c13 100644 --- a/src/workers/orchestration/worker-orchestration.service.ts +++ b/src/workers/orchestration/worker-orchestration.service.ts @@ -1,4 +1,4 @@ -import { Injectable, Logger, OnModuleInit, OnModuleDestroy, Inject } from '@nestjs/common'; +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; import { EventEmitter2 } from '@nestjs/event-emitter'; import { Job } from 'bull'; import { BaseWorker } from '../base/base.worker';