345 changes: 211 additions & 134 deletions README.md

Large diffs are not rendered by default.

4,429 changes: 0 additions & 4,429 deletions chunky.txt

This file was deleted.

Empty file added enrichment/__init__.py
56 changes: 56 additions & 0 deletions enrichment/features.py
@@ -0,0 +1,56 @@
import re
import math
import logging
import pandas as pd

logger = logging.getLogger(__name__)

_DOMAIN_RE = re.compile(
    r'^(www\.)?[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?'
    r'(\.[a-zA-Z]{2,})+$'
)


def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add engineered feature columns to the scraped DataFrame.

    Input columns expected: phoneNumber, completePhoneNumber, domain, url, stars, reviews.
    Returns a new DataFrame with additional columns appended.
    """
    df = df.copy()

    # Cast to str so the .str accessors below work even if pandas inferred
    # a numeric dtype (e.g. all-digit phone numbers).
    phone = df.get('phoneNumber', pd.Series('', index=df.index)).fillna('').astype(str)
    intl_phone = df.get('completePhoneNumber', pd.Series('', index=df.index)).fillna('').astype(str)
    domain = df.get('domain', pd.Series('', index=df.index)).fillna('').astype(str)
    url = df.get('url', pd.Series('', index=df.index)).fillna('').astype(str)

    df['has_phone'] = (phone.str.strip() != '') | (intl_phone.str.strip() != '')
    df['has_website'] = (domain.str.strip() != '') | (url.str.strip() != '')
    df['domain_valid'] = domain.apply(_validate_domain)
    df['rating_score'] = _compute_rating_scores(df)
    df['review_density'] = _normalize_reviews(df.get('reviews', pd.Series(0, index=df.index)))

    logger.info("Feature engineering complete: %d rows, %d columns", len(df), len(df.columns))
    return df


def _validate_domain(domain: str) -> bool:
    if not isinstance(domain, str) or not domain.strip():
        return False
    return bool(_DOMAIN_RE.match(domain.strip()))


def _compute_rating_scores(df: pd.DataFrame) -> pd.Series:
    stars = pd.to_numeric(df.get('stars', pd.Series(0, index=df.index)), errors='coerce').fillna(0.0)
    reviews = pd.to_numeric(df.get('reviews', pd.Series(0, index=df.index)), errors='coerce').fillna(0.0)
    # stars (0–5) weighted by log of review volume to penalise suspiciously high
    # ratings with very few reviews
    return (stars * reviews.apply(math.log1p)).round(4)


def _normalize_reviews(reviews_series: pd.Series) -> pd.Series:
    numeric = pd.to_numeric(reviews_series, errors='coerce').fillna(0.0)
    max_val = numeric.max()
    if max_val == 0:
        return pd.Series(0.0, index=reviews_series.index)
    return (numeric / max_val).round(4)
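
A minimal sketch of engineer_features on a toy frame (the sample values are hypothetical; the column names follow the docstring above):

import pandas as pd
from enrichment.features import engineer_features

# Hypothetical two-row sample mirroring the documented input columns.
sample = pd.DataFrame({
    'phoneNumber': ['555-0100', ''],
    'completePhoneNumber': ['+1 555-0100', ''],
    'domain': ['example.com', 'not a domain'],
    'url': ['https://example.com', ''],
    'stars': [4.5, 0],
    'reviews': [120, 0],
})

out = engineer_features(sample)
# Row 0: has_phone, has_website, domain_valid all True;
#        rating_score = round(4.5 * log1p(120), 4) = 21.5811; review_density = 1.0
# Row 1: junk domain means has_website is True but domain_valid is False;
#        no phone, rating_score 0.0, review_density 0.0
print(out[['has_phone', 'has_website', 'domain_valid', 'rating_score', 'review_density']])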
108 changes: 108 additions & 0 deletions enrichment/scoring.py
@@ -0,0 +1,108 @@
"""Lead scoring system.

Scores are 0–100 and fully interpretable — each component is independently
capped and documented below. No black-box ML.

Score breakdown (max 100):
reviews 0–30 — volume of social proof
rating 0–25 — quality of social proof
website 0–30 — digital presence (has_website, domain_valid, web signals)
phone 0–15 — reachability

Segments:
micro 0–24
small 25–49
medium 50–74
large 75–100
"""
import logging
import pandas as pd

logger = logging.getLogger(__name__)


def _score_reviews(val) -> float:
try:
n = int(val)
except (ValueError, TypeError):
return 0.0
if n >= 500:
return 30.0
if n >= 200:
return 24.0
if n >= 100:
return 18.0
if n >= 50:
return 12.0
if n >= 10:
return 7.0
if n >= 1:
return 3.0
return 0.0


def _score_rating(val) -> float:
try:
s = float(val)
except (ValueError, TypeError):
return 0.0
if s >= 4.5:
return 25.0
if s >= 4.0:
return 20.0
if s >= 3.5:
return 15.0
if s >= 3.0:
return 10.0
if s > 0:
return 5.0
return 0.0


def _score_website(row) -> float:
pts = 0.0
if row.get('has_website'):
pts += 10.0
if row.get('domain_valid'):
pts += 5.0
if row.get('web_has_contact'):
pts += 5.0
if row.get('web_has_services'):
pts += 5.0
if row.get('web_is_modern'):
pts += 5.0
return pts


def _score_phone(row) -> float:
return 15.0 if row.get('has_phone') else 0.0


def _segment(score: float) -> str:
if score < 25:
return 'micro'
if score < 50:
return 'small'
if score < 75:
return 'medium'
return 'large'


def compute_scores(df: pd.DataFrame) -> pd.DataFrame:
"""Add 'score' (0–100, float) and 'segment' columns to the DataFrame."""
df = df.copy()

review_pts = df['reviews'].apply(_score_reviews)
rating_pts = df['stars'].apply(_score_rating)
website_pts = df.apply(_score_website, axis=1)
phone_pts = df.apply(_score_phone, axis=1)

raw = review_pts + rating_pts + website_pts + phone_pts
df['score'] = raw.clip(0, 100).round(1)
df['segment'] = df['score'].apply(_segment)

logger.info(
"Scoring complete — segment distribution: %s",
df['segment'].value_counts().to_dict(),
)
return df
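
A quick hand-check of the rubric on one hypothetical lead (all values invented for illustration): 230 reviews scores 24, 4.6 stars scores 25, a full set of website signals scores 30, and a phone adds 15, for 94.0 and the 'large' segment.

import pandas as pd
from enrichment.scoring import compute_scores

lead = pd.DataFrame([{
    'reviews': 230,            # >= 200 -> 24 pts
    'stars': 4.6,              # >= 4.5 -> 25 pts
    'has_website': True,       # 10 pts
    'domain_valid': True,      # +5
    'web_has_contact': True,   # +5
    'web_has_services': True,  # +5
    'web_is_modern': True,     # +5  (website subtotal 30)
    'has_phone': True,         # 15 pts
}])

scored = compute_scores(lead)
assert scored.loc[0, 'score'] == 94.0
assert scored.loc[0, 'segment'] == 'large'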
165 changes: 165 additions & 0 deletions enrichment/web_scraper.py
@@ -0,0 +1,165 @@
"""Lightweight async website scraper for the enrichment pipeline.

Fetches the homepage of each lead's website and extracts simple signals:
- presence of contact / services pages
- basic keyword extraction
- heuristic for modern JS frameworks

All failures are caught and return an empty signal dict so the pipeline
never crashes due to a single unreachable domain.
"""
import asyncio
import logging
import re
from typing import Dict, List

import aiohttp

try:
from bs4 import BeautifulSoup
_BS4_AVAILABLE = True
except ImportError:
_BS4_AVAILABLE = False

logger = logging.getLogger(__name__)

_HEADERS = {
'User-Agent': (
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/124.0.0.0 Safari/537.36'
),
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
}

_CONTACT_KW = frozenset(['contact', 'contacto', 'kontakt', 'reach us', 'get in touch', 'reach out'])
_SERVICE_KW = frozenset([
'services', 'products', 'solutions', 'offerings', 'portfolio',
'what we do', 'our work', 'capabilities', 'servicios', 'productos',
])
_MODERN_SIGNALS = ['__next', '__nuxt', 'react', 'vue', 'angular', 'gatsby', 'svelte']
_WORD_RE = re.compile(r'\b[a-z]{4,15}\b')

_EMPTY_RESULT: Dict = {
'web_has_contact': False,
'web_has_services': False,
'web_keywords': '',
'web_is_modern': False,
'web_scraped': False,
}

_STOP_WORDS = frozenset([
'that', 'this', 'with', 'have', 'from', 'they', 'will', 'been',
'your', 'more', 'were', 'when', 'there', 'their', 'what', 'about',
'which', 'also', 'into', 'than', 'then', 'some', 'make', 'just',
'like', 'time', 'know', 'take', 'year', 'good', 'people', 'come',
])


async def _fetch(session: aiohttp.ClientSession, url: str, timeout: int) -> str | None:
if not url.startswith(('http://', 'https://')):
url = 'https://' + url
try:
async with session.get(
url,
headers=_HEADERS,
timeout=aiohttp.ClientTimeout(total=timeout),
ssl=False,
allow_redirects=True,
max_redirects=5,
) as resp:
if resp.status == 200:
return await resp.text(errors='replace')
logger.debug("[%s] HTTP %d", url, resp.status)
except Exception as exc:
logger.debug("[%s] fetch failed: %s", url, exc)
return None


def _analyze(html: str) -> Dict:
if _BS4_AVAILABLE:
soup = BeautifulSoup(html, 'html.parser')
text = soup.get_text(separator=' ', strip=True).lower()
else:
# Fallback: strip tags with regex
text = re.sub(r'<[^>]+>', ' ', html).lower()

html_lower = html.lower()

has_contact = any(kw in text for kw in _CONTACT_KW)
has_services = any(kw in text for kw in _SERVICE_KW)
is_modern = any(sig in html_lower for sig in _MODERN_SIGNALS)

words = _WORD_RE.findall(text)
freq: Dict[str, int] = {}
for w in words:
if w not in _STOP_WORDS:
freq[w] = freq.get(w, 0) + 1
top = sorted(freq, key=freq.get, reverse=True)[:10] # type: ignore[arg-type]

return {
'web_has_contact': has_contact,
'web_has_services': has_services,
'web_keywords': ', '.join(top),
'web_is_modern': is_modern,
'web_scraped': True,
}


async def _enrich_one(
session: aiohttp.ClientSession,
url: str,
semaphore: asyncio.Semaphore,
timeout: int,
) -> Dict:
if not url or not url.strip():
return dict(_EMPTY_RESULT)
async with semaphore:
html = await _fetch(session, url.strip(), timeout)
if html:
return _analyze(html)
return dict(_EMPTY_RESULT)


async def _run_batch(
urls: List[str],
max_concurrent: int,
timeout: int,
) -> List[Dict]:
semaphore = asyncio.Semaphore(max_concurrent)
connector = aiohttp.TCPConnector(ssl=False, limit=max_concurrent + 5)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [_enrich_one(session, url, semaphore, timeout) for url in urls]
return await asyncio.gather(*tasks)


def enrich_websites(
urls: List[str],
*,
max_concurrent: int = 10,
batch_size: int = 100,
timeout: int = 10,
) -> List[Dict]:
"""Enrich a list of URLs with website signals.

Processes in batches to keep memory bounded for large inputs.
Never raises — failed URLs return empty signal dicts.
"""
if not _BS4_AVAILABLE:
logger.warning(
"beautifulsoup4 not installed — falling back to regex HTML stripping. "
"Install it with: pip install beautifulsoup4"
)

results: List[Dict] = []
total = len(urls)
for batch_start in range(0, total, batch_size):
batch = urls[batch_start: batch_start + batch_size]
batch_num = batch_start // batch_size + 1
total_batches = (total + batch_size - 1) // batch_size
logger.info("Web scraping batch %d/%d (%d URLs)", batch_num, total_batches, len(batch))
batch_results = asyncio.run(_run_batch(batch, max_concurrent, timeout))
results.extend(batch_results)
return results
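
A short driver sketch, assuming the caller merges results positionally back onto the lead frame (the merge itself is not part of this module; URLs and names below are hypothetical):

import pandas as pd
from enrichment.web_scraper import enrich_websites

# Hypothetical URLs; an unreachable or empty entry yields the empty signal
# dict (web_scraped=False) instead of raising.
urls = ['example.com', 'https://example.org', '']
signals = enrich_websites(urls, max_concurrent=5, timeout=5)

# enrich_websites returns one dict per input URL, in order, so the signal
# columns align positionally with the lead rows.
web_df = pd.DataFrame(signals)
# leads_df would be the scraped lead DataFrame from earlier in the pipeline:
# enriched = pd.concat([leads_df.reset_index(drop=True), web_df], axis=1)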