Projekt realizuje proces akwizycji danych (data ingestion) z zewnętrznych źródeł Open Data oraz ich zapis do systemu plików HDFS (Hadoop Distributed File System).
Pipeline został zaprojektowany tak, aby:
- pobierać dane z API NYC Open Data,
- obsługiwać zarówno dane statyczne, jak i dynamiczne,
- wykrywać przyrost danych (incremental load),
- automatycznie zapisywać dane do HDFS,
- prowadzić szczegółowe logi procesu.
Pipeline korzysta z trzech datasetów:
-
Dane o inspekcjach restauracji NYC
-
Źródła:
- CSV (pełny dump)
- JSON (do incremental load)
- Dane o nieruchomościach
- Pobierany tylko raz
- Dane policyjne
- Pobierany tylko raz
Pipeline dzieli dane na dwa typy:
- Pobierane tylko jeśli plik nie istnieje
- Zapisywane lokalnie i do HDFS
-
Obsługiwane w trybie:
- FULL LOAD (pierwsze uruchomienie)
- INCREMENTAL LOAD (kolejne uruchomienia)
- Jeśli brak → FULL LOAD
- Jeśli istnieje → próbujemy znaleźć różnicę
- Pobierane jest ostatnie 100 wartości
camisz lokalnego pliku
-
Dane pobierane z endpointu JSON:
- limity:
5000 → 10000 → 20000 → 50000
- limity:
- Szukamy pierwszego rekordu, którego
camiswystępuje w ostatnich zapisanych danych - Pozycja tego rekordu wyznacza punkt przecięcia
- Wszystkie rekordy przed match’em = nowe dane
-
Nowe dane są dodawane na początek starego zbioru
-
Tworzone są dwa pliki:
DOHMH_<timestamp>.csv→ wersjonowanyDOHMH_latest.csv→ aktualny snapshot
Jeśli nie znaleziono matcha:
- wykonywany jest FULL LOAD
Każdy plik jest:
-
kopiowany do HDFS:
/datasets/{dataset_name}/ -
nadpisywany jeśli istnieje
-
ustawiana jest replikacja:
hdfs dfs -setrep -w 3
Pipeline zapisuje logi w katalogu logs/:
Każdy log zawiera:
- timestamp
- poziom logu (INFO, WARN, ERROR, SUCCESS)
- opis operacji
- rozpoczęcie pipeline
- pobieranie plików
- liczba nowych rekordów
- błędy HTTP
- fallback do FULL LOAD
- upload do HDFS
- pobiera pliki w trybie streaming
- pokazuje progress bar (
tqdm) - obsługuje duże pliki
- retry (3 próby)
- timeout
- logowanie błędów
- zabezpieczenie przed przerwaniem pipeline
- główna logika incremental load
- obsługa FULL / INCREMENTAL
- merge danych
- zapis do plików
- wyszukuje punkt przecięcia danych
- działa na podstawie pola
camis
- odczytuje końcowe rekordy z pliku
- przygotowuje dane do porównania
- pobiera dane statyczne tylko raz
- zapobiega duplikacji
- wysyła pliki do HDFS
- ustawia replikację = 3
uv sync
uv run python main.py- Python 3.10+
- Hadoop + HDFS
- dostęp do komendy
hdfs dfs
| Scenariusz | Zachowanie |
|---|---|
| Pierwsze uruchomienie | FULL LOAD wszystkich danych |
| Kolejne uruchomienie | Incremental dla DOHMH |
| Brak matcha | FULL LOAD |
| Błąd API | retry |
| Brak pliku statycznego | pobranie |
| Plik istnieje | pominięcie |
✔ obsługa dużych danych ✔ incremental ingestion ✔ odporność na błędy ✔ logowanie procesu ✔ integracja z Hadoop ✔ wersjonowanie danych ✔ progress bar
- match oparty na
camismoże być wrażliwy na zmiany kolejności danych - brak dedykowanego klucza czasowego w datasetach
- pełny reload w przypadku braku dopasowania
Pipeline realizuje kompletny proces:
- pobrania danych,
- detekcji zmian,
- aktualizacji zbiorów,
- zapisu do HDFS,
- logowania i obsługi błędów.
Rozwiązanie spełnia wymagania zadania oraz odzwierciedla rzeczywiste podejście stosowane w systemach Big Data.