diff --git a/.github/workflows/build_wheels.yml b/.github/workflows/build_wheels.yml index 72cb485b80b..0f77e2f30a4 100644 --- a/.github/workflows/build_wheels.yml +++ b/.github/workflows/build_wheels.yml @@ -97,7 +97,6 @@ jobs: # There's a `git restore` in here because `make install-go-ci-dependencies` is actually messing up go.mod & go.sum. run: | pip install -U pip setuptools wheel twine - make install-protoc-dependencies make build-ui git status git restore go.mod go.sum diff --git a/Makefile b/Makefile index 2ee3b60771d..77c7dcd13c2 100644 --- a/Makefile +++ b/Makefile @@ -395,10 +395,8 @@ install-go-ci-dependencies: go install github.com/go-python/gopy python -m pip install "pybindgen==0.22.1" "protobuf>=4.24.0,<5" -install-protoc-dependencies: - pip install --ignore-installed "protobuf>=4.24.0,<5" "grpcio-tools>=1.56.2,<2" mypy-protobuf==3.1.0 - -compile-protos-go: install-go-proto-dependencies install-protoc-dependencies +compile-protos-go: install-go-proto-dependencies + pip install --ignore-installed "protobuf==4.25.4" "grpcio-tools<=1.56.0" mypy-protobuf==3.1.0 python setup.py build_go_protos install-feast-ci-locally: diff --git a/environment-setup.md b/environment-setup.md index 5dde9dfd942..0f91ca4fee0 100644 --- a/environment-setup.md +++ b/environment-setup.md @@ -13,7 +13,6 @@ pip install cryptography -U conda install protobuf conda install pymssql pip install -e ".[dev]" -make install-protoc-dependencies PYTHON=3.9 make install-python-ci-dependencies PYTHON=3.9 ``` 4. start the docker daemon diff --git a/protos/feast/registry/RegistryServer.proto b/protos/feast/registry/RegistryServer.proto index 44529f5409c..adb6c0227ea 100644 --- a/protos/feast/registry/RegistryServer.proto +++ b/protos/feast/registry/RegistryServer.proto @@ -4,6 +4,7 @@ package feast.registry; import "google/protobuf/empty.proto"; import "google/protobuf/timestamp.proto"; +import "google/protobuf/wrappers.proto"; import "feast/core/Registry.proto"; import "feast/core/Entity.proto"; import "feast/core/DataSource.proto"; @@ -68,6 +69,44 @@ service RegistryServer{ rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {} rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {} + // Search RPCs + rpc ExpediaSearchProjects (ExpediaSearchProjectsRequest) returns (ExpediaSearchProjectsResponse) {} + rpc ExpediaSearchFeatureViews (ExpediaSearchFeatureViewsRequest) returns (ExpediaSearchFeatureViewsResponse) {} +} + +message ExpediaProjectAndRelatedFeatureViews { + feast.core.ProjectMetadata project_metadata = 1; + repeated feast.core.FeatureView feature_views = 2; +} + +message ExpediaSearchFeatureViewsRequest { + string search_text = 1; + google.protobuf.BoolValue online = 2; + string application = 3; + string team = 4; + google.protobuf.Timestamp created_at = 5; + google.protobuf.Timestamp updated_at = 6; + int32 page_size = 7; + int32 page_index = 8; +} + +message ExpediaSearchFeatureViewsResponse { + repeated feast.core.FeatureView feature_views = 1; + int32 total_feature_views = 2; + int32 total_page_indices = 3; +} + +message ExpediaSearchProjectsRequest { + string search_text = 1; + google.protobuf.Timestamp updated_at = 2; + int32 page_size = 3; + int32 page_index = 4; +} + +message ExpediaSearchProjectsResponse { + repeated ExpediaProjectAndRelatedFeatureViews projects_and_related_feature_views = 1; + int32 total_projects = 3; + int32 total_page_indices = 4; } message RefreshRequest { diff --git a/pyproject.toml b/pyproject.toml index 00170ab443e..e021ec4fcea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,15 @@ [build-system] -requires = ["setuptools>=60", "wheel", "setuptools_scm>=6.2", "grpcio", "grpcio-tools>=1.47.0", "mypy-protobuf==3.1", "sphinx!=4.0.0"] +requires = [ + "grpcio-tools<=1.56.0", + "grpcio<=1.56.0", + "mypy-protobuf==3.1", + "protobuf==4.25.4", + "pybindgen==0.22.0", + "setuptools>=60", + "setuptools_scm>=6.2", + "sphinx!=4.0.0", + "wheel", +] build-backend = "setuptools.build_meta" [tool.setuptools_scm] @@ -26,4 +36,4 @@ exclude = [ "pb2.py", ".pyi", "protos", - "sdk/python/feast/embedded_go/lib"] + "sdk/python/feast/embedded_go/lib"] \ No newline at end of file diff --git a/sdk/python/feast/infra/registry/sql.py b/sdk/python/feast/infra/registry/sql.py index eef3197c798..91a0e568d10 100644 --- a/sdk/python/feast/infra/registry/sql.py +++ b/sdk/python/feast/infra/registry/sql.py @@ -1,7 +1,7 @@ import logging import time import uuid -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime, timezone from enum import Enum from pathlib import Path @@ -18,6 +18,7 @@ Table, create_engine, delete, + func, insert, select, update, @@ -60,6 +61,7 @@ from feast.protos.feast.core.ValidationProfile_pb2 import ( ValidationReference as ValidationReferenceProto, ) +from feast.protos.feast.registry import RegistryServer_pb2 from feast.repo_config import RegistryConfig from feast.saved_dataset import SavedDataset, ValidationReference from feast.stream_feature_view import StreamFeatureView @@ -1001,14 +1003,14 @@ def get_all_projects(self) -> List[ProjectMetadata]: project_name=project_id ) - project_metadata_model: ProjectMetadata = project_metadata_dict[ + project_metadata: ProjectMetadata = project_metadata_dict[ project_id ] if metadata_key == FeastMetadataKeys.PROJECT_UUID.value: - project_metadata_model.project_uuid = metadata_value + project_metadata.project_uuid = metadata_value if metadata_key == FeastMetadataKeys.LAST_UPDATED_TIMESTAMP.value: - project_metadata_model.last_updated_timestamp = ( + project_metadata.last_updated_timestamp = ( datetime.fromtimestamp(int(metadata_value), tz=timezone.utc) ) return list(project_metadata_dict.values()) @@ -1043,3 +1045,260 @@ def _get_project_metadata( return project_metadata else: return None + + def expedia_search_projects( + self, + search_text: str = "", + updated_at: Optional[datetime] = None, + page_size: int = 10, + page_index: int = 0, + ) -> RegistryServer_pb2.ExpediaSearchProjectsResponse: + project_metadata_dict: Dict[str, ProjectMetadata] = {} + + with self.engine.begin() as conn: + # Base SQL query to count total number of matching projects + count_stmt = select(func.count(feast_metadata.c.project_id.distinct())) + + if search_text: + count_stmt = count_stmt.where( + feast_metadata.c.project_id.like(f"%{search_text}%") + ) + + if updated_at is not None: + updated_at_timestamp = updated_at.timestamp() + count_stmt = count_stmt.where( + feast_metadata.c.last_updated_timestamp >= updated_at_timestamp + ) + + total_count = conn.execute(count_stmt).scalar() or 0 + + total_page_indices = (total_count + page_size - 1) // page_size + + # Base SQL query for retrieving projects, grouped by project_id + stmt = ( + select( + feast_metadata.c.project_id, + func.array_agg(feast_metadata.c.metadata_key).label("keys"), + func.array_agg(feast_metadata.c.metadata_value).label("values"), + func.max(feast_metadata.c.last_updated_timestamp).label( + "last_updated_timestamp" + ), + ) + .group_by(feast_metadata.c.project_id) + .order_by(feast_metadata.c.project_id) + .limit(page_size) + .offset(page_index * page_size) + ) + + if search_text: + stmt = stmt.where(feast_metadata.c.project_id.like(f"%{search_text}%")) + + if updated_at is not None: + updated_at_timestamp = updated_at.timestamp() + stmt = stmt.where( + feast_metadata.c.last_updated_timestamp >= updated_at_timestamp + ) + + rows = conn.execute(stmt).all() + + for row in rows: + project_id = row._mapping["project_id"] + keys = row._mapping["keys"] + values = row._mapping["values"] + last_updated_timestamp = row._mapping["last_updated_timestamp"] + + if project_id not in project_metadata_dict: + project_metadata_dict[project_id] = ProjectMetadata( + project_name=project_id, + last_updated_timestamp=datetime.utcfromtimestamp( + last_updated_timestamp + ), + ) + + project_metadata: ProjectMetadata = project_metadata_dict[project_id] + + for key, value in zip(keys, values): + if key == FeastMetadataKeys.PROJECT_UUID.value: + project_metadata.project_uuid = value + + project_list = list(project_metadata_dict.values()) + project_ids = [project.project_name for project in project_list] + + # Fetch all feature views in one query for the relevant projects + with self.engine.begin() as conn: + feature_views_stmt = select( + feature_views.c.project_id, feature_views.c.feature_view_proto + ).where(feature_views.c.project_id.in_(project_ids)) + + feature_view_rows = conn.execute(feature_views_stmt).all() + + # Group feature views by project + feature_views_by_project: Dict[str, List[FeatureViewProto]] = {} + for row in feature_view_rows: + project_id = row._mapping["project_id"] + feature_view_proto = FeatureViewProto.FromString( + row._mapping["feature_view_proto"] + ) + # for some reason, project is not set in the proto, so we set it here + feature_view_proto.spec.project = project_id + + if project_id not in feature_views_by_project: + feature_views_by_project[project_id] = [] + feature_views_by_project[project_id].append(feature_view_proto) + + # Use a thread pool to parallelize fetching feature views + def process_project(project): + project_metadata_proto = project.to_proto() + feature_views_proto = feature_views_by_project.get(project.project_name, []) + return RegistryServer_pb2.ExpediaProjectAndRelatedFeatureViews( + project_metadata=project_metadata_proto, + feature_views=feature_views_proto, + ) + + projects_and_related_feature_views: List[ + RegistryServer_pb2.ExpediaProjectAndRelatedFeatureViews + ] = [] + + # Parallel processing using ThreadPoolExecutor + with ThreadPoolExecutor() as executor: + future_to_project = { + executor.submit(process_project, project): project + for project in project_list + } + + for future in as_completed(future_to_project): + try: + result = future.result() + projects_and_related_feature_views.append(result) + except Exception as e: + logger.error(f"Error processing project: {e}") + + # Sort the results by project name, which was lost during parallel processing + projects_and_related_feature_views.sort( + key=lambda x: x.project_metadata.project.lower() + ) + + return RegistryServer_pb2.ExpediaSearchProjectsResponse( + projects_and_related_feature_views=projects_and_related_feature_views, + total_projects=total_count, + total_page_indices=total_page_indices, + ) + + def expedia_search_feature_views( + self, + search_text: Optional[str] = None, + online: Optional[bool] = None, + application: Optional[str] = None, + team: Optional[str] = None, + created_at: Optional[datetime] = None, + updated_at: Optional[datetime] = None, + page_size: int = 10, + page_index: int = 0, + ) -> RegistryServer_pb2.ExpediaSearchFeatureViewsResponse: + """ + Search for feature views based on the provided search parameters with pagination. + """ + offset = page_index * page_size + results = [] + filtered_results = [] + + # These filters require im-memory filtering, as the data is inside the proto and cannot be queried directly + in_memory_filtering_required = any( + [online is not None, application, team, created_at, updated_at] + ) + + with self.engine.begin() as conn: + if not in_memory_filtering_required: + stmt = ( + select(feature_views) + .where(feature_views.c.feature_view_name.like(f"%{search_text}%")) + .order_by(feature_views.c.feature_view_name) + .limit(page_size) + .offset(offset) + ) + + rows = conn.execute(stmt).all() + + for row in rows: + feature_view_proto = FeatureViewProto.FromString( + row._mapping["feature_view_proto"] + ) + # for some reason, project is not set in the proto, so we set it here + feature_view_proto.spec.project = row._mapping["project_id"] + results.append(feature_view_proto) + + total_stmt = ( + select(func.count()) + .select_from(feature_views) + .where(feature_views.c.feature_view_name.like(f"%{search_text}%")) + ) + total_count = conn.execute(total_stmt).scalar() or 0 + total_page_indices = (total_count + page_size - 1) // page_size + + # early return to avoid fetching data again + return RegistryServer_pb2.ExpediaSearchFeatureViewsResponse( + feature_views=results, + total_feature_views=total_count, + total_page_indices=total_page_indices, + ) + + # Doing in-memory filtering below + stmt = select(feature_views) + if search_text: + stmt = stmt.where( + feature_views.c.feature_view_name.like(f"%{search_text}%") + ).order_by(feature_views.c.feature_view_name) + + rows = conn.execute(stmt).all() + + for row in rows: + feature_view_proto = FeatureViewProto.FromString( + row._mapping["feature_view_proto"] + ) + # for some reason, project is not set in the proto, so we set it here + feature_view_proto.spec.project = row._mapping["project_id"] + add_to_results = True + + if online is not None and feature_view_proto.spec.online != online: + add_to_results = False + + if ( + application + and feature_view_proto.spec.tags.get("application") != application + ): + add_to_results = False + + if team and feature_view_proto.spec.tags.get("team") != team: + add_to_results = False + + if created_at: + if ( + feature_view_proto.meta.created_timestamp.ToDatetime() + < created_at + ): + add_to_results = False + + if updated_at is not None: + if ( + feature_view_proto.meta.last_updated_timestamp.ToDatetime() + < updated_at + ): + add_to_results = False + + if add_to_results: + filtered_results.append(feature_view_proto) + + # Calculate total filtered results + total_count = len(filtered_results) + + # Calculate total page indices based on filtered results + total_page_indices = (total_count + page_size - 1) // page_size + + # Apply pagination to the filtered results + paginated_results = filtered_results[offset : offset + page_size] + + return RegistryServer_pb2.ExpediaSearchFeatureViewsResponse( + feature_views=paginated_results, + total_feature_views=total_count, + total_page_indices=total_page_indices, + ) diff --git a/setup.py b/setup.py index 6ea0f76d379..f0424525a0c 100644 --- a/setup.py +++ b/setup.py @@ -41,7 +41,6 @@ "mmh3", "numpy>=1.22,<2", "pandas>=1.4.3,<3", - "protobuf>=4.24.0,<5.0.0", "pyarrow>=4", "pydantic>=2.0.0", "pygments>=2.12.0,<3", @@ -148,10 +147,10 @@ ] GRPCIO_REQUIRED = [ - "grpcio>=1.56.2,<2", - "grpcio-tools>=1.56.2,<2", - "grpcio-reflection>=1.56.2,<2", - "grpcio-health-checking>=1.56.2,<2", + "grpcio<=1.56.0", + "grpcio-tools<=1.56.0", + "grpcio-reflection<=1.56.0", + "grpcio-health-checking<=1.56.0", ] DUCKDB_REQUIRED = ["ibis-framework[duckdb]>=9.0.0,<10"] @@ -168,7 +167,7 @@ "virtualenv==20.23.0", "cryptography>=35.0,<43", "ruff>=0.3.3", - "grpcio-testing>=1.56.2,<2", + "grpcio-testing<=1.56.0", # FastAPI does not correctly pull starlette dependency on httpx see thread(https://github.com/tiangolo/fastapi/issues/5656). "httpx>=0.23.3", "minio==7.1.0", @@ -249,7 +248,8 @@ # Only set use_scm_version if git executable exists (setting this variable causes pip to use git under the hood) if shutil.which("git"): - use_scm_version = {"root": ".", "relative_to": __file__, "tag_regex": TAG_REGEX} + use_scm_version = {"root": ".", + "relative_to": __file__, "tag_regex": TAG_REGEX} else: use_scm_version = None @@ -281,7 +281,8 @@ def finalize_options(self): def python_folder(self): if self.inplace: return os.path.join( - os.path.dirname(__file__) or os.getcwd(), "sdk/python/feast/protos" + os.path.dirname(__file__) or os.getcwd( + ), "sdk/python/feast/protos" ) return os.path.join(self.build_lib, "feast/protos") @@ -353,17 +354,20 @@ def _ensure_go_and_proto_toolchain(): semver_string = re.search(r"go[\S]+", str(version)).group().lstrip("go") parts = semver_string.split(".") if not (int(parts[0]) >= 1 and int(parts[1]) >= 16): - raise RuntimeError(f"Go compiler too old; expected 1.16+ found {semver_string}") + raise RuntimeError( + f"Go compiler too old; expected 1.16+ found {semver_string}") path_val = _generate_path_with_gopath() try: - subprocess.check_call(["protoc-gen-go", "--version"], env={"PATH": path_val}) + subprocess.check_call( + ["protoc-gen-go", "--version"], env={"PATH": path_val}) subprocess.check_call( ["protoc-gen-go-grpc", "--version"], env={"PATH": path_val} ) except Exception as e: - raise RuntimeError("Unable to find go/grpc extensions for protoc") from e + raise RuntimeError( + "Unable to find go/grpc extensions for protoc") from e class BuildGoProtosCommand(Command): @@ -449,7 +453,8 @@ def run(self): python_requires=REQUIRES_PYTHON, url=URL, packages=find_packages( - where=PYTHON_CODE_PREFIX, exclude=("java", "infra", "sdk/python/tests", "ui") + where=PYTHON_CODE_PREFIX, exclude=( + "java", "infra", "sdk/python/tests", "ui") ), package_dir={"": PYTHON_CODE_PREFIX}, install_requires=REQUIRED, @@ -498,11 +503,12 @@ def run(self): entry_points={"console_scripts": ["feast=feast.cli:cli"]}, use_scm_version=use_scm_version, setup_requires=[ - "setuptools_scm", - "grpcio>=1.56.2,<2", - "grpcio-tools>=1.56.2,<2", - "mypy-protobuf>=3.1", + "grpcio-tools<=1.56.0", + "grpcio<=1.56.0", + "mypy-protobuf==3.1", + "protobuf==4.25.4", "pybindgen==0.22.0", + "setuptools_scm>=6.2", ], cmdclass={ "build_python_protos": BuildPythonProtosCommand,