Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions ci3/cache_log
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,27 @@ function live_publish_log {
done
}

function http_publish_log {
  # Push the current (in-progress) log file to the dashboard via HTTP.
  # Reads globals: $outfile (log path), $key (log id), $CI_REDIS_EXPIRE (TTL).
  # Redirect instead of `cat |`; quote expansions so paths/keys with unusual
  # characters don't word-split.
  http_log_update "$key" "$CI_REDIS_EXPIRE" <"$outfile"
}

function http_live_publish_log {
  # Periodically re-publish the log over HTTP while $outfile still exists.
  # Only uploads when the file was modified in the last 5 seconds (i.e. it is
  # being actively written), then sleeps up to 5s, bailing out early once the
  # file disappears.
  local now mtime
  while [ -f "$outfile" ]; do
    # The file can be removed between the -f test and stat: treat a stat
    # failure as "log finished" instead of erroring.
    mtime=$(stat -c %Y "$outfile" 2>/dev/null) || break
    now=$(date +%s)
    if [ $(( now - mtime )) -le 5 ]; then
      http_publish_log
    fi
    for i in {1..5}; do
      [ ! -f "$outfile" ] && break
      sleep 1
    done
  done
}

function http_publish_log_final {
  # Finalize the log file, then POST it for permanent storage (the dashboard
  # writes it to Redis + S3). Reads globals: $outfile, $key, $CI_REDIS_EXPIRE.
  finalize_log
  http_log_post "$key" "$CI_REDIS_EXPIRE" <"$outfile"
}

if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
log_str="$name log id: ${yellow}$DASHBOARD_URL/$key${reset}"
echo_stderr -e "$log_str"
Expand All @@ -69,6 +90,24 @@ if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
redact | cat >>$outfile
fi
publish_log_final
elif [ -n "${CI_LOG_API_KEY:-}" ]; then
# No direct Redis access, but we can post logs via HTTP to the dashboard.
log_str="$name log id: ${yellow}$DASHBOARD_URL/$key${reset}"
echo_stderr -e "$log_str"
exec 2>/dev/null
if [ "$dup" -ne 1 ]; then
exec >/dev/null
fi
http_live_publish_log >&- 2>&- &
if [ "$dup" -eq 1 ]; then
redact | tee -a $outfile
if [ "$(cat $outfile | wc -l)" -ge 16 ]; then
echo_stderr -e "$log_str"
fi
else
redact | cat >>$outfile
fi
http_publish_log_final
else
if [ "$dup" -eq 1 ]; then
redact | cat
Expand Down
111 changes: 111 additions & 0 deletions ci3/dashboard/rk.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

_s3 = boto3.client('s3', region_name='us-east-2')
DASHBOARD_PASSWORD = os.getenv('DASHBOARD_PASSWORD', '')
CI_LOG_API_KEY = os.getenv('CI_LOG_API_KEY', '')
CI_METRICS_PORT = int(os.getenv('CI_METRICS_PORT', '8081'))
CI_METRICS_URL = os.getenv('CI_METRICS_URL', f'http://localhost:{CI_METRICS_PORT}')

Expand Down Expand Up @@ -563,6 +564,116 @@ def make_options(param_name, options, current_value, suffix=''):
return redirect(f'/{run_id}')


# ---- Log ingestion API ----
# Allows CI runners to post logs via HTTP instead of needing direct Redis/S3 credentials.

def _check_log_api_key():
    """Validate the CI log API key from the Authorization header.

    Returns:
        A Flask error Response (503 when CI_LOG_API_KEY is unconfigured,
        401 on a missing/incorrect bearer token), or None when authorized.
    """
    import hmac  # local import: constant-time string comparison

    if not CI_LOG_API_KEY:
        return Response('Log API not configured', status=503)
    auth_header = request.headers.get('Authorization', '')
    # Use compare_digest rather than == so the comparison runs in constant
    # time and doesn't leak key material via timing differences.
    if not hmac.compare_digest(auth_header, f'Bearer {CI_LOG_API_KEY}'):
        return Response('Unauthorized', status=401)
    return None

def _write_log_to_stores(key, data, expire=None):
    """Write log data (bytes) to Redis (with TTL) and, in the background, S3.

    Args:
        key: log identifier used as the Redis key and S3 object name.
        data: raw log bytes; may already be gzip-compressed.
        expire: Redis TTL in seconds; defaults to 2 weeks when None.
    """
    if expire is None:
        expire = 60 * 60 * 24 * 14  # 2 weeks

    # Always store gzipped: pass pre-compressed payloads (gzip magic bytes
    # 1f 8b) through untouched, compress everything else.
    payload = data if data.startswith(b'\x1f\x8b') else gzip.compress(data)

    # Redis write is best-effort: log and continue on failure.
    try:
        r.setex(key, expire, payload)
    except Exception as e:
        print(f"Redis write failed for {key}: {e}")

    def s3_write():
        # Runs on a daemon thread so the request isn't blocked on S3 latency.
        try:
            prefix = key[:4]  # shard objects by key prefix
            s3_key = f"{S3_LOGS_PREFIX}/{prefix}/{key}.log.gz"
            _s3.put_object(Bucket=S3_LOGS_BUCKET, Key=s3_key, Body=payload)
        except Exception as e:
            print(f"S3 write failed for {key}: {e}")

    threading.Thread(target=s3_write, daemon=True).start()

@app.route('/api/log/<key>', methods=['POST'])
def post_log(key):
    """Accept a final log upload and write it to Redis + S3.

    Body: raw log content (plain text or gzipped).
    Headers:
        Authorization: Bearer <CI_LOG_API_KEY>
        Content-Encoding: gzip (optional, if body is pre-compressed)
    Query params:
        expire: Redis TTL in seconds (default: 2 weeks)

    NOTE: an earlier draft documented a `final` query param; the handler never
    read it, so it has been dropped from the contract — POST always persists
    to S3 (via _write_log_to_stores), PUT is the Redis-only live update.
    """
    err = _check_log_api_key()
    if err:
        return err

    # Validate key format (alphanumeric + hyphens + underscores, max 128 chars)
    # so the key is safe to embed in Redis keys and S3 object paths.
    if not re.match(r'^[a-zA-Z0-9_-]{1,128}$', key):
        return Response('Invalid key format', status=400)

    data = request.get_data()
    if not data:
        return Response('Empty body', status=400)

    # Cap at 50MB to bound per-request memory usage.
    if len(data) > 50 * 1024 * 1024:
        return Response('Payload too large', status=413)

    expire = request.args.get('expire', 60 * 60 * 24 * 14, type=int)
    _write_log_to_stores(key, data, expire)

    return Response(json.dumps({'ok': True, 'key': key}), mimetype='application/json', status=201)

@app.route('/api/log/<key>', methods=['PUT'])
def update_log(key):
    """Update a log in Redis only (for live streaming / in-progress updates).

    Same auth and body format as POST, but only writes to Redis (not S3)
    since this is for transient live updates.
    """
    err = _check_log_api_key()
    if err:
        return err

    # Same key constraints as POST: alphanumeric/hyphen/underscore, <=128 chars.
    if not re.match(r'^[a-zA-Z0-9_-]{1,128}$', key):
        return Response('Invalid key format', status=400)

    data = request.get_data()
    if not data:
        return Response('Empty body', status=400)
    if len(data) > 50 * 1024 * 1024:  # 50MB cap
        return Response('Payload too large', status=413)

    expire = request.args.get('expire', 60 * 60 * 24 * 14, type=int)

    # Redis stores the gzipped form; accept pre-compressed bodies as-is
    # (detected via the gzip magic bytes 1f 8b).
    payload = data if data.startswith(b'\x1f\x8b') else gzip.compress(data)

    try:
        r.setex(key, expire, payload)
    except Exception as e:
        return Response(json.dumps({'error': f'Redis write failed: {e}'}),
                        mimetype='application/json', status=500)

    return Response(json.dumps({'ok': True, 'key': key}), mimetype='application/json')


# ---- Reverse proxy to ci-metrics server ----

_proxy_session = requests.Session()
Expand Down
16 changes: 11 additions & 5 deletions ci3/denoise
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,19 @@ function format_log_output {
}

function publish_log {
  # Publish the formatted log: straight to Redis when it's reachable,
  # otherwise via the dashboard HTTP API when an API key is configured.
  # Silently a no-op when neither transport is available.
  # (The pasted diff contained both the old and new bodies; this is the
  # post-change body, with expansions quoted.)
  if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
    format_log_output | redis_setexz "$key" "$CI_REDIS_EXPIRE"
  elif [ -n "${CI_LOG_API_KEY:-}" ]; then
    format_log_output | http_log_update "$key" "$CI_REDIS_EXPIRE"
  fi
}

function publish_log_final {
  # Final publish: persist the formatted log (Redis+S3 via cache_persistent
  # when Redis is reachable, otherwise the dashboard HTTP POST endpoint).
  # Silently a no-op when neither transport is available.
  if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
    format_log_output | cache_persistent "$key" "$CI_REDIS_EXPIRE"
  elif [ -n "${CI_LOG_API_KEY:-}" ]; then
    format_log_output | http_log_post "$key" "$CI_REDIS_EXPIRE"
  fi
}

function live_publish_log {
Expand All @@ -90,7 +96,7 @@ function live_publish_log {
done
}

if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
if [ "$CI_REDIS_AVAILABLE" -eq 1 ] || [ -n "${CI_LOG_API_KEY:-}" ]; then
live_publish_log &
publish_pid=$!
log_info="(${yellow}${url}${reset})"
Expand Down
20 changes: 14 additions & 6 deletions ci3/run_test_cmd
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ slack_notify_fail=0

if [ "$CI" -eq 1 ]; then
# CI overrides.
if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
if [ "$CI_REDIS_AVAILABLE" -eq 1 ] || [ -n "${CI_LOG_API_KEY:-}" ]; then
live_logging=1
publish=1
fi
Expand Down Expand Up @@ -111,17 +111,25 @@ trap sig_handler SIGTERM SIGINT

function publish_log {
  # Publish the in-progress test log under $log_key.
  # $1: optional TTL override in seconds (defaults to CI_REDIS_EXPIRE).
  # `cat ... 2>/dev/null` deliberately swallows the error when the temp log
  # doesn't exist yet (publishes an empty log instead of failing).
  local expire=${1:-$CI_REDIS_EXPIRE}
  if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
    cat "$tmp_file" 2>/dev/null | redis_setexz "$log_key" "$expire"
  elif [ -n "${CI_LOG_API_KEY:-}" ]; then
    cat "$tmp_file" 2>/dev/null | http_log_update "$log_key" "$expire"
  fi
}

function publish_log_final {
  # Persist the finished test log under $log_key (Redis+S3 when Redis is
  # reachable, otherwise via the dashboard HTTP POST endpoint).
  # $1: optional TTL override in seconds (defaults to CI_REDIS_EXPIRE).
  local expire=${1:-$CI_REDIS_EXPIRE}
  if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
    cat "$tmp_file" 2>/dev/null | cache_persistent "$log_key" "$expire"
  elif [ -n "${CI_LOG_API_KEY:-}" ]; then
    cat "$tmp_file" 2>/dev/null | http_log_post "$log_key" "$expire"
  fi
}

# Finalize the current log and start a fresh one with a new unique key.
function rotate_log {
if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
if [ "$CI_REDIS_AVAILABLE" -eq 1 ] || [ -n "${CI_LOG_API_KEY:-}" ]; then
publish_log_final "$@"
fi
log_key=$(uuid)
Expand Down Expand Up @@ -205,7 +213,7 @@ function run_test {
}

function finalize_test {
if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
if [ "$CI_REDIS_AVAILABLE" -eq 1 ] || [ -n "${CI_LOG_API_KEY:-}" ]; then
# If the test succeeded and we're in CI, set success flag for test. This key is unique to the test.
# If the test succeeded and we're logging passes, save the test log.
# If the test failed, save the test log.
Expand Down Expand Up @@ -395,7 +403,7 @@ else

if [ $code -eq 0 ]; then
# Publish the retry's log, then point back at the failure for flake reporting.
if [ "$CI_REDIS_AVAILABLE" -eq 1 ]; then
if [ "$CI_REDIS_AVAILABLE" -eq 1 ] || [ -n "${CI_LOG_API_KEY:-}" ]; then
publish_log_final
fi
log_key=$failure_log_key
Expand Down
45 changes: 44 additions & 1 deletion ci3/source_cache
Original file line number Diff line number Diff line change
Expand Up @@ -75,5 +75,48 @@ function cache_persistent {
} >&- 2>&- &
}

# HTTP-based log posting — used when Redis/S3 aren't directly accessible (e.g. GH runners).
# Posts to the CI dashboard's /api/log endpoint.
# Requires CI_LOG_API_KEY to be set. CI_LOG_POST_URL defaults to DASHBOARD_URL.
export CI_LOG_POST_URL="${CI_LOG_POST_URL:-${DASHBOARD_URL:-http://ci.aztec-labs.com}}"
function http_log_post {
  # Gzip stdin and POST it to the dashboard's /api/log endpoint for permanent
  # (Redis + S3) storage.
  # $1: log key. $2: TTL in seconds (defaults to CI_REDIS_EXPIRE).
  # Requires CI_LOG_API_KEY and CI_LOG_POST_URL in the environment.
  local key="$1"
  local expire="${2:-$CI_REDIS_EXPIRE}"
  # Stream gzip output straight into curl via stdin (--data-binary @-):
  # no predictable /tmp file to race on, and nothing to leak if we're killed.
  gzip -c | curl -sf -X POST \
    "${CI_LOG_POST_URL}/api/log/${key}?expire=${expire}" \
    -H "Authorization: Bearer ${CI_LOG_API_KEY}" \
    -H "Content-Type: application/octet-stream" \
    --data-binary @- \
    -o /dev/null
}

# Live update (Redis only, no S3) via HTTP.
# Live update (Redis only, no S3) via HTTP.
function http_log_update {
  # Gzip stdin and PUT it to the dashboard's /api/log endpoint. PUT only
  # refreshes Redis (live/in-progress updates); use http_log_post to persist.
  # $1: log key. $2: TTL in seconds (defaults to CI_REDIS_EXPIRE).
  local key="$1"
  local expire="${2:-$CI_REDIS_EXPIRE}"
  # Stream gzip output straight into curl via stdin (--data-binary @-):
  # no predictable /tmp file to race on, and nothing to leak if we're killed.
  gzip -c | curl -sf -X PUT \
    "${CI_LOG_POST_URL}/api/log/${key}?expire=${expire}" \
    -H "Authorization: Bearer ${CI_LOG_API_KEY}" \
    -H "Content-Type: application/octet-stream" \
    --data-binary @- \
    -o /dev/null
}

# Persistent cache that uses HTTP posting when Redis is not directly available.
# Persistent cache that uses HTTP posting when Redis is not directly available.
function cache_persistent_http {
  # Drop-in replacement for cache_persistent over HTTP.
  # $1: key. $2: TTL in seconds. Reads content from stdin.
  local key="$1"
  local expire="$2"
  # http_log_post already reads stdin; no need for `cat |`.
  http_log_post "$key" "$expire"
}

# Export functions so they remain available in subshells spawned by CI scripts.
# (The scraped diff showed both the pre- and post-change export lines; only the
# post-change superset line is kept.)
export -f cache_s3_transfer cache_s3_transfer_to cache_disk_transfer cache_disk_transfer_to cache_persistent http_log_post http_log_update cache_persistent_http
Loading