Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.exception.BigQueryJdbcException;
import com.google.cloud.bigquery.exception.BigQueryJdbcSqlFeatureNotSupportedException;
import com.google.cloud.bigquery.exception.BigQueryJdbcSqlSyntaxErrorException;
import com.google.cloud.bigquery.jdbc.BigQueryConnection;
import com.google.cloud.bigquery.jdbc.BigQueryDriver;
import com.google.cloud.bigquery.jdbc.BigQueryStatement;
import com.google.cloud.bigquery.jdbc.DataSource;
import com.google.common.collect.ImmutableMap;
import java.io.File;
Expand All @@ -56,6 +58,10 @@
import java.time.LocalTime;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -406,6 +412,207 @@ public void testLocation() throws SQLException {
connection2.close();
}

@Test
public void testCancelLocation() throws Exception {
String connection_uri =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;PROJECTID="
+ PROJECT_ID
+ ";OAUTHTYPE=3;LOCATION=EU";

Driver driver = BigQueryDriver.getRegisteredDriver();
Connection connection = driver.connect(connection_uri, new Properties());
Statement statement = connection.createStatement();

// Query a dataset in the EU with a heavy query so that we can cancel it while it runs.
String query =
"SELECT COUNT(*) FROM `bigquery-public-data.covid19_italy_eu.data_by_province` a "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` b "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` c";

// Run the query in a separate thread so we can cancel it from the main thread
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<SQLException> future =
executor.submit(
() -> {
try {
statement.executeQuery(query);
return null;
} catch (SQLException e) {
return e;
}
});

// Wait a short moment to let the query submit and start running
Thread.sleep(1500);

// Cancel the query execution
statement.cancel();

// Verify that the query threw a SQLException indicating it was cancelled
SQLException exception = future.get(15, TimeUnit.SECONDS);
assertNotNull(exception, "Expected SQLException to be thrown due to cancellation");
assertTrue(
exception.getMessage().contains("cancelled")
|| exception.getMessage().contains("Job was cancelled")
|| exception.getMessage().contains("Query was cancelled"),
"Unexpected exception message: " + exception.getMessage());

connection.close();
executor.shutdown();
}

@Test
public void testEuConnectionQueryAndCancel() throws Exception {
// 1. Create a JDBC connection explicitly set to the EU region
String connectionUri =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;PROJECTID="
+ PROJECT_ID
+ ";OAUTHTYPE=3;LOCATION=EU;JobCreationMode=1;useQueryCache=false";

Driver driver = BigQueryDriver.getRegisteredDriver();
try (Connection connection = driver.connect(connectionUri, new Properties())) {
Statement statement = connection.createStatement();

// 2. Run a query in the EU region
String euQuery =
"SELECT COUNT(*) FROM `bigquery-public-data.covid19_italy_eu.data_by_province` a "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` b "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` c /* "
+ java.util.UUID.randomUUID()
+ " */";

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<SQLException> future =
executor.submit(
() -> {
try {
statement.executeQuery(euQuery);
return null;
} catch (SQLException e) {
return e;
}
});

// Retrieve the JobId from the statement with polling
BigQueryStatement rawStmt = statement.unwrap(BigQueryStatement.class);
java.lang.reflect.Field jobIdsField = BigQueryStatement.class.getDeclaredField("jobIds");
jobIdsField.setAccessible(true);
@SuppressWarnings("unchecked")
java.util.List<JobId> jobIds = (java.util.List<JobId>) jobIdsField.get(rawStmt);
long startTime = System.currentTimeMillis();
while (jobIds.isEmpty() && (System.currentTimeMillis() - startTime < 15000)) {
Thread.sleep(500);
}
assertFalse(jobIds.isEmpty(), "Expected at least one active Job ID in the statement");
JobId runningJobId = jobIds.get(0);

// Verify that the JobId location is indeed "EU"
assertEquals("EU", runningJobId.getLocation(), "Expected JobId to have location EU");

// 3. Verify that the job can be cancelled successfully
statement.cancel();

SQLException cancelException = future.get(40, TimeUnit.SECONDS);
assertNotNull(cancelException, "Expected SQLException to be thrown due to cancellation");
assertTrue(
cancelException.getMessage().contains("cancelled")
|| cancelException.getMessage().contains("Job was cancelled")
|| cancelException.getMessage().contains("Query was cancelled"),
"Unexpected exception message: " + cancelException.getMessage());

executor.shutdown();

// 4. Use the same connection to run a query in the US region (should fail)
String usQuery =
"SELECT COUNT(*) FROM `bigquery-public-data.usa_names.usa_1910_current` LIMIT 10";
SQLException usException =
assertThrows(
SQLException.class,
() -> {
statement.executeQuery(usQuery);
},
"Expected SQLException because US dataset cannot be queried via EU connection");

assertTrue(
usException.getMessage().contains("Not found: Table")
|| usException.getMessage().contains("Location")
|| usException.getMessage().contains("europe")
|| usException.getMessage().contains("EU")
|| usException.getMessage().contains("permission")
|| usException.getMessage().contains("not exist"),
"Unexpected exception message for cross-region query: " + usException.getMessage());
}
}

@Test
public void testJdbcAutoLocationCancel() throws Exception {
// 1. Establish connection WITHOUT location parameter
String connectionUri =
"jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;PROJECTID="
+ PROJECT_ID
+ ";OAUTHTYPE=3;JobCreationMode=1;useQueryCache=false";

Driver driver = BigQueryDriver.getRegisteredDriver();
Connection connection = driver.connect(connectionUri, new Properties());
Statement statement = connection.createStatement();

// Query a dataset in the EU. Since connection has no location, BigQuery routes the job to EU
// automatically.
String query =
"SELECT COUNT(*) FROM `bigquery-public-data.covid19_italy_eu.data_by_province` a "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` b "
+ "CROSS JOIN `bigquery-public-data.covid19_italy_eu.data_by_province` c /* "
+ java.util.UUID.randomUUID()
+ " */";

ExecutorService executor = Executors.newSingleThreadExecutor();
Future<SQLException> future =
executor.submit(
() -> {
try {
statement.executeQuery(query);
return null;
} catch (SQLException e) {
return e;
}
});

// Retrieve the JobId from the statement with polling
BigQueryStatement rawStmt = statement.unwrap(BigQueryStatement.class);
java.lang.reflect.Field jobIdsField = BigQueryStatement.class.getDeclaredField("jobIds");
jobIdsField.setAccessible(true);
@SuppressWarnings("unchecked")
java.util.List<JobId> jobIds = (java.util.List<JobId>) jobIdsField.get(rawStmt);
long startTime = System.currentTimeMillis();
while (jobIds.isEmpty() && (System.currentTimeMillis() - startTime < 15000)) {
Thread.sleep(500);
}
assertFalse(jobIds.isEmpty(), "Expected at least one active Job ID in the statement");
JobId runningJobId = jobIds.get(0);

// Verify that the JobId has location set to "EU" even though the connection did not specify any
// location!
assertEquals(
"EU",
runningJobId.getLocation(),
"Expected JobId to have location EU populated by the backend");

// Cancel the query execution via the JDBC Statement
statement.cancel();

// Verify that the query threw a SQLException indicating it was cancelled
SQLException exception = future.get(40, TimeUnit.SECONDS);
assertNotNull(exception, "Expected SQLException to be thrown due to cancellation");
assertTrue(
exception.getMessage().contains("cancelled")
|| exception.getMessage().contains("Job was cancelled")
|| exception.getMessage().contains("Query was cancelled"),
"Unexpected exception message: " + exception.getMessage());

connection.close();
executor.shutdown();
}
Comment on lines +416 to +614

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The Connection, Statement, and ExecutorService resources are not closed in an exception-safe manner. If an assertion fails or an exception is thrown (e.g., during future.get()), the connection.close() and executor.shutdown() calls will be bypassed, leading to resource leaks. Use try-with-resources for the JDBC resources and a finally block to ensure the executor is shut down.

  @Test
  public void testCancelLocation() throws Exception {
    String connection_uri =
        "jdbc:bigquery://https://www.googleapis.com/bigquery/v2:443;PROJECTID="
            + PROJECT_ID
            + ";OAUTHTYPE=3;LOCATION=EU";

    Driver driver = BigQueryDriver.getRegisteredDriver();
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try (Connection connection = driver.connect(connection_uri, new Properties());
        Statement statement = connection.createStatement()) {

      // Query a dataset in the EU with a heavy query so that we can cancel it while it runs.
      String query =
          "SELECT COUNT(*) FROM bigquery_public_data.covid19_italy_eu.data_by_province a "
              + "CROSS JOIN bigquery_public_data.covid19_italy_eu.data_by_province b "
              + "CROSS JOIN bigquery_public_data.covid19_italy_eu.data_by_province c";

      // Run the query in a separate thread so we can cancel it from the main thread
      Future<SQLException> future = executor.submit(() -> {
        try {
          statement.executeQuery(query);
          return null;
        } catch (SQLException e) {
          return e;
        }
      });

      // Wait a short moment to let the query submit and start running
      Thread.sleep(1500);

      // Cancel the query execution
      statement.cancel();

      // Verify that the query threw a SQLException indicating it was cancelled
      SQLException exception = future.get(15, TimeUnit.SECONDS);
      assertNotNull(exception, "Expected SQLException to be thrown due to cancellation");
      assertTrue(exception.getMessage().contains("cancelled") || exception.getMessage().contains("Job was cancelled") || exception.getMessage().contains("Query was cancelled"),
          "Unexpected exception message: " + exception.getMessage());
    } finally {
      executor.shutdown();
    }
  }
References
  1. When managing a collection of closeable resources, ensure they are closed in the reverse order of their creation (LIFO). The implementation must be exception-safe to prevent resource leaks, meaning all opened resources should be closed even if exceptions occur during their creation or closing.


@Test
public void testIncorrectLocation() throws SQLException {
String connection_uri =
Expand Down
Loading