diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java b/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java index 83da17938..4793515d0 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ClickHouseException.java @@ -3,6 +3,8 @@ public class ClickHouseException extends RuntimeException { protected boolean isRetryable = false; + protected String queryId; + public ClickHouseException(String message) { super(message); } @@ -11,8 +13,21 @@ public ClickHouseException(String message, Throwable cause) { super(message, cause); } + public ClickHouseException(String message, Throwable cause, String queryId) { + super(message, cause); + this.queryId = queryId; + } + public ClickHouseException(Throwable cause) { super(cause); } public boolean isRetryable() { return isRetryable; } + + public void setQueryId(String queryId) { + this.queryId = queryId; + } + + public String getQueryId() { + return queryId; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/Client.java b/client-v2/src/main/java/com/clickhouse/client/api/Client.java index e76788388..6dc507260 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/Client.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/Client.java @@ -139,21 +139,18 @@ public class Client implements AutoCloseable { // Server context private String dbUser; private String serverVersion; - private Object metricsRegistry; - private int retries; + private final Object metricsRegistry; + private final int retries; private LZ4Factory lz4Factory = null; + private final Supplier queryIdGenerator; private Client(Collection endpoints, Map configuration, - ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) { - this(endpoints, configuration, sharedOperationExecutor, columnToMethodMatchingStrategy, null); - } - - private Client(Collection endpoints, Map configuration, - ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) { - // Simple initialization + ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, + Object metricsRegistry, Supplier queryIdGenerator) { this.configuration = ClientConfigProperties.parseConfigMap(configuration); this.readOnlyConfig = Collections.unmodifiableMap(configuration); this.metricsRegistry = metricsRegistry; + this.queryIdGenerator = queryIdGenerator; // Serialization this.pojoSerDe = new POJOSerDe(columnToMethodMatchingStrategy); @@ -186,7 +183,7 @@ private Client(Collection endpoints, Map configuration, } this.endpoints = tmpEndpoints.build(); - this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext); + this.httpClientHelper = new HttpAPIClientHelper(this.configuration, this.metricsRegistry, initSslContext); String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey()); this.retries = retry == null ? 0 : Integer.parseInt(retry); @@ -268,6 +265,8 @@ public static class Builder { private ExecutorService sharedOperationExecutor = null; private ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy; private Object metricRegistry = null; + private Supplier queryIdGenerator; + public Builder() { this.endpoints = new HashSet<>(); this.configuration = new HashMap<>(); @@ -1073,6 +1072,16 @@ public Builder sslSocketSNI(String sni) { return this; } + /** + * Sets query id generator. Will be used when operation settings (InsertSettings, QuerySettings) do not have query id set. + * @param supplier + * @return + */ + public Builder queryIdGenerator(Supplier supplier) { + this.queryIdGenerator = supplier; + return this; + } + public Client build() { // check if endpoint are empty. so can not initiate client if (this.endpoints.isEmpty()) { @@ -1131,7 +1140,7 @@ public Client build() { } return new Client(this.endpoints, this.configuration, this.sharedOperationExecutor, - this.columnToMethodMatchingStrategy, this.metricRegistry); + this.columnToMethodMatchingStrategy, this.metricRegistry, this.queryIdGenerator); } } @@ -1270,6 +1279,9 @@ public CompletableFuture insert(String tableName, List data, final int maxRetries = retry == null ? 0 : retry; requestSettings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format); + if (requestSettings.getQueryId() == null && queryIdGenerator != null) { + requestSettings.setQueryId(queryIdGenerator.get()); + } Supplier supplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1316,8 +1328,8 @@ public CompletableFuture insert(String tableName, List data, metrics.setQueryId(queryId); return new InsertResponse(metrics); } catch (Exception e) { - lastException = httpClientHelper.wrapException(String.format("Query request failed (Attempt: %s/%s - Duration: %s)", - (i + 1), (maxRetries + 1), durationSince(startTime)), e); + String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); @@ -1326,8 +1338,10 @@ public CompletableFuture insert(String tableName, List data, } } } - throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1) + " - Duration: " + durationSince(startTime), lastException); - }; + + String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); + LOG.warn(errMsg); + throw (lastException == null ? new ClientException(errMsg) : lastException); }; return runAsyncOperation(supplier, requestSettings.getAllSettings()); @@ -1487,6 +1501,9 @@ public CompletableFuture insert(String tableName, } sqlStmt.append(" FORMAT ").append(format.name()); requestSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt.toString()); + if (requestSettings.getQueryId() == null && queryIdGenerator != null) { + requestSettings.setQueryId(queryIdGenerator.get()); + } responseSupplier = () -> { long startTime = System.nanoTime(); // Selecting some node @@ -1518,8 +1535,8 @@ public CompletableFuture insert(String tableName, metrics.setQueryId(queryId); return new InsertResponse(metrics); } catch (Exception e) { - lastException = httpClientHelper.wrapException(String.format("Insert failed (Attempt: %s/%s - Duration: %s)", - (i + 1), (retries + 1), durationSince(startTime)), e); + String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId()); + lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId()); if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) { LOG.warn("Retrying.", e); selectedEndpoint = getNextAliveNode(); @@ -1536,8 +1553,9 @@ public CompletableFuture insert(String tableName, } } } - LOG.warn("Insert request failed after attempts: {} - Duration: {}", retries + 1, durationSince(startTime)); - throw (lastException == null ? new ClientException("Failed to complete insert operation") : lastException); + String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId()); + LOG.warn(errMsg); + throw (lastException == null ? new ClientException(errMsg) : lastException); }; return runAsyncOperation(responseSupplier, requestSettings.getAllSettings()); @@ -1586,7 +1604,7 @@ public CompletableFuture query(String sqlQuery, QuerySettings set * * Notes: *
    - *
  • Server response format can be specified thru {@code settings} or in SQL query.
  • + *
  • Server response format can be specified through {@code settings} or in SQL query.
  • *
  • If specified in both, the {@code sqlQuery} will take precedence.
  • *
* @@ -1612,6 +1630,9 @@ public CompletableFuture query(String sqlQuery, Map { long startTime = System.nanoTime(); // Selecting some node @@ -1652,8 +1673,8 @@ public CompletableFuture query(String sqlQuery, Map query(String sqlQuery, Map buildRequestSettings(Map opSettings) private Duration durationSince(long sinceNanos) { return Duration.ofNanos(System.nanoTime() - sinceNanos); } + + private String requestExMsg(String operation, int attempt, long operationDuration, String queryId) { + return operation + " request failed (attempt: " + attempt +", duration: " + operationDuration + "ms, queryId: " + queryId + ")"; + } } diff --git a/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java b/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java index 0c6ed7574..c3f5aad38 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/ServerException.java @@ -10,15 +10,14 @@ public class ServerException extends ClickHouseException { private final int transportProtocolCode; - public ServerException(int code, String message) { - this(code, message, 500); - } + private final String queryId; - public ServerException(int code, String message, int transportProtocolCode) { + public ServerException(int code, String message, int transportProtocolCode, String queryId) { super(message); this.code = code; this.transportProtocolCode = transportProtocolCode; this.isRetryable = discoverIsRetryable(code, message, transportProtocolCode); + this.queryId = queryId; } /** @@ -39,8 +38,12 @@ public int getTransportProtocolCode() { return transportProtocolCode; } - public boolean isRetryable() { - return isRetryable; + /** + * Returns query ID that is returned by server in {@link com.clickhouse.client.api.http.ClickHouseHttpProto#HEADER_QUERY_ID} + * @return query id non-null string + */ + public String getQueryId() { + return queryId; } private boolean discoverIsRetryable(int code, String message, int transportProtocolCode) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index 363222c26..e1c335a97 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -90,6 +90,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; @@ -333,6 +334,7 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map (transport error: " + httpResponse.getCode() + ")"; } - return new ServerException(serverCode, "Code: " + msg, httpResponse.getCode()); + return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); } catch (Exception e) { LOG.error("Failed to read error message", e); - return new ServerException(serverCode, String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")", httpResponse.getCode()); + String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " (transport error: " + httpResponse.getCode() + ")"; + return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId); } } @@ -731,7 +736,7 @@ public boolean shouldRetry(Throwable ex, Map requestSettings) { // This method wraps some client specific exceptions into specific ClientException or just ClientException // ClientException will be also wrapped - public RuntimeException wrapException(String message, Exception cause) { + public RuntimeException wrapException(String message, Exception cause, String queryId) { if (cause instanceof ClientException || cause instanceof ServerException) { return (RuntimeException) cause; } @@ -742,14 +747,18 @@ public RuntimeException wrapException(String message, Exception cause) { cause instanceof ConnectException || cause instanceof UnknownHostException || cause instanceof NoRouteToHostException) { - return new ConnectionInitiationException(message, cause); + ConnectionInitiationException ex = new ConnectionInitiationException(message, cause); + ex.setQueryId(queryId); + return ex; } if (cause instanceof SocketTimeoutException || cause instanceof IOException) { - return new DataTransferException(message, cause); + DataTransferException ex = new DataTransferException(message, cause); + ex.setQueryId(queryId); + return ex; } // if we can not identify the exception explicitly we catch as our base exception ClickHouseException - return new ClickHouseException(message, cause); + return new ClickHouseException(message, cause, queryId); } private void correctUserAgentHeader(HttpRequest request, Map requestConfig) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java b/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java index 46d0e827d..f8eeba40c 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/query/Records.java @@ -125,6 +125,14 @@ public long getResultRows() { return response.getMetrics().getMetric(ServerMetrics.RESULT_ROWS).getLong(); } + /** + * Returns response query id + * @return query id of the request + */ + public String getQueryId() { + return response.getQueryId(); + } + @Override public void close() throws Exception { response.close(); diff --git a/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java b/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java new file mode 100644 index 000000000..314e8c309 --- /dev/null +++ b/client-v2/src/test/java/com/clickhouse/client/ErrorHandlingTests.java @@ -0,0 +1,86 @@ +package com.clickhouse.client; + +import com.clickhouse.client.api.Client; +import com.clickhouse.client.api.DataTransferException; +import com.clickhouse.client.api.ServerException; +import com.clickhouse.client.api.enums.Protocol; +import com.clickhouse.client.api.query.QuerySettings; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.time.temporal.ChronoUnit; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +@Test(groups = {"integration"}) +public class ErrorHandlingTests extends BaseIntegrationTest { + + static { + System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "DEBUG"); + } + + /** + * Tests that a SQL error throws a ServerException. + */ + @Test(groups = {"integration"}) + void testServerError() throws Exception { + try (Client client = newClient().build()) { + // Execute a query against a non-existent table + client.query("SELECT * FROM non_existent_table_xyz_123").get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown"); + } catch (ServerException e) { + Assert.assertEquals(e.getCode(), ServerException.TABLE_NOT_FOUND); + Assert.assertFalse(e.getQueryId().isEmpty()); + Assert.assertTrue(e.getMessage().contains(e.getQueryId())); + } + } + + /** + * Tests that a SQL error throws a ServerException when async option is enabled. + */ + @Test(groups = {"integration"}) + void testServerErrorAsync() throws Exception { + try (Client client = newClient().useAsyncRequests(true).build()) { + // Execute a query against a non-existent table + client.query("SELECT * FROM non_existent_table_xyz_123").get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown"); + } catch (ExecutionException e) { + Assert.assertTrue(e.getCause() instanceof ServerException, + "Expected cause to be ServerException but was: " + e.getCause().getClass().getName()); + ServerException se = (ServerException) e.getCause(); + Assert.assertEquals(se.getCode(), ServerException.TABLE_NOT_FOUND, + "Expected TABLE_NOT_FOUND error code"); + Assert.assertEquals(se.getCode(), ServerException.TABLE_NOT_FOUND); + Assert.assertFalse(se.getQueryId().isEmpty()); + Assert.assertTrue(se.getMessage().contains(se.getQueryId())); + } + } + + /** + * Tests that a query exceeding max_execution_time throws a ServerException with TIMEOUT_EXCEEDED code. + */ + @Test(groups = {"integration"}) + void testQueryTimeout() throws Exception { + String queryId = "test-query-id"; + try (Client client = newClient().setSocketTimeout(1, ChronoUnit.SECONDS).build()) { + QuerySettings settings = new QuerySettings().setQueryId(queryId); + + // Execute a query that will take longer than 1 second using sleep function + client.query("SELECT sleep(3)", settings).get(10, TimeUnit.SECONDS); + Assert.fail("Expected ServerException to be thrown due to timeout"); + } catch (DataTransferException e) { + Assert.assertTrue(e.getMessage().contains(queryId)); + Assert.assertEquals(e.getQueryId(), queryId); + } + } + + protected Client.Builder newClient() { + ClickHouseNode node = getServer(ClickHouseProtocol.HTTP); + boolean isSecure = isCloud(); + return new Client.Builder() + .addEndpoint(Protocol.HTTP, node.getHost(), node.getPort(), isSecure) + .setUsername("default") + .setPassword(ClickHouseServerForTest.getPassword()) + .setDefaultDatabase(ClickHouseServerForTest.getDatabase()); + } +} diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index c8572e3cc..5c5b63583 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -443,7 +443,7 @@ public void testErrorWithSuccessfulResponse() { Assert.fail("Expected exception"); } catch (ServerException e) { e.printStackTrace(); - Assert.assertEquals(e.getMessage(), "Code: 241. DB::Exception: Memory limit (for query) exceeded: would use 97.21 MiB"); + Assert.assertTrue(e.getMessage().startsWith("Code: 241. DB::Exception: Memory limit (for query) exceeded: would use 97.21 MiB")); } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e); @@ -484,7 +484,7 @@ public void testServerErrorsUncompressed(int code, String message, String expect } catch (ServerException e) { e.printStackTrace(); Assert.assertEquals(e.getCode(), code); - Assert.assertEquals(e.getMessage(), expectedMessage); + Assert.assertTrue(e.getMessage().startsWith(expectedMessage)); } catch (Exception e) { e.printStackTrace(); Assert.fail("Unexpected exception", e);