diff --git a/compliance-monitor/monitor.py b/compliance-monitor/monitor.py
index 0d9ee9823..e8d5cc701 100755
--- a/compliance-monitor/monitor.py
+++ b/compliance-monitor/monitor.py
@@ -42,6 +42,7 @@
     db_get_keys, db_insert_report, db_get_recent_results2, db_patch_approval2, db_get_report,
     db_ensure_schema, db_get_apikeys, db_update_apikey, db_filter_apikeys, db_clear_delegates,
     db_find_subjects, db_insert_result2, db_get_relevant_results2, db_add_delegate, db_get_group,
+    db_get_relevant_compliance_results, db_insert_compliance_result, db_insert_event,
 )
 
 
@@ -463,11 +464,12 @@ async def post_report(
     if not documents:
         raise HTTPException(status_code=200, detail="empty reports")
 
-    allowed_subjects = {auth_subject} | set(delegation_subjects)
-    for document in documents:
-        check_role(account, document['subject'], ROLES['append_any'])
-        if document['subject'] not in allowed_subjects:
-            raise HTTPException(status_code=401, detail="delegation problem?")
+    reported_subjects = {document['subject'] for document in documents}
+    for subj in reported_subjects:
+        check_role(account, subj, ROLES['append_any'])
+    extra_subjects = reported_subjects - {auth_subject} - set(delegation_subjects)
+    if extra_subjects:
+        raise HTTPException(status_code=401, detail="delegation problem?")
 
     with conn.cursor() as cur:
         for document, json_text in zip(documents, json_texts):
@@ -493,6 +495,36 @@ async def post_report(
                     result = rdata['result']
                     approval = 1 == result  # pre-approve good result
                     db_insert_result2(cur, checked_at, subject, scopeuuid, version, check, result, approval, reportid)
+
+        checked_at = datetime.now()
+        # add new compliance result if existing compliance result is not newer than `threshold`
+        threshold = checked_at - timedelta(hours=12)
+        for approved_only in (False, True):
+            # fetch latest compliance results before new report
+            rows = db_get_relevant_compliance_results(cur, approved_only=approved_only)
+            results0 = defaultdict(lambda: defaultdict(dict))
+            for row in rows:
+                subj, scope_uuid, version, result, _, ch_at = row
+                if subj not in reported_subjects:
+                    continue
+                results0[subj][scope_uuid][version] = (result, ch_at)
+            # compute latest compliance results after new report
+            rows2 = db_get_relevant_results2(cur, approved_only=approved_only)
+            results = convert_result_rows_to_dict2(rows2, get_scopes())
+            # update compliance table and report changes
+            for subj, subj_results in results.items():
+                if subj not in reported_subjects:
+                    continue
+                for scope_uuid, scope_results in subj_results.items():
+                    for version, version_results in scope_results['versions'].items():
+                        # without a previous entry, assume result 0 and an expired timestamp (forces an insert)
+                        result, ch_at = results0[subj][scope_uuid].get(version, (0, threshold))
+                        new_result = version_results['result']
+                        if new_result != result or ch_at <= threshold:
+                            db_insert_compliance_result(cur, checked_at, subj, scope_uuid, version, new_result, approved_only)
+                        if new_result != result:
+                            db_insert_event(cur, checked_at, subj, scope_uuid, version, result, new_result, approved_only)
+                            print(f"{subj} {scope_uuid} {version}: {result} -> {new_result}")
     conn.commit()
 
 
diff --git a/compliance-monitor/sql.py b/compliance-monitor/sql.py
index 2d39635a6..c585c38fa 100644
--- a/compliance-monitor/sql.py
+++ b/compliance-monitor/sql.py
@@ -3,7 +3,7 @@
 
 # list schema versions in ascending order
 SCHEMA_VERSION_KEY = 'version'
-SCHEMA_VERSIONS = ['v1', 'v2', 'v3', 'v4']
+SCHEMA_VERSIONS = ['v1', 'v2', 'v3', 'v4', 'v5']
 # use ... (Ellipsis) here to indicate that no default value exists (will lead to error if no value is given)
 ACCOUNT_DEFAULTS = {'subject': ..., 'api_key': ..., 'roles': ..., 'group': None}
 PUBLIC_KEY_DEFAULTS = {'public_key': ..., 'public_key_type': ..., 'public_key_name': ...}
@@ -143,6 +143,34 @@ def db_ensure_schema_v4(cur: cursor):
     ''')
 
 
+def db_ensure_schema_v5(cur: cursor):
+    """create everything from schema v4 plus the tables compliance and event (unless they exist)"""
+    # v5 mainly extends v4
+    db_ensure_schema_v4(cur)
+    # introduce tables compliance and event that track compliance over time
+    cur.execute('''
+    CREATE TABLE IF NOT EXISTS compliance (
+        resultid SERIAL PRIMARY KEY,
+        checked_at timestamp NOT NULL,
+        subject text NOT NULL,
+        scopeuuid text NOT NULL,
+        version text NOT NULL,
+        result int,
+        approval boolean
+    );
+    CREATE TABLE IF NOT EXISTS event (
+        eventid SERIAL PRIMARY KEY,
+        eventdate timestamp NOT NULL,
+        subject text NOT NULL,
+        scopeuuid text NOT NULL,
+        version text NOT NULL,
+        old_result int,
+        new_result int,
+        approval boolean
+    );
+    ''')
+
+
 def db_upgrade_data_v1_v2(cur):
     # we are going to drop table result, but use delete anyway to have the transaction safety
     cur.execute('''
@@ -219,6 +247,10 @@ def db_upgrade_schema(conn: connection, cur: cursor):
             db_ensure_schema_v4(cur)
             db_set_schema_version(cur, 'v4')
             conn.commit()
+        elif current == 'v4':
+            db_ensure_schema_v5(cur)
+            db_set_schema_version(cur, 'v5')
+            conn.commit()
 
 
 def db_ensure_schema(conn: connection):
@@ -424,3 +456,77 @@ def db_patch_approval2(cur: cursor, record):
     RETURNING resultid;''', record)
     resultid, = cur.fetchone()
     return resultid
+
+
+def db_insert_compliance_result(
+        cur: cursor, checked_at, subject, scopeuuid, version, result, approval
+):
+    """insert one row into table compliance and return its resultid"""
+    # this is an exception in that we don't use a record parameter (it's just not as practical here)
+    cur.execute('''
+    INSERT INTO compliance (checked_at, subject, scopeuuid, version, result, approval)
+    VALUES (%s, %s, %s, %s, %s, %s)
+    RETURNING resultid;''', (checked_at, subject, scopeuuid, version, result, approval))
+    resultid, = cur.fetchone()
+    return resultid
+
+
+def db_get_relevant_compliance_results(
+        cur: cursor,
+        subject=None, scopeuuid=None, version=None, approved_only=True,
+):
+    """for each combination of subject/scope/version, get the most recent compliance result"""
+    # compliance rows are written once per value of approved_only, so always select exactly one of the
+    # two series; not filtering for approved_only=False would mix both series in DISTINCT ON
+    # DISTINCT ON is a Postgres-specific construct that comes in very handy here :)
+    cur.execute(sql.SQL('''
+    SELECT DISTINCT ON (subject, scopeuuid, version)
+    subject, scopeuuid, version, result, approval, checked_at
+    FROM compliance
+    {filter_condition}
+    ORDER BY subject, scopeuuid, version, checked_at DESC;
+    ''').format(
+        filter_condition=make_where_clause(
+            sql.SQL('approval = %(approval)s'),
+            None if scopeuuid is None else sql.SQL('scopeuuid = %(scopeuuid)s'),
+            None if version is None else sql.SQL('version = %(version)s'),
+            None if subject is None else sql.SQL('subject = %(subject)s'),
+        ),
+    ), {"subject": subject, "scopeuuid": scopeuuid, "version": version, "approval": bool(approved_only)})
+    return cur.fetchall()
+
+
+def db_insert_event(
+        cur: cursor, eventdate, subject, scopeuuid, version, old_result, new_result, approval
+):
+    """insert one row into table event and return its eventid"""
+    # this is an exception in that we don't use a record parameter (it's just not as practical here)
+    cur.execute('''
+    INSERT INTO event (eventdate, subject, scopeuuid, version, old_result, new_result, approval)
+    VALUES (%s, %s, %s, %s, %s, %s, %s)
+    RETURNING eventid;''', (eventdate, subject, scopeuuid, version, old_result, new_result, approval))
+    eventid, = cur.fetchone()
+    return eventid
+
+
+def db_get_recent_events(cur: cursor, approved, limit, skip, max_age_days=None):
+    """list events as dicts without grouping by scope/version; optionally filter by age and approval"""
+    columns = ('date', 'subject', 'scopeuuid', 'version', 'old_result', 'new_result', 'approval')
+    # NOTE(review): ORDER BY eventdate is ascending, so LIMIT/OFFSET page from the oldest matching event;
+    # confirm whether callers expect the newest events first (that would need DESC)
+    cur.execute(sql.SQL('''
+    SELECT eventdate, subject, scopeuuid, version, old_result, new_result, approval
+    FROM event
+    {where_clause}
+    ORDER BY eventdate
+    LIMIT %(limit)s OFFSET %(skip)s;''').format(
+        where_clause=make_where_clause(
+            # the :d format spec raises ValueError for str and float values, so no arbitrary text
+            # can end up in the statement through max_age_days
+            None if max_age_days is None else sql.SQL(
+                f"eventdate > NOW() - interval '{max_age_days:d} days'"
+            ),
+            None if approved is None else sql.SQL('approval = %(approved)s'),
+        ),
+    ), {"limit": limit, "skip": skip, "approved": approved})
+    return [{col: val for col, val in zip(columns, row)} for row in cur.fetchall()]