Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 34 additions & 21 deletions src/lob_hlpr/hlpr.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging.handlers
import os
import re
import threading
from dataclasses import fields, is_dataclass
from datetime import datetime
from typing import Any
Expand Down Expand Up @@ -42,6 +43,9 @@ def enable_windows_ansi_support(): # pragma: no cover
# Determine if ANSI colors will work
_USE_COLOR = enable_windows_ansi_support()

# Lock protecting the one-time setup of rotating file handlers in lob_print
_log_handler_lock = threading.Lock()


class LobHlpr:
"""Helper functions for Lobaro tools."""
Expand Down Expand Up @@ -131,40 +135,49 @@ def lob_print(log_path: str, *args, **kwargs):
uncolored.
"""
color = kwargs.pop("color", None)
LobHlpr._print_color(*args, color=color, **kwargs)
sep = kwargs.pop("sep", " ")
kwargs.pop("end", None) # consumed by print, not meaningful for logging
LobHlpr._print_color(*args, color=color, sep=sep, **kwargs)

# get the directory from the log_path
log_dir = os.path.dirname(log_path)
os.makedirs(log_dir, exist_ok=True)
if log_dir:
os.makedirs(log_dir, exist_ok=True)
logger = logging.getLogger("lob_hlpr")
# Check to see if the file handler was already set up for root logger
logger.propagate = False # Prevent propagation to root logger
logger.setLevel(logging.INFO)
root_logger = logging.getLogger()

# Check if our file handler is already attached to root logger
has_file_handler = any(
isinstance(h, logging.handlers.RotatingFileHandler)
and h.baseFilename == os.path.abspath(log_path)
for h in root_logger.handlers
)

if not has_file_handler:
ch = logging.handlers.RotatingFileHandler(
log_path, maxBytes=268435456, backupCount=2
abs_log_path = os.path.abspath(log_path)
with _log_handler_lock:
# Re-check inside the lock to avoid a TOCTOU race when multiple
# threads call lob_print concurrently with the same log_path.
has_file_handler = any(
isinstance(h, logging.handlers.RotatingFileHandler)
and h.baseFilename == abs_log_path
for h in root_logger.handlers
)

formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
if not has_file_handler:
ch = logging.handlers.RotatingFileHandler(
log_path, maxBytes=268435456, backupCount=2
)

formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

ch.setFormatter(formatter)
ch.setFormatter(formatter)

# Add handler to root logger so all loggers inherit it
root_logger.addHandler(ch)
logger.addHandler(ch)
# Add to root_logger so all other loggers inherit it via
# propagation. Also add directly to logger because logger has
# propagate=False, so it would not reach root_logger otherwise.
root_logger.addHandler(ch)
logger.addHandler(ch)
Comment thread
MrKevinWeiss marked this conversation as resolved.

logger.info(*args, **kwargs)
message = sep.join(str(a) for a in args)
for line in message.splitlines() or [message]:
logger.info("%s", line)

@staticmethod
def ascleandict(
Expand Down
66 changes: 66 additions & 0 deletions tests/test_lob_hlpr.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import json
import logging
import logging.handlers
import threading
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass

import pytest
Expand Down Expand Up @@ -161,6 +164,69 @@ def test_log_print_passes(tmp_path, capsys):
assert log_content.count("Another test message") == 1


def test_log_print_multiline(tmp_path, capsys):
    """Multiline messages are split into one log record per line.

    The console output must be unchanged (print handles newlines natively),
    while the log file must contain each line as a separate entry so that
    log parsers and grep work without special handling.
    """
    test_file = tmp_path / "multiline.log"
    hlp.lob_print(str(test_file), "line one\nline two\nline three")

    captured = capsys.readouterr()
    # Console retains the original newlines via print
    assert "line one\nline two\nline three" in captured.out

    log_content = test_file.read_text()
    log_lines = [ln for ln in log_content.splitlines() if ln.strip()]
    assert any("line one" in ln for ln in log_lines)
    assert any("line two" in ln for ln in log_lines)
    assert any("line three" in ln for ln in log_lines)
    # Exactly three records: each source line became its own log record.
    # NOTE: the previous check (`"\n" in ln` over splitlines() output) was
    # vacuous — splitlines() can never yield a string containing a newline —
    # so it passed even if all three lines landed in one record.
    assert len(log_lines) == 3


def test_log_print_concurrent(tmp_path):
    """Calling lob_print from many threads at once must stay consistent.

    After all workers finish, root_logger must hold exactly one
    RotatingFileHandler for the shared log_path, and every message must
    appear exactly once in the log file — no duplicated handlers, no
    duplicated or lost records.
    """
    log_file = tmp_path / "concurrent.log"
    thread_count = 20
    start_gate = threading.Barrier(thread_count)

    def call_lob_print(idx):
        # Hold every worker at the barrier so they all hit the one-time
        # handler setup at the same instant, maximising the race window.
        start_gate.wait()
        hlp.lob_print(str(log_file), f"msg-{idx:04d}")

    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        pending = [executor.submit(call_lob_print, idx) for idx in range(thread_count)]
        for future in pending:
            # result() re-raises any exception from the worker thread
            future.result()

    expected_path = str(log_file.resolve())
    matching_handlers = [
        handler
        for handler in logging.getLogger().handlers
        if isinstance(handler, logging.handlers.RotatingFileHandler)
        and handler.baseFilename == expected_path
    ]
    assert len(matching_handlers) == 1, (
        f"Expected exactly 1 RotatingFileHandler, got {len(matching_handlers)}"
    )

    contents = log_file.read_text()
    for idx in range(thread_count):
        occurrences = contents.count(f"msg-{idx:04d}")
        assert occurrences == 1, (
            f"msg-{idx:04d} appeared {occurrences} times in log (expected 1)"
        )


def test_ascleandict_rejects_non_dataclass():
"""Test that ascleandict raises TypeError for non-dataclass inputs."""
with pytest.raises(TypeError):
Expand Down