diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index d2127c4d4f1..f15427bb232 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -62,11 +62,15 @@ func NewFeatureStore(config *registry.RepoConfig, callback transformation.Transf if err != nil { return nil, err } - sanitizedProjectName := strings.Replace(config.Project, "_", "-", -1) - productName := os.Getenv("PRODUCT") - endpoint := fmt.Sprintf("%s-transformations.%s.svc.cluster.local:80", sanitizedProjectName, productName) - transformationService, _ := transformation.NewGrpcTransformationService(config, endpoint) + var transformationService *transformation.GrpcTransformationService = nil + + if config.GoTransformationsServer { + if config.GoTransformationsEndpoint == "" { + return nil, errors.New("Transformations server endpoint is missing. Update featue_store.yaml with go_transformations_endpint configuration") + } + transformationService, _ = transformation.NewGrpcTransformationService(config, config.GoTransformationsEndpoint) + } return &FeatureStore{ config: config, registry: registry, diff --git a/go/internal/feast/registry/repoconfig.go b/go/internal/feast/registry/repoconfig.go index 2b140ad5da6..daddbcd9126 100644 --- a/go/internal/feast/registry/repoconfig.go +++ b/go/internal/feast/registry/repoconfig.go @@ -34,6 +34,11 @@ type RepoConfig struct { RepoPath string `json:"repo_path"` // EntityKeySerializationVersion EntityKeySerializationVersion int64 `json:"entity_key_serialization_version"` + // If false, use gopy bindings to calculate ODFV transformations. + // "True" value required for Go feature server to serve ODFVs with stability and at scale. + GoTransformationsServer bool `json:"go_transformations_server"` + // Transformation server base endpoint. + GoTransformationsEndpoint string `json:"go_transformations_endpoint"` } type RegistryConfig struct { diff --git a/go/internal/feast/registry/repoconfig_test.go b/go/internal/feast/registry/repoconfig_test.go index 540ffd0b6c7..cf48bd42246 100644 --- a/go/internal/feast/registry/repoconfig_test.go +++ b/go/internal/feast/registry/repoconfig_test.go @@ -220,3 +220,30 @@ func TestGetRegistryConfig_CacheTtlSecondsTypes(t *testing.T) { assert.Equal(t, int64(60), registryConfig.CacheTtlSeconds) } } + +func TestGoTransformationsEndpoint(t *testing.T) { + dir, err := os.MkdirTemp("", "feature_repo_*") + assert.Nil(t, err) + defer func() { + assert.Nil(t, os.RemoveAll(dir)) + }() + filePath := filepath.Join(dir, "feature_store.yaml") + data := []byte(` +registry: + path: data/registry.db +project: feature_repo +provider: local +online_store: + type: redis + connection_string: "localhost:6379" +go_transformations_endpoint: https://go.dev:9999 +go_transformations_server: True +`) + err = os.WriteFile(filePath, data, 0666) + assert.Nil(t, err) + config, err := NewRepoConfigFromFile(dir) + assert.Nil(t, err) + assert.Equal(t, dir, config.RepoPath) + assert.Equal(t, "https://go.dev:9999", config.GoTransformationsEndpoint) + assert.Equal(t, true, config.GoTransformationsServer) +} diff --git a/sdk/python/feast/repo_config.py b/sdk/python/feast/repo_config.py index 127dcdcfe8d..98fe84f2c0c 100644 --- a/sdk/python/feast/repo_config.py +++ b/sdk/python/feast/repo_config.py @@ -177,6 +177,18 @@ class RepoConfig(FeastBaseModel): go_feature_retrieval: Optional[bool] = False """ If True, use the embedded Go code to retrieve features instead of the Python SDK. """ + go_transformations_server: Optional[bool] = False + """ If True, use the transformations server to perform ODVF transformations in Go feature server. """ + + go_transformations_endpoint: Optional[StrictStr] = "" + """ Specify the endpoint for Go feature server to find the transformations server. + NOTE: Unless go_transformations_server is False, the Go feature server will throw errors if this is + blank or null. + """ + + go_transformations_port: Optional[StrictInt] = 80 + """ Specify the port for Go feature server to find the transformations server. """ + entity_key_serialization_version: StrictInt = 1 """ Entity key serialization version: This version is used to control what serialization scheme is used when writing data to the online store. @@ -221,6 +233,7 @@ def __init__(self, **data: Any): }, } self._offline_config = spark_offline_config + self._go_transformation_server = True self._online_store = None if "online_store" in data: