diff --git a/sdk/python/feast/infra/online_stores/eg_valkey.py b/sdk/python/feast/infra/online_stores/eg_valkey.py index e83c1a74c17..2279db3f037 100644 --- a/sdk/python/feast/infra/online_stores/eg_valkey.py +++ b/sdk/python/feast/infra/online_stores/eg_valkey.py @@ -407,7 +407,7 @@ def online_write_batch( ttl = online_store_config.key_ttl_seconds if ttl: pipe.expire(name=valkey_key_bin, time=ttl) - results = pipe.execute() + results = pipe.execute() if progress: progress(len(results)) diff --git a/sdk/python/feast/infra/online_stores/redis.py b/sdk/python/feast/infra/online_stores/redis.py index bcee8627891..ae0b41ca200 100644 --- a/sdk/python/feast/infra/online_stores/redis.py +++ b/sdk/python/feast/infra/online_stores/redis.py @@ -351,10 +351,10 @@ def online_write_batch( num_cmds += 2 if num_cmds >= num_cmds_per_pipeline_execute: # TODO: May be add retries with backoff - pipe.execute() # flush + results = pipe.execute() # flush num_cmds = 0 if num_cmds: - pipe.execute() # flush any remaining data in the last batch + results = pipe.execute() # flush any remaining data in the last batch else: # check if a previous record under the key bin exists # TODO: investigate if check and set is a better approach rather than pulling all entity ts and then setting @@ -400,7 +400,7 @@ def online_write_batch( ttl = online_store_config.key_ttl_seconds if ttl: pipe.expire(name=redis_key_bin, time=ttl) - results = pipe.execute() + results = pipe.execute() if progress: progress(len(results)) diff --git a/sdk/python/tests/unit/infra/online_store/test_redis.py b/sdk/python/tests/unit/infra/online_store/test_redis.py index e5d1b21f96b..4e92b19a54f 100644 --- a/sdk/python/tests/unit/infra/online_store/test_redis.py +++ b/sdk/python/tests/unit/infra/online_store/test_redis.py @@ -46,7 +46,7 @@ def base_repo_config_kwargs(): @pytest.fixture -def repo_config(base_repo_config_kwargs) -> RepoConfig: +def repo_config_without_docker_connection_string(base_repo_config_kwargs) -> RepoConfig: return RepoConfig( **base_repo_config_kwargs, online_store=RedisOnlineStoreConfig( @@ -56,9 +56,7 @@ def repo_config(base_repo_config_kwargs) -> RepoConfig: @pytest.fixture -def repo_config_with_docker_connection_string( - redis_online_store_config, base_repo_config_kwargs -) -> RepoConfig: +def repo_config(redis_online_store_config, base_repo_config_kwargs) -> RepoConfig: return RepoConfig( **base_repo_config_kwargs, online_store=RedisOnlineStoreConfig( @@ -84,13 +82,15 @@ def feature_view(): return feature_view -def test_generate_entity_redis_keys(redis_online_store: RedisOnlineStore, repo_config): +def test_generate_entity_redis_keys( + redis_online_store: RedisOnlineStore, repo_config_without_docker_connection_string +): entity_keys = [ EntityKeyProto(join_keys=["entity"], entity_values=[ValueProto(int32_val=1)]), ] actual = redis_online_store._generate_redis_keys_for_entities( - repo_config, entity_keys + repo_config_without_docker_connection_string, entity_keys ) expected = [ b"\x01\x00\x00\x00\x02\x00\x00\x00\x06\x00\x00\x00entity\x03\x00\x00\x00\x04\x00\x00\x00\x01\x00\x00\x00test" @@ -176,7 +176,7 @@ def test_get_features_for_entity(redis_online_store: RedisOnlineStore, feature_v @pytest.mark.docker def test_redis_online_write_batch_with_timestamp_as_sortkey( - repo_config_with_docker_connection_string: RepoConfig, + repo_config: RepoConfig, redis_online_store: RedisOnlineStore, ): ( @@ -185,15 +185,13 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( ) = _create_sorted_feature_view_with_timestamp_as_sortkey() redis_online_store.online_write_batch( - config=repo_config_with_docker_connection_string, + config=repo_config, table=feature_view, data=data, progress=None, ) - connection_string = ( - repo_config_with_docker_connection_string.online_store.connection_string - ) + connection_string = repo_config.online_store.connection_string connection_string_split = connection_string.split(":") conn_dict = {} conn_dict["host"] = connection_string_split[0] @@ -209,9 +207,9 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( ) redis_key_bin_driver_1 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_1, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_1 = redis_online_store.zset_key_bytes( @@ -223,9 +221,9 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( entity_values=[ValueProto(int32_val=2)], ) redis_key_bin_driver_2 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_2, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_2 = redis_online_store.zset_key_bytes( @@ -269,7 +267,7 @@ def test_redis_online_write_batch_with_timestamp_as_sortkey( @pytest.mark.docker def test_redis_online_write_batch_with_float_as_sortkey( - repo_config_with_docker_connection_string: RepoConfig, + repo_config: RepoConfig, redis_online_store: RedisOnlineStore, ): ( @@ -278,15 +276,13 @@ def test_redis_online_write_batch_with_float_as_sortkey( ) = _create_sorted_feature_view_with_float_as_sortkey() redis_online_store.online_write_batch( - config=repo_config_with_docker_connection_string, + config=repo_config, table=feature_view, data=data, progress=None, ) - connection_string = ( - repo_config_with_docker_connection_string.online_store.connection_string - ) + connection_string = repo_config.online_store.connection_string connection_string_split = connection_string.split(":") conn_dict = {} conn_dict["host"] = connection_string_split[0] @@ -302,9 +298,9 @@ def test_redis_online_write_batch_with_float_as_sortkey( ) redis_key_bin_driver_1 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_1, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_1 = redis_online_store.zset_key_bytes( @@ -316,9 +312,9 @@ def test_redis_online_write_batch_with_float_as_sortkey( entity_values=[ValueProto(int32_val=2)], ) redis_key_bin_driver_2 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_2, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_2 = redis_online_store.zset_key_bytes( @@ -371,7 +367,8 @@ def repo_config_before(redis_online_store_config): def test_multiple_sort_keys_not_supported( - repo_config: RepoConfig, redis_online_store: RedisOnlineStore + repo_config_without_docker_connection_string: RepoConfig, + redis_online_store: RedisOnlineStore, ): ( feature_view, @@ -383,7 +380,7 @@ def test_multiple_sort_keys_not_supported( match=r"Only one sort key is supported for Range query use cases in Redis, but found 2 sort keys in the", ): redis_online_store.online_write_batch( - config=repo_config, + config=repo_config_without_docker_connection_string, table=feature_view, data=data, progress=None, @@ -391,7 +388,8 @@ def test_multiple_sort_keys_not_supported( def test_non_numeric_sort_key_not_supported( - repo_config: RepoConfig, redis_online_store: RedisOnlineStore + repo_config_without_docker_connection_string: RepoConfig, + redis_online_store: RedisOnlineStore, ): ( feature_view, @@ -402,7 +400,7 @@ def test_non_numeric_sort_key_not_supported( TypeError, match=r"Unsupported sort key type STRING. Only numerics or timestamp" ): redis_online_store.online_write_batch( - config=repo_config, + config=repo_config_without_docker_connection_string, table=feature_view, data=data, progress=None, diff --git a/sdk/python/tests/unit/infra/online_store/test_valkey.py b/sdk/python/tests/unit/infra/online_store/test_valkey.py index 924da3c331d..4ef2a9bdf69 100644 --- a/sdk/python/tests/unit/infra/online_store/test_valkey.py +++ b/sdk/python/tests/unit/infra/online_store/test_valkey.py @@ -48,7 +48,7 @@ def base_repo_config_kwargs(): @pytest.fixture -def repo_config(base_repo_config_kwargs) -> RepoConfig: +def repo_config_without_docker_connection_string(base_repo_config_kwargs) -> RepoConfig: return RepoConfig( **base_repo_config_kwargs, online_store=EGValkeyOnlineStoreConfig( @@ -58,9 +58,7 @@ def repo_config(base_repo_config_kwargs) -> RepoConfig: @pytest.fixture -def repo_config_with_docker_connection_string( - valkey_online_store_config, base_repo_config_kwargs -) -> RepoConfig: +def repo_config(valkey_online_store_config, base_repo_config_kwargs) -> RepoConfig: return RepoConfig( **base_repo_config_kwargs, online_store=EGValkeyOnlineStoreConfig( @@ -71,7 +69,7 @@ def repo_config_with_docker_connection_string( @pytest.mark.docker def test_valkey_online_write_batch_with_timestamp_as_sortkey( - repo_config_with_docker_connection_string: RepoConfig, + repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore, ): ( @@ -80,15 +78,13 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey( ) = _create_sorted_feature_view_with_timestamp_as_sortkey() valkey_online_store.online_write_batch( - config=repo_config_with_docker_connection_string, + config=repo_config, table=feature_view, data=data, progress=None, ) - connection_string = ( - repo_config_with_docker_connection_string.online_store.connection_string - ) + connection_string = repo_config.online_store.connection_string connection_string_split = connection_string.split(":") conn_dict = {} conn_dict["host"] = connection_string_split[0] @@ -104,9 +100,9 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey( ) redis_key_bin_driver_1 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_1, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_1 = valkey_online_store.zset_key_bytes( @@ -118,9 +114,9 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey( entity_values=[ValueProto(int32_val=2)], ) redis_key_bin_driver_2 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_2, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_2 = valkey_online_store.zset_key_bytes( @@ -164,7 +160,7 @@ def test_valkey_online_write_batch_with_timestamp_as_sortkey( @pytest.mark.docker def test_valkey_online_write_batch_with_float_as_sortkey( - repo_config_with_docker_connection_string: RepoConfig, + repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore, ): ( @@ -173,15 +169,13 @@ def test_valkey_online_write_batch_with_float_as_sortkey( ) = _create_sorted_feature_view_with_float_as_sortkey() valkey_online_store.online_write_batch( - config=repo_config_with_docker_connection_string, + config=repo_config, table=feature_view, data=data, progress=None, ) - connection_string = ( - repo_config_with_docker_connection_string.online_store.connection_string - ) + connection_string = repo_config.online_store.connection_string connection_string_split = connection_string.split(":") conn_dict = {} conn_dict["host"] = connection_string_split[0] @@ -197,9 +191,9 @@ def test_valkey_online_write_batch_with_float_as_sortkey( ) redis_key_bin_driver_1 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_1, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_1 = valkey_online_store.zset_key_bytes( @@ -211,9 +205,9 @@ def test_valkey_online_write_batch_with_float_as_sortkey( entity_values=[ValueProto(int32_val=2)], ) redis_key_bin_driver_2 = _redis_key( - repo_config_with_docker_connection_string.project, + repo_config.project, entity_key_driver_2, - entity_key_serialization_version=repo_config_with_docker_connection_string.entity_key_serialization_version, + entity_key_serialization_version=repo_config.entity_key_serialization_version, ) zset_key_driver_2 = valkey_online_store.zset_key_bytes( @@ -253,7 +247,8 @@ def test_valkey_online_write_batch_with_float_as_sortkey( def test_multiple_sort_keys_not_supported( - repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore + repo_config_without_docker_connection_string: RepoConfig, + valkey_online_store: EGValkeyOnlineStore, ): ( feature_view, @@ -265,7 +260,7 @@ def test_multiple_sort_keys_not_supported( match=r"Only one sort key is supported for Range query use cases in Valkey, but found 2 sort keys in the", ): valkey_online_store.online_write_batch( - config=repo_config, + config=repo_config_without_docker_connection_string, table=feature_view, data=data, progress=None, @@ -273,7 +268,8 @@ def test_multiple_sort_keys_not_supported( def test_non_numeric_sort_key_not_supported( - repo_config: RepoConfig, valkey_online_store: EGValkeyOnlineStore + repo_config_without_docker_connection_string: RepoConfig, + valkey_online_store: EGValkeyOnlineStore, ): ( feature_view, @@ -284,7 +280,7 @@ def test_non_numeric_sort_key_not_supported( TypeError, match=r"Unsupported sort key type STRING. Only numerics or timestamp" ): valkey_online_store.online_write_batch( - config=repo_config, + config=repo_config_without_docker_connection_string, table=feature_view, data=data, progress=None,