Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions protos/feast/registry/RegistryServer.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,20 @@ service RegistryServer{
rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {}
rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {}

// Search RPCs
rpc SearchProjects (SearchProjectsRequest) returns (SearchProjectsResponse) {}
}

message SearchProjectsRequest {
string search_text = 1;
google.protobuf.Timestamp updated_at = 2;
int32 page_size = 3;
int32 page_index = 4;
}

message SearchProjectsResponse {
repeated feast.core.ProjectMetadata projects = 1;
int32 total_page_indices = 2;
}

message RefreshRequest {
Expand Down
108 changes: 104 additions & 4 deletions sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

from pydantic import StrictStr
from sqlalchemy import ( # type: ignore
Expand All @@ -16,8 +16,10 @@
MetaData,
String,
Table,
and_,
create_engine,
delete,
func,
insert,
select,
update,
Expand Down Expand Up @@ -1001,14 +1003,14 @@ def get_all_projects(self) -> List[ProjectMetadata]:
project_name=project_id
)

project_metadata_model: ProjectMetadata = project_metadata_dict[
project_metadata: ProjectMetadata = project_metadata_dict[
project_id
]
if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata_model.project_uuid = metadata_value
project_metadata.project_uuid = metadata_value

if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value:
project_metadata_model.last_updated_timestamp = (
project_metadata.last_updated_timestamp = (
datetime.fromtimestamp(int(metadata_value), tz=timezone.utc)
)
return list(project_metadata_dict.values())
Expand Down Expand Up @@ -1043,3 +1045,101 @@ def _get_project_metadata(
return project_metadata
else:
return None

def search_projects(
self,
search_text: str = "",
updated_at: Optional[datetime] = None,
page_size: int = 10,
page_index: int = 0,
) -> Tuple[List[ProjectMetadata], int]:
"""
Search for projects based on the provided search parameters with pagination,
using SQL queries to handle filtering efficiently, including a LIKE statement for matching search_text.
Returns a tuple of the list of ProjectMetadata objects, and the total number of pages.

:param search_text: Filter by project name using a LIKE statement.
:param updated_at: Filter projects updated after this timestamp (datetime).
:param page_size: The number of results to return per page.
:param page_index: The index for fetching the next page (used as an offset).
:return: A tuple containing the list of ProjectMetadata objects, and the total number of pages.
"""
project_metadata_dict: Dict[str, ProjectMetadata] = {}

with self.engine.begin() as conn:
# Base SQL query to count total number of matching projects
count_stmt = select(func.count().label("total")).select_from(feast_metadata)

if search_text:
count_stmt = count_stmt.where(
feast_metadata.c.project_id.like(f"%{search_text}%")
)

if updated_at is not None:
updated_at_timestamp = int(updated_at.timestamp())
count_stmt = count_stmt.where(
and_(
feast_metadata.c.metadata_key
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
feast_metadata.c.metadata_value >= str(updated_at_timestamp),
)
)

# Execute the count query to get the total number of matching projects
total_projects = conn.execute(count_stmt).scalar() or 0

# gRPC defaults empty page_size to 0, which overrides the default value of 10. Have to explicitly set it to 10 here.
page_size = page_size if page_size > 0 else 10
# Calculate total pages
total_page_indices = (total_projects + page_size - 1) // page_size

# Base SQL query for retrieving projects
stmt = select(feast_metadata)

if search_text:
stmt = stmt.where(feast_metadata.c.project_id.like(f"%{search_text}%"))

if updated_at is not None:
stmt = stmt.where(
and_(
feast_metadata.c.metadata_key
== FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value,
feast_metadata.c.metadata_value >= str(updated_at_timestamp),
)
)

# Apply ordering and pagination
stmt = (
stmt.order_by(feast_metadata.c.project_id)
.limit(page_size)
.offset(page_index * page_size)
)

rows = conn.execute(stmt).all()

if rows:
for row in rows:
project_id = row._mapping["project_id"]
metadata_key = row._mapping["metadata_key"]
metadata_value = row._mapping["metadata_value"]

if project_id not in project_metadata_dict:
project_metadata_dict[project_id] = ProjectMetadata(
project_name=project_id
)

project_metadata: ProjectMetadata = project_metadata_dict[
project_id
]

if metadata_key == FeastMetadataKeys.PROJECT_UUID.value:
project_metadata.project_uuid = metadata_value

if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value:
project_metadata.last_updated_timestamp = (
datetime.utcfromtimestamp(int(metadata_value))
)

project_list = list(project_metadata_dict.values())

return project_list, total_page_indices