From 9818798d8cd94a5322eaa168d040afe9fdb3dffb Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Tue, 22 Oct 2024 17:34:24 -0700 Subject: [PATCH 1/3] feat: Added list proto methods --- .../feast/infra/registry/caching_registry.py | 108 ++++++++++++++++++ .../infra/registry/proto_registry_utils.py | 76 ++++++++++++ sdk/python/feast/infra/registry/sql.py | 86 ++++++++++++++ 3 files changed, 270 insertions(+) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 042eee06ab7..4f8acf4d505 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -19,6 +19,17 @@ from feast.permissions.permission import Permission from feast.project import Project from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -467,3 +478,100 @@ def _start_thread_async_refresh(self, cache_ttl_seconds): def _exit_handler(self): self.registry_refresh_thread.cancel() + + # Methods to improve the registry calls + + @abstractmethod + def _list_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[FeatureViewProtoList]: + pass + + def list_feature_views_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[FeatureViewProtoList]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_views_proto( + self.cached_registry_proto, project, tags + ) + return self._list_feature_views_proto(project, tags) + + @abstractmethod + def _list_entities_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[EntityProtoList]: + pass + + def list_entities_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[EntityProtoList]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_entities_proto( + self.cached_registry_proto, project, tags + ) + return self._list_entities_proto(project, tags) + + @abstractmethod + def _list_data_sources_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[DataSourceProtoList]: + pass + + def list_data_sources_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[DataSourceProtoList]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_data_sources_proto( + self.cached_registry_proto, project, tags + ) + return self._list_data_sources_proto(project, tags) + + @abstractmethod + def _list_on_demand_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[OnDemandFeatureViewProtoList]: + pass + + def list_on_demand_feature_views_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[OnDemandFeatureViewProtoList]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_on_demand_feature_views_proto( + self.cached_registry_proto, project, tags + ) + return self._list_on_demand_feature_views_proto(project, tags) + + @abstractmethod + def _list_feature_services_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[FeatureServiceProtoList]: + pass + + def list_feature_services_proto( + self, + project: str, + allow_cache: bool = False, + tags: Optional[dict[str, str]] = None, + ) -> List[FeatureServiceProtoList]: + if allow_cache: + self._refresh_cached_registry_if_necessary() + return proto_registry_utils.list_feature_services_proto( + self.cached_registry_proto, project, tags + ) + return self._list_feature_services_proto(project, tags) diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index fc5c3f6671e..01ad8a13874 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -21,6 +21,17 @@ from feast.permissions.permission import Permission from feast.project import Project from feast.project_metadata import ProjectMetadata +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto from feast.saved_dataset import SavedDataset, ValidationReference @@ -367,3 +378,68 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project: if projects_proto.spec.name == name: return Project.from_proto(projects_proto) raise ProjectObjectNotFoundException(name=name) + + +@registry_proto_cache_with_tags +def list_feature_views_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[FeatureViewProtoList]: + feature_views: List[FeatureViewProtoList] = [] + for feature_view_proto in registry_proto.feature_views: + if feature_view_proto.spec.project == project and utils.has_all_tags( + feature_view_proto.spec.tags, tags + ): + feature_views.append(feature_view_proto) + return feature_views + + +@registry_proto_cache_with_tags +def list_feature_services_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[FeatureServiceProtoList]: + feature_services = [] + for feature_service_proto in registry_proto.feature_services: + if feature_service_proto.spec.project == project and utils.has_all_tags( + feature_service_proto.spec.tags, tags + ): + feature_services.append(feature_service_proto) + return feature_services + + +@registry_proto_cache_with_tags +def list_on_demand_feature_views_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[OnDemandFeatureViewProtoList]: + on_demand_feature_views = [] + for on_demand_feature_view in registry_proto.on_demand_feature_views: + if on_demand_feature_view.spec.project == project and utils.has_all_tags( + on_demand_feature_view.spec.tags, tags + ): + on_demand_feature_views.append(on_demand_feature_view) + return on_demand_feature_views + + +@registry_proto_cache_with_tags +def list_entities_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[EntityProtoList]: + entities = [] + for entity_proto in registry_proto.entities: + if entity_proto.spec.project == project and utils.has_all_tags( + entity_proto.spec.tags, tags + ): + entities.append(entity_proto) + return entities + + +@registry_proto_cache_with_tags +def list_data_sources_proto( + registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] +) -> List[DataSourceProtoList]: + data_sources = [] + for data_source_proto in registry_proto.data_sources: + if data_source_proto.project == project and utils.has_all_tags( + data_source_proto.tags, tags + ): + data_sources.append(data_source_proto) + return data_sources diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index a1ea1707a6f..90894074293 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -50,15 +50,26 @@ from feast.project import Project from feast.project_metadata import ProjectMetadata from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto +from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto +from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList from feast.protos.feast.core.FeatureService_pb2 import ( FeatureService as FeatureServiceProto, ) +from feast.protos.feast.core.FeatureService_pb2 import ( + FeatureServiceList as FeatureServiceProtoList, +) from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto +from feast.protos.feast.core.FeatureView_pb2 import ( + FeatureViewList as FeatureViewProtoList, +) from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( OnDemandFeatureView as OnDemandFeatureViewProto, ) +from feast.protos.feast.core.OnDemandFeatureView_pb2 import ( + OnDemandFeatureViewList as OnDemandFeatureViewProtoList, +) from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto from feast.protos.feast.core.Project_pb2 import Project as ProjectProto from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto @@ -1326,3 +1337,78 @@ def get_project_metadata( datetime.utcfromtimestamp(int(metadata_value)) ) return project_metadata_model + + def _list_objects_proto( + self, + table: Table, + project: str, + proto_class: Any, + proto_field_name: str, + tags: Optional[dict[str, str]] = None, + ): + with self.read_engine.begin() as conn: + stmt = select(table).where(table.c.project_id == project) + rows = conn.execute(stmt).all() + if rows: + objects = [] + for row in rows: + obj = proto_class.FromString(row._mapping[proto_field_name]) + if utils.has_all_tags(dict(obj.spec.tags), tags): + objects.append(obj) + return objects + return [] + + def _list_feature_services_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[FeatureServiceProtoList]: + return self._list_objects_proto( + feature_services, + project, + FeatureServiceProto, + "feature_service_proto", + tags=tags, + ) + + def _list_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[FeatureViewProtoList]: + return self._list_objects_proto( + feature_views, + project, + FeatureViewProto, + "feature_view_proto", + tags=tags, + ) + + def _list_on_demand_feature_views_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[OnDemandFeatureViewProtoList]: + return self._list_objects_proto( + on_demand_feature_views, + project, + OnDemandFeatureViewProto, + "feature_view_proto", + tags=tags, + ) + + def _list_entities_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[EntityProtoList]: + return self._list_objects_proto( + entities, + project, + EntityProto, + "entity_proto", + tags=tags, + ) + + def _list_data_sources_proto( + self, project: str, tags: Optional[dict[str, str]] + ) -> List[DataSourceProtoList]: + return self._list_objects_proto( + data_sources, + project, + DataSourceProto, + "data_source_proto", + tags=tags, + ) From 551ecf3c27a715a3006ddd02c1a6ae9336b061b2 Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Tue, 22 Oct 2024 19:43:20 -0700 Subject: [PATCH 2/3] refactor: Update return types in CachingRegistry methods --- .../feast/infra/registry/caching_registry.py | 20 +++---- .../infra/registry/proto_registry_utils.py | 30 +++++------ sdk/python/feast/infra/registry/sql.py | 54 +++++++++++++++---- 3 files changed, 70 insertions(+), 34 deletions(-) diff --git a/sdk/python/feast/infra/registry/caching_registry.py b/sdk/python/feast/infra/registry/caching_registry.py index 4f8acf4d505..a38bf17a11b 100644 --- a/sdk/python/feast/infra/registry/caching_registry.py +++ b/sdk/python/feast/infra/registry/caching_registry.py @@ -484,7 +484,7 @@ def _exit_handler(self): @abstractmethod def _list_feature_views_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[FeatureViewProtoList]: + ) -> FeatureViewProtoList: pass def list_feature_views_proto( @@ -492,7 +492,7 @@ def list_feature_views_proto( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, - ) -> List[FeatureViewProtoList]: + ) -> FeatureViewProtoList: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_feature_views_proto( @@ -503,7 +503,7 @@ def list_feature_views_proto( @abstractmethod def _list_entities_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[EntityProtoList]: + ) -> EntityProtoList: pass def list_entities_proto( @@ -511,7 +511,7 @@ def list_entities_proto( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, - ) -> List[EntityProtoList]: + ) -> EntityProtoList: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_entities_proto( @@ -522,7 +522,7 @@ def list_entities_proto( @abstractmethod def _list_data_sources_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[DataSourceProtoList]: + ) -> DataSourceProtoList: pass def list_data_sources_proto( @@ -530,7 +530,7 @@ def list_data_sources_proto( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, - ) -> List[DataSourceProtoList]: + ) -> DataSourceProtoList: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_data_sources_proto( @@ -541,7 +541,7 @@ def list_data_sources_proto( @abstractmethod def _list_on_demand_feature_views_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[OnDemandFeatureViewProtoList]: + ) -> OnDemandFeatureViewProtoList: pass def list_on_demand_feature_views_proto( @@ -549,7 +549,7 @@ def list_on_demand_feature_views_proto( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, - ) -> List[OnDemandFeatureViewProtoList]: + ) -> OnDemandFeatureViewProtoList: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_on_demand_feature_views_proto( @@ -560,7 +560,7 @@ def list_on_demand_feature_views_proto( @abstractmethod def _list_feature_services_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[FeatureServiceProtoList]: + ) -> FeatureServiceProtoList: pass def list_feature_services_proto( @@ -568,7 +568,7 @@ def list_feature_services_proto( project: str, allow_cache: bool = False, tags: Optional[dict[str, str]] = None, - ) -> List[FeatureServiceProtoList]: + ) -> FeatureServiceProtoList: if allow_cache: self._refresh_cached_registry_if_necessary() return proto_registry_utils.list_feature_services_proto( diff --git a/sdk/python/feast/infra/registry/proto_registry_utils.py b/sdk/python/feast/infra/registry/proto_registry_utils.py index 01ad8a13874..d7de12a681a 100644 --- a/sdk/python/feast/infra/registry/proto_registry_utils.py +++ b/sdk/python/feast/infra/registry/proto_registry_utils.py @@ -383,63 +383,63 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project: @registry_proto_cache_with_tags def list_feature_views_proto( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] -) -> List[FeatureViewProtoList]: - feature_views: List[FeatureViewProtoList] = [] +) -> FeatureViewProtoList: + feature_views: FeatureViewProtoList = FeatureViewProtoList() for feature_view_proto in registry_proto.feature_views: if feature_view_proto.spec.project == project and utils.has_all_tags( feature_view_proto.spec.tags, tags ): - feature_views.append(feature_view_proto) + feature_views.featureviews.append(feature_view_proto) return feature_views @registry_proto_cache_with_tags def list_feature_services_proto( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] -) -> List[FeatureServiceProtoList]: - feature_services = [] +) -> FeatureServiceProtoList: + feature_services = FeatureServiceProtoList() for feature_service_proto in registry_proto.feature_services: if feature_service_proto.spec.project == project and utils.has_all_tags( feature_service_proto.spec.tags, tags ): - feature_services.append(feature_service_proto) + feature_services.featureservices.append(feature_service_proto) return feature_services @registry_proto_cache_with_tags def list_on_demand_feature_views_proto( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] -) -> List[OnDemandFeatureViewProtoList]: - on_demand_feature_views = [] +) -> OnDemandFeatureViewProtoList: + on_demand_feature_views = OnDemandFeatureViewProtoList() for on_demand_feature_view in registry_proto.on_demand_feature_views: if on_demand_feature_view.spec.project == project and utils.has_all_tags( on_demand_feature_view.spec.tags, tags ): - on_demand_feature_views.append(on_demand_feature_view) + on_demand_feature_views.ondemandfeatureviews.append(on_demand_feature_view) return on_demand_feature_views @registry_proto_cache_with_tags def list_entities_proto( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] -) -> List[EntityProtoList]: - entities = [] +) -> EntityProtoList: + entities = EntityProtoList() for entity_proto in registry_proto.entities: if entity_proto.spec.project == project and utils.has_all_tags( entity_proto.spec.tags, tags ): - entities.append(entity_proto) + entities.entities.append(entity_proto) return entities @registry_proto_cache_with_tags def list_data_sources_proto( registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]] -) -> List[DataSourceProtoList]: - data_sources = [] +) -> DataSourceProtoList: + data_sources = DataSourceProtoList() for data_source_proto in registry_proto.data_sources: if data_source_proto.project == project and utils.has_all_tags( data_source_proto.tags, tags ): - data_sources.append(data_source_proto) + data_sources.datasources.append(data_source_proto) return data_sources diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index 90894074293..ae67de4e82c 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -4,7 +4,7 @@ from datetime import datetime, timezone from enum import Enum from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Union, cast +from typing import Any, Callable, Dict, List, Optional, Type, Union, cast from pydantic import StrictInt, StrictStr from sqlalchemy import ( # type: ignore @@ -1338,6 +1338,26 @@ def get_project_metadata( ) return project_metadata_model + def get_objects_list(self, proto_class: Type) -> Union[ + FeatureViewProtoList, + OnDemandFeatureViewProtoList, + EntityProtoList, + DataSourceProtoList, + FeatureServiceProtoList, + ]: + # Define the mapping from proto_class to list type + proto_class_to_list = { + FeatureViewProto: FeatureViewProtoList, + OnDemandFeatureViewProto: OnDemandFeatureViewProtoList, + EntityProto: EntityProtoList, + DataSourceProto: DataSourceProtoList, + FeatureServiceProto: FeatureServiceProtoList, + } + proto_list = proto_class_to_list.get(proto_class, None) + if proto_list is None: + raise ValueError(f"Unsupported proto class: {proto_class}") + return proto_list() + def _list_objects_proto( self, table: Table, @@ -1350,17 +1370,33 @@ def _list_objects_proto( stmt = select(table).where(table.c.project_id == project) rows = conn.execute(stmt).all() if rows: - objects = [] + objects = self.get_objects_list(proto_class) for row in rows: obj = proto_class.FromString(row._mapping[proto_field_name]) - if utils.has_all_tags(dict(obj.spec.tags), tags): - objects.append(obj) + if utils.has_all_tags( + dict( + obj.tags + if isinstance(objects, DataSourceProtoList) + else obj.spec.tags + ), + tags, + ): + if isinstance(objects, DataSourceProtoList): + objects.datasources.append(obj) + elif isinstance(objects, FeatureViewProtoList): + objects.featureviews.append(obj) + elif isinstance(objects, OnDemandFeatureViewProtoList): + objects.ondemandfeatureviews.append(obj) + elif isinstance(objects, EntityProtoList): + objects.entities.append(obj) + elif isinstance(objects, FeatureServiceProtoList): + objects.featureservices.append(obj) return objects return [] def _list_feature_services_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[FeatureServiceProtoList]: + ) -> FeatureServiceProtoList: return self._list_objects_proto( feature_services, project, @@ -1371,7 +1407,7 @@ def _list_feature_services_proto( def _list_feature_views_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[FeatureViewProtoList]: + ) -> FeatureViewProtoList: return self._list_objects_proto( feature_views, project, @@ -1382,7 +1418,7 @@ def _list_feature_views_proto( def _list_on_demand_feature_views_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[OnDemandFeatureViewProtoList]: + ) -> OnDemandFeatureViewProtoList: return self._list_objects_proto( on_demand_feature_views, project, @@ -1393,7 +1429,7 @@ def _list_on_demand_feature_views_proto( def _list_entities_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[EntityProtoList]: + ) -> EntityProtoList: return self._list_objects_proto( entities, project, @@ -1404,7 +1440,7 @@ def _list_entities_proto( def _list_data_sources_proto( self, project: str, tags: Optional[dict[str, str]] - ) -> List[DataSourceProtoList]: + ) -> DataSourceProtoList: return self._list_objects_proto( data_sources, project, From 568158cd4badcb58239cb3336d2926a5f7cbe2c2 Mon Sep 17 00:00:00 2001 From: Bhargav Dodla Date: Tue, 22 Oct 2024 20:00:41 -0700 Subject: [PATCH 3/3] refactor: Update return types in get_objects_list method --- sdk/python/feast/infra/registry/sql.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index ae67de4e82c..6ca542e79a7 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1338,7 +1338,9 @@ def get_project_metadata( ) return project_metadata_model - def get_objects_list(self, proto_class: Type) -> Union[ + def get_objects_list( + self, proto_class: Type + ) -> Union[ FeatureViewProtoList, OnDemandFeatureViewProtoList, EntityProtoList,