Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
public class ClickHouseException extends RuntimeException {
protected boolean isRetryable = false;

protected String queryId;

public ClickHouseException(String message) {
super(message);
}
Expand All @@ -11,8 +13,21 @@ public ClickHouseException(String message, Throwable cause) {
super(message, cause);
}

public ClickHouseException(String message, Throwable cause, String queryId) {
super(message, cause);
this.queryId = queryId;
}

public ClickHouseException(Throwable cause) {
super(cause);
}
public boolean isRetryable() { return isRetryable; }

public void setQueryId(String queryId) {
this.queryId = queryId;
}

public String getQueryId() {
return queryId;
}
}
74 changes: 50 additions & 24 deletions client-v2/src/main/java/com/clickhouse/client/api/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,18 @@ public class Client implements AutoCloseable {
// Server context
private String dbUser;
private String serverVersion;
private Object metricsRegistry;
private int retries;
private final Object metricsRegistry;
private final int retries;
private LZ4Factory lz4Factory = null;
private final Supplier<String> queryIdGenerator;

private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy) {
this(endpoints, configuration, sharedOperationExecutor, columnToMethodMatchingStrategy, null);
}

private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy, Object metricsRegistry) {
// Simple initialization
ExecutorService sharedOperationExecutor, ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy,
Object metricsRegistry, Supplier<String> queryIdGenerator) {
this.configuration = ClientConfigProperties.parseConfigMap(configuration);
this.readOnlyConfig = Collections.unmodifiableMap(configuration);
this.metricsRegistry = metricsRegistry;
this.queryIdGenerator = queryIdGenerator;

// Serialization
this.pojoSerDe = new POJOSerDe(columnToMethodMatchingStrategy);
Expand Down Expand Up @@ -186,7 +183,7 @@ private Client(Collection<Endpoint> endpoints, Map<String,String> configuration,
}

this.endpoints = tmpEndpoints.build();
this.httpClientHelper = new HttpAPIClientHelper(this.configuration, metricsRegistry, initSslContext);
this.httpClientHelper = new HttpAPIClientHelper(this.configuration, this.metricsRegistry, initSslContext);

String retry = configuration.get(ClientConfigProperties.RETRY_ON_FAILURE.getKey());
this.retries = retry == null ? 0 : Integer.parseInt(retry);
Expand Down Expand Up @@ -268,6 +265,8 @@ public static class Builder {
private ExecutorService sharedOperationExecutor = null;
private ColumnToMethodMatchingStrategy columnToMethodMatchingStrategy;
private Object metricRegistry = null;
private Supplier<String> queryIdGenerator;

public Builder() {
this.endpoints = new HashSet<>();
this.configuration = new HashMap<>();
Expand Down Expand Up @@ -1073,6 +1072,16 @@ public Builder sslSocketSNI(String sni) {
return this;
}

/**
* Sets query id generator. Will be used when operation settings (InsertSettings, QuerySettings) do not have query id set.
* @param supplier
* @return
*/
public Builder queryIdGenerator(Supplier<String> supplier) {
this.queryIdGenerator = supplier;
return this;
}

public Client build() {
// check if endpoint are empty. so can not initiate client
if (this.endpoints.isEmpty()) {
Expand Down Expand Up @@ -1131,7 +1140,7 @@ public Client build() {
}

return new Client(this.endpoints, this.configuration, this.sharedOperationExecutor,
this.columnToMethodMatchingStrategy, this.metricRegistry);
this.columnToMethodMatchingStrategy, this.metricRegistry, this.queryIdGenerator);
}
}

Expand Down Expand Up @@ -1270,6 +1279,9 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
final int maxRetries = retry == null ? 0 : retry;

requestSettings.setOption(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey(), format);
if (requestSettings.getQueryId() == null && queryIdGenerator != null) {
requestSettings.setQueryId(queryIdGenerator.get());
}
Supplier<InsertResponse> supplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Expand Down Expand Up @@ -1316,8 +1328,8 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (Exception e) {
lastException = httpClientHelper.wrapException(String.format("Query request failed (Attempt: %s/%s - Duration: %s)",
(i + 1), (maxRetries + 1), durationSince(startTime)), e);
String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId());
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
Expand All @@ -1326,8 +1338,10 @@ public CompletableFuture<InsertResponse> insert(String tableName, List<?> data,
}
}
}
throw new ClientException("Insert request failed after attempts: " + (maxRetries + 1) + " - Duration: " + durationSince(startTime), lastException);
};

String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId());
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off-by-one error in retry attempt count message

Low Severity

The error message after exhausting all retries incorrectly reports the attempt count. The loop runs retries + 1 times (0 through retries inclusive), but requestExMsg receives just retries instead of retries + 1. For example, with 3 retries configured (4 total attempts), the final error message incorrectly shows "attempt: 3" instead of "attempt: 4". The original code correctly used (maxRetries + 1).

Additional Locations (2)

Fix in Cursor Fix in Web

LOG.warn(errMsg);
throw (lastException == null ? new ClientException(errMsg) : lastException); };

return runAsyncOperation(supplier, requestSettings.getAllSettings());

Expand Down Expand Up @@ -1487,6 +1501,9 @@ public CompletableFuture<InsertResponse> insert(String tableName,
}
sqlStmt.append(" FORMAT ").append(format.name());
requestSettings.serverSetting(ClickHouseHttpProto.QPARAM_QUERY_STMT, sqlStmt.toString());
if (requestSettings.getQueryId() == null && queryIdGenerator != null) {
requestSettings.setQueryId(queryIdGenerator.get());
}
responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Expand Down Expand Up @@ -1518,8 +1535,8 @@ public CompletableFuture<InsertResponse> insert(String tableName,
metrics.setQueryId(queryId);
return new InsertResponse(metrics);
} catch (Exception e) {
lastException = httpClientHelper.wrapException(String.format("Insert failed (Attempt: %s/%s - Duration: %s)",
(i + 1), (retries + 1), durationSince(startTime)), e);
String msg = requestExMsg("Insert", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId());
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
Expand All @@ -1536,8 +1553,9 @@ public CompletableFuture<InsertResponse> insert(String tableName,
}
}
}
LOG.warn("Insert request failed after attempts: {} - Duration: {}", retries + 1, durationSince(startTime));
throw (lastException == null ? new ClientException("Failed to complete insert operation") : lastException);
String errMsg = requestExMsg("Insert", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId());
LOG.warn(errMsg);
throw (lastException == null ? new ClientException(errMsg) : lastException);
};

return runAsyncOperation(responseSupplier, requestSettings.getAllSettings());
Expand Down Expand Up @@ -1586,7 +1604,7 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, QuerySettings set
*
* <b>Notes:</b>
* <ul>
* <li>Server response format can be specified thru {@code settings} or in SQL query.</li>
* <li>Server response format can be specified through {@code settings} or in SQL query.</li>
* <li>If specified in both, the {@code sqlQuery} will take precedence.</li>
* </ul>
*
Expand All @@ -1612,6 +1630,9 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
if (queryParams != null) {
requestSettings.setOption(HttpAPIClientHelper.KEY_STATEMENT_PARAMS, queryParams);
}
if (requestSettings.getQueryId() == null && queryIdGenerator != null) {
requestSettings.setQueryId(queryIdGenerator.get());
}
responseSupplier = () -> {
long startTime = System.nanoTime();
// Selecting some node
Expand Down Expand Up @@ -1652,8 +1673,8 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec

} catch (Exception e) {
HttpAPIClientHelper.closeQuietly(httpResponse);
lastException = httpClientHelper.wrapException(String.format("Query request failed (Attempt: %s/%s - Duration: %s)",
(i + 1), (retries + 1), durationSince(startTime)), e);
String msg = requestExMsg("Query", (i + 1), durationSince(startTime).toMillis(), requestSettings.getQueryId());
lastException = httpClientHelper.wrapException(msg, e, requestSettings.getQueryId());
if (httpClientHelper.shouldRetry(e, requestSettings.getAllSettings())) {
LOG.warn("Retrying.", e);
selectedEndpoint = getNextAliveNode();
Expand All @@ -1662,8 +1683,9 @@ public CompletableFuture<QueryResponse> query(String sqlQuery, Map<String, Objec
}
}
}
LOG.warn("Query request failed after attempts: {} - Duration: {}", retries + 1, durationSince(startTime));
throw (lastException == null ? new ClientException("Failed to complete query") : lastException);
String errMsg = requestExMsg("Query", retries, durationSince(startTime).toMillis(), requestSettings.getQueryId());
LOG.warn(errMsg);
throw (lastException == null ? new ClientException(errMsg) : lastException);
};

return runAsyncOperation(responseSupplier, requestSettings.getAllSettings());
Expand Down Expand Up @@ -2141,4 +2163,8 @@ private Map<String, Object> buildRequestSettings(Map<String, Object> opSettings)
private Duration durationSince(long sinceNanos) {
return Duration.ofNanos(System.nanoTime() - sinceNanos);
}

private String requestExMsg(String operation, int attempt, long operationDuration, String queryId) {
return operation + " request failed (attempt: " + attempt +", duration: " + operationDuration + "ms, queryId: " + queryId + ")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@ public class ServerException extends ClickHouseException {

private final int transportProtocolCode;

public ServerException(int code, String message) {
this(code, message, 500);
}
private final String queryId;

public ServerException(int code, String message, int transportProtocolCode) {
public ServerException(int code, String message, int transportProtocolCode, String queryId) {
super(message);
this.code = code;
this.transportProtocolCode = transportProtocolCode;
this.isRetryable = discoverIsRetryable(code, message, transportProtocolCode);
this.queryId = queryId;
}

/**
Expand All @@ -39,8 +38,12 @@ public int getTransportProtocolCode() {
return transportProtocolCode;
}

public boolean isRetryable() {
return isRetryable;
/**
* Returns query ID that is returned by server in {@link com.clickhouse.client.api.http.ClickHouseHttpProto#HEADER_QUERY_ID}
* @return query id non-null string
*/
public String getQueryId() {
return queryId;
}

private boolean discoverIsRetryable(int code, String message, int transportProtocolCode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
Expand Down Expand Up @@ -333,13 +334,16 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map<String,
// private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:";
private static final String ERROR_CODE_PREFIX_PATTERN = "%d. DB::Exception:";


/**
* Reads status line and if error tries to parse response body to get server error message.
*
* @param httpResponse - HTTP response
* @return exception object with server code
*/
public Exception readError(ClassicHttpResponse httpResponse) {
final Header qIdHeader = httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_QUERY_ID);
final String queryId = qIdHeader == null ? "" : qIdHeader.getValue();
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
InputStream body = null;
try {
Expand Down Expand Up @@ -400,10 +404,11 @@ public Exception readError(ClassicHttpResponse httpResponse) {
if (msg.trim().isEmpty()) {
msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
}
return new ServerException(serverCode, "Code: " + msg, httpResponse.getCode());
return new ServerException(serverCode, "Code: " + msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
} catch (Exception e) {
LOG.error("Failed to read error message", e);
return new ServerException(serverCode, String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")", httpResponse.getCode());
String msg = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode) + " <Unreadable error message> (transport error: " + httpResponse.getCode() + ")";
return new ServerException(serverCode, msg + " (queryId= " + queryId + ")", httpResponse.getCode(), queryId);
}
}

Expand Down Expand Up @@ -731,7 +736,7 @@ public boolean shouldRetry(Throwable ex, Map<String, Object> requestSettings) {

// This method wraps some client specific exceptions into specific ClientException or just ClientException
// ClientException will be also wrapped
public RuntimeException wrapException(String message, Exception cause) {
public RuntimeException wrapException(String message, Exception cause, String queryId) {
if (cause instanceof ClientException || cause instanceof ServerException) {
return (RuntimeException) cause;
}
Expand All @@ -742,14 +747,18 @@ public RuntimeException wrapException(String message, Exception cause) {
cause instanceof ConnectException ||
cause instanceof UnknownHostException ||
cause instanceof NoRouteToHostException) {
return new ConnectionInitiationException(message, cause);
ConnectionInitiationException ex = new ConnectionInitiationException(message, cause);
ex.setQueryId(queryId);
return ex;
}

if (cause instanceof SocketTimeoutException || cause instanceof IOException) {
return new DataTransferException(message, cause);
DataTransferException ex = new DataTransferException(message, cause);
ex.setQueryId(queryId);
return ex;
}
// if we can not identify the exception explicitly we catch as our base exception ClickHouseException
return new ClickHouseException(message, cause);
return new ClickHouseException(message, cause, queryId);
}

private void correctUserAgentHeader(HttpRequest request, Map<String, Object> requestConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ public long getResultRows() {
return response.getMetrics().getMetric(ServerMetrics.RESULT_ROWS).getLong();
}

/**
* Returns response query id
* @return query id of the request
*/
public String getQueryId() {
return response.getQueryId();
}

@Override
public void close() throws Exception {
response.close();
Expand Down
Loading
Loading