diff --git a/.gitignore b/.gitignore index cd2d973..adbfbd5 100644 --- a/.gitignore +++ b/.gitignore @@ -38,4 +38,7 @@ build/ .DS_Store ### CodeBuddy ### -.codebuddy/ \ No newline at end of file +.codebuddy/ + +### Subagent Configuration ### +.agents/ \ No newline at end of file diff --git a/lance-flink-1.17/pom.xml b/lance-flink-1.17/pom.xml new file mode 100644 index 0000000..0ccaaa4 --- /dev/null +++ b/lance-flink-1.17/pom.xml @@ -0,0 +1,244 @@ + + + 4.0.0 + + + org.apache.flink + lance-flink-root + 1.0.0-SNAPSHOT + ../pom.xml + + + lance-flink-1.17 + Lance Flink Connector - Flink 1.17 + Lance Flink Connector for Apache Flink 1.17 + jar + + + ${flink117.version} + + + + + + org.lance + lance-namespace-core + + + + org.lance + lance-namespace-apache-client + + + + com.lancedb + lance-core + + + + + org.apache.arrow + arrow-vector + + + + org.apache.arrow + arrow-memory-netty + + + + org.apache.arrow + arrow-c-data + + + + + org.apache.flink + flink-streaming-java + + + + org.apache.flink + flink-table-api-java + + + + org.apache.flink + flink-table-api-java-bridge + + + + org.apache.flink + flink-table-common + + + + org.apache.flink + flink-connector-base + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter-api + + + + org.junit.jupiter + junit-jupiter-engine + + + + org.junit.jupiter + junit-jupiter-params + + + + org.apache.flink + flink-test-utils + + + + org.apache.flink + flink-runtime + test-jar + + + + org.apache.flink + flink-streaming-java + test-jar + + + + org.apache.flink + flink-table-planner_2.12 + + + + org.mockito + mockito-core + + + + org.mockito + mockito-junit-jupiter + + + + org.assertj + assertj-core + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + org.apache.logging.log4j + log4j-api + + + + org.apache.logging.log4j + log4j-core + + + + org.apache.flink + flink-clients + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.basedir}/../src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ${project.basedir}/../src/test/java + + + + + add-resource + generate-resources + + add-resource + + + + + ${project.basedir}/../src/main/resources + + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/../src/test/resources + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/lance-flink-1.17/src/test/resources/log4j2-test.xml b/lance-flink-1.17/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..c73119e --- /dev/null +++ b/lance-flink-1.17/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/lance-flink-1.18/pom.xml b/lance-flink-1.18/pom.xml new file mode 100644 index 0000000..098b5d2 --- /dev/null +++ b/lance-flink-1.18/pom.xml @@ -0,0 +1,244 @@ + + + 4.0.0 + + + org.apache.flink + lance-flink-root + 1.0.0-SNAPSHOT + ../pom.xml + + + lance-flink-1.18 + Lance Flink Connector - Flink 1.18 + Lance Flink Connector for Apache Flink 1.18 + jar + + + ${flink118.version} + + + + + + org.lance + lance-namespace-core + + + + org.lance + lance-namespace-apache-client + + + + com.lancedb + lance-core + + + + + org.apache.arrow + arrow-vector + + + + org.apache.arrow + arrow-memory-netty + + + + org.apache.arrow + arrow-c-data + + + + + org.apache.flink + flink-streaming-java + + + + org.apache.flink + flink-table-api-java + + + + org.apache.flink + flink-table-api-java-bridge + + + + org.apache.flink + flink-table-common + + + + org.apache.flink + flink-connector-base + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter-api + + + + org.junit.jupiter + junit-jupiter-engine + + + + org.junit.jupiter + junit-jupiter-params + + + + org.apache.flink + flink-test-utils + + + + org.apache.flink + flink-runtime + test-jar + + + + org.apache.flink + flink-streaming-java + test-jar + + + + org.apache.flink + flink-table-planner_2.12 + + + + org.mockito + mockito-core + + + + org.mockito + mockito-junit-jupiter + + + + org.assertj + assertj-core + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + org.apache.logging.log4j + log4j-api + + + + org.apache.logging.log4j + log4j-core + + + + org.apache.flink + flink-clients + + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.basedir}/../src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ${project.basedir}/../src/test/java + + + + + add-resource + generate-resources + + add-resource + + + + + ${project.basedir}/../src/main/resources + + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/../src/test/resources + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/lance-flink-1.18/src/test/resources/log4j2-test.xml b/lance-flink-1.18/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..c73119e --- /dev/null +++ b/lance-flink-1.18/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/lance-flink-1.19/pom.xml b/lance-flink-1.19/pom.xml new file mode 100644 index 0000000..a89cce8 --- /dev/null +++ b/lance-flink-1.19/pom.xml @@ -0,0 +1,243 @@ + + + 4.0.0 + + + org.apache.flink + lance-flink-root + 1.0.0-SNAPSHOT + ../pom.xml + + + lance-flink-1.19 + Lance Flink Connector - Flink 1.19 + Lance Flink Connector for Apache Flink 1.19 + jar + + + ${flink119.version} + + + + + + org.lance + lance-namespace-core + + + + org.lance + lance-namespace-apache-client + + + + com.lancedb + lance-core + + + + + org.apache.arrow + arrow-vector + + + + org.apache.arrow + arrow-memory-netty + + + + org.apache.arrow + arrow-c-data + + + + + org.apache.flink + flink-streaming-java + + + + org.apache.flink + flink-table-api-java + + + + org.apache.flink + flink-table-api-java-bridge + + + + org.apache.flink + flink-table-common + + + + org.apache.flink + flink-connector-base + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter-api + + + + org.junit.jupiter + junit-jupiter-engine + + + + org.junit.jupiter + junit-jupiter-params + + + + org.apache.flink + flink-test-utils + + + + org.apache.flink + flink-runtime + test-jar + + + + org.apache.flink + flink-streaming-java + test-jar + + + + org.apache.flink + flink-table-planner_2.12 + + + + org.mockito + mockito-core + + + + org.mockito + mockito-junit-jupiter + + + + org.assertj + assertj-core + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + org.apache.logging.log4j + log4j-api + + + + org.apache.logging.log4j + log4j-core + + + + org.apache.flink + flink-clients + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.basedir}/../src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ${project.basedir}/../src/test/java + + + + + add-resource + generate-resources + + add-resource + + + + + ${project.basedir}/../src/main/resources + + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/../src/test/resources + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/lance-flink-1.19/src/test/resources/log4j2-test.xml b/lance-flink-1.19/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..c73119e --- /dev/null +++ b/lance-flink-1.19/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/lance-flink-1.20/pom.xml b/lance-flink-1.20/pom.xml new file mode 100644 index 0000000..7be37ab --- /dev/null +++ b/lance-flink-1.20/pom.xml @@ -0,0 +1,243 @@ + + + 4.0.0 + + + org.apache.flink + lance-flink-root + 1.0.0-SNAPSHOT + ../pom.xml + + + lance-flink-1.20 + Lance Flink Connector - Flink 1.20 + Lance Flink Connector for Apache Flink 1.20 + jar + + + ${flink120.version} + + + + + + org.lance + lance-namespace-core + + + + org.lance + lance-namespace-apache-client + + + + com.lancedb + lance-core + + + + + org.apache.arrow + arrow-vector + + + + org.apache.arrow + arrow-memory-netty + + + + org.apache.arrow + arrow-c-data + + + + + org.apache.flink + flink-streaming-java + + + + org.apache.flink + flink-table-api-java + + + + org.apache.flink + flink-table-api-java-bridge + + + + org.apache.flink + flink-table-common + + + + org.apache.flink + flink-connector-base + + + + + org.slf4j + slf4j-api + + + + + org.junit.jupiter + junit-jupiter-api + + + + org.junit.jupiter + junit-jupiter-engine + + + + org.junit.jupiter + junit-jupiter-params + + + + org.apache.flink + flink-test-utils + + + + org.apache.flink + flink-runtime + test-jar + + + + org.apache.flink + flink-streaming-java + test-jar + + + + org.apache.flink + flink-table-planner_2.12 + + + + org.mockito + mockito-core + + + + org.mockito + mockito-junit-jupiter + + + + org.assertj + assertj-core + + + + org.apache.logging.log4j + log4j-slf4j-impl + + + + org.apache.logging.log4j + log4j-api + + + + org.apache.logging.log4j + log4j-core + + + + org.apache.flink + flink-clients + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-source + generate-sources + + add-source + + + + ${project.basedir}/../src/main/java + + + + + add-test-source + generate-test-sources + + add-test-source + + + + ${project.basedir}/../src/test/java + + + + + add-resource + generate-resources + + add-resource + + + + + ${project.basedir}/../src/main/resources + + + + + + add-test-resource + generate-test-resources + + add-test-resource + + + + + ${project.basedir}/../src/test/resources + + + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.maven.plugins + maven-source-plugin + + + + org.jacoco + jacoco-maven-plugin + + + + diff --git a/lance-flink-1.20/src/test/resources/log4j2-test.xml b/lance-flink-1.20/src/test/resources/log4j2-test.xml new file mode 100644 index 0000000..c73119e --- /dev/null +++ b/lance-flink-1.20/src/test/resources/log4j2-test.xml @@ -0,0 +1,15 @@ + + + + + + + + + + + + + + + diff --git a/pom.xml b/pom.xml index d78486c..3823539 100644 --- a/pom.xml +++ b/pom.xml @@ -4,24 +4,39 @@ 4.0.0 org.apache.flink - flink-connector-lance + lance-flink-root 1.0.0-SNAPSHOT - jar + pom - Flink Connector Lance + Lance Flink Connector Apache Flink Connector for Lance vector data format - https://github.com/apache/flink-connector-lance + https://github.com/lance-format/lance-flink UTF-8 UTF-8 1.8 1.8 - - - 1.16.1 + + + 1.0.0-SNAPSHOT + + 0.23.3 + 0.5.4 + + 14.0.0 + + + 1.17.2 + 1.18.1 + 1.19.1 + 1.20.0 + + ${flink117.version} + + 5.9.3 1.7.36 2.20.0 @@ -29,332 +44,422 @@ 3.24.2 - - - - - org.apache.flink - flink-streaming-java - ${flink.version} - provided - - - - - org.apache.flink - flink-table-api-java - ${flink.version} - provided - - - - - org.apache.flink - flink-table-api-java-bridge - ${flink.version} - provided - - - - - org.apache.flink - flink-table-common - ${flink.version} - provided - - - - - org.apache.flink - flink-connector-base - ${flink.version} - provided - - - - - com.lancedb - lance-core - ${lance.version} - - - - - org.apache.arrow - arrow-vector - ${arrow.version} - - - - org.apache.arrow - arrow-memory-netty - ${arrow.version} - - - - org.apache.arrow - arrow-c-data - ${arrow.version} - - - - - org.slf4j - slf4j-api - ${slf4j.version} - - - - - - org.junit.jupiter - junit-jupiter-api - ${junit.version} - test - - - - org.junit.jupiter - junit-jupiter-engine - ${junit.version} - test - - - - org.junit.jupiter - junit-jupiter-params - ${junit.version} - test - - - - - org.apache.flink - flink-test-utils - ${flink.version} - test - - - - org.apache.flink - flink-runtime - ${flink.version} - test-jar - test - - - - org.apache.flink - flink-streaming-java - ${flink.version} - test-jar - test - - - - - org.apache.flink - flink-table-planner_2.12 - ${flink.version} - test - - - - - org.mockito - mockito-core - ${mockito.version} - test - - - - org.mockito - mockito-junit-jupiter - ${mockito.version} - test - - - - - org.assertj - assertj-core - ${assertj.version} - test - - - - - org.apache.logging.log4j - log4j-slf4j-impl - ${log4j.version} - test - - - - org.apache.logging.log4j - log4j-api - ${log4j.version} - test - - - - org.apache.logging.log4j - log4j-core - ${log4j.version} - test - - - - - org.apache.flink - flink-clients - ${flink.version} - test - - + + + + + org.lance + lance-namespace-core + ${lance-namespace.version} + + + + org.lance + lance-namespace-apache-client + ${lance-namespace.version} + + + + com.lancedb + lance-core + ${lance.version} + + + + + org.apache.arrow + arrow-vector + ${arrow.version} + + + + org.apache.arrow + arrow-memory-netty + ${arrow.version} + + + + org.apache.arrow + arrow-c-data + ${arrow.version} + + + + + org.apache.flink + flink-streaming-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java + ${flink.version} + provided + + + + org.apache.flink + flink-table-api-java-bridge + ${flink.version} + provided + + + + org.apache.flink + flink-table-common + ${flink.version} + provided + + + + org.apache.flink + flink-connector-base + ${flink.version} + provided + + + + + org.slf4j + slf4j-api + ${slf4j.version} + + + + + org.junit.jupiter + junit-jupiter-api + ${junit.version} + test + + + + org.junit.jupiter + junit-jupiter-engine + ${junit.version} + test + + + + org.junit.jupiter + junit-jupiter-params + ${junit.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-runtime + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-planner_2.12 + ${flink.version} + test + + + + org.mockito + mockito-core + ${mockito.version} + test + + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + test + + + + org.assertj + assertj-core + ${assertj.version} + test + + + + org.apache.logging.log4j + log4j-slf4j-impl + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-api + ${log4j.version} + test + + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test + + + + org.apache.flink + flink-clients + ${flink.version} + test + + + - - - - org.apache.maven.plugins - maven-compiler-plugin - 3.11.0 - - ${maven.compiler.source} - ${maven.compiler.target} - - -Xlint:all - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - 3.1.2 - - - - - - - org.apache.maven.plugins - maven-shade-plugin - 3.5.0 - - - package - - shade - - - - - org.apache.flink:flink-streaming-java - org.apache.flink:flink-table-api-java - org.apache.flink:flink-table-api-java-bridge - org.apache.flink:flink-table-common - org.apache.flink:flink-connector-base - org.slf4j:* - - - - - *:* + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.11.0 + + ${maven.compiler.source} + ${maven.compiler.target} + + -Xlint:all + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 3.1.2 + + + + + org.codehaus.mojo + build-helper-maven-plugin + 3.4.0 + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.5.0 + + + package + + shade + + + - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA + org.apache.flink:flink-streaming-java + org.apache.flink:flink-table-api-java + org.apache.flink:flink-table-api-java-bridge + org.apache.flink:flink-table-common + org.apache.flink:flink-connector-base + org.slf4j:* - - - - - - - - - org.apache.arrow - org.apache.flink.connector.lance.shaded.arrow - - - - - - + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + + + + + + + org.apache.arrow + org.apache.flink.connector.lance.shaded.arrow + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 3.3.0 + + + + true + true + + + + + + + + org.apache.maven.plugins + maven-source-plugin + 3.3.0 + + + attach-sources + + jar-no-fork + + + + + + + + org.apache.maven.plugins + maven-javadoc-plugin + 3.5.0 + + none + + + + + + org.jacoco + jacoco-maven-plugin + 0.8.10 + + + + prepare-agent + + + + report + test + + report + + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + 3.3.0 + + + enforce-maven + + enforce + + + + + 3.6.3 + + + [1.8,) + + + + + + + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.40.0 + + + + 1.7 + + + + + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + + + + - + org.apache.maven.plugins - maven-jar-plugin - 3.3.0 - - - - true - true - - - + maven-compiler-plugin - - org.apache.maven.plugins - maven-source-plugin - 3.3.0 - - - attach-sources - - jar-no-fork - - - + maven-surefire-plugin - - org.apache.maven.plugins - maven-javadoc-plugin - 3.5.0 - - none - - - - - - org.jacoco - jacoco-maven-plugin - 0.8.10 - - - - prepare-agent - - - - report - test - - report - - - + maven-enforcer-plugin + + lance-flink-1.17 + lance-flink-1.18 + lance-flink-1.19 + lance-flink-1.20 + + - central https://repo.maven.apache.org/maven2 - apache-snapshots https://repository.apache.org/content/repositories/snapshots/ @@ -366,4 +471,5 @@ + \ No newline at end of file diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java new file mode 100644 index 0000000..dc46c3b --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/AbstractLanceNamespaceAdapter.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import java.util.List; +import java.util.Map; + +/** + * Abstract adapter interface for Lance Namespace operations. + * + * This interface defines the contract for implementing namespace-based catalog operations, + * allowing for different backend implementations (directory-based, REST-based, etc.). + */ +public interface AbstractLanceNamespaceAdapter extends AutoCloseable { + + /** + * Initialize the adapter. + */ + void init(); + + /** + * List all namespaces at root level. + */ + List listNamespaces(); + + /** + * List namespaces under a parent namespace. + */ + List listNamespaces(String... parentNamespace); + + /** + * Check if a namespace exists. + */ + boolean namespaceExists(String... namespaceId); + + /** + * Create a namespace. + */ + void createNamespace(Map properties, String... namespaceId); + + /** + * Drop a namespace. + */ + void dropNamespace(boolean cascade, String... namespaceId); + + /** + * Get namespace metadata. + */ + Map getNamespaceMetadata(String... namespaceId); + + /** + * List tables in a namespace. + */ + List listTables(String... namespaceId); + + /** + * Check if a table exists. + */ + boolean tableExists(String... tableId); + + /** + * Create an empty table. + */ + void createEmptyTable(String location, Map properties, String... tableId); + + /** + * Drop a table. + */ + void dropTable(String... tableId); + + /** + * Get table metadata. + */ + TableMetadata getTableMetadata(String... tableId); + + /** + * Table metadata holder. + */ + class TableMetadata { + private final String location; + private final Map storageOptions; + + public TableMetadata(String location, Map storageOptions) { + this.location = location; + this.storageOptions = storageOptions; + } + + public String getLocation() { + return location; + } + + public Map getStorageOptions() { + return storageOptions; + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java new file mode 100644 index 0000000..24254c1 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/BaseLanceNamespaceCatalog.java @@ -0,0 +1,576 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.catalog.CatalogBaseTable; +import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.CatalogPartition; +import org.apache.flink.table.catalog.CatalogFunction; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.flink.table.catalog.CatalogPartitionSpec; +import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; +import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException; +import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException; +import org.apache.flink.table.catalog.exceptions.TableNotExistException; +import org.apache.flink.table.catalog.exceptions.PartitionAlreadyExistsException; +import org.apache.flink.table.catalog.exceptions.PartitionNotExistException; +import org.lance.namespace.LanceNamespace; +import org.lance.namespace.model.CreateNamespaceRequest; +import org.lance.namespace.model.CreateEmptyTableRequest; +import org.lance.namespace.model.DescribeNamespaceRequest; +import org.lance.namespace.model.DescribeNamespaceResponse; +import org.lance.namespace.model.DescribeTableRequest; +import org.lance.namespace.model.DescribeTableResponse; +import org.lance.namespace.model.DropNamespaceRequest; +import org.lance.namespace.model.DropTableRequest; +import org.lance.namespace.model.ListNamespacesRequest; +import org.lance.namespace.model.ListNamespacesResponse; +import org.lance.namespace.model.ListTablesRequest; +import org.lance.namespace.model.ListTablesResponse; +import org.lance.namespace.model.NamespaceExistsRequest; +import org.lance.namespace.model.TableExistsRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * Base Flink Catalog built on top of Lance Namespace. + * + *

Delegates namespace/table operations directly to the {@link LanceNamespace} API, + * while using {@link LanceNamespaceAdapter} to manage connection lifecycle.

+ */ +public abstract class BaseLanceNamespaceCatalog extends AbstractCatalog { + + private static final Logger LOG = LoggerFactory.getLogger(BaseLanceNamespaceCatalog.class); + + protected final LanceNamespaceAdapter namespaceAdapter; + protected final LanceNamespaceConfig config; + protected final Optional extraLevel; + protected final Optional parentPrefix; + + public BaseLanceNamespaceCatalog(String catalogName, LanceNamespaceAdapter adapter, LanceNamespaceConfig config) { + super(catalogName, "default"); + + this.namespaceAdapter = Objects.requireNonNull(adapter, "Namespace adapter cannot be null"); + this.config = Objects.requireNonNull(config, "Configuration cannot be null"); + + LOG.info("Initializing BaseLanceNamespaceCatalog: {}", catalogName); + + // Configure extra level + if (config.getExtraLevel().isPresent()) { + this.extraLevel = config.getExtraLevel(); + } else if (config.isDirectoryNamespace()) { + this.extraLevel = Optional.of("default"); + } else { + this.extraLevel = Optional.empty(); + } + + // Configure parent prefix + this.parentPrefix = config.getParentArray(); + + LOG.info("Catalog configuration - impl: {}, extraLevel: {}, parentPrefix: {}", + config.getImpl(), extraLevel, parentPrefix); + } + + /** + * Returns the underlying LanceNamespace instance. + */ + protected LanceNamespace namespace() { + return namespaceAdapter.getNamespace(); + } + + // ========== Database Operations ========== + + @Override + public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + + LOG.info("Creating database: {} (ignoreIfExists={})", name, ignoreIfExists); + + try { + if (databaseExists(name)) { + if (ignoreIfExists) { + LOG.info("Database already exists, skipping creation: {}", name); + return; + } else { + throw new DatabaseAlreadyExistException(getName(), name); + } + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + CreateNamespaceRequest request = new CreateNamespaceRequest(); + request.setId(Arrays.asList(namespacePath)); + Map properties = database.getProperties(); + if (properties != null) { + request.setProperties(properties); + } + namespace().createNamespace(request); + + LOG.info("Database created successfully: {}", name); + } catch (DatabaseAlreadyExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to create database: {}", name, e); + throw new CatalogException("Failed to create database: " + name, e); + } + } + + @Override + public void dropDatabase(String name, boolean ignoreIfNotExists, boolean cascade) + throws DatabaseNotExistException, CatalogException { + + LOG.info("Dropping database: {} (cascade={})", name, cascade); + + try { + if (!databaseExists(name)) { + if (ignoreIfNotExists) { + LOG.info("Database does not exist, skipping drop: {}", name); + return; + } else { + throw new DatabaseNotExistException(getName(), name); + } + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + DropNamespaceRequest request = new DropNamespaceRequest(); + request.setId(Arrays.asList(namespacePath)); + request.setBehavior(cascade ? "CASCADE" : "RESTRICT"); + namespace().dropNamespace(request); + + LOG.info("Database dropped successfully: {}", name); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to drop database: {}", name, e); + throw new CatalogException("Failed to drop database: " + name, e); + } + } + + @Override + public List listDatabases() throws CatalogException { + LOG.debug("Listing databases"); + + try { + ListNamespacesRequest request = new ListNamespacesRequest(); + ListNamespacesResponse response = namespace().listNamespaces(request); + if (response.getNamespaces() != null) { + Set namespaceSet = response.getNamespaces(); + return new ArrayList<>(namespaceSet); + } + return new ArrayList<>(); + } catch (Exception e) { + LOG.error("Failed to list databases", e); + throw new CatalogException("Failed to list databases", e); + } + } + + @Override + public CatalogDatabase getDatabase(String name) + throws DatabaseNotExistException, CatalogException { + + LOG.debug("Getting database: {}", name); + + try { + if (!databaseExists(name)) { + throw new DatabaseNotExistException(getName(), name); + } + + String[] namespacePath = transformDatabaseNameToNamespace(name); + DescribeNamespaceRequest request = new DescribeNamespaceRequest(); + request.setId(Arrays.asList(namespacePath)); + DescribeNamespaceResponse response = namespace().describeNamespace(request); + Map metadata = response.getProperties() != null + ? response.getProperties() : new HashMap<>(); + + return new org.apache.flink.table.catalog.CatalogDatabaseImpl(metadata, ""); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to get database: {}", name, e); + throw new CatalogException("Failed to get database: " + name, e); + } + } + + @Override + public boolean databaseExists(String name) { + LOG.debug("Checking if database exists: {}", name); + + try { + String[] namespacePath = transformDatabaseNameToNamespace(name); + NamespaceExistsRequest request = new NamespaceExistsRequest(); + request.setId(Arrays.asList(namespacePath)); + namespace().namespaceExists(request); + return true; + } catch (Exception e) { + LOG.debug("Database does not exist: {}", name); + return false; + } + } + + // ========== Table Operations ========== + + @Override + public void createTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + + LOG.info("Creating table: {} (ignoreIfExists={})", tablePath, ignoreIfExists); + + try { + String dbName = tablePath.getDatabaseName(); + String tblName = tablePath.getObjectName(); + + if (!databaseExists(dbName)) { + throw new DatabaseNotExistException(getName(), dbName); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + LOG.info("Table already exists, skipping creation: {}", tablePath); + return; + } else { + throw new TableAlreadyExistException(getName(), tablePath); + } + } + + String[] tableId = transformTableNameToId(dbName, tblName); + CreateEmptyTableRequest request = new CreateEmptyTableRequest(); + request.setId(Arrays.asList(tableId)); + Map properties = table.getOptions(); + if (properties != null) { + request.setProperties(properties); + } + namespace().createEmptyTable(request); + + LOG.info("Table created successfully: {}", tablePath); + } catch (TableAlreadyExistException | DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to create table: {}", tablePath, e); + throw new CatalogException("Failed to create table: " + tablePath, e); + } + } + + @Override + public void dropTable(ObjectPath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + + LOG.info("Dropping table: {}", tablePath); + + try { + if (!tableExists(tablePath)) { + if (ignoreIfNotExists) { + LOG.info("Table does not exist, skipping drop: {}", tablePath); + return; + } else { + throw new TableNotExistException(getName(), tablePath); + } + } + + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + DropTableRequest request = new DropTableRequest(); + request.setId(Arrays.asList(tableId)); + namespace().dropTable(request); + + LOG.info("Table dropped successfully: {}", tablePath); + } catch (TableNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to drop table: {}", tablePath, e); + throw new CatalogException("Failed to drop table: " + tablePath, e); + } + } + + @Override + public List listTables(String databaseName) + throws DatabaseNotExistException, CatalogException { + + LOG.debug("Listing tables in database: {}", databaseName); + + try { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException(getName(), databaseName); + } + + String[] namespacePath = transformDatabaseNameToNamespace(databaseName); + ListTablesRequest request = new ListTablesRequest(); + request.setId(Arrays.asList(namespacePath)); + ListTablesResponse response = namespace().listTables(request); + if (response.getTables() != null) { + Set tableSet = response.getTables(); + return new ArrayList<>(tableSet); + } + return new ArrayList<>(); + } catch (DatabaseNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to list tables in database: {}", databaseName, e); + throw new CatalogException("Failed to list tables in database: " + databaseName, e); + } + } + + @Override + public CatalogTable getTable(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + + LOG.debug("Getting table: {}", tablePath); + + try { + if (!tableExists(tablePath)) { + throw new TableNotExistException(getName(), tablePath); + } + + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + DescribeTableRequest request = new DescribeTableRequest(); + request.setId(Arrays.asList(tableId)); + DescribeTableResponse response = namespace().describeTable(request); + + String location = response.getLocation(); + Map options = response.getProperties() != null + ? response.getProperties() : new HashMap<>(); + LanceNamespaceAdapter.TableMetadata metadata = new LanceNamespaceAdapter.TableMetadata(location, options); + + return createCatalogTable(tablePath.getDatabaseName(), tablePath.getObjectName(), metadata); + } catch (TableNotExistException e) { + throw e; + } catch (Exception e) { + LOG.error("Failed to get table: {}", tablePath, e); + throw new CatalogException("Failed to get table: " + tablePath, e); + } + } + + @Override + public boolean tableExists(ObjectPath tablePath) { + LOG.debug("Checking if table exists: {}", tablePath); + + try { + String[] tableId = transformTableNameToId(tablePath.getDatabaseName(), tablePath.getObjectName()); + TableExistsRequest request = new TableExistsRequest(); + request.setId(Arrays.asList(tableId)); + namespace().tableExists(request); + return true; + } catch (Exception e) { + LOG.debug("Table does not exist: {}", tablePath); + return false; + } + } + + // ========== Abstract method to be implemented by subclasses ========== + + protected abstract CatalogTable createCatalogTable( + String databaseName, + String tableName, + LanceNamespaceAdapter.TableMetadata metadata) throws CatalogException; + + // ========== Helper methods ========== + + protected String[] transformDatabaseNameToNamespace(String databaseName) { + String[] baseNamespace = new String[] {databaseName}; + + if (parentPrefix.isPresent()) { + String[] parent = parentPrefix.get(); + String[] result = new String[parent.length + baseNamespace.length]; + System.arraycopy(parent, 0, result, 0, parent.length); + System.arraycopy(baseNamespace, 0, result, parent.length, baseNamespace.length); + return result; + } else if (extraLevel.isPresent()) { + String[] result = new String[baseNamespace.length + 1]; + result[0] = extraLevel.get(); + System.arraycopy(baseNamespace, 0, result, 1, baseNamespace.length); + return result; + } else { + return baseNamespace; + } + } + + protected String[] transformTableNameToId(String databaseName, String tableName) { + String[] dbPath = transformDatabaseNameToNamespace(databaseName); + String[] result = new String[dbPath.length + 1]; + System.arraycopy(dbPath, 0, result, 0, dbPath.length); + result[dbPath.length] = tableName; + return result; + } + + // ========== Not implemented - Partition operations not supported ========== + + @Override + public void createPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition partition, boolean ignoreIfExists) + throws TableNotExistException, PartitionAlreadyExistsException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void dropPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public List listPartitions(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogPartition getPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public boolean partitionExists(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterPartition(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogPartition newPartition, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public List listPartitionsByFilter(ObjectPath tablePath, List filters) + throws CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTableStatistics(ObjectPath tablePath, CatalogTableStatistics tableStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table statistics not supported: {}", tablePath); + } + + @Override + public void alterPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogTableStatistics partitionStatistics, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTableColumnStatistics(ObjectPath tablePath, CatalogColumnStatistics columnStatistics, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table column statistics not supported: {}", tablePath); + } + + @Override + public void alterPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec, + CatalogColumnStatistics columnStatistics, boolean ignoreIfNotExists) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogTableStatistics getTableStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + LOG.debug("Get table statistics not supported: {}", tablePath); + return null; + } + + @Override + public CatalogColumnStatistics getTableColumnStatistics(ObjectPath tablePath) + throws TableNotExistException, CatalogException { + LOG.debug("Get table column statistics not supported: {}", tablePath); + return null; + } + + @Override + public CatalogTableStatistics getPartitionStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public CatalogColumnStatistics getPartitionColumnStatistics(ObjectPath tablePath, CatalogPartitionSpec partitionSpec) + throws PartitionNotExistException, CatalogException { + throw new CatalogException("Partition operations are not supported"); + } + + @Override + public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + LOG.debug("Alter table not supported: {}", tablePath); + } + + @Override + public void renameTable(ObjectPath tablePath, String newTableName, boolean ignoreIfNotExists) + throws CatalogException { + LOG.debug("Rename table not supported: {} -> {}", tablePath, newTableName); + } + + @Override + public void alterDatabase(String name, CatalogDatabase newDatabase, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + LOG.debug("Alter database not supported: {}", name); + } + + @Override + public void dropFunction(ObjectPath functionPath, boolean ignoreIfNotExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public void createFunction(ObjectPath functionPath, CatalogFunction function, boolean ignoreIfExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public void alterFunction(ObjectPath functionPath, CatalogFunction newFunction, boolean ignoreIfNotExists) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public List listFunctions(String databaseName) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public CatalogFunction getFunction(ObjectPath functionPath) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } + + @Override + public boolean functionExists(ObjectPath functionPath) + throws CatalogException { + throw new CatalogException("Function operations are not supported"); + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java new file mode 100644 index 0000000..4a2145a --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceCatalogFactory.java @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Factory for creating and managing Lance Namespace connections. + * + *

Provides multiple ways to create {@link LanceNamespaceAdapter} instances, + * supporting different backend implementations such as directory mode and REST mode.

+ * + *

Example usage:

+ *
+ * LanceCatalogFactory factory = new LanceCatalogFactory();
+ * LanceNamespaceConfig config = LanceNamespaceConfig.builder()
+ *     .impl("dir")
+ *     .root("/data/lance")
+ *     .build();
+ * LanceNamespaceAdapter adapter = factory.createAdapter(config);
+ * // Use the LanceNamespace API directly
+ * LanceNamespace ns = adapter.getNamespace();
+ * 
+ */ +public class LanceCatalogFactory { + + private static final Logger LOG = LoggerFactory.getLogger(LanceCatalogFactory.class); + + private volatile BufferAllocator sharedAllocator; + + /** + * Creates a factory with a shared BufferAllocator. + */ + public LanceCatalogFactory() { + this.sharedAllocator = new RootAllocator(); + LOG.info("Created LanceCatalogFactory with shared RootAllocator"); + } + + /** + * Creates a factory with a custom BufferAllocator. + */ + public LanceCatalogFactory(BufferAllocator allocator) { + this.sharedAllocator = Objects.requireNonNull(allocator, "Allocator cannot be null"); + LOG.info("Created LanceCatalogFactory with custom allocator"); + } + + /** + * Creates a LanceNamespaceAdapter from the given configuration. + * + * @param config namespace configuration + * @return a configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createAdapter(LanceNamespaceConfig config) { + LOG.info("Creating LanceNamespaceAdapter with config"); + + Objects.requireNonNull(config, "Configuration cannot be null"); + + try { + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, config.getImpl()); + if (config.getRoot().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_ROOT, config.getRoot().get()); + } + if (config.getUri().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_URI, config.getUri().get()); + } + if (config.getExtraLevel().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_EXTRA_LEVEL, config.getExtraLevel().get()); + } + if (config.getParent().isPresent()) { + properties.put(LanceNamespaceConfig.KEY_PARENT, config.getParent().get()); + } + + return LanceNamespaceAdapter.create(properties); + } catch (Exception e) { + LOG.error("Failed to create adapter", e); + throw new RuntimeException("Failed to create LanceNamespaceAdapter", e); + } + } + + /** + * Creates a LanceNamespaceAdapter from the given properties map. + * + * @param properties properties map + * @return a configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createAdapter(Map properties) { + LOG.info("Creating LanceNamespaceAdapter from properties"); + + Objects.requireNonNull(properties, "Properties cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.from(properties); + return createAdapter(config); + } + + /** + * Creates a directory-mode LanceNamespaceAdapter. + * + * @param rootPath root directory path + * @return a configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createDirectoryAdapter(String rootPath) { + LOG.info("Creating directory namespace adapter with root: {}", rootPath); + + Objects.requireNonNull(rootPath, "Root path cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.builder() + .impl("dir") + .root(rootPath) + .build(); + + return createAdapter(config); + } + + /** + * Creates a REST-mode LanceNamespaceAdapter. + * + * @param uri REST service URI + * @return a configured LanceNamespaceAdapter + */ + public LanceNamespaceAdapter createRestAdapter(String uri) { + LOG.info("Creating REST namespace adapter with URI: {}", uri); + + Objects.requireNonNull(uri, "URI cannot be null"); + + LanceNamespaceConfig config = LanceNamespaceConfig.builder() + .impl("rest") + .uri(uri) + .build(); + + return createAdapter(config); + } + + /** + * Returns the shared BufferAllocator. + */ + public BufferAllocator getSharedAllocator() { + return sharedAllocator; + } + + /** + * Closes the factory and releases resources. + */ + public void close() { + LOG.info("Closing LanceCatalogFactory"); + try { + if (sharedAllocator != null) { + sharedAllocator.close(); + } + } catch (Exception e) { + LOG.warn("Error closing shared allocator", e); + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java new file mode 100644 index 0000000..76ce472 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapter.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.lance.namespace.LanceNamespace; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Lance Namespace connection manager. + * + *

Responsible for creating and managing the lifecycle of {@link LanceNamespace} instances + * (connection initialization and resource cleanup). This class does not wrap any business methods + * of LanceNamespace; callers should use {@link #getNamespace()} to obtain the instance and invoke + * the LanceNamespace native API directly.

+ */ +public class LanceNamespaceAdapter implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(LanceNamespaceAdapter.class); + + private final BufferAllocator allocator; + private final LanceNamespaceConfig config; + private LanceNamespace namespace; + + public LanceNamespaceAdapter(BufferAllocator allocator, LanceNamespaceConfig config) { + this.allocator = Objects.requireNonNull(allocator, "Allocator cannot be null"); + this.config = Objects.requireNonNull(config, "Config cannot be null"); + } + + /** + * Creates an Adapter instance from a properties map. + */ + public static LanceNamespaceAdapter create(Map properties) { + LanceNamespaceConfig config = LanceNamespaceConfig.from(properties); + BufferAllocator allocator = new RootAllocator(); + return new LanceNamespaceAdapter(allocator, config); + } + + /** + * Initializes the LanceNamespace connection. + */ + public void init() { + try { + if (namespace != null) { + return; + } + + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, config.getImpl()); + config.getRoot().ifPresent(root -> properties.put(LanceNamespaceConfig.KEY_ROOT, root)); + config.getUri().ifPresent(uri -> properties.put(LanceNamespaceConfig.KEY_URI, uri)); + + namespace = LanceNamespace.connect(config.getImpl(), properties, allocator); + LOG.info("LanceNamespace initialized successfully with impl: {}", config.getImpl()); + } catch (Exception e) { + LOG.error("Failed to initialize LanceNamespace", e); + throw new RuntimeException("Failed to initialize LanceNamespace", e); + } + } + + /** + * Returns the underlying LanceNamespace instance, initializing it automatically if needed. + */ + public LanceNamespace getNamespace() { + if (namespace == null) { + init(); + } + return namespace; + } + + /** + * Returns the BufferAllocator. + */ + public BufferAllocator getAllocator() { + return allocator; + } + + /** + * Returns the configuration. + */ + public LanceNamespaceConfig getConfig() { + return config; + } + + /** + * Closes the connection and releases resources. + */ + @Override + public void close() { + try { + if (namespace instanceof AutoCloseable) { + try { + ((AutoCloseable) namespace).close(); + } catch (Exception e) { + LOG.debug("Error invoking close() on namespace", e); + } + } + } catch (Exception e) { + LOG.warn("Error during namespace cleanup", e); + } + + if (allocator != null) { + allocator.close(); + } + } + + /** + * Holder class for table metadata. + */ + public static class TableMetadata { + private final String location; + private final Map storageOptions; + + public TableMetadata(String location, Map storageOptions) { + this.location = location; + this.storageOptions = storageOptions; + } + + public String getLocation() { + return location; + } + + public Map getStorageOptions() { + return storageOptions; + } + } +} \ No newline at end of file diff --git a/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java new file mode 100644 index 0000000..ad3f541 --- /dev/null +++ b/src/main/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceConfig.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +/** + * Configuration for Lance Namespace integration. + * + * Supports: + * - Namespace implementation selection (dir, rest, custom) + * - Implementation-specific parameters (root path, REST URI) + * - Extra level configuration (for Spark compatibility) + * - Parent prefix support (for Hive 3 compatibility) + */ +public class LanceNamespaceConfig { + + // Configuration keys + public static final String KEY_IMPL = "impl"; + public static final String KEY_ROOT = "root"; + public static final String KEY_URI = "uri"; + public static final String KEY_EXTRA_LEVEL = "extra_level"; + public static final String KEY_PARENT = "parent"; + public static final String KEY_PARENT_DELIMITER = "parent_delimiter"; + + private final String impl; + private final Map properties; + private final Optional extraLevel; + private final Optional parent; + private final String parentDelimiter; + + /** + * Create configuration from properties map. + */ + public static LanceNamespaceConfig from(Map properties) { + return new LanceNamespaceConfig(properties); + } + + /** + * Create builder for configuration. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Private constructor. + */ + private LanceNamespaceConfig(Map properties) { + this.properties = new HashMap<>(Objects.requireNonNull(properties, "Properties cannot be null")); + + // Extract required impl + this.impl = properties.get(KEY_IMPL); + if (this.impl == null || this.impl.isEmpty()) { + throw new IllegalArgumentException("Missing required configuration: " + KEY_IMPL); + } + + // Extract optional extra level + String extraLevelValue = properties.get(KEY_EXTRA_LEVEL); + this.extraLevel = extraLevelValue != null && !extraLevelValue.isEmpty() ? + Optional.of(extraLevelValue) : Optional.empty(); + + // Extract optional parent prefix + String parentValue = properties.get(KEY_PARENT); + this.parent = parentValue != null && !parentValue.isEmpty() ? + Optional.of(parentValue) : Optional.empty(); + + // Extract parent delimiter + this.parentDelimiter = properties.getOrDefault(KEY_PARENT_DELIMITER, "."); + } + + /** + * Get namespace implementation type. + */ + public String getImpl() { + return impl; + } + + /** + * Get all configuration properties. + */ + public Map getProperties() { + return Collections.unmodifiableMap(properties); + } + + /** + * Get root path for directory namespace implementation. + */ + public Optional getRoot() { + return Optional.ofNullable(properties.get(KEY_ROOT)); + } + + /** + * Get URI for REST namespace implementation. + */ + public Optional getUri() { + return Optional.ofNullable(properties.get(KEY_URI)); + } + + /** + * Get extra level configuration (for Spark compatibility). + */ + public Optional getExtraLevel() { + return extraLevel; + } + + /** + * Get parent prefix configuration (for Hive 3 compatibility). + */ + public Optional getParent() { + return parent; + } + + /** + * Get parent delimiter. + */ + public String getParentDelimiter() { + return parentDelimiter; + } + + /** + * Get parent prefix as array. + */ + public Optional getParentArray() { + return parent.map(p -> p.split(java.util.regex.Pattern.quote(parentDelimiter))); + } + + /** + * Check if directory namespace implementation. + */ + public boolean isDirectoryNamespace() { + return "dir".equals(impl); + } + + /** + * Check if REST namespace implementation. + */ + public boolean isRestNamespace() { + return "rest".equals(impl); + } + + /** + * Check if extra level should be automatically configured. + */ + public boolean shouldAutoConfigureExtraLevel() { + return !extraLevel.isPresent() && isDirectoryNamespace(); + } + + @Override + public String toString() { + return "LanceNamespaceConfig{" + + "impl='" + impl + '\'' + + ", extraLevel=" + extraLevel + + ", parent=" + parent + + ", properties=" + properties + + '}'; + } + + /** + * Builder for LanceNamespaceConfig. + */ + public static class Builder { + private final Map properties = new HashMap<>(); + + public Builder impl(String impl) { + properties.put(KEY_IMPL, impl); + return this; + } + + public Builder root(String root) { + properties.put(KEY_ROOT, root); + return this; + } + + public Builder uri(String uri) { + properties.put(KEY_URI, uri); + return this; + } + + public Builder extraLevel(String extraLevel) { + properties.put(KEY_EXTRA_LEVEL, extraLevel); + return this; + } + + public Builder parent(String parent) { + properties.put(KEY_PARENT, parent); + return this; + } + + public Builder parentDelimiter(String delimiter) { + properties.put(KEY_PARENT_DELIMITER, delimiter); + return this; + } + + public Builder property(String key, String value) { + properties.put(key, value); + return this; + } + + public Builder properties(Map props) { + properties.putAll(props); + return this; + } + + public LanceNamespaceConfig build() { + return new LanceNamespaceConfig(properties); + } + } +} diff --git a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java index dfcf186..5ec7634 100644 --- a/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java +++ b/src/main/java/org/apache/flink/connector/lance/table/LanceDynamicTableSource.java @@ -25,7 +25,6 @@ import org.apache.flink.connector.lance.config.LanceOptions; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.source.DataStreamScanProvider; import org.apache.flink.table.connector.source.DynamicTableSource; diff --git a/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java b/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java new file mode 100644 index 0000000..ae94c7f --- /dev/null +++ b/src/test/java/org/apache/flink/connector/lance/catalog/namespace/LanceNamespaceAdapterITCase.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.lance.catalog.namespace; + +import org.lance.namespace.LanceNamespace; +import org.lance.namespace.model.CreateNamespaceRequest; +import org.lance.namespace.model.CreateEmptyTableRequest; +import org.lance.namespace.model.DescribeNamespaceRequest; +import org.lance.namespace.model.DescribeNamespaceResponse; +import org.lance.namespace.model.DescribeTableRequest; +import org.lance.namespace.model.DescribeTableResponse; +import org.lance.namespace.model.DropNamespaceRequest; +import org.lance.namespace.model.DropTableRequest; +import org.lance.namespace.model.ListNamespacesRequest; +import org.lance.namespace.model.ListNamespacesResponse; +import org.lance.namespace.model.ListTablesRequest; +import org.lance.namespace.model.ListTablesResponse; +import org.lance.namespace.model.NamespaceExistsRequest; +import org.lance.namespace.model.TableExistsRequest; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** + * Lance Namespace 集成测试。 + * + *

本测试直接使用 Lance Namespace API(通过 LanceNamespaceAdapter 管理连接), + * 使用真实的 Lance Namespace 后端(DirectoryNamespace 和 RestNamespace),无 mock。 + */ +@DisplayName("Lance Namespace Integration Test") +class LanceNamespaceAdapterITCase { + + @TempDir + Path tempDir; + + private LanceNamespaceAdapter adapter; + private LanceNamespace ns; + private String warehousePath; + + @BeforeEach + void setUp() { + warehousePath = tempDir.resolve("warehouse").toString(); + + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "dir"); + properties.put(LanceNamespaceConfig.KEY_ROOT, warehousePath); + + adapter = LanceNamespaceAdapter.create(properties); + adapter.init(); + ns = adapter.getNamespace(); + } + + @AfterEach + void tearDown() throws Exception { + if (adapter != null) { + adapter.close(); + } + } + + // ========== 辅助方法 ========== + + private void createNamespace(Map props, String... id) { + CreateNamespaceRequest req = new CreateNamespaceRequest(); + req.setId(Arrays.asList(id)); + if (props != null) { + req.setProperties(props); + } + ns.createNamespace(req); + } + + private boolean namespaceExists(String... id) { + try { + NamespaceExistsRequest req = new NamespaceExistsRequest(); + req.setId(Arrays.asList(id)); + ns.namespaceExists(req); + return true; + } catch (Exception e) { + return false; + } + } + + private List listNamespaces(String... parent) { + ListNamespacesRequest req = new ListNamespacesRequest(); + if (parent.length > 0) { + req.setId(Arrays.asList(parent)); + } + ListNamespacesResponse resp = ns.listNamespaces(req); + if (resp.getNamespaces() != null) { + return new ArrayList<>(resp.getNamespaces()); + } + return new ArrayList<>(); + } + + private Map getNamespaceMetadata(String... id) { + DescribeNamespaceRequest req = new DescribeNamespaceRequest(); + req.setId(Arrays.asList(id)); + DescribeNamespaceResponse resp = ns.describeNamespace(req); + return resp.getProperties() != null ? resp.getProperties() : new HashMap<>(); + } + + private void dropNamespace(boolean cascade, String... id) { + DropNamespaceRequest req = new DropNamespaceRequest(); + req.setId(Arrays.asList(id)); + req.setBehavior(cascade ? "CASCADE" : "RESTRICT"); + ns.dropNamespace(req); + } + + private void createEmptyTable(String location, Map props, String... id) { + CreateEmptyTableRequest req = new CreateEmptyTableRequest(); + req.setId(Arrays.asList(id)); + req.setLocation(location); + if (props != null) { + req.setProperties(props); + } + ns.createEmptyTable(req); + } + + private boolean tableExists(String... id) { + try { + TableExistsRequest req = new TableExistsRequest(); + req.setId(Arrays.asList(id)); + ns.tableExists(req); + return true; + } catch (Exception e) { + return false; + } + } + + private List listTables(String... namespaceId) { + ListTablesRequest req = new ListTablesRequest(); + req.setId(Arrays.asList(namespaceId)); + ListTablesResponse resp = ns.listTables(req); + if (resp.getTables() != null) { + return new ArrayList<>(resp.getTables()); + } + return new ArrayList<>(); + } + + private LanceNamespaceAdapter.TableMetadata getTableMetadata(String... id) { + DescribeTableRequest req = new DescribeTableRequest(); + req.setId(Arrays.asList(id)); + DescribeTableResponse resp = ns.describeTable(req); + String location = resp.getLocation(); + Map options = resp.getProperties() != null ? resp.getProperties() : new HashMap<>(); + return new LanceNamespaceAdapter.TableMetadata(location, options); + } + + private void dropTable(String... id) { + DropTableRequest req = new DropTableRequest(); + req.setId(Arrays.asList(id)); + ns.dropTable(req); + } + + // ==================== Namespace 管理测试 ==================== + + @Test + @DisplayName("Test creating namespace") + void testCreateNamespace() { + String namespaceName = "test_db"; + Map properties = new HashMap<>(); + properties.put("description", "Test database"); + + createNamespace(properties, namespaceName); + + assertThat(namespaceExists(namespaceName)).isTrue(); + Map metadata = getNamespaceMetadata(namespaceName); + assertThat(metadata).isNotNull(); + } + + @Test + @DisplayName("Test creating nested namespace") + void testCreateNestedNamespace() { + String parentNamespace = "parent_db"; + String childNamespace = "child_db"; + + createNamespace(new HashMap<>(), parentNamespace); + createNamespace(new HashMap<>(), parentNamespace, childNamespace); + + assertThat(namespaceExists(parentNamespace)).isTrue(); + assertThat(namespaceExists(parentNamespace, childNamespace)).isTrue(); + } + + @Test + @DisplayName("Test listing all top-level namespaces") + void testListNamespaces() { + createNamespace(new HashMap<>(), "db1"); + createNamespace(new HashMap<>(), "db2"); + createNamespace(new HashMap<>(), "db3"); + + List namespaces = listNamespaces(); + + assertThat(namespaces).isNotNull(); + assertThat(namespaces).contains("db1", "db2", "db3"); + assertThat(namespaces.size()).isGreaterThanOrEqualTo(3); + } + + @Test + @DisplayName("Test listing child namespaces") + void testListChildNamespaces() { + String parent = "my_warehouse"; + createNamespace(new HashMap<>(), parent); + createNamespace(new HashMap<>(), parent, "schema1"); + createNamespace(new HashMap<>(), parent, "schema2"); + + List childNamespaces = listNamespaces(parent); + + assertThat(childNamespaces).isNotNull(); + assertThat(childNamespaces).contains("schema1", "schema2"); + } + + @Test + @DisplayName("Test checking namespace existence") + void testNamespaceExists() { + String namespaceName = "existing_db"; + createNamespace(new HashMap<>(), namespaceName); + + assertThat(namespaceExists(namespaceName)).isTrue(); + assertThat(namespaceExists("non_existing_db")).isFalse(); + } + + @Test + @DisplayName("Test dropping namespace") + void testDropNamespace() { + String namespaceName = "temp_db"; + createNamespace(new HashMap<>(), namespaceName); + assertThat(namespaceExists(namespaceName)).isTrue(); + + dropNamespace(false, namespaceName); + + assertThat(namespaceExists(namespaceName)).isFalse(); + } + + @Test + @DisplayName("Test cascade dropping namespace and its contents") + void testDropNamespaceCascade() { + String namespaceName = "cascade_db"; + createNamespace(new HashMap<>(), namespaceName); + + dropNamespace(true, namespaceName); + + assertThat(namespaceExists(namespaceName)).isFalse(); + } + + @Test + @DisplayName("Test getting namespace metadata") + void testGetNamespaceMetadata() { + String namespaceName = "metadata_db"; + Map properties = new HashMap<>(); + properties.put("owner", "admin"); + properties.put("environment", "test"); + + createNamespace(properties, namespaceName); + + Map metadata = getNamespaceMetadata(namespaceName); + + assertThat(metadata).isNotNull(); + assertThat(metadata).containsKeys("owner", "environment"); + } + + // ==================== Table 管理测试 ==================== + + @Test + @DisplayName("Test creating table in namespace") + void testCreateTable() { + String namespaceName = "my_db"; + String tableName = "my_table"; + createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProperties = new HashMap<>(); + tableProperties.put("format", "lance"); + + createEmptyTable(tableLocation, tableProperties, namespaceName, tableName); + + assertThat(tableExists(namespaceName, tableName)).isTrue(); + } + + @Test + @DisplayName("Test creating multiple tables in same namespace") + void testCreateMultipleTables() { + String namespaceName = "test_db"; + createNamespace(new HashMap<>(), namespaceName); + + String[] tableNames = {"users", "products", "orders", "analytics"}; + + for (String tableName : tableNames) { + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + List tables = listTables(namespaceName); + assertThat(tables).isNotNull(); + assertThat(tables).contains(tableNames); + } + + @Test + @DisplayName("Test listing all tables in namespace") + void testListTables() { + String namespaceName = "query_db"; + createNamespace(new HashMap<>(), namespaceName); + + createEmptyTable( + warehousePath + "/" + namespaceName + "/table1", + new HashMap<>(), + namespaceName, "table1" + ); + createEmptyTable( + warehousePath + "/" + namespaceName + "/table2", + new HashMap<>(), + namespaceName, "table2" + ); + + List tables = listTables(namespaceName); + + assertThat(tables).isNotNull(); + assertThat(tables).contains("table1", "table2"); + assertThat(tables.size()).isGreaterThanOrEqualTo(2); + } + + @Test + @DisplayName("Test checking table existence") + void testTableExists() { + String namespaceName = "check_db"; + String tableName = "check_table"; + createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + + assertThat(tableExists(namespaceName, tableName)).isTrue(); + assertThat(tableExists(namespaceName, "non_existing_table")).isFalse(); + } + + @Test + @DisplayName("Test getting table metadata") + void testGetTableMetadata() { + String namespaceName = "metadata_db"; + String tableName = "metadata_table"; + createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProperties = new HashMap<>(); + tableProperties.put("format", "lance"); + tableProperties.put("index", "ivf"); + + createEmptyTable(tableLocation, tableProperties, namespaceName, tableName); + + LanceNamespaceAdapter.TableMetadata metadata = + getTableMetadata(namespaceName, tableName); + + assertThat(metadata).isNotNull(); + assertThat(metadata.getLocation()).isNotNull(); + assertThat(metadata.getStorageOptions()).isNotNull(); + } + + @Test + @DisplayName("Test dropping table") + void testDropTable() { + String namespaceName = "drop_db"; + String tableName = "drop_table"; + createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + assertThat(tableExists(namespaceName, tableName)).isTrue(); + + dropTable(namespaceName, tableName); + + assertThat(tableExists(namespaceName, tableName)).isFalse(); + } + + @Test + @DisplayName("Test dropping multiple tables in namespace") + void testDropMultipleTables() { + String namespaceName = "cleanup_db"; + createNamespace(new HashMap<>(), namespaceName); + + String[] tableNames = {"temp1", "temp2", "temp3"}; + for (String tableName : tableNames) { + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + for (String tableName : tableNames) { + assertThat(tableExists(namespaceName, tableName)).isTrue(); + } + + for (String tableName : tableNames) { + dropTable(namespaceName, tableName); + } + + for (String tableName : tableNames) { + assertThat(tableExists(namespaceName, tableName)).isFalse(); + } + } + + // ==================== 综合场景测试 ==================== + + @Test + @DisplayName("Test complete table CRUD lifecycle") + void testCompleteTableCrudLifecycle() { + // 1. 创建 namespace + String namespaceName = "complete_db"; + Map dbProps = new HashMap<>(); + dbProps.put("owner", "admin"); + createNamespace(dbProps, namespaceName); + assertThat(namespaceExists(namespaceName)).isTrue(); + + // 2. 创建 table + String tableName = "complete_table"; + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + Map tableProps = new HashMap<>(); + tableProps.put("format", "lance"); + createEmptyTable(tableLocation, tableProps, namespaceName, tableName); + assertThat(tableExists(namespaceName, tableName)).isTrue(); + + // 3. 列出 tables + List tables = listTables(namespaceName); + assertThat(tables).contains(tableName); + + // 4. 获取 table metadata + LanceNamespaceAdapter.TableMetadata tableMetadata = + getTableMetadata(namespaceName, tableName); + assertThat(tableMetadata.getLocation()).contains(tableName); + + // 5. 删除 table + dropTable(namespaceName, tableName); + assertThat(tableExists(namespaceName, tableName)).isFalse(); + + // 6. 删除 namespace + dropNamespace(true, namespaceName); + assertThat(namespaceExists(namespaceName)).isFalse(); + } + + @Test + @DisplayName("Test independence of multiple namespaces") + void testMultipleNamespaceIndependence() { + String db1 = "database1"; + String db2 = "database2"; + String tableName = "test_table"; + + createNamespace(new HashMap<>(), db1); + createNamespace(new HashMap<>(), db2); + + createEmptyTable( + warehousePath + "/" + db1 + "/" + tableName, + new HashMap<>(), + db1, tableName + ); + createEmptyTable( + warehousePath + "/" + db2 + "/" + tableName, + new HashMap<>(), + db2, tableName + ); + + assertThat(tableExists(db1, tableName)).isTrue(); + assertThat(tableExists(db2, tableName)).isTrue(); + + dropTable(db1, tableName); + assertThat(tableExists(db1, tableName)).isFalse(); + assertThat(tableExists(db2, tableName)).isTrue(); + } + + @Test + @DisplayName("Test naming with underscores and numbers") + void testSpecialCharacterNaming() { + String namespaceName = "test_db_123"; + String tableName = "data_table_v2_001"; + + createNamespace(new HashMap<>(), namespaceName); + + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + + assertThat(namespaceExists(namespaceName)).isTrue(); + assertThat(tableExists(namespaceName, tableName)).isTrue(); + } + + @Test + @DisplayName("Test creating many tables in single namespace") + void testCreateManyTables() { + String namespaceName = "scale_db"; + createNamespace(new HashMap<>(), namespaceName); + + int tableCount = 50; + + for (int i = 0; i < tableCount; i++) { + String tableName = "table_" + String.format("%03d", i); + String tableLocation = warehousePath + "/" + namespaceName + "/" + tableName; + createEmptyTable(tableLocation, new HashMap<>(), namespaceName, tableName); + } + + List tables = listTables(namespaceName); + assertThat(tables).isNotNull(); + assertThat(tables.size()).isGreaterThanOrEqualTo(tableCount); + } + + @Test + @DisplayName("Test creating existing namespace throws exception") + void testCreateExistingNamespaceThrowsException() { + String namespaceName = "existing_db"; + createNamespace(new HashMap<>(), namespaceName); + + assertThatThrownBy(() -> + createNamespace(new HashMap<>(), namespaceName) + ).isNotNull(); + } + + @Test + @DisplayName("Test dropping non-existing namespace throws exception") + void testDropNonExistingNamespaceThrowsException() { + assertThatThrownBy(() -> + dropNamespace(false, "non_existing_db") + ).isNotNull(); + } + + @Test + @DisplayName("Test dropping non-existing table throws exception") + void testDropNonExistingTableThrowsException() { + String namespaceName = "error_db"; + createNamespace(new HashMap<>(), namespaceName); + + assertThatThrownBy(() -> + dropTable(namespaceName, "non_existing_table") + ).isNotNull(); + } + + @Test + @DisplayName("Test adapter closes correctly and releases resources") + void testAdapterCloseAndResourceCleanup() throws Exception { + String namespaceName = "cleanup_db"; + createNamespace(new HashMap<>(), namespaceName); + + assertThat(namespaceExists(namespaceName)).isTrue(); + + adapter.close(); + + // 创建新的 adapter 验证数据持久化 + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "dir"); + properties.put(LanceNamespaceConfig.KEY_ROOT, warehousePath); + + LanceNamespaceAdapter newAdapter = LanceNamespaceAdapter.create(properties); + newAdapter.init(); + LanceNamespace newNs = newAdapter.getNamespace(); + + try { + NamespaceExistsRequest req = new NamespaceExistsRequest(); + req.setId(Arrays.asList(namespaceName)); + newNs.namespaceExists(req); + // 如果没有抛异常,说明 namespace 存在 + } finally { + newAdapter.close(); + } + } + + // ==================== REST Namespace 测试 ==================== + + @Nested + @DisplayName("REST Namespace Backend Tests") + @EnabledIfEnvironmentVariable(named = "LANCE_REST_URI", matches = ".+") + class RestNamespaceTests { + + private LanceNamespaceAdapter restAdapter; + private LanceNamespace restNs; + + @BeforeEach + void setUp() { + String restUri = System.getenv("LANCE_REST_URI"); + + Map properties = new HashMap<>(); + properties.put(LanceNamespaceConfig.KEY_IMPL, "rest"); + properties.put(LanceNamespaceConfig.KEY_URI, restUri); + + restAdapter = LanceNamespaceAdapter.create(properties); + restAdapter.init(); + restNs = restAdapter.getNamespace(); + } + + @AfterEach + void tearDown() throws Exception { + if (restAdapter != null) { + restAdapter.close(); + } + } + + @Test + @DisplayName("Test REST namespace creation and listing") + void testRestNamespaceOperations() { + String testNamespace = "rest_test_db_" + System.currentTimeMillis(); + + try { + CreateNamespaceRequest createReq = new CreateNamespaceRequest(); + createReq.setId(Arrays.asList(testNamespace)); + restNs.createNamespace(createReq); + + NamespaceExistsRequest existsReq = new NamespaceExistsRequest(); + existsReq.setId(Arrays.asList(testNamespace)); + restNs.namespaceExists(existsReq); + + ListNamespacesRequest listReq = new ListNamespacesRequest(); + ListNamespacesResponse listResp = restNs.listNamespaces(listReq); + assertThat(listResp.getNamespaces()).contains(testNamespace); + } finally { + try { + DropNamespaceRequest dropReq = new DropNamespaceRequest(); + dropReq.setId(Arrays.asList(testNamespace)); + dropReq.setBehavior("CASCADE"); + restNs.dropNamespace(dropReq); + } catch (Exception ignored) { + } + } + } + + @Test + @DisplayName("Test REST table operations") + void testRestTableOperations() { + String testNamespace = "rest_table_db_" + System.currentTimeMillis(); + String testTable = "rest_test_table"; + + try { + CreateNamespaceRequest createNsReq = new CreateNamespaceRequest(); + createNsReq.setId(Arrays.asList(testNamespace)); + restNs.createNamespace(createNsReq); + + CreateEmptyTableRequest createTblReq = new CreateEmptyTableRequest(); + createTblReq.setId(Arrays.asList(testNamespace, testTable)); + restNs.createEmptyTable(createTblReq); + + TableExistsRequest existsReq = new TableExistsRequest(); + existsReq.setId(Arrays.asList(testNamespace, testTable)); + restNs.tableExists(existsReq); + + ListTablesRequest listReq = new ListTablesRequest(); + listReq.setId(Arrays.asList(testNamespace)); + ListTablesResponse listResp = restNs.listTables(listReq); + assertThat(listResp.getTables()).contains(testTable); + + DescribeTableRequest descReq = new DescribeTableRequest(); + descReq.setId(Arrays.asList(testNamespace, testTable)); + DescribeTableResponse descResp = restNs.describeTable(descReq); + assertThat(descResp).isNotNull(); + + DropTableRequest dropTblReq = new DropTableRequest(); + dropTblReq.setId(Arrays.asList(testNamespace, testTable)); + restNs.dropTable(dropTblReq); + + try { + restNs.tableExists(existsReq); + // 如果没抛异常说明还存在,这不应该发生 + assertThat(false).isTrue(); + } catch (Exception e) { + // 预期: table 已不存在 + } + } finally { + try { + DropNamespaceRequest dropReq = new DropNamespaceRequest(); + dropReq.setId(Arrays.asList(testNamespace)); + dropReq.setBehavior("CASCADE"); + restNs.dropNamespace(dropReq); + } catch (Exception ignored) { + } + } + } + } +}