From 8441326768230c2ba7f2574a25993980f1a3c031 Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Fri, 29 May 2026 08:49:42 +0200 Subject: [PATCH 1/2] remove duplicate detection codes --- .pre-commit-config.yaml | 1 - docs/tool-overview-databundle.rst | 18 - src/cdm_reader_mapper/__init__.py | 6 - src/cdm_reader_mapper/core/databundle.py | 206 ---- src/cdm_reader_mapper/duplicates/__init__.py | 3 - .../duplicates/_duplicate_settings.py | 77 -- .../duplicates/duplicates.py | 903 ------------------ tests/_duplicates.py | 380 -------- tests/test_databundle.py | 122 --- tests/test_duplicates.py | 385 -------- tests/test_duplicates_data.py | 121 --- 11 files changed, 2222 deletions(-) delete mode 100755 src/cdm_reader_mapper/duplicates/__init__.py delete mode 100755 src/cdm_reader_mapper/duplicates/_duplicate_settings.py delete mode 100755 src/cdm_reader_mapper/duplicates/duplicates.py delete mode 100755 tests/_duplicates.py delete mode 100755 tests/test_duplicates.py delete mode 100755 tests/test_duplicates_data.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index fa08c7bb..a4f578b5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -23,7 +23,6 @@ repos: - id: fix-byte-order-marker - id: name-tests-test args: [ '--pytest-test-first' ] - exclude: ^tests/_duplicates.py$ - id: no-commit-to-branch args: [ '--branch', 'main' ] - id: trailing-whitespace diff --git a/docs/tool-overview-databundle.rst b/docs/tool-overview-databundle.rst index 123849b4..1e98fbeb 100755 --- a/docs/tool-overview-databundle.rst +++ b/docs/tool-overview-databundle.rst @@ -84,22 +84,4 @@ Now the meteorological data can be maqpped to the Common Data Model (CDM_) using For more information how the mapping is working, please see :ref:`tool-overview-mapper` and/or :ref:`how-to-register-a-new-data-model-mapping`. -:ref:`dupdetect` -^^^^^^^^^^^^^^^^ - -After mapping to the CDM format it is useful to check if the CDM tables contain any duplicates. The duplicate checker included in the ``cdm_reader_mapper`` toolbox is based on python record linkage toolkit RecordLinkage_. - -The first step is to call the method function :func:`.DataBundle.duplicate_check`. This function scans the CDM tables for any duplicates. - -.. code-block:: console - - db_dup = db.duplicate_check() - -Afterwards there are two options how to deal with the detected duplicates: - -1. :func:`.DataBundle.flag_duplicates` -2. :func:`.DataBundle.remove_duplicates` - -The first function flags the detected duplicates. For more information about the flags see `CDM code tables for duplicate_status`_ and `CDM code tables for report_quality`_. The second function removes the detected duplicates. - .. include:: hyperlinks.rst diff --git a/src/cdm_reader_mapper/__init__.py b/src/cdm_reader_mapper/__init__.py index 93c3c0ca..aebc705c 100755 --- a/src/cdm_reader_mapper/__init__.py +++ b/src/cdm_reader_mapper/__init__.py @@ -19,10 +19,6 @@ from .core.reader import read from .core.writer import write from .data import test_data -from .duplicates.duplicates import ( - DupDetect, - duplicate_check, -) from .mdf_reader.reader import read_data, read_mdf from .mdf_reader.writer import write_data from .metmetpy import ( @@ -35,11 +31,9 @@ __all__ = [ "DataBundle", - "DupDetect", "cdm_tables", "correct_datetime", "correct_pt", - "duplicate_check", "map_model", "read", "read_data", diff --git a/src/cdm_reader_mapper/core/databundle.py b/src/cdm_reader_mapper/core/databundle.py index 9b4b4cbd..65723b30 100755 --- a/src/cdm_reader_mapper/core/databundle.py +++ b/src/cdm_reader_mapper/core/databundle.py @@ -17,7 +17,6 @@ split_by_index, ) from cdm_reader_mapper.common.iterators import ParquetStreamReader, is_valid_iterator -from cdm_reader_mapper.duplicates.duplicates import DupDetect, duplicate_check from cdm_reader_mapper.metmetpy import ( correct_datetime, correct_pt, @@ -1414,208 +1413,3 @@ def write( mode=mode, **kwargs, ) - - def duplicate_check(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: - r""" - Duplicate check in :py:attr:`data`. - - Parameters - ---------- - inplace : bool, default: False - If True overwrite :py:attr:`data` in :py:class:`~DataBundle` - else return a copy of :py:class:`~DataBundle` with :py:attr:`data` as CDM tables. - \**kwargs : Any - Additional keyword-arguments for duplicate check. - - Returns - ------- - :py:class:`~DataBundle` or None - DataBundle containing new :py:class:`~DupDetect` class for further duplicate check methods or None if "inplace=True". - - See Also - -------- - DataBundle.get_duplicates : Get duplicate matches in `data`. - DataBundle.flag_duplicates : Flag detected duplicates in `data`. - DataBundle.remove_duplicates : Remove detected duplicates in `data`. - - Notes - ----- - Following columns have to be provided: - - * `longitude` - * `latitude` - * `primary_station_id` - * `report_timestamp` - * `station_course` - * `station_speed` - - This adds a new class :py:class:`~DupDetect` to :py:class:`~DataBundle`. - This class is necessary for further duplicate check methods. - - For more information see :py:func:`duplicate_check` - - Examples - -------- - >>> db.duplicate_check() - """ - db_ = self._get_db(inplace) - if db_ is None: - return None - if db_._mode == "tables" and "header" in db_._data: - data = db_._data["header"] - else: - data = db_._data - db_.DupDetect = duplicate_check(data, **kwargs) - return self._return_db(db_, inplace) - - def flag_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: - r""" - Flag detected duplicates in :py:attr:`data`. - - Parameters - ---------- - inplace : bool, default: False - If True overwrite :py:attr:`data` in :py:class:`~DataBundle` - else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing flagged duplicates. - \**kwargs : Any - Additional keyword-arguments for flagging duplicates. - - Returns - ------- - :py:class:`~DataBundle` or None - DataBundle containing duplicate flags in :py:attr:`data` or None if "inplace=True". - - Raises - ------ - RuntimeError - Before flagging duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. - - See Also - -------- - DataBundle.remove_duplicates : Remove detected duplicates in `data`. - DataBundle.get_duplicates : Get duplicate matches in `data`. - DataBundle.duplicate_check : Duplicate check in `data`. - - Notes - ----- - For more information see :py:func:`DupDetect.flag_duplicates` - - Examples - -------- - Flag duplicates without overwriting :py:attr:`data`. - - >>> flagged_tables = db.flag_duplicates() - - Flag duplicates with overwriting :py:attr:`data`. - - >>> db.flag_duplicates(inplace=True) - >>> flagged_tables = db.data - """ - db_ = self._get_db(inplace) - if db_ is None: - return None - - if db_.DupDetect is None: - raise RuntimeError("Before flagging duplicates, a duplictate check has to be done: 'db.duplicate_check()'") - - db_.DupDetect.flag_duplicates(**kwargs) - - if db_._mode == "tables" and "header" in db_._data: - db_._data["header"] = db_.DupDetect.result - else: - db_._data = db_.DupDetect.result - return self._return_db(db_, inplace) - - def get_duplicates(self, **kwargs: Any) -> pd.DataFrame: - r""" - Get duplicate matches in :py:attr:`data`. - - Parameters - ---------- - \**kwargs : Any - Additional keyword-arguments used for getting duplicates. - - Returns - ------- - pd.DataFrame - DataFrame containing duplicate matches. - - Raises - ------ - RuntimeError - Before getting duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. - - See Also - -------- - DataBundle.remove_duplicates : Remove detected duplicates in `data`. - DataBundle.flag_duplicates : Flag detected duplicates in `data`. - DataBundle.duplicate_check : Duplicate check in `data`. - - Notes - ----- - For more information see :py:func:`DupDetect.get_duplicates` - - Examples - -------- - >>> matches = db.get_duplicates() - """ - if self.DupDetect is None: - raise RuntimeError("Before getting duplicates, a duplictate check has to be done: 'db.duplicate_check()'") - return self.DupDetect.get_duplicates(**kwargs) - - def remove_duplicates(self, inplace: bool = False, **kwargs: Any) -> DataBundle | None: - r""" - Remove detected duplicates in :py:attr:`data`. - - Parameters - ---------- - inplace : bool, default: False - If True overwrite :py:attr:`data` in :py:class:`~DataBundle` - else return a copy of :py:class:`~DataBundle` with :py:attr:`data` containing no duplicates. - \**kwargs : Any - Additional keyword-arguments used to remove duplicates. - - Returns - ------- - :py:class:`~DataBundle` or None - DataBundle without duplicated rows or None if "inplace=True". - - Raises - ------ - RuntimeError - Before removing duplicates, a duplictate check has to be done, :py:func:`DataBundle.duplicate_check`. - - See Also - -------- - DataBundle.flag_duplicates : Flag detected duplicates in `data`. - DataBundle.get_duplicates : Get duplicate matches in `data`. - DataBundle.duplicate_check : Duplicate check in `data`. - - Notes - ----- - For more information see :py:func:`DupDetect.remove_duplicates` - - Examples - -------- - Remove duplicates without overwriting :py:attr:`data`. - - >>> removed_tables = db.remove_duplicates() - - Remove duplicates with overwriting :py:attr:`data`. - - >>> db.remove_duplicates(inplace=True) - >>> removed_tables = db.data - """ - db_ = self._get_db(inplace) - if db_ is None: - return None - - if db_.DupDetect is None: - raise RuntimeError("Before removing duplicates, a duplictate check has to be done: 'db.duplicate_check()'") - - db_.DupDetect.remove_duplicates(**kwargs) - header_ = db_.DupDetect.result - if not isinstance(db_._data, pd.DataFrame): - raise TypeError("data has unsupported type: {type(db_._data)}.") - db_._data = db_._data[db_._data.index.isin(header_.index)] - return self._return_db(db_, inplace) diff --git a/src/cdm_reader_mapper/duplicates/__init__.py b/src/cdm_reader_mapper/duplicates/__init__.py deleted file mode 100755 index 5e57deb7..00000000 --- a/src/cdm_reader_mapper/duplicates/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -"""Climate Data Model (CDM) mapper package.""" - -from __future__ import annotations diff --git a/src/cdm_reader_mapper/duplicates/_duplicate_settings.py b/src/cdm_reader_mapper/duplicates/_duplicate_settings.py deleted file mode 100755 index 5f108749..00000000 --- a/src/cdm_reader_mapper/duplicates/_duplicate_settings.py +++ /dev/null @@ -1,77 +0,0 @@ -"""Settings for duplicate check.""" - -from __future__ import annotations -from typing import Any - -from recordlinkage import Compare -from recordlinkage.compare import Numeric - - -__all__ = ["Compare"] - -_method_kwargs = { - "left_on": "report_timestamp", - "window": 5, - "block_on": ["primary_station_id"], -} - -_compare_kwargs = { - "primary_station_id": {"method": "exact"}, - "longitude": { - "method": "numeric", - "kwargs": {"method": "step", "offset": 0.11}, - }, - "latitude": { - "method": "numeric", - "kwargs": {"method": "step", "offset": 0.11}, - }, - "report_timestamp": { - "method": "date2", - "kwargs": {"method": "gauss", "offset": 60.0}, - }, - "station_speed": { - "method": "numeric", - "kwargs": {"method": "step", "offset": 0.09}, - }, - "station_course": { - "method": "numeric", - "kwargs": {"method": "step", "offset": 0.9}, - }, -} - -_histories = { - "duplicate_status": "Added duplicate information - flag", - "duplicates": "Added duplicate information - duplicates", -} - - -class Date2(Numeric): # type: ignore[misc] - """Copy of ``rl.compare.Numeric`` class.""" - - pass - - -def date2(object: Compare, *args: Any, **kwargs: Any) -> Compare: - r""" - New method for ``rl.Compare`` object using ``Date2`` object. - - Parameters - ---------- - object : Compare - Object to with the new method should be added. - \*args : Any - Positional argument for `Date2`. - \**kwargs : Any - Keyword-arguments for `Date2`. - - Returns - ------- - Compare - Compare object with new method. - """ - compare = Date2(*args, **kwargs) - object.add(compare) - return object - - -Compare.date2 = date2 diff --git a/src/cdm_reader_mapper/duplicates/duplicates.py b/src/cdm_reader_mapper/duplicates/duplicates.py deleted file mode 100755 index 8722c721..00000000 --- a/src/cdm_reader_mapper/duplicates/duplicates.py +++ /dev/null @@ -1,903 +0,0 @@ -"""Common Data Model (CDM) pandas duplicate check.""" - -from __future__ import annotations -import datetime -from collections.abc import Iterable -from copy import deepcopy -from typing import Any - -import numpy as np -import pandas as pd -import recordlinkage as rl - -from ._duplicate_settings import Compare, _compare_kwargs, _histories, _method_kwargs - - -def convert_series(df: pd.DataFrame, conversion: dict[Any, Any]) -> pd.DataFrame: - """ - Convert data types in Dataframe. - - Parameters - ---------- - df : pd.DataFrame - Input DataFrame. - conversion : dict - Conversion dictionary conating columns and - new data type as key-value pairs. - - Returns - ------- - pd.DataFrame - DataFrame with converted data types. - """ - - def convert_date_to_float(date: pd.Series | pd.DatetimeIndex) -> pd.Series: - """ - Convert datetime values to float seconds relative to the minimum value. - - Parameters - ---------- - date : pd.Series or pd.DatetimeIndex - Datetime-like values to convert. - - Returns - ------- - pd.Series - Float values representing seconds since the minimum datetime in `date`. - """ - date = date.astype("datetime64[ns]") - return (date - date.min()) / np.timedelta64(1, "s") - - df = df.copy() - for column, method in conversion.items(): - try: - df[column] = df[column].astype(method) - except TypeError: - df[column] = locals()[method](df[column]) - - df = df.infer_objects(copy=False).fillna(9999.0) - return df - - -def add_history(df: pd.DataFrame, indexes: Iterable[int]) -> pd.DataFrame: - """ - Append duplicate information to the 'history' column of a DataFrame. - - Parameters - ---------- - df : pd.DataFrame - The DataFrame containing a 'history' column. - indexes : list[int] or pd.Index - Row indexes where history should be updated. - - Returns - ------- - pd.DataFrame - A new DataFrame with updated 'history' column for the selected rows. - - Notes - ----- - - If 'history' column does not exist, it will be created with empty strings. - - Each message is prefixed with a UTC timestamp in "YYYY-MM-DD HH:MM:SS" format. - """ - - def _datetime_now() -> str: - """ - Get actual datetime. - - Returns - ------- - str - Actual datetime as string representative ("%Y-%m-%d %H:%M:%S"). - """ - try: - now = datetime.datetime.now(datetime.UTC) - except AttributeError: - now = datetime.datetime.utcnow() - - return now.strftime("%Y-%m-%d %H:%M:%S") - - df = df.copy() - - if "history" not in df.columns: - df["history"] = "" - - history_tstmp = _datetime_now() - addition = "".join([f"; {history_tstmp}. {add}" for add in _histories.items()]) - df.loc[indexes, "history"] = df.loc[indexes, "history"] + addition - return df - - -def add_duplicates(df: pd.DataFrame, dups: pd.DataFrame) -> pd.DataFrame: - """ - Add duplicate information to the DataFrame based on the `dups` table. - - Parameters - ---------- - df : pd.DataFrame - DataFrame containing a 'report_id' column. - dups : pd.DataFrame - DataFrame where the index corresponds to rows in `df` and - the values are lists of duplicate indices or duplicate IDs. - - Returns - ------- - pd.DataFrame - A new DataFrame with a 'duplicates' column containing duplicates - as a sorted string list, e.g., "{ID1,ID2}". - - Notes - ----- - - If a row has no duplicates, its 'duplicates' column is left unchanged. - - Supports duplicates represented either by IDs (str) or by indices (int) of `report_id`. - """ - - def _add_dups(row: pd.Series) -> pd.Series: - """ - Add duplicates as string representatives to series. - - Parameters - ---------- - row : pd.Series - Single row of a pd.DataFrame. - - Returns - ------- - pd.Series - Duplicates as string representatives added to `row`. - """ - idx = row.name - if idx not in dups.index: - return row - - dup_idx = dups.loc[idx].to_list() - if isinstance(dup_idx[0][0], str): - v_ = sorted(dup_idx[0]) - else: - v_ = report_ids.iloc[dup_idx[0]] - v_ = sorted(v_.tolist()) - row["duplicates"] = "{" + ",".join(v_) + "}" - return row - - df = df.copy() - - if "duplicates" not in df.columns: - df["duplicates"] = "" - - report_ids = df["report_id"] - - dtypes = df.dtypes - result = df.apply(lambda x: _add_dups(x), axis=1) - return result.astype(dtypes) - - -def add_report_quality(df: pd.DataFrame, indexes_bad: Iterable[int]) -> pd.DataFrame: - """ - Update the 'report_quality' column in a DataFrame for bad reports. - - Parameters - ---------- - df : pd.DataFrame - DataFrame containing at least a 'report_quality' column. - indexes_bad : iterable of int - Row indices in the DataFrame to mark as bad quality (value=1). - - Returns - ------- - pd.DataFrame - DataFrame with updated 'report_quality' column. - """ - df = df.copy() - df["report_quality"] = df["report_quality"].astype(int) - df.loc[indexes_bad, "report_quality"] = 1 - return df - - -class DupDetect: - """ - Class to detect, flag, and remove duplicate entries in a DataFrame using a comparison matrix from recordlinkage. - - Parameters - ---------- - data : pd.DataFrame - Original dataset. - compared : pd.DataFrame - Comparison matrix of the dataset. - method : str - Duplicate detection method used for recordlinkage indexing. - method_kwargs : dict - Keyword arguments for recordlinkage indexing method. - compare_kwargs : dict - Keyword arguments used for recordlinkage.Compare. - """ - - def __init__( - self, - data: pd.DataFrame, - compared: pd.DataFrame, - method: str, - method_kwargs: dict[Any, Any], - compare_kwargs: dict[Any, Any], - ) -> None: - """ - Initialize a DupDetect instance. - - Parameters - ---------- - data : pd.DataFrame - Original dataset. - compared : pd.DataFrame - Comparison matrix of the dataset. - method : str - Duplicate detection method used for recordlinkage indexing. - method_kwargs : dict - Keyword arguments for recordlinkage indexing method. - compare_kwargs : dict - Keyword arguments used for recordlinkage.Compare. - """ - self.data = data.copy() - self.compared = compared - self.method = method - self.method_kwargs = method_kwargs - self.compare_kwargs = compare_kwargs - - def _get_limit(self, limit: str | float | None) -> float: - """ - Resolve the duplicate threshold limit. - - Parameters - ---------- - limit : str or float - 'default', None, or a numeric limit. - - Returns - ------- - float - Threshold for total score to consider duplicates. - """ - default_limit = 0.991 - if limit is None or limit == "default": - return default_limit - - return float(limit) - - def _get_equal_musts(self) -> list[str]: - """ - Identify columns that must be equal for duplicates. - - Returns - ------- - list[str] - Columns that must match exactly to consider duplicates. - """ - equal_musts: list[str] = [] - for value in self.compare_kwargs.keys(): - if isinstance(value, str): - value_lst = [value] - else: - value_lst = list(value) - equal_musts.extend(v for v in value_lst if v in self.data.columns) - return equal_musts - - def _total_score(self) -> None: - """Compute total similarity score for each row in `self.compared`.""" - pcmax = self.compared.shape[1] - self.score = 1 - (abs(self.compared.sum(axis=1) - pcmax) / pcmax) - - def get_duplicates( - self, - keep: str | int = "first", - limit: str | float | None = "default", - equal_musts: str | list[str] | None = None, - overwrite: bool = True, - ) -> pd.DataFrame: - """ - Identify duplicate matches based on the comparison matrix. - - Parameters - ---------- - keep : str or int - Which entry to keep: 'first', 'last', or -1, 0. - limit : str or float, optional, default: default - Threshold of total similarity score to consider as duplicate. - equal_musts : str or list[str], optional - Columns that must exactly match. - overwrite : bool, default: True - Whether to recompute matches if already calculated. - - Returns - ------- - pd.DataFrame - DataFrame containing matched duplicates. - """ - if keep not in ["first", "last", -1, 0]: - raise ValueError("keep has to be one of 'first', 'last', -1 or 0.") - - if keep == "first": - keep = -1 - elif keep == "last": - keep = 0 - - self.keep = keep - if keep == 0: - self.drop = -1 - elif keep == -1: - self.drop = 0 - - if overwrite is True: - self._total_score() - self.limit = self._get_limit(limit) - cond = self.score >= self.limit - if equal_musts is None: - equal_musts = self._get_equal_musts() - if isinstance(equal_musts, str): - equal_musts = [equal_musts] - for must in equal_musts: - cond = cond & (self.compared[must]) - self.matches = self.compared[cond] - return self.matches - - def flag_duplicates( - self, - keep: str | int = "first", - limit: str | float | None = "default", - equal_musts: str | list[str] | None = None, - ) -> pd.DataFrame: - r""" - Get result dataset with flagged duplicates. - - Parameters - ---------- - keep : str or int, default: first - Which entry should be kept in result dataset. - limit : str, int or float, optional - Limit of total score that as to be exceeded to be declared as a duplicate. - Defaults to .991. - equal_musts : str or list, optional - Hashable of column name(s) that must totally be equal to be declared as a duplicate. - Default: All column names found in method_kwargs. - - Returns - ------- - pd.DataFrame - Input DataFrame with flagged duplicates, including duplicate_status_ and quality_flag_. - - References - ---------- - .. _duplicate_status: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/duplicate_status/duplicate_status.html - .. _quality_flag: https://glamod.github.io/cdm-obs-documentation/tables/code_tables/quality_flag/quality_flag.html - """ - - def _get_similars(drop_dict: dict[str | int, Any], keeps: Any) -> tuple[Any, Any]: - """ - Get similar entries from a comparison dictionary. - - Parameters - ---------- - drop_dict : dict - Dictionary containing values under keys `drop_` and `keep_` used - to determine similarity relationships. - keeps : Any - Reference collection used to determine whether a value in `drop` is - considered a match. - - Returns - ------- - tuple of Any and Any - A tuple containing the matched `drop` and `keep` values converted - to integers if possible. If the values are not convertible or no match - is found, returns `(None, None)`. - """ - if drop_dict[drop] in keeps: - drops = drop_dict[drop] - keeps = drop_dict[keep] - try: - return int(drops), int(keeps) - except ValueError: - return drops, keeps - - return None, None - - def _get_duplicates(x: pd.DataFrame, last: Any) -> pd.Series: - """ - Extract unique duplicate values from a DataFrame column. - - Parameters - ---------- - x : pd.DataFrame - Input DataFrame containing the column to inspect. - last : Any - Column name used to extract values for duplicate detection. - - Returns - ------- - pd.Series - Series containing a single key "dups" with the list of unique - duplicate values found in the specified column. - """ - b = list(set(x[last].values)) - return pd.Series({"dups": b}) - - def _delete_values_equal_keys(dictionary: dict[Any, Any]) -> tuple[dict[Any, Any], list[Any]]: - """ - Remove entries where keys and values are identical. - - Parameters - ---------- - dictionary : dict - Input mapping of keys to values. - - Returns - ------- - tuple of dict and list of Any - A tuple containing: - - A filtered dictionary with identical key-value pairs removed - - A list of values that were removed because key == value - """ - new_dictionary = {} - drops = [] - for k, v in dictionary.items(): - if k == v: - drops.append(v) - continue - new_dictionary[k] = v - return new_dictionary, drops - - def replace_keeps_and_drops(df: pd.DataFrame, keep: Any) -> pd.DataFrame: - """ - Iteratively resolve and replace duplicate mappings in a DataFrame. - - Parameters - ---------- - df : pd.DataFrame - Input DataFrame containing values to be deduplicated. - keep : Any - Column name used to identify canonical ("keep") values. - - Returns - ------- - pd.DataFrame - Updated DataFrame with resolved duplicate mappings and cleaned - keep-column values. - """ - keeps = df[keep].values - while True: - df = df.sort_index() - replaces = df.apply(lambda row, keeps=keeps: _get_similars(row, keeps), axis=1) - replaces = {k: v for k, v in dict(replaces.values).items() if k is not None} - replaces, drops = _delete_values_equal_keys(replaces) - keys = replaces.keys() - values = replaces.values() - if len(drops) > 0: - df = df.drop(drops, axis="index") - df[keep] = df[keep].replace(replaces) - if not set(keys).intersection(values): - return df - - self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts) - result = self.data.copy() - - dtypes = result.dtypes - - result["duplicate_status"] = 0 - if not hasattr(self, "matches"): - self.get_duplicates(limit="default", equal_musts=equal_musts) - - indexes = self.matches.index - indexes_df = indexes.to_frame() - drop = indexes_df.columns[self.drop] - keep = indexes_df.columns[self.keep] - indexes_df = indexes_df.drop_duplicates(subset=[drop]) - indexes_df = replace_keeps_and_drops(indexes_df, keep) - - dup_keep = indexes_df.groupby(indexes_df[keep]).apply( - lambda x: _get_duplicates(x, drop), - include_groups=False, - ) - dup_drop = indexes_df.groupby(indexes_df[drop]).apply( - lambda x: _get_duplicates(x, keep), - include_groups=False, - ) - duplicates = pd.concat([dup_keep, dup_drop]) - - indexes_good = indexes_df[keep].values.tolist() - indexes_bad = indexes_df[drop].values.tolist() - indexes = indexes_good + indexes_bad - result.loc[indexes_good, "duplicate_status"] = 1 - result.loc[indexes_bad, "duplicate_status"] = 3 - result = add_report_quality(result, indexes_bad=indexes_bad) - result = add_history(result, indexes) - result = result.sort_index(ascending=True) - result = add_duplicates(result, duplicates) - - self.result = result.astype(dtypes) - self.data = self.data.sort_index(ascending=True) - - return self.result - - def remove_duplicates( - self, - keep: str | int = "first", - limit: str | float | None = "default", - equal_musts: str | list[str] | None = None, - ) -> pd.DataFrame: - """ - Remove duplicate entries from the dataset. - - Parameters - ---------- - keep : str or int - Which entry to keep ('first' or 'last'). - limit : str or float, optional - Minimum similarity score to declare duplicates. - equal_musts : str or list[str], optional - Columns that must exactly match. - - Returns - ------- - pd.DataFrame - Dataset without duplicates. - """ - self.get_duplicates(keep=keep, limit=limit, equal_musts=equal_musts) - result = self.data.copy() - drops = self.matches.index.get_level_values(self.drop) - result = result.drop(drops) - self.result = result.sort_index(ascending=True) - self.data = self.data.sort_index(ascending=True) - return self.result - - -def set_comparer(compare_dict: dict[Any, Any]) -> Compare: - """ - Build a recordlinkage Compare object with optional conversion dictionary. - - Parameters - ---------- - compare_dict : dict - Dictionary of columns to compare, - e.g. {"column_name": {"method": "exact" | "numeric" | "date2", "kwargs": {...}}}. - - Returns - ------- - recordlinkage.Compare - Compare object with added comparison methods and a 'conversion' attribute. - """ - comparer = Compare() - comparer.conversion = {} - for column, c_dict in compare_dict.items(): - try: - method = c_dict["method"] - except KeyError as err: - raise KeyError( - "compare_kwargs must be hierarchically ordered: {: {'method': }}. 'method' not found" - ) from err - try: - kwargs = c_dict["kwargs"] - except KeyError: - kwargs = {} - getattr(comparer, method)( - column, - column, - label=column, - **kwargs, - ) - if method == "numeric": - comparer.conversion[column] = float - if method == "date": - comparer.conversion[column] = "datetime64[ns]" - if method == "date2": - comparer.conversion[column] = "convert_date_to_float" - - return comparer - - -def remove_ignores(dic: dict[Any, Any], columns: str | list[str]) -> dict[Any, Any]: - """ - Remove dictionary entries where keys or values match ignored columns. - - Parameters - ---------- - dic : dict - Original dictionary to filter. - columns : str or list[str] - Column(s) to ignore. - - Returns - ------- - dict - Filtered dictionary without the ignored columns. - """ - new_dict = {} - if isinstance(columns, str): - columns = [columns] - for k, v in dic.items(): - if k in columns: - continue - if v in columns: - continue - if isinstance(v, list): - v2 = [v_ for v_ in v if v_ not in columns] - if len(v2) == 0: - continue - v = v2 - new_dict[k] = v - return new_dict - - -def change_offsets(dic: dict[Any, Any], dic_o: dict[Any, Any]) -> dict[Any, Any]: - """ - Update the 'offset' value in compare dictionary kwargs. - - Parameters - ---------- - dic : dict - Original compare dictionary. - dic_o : dict - Dictionary mapping column names to new offsets. - - Returns - ------- - dict - Updated compare dictionary with modified offsets. - """ - for key in dic.keys(): - if key not in dic_o.keys(): - continue - dic[key]["kwargs"]["offset"] = dic_o[key] - return dic - - -def reindex_nulls(df: pd.DataFrame, null_label: Any) -> pd.DataFrame: - """ - Reindex a DataFrame in ascending order based on the number of 'null' strings in each row. - - Parameters - ---------- - df : pd.DataFrame - Input DataFrame. Cells with the string "null" are counted as nulls. - null_label : Any - Missing value representative. - - Returns - ------- - pd.DataFrame - DataFrame reindexed so that rows with fewer 'null' values appear first. - Original row order is preserved for rows with the same null count. - """ - - def is_missing(x: Any) -> bool: - """ - Determine whether a value is considered missing. - - This function supports scalar values as well as nested iterables - (lists, tuples, numpy arrays). A value is considered missing if it is: - - NaN (as defined by ``pandas.isna``) - - Equal to ``null_label`` - - Any element inside an iterable is missing (recursively checked) - - Parameters - ---------- - x : Any - Value to check for missingness. - - Returns - ------- - bool - True if the value (or any nested value) is missing, otherwise False. - """ - if isinstance(x, (list, tuple, np.ndarray)): - return any(is_missing(x_) for x_ in x) - - if pd.isna(x): - return True - - if x == null_label: - return True - - return False - - def count_nulls(row: pd.Series) -> int: - """ - Count the number of missing values in a pandas Series. - - Parameters - ---------- - row : pd.Series - Input row or Series to evaluate. - - Returns - ------- - int - Number of missing values in the Series. - """ - return sum(is_missing(x) for x in row) - - null_counts = df.apply(count_nulls, axis=1) - - if null_counts.empty: - return df - - sorted_index = null_counts.sort_values(kind="stable").index - return df.loc[sorted_index] - - -class Comparer: - """ - Wrapper around recordlinkage.Compare to compute pairwise comparisons on a DataFrame. - - This class initializes a recordlinkage indexer and Compare object, optionally converting - the data types before computing the comparisons. - - Parameters - ---------- - data : pd.DataFrame - The dataset to compare. - method : str - The indexing method from `recordlinkage.index`, e.g., 'SortedNeighbourhood'. - method_kwargs : dict - Keyword arguments to pass to the indexing method. - compare_kwargs : dict - Dictionary specifying columns and comparison methods for recordlinkage.Compare. - pairs_df : list[pd.DataFrame], optional - Optional pre-split DataFrames to pass to the indexer. Defaults to `[data]`. - convert_data : bool, default False - Whether to convert data using `compare_kwargs` conversion dictionary. - """ - - def __init__( - self, - data: pd.DataFrame, - method: str, - method_kwargs: dict[Any, Any], - compare_kwargs: dict[Any, Any], - pairs_df: list[pd.DataFrame] | None = None, - convert_data: bool = False, - ): - """ - Initialize a Comparer instance. - - Parameters - ---------- - data : pd.DataFrame - The dataset to compare. - method : str - The indexing method from `recordlinkage.index`, e.g., 'SortedNeighbourhood'. - method_kwargs : dict - Keyword arguments to pass to the indexing method. - compare_kwargs : dict - Dictionary specifying columns and comparison methods for recordlinkage.Compare. - pairs_df : list[pd.DataFrame], optional - Optional pre-split DataFrames to pass to the indexer. Defaults to `[data]`. - convert_data : bool, default False - Whether to convert data using `compare_kwargs` conversion dictionary. - """ - indexer = getattr(rl.index, method)(**method_kwargs) - comparer = set_comparer(compare_kwargs) - if convert_data is True: - data_cp = convert_series(data, comparer.conversion) - else: - data_cp = data.copy() - - if pairs_df is None: - pairs_df = [data_cp] - pairs = indexer.index(*pairs_df) - self.compared = comparer.compute(pairs, data_cp) - self.data = data_cp - - -def duplicate_check( - data: pd.DataFrame, - method: str = "SortedNeighbourhood", - method_kwargs: dict[Any, Any] | None = None, - compare_kwargs: dict[Any, Any] | None = None, - table_name: str | None = None, - ignore_columns: str | None = None, - ignore_entries: dict[str, Any] | None = None, - offsets: dict[str, Any] | None = None, - reindex_by_null: bool = True, - null_label: Any = "null", -) -> DupDetect: - """ - Run a duplicate check on a dataset using recordlinkage. - - Returns a DupDetect object. - - Parameters - ---------- - data : pandas.DataFrame - Dataset for duplicate check. - method : str, default: SortedNeighbourhood - Duplicate check method for recordlinkage. - method_kwargs : dict, optional - Keyword arguments for recordlinkage duplicate check. - Defaults to _method_kwargs. - compare_kwargs : dict, optional - Keyword arguments for recordlinkage.Compare object. - Defaults to _compare_kwargs. - table_name : str, optional - Name of the CDM table to be selected from data. - ignore_columns : str or list, optional - Name of data columns to be ignored for duplicate check. - ignore_entries : dict, optional - Key: Column name. - Value: value to be ignored. - E.g. offsets={"station_speed": null}. - offsets : dict, optional - Change offsets for recordlinkage Compare object. - Key: Column name. - Value: new offset. - E.g. offsets={"latitude": 0.1}. - reindex_by_null : bool, optional - If True data is re-indexed in ascending order according to the number of nulls in each row. - null_label : str, optional - Null label which is used if `reindex_by_null` is True. - - Returns - ------- - cdm_reader_mapper.DupDetect - A DupDetect instance. - """ - if reindex_by_null is True: - data = reindex_nulls(data, null_label=null_label) - - index = data.index - data.reset_index(drop=True) - - if table_name: - data = data[table_name] - if not method_kwargs: - method_kwargs = deepcopy(_method_kwargs) - if not compare_kwargs: - compare_kwargs = deepcopy(_compare_kwargs) - if ignore_columns: - method_kwargs = remove_ignores(method_kwargs, ignore_columns) - compare_kwargs = remove_ignores(compare_kwargs, ignore_columns) - if offsets: - compare_kwargs = change_offsets(compare_kwargs, offsets) - - dtypes = data.dtypes - - comparer = Comparer( - data=data, - method=method, - method_kwargs=method_kwargs, - compare_kwargs=compare_kwargs, - convert_data=True, - ) - compared = comparer.compared - data_ = comparer.data - - if ignore_entries is None: - return DupDetect(data, compared, method, method_kwargs, compare_kwargs) - - compared = [compared] - - for column_, entry_ in ignore_entries.items(): - if not isinstance(entry_, list): - entry_ = [entry_] - entries = data[column_].isin(entry_) - - d1 = data.mask(entries).dropna(how="all") - d2 = data.where(entries).dropna(how="all") - if d1.empty: - continue - if d2.empty: - continue - - method_kwargs_ = remove_ignores(method_kwargs, column_) - compare_kwargs_ = remove_ignores(compare_kwargs, column_) - - compared_ = Comparer( - data=data_, - method=method, - method_kwargs=method_kwargs_, - compare_kwargs=compare_kwargs_, - pairs_df=[d2, d1], - ).compared - compared_[list(ignore_entries.keys())] = 1 - compared.append(compared_) - - compared = pd.concat(compared) - data.set_index(index, inplace=True) - data = data.astype(dtypes) - return DupDetect(data, compared, method, method_kwargs, compare_kwargs) diff --git a/tests/_duplicates.py b/tests/_duplicates.py deleted file mode 100755 index 744e9f64..00000000 --- a/tests/_duplicates.py +++ /dev/null @@ -1,380 +0,0 @@ -from __future__ import annotations - -import pandas as pd - -from cdm_reader_mapper import read, test_data - - -def _manipulate_header(df): - df_ = _manipulation(df["header"]) - df_.columns = pd.MultiIndex.from_product([["header"], df_.columns]) - return df_ - - -def _manipulation(df): - df = df.copy() - # Duplicate : Different report_id's - # Failure in data set; - # each report needs a specific report_id - df.loc[5] = df.loc[4] - df.loc[5, "report_id"] = "ICOADS-302-N688EY" - df.loc[5, "report_quality"] = 2 - - # No Duplicate: Lat and Lon values differ to much - # valid is .5 degrees - df.loc[6] = df.loc[4] - df.loc[6, "report_id"] = "ICOADS-302-N688EZ" - df.loc[6, "latitude"] = -65.80 - df.loc[6, "longitude"] = 21.20 - df.loc[6, "report_quality"] = 2 - - # Duplicate: report timestamp differs no enough - # valid is 60 seconds - df.loc[7] = df.loc[1] - df.loc[7, "report_id"] = "ICOADS-302-N688DT" - df.loc[7, "report_timestamp"] = "2022-02-01 00:01:00" - df.loc[7, "report_quality"] = 2 - - # No Duplicate: report timestamp differs to much - # valid is 60 seconds - df.loc[8] = df.loc[1] - df.loc[8, "report_id"] = "ICOADS-302-N688DU" - df.loc[8, "report_timestamp"] = "2022-02-02 00:00:00" - df.loc[8, "report_quality"] = 2 - - # Duplicate : Different report_id's - # Failure in data set - df.loc[9] = df.loc[2] - df.loc[9, "report_id"] = "ICOADS-302-N688DW" - df.loc[9, "report_quality"] = 2 - - # Duplicate : Different report_id's - # Failure in data set - # each report needs a specific report_id - df.loc[10] = df.loc[3] - df.loc[10, "report_id"] = "ICOADS-302-N688EF" - df.loc[10, "latitude"] = 66.00 - df.loc[10, "longitude"] = 8.50 - df.loc[10, "report_quality"] = 2 - - # Duplicate: Lat and Lon values differ not enough - # valid is .5 degrees - df.loc[11] = df.loc[3] - df.loc[11, "report_id"] = "ICOADS-302-N688EE" - df.loc[11, "latitude"] = 66.05 - df.loc[11, "longitude"] = 8.15 - df.loc[11, "report_quality"] = 2 - - # No Duplicate: primary_station_id differs - df.loc[12] = df.loc[3] - df.loc[12, "report_id"] = "ICOADS-302-N688ED" - df.loc[12, "primary_station_id"] = "MASKSTIP" - df.loc[12, "report_quality"] = 2 - - # Duplicate: Lat and Lon values differ not enough - # valid is .5 degrees - df.loc[13] = df.loc[3] - df.loc[13, "report_id"] = "ICOADS-302-N688EC" - df.loc[13, "latitude"] = 65.95 - df.loc[13, "longitude"] = 8.05 - df.loc[13, "report_quality"] = 2 - - # Duplicate: ignore primary_station_id SHIP - df.loc[14] = df.loc[3] - df.loc[14, "report_id"] = "ICOADS-302-N688EG" - df.loc[14, "primary_station_id"] = "SHIP" - df.loc[14, "report_quality"] = 2 - - # No Duplicate: Lat and Lon values differ to much - # valid is .5 degrees - df.loc[15] = df.loc[4] - df.loc[15, "report_id"] = "ICOADS-302-N688EV" - df.loc[15, "latitude"] = 65.60 - df.loc[15, "longitude"] = -21.40 - df.loc[15, "report_quality"] = 2 - - # Duplicate: Lat and Lon values differ not enough - # valid is .5 degrees - df.loc[16] = df.loc[4] - df.loc[16, "report_id"] = "ICOADS-302-N688EW" - df.loc[16, "latitude"] = 65.90 - df.loc[16, "longitude"] = -21.10 - df.loc[16, "report_quality"] = 2 - - # No Duplicate: - df.loc[17] = df.loc[1] - df.loc[17, "report_id"] = "ICOADS-302-N688EK" - df.loc[17, "station_course"] = 316.0 - - # No Duplicate: - df.loc[18] = df.loc[1] - df.loc[18, "report_id"] = "ICOADS-302-N688EL" - df.loc[18, "station_speed"] = 4.0 - - # Duplicate: - df.loc[19] = df.loc[1] - df.loc[19, "report_id"] = "ICOADS-302-N688EM" - # print(df["station_course"]) - df.loc[19, "station_course"] = pd.NA - - # Duplicate: - df.loc[20] = df.loc[1] - df.loc[20, "report_id"] = "ICOADS-302-N688EN" - df.loc[20, "station_speed"] = pd.NA - return df - - -def _get_test_data(imodel): - pattern = f"test_{imodel}" - data_file = test_data[pattern]["cdm_header"] - data_path = data_file.parent - return read( - data_path, - suffix=f"{imodel}*", - cdm_subset="header", - mode="tables", - extension="pq", - ) - - -exp1 = { - "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EH}", - None, - None, - "{ICOADS-302-N688EI}", - None, - None, - None, - None, - ], -} - -exp2 = { - "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 0, 0], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}", - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688ED}", - None, - "{ICOADS-302-N688EI}", - None, - None, - None, - None, - ], -} - -exp3 = { - "duplicate_status": [1, 3, 3, 3, 3, 3, 3, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 3, 3, 3, 3], - "report_quality": [1, 1, 1, 1, 1, 1, 1, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1], - "duplicates": [ - "{ICOADS-302-N688DS,ICOADS-302-N688DT,ICOADS-302-N688DV,ICOADS-302-N688DW,ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF,ICOADS-302-N688EH,ICOADS-302-N688EI,ICOADS-302-N688EK,ICOADS-302-N688EL,ICOADS-302-N688EM,ICOADS-302-N688EN,ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY,ICOADS-302-N688EZ}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - None, - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - None, - "{ICOADS-302-N688DR}", - None, - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - "{ICOADS-302-N688DR}", - ], -} - -exp4 = { - "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 3, 3, 3, 0, 3, 0, 0, 0, 0], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EC,ICOADS-302-N688ED,ICOADS-302-N688EE,ICOADS-302-N688EG}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688EH}", - "{ICOADS-302-N688EH}", - "{ICOADS-302-N688EH}", - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EI}", - None, - None, - None, - None, - ], -} - -exp5 = { - "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 3, 3, 0, 3, 0, 3, 3, 0, 0, 0, 0], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 1, 1, 2, 1, 2, 1, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EF}", - "{ICOADS-302-N688EV,ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - "{ICOADS-302-N688EH}", - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EI}", - "{ICOADS-302-N688EI}", - None, - None, - None, - None, - ], -} - -exp6 = { - "duplicate_status": [0, 0, 1, 1, 1, 3, 0, 0, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 0, 0], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 2, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - None, - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - None, - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EH}", - None, - None, - "{ICOADS-302-N688EI}", - None, - None, - None, - None, - ], -} - -exp7 = { - "duplicate_status": [0, 1, 1, 1, 1, 3, 0, 3, 0, 3, 0, 3, 0, 3, 0, 0, 3, 0, 0, 3, 3], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688EH}", - None, - "{ICOADS-302-N688EH}", - None, - None, - "{ICOADS-302-N688EI}", - None, - None, - "{ICOADS-302-N688DS}", - "{ICOADS-302-N688DS}", - ], -} - -exp8 = { - "duplicate_status": [0, 1, 1, 3, 1, 3, 0, 3, 0, 3, 0, 3, 1, 3, 3, 0, 3, 0, 0, 3, 3], - "report_quality": [1, 1, 0, 1, 1, 1, 2, 1, 2, 1, 2, 1, 2, 1, 1, 2, 1, 1, 1, 1, 1], - "duplicates": [ - None, - "{ICOADS-302-N688DT,ICOADS-302-N688EM,ICOADS-302-N688EN}", - "{ICOADS-302-N688DW}", - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688EW,ICOADS-302-N688EY}", - "{ICOADS-302-N688EI}", - None, - "{ICOADS-302-N688DS}", - None, - "{ICOADS-302-N688DV}", - None, - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688EC,ICOADS-302-N688EE,ICOADS-302-N688EG,ICOADS-302-N688EH}", - "{ICOADS-302-N688ED}", - "{ICOADS-302-N688ED}", - None, - "{ICOADS-302-N688EI}", - None, - None, - "{ICOADS-302-N688DS}", - "{ICOADS-302-N688DS}", - ], -} - -method_kwargs_ = { - "left_on": "report_timestamp", - "window": 7, - "block_on": ["primary_station_id"], -} - -compare_kwargs_ = { - "primary_station_id": {"method": "exact"}, - "report_timestamp": { - "method": "date2", - "kwargs": {"method": "gauss", "offset": 60.0}, - }, -} -cdm_icoads = _get_test_data("icoads_r302_d792") -cdm_icoads.data = _manipulate_header(cdm_icoads.data) - -cdm_craid = _get_test_data("craid") diff --git a/tests/test_databundle.py b/tests/test_databundle.py index 635247be..94eab1d1 100755 --- a/tests/test_databundle.py +++ b/tests/test_databundle.py @@ -6,7 +6,6 @@ from cdm_reader_mapper import DataBundle from cdm_reader_mapper.common.iterators import ParquetStreamReader from cdm_reader_mapper.core._utilities import SubscriptableMethod -from cdm_reader_mapper.duplicates.duplicates import DupDetect YR = ("core", "YR") @@ -1036,124 +1035,3 @@ def test_map_model_psr(): ) pd.testing.assert_frame_equal(result.data.read()[expected.columns], expected, check_dtype=False) - - -def test_duplicate_check_single_index(): - data = pd.DataFrame( - { - "report_id": ["A", "B", "C", "D", "E", "F"], - "primary_station_id": ["S1", "S1", "S2", "S2", "S1", "S1"], - "longitude": [0.1, 0.1, 0.2, 0.1, 0.1, 0.1], - "latitude": [51.0, 51.2, 52.0, 51.0, 51.0, 51.0], - "report_timestamp": pd.to_datetime( - [ - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - ] - ), - "station_speed": [10.0, 10.0, 8.0, 10.0, 8.0, 10.0], - "station_course": [90, 90, 180, 90, 60, 90], - "report_quality": 2, - ("header", "duplicates"): "", - ("header", "duplicate_status"): 4, - ("header", "history"): "", - } - ) - - db = DataBundle( - data=data, - ) - - db_dupdetect = db.duplicate_check() - - assert hasattr(db_dupdetect, "DupDetect") - detector = db_dupdetect.DupDetect - - assert isinstance(detector, DupDetect) - assert detector.data.shape[0] == data.shape[0] - - duplicates = db_dupdetect.get_duplicates() - - assert isinstance(duplicates, pd.DataFrame) - - pd.testing.assert_index_equal(duplicates.index, pd.MultiIndex.from_tuples([(5, 0)])) - - flagged = db_dupdetect.flag_duplicates() - - pd.testing.assert_series_equal( - flagged.data["duplicates"], - pd.Series(["{F}", "", "", "", "", "{A}"], name="duplicates"), - ) - pd.testing.assert_series_equal( - flagged.data["duplicate_status"], - pd.Series([1, 0, 0, 0, 0, 3], name="duplicate_status"), - ) - - removed = db_dupdetect.remove_duplicates() - - pd.testing.assert_frame_equal(data.iloc[[0, 1, 2, 3, 4]], removed.data) - - -def test_duplicate_check_multi_index(): - data = pd.DataFrame( - { - ("header", "report_id"): ["A", "B", "C", "D", "E", "F"], - ("header", "primary_station_id"): ["S1", "S1", "S2", "S2", "S1", "S1"], - ("header", "longitude"): [0.1, 0.1, 0.2, 0.1, 0.1, 0.1], - ("header", "latitude"): [51.0, 51.2, 52.0, 51.0, 51.0, 51.0], - ("header", "report_timestamp"): pd.to_datetime( - [ - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - ] - ), - ("header", "station_speed"): [10.0, 10.0, 8.0, 10.0, 8.0, 10.0], - ("header", "station_course"): [90, 90, 180, 90, 60, 90], - ("header", "report_quality"): 2, - ("header", "duplicates"): "", - ("header", "duplicate_status"): 4, - ("header", "history"): "", - } - ) - - db = DataBundle( - data=data, - mode="tables", - ) - - db_dupdetect = db.duplicate_check() - - assert hasattr(db_dupdetect, "DupDetect") - detector = db_dupdetect.DupDetect - - assert isinstance(detector, DupDetect) - assert detector.data.shape[0] == data.shape[0] - - duplicates = db_dupdetect.get_duplicates() - - assert isinstance(duplicates, pd.DataFrame) - - pd.testing.assert_index_equal(duplicates.index, pd.MultiIndex.from_tuples([(5, 0)])) - - flagged = db_dupdetect.flag_duplicates() - - pd.testing.assert_series_equal( - flagged.data[("header", "duplicates")], - pd.Series(["{F}", "", "", "", "", "{A}"], name=("header", "duplicates")), - ) - pd.testing.assert_series_equal( - flagged.data[("header", "duplicate_status")], - pd.Series([1, 0, 0, 0, 0, 3], name=("header", "duplicate_status")), - ) - - removed = db_dupdetect.remove_duplicates() - - pd.testing.assert_frame_equal(data.iloc[[0, 1, 2, 3, 4]], removed.data) diff --git a/tests/test_duplicates.py b/tests/test_duplicates.py deleted file mode 100755 index f4fe5016..00000000 --- a/tests/test_duplicates.py +++ /dev/null @@ -1,385 +0,0 @@ -from __future__ import annotations - -import pandas as pd -import pytest - -from cdm_reader_mapper.duplicates._duplicate_settings import ( - Compare, - _compare_kwargs, - _histories, - _method_kwargs, -) -from cdm_reader_mapper.duplicates.duplicates import ( - Comparer, - DupDetect, - add_duplicates, - add_history, - add_report_quality, - change_offsets, - convert_series, - duplicate_check, - reindex_nulls, - remove_ignores, - set_comparer, -) - - -def test_convert_series_basic(): - df = pd.DataFrame({"a": ["1", "2", "3"], "b": ["10.5", "20.5", "30.5"]}) - conversion = {"a": "int", "b": "float"} - - expected = pd.DataFrame({"a": [1, 2, 3], "b": [10.5, 20.5, 30.5]}) - - result = convert_series(df, conversion) - pd.testing.assert_frame_equal(result, expected) - - -def test_convert_series_null_replacement(): - df = pd.DataFrame({"a": ["1", None, "3"], "b": [None, "2.5", None]}) - conversion = {"a": "float", "b": "float"} - - expected = pd.DataFrame({"a": [1.0, 9999.0, 3.0], "b": [9999.0, 2.5, 9999.0]}) - - result = convert_series(df, conversion) - pd.testing.assert_frame_equal(result, expected) - - -def test_convert_series_date_to_float(): - df = pd.DataFrame({"date": ["2023-01-01", "2023-01-02", "2023-01-03"]}) - conversion = {"date": "convert_date_to_float"} - - result = convert_series(df, conversion) - expected = pd.DataFrame({"date": [0.0, 86400.0, 172800.0]}) - - pd.testing.assert_frame_equal(result, expected) - - -def test_convert_series_mixed(): - df = pd.DataFrame( - { - "num": ["1", None, "3"], - "val": ["10.5", "20.5", None], - "date": ["2023-01-01", None, "2023-01-03"], - } - ) - conversion = {"num": "Int64", "val": "float", "date": "convert_date_to_float"} - - result = convert_series(df, conversion) - expected = pd.DataFrame( - { - "num": [1, 9999, 3], - "val": [10.5, 20.5, 9999.0], - "date": [0.0, 9999.0, 172800.0], - } - ) - - pd.testing.assert_frame_equal(result, expected, check_dtype=False) - - -def test_add_history_basic(): - df = pd.DataFrame({"value": [10, 20, 30], "history": ["", "", ""]}) - - updated_df = add_history(df, [0, 2]) - - for idx in [0, 2]: - for msg in _histories.values(): - assert msg in updated_df.loc[idx, "history"] - - assert updated_df.loc[1, "history"] == "" - - -def test_add_history_creates_column(): - df = pd.DataFrame({"value": [1, 2, 3]}) - - updated_df = add_history(df, [0]) - for msg in _histories.values(): - assert msg in updated_df.loc[0, "history"] - assert updated_df.loc[1, "history"] == "" - - -def test_add_duplicates_strings(): - df = pd.DataFrame({"report_id": ["A", "B", "C", "D"]}) - - dups = pd.DataFrame( - { - 0: [["B", "C"], ["D"]], - }, - index=[0, 2], - ) - - updated = add_duplicates(df, dups) - - assert updated.loc[0, "duplicates"] == "{B,C}" - assert updated.loc[2, "duplicates"] == "{D}" - assert updated.loc[1, "duplicates"] == "" - assert updated.loc[3, "duplicates"] == "" - - -def test_add_duplicates_indices(): - df = pd.DataFrame({"report_id": ["A", "B", "C", "D"]}) - - dups = pd.DataFrame({0: [[1, 2], [3]]}, index=[0, 2]) - - updated = add_duplicates(df, dups) - - assert updated.loc[0, "duplicates"] == "{B,C}" - assert updated.loc[2, "duplicates"] == "{D}" - - -@pytest.mark.parametrize( - "initial, indexes_bad, expected", - [ - ([0, 0, 0], [1], [0, 1, 0]), - ([0, 0, 2], [0, 2], [1, 0, 1]), - ([1, 2, 3], [], [1, 2, 3]), - ], -) -def test_add_report_quality(initial, indexes_bad, expected): - df = pd.DataFrame({"report_quality": initial}) - result = add_report_quality(df, indexes_bad) - pd.testing.assert_series_equal( - result["report_quality"], - pd.Series(expected, name="report_quality"), - check_dtype=False, - ) - - -def test_set_comparer(): - compare_dict = { - "col1": {"method": "exact"}, - "col2": {"method": "numeric", "kwargs": {"method": "step", "offset": 0.1}}, - "col3": {"method": "date2"}, - } - comparer = set_comparer(compare_dict) - assert isinstance(comparer, Compare) - assert comparer.conversion["col2"] is float - assert comparer.conversion["col3"] == "convert_date_to_float" - - -def test_remove_ignores(): - dic = {"a": 1, "b": ["x", "y"], "c": "z"} - filtered = remove_ignores(dic, ["b", "c"]) - assert "b" not in filtered - assert "c" not in filtered - assert "a" in filtered - - -def test_change_offsets(): - dic = {"col1": {"kwargs": {"offset": 0.1}}, "col2": {"kwargs": {"offset": 0.2}}} - new_offsets = {"col1": 0.5} - updated = change_offsets(dic, new_offsets) - assert updated["col1"]["kwargs"]["offset"] == 0.5 - assert updated["col2"]["kwargs"]["offset"] == 0.2 - - -def test_reindex_nulls_orders_by_null_count(): - df = pd.DataFrame({"a": ["null", 1, "null", 2], "b": ["null", 2, 3, "null"]}) - result = reindex_nulls(df, null_label="null") - - expected_order = [1, 2, 3, 0] - assert list(result.index) == expected_order - - -def test_reindex_nulls_empty_df(): - df = pd.DataFrame() - result = reindex_nulls(df, null_label="null") - assert result.equals(df) - - -def test_comparer_basic(): - df = pd.DataFrame( - { - "report_id": ["A", "B", "C"], - "primary_station_id": ["S1", "S1", "S2"], - "longitude": [0.1, 0.15, 0.2], - "latitude": [51.0, 51.01, 52.0], - "report_timestamp": pd.to_datetime(["2023-01-01 00:00", "2023-01-01 00:01", "2023-01-02 00:00"]), - "station_speed": [10.0, 12.0, 8.0], - "station_course": [90, 180, 270], - } - ) - - comp = Comparer( - data=df, - method="SortedNeighbourhood", - method_kwargs=_method_kwargs, - compare_kwargs=_compare_kwargs, - convert_data=True, - ) - - assert isinstance(comp.data, pd.DataFrame) - assert isinstance(comp.compared, pd.DataFrame) - assert "primary_station_id" in comp.compared.columns - - -def test_duplicate_check_basic(): - df = pd.DataFrame( - { - "report_id": ["A", "B"], - "primary_station_id": ["S1", "S2"], - "longitude": [10, 20], - "latitude": [50, 60], - "report_timestamp": pd.to_datetime(["2023-01-01", "2023-01-02"]), - "station_speed": [5, 6], - "station_course": [100, 200], - } - ) - detector = duplicate_check(df, method="SortedNeighbourhood") - assert isinstance(detector, DupDetect) - assert detector.data.shape[0] == df.shape[0] - - -@pytest.fixture -def dummy_data(): - df = pd.DataFrame( - { - "report_id": ["A", "B", "C", "D", "E", "F"], - "primary_station_id": ["S1", "S1", "S2", "S2", "S1", "S1"], - "longitude": [0.1, 0.1, 0.2, 0.1, 0.1, 0.1], - "latitude": [51.0, 51.2, 52.0, 51.0, 51.0, 51.0], - "report_timestamp": pd.to_datetime( - [ - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - "2023-01-01 00:00", - ] - ), - "station_speed": [10.0, 10.0, 8.0, 10.0, 8.0, 10.0], - "station_course": [90, 90, 180, 90, 60, 90], - "report_quality": 2, - } - ) - df.index = [0, 1, 2, 3, 4, 5] - return df - - -@pytest.mark.parametrize( - "kwargs, exp_ids", - [ - ({}, [(5, 0)]), - ({"offsets": {"latitude": 0.22}}, [(1, 0), (5, 0), (5, 1)]), - ( - {"ignore_columns": ["station_speed", "station_course"]}, - [(4, 0), (5, 0), (5, 4)], - ), - ({"ignore_entries": {"primary_station_id": "S2"}}, [(5, 0), (3, 0), (3, 5)]), - ({"ignore_entries": {"primary_station_id": ["S2"]}}, [(5, 0), (3, 0), (3, 5)]), - ], -) -def test_get_duplicates_kwargs(dummy_data, kwargs, exp_ids): - dd = duplicate_check( - dummy_data, - method="SortedNeighbourhood", - **kwargs, - ) - - assert hasattr(dd, "compared") - - dd.get_duplicates() - - assert hasattr(dd, "matches") - - pd.testing.assert_index_equal(dd.matches.index, pd.MultiIndex.from_tuples(exp_ids)) - - -def test_duplicate_check_reindex(dummy_data): - dd = duplicate_check( - dummy_data, - method="SortedNeighbourhood", - reindex_by_null=False, - ) - - assert hasattr(dd, "compared") - - result = dd.compared - - exp_idx = pd.MultiIndex.from_tuples([(1, 0), (3, 2), (4, 0), (4, 1), (5, 0), (5, 1), (5, 4)]) - pd.testing.assert_index_equal(dd.compared.index, exp_idx) - - assert list(result.columns) == [ - "primary_station_id", - "longitude", - "latitude", - "report_timestamp", - "station_speed", - "station_course", - ] - - -def test_get_duplicates_limit_and_equal_musts(dummy_data): - dd = duplicate_check(dummy_data, method="SortedNeighbourhood") - - matches_default = dd.get_duplicates(keep="first", limit=0.5) - expected_indexes = pd.MultiIndex.from_tuples([(5, 0)]) - pd.testing.assert_index_equal(matches_default.index, expected_indexes) - - matches_eq_str = dd.get_duplicates(keep="first", equal_musts="primary_station_id") - expected_indexes = pd.MultiIndex.from_tuples([(5, 0)]) - pd.testing.assert_index_equal(matches_eq_str.index, expected_indexes) - - matches_eq_list = dd.get_duplicates(keep="first", equal_musts=["primary_station_id", "longitude"]) - expected_indexes = pd.MultiIndex.from_tuples([(5, 0)]) - pd.testing.assert_index_equal(matches_eq_list.index, expected_indexes) - - -@pytest.mark.parametrize( - "keep, exp_duplicate_status, exp_duplicates", - [ - ("first", [1, 0, 0, 0, 0, 3], ["{F}", "", "", "", "", "{A}"]), - ("last", [3, 0, 0, 0, 0, 1], ["{F}", "", "", "", "", "{A}"]), - (0, [3, 0, 0, 0, 0, 1], ["{F}", "", "", "", "", "{A}"]), - (-1, [1, 0, 0, 0, 0, 3], ["{F}", "", "", "", "", "{A}"]), - ], -) -def test_flag_duplicates(dummy_data, keep, exp_duplicate_status, exp_duplicates): - dd = duplicate_check(dummy_data, method="SortedNeighbourhood") - - result = dd.flag_duplicates(keep=keep) - - assert "duplicate_status" in result.columns - assert "duplicates" in result.columns - assert "history" in result.columns - - expected_duplicate_status = pd.Series(exp_duplicate_status, name="duplicate_status") - expected_duplicates = pd.Series(exp_duplicates, name="duplicates") - - pd.testing.assert_series_equal(result["duplicate_status"], expected_duplicate_status) - pd.testing.assert_series_equal(result["duplicates"], expected_duplicates) - - -@pytest.mark.parametrize( - "keep, exp_idx", - [ - ("first", [0, 1, 2, 3, 4]), - ("last", [1, 2, 3, 4, 5]), - (0, [1, 2, 3, 4, 5]), - (-1, [0, 1, 2, 3, 4]), - ], -) -def test_remove_duplicates(dummy_data, keep, exp_idx): - dd = duplicate_check(dummy_data, method="SortedNeighbourhood") - - result = dd.remove_duplicates(keep=keep) - pd.testing.assert_index_equal(result.index, pd.Index(exp_idx)) - - -def test_get_total_score(dummy_data): - dd = duplicate_check(dummy_data, method="SortedNeighbourhood") - dd._total_score() - - assert hasattr(dd, "score") - - expected = pd.Series( - [5.0 / 6.0, 0.5, 2.0 / 3.0, 0.5, 1.0, 5.0 / 6.0, 2.0 / 3.0], - index=pd.MultiIndex.from_tuples([(1, 0), (3, 2), (4, 0), (4, 1), (5, 0), (5, 1), (5, 4)]), - ) - pd.testing.assert_series_equal(dd.score, expected) - - -def test_get_duplicates_raises(dummy_data): - dd = duplicate_check(dummy_data) - with pytest.raises(ValueError): - dd.get_duplicates(keep=1) diff --git a/tests/test_duplicates_data.py b/tests/test_duplicates_data.py deleted file mode 100755 index 71577122..00000000 --- a/tests/test_duplicates_data.py +++ /dev/null @@ -1,121 +0,0 @@ -from __future__ import annotations - -import pandas as pd -import pytest -from numpy.testing import assert_array_equal -from pandas.testing import assert_frame_equal - -from ._duplicates import ( - cdm_craid, - cdm_icoads, - compare_kwargs_, - exp1, - exp2, - exp3, - exp4, - exp5, - exp6, - exp7, - exp8, - method_kwargs_, -) - - -@pytest.mark.parametrize( - "method, method_kwargs, compare_kwargs, ignore_columns, ignore_entries, offsets, expected", - [ - (None, None, None, None, None, None, exp1), - ( - None, - None, - None, - None, - {"primary_station_id": ["SHIP", "MASKSTID"]}, - None, - exp2, - ), - ( - None, - None, - None, - None, - {"station_speed": pd.NA, "station_course": pd.NA}, - None, - exp7, - ), - ( - None, - None, - None, - None, - { - "primary_station_id": ["SHIP", "MASKSTID"], - "station_speed": pd.NA, - "station_course": pd.NA, - }, - None, - exp8, - ), - (None, method_kwargs_, None, None, None, None, exp1), - (None, None, compare_kwargs_, None, None, None, exp3), - (None, None, None, ["primary_station_id"], None, None, exp4), - ( - None, - None, - None, - None, - None, - {"latitude": 1.0, "longitude": 1.0, "report_timestamp": 360}, - exp5, - ), - ("Block", {"left_on": "report_timestamp"}, None, None, None, None, exp6), - ], -) -def test_duplicates_flag( - method, - method_kwargs, - compare_kwargs, - ignore_columns, - ignore_entries, - offsets, - expected, -): - if method is None: - method = "SortedNeighbourhood" - cdm_icoads.duplicate_check( - method=method, - method_kwargs=method_kwargs, - compare_kwargs=compare_kwargs, - ignore_columns=ignore_columns, - ignore_entries=ignore_entries, - offsets=offsets, - inplace=True, - ) - tables_dups_flagged = cdm_icoads.flag_duplicates() - result = tables_dups_flagged["header"] - assert_array_equal(result["duplicate_status"], expected["duplicate_status"]) - assert_array_equal(result["report_quality"], expected["report_quality"]) - assert_array_equal(result["duplicates"], expected["duplicates"]) - - -def test_duplicates_remove(): - cdm_icoads.duplicate_check( - ignore_entries={ - "primary_station_id": ["SHIP", "MASKSTID"], - "station_speed": pd.NA, - "station_course": pd.NA, - }, - inplace=True, - ) - - tables_dups_removed = cdm_icoads.remove_duplicates().data - expected = cdm_icoads.iloc[[0, 1, 2, 4, 6, 8, 10, 12, 15, 17, 18]] - assert_frame_equal(expected, tables_dups_removed) - - -def test_duplicates_craid(): - cdm_craid.duplicate_check(ignore_columns="primary_station_id", inplace=True) - cdm_craid.flag_duplicates(inplace=True) - assert_array_equal(cdm_craid[("header", "duplicate_status")], [0] * 10) - assert_array_equal(cdm_craid[("header", "report_quality")], [2] * 10) - assert_array_equal(cdm_craid[("header", "duplicates")], [None] * 10) From 15d4e2bd60945aa1b7e3f7d3623c08863b6c4d9e Mon Sep 17 00:00:00 2001 From: Ludwig Lierhammer Date: Fri, 29 May 2026 09:33:33 +0200 Subject: [PATCH 2/2] remove attrobute DupDetect from DataBundle --- src/cdm_reader_mapper/core/databundle.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/cdm_reader_mapper/core/databundle.py b/src/cdm_reader_mapper/core/databundle.py index 65723b30..4b3f20a8 100755 --- a/src/cdm_reader_mapper/core/databundle.py +++ b/src/cdm_reader_mapper/core/databundle.py @@ -153,7 +153,6 @@ def __init__( self._mask: pd.DataFrame | ParquetStreamReader = mask self._imodel = imodel self._mode = mode - self.DupDetect: DupDetect | None = None def __len__(self) -> int: """