Skip to content

Commit 69b81a6

Browse files
feat(data-retention): granular PII redaction stages (input + block outputs) (#5272)
* feat(data-retention): granular PII redaction stages (input + block outputs)
* fix(data-retention): propagate block-output redaction into child workflows
* fix(data-retention): close block-output redaction gaps on streaming + resume
* fix(data-retention): drain+mask streamed output, resolve PII policy unconditionally (no fail-open)
* test(testing): support leftJoin().where().limit() in shared db mock
* fix(data-retention): mask agent/Pi memory writes under block-output redaction
* fix(data-retention): guard partial PII stages in GET normalize
* fix(data-retention): mask seeded memory messages under block-output redaction
* fix(guardrails): fail closed on misaligned Presidio batch responses
* fix(data-retention): enabled stage with no entity types redacts all (no fail-open)
* fix(data-retention): reject enabled stage with no entity types; empty = off everywhere
* docs(data-retention): note resume remask covers inline values only
* fix(data-retention): scrub offloaded large-value refs from logs when block-output redaction is off
* fix(data-retention): hydrate, mask, and re-store large-value refs in logs (preserve redacted content)
* fix(data-retention): always apply logs policy to large-value refs when logs stage is on
* perf(data-retention): drop redaction byte ceiling, parallelize chunks (env-tunable), remove request timeouts, sync large-value walk
* feat(data-retention): gate granular PII stages behind pii-granular-redaction flag
  - New pii-granular-redaction feature flag (fallback PII_GRANULAR_REDACTION), layered on pii-redaction, gating the execution-altering input + block-output stages
  - Route returns piiGranularRedactionEnabled and rejects enabling granular stages when off
  - UI shows only the Logs stage tab unless the flag is on; clamps active stage
  - Drop the per-search Select all toggle; add a Deselect all action to the PII section header
* docs(pii): describe Presidio as a standalone service, not a sidecar
  Presidio now runs as its own ECS service (and, in Helm, its own Deployment + Service) reached over the network via PII_URL — not a sidecar in the app task. Update README, code comments, env docs, Dockerfiles, and the Helm chart docs to match, and note the deploy requirement that PII_URL must be reachable.
* fix(data-retention): re-mask offloaded large-value refs on resume + don't lock out granular saves
  - Resume/run-from-block restore now hydrates → masks → re-stores large-value refs in restored blockStates (not just inline strings), so a value offloaded before the block-output stage was enabled can't warm raw PII into downstream blocks. Fails fast.
  - pii-large-values: add onFailure mode (throw on the execution path, scrub for logs) and redactLargeValueRefsInValue for arbitrary (non-RedactablePayload) values
  - Granular flag gate now rejects only NEW off→on granular enablement, so orgs that already configured granular stages can still save retention settings when the flag is off
1 parent 7457184 commit 69b81a6

43 files changed

Lines changed: 2110 additions & 510 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/pii/server.py

Lines changed: 90 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,13 @@
1010
from typing import Any
1111

1212
from fastapi import FastAPI
13-
from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer, RecognizerResult
13+
from presidio_analyzer import (
14+
AnalyzerEngine,
15+
BatchAnalyzerEngine,
16+
Pattern,
17+
PatternRecognizer,
18+
RecognizerResult,
19+
)
1420
from presidio_analyzer.nlp_engine import NlpEngineProvider
1521
from presidio_analyzer.predefined_recognizers import (
1622
AuAbnRecognizer,
@@ -133,6 +139,7 @@ def build_analyzer() -> AnalyzerEngine:
133139

134140

135141
analyzer = build_analyzer()
142+
batch_analyzer = BatchAnalyzerEngine(analyzer_engine=analyzer)
136143
anonymizer = AnonymizerEngine()
137144

138145
# Propagates to uvicorn's root handler, so timing lands in the container log stream.
@@ -149,13 +156,65 @@ class AnalyzeRequest(BaseModel):
149156
return_decision_process: bool = False
150157

151158

159+
class AnalyzeBatchRequest(BaseModel):
160+
texts: list[str]
161+
language: str = "en"
162+
entities: list[str] | None = None
163+
score_threshold: float | None = None
164+
165+
152166
class AnonymizeRequest(BaseModel):
153167
text: str
154168
analyzer_results: list[dict[str, Any]] = []
155169
anonymizers: dict[str, dict[str, Any]] | None = None
156170
operators: dict[str, dict[str, Any]] | None = None
157171

158172

173+
class AnonymizeBatchItem(BaseModel):
174+
text: str
175+
analyzer_results: list[dict[str, Any]] = []
176+
177+
178+
class AnonymizeBatchRequest(BaseModel):
179+
items: list[AnonymizeBatchItem] = []
180+
anonymizers: dict[str, dict[str, Any]] | None = None
181+
operators: dict[str, dict[str, Any]] | None = None
182+
183+
184+
def build_operators(
185+
raw_operators: dict[str, dict[str, Any]] | None,
186+
) -> dict[str, OperatorConfig] | None:
187+
if not raw_operators:
188+
return None
189+
operators: dict[str, OperatorConfig] = {}
190+
for entity, raw_cfg in raw_operators.items():
191+
op_cfg = dict(raw_cfg)
192+
op_type = op_cfg.pop("type", "replace")
193+
operators[entity] = OperatorConfig(op_type, op_cfg)
194+
return operators
195+
196+
197+
def run_anonymize(
198+
text: str,
199+
raw_results: list[dict[str, Any]],
200+
operators: dict[str, OperatorConfig] | None,
201+
):
202+
analyzer_results = [
203+
RecognizerResult(
204+
entity_type=r["entity_type"],
205+
start=r["start"],
206+
end=r["end"],
207+
score=r.get("score", 1.0),
208+
)
209+
for r in raw_results
210+
]
211+
return anonymizer.anonymize(
212+
text=text,
213+
analyzer_results=analyzer_results,
214+
operators=operators,
215+
)
216+
217+
159218
@app.get("/health")
160219
def health() -> dict[str, str]:
161220
return {"status": "ok"}
@@ -186,35 +245,28 @@ def analyze(req: AnalyzeRequest) -> list[dict[str, Any]]:
186245
return [r.to_dict() for r in results]
187246

188247

248+
@app.post("/analyze_batch")
249+
def analyze_batch(req: AnalyzeBatchRequest) -> list[list[dict[str, Any]]]:
250+
"""Analyze many texts in one pass (spaCy nlp.pipe), returning one span list
251+
per input in request order — the batched counterpart to /analyze."""
252+
results = batch_analyzer.analyze_iterator(
253+
texts=req.texts,
254+
language=req.language,
255+
entities=req.entities or None,
256+
score_threshold=req.score_threshold,
257+
)
258+
return [[r.to_dict() for r in per_text] for per_text in results]
259+
260+
189261
@app.post("/anonymize")
190262
def anonymize(req: AnonymizeRequest) -> dict[str, Any]:
191263
started = time.perf_counter()
192-
analyzer_results = [
193-
RecognizerResult(
194-
entity_type=r["entity_type"],
195-
start=r["start"],
196-
end=r["end"],
197-
score=r.get("score", 1.0),
198-
)
199-
for r in req.analyzer_results
200-
]
201-
raw_operators = req.anonymizers or req.operators
202-
operators = None
203-
if raw_operators:
204-
operators = {}
205-
for entity, raw_cfg in raw_operators.items():
206-
op_cfg = dict(raw_cfg)
207-
op_type = op_cfg.pop("type", "replace")
208-
operators[entity] = OperatorConfig(op_type, op_cfg)
209-
result = anonymizer.anonymize(
210-
text=req.text,
211-
analyzer_results=analyzer_results,
212-
operators=operators,
213-
)
264+
operators = build_operators(req.anonymizers or req.operators)
265+
result = run_anonymize(req.text, req.analyzer_results, operators)
214266
logger.info(
215267
"anonymize chars=%d spans=%d duration_ms=%.1f",
216268
len(req.text),
217-
len(analyzer_results),
269+
len(req.analyzer_results),
218270
(time.perf_counter() - started) * 1000,
219271
)
220272
return {
@@ -230,3 +282,17 @@ def anonymize(req: AnonymizeRequest) -> dict[str, Any]:
230282
for item in result.items
231283
],
232284
}
285+
286+
287+
@app.post("/anonymize_batch")
288+
def anonymize_batch(req: AnonymizeBatchRequest) -> dict[str, list[str]]:
289+
"""Mask many texts in one pass, returning masked text per item in request
290+
order — the batched counterpart to /anonymize. Anonymization is pure string
291+
work (no NLP), so callers should send only items with detected spans."""
292+
operators = build_operators(req.anonymizers or req.operators)
293+
return {
294+
"texts": [
295+
run_anonymize(item.text, item.analyzer_results, operators).text
296+
for item in req.items
297+
]
298+
}

apps/sim/app/api/guardrails/mask-batch/route.ts

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,10 @@ const logger = createLogger('GuardrailsMaskBatchAPI')
1111

1212
/**
1313
* Internal batch PII masking. The log-redaction persist path runs in both the
14-
* Next.js server and the trigger.dev runtime, but the Presidio sidecars live only
15-
* in the app task — so redaction calls this endpoint server-to-server (internal
16-
* JWT) to keep Presidio centralized here.
14+
* Next.js server and the trigger.dev runtime, but only the app task reaches the
15+
* Presidio service (it holds `PII_URL` and the internal-network access) — so
16+
* redaction calls this endpoint server-to-server (internal JWT) to keep the
17+
* Presidio call centralized here.
1718
*/
1819
export const POST = withRouteHandler(async (request: NextRequest) => {
1920
const auth = await checkInternalAuth(request, { requireWorkflowId: false })
@@ -35,7 +36,7 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
3536
})
3637
return NextResponse.json({ masked })
3738
} catch (error) {
38-
// An unreachable/misconfigured Presidio sidecar makes maskPIIBatch throw; fail
39+
// An unreachable/misconfigured Presidio service makes maskPIIBatch throw; fail
3940
// loudly here (the caller scrubs to REDACTION_FAILED, so PII is never leaked).
4041
logger.error('PII batch masking failed', {
4142
error: getErrorMessage(error),

apps/sim/app/api/organizations/[id]/data-retention/route.ts

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,50 @@ function normalizeConfigured(
4242
rules: settings.piiRedaction.rules.map((rule) => ({
4343
...rule,
4444
language: coercePiiLanguage(rule.language),
45+
stages: rule.stages
46+
? {
47+
input: {
48+
...rule.stages.input,
49+
language: coercePiiLanguage(rule.stages.input?.language),
50+
},
51+
blockOutputs: {
52+
...rule.stages.blockOutputs,
53+
language: coercePiiLanguage(rule.stages.blockOutputs?.language),
54+
},
55+
logs: {
56+
...rule.stages.logs,
57+
language: coercePiiLanguage(rule.stages.logs?.language),
58+
},
59+
}
60+
: undefined,
4561
})),
4662
}
4763
: null,
4864
retentionOverrides: settings?.retentionOverrides ?? null,
4965
}
5066
}
5167

68+
/**
69+
* Which granular stages (`input`/`blockOutputs`) are already enabled per rule
70+
* target (`workspaceId ?? ''` = the org default). Used to gate the
71+
* `pii-granular-redaction` flag on *new* enablement only: when the flag is off,
72+
* an org that already configured granular stages must still be able to re-save
73+
* unrelated settings (the UI re-sends the full PII snapshot every save), so we
74+
* reject only a stage transitioning off→on, never a preserved one.
75+
*/
76+
function granularStageEnablement(
77+
settings: OrganizationRetentionValues['piiRedaction']
78+
): Map<string, { input: boolean; blockOutputs: boolean }> {
79+
const map = new Map<string, { input: boolean; blockOutputs: boolean }>()
80+
for (const rule of settings?.rules ?? []) {
81+
map.set(rule.workspaceId ?? '', {
82+
input: rule.stages?.input?.enabled === true,
83+
blockOutputs: rule.stages?.blockOutputs?.enabled === true,
84+
})
85+
}
86+
return map
87+
}
88+
5289
/**
5390
* GET /api/organizations/[id]/data-retention
5491
* Returns the organization's data retention settings.
@@ -87,7 +124,10 @@ export const GET = withRouteHandler(
87124
}
88125

89126
const isEnterprise = !isBillingEnabled || (await isOrganizationOnEnterprisePlan(organizationId))
90-
const piiRedactionEnabled = await isFeatureEnabled('pii-redaction')
127+
const [piiRedactionEnabled, piiGranularRedactionEnabled] = await Promise.all([
128+
isFeatureEnabled('pii-redaction'),
129+
isFeatureEnabled('pii-granular-redaction'),
130+
])
91131
const configured = normalizeConfigured(org.dataRetentionSettings)
92132
const defaults = enterpriseDefaults()
93133

@@ -99,6 +139,7 @@ export const GET = withRouteHandler(
99139
configured,
100140
effective: isEnterprise ? configured : defaults,
101141
piiRedactionEnabled,
142+
piiGranularRedactionEnabled,
102143
},
103144
})
104145
}
@@ -167,7 +208,10 @@ export const PUT = withRouteHandler(
167208
return NextResponse.json({ error: 'Organization not found' }, { status: 404 })
168209
}
169210

170-
const piiRedactionEnabled = await isFeatureEnabled('pii-redaction')
211+
const [piiRedactionEnabled, piiGranularRedactionEnabled] = await Promise.all([
212+
isFeatureEnabled('pii-redaction'),
213+
isFeatureEnabled('pii-granular-redaction'),
214+
])
171215

172216
const current = normalizeConfigured(currentOrg.dataRetentionSettings)
173217
const merged: DataRetentionSettings = { ...current }
@@ -187,6 +231,29 @@ export const PUT = withRouteHandler(
187231
{ status: 403 }
188232
)
189233
}
234+
if (!piiGranularRedactionEnabled) {
235+
// Reject only a granular stage transitioning off→on; a body that merely
236+
// preserves already-enabled granular stages must still save (the UI
237+
// re-sends the full snapshot on every save), so existing orgs aren't
238+
// locked out of unrelated retention changes when the flag is off.
239+
const currentGranular = granularStageEnablement(current.piiRedaction)
240+
const newlyEnablesGranular = (body.piiRedaction?.rules ?? []).some((rule) => {
241+
const cur = currentGranular.get(rule.workspaceId ?? '')
242+
return (
243+
(rule.stages?.input?.enabled === true && !cur?.input) ||
244+
(rule.stages?.blockOutputs?.enabled === true && !cur?.blockOutputs)
245+
)
246+
})
247+
if (newlyEnablesGranular) {
248+
return NextResponse.json(
249+
{
250+
error:
251+
'Granular PII redaction (workflow input and block outputs) is not enabled for this organization',
252+
},
253+
{ status: 403 }
254+
)
255+
}
256+
}
190257
merged.piiRedaction = body.piiRedaction
191258
}
192259
if (body.retentionOverrides !== undefined) {
@@ -251,6 +318,7 @@ export const PUT = withRouteHandler(
251318
configured,
252319
effective: configured,
253320
piiRedactionEnabled,
321+
piiGranularRedactionEnabled,
254322
},
255323
})
256324
}

0 commit comments

Comments
 (0)