diff --git a/src/amp/admin/datasets.py b/src/amp/admin/datasets.py index b47f107..a195a65 100644 --- a/src/amp/admin/datasets.py +++ b/src/amp/admin/datasets.py @@ -290,3 +290,32 @@ def delete(self, namespace: str, name: str) -> None: """ path = f'/datasets/{namespace}/{name}' self._admin._request('DELETE', path) + + def get_sync_progress(self, namespace: str, name: str, revision: str = 'latest') -> models.SyncProgressResponse: + """Get sync progress for a dataset version. + + Returns per-table sync progress including current block numbers, + job status, and file statistics. This is useful for monitoring + the progress of data extraction jobs. + + Args: + namespace: Dataset namespace + name: Dataset name + revision: Version tag or semantic version (default: 'latest') + + Returns: + SyncProgressResponse with sync progress for all tables + + Raises: + DatasetNotFoundError: If dataset/version not found + GetSyncProgressError: If retrieval fails + + Example: + >>> client = AdminClient('http://localhost:8080') + >>> progress = client.datasets.get_sync_progress('_', 'eth_firehose', 'latest') + >>> for table in progress.tables: + ... print(f'{table.table_name}: block {table.current_block}, status: {table.job_status}') + """ + path = f'/datasets/{namespace}/{name}/versions/{revision}/sync-progress' + response = self._admin._request('GET', path) + return models.SyncProgressResponse.model_validate(response.json()) diff --git a/src/amp/admin/models.py b/src/amp/admin/models.py index 4e1273d..e7e1745 100644 --- a/src/amp/admin/models.py +++ b/src/amp/admin/models.py @@ -777,6 +777,68 @@ class DeployRequest(BaseModel): """ +class TableSyncProgress(BaseModel): + """ + Sync progress information for a single table + """ + + table_name: str + """ + Name of the table within the dataset + """ + current_block: Optional[int] = None + """ + Highest block number that has been synced (null if no data yet) + """ + start_block: Optional[int] = None + """ + Lowest block number that has been synced (null if no data yet) + """ + job_id: Optional[int] = None + """ + ID of the writer job (null if no active job) + """ + job_status: Optional[str] = None + """ + Status of the writer job (null if no active job) + """ + files_count: int + """ + Number of Parquet files written for this table + """ + total_size_bytes: int + """ + Total size of all Parquet files in bytes + """ + + +class SyncProgressResponse(BaseModel): + """ + API response containing sync progress information for a dataset + """ + + dataset_namespace: str + """ + Dataset namespace + """ + dataset_name: str + """ + Dataset name + """ + revision: str + """ + Requested revision + """ + manifest_hash: str + """ + Resolved manifest hash + """ + tables: list[TableSyncProgress] + """ + Sync progress for each table in the dataset + """ + + class WorkerDetailResponse(BaseModel): """ Detailed worker information returned by the API