diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto
index 44529f5409c..7c018f2b483 100644
--- a/protos/feast/registry/RegistryServer.proto
+++ b/protos/feast/registry/RegistryServer.proto
@@ -68,6 +68,20 @@ service RegistryServer{
     rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {}
     rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {}
 
+    // Search RPCs
+    rpc SearchProjects (SearchProjectsRequest) returns (SearchProjectsResponse) {}
+}
+
+message SearchProjectsRequest {
+    string search_text = 1;
+    google.protobuf.Timestamp updated_at = 2;
+    int32 page_size = 3;
+    int32 page_index = 4;
+}
+
+message SearchProjectsResponse {
+    repeated feast.core.ProjectMetadata projects = 1;
+    int32 total_page_indices = 2;
 }
 
 message RefreshRequest {
diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py
index eef3197c798..8ac040a7a1b 100644
--- a/sdk/python/feast/infra/registry/sql.py
+++ b/sdk/python/feast/infra/registry/sql.py
@@ -5,7 +5,7 @@
 from datetime import datetime, timezone
 from enum import Enum
 from pathlib import Path
-from typing import Any, Callable, Dict, List, Optional, Union
+from typing import Any, Callable, Dict, List, Optional, Tuple, Union
 
 from pydantic import StrictStr
 from sqlalchemy import (  # type: ignore
@@ -16,8 +16,10 @@
     MetaData,
     String,
     Table,
+    and_,
     create_engine,
     delete,
+    func,
     insert,
     select,
     update,
@@ -1001,14 +1003,14 @@ def get_all_projects(self) -> List[ProjectMetadata]:
                             project_name=project_id
                         )
 
-                    project_metadata_model: ProjectMetadata = project_metadata_dict[
+                    project_metadata: ProjectMetadata = project_metadata_dict[
                         project_id
                     ]
                     if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
-                        project_metadata_model.project_uuid = metadata_value
+                        project_metadata.project_uuid = metadata_value
 
                     if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value:
-                        project_metadata_model.last_updated_timestamp = (
+                        project_metadata.last_updated_timestamp = (
                             datetime.fromtimestamp(int(metadata_value), tz=timezone.utc)
                         )
         return list(project_metadata_dict.values())
@@ -1043,3 +1045,95 @@ def _get_project_metadata(
                 return project_metadata
             else:
                 return None
+
+    def search_projects(
+        self,
+        search_text: str = "",
+        updated_at: Optional[datetime] = None,
+        page_size: int = 10,
+        page_index: int = 0,
+    ) -> Tuple[List[ProjectMetadata], int]:
+        """
+        Search for projects with optional filters, paginated per project.
+
+        Filtering, counting and paging all run in SQL. feast_metadata stores one
+        row per (project, metadata key), so the count and the LIMIT/OFFSET are
+        applied to distinct project ids; the metadata rows of the selected page
+        are then loaded so every returned project carries all of its metadata.
+
+        :param search_text: Substring matched against the project id with LIKE.
+            An empty string disables this filter.
+        :param updated_at: Only return projects whose last-updated timestamp is
+            at or after this datetime. None disables this filter.
+        :param page_size: Number of projects per page. Values <= 0 fall back to 10.
+        :param page_index: Zero-based page number. Negative values are treated as 0.
+        :return: A tuple of the ProjectMetadata objects on the requested page and
+            the total number of pages.
+        """
+        # gRPC sends 0 for an unset page_size, which would override the default of
+        # 10, so the default is re-applied here.
+        page_size = page_size if page_size > 0 else 10
+        # A negative page would produce a negative OFFSET, which some databases reject.
+        page_index = max(page_index, 0)
+
+        filters: List[Any] = []
+        if search_text:
+            filters.append(feast_metadata.c.project_id.like(f"%{search_text}%"))
+        if updated_at is not None:
+            updated_at_timestamp = int(updated_at.timestamp())
+            # NOTE: metadata_value is compared as a string here, so the ordering is
+            # lexicographic; it is only correct while both timestamps have the same
+            # number of digits.
+            filters.append(
+                and_(
+                    feast_metadata.c.metadata_key
+                    == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
+                    feast_metadata.c.metadata_value >= str(updated_at_timestamp),
+                )
+            )
+
+        # Count and page over distinct project ids rather than raw metadata rows.
+        count_stmt = select(func.count(feast_metadata.c.project_id.distinct()))
+        page_stmt = select(feast_metadata.c.project_id).distinct()
+        for condition in filters:
+            count_stmt = count_stmt.where(condition)
+            page_stmt = page_stmt.where(condition)
+        page_stmt = (
+            page_stmt.order_by(feast_metadata.c.project_id)
+            .limit(page_size)
+            .offset(page_index * page_size)
+        )
+
+        with self.engine.begin() as conn:
+            total_projects = conn.execute(count_stmt).scalar() or 0
+            total_page_indices = (total_projects + page_size - 1) // page_size
+
+            project_ids = [row[0] for row in conn.execute(page_stmt).all()]
+            rows: List[Any] = []
+            if project_ids:
+                # Fetch every metadata row of the projects on this page.
+                rows = conn.execute(
+                    select(feast_metadata).where(
+                        feast_metadata.c.project_id.in_(project_ids)
+                    )
+                ).all()
+
+        # Seed the dict in page order so the result keeps the project_id ordering.
+        project_metadata_dict: Dict[str, ProjectMetadata] = {
+            project_id: ProjectMetadata(project_name=project_id)
+            for project_id in project_ids
+        }
+        for row in rows:
+            project_metadata = project_metadata_dict[row._mapping["project_id"]]
+            metadata_key = row._mapping["metadata_key"]
+            metadata_value = row._mapping["metadata_value"]
+
+            if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
+                project_metadata.project_uuid = metadata_value
+            elif metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value:
+                # Timezone-aware UTC, matching get_all_projects.
+                project_metadata.last_updated_timestamp = datetime.fromtimestamp(
+                    int(metadata_value), tz=timezone.utc
+                )
+
+        return list(project_metadata_dict.values()), total_page_indices