diff --git a/pom.xml b/pom.xml
index cc60063..1cbb3f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,7 +30,6 @@
url-shortener-demojdbcproject-course
- sloslo-workload
diff --git a/slo-workload/README.md b/slo-workload/README.md
index 7b874bf..4397789 100644
--- a/slo-workload/README.md
+++ b/slo-workload/README.md
@@ -5,14 +5,20 @@ reliability of YDB Java clients under load and chaos using the
[YDB SLO action](https://github.com/ydb-platform/ydb-slo-action).
Each submodule is a self-contained, runnable workload that follows the same
-contract as the SDK SLO workload in [`../slo`](../slo): it reads its
-configuration from environment variables, runs setup/run/teardown phases, and
-pushes OpenTelemetry (OTLP) metrics that the action scrapes and compares
-between the current PR run and a baseline run.
+contract: it reads its configuration from environment variables, runs
+setup/run/teardown phases, and pushes OpenTelemetry (OTLP) metrics that the
+action scrapes and compares between the current PR run and a baseline run.
+
+Shared harness code lives in [`core`](core) (`Config`, `Metrics`, KV row
+model, rate-limited runner). Every workload plugs a `KvClient` adapter into
+that runner so all of them emit the same metric contract.
| Module | Component under test | Description |
| --- | --- | --- |
+| [`query`](query) | `ydb-java-sdk` (query client) | Native SDK KV workload |
| [`jdbc`](jdbc) | `ydb-jdbc-driver` | Plain JDBC KV workload (no framework) |
+| [`spring-data-jdbc`](spring-data-jdbc) | `ydb-jdbc-driver` + `spring-data-jdbc-ydb` + `spring-ydb-retry` | Spring Data JDBC KV workload |
+| [`spring-data-jpa`](spring-data-jpa) | `ydb-jdbc-driver` + Hibernate 6 + `spring-ydb-retry` | Spring Data JPA KV workload |
## How a workload behaves
@@ -76,11 +82,27 @@ KV tunables are passed on the command line and parsed by JCommander:
--partition-size Auto-partitioning partition size in MB (default 1)
--min-partition-count Minimum number of table partitions (default 6)
--max-partition-count Maximum number of table partitions (default 1000)
---duration Override WORKLOAD_DURATION when > 0
+--duration / --time Override WORKLOAD_DURATION when > 0
+--shutdown-time Extra grace seconds for in-flight ops on shutdown (default 30)
+--max-attempts Per-operation attempt cap, initial + retries (default 10)
+--max-workers Hard cap on workers per operation type (default 64)
```
-Unknown flags are ignored, so a workload accepts command strings designed for
-other SDKs without erroring.
+Unknown flags are rejected — a typo in the ydb-slo-action invocation should
+fail loudly rather than silently fall back to defaults.
+
+The Spring-backed workloads expose one more knob via the `SLO_HIKARI_POOL_SIZE`
+environment variable (default `130`, sized for `2 × max-workers` plus headroom).
+Raise it together with `--max-workers` or the workload measures Hikari
+contention rather than the JDBC driver.
+
+### Cross-implementation comparability
+
+Every implementation derives the primary-key `hash` column from `id` with the
+same client-side mix (`RowGenerator.numericHash`). A table written by the
+`query` workload is therefore byte-compatible with the `jdbc` and Spring-Data
+workloads — useful when one prefills the table and another reads from it
+during cross-driver experiments.
## How CI uses this module
diff --git a/slo/pom.xml b/slo-workload/core/pom.xml
similarity index 51%
rename from slo/pom.xml
rename to slo-workload/core/pom.xml
index 8c3f49c..fb3ddf5 100644
--- a/slo/pom.xml
+++ b/slo-workload/core/pom.xml
@@ -6,83 +6,66 @@
tech.ydb.examples
- ydb-sdk-examples
+ slo-workload1.1.0-SNAPSHOT
+ ../pom.xml
- ydb-slo-workload
- YDB SLO workload
- SLO workload application for testing YDB Java SDK reliability under load and chaos
-
-
- 1.82
- 1.59.0
- 2.2.2
-
+ slo-workload-core
+ YDB SLO workload core
+
+ Driver-agnostic core of the YDB SLO workloads: OTLP metrics, env config,
+ the KV row model and the load-generating runner. Every concrete workload
+ (native query client, plain JDBC, Spring Data) plugs a KvClient into this
+ runner so all of them emit the exact same metric contract.
+
-
- tech.ydb
- ydb-sdk-query
-
-
com.beustjcommander
- ${jcommander.version}org.hdrhistogramHdrHistogram
- ${hdrhistogram.version}
+
+
+
+ com.google.guava
+ guavaio.opentelemetryopentelemetry-api
- ${opentelemetry.version}io.opentelemetryopentelemetry-sdk
- ${opentelemetry.version}io.opentelemetryopentelemetry-sdk-metrics
- ${opentelemetry.version}io.opentelemetryopentelemetry-exporter-otlp
- ${opentelemetry.version}
- org.apache.logging.log4j
- log4j-slf4j2-impl
+ org.slf4j
+ slf4j-api
- ydb-slo-workloadorg.apache.maven.plugins
- maven-dependency-plugin
-
-
- org.apache.maven.plugins
- maven-jar-plugin
+ maven-compiler-plugin
-
-
- true
- libs/
- tech.ydb.slo.Main
-
-
+ ${maven.compiler.release}
diff --git a/slo/src/main/java/tech/ydb/slo/Config.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/Config.java
similarity index 65%
rename from slo/src/main/java/tech/ydb/slo/Config.java
rename to slo-workload/core/src/main/java/tech/ydb/slo/core/Config.java
index 2efdc7e..7fbaa94 100644
--- a/slo/src/main/java/tech/ydb/slo/Config.java
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/Config.java
@@ -1,20 +1,8 @@
-package tech.ydb.slo;
-
-/**
- * Configuration for the SLO workload, populated from environment variables
- * provided by the YDB SLO action runtime.
- *
- *
The action sets these variables on the workload container:
- *
Every metric carries the {@code ref} label so the report action can
- * separate current and baseline series.
- */
public final class Metrics implements AutoCloseable {
public enum OperationType {
@@ -83,7 +60,7 @@ public String label() {
private static final AttributeKey ATTR_REF =
AttributeKey.stringKey("ref");
- // HDR histograms record latencies in microseconds with high precision up to 60 s.
+
private static final long HDR_MIN_MICROS = 1L;
private static final long HDR_MAX_MICROS = 60L * 1_000_000L;
private static final int HDR_SIGNIFICANT_DIGITS = 3;
@@ -95,7 +72,7 @@ public String label() {
private final LongCounter retryAttemptsTotal;
private final LongUpDownCounter pendingOperations;
- private final Map histograms = new ConcurrentHashMap<>();
+ private final Map histograms;
private Metrics(
SdkMeterProvider meterProvider,
@@ -103,7 +80,8 @@ private Metrics(
LongCounter operationsTotal,
LongCounter errorsTotal,
LongCounter retryAttemptsTotal,
- LongUpDownCounter pendingOperations
+ LongUpDownCounter pendingOperations,
+ Map histograms
) {
this.meterProvider = meterProvider;
this.ref = ref;
@@ -111,14 +89,11 @@ private Metrics(
this.errorsTotal = errorsTotal;
this.retryAttemptsTotal = retryAttemptsTotal;
this.pendingOperations = pendingOperations;
+ this.histograms = histograms;
}
- /*
- * Builds a {@code Metrics} instance configured to push OTLP metrics every
- * second to the endpoint from {@code config.otlpEndpoint()}. If the
- * endpoint is empty, all metrics are still observable in-process but never
- * exported.
- */
+
+
public static Metrics create(Config config) {
String ref = config.ref();
@@ -167,26 +142,26 @@ public static Metrics create(Config config) {
Map histograms = new ConcurrentHashMap<>();
- // Pre-create one histogram per operation_type so the first export
- // already produces gauge series. We only track successful operations:
- // failure latency is dominated by retry budgets / timeouts and would
- // skew the percentiles without telling us anything useful about SDK
- // performance. The SLO action's metrics.yaml filters by
- // operation_status="success" anyway.
+
+
+
+
+
+
for (OperationType type : OperationType.values()) {
histograms.put(type, newHistogram());
}
- // Build the three percentile gauges as raw observers — their values
- // are produced by a single batch callback below, which reads
- // p50/p95/p99 from the same histogram snapshot and then resets the
- // histogram. Reading all three percentiles from one snapshot avoids
- // races where p99 could be observed against a freshly-reset histogram
- // populated by p50, and resetting after each export means the gauge
- // reflects only latencies recorded during the last export interval —
- // matching the JS SDK's behaviour and avoiding cold-start tail drag
- // on the JVM (without reset, JIT-warmup outliers stick to p99 for
- // the rest of the run).
+
+
+
+
+
+
+
+
+
+
ObservableDoubleMeasurement p50Observer = meter.gaugeBuilder("sdk.operation.latency.p50.seconds")
.setUnit("s")
.setDescription("p50 operation latency in seconds")
@@ -207,23 +182,22 @@ public static Metrics create(Config config) {
p50Observer, p95Observer, p99Observer
);
- Metrics metrics = new Metrics(
+ return new Metrics(
provider,
ref,
operationsTotal,
errorsTotal,
retryAttemptsTotal,
- pendingOperations
+ pendingOperations,
+ histograms
);
- metrics.histograms.putAll(histograms);
- return metrics;
}
private static String metricsEndpoint(String otlpEndpoint) {
- // OTLP HTTP exporter expects the full /v1/metrics path. The SLO action
- // sets OTEL_EXPORTER_OTLP_ENDPOINT to the base URL (e.g.
- // http://ydb-prometheus:9090/api/v1/otlp), so we append the suffix
- // unless the user has already provided it.
+
+
+
+
String trimmed = otlpEndpoint.endsWith("/")
? otlpEndpoint.substring(0, otlpEndpoint.length() - 1)
: otlpEndpoint;
@@ -233,10 +207,8 @@ private static String metricsEndpoint(String otlpEndpoint) {
return trimmed + "/v1/metrics";
}
- /*
- * Records a started operation and returns a span used to record the
- * outcome.
- */
+
+
public Span startOperation(OperationType type) {
pendingOperations.add(1, Attributes.of(
ATTR_REF, ref,
@@ -245,10 +217,8 @@ public Span startOperation(OperationType type) {
return new Span(this, type, System.nanoTime());
}
- /**
- * Forces a final flush of pending metrics. Should be called before exit
- * to make sure the report action sees the last seconds of data.
- */
+
+
public void flush() {
meterProvider.forceFlush().join(10, TimeUnit.SECONDS);
}
@@ -278,12 +248,12 @@ private void recordOutcome(
ATTR_OPERATION_TYPE, type.label()
));
- // Latency is recorded only for successful operations. Failed
- // operations spend most of their time inside the retry budget /
- // timeout machinery, so their latency reflects the retry policy
- // rather than the SDK's performance. Mixing those samples into the
- // percentile gauges produces noisy spikes during chaos scenarios
- // and tells us nothing actionable.
+
+
+
+
+
+
if (status == OperationStatus.SUCCESS) {
Histogram histogram = histograms.computeIfAbsent(type, k -> newHistogram());
long clamped = Math.max(HDR_MIN_MICROS, Math.min(HDR_MAX_MICROS, latencyMicros));
@@ -297,13 +267,8 @@ private void recordOutcome(
}
}
- /**
- * Observes p50/p95/p99 for every populated histogram in one go and then
- * resets the histogram. Called from a single OTel batch callback so all
- * three percentiles are read from a consistent snapshot — without that,
- * a concurrent record could land between the p50 and p99 reads and
- * produce inconsistent values across gauges.
- */
+
+
private static void observeAndResetPercentiles(
Map histograms,
String ref,
@@ -313,23 +278,21 @@ private static void observeAndResetPercentiles(
) {
for (Map.Entry entry : histograms.entrySet()) {
OperationType type = entry.getKey();
- Histogram histogram = entry.getValue();
+ Histogram live = entry.getValue();
- long p50Micros;
- long p95Micros;
- long p99Micros;
- if (histogram.getTotalCount() == 0) {
+ Histogram snapshot = live.copy();
+ live.reset();
+ if (snapshot.getTotalCount() == 0) {
continue;
}
- p50Micros = histogram.getValueAtPercentile(50.0);
- p95Micros = histogram.getValueAtPercentile(95.0);
- p99Micros = histogram.getValueAtPercentile(99.0);
- histogram.reset();
-
- // Percentile gauges are always tagged with operation_status="success"
- // because we only record successful samples (see recordOutcome).
- // The SLO action's metrics.yaml filters on this same label, so the
- // gauges line up with what the report expects.
+ long p50Micros = snapshot.getValueAtPercentile(50.0);
+ long p95Micros = snapshot.getValueAtPercentile(95.0);
+ long p99Micros = snapshot.getValueAtPercentile(99.0);
+
+
+
+
+
Attributes attrs = Attributes.of(
ATTR_REF, ref,
ATTR_OPERATION_TYPE, type.label(),
@@ -345,9 +308,6 @@ private static Histogram newHistogram() {
return new AtomicHistogram(HDR_MIN_MICROS, HDR_MAX_MICROS, HDR_SIGNIFICANT_DIGITS);
}
- /**
- * One in-flight operation. Call exactly one of the {@code finish} methods.
- */
public static final class Span {
private final Metrics metrics;
private final OperationType type;
diff --git a/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvClient.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvClient.java
new file mode 100644
index 0000000..25371f6
--- /dev/null
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvClient.java
@@ -0,0 +1,21 @@
+package tech.ydb.slo.core.kv;
+
+public interface KvClient extends AutoCloseable {
+
+
+
+ void createTable(KvWorkloadParams params, String tablePath) throws Exception;
+
+
+
+ void dropTable(String tablePath);
+
+
+
+ KvSession openSession() throws Exception;
+
+ @Override
+ default void close() throws Exception {
+
+ }
+}
diff --git a/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSchema.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSchema.java
new file mode 100644
index 0000000..b3e6eae
--- /dev/null
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSchema.java
@@ -0,0 +1,36 @@
+package tech.ydb.slo.core.kv;
+
+public final class KvSchema {
+
+ public static final String CREATE_TABLE_TEMPLATE = ""
+ + "CREATE TABLE IF NOT EXISTS `%s` ("
+ + " hash Uint64,"
+ + " id Uint64,"
+ + " payload_str Utf8,"
+ + " payload_double Double,"
+ + " payload_timestamp Timestamp,"
+ + " payload_hash Uint64,"
+ + " PRIMARY KEY (hash, id)"
+ + ") WITH ("
+ + " UNIFORM_PARTITIONS = %d,"
+ + " AUTO_PARTITIONING_BY_SIZE = ENABLED,"
+ + " AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,"
+ + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,"
+ + " AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d"
+ + ")";
+
+ public static final String DROP_TABLE_TEMPLATE = "DROP TABLE IF EXISTS `%s`";
+
+ public static final String UPSERT_TEMPLATE = ""
+ + "UPSERT INTO `%s` ("
+ + " hash, id, payload_str, payload_double, payload_timestamp, payload_hash"
+ + ") VALUES (?, ?, ?, ?, ?, ?)";
+
+ public static final String SELECT_TEMPLATE = ""
+ + "SELECT id, payload_str, payload_double, payload_timestamp, payload_hash"
+ + " FROM `%s`"
+ + " WHERE id = ? AND hash = ?";
+
+ private KvSchema() {
+ }
+}
diff --git a/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSession.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSession.java
new file mode 100644
index 0000000..4c7dab3
--- /dev/null
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvSession.java
@@ -0,0 +1,17 @@
+package tech.ydb.slo.core.kv;
+
+public interface KvSession extends AutoCloseable {
+
+
+
+ OpOutcome read(long id, int timeoutMs);
+
+
+
+ OpOutcome write(Row row, int timeoutMs);
+
+ @Override
+ default void close() {
+
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvWorkloadParams.java
similarity index 73%
rename from slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
rename to slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvWorkloadParams.java
index 9adf6e7..2fbe33c 100644
--- a/slo/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/KvWorkloadParams.java
@@ -1,15 +1,7 @@
-package tech.ydb.slo.kv;
+package tech.ydb.slo.core.kv;
import com.beust.jcommander.Parameter;
-/**
- * Tunable parameters for the KV workload.
- *
- *
Defaults match the SLO workloads in the Go and JavaScript SDKs so the
- * three runs are comparable. JCommander annotations let the operator override
- * any field from the command line, e.g.
- * {@code --read-rps 500 --write-rps 50}.
- */
@SuppressWarnings("FieldMayBeFinal")
public final class KvWorkloadParams {
@@ -62,11 +54,29 @@ public final class KvWorkloadParams {
private int maxPartitionCount = 1_000;
@Parameter(
- names = {"--duration"},
+ names = {"--duration", "--time"},
description = "Run duration in seconds (overrides WORKLOAD_DURATION when > 0)"
)
private int durationSeconds = 0;
+ @Parameter(
+ names = {"--shutdown-time"},
+ description = "Extra seconds, on top of --duration, given to in-flight ops before force-shutdown"
+ )
+ private int shutdownTimeSeconds = 30;
+
+ @Parameter(
+ names = {"--max-attempts"},
+ description = "Maximum total attempts per operation (initial + retries)"
+ )
+ private int maxAttempts = 10;
+
+ @Parameter(
+ names = {"--max-workers"},
+ description = "Hard cap on the number of worker threads per operation type"
+ )
+ private int maxWorkers = 64;
+
public int readRps() {
return readRps;
}
@@ -99,12 +109,8 @@ public int maxPartitionCount() {
return maxPartitionCount;
}
- /**
- * Effective run duration. If the CLI flag was omitted (left at 0), falls
- * back to the value supplied via the {@code WORKLOAD_DURATION} environment
- * variable through {@code Config}.
- * @return Effective run duration value
- */
+
+
public int durationSeconds() {
return durationSeconds;
}
@@ -112,4 +118,16 @@ public int durationSeconds() {
public void setDurationSeconds(int durationSeconds) {
this.durationSeconds = durationSeconds;
}
+
+ public int shutdownTimeSeconds() {
+ return shutdownTimeSeconds;
+ }
+
+ public int maxAttempts() {
+ return maxAttempts;
+ }
+
+ public int maxWorkers() {
+ return maxWorkers;
+ }
}
diff --git a/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/OpOutcome.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/OpOutcome.java
new file mode 100644
index 0000000..34bda8a
--- /dev/null
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/OpOutcome.java
@@ -0,0 +1,37 @@
+package tech.ydb.slo.core.kv;
+
+public final class OpOutcome {
+ private final boolean success;
+ private final int retryAttempts;
+ private final String errorKind;
+
+ private OpOutcome(boolean success, int retryAttempts, String errorKind) {
+ this.success = success;
+ this.retryAttempts = Math.max(0, retryAttempts);
+ this.errorKind = errorKind;
+ }
+
+
+
+ public static OpOutcome success(int retryAttempts) {
+ return new OpOutcome(true, retryAttempts, null);
+ }
+
+
+
+ public static OpOutcome error(int retryAttempts, String errorKind) {
+ return new OpOutcome(false, retryAttempts, errorKind);
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public int retryAttempts() {
+ return retryAttempts;
+ }
+
+ public String errorKind() {
+ return errorKind;
+ }
+}
diff --git a/slo/src/main/java/tech/ydb/slo/kv/Row.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/Row.java
similarity index 60%
rename from slo/src/main/java/tech/ydb/slo/kv/Row.java
rename to slo-workload/core/src/main/java/tech/ydb/slo/core/kv/Row.java
index 9f1c292..1d10f08 100644
--- a/slo/src/main/java/tech/ydb/slo/kv/Row.java
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/Row.java
@@ -1,24 +1,7 @@
-package tech.ydb.slo.kv;
+package tech.ydb.slo.core.kv;
import java.time.Instant;
-/**
- * A single row of the KV workload table.
- *
- *
The schema mirrors the one used by SLO workloads in other YDB SDKs
- * (Go, JavaScript) so reports across SDKs are comparable:
- *
The {@code hash} column is computed by YDB at insert time via
- * {@code Digest::NumericHash($id)}, so we don't carry it on the client.
- */
public final class Row {
private final long id;
private final String payloadStr;
diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/RowGenerator.java
similarity index 71%
rename from slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java
rename to slo-workload/core/src/main/java/tech/ydb/slo/core/kv/RowGenerator.java
index 04fdb9d..60765b0 100644
--- a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/RowGenerator.java
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/RowGenerator.java
@@ -1,17 +1,10 @@
-package tech.ydb.slo.kv;
+package tech.ydb.slo.core.kv;
import java.time.Instant;
import java.util.Base64;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
-/**
- * Generates rows for the KV workload.
- *
- *
Each row gets a monotonically increasing {@code id} and a random payload.
- * The format mirrors the SLO workloads in the Go and JS SDKs so the resulting
- * tables are comparable.
- */
public final class RowGenerator {
private static final int MIN_PAYLOAD_LENGTH = 20;
private static final int MAX_PAYLOAD_LENGTH = 40;
@@ -22,20 +15,15 @@ public RowGenerator(long startId) {
this.nextId = new AtomicLong(startId);
}
- /**
- * Generates a new row with a fresh monotonically increasing id.
- * @return a new row
- */
+
+
public Row generate() {
long id = nextId.getAndIncrement();
return generate(id);
}
- /**
- * Generates a row with an explicit id (used during prefill to control IDs).
- * @param id row id
- * @return a new row
- */
+
+
public static Row generate(long id) {
long payloadHash = ThreadLocalRandom.current().nextLong();
double payloadDouble = ThreadLocalRandom.current().nextDouble();
@@ -52,4 +40,13 @@ private static String randomPayloadString() {
ThreadLocalRandom.current().nextBytes(bytes);
return Base64.getEncoder().withoutPadding().encodeToString(bytes);
}
+
+
+
+ public static long numericHash(long id) {
+ long z = id + 0x9E3779B97F4A7C15L;
+ z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
+ z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
+ return z ^ (z >>> 31);
+ }
}
diff --git a/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/WorkloadRunner.java b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/WorkloadRunner.java
new file mode 100644
index 0000000..b3a5c69
--- /dev/null
+++ b/slo-workload/core/src/main/java/tech/ydb/slo/core/kv/WorkloadRunner.java
@@ -0,0 +1,309 @@
+package tech.ydb.slo.core.kv;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.util.concurrent.RateLimiter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import tech.ydb.slo.core.Metrics;
+
+public final class WorkloadRunner {
+ private static final Logger logger = LoggerFactory.getLogger(WorkloadRunner.class);
+
+
+
+ private static final double PREFILL_SUCCESS_THRESHOLD = 0.5;
+
+ private final KvClient client;
+ private final Metrics metrics;
+ private final KvWorkloadParams params;
+ private final String tablePath;
+ private final RowGenerator generator;
+
+ public WorkloadRunner(KvClient client, Metrics metrics, KvWorkloadParams params, String tablePath) {
+ this.client = client;
+ this.metrics = metrics;
+ this.params = params;
+ this.tablePath = tablePath;
+ this.generator = new RowGenerator(params.prefillCount());
+ }
+
+
+
+ public void setup() throws Exception {
+ logger.info("creating table {}", tablePath);
+ client.createTable(params, tablePath);
+ logger.info("table {} created", tablePath);
+
+ if (params.prefillCount() <= 0) {
+ logger.info("prefill count <= 0, skipping prefill");
+ return;
+ }
+
+ logger.info("prefilling {} rows into {}", params.prefillCount(), tablePath);
+ int parallelism = Math.min(params.maxWorkers(), Math.max(1, params.prefillCount()));
+ ExecutorService prefillPool = Executors.newFixedThreadPool(
+ parallelism, namedThreadFactory("slo-prefill-")
+ );
+ AtomicLong nextId = new AtomicLong(0);
+ AtomicInteger failed = new AtomicInteger();
+ AtomicInteger sessionOpenFailures = new AtomicInteger();
+ try {
+ for (int w = 0; w < parallelism; w++) {
+ prefillPool.execute(() -> {
+ try (KvSession session = client.openSession()) {
+ long id;
+ while ((id = nextId.getAndIncrement()) < params.prefillCount()) {
+ OpOutcome outcome = session.write(
+ RowGenerator.generate(id), params.writeTimeoutMs());
+ if (!outcome.isSuccess()) {
+ int f = failed.incrementAndGet();
+ if (f <= 5) {
+ logger.warn("prefill row {} failed: {}", id, outcome.errorKind());
+ }
+ }
+ }
+ } catch (Exception e) {
+
+
+
+
+
+ sessionOpenFailures.incrementAndGet();
+ long firstUnclaimed = nextId.getAndSet(params.prefillCount());
+ if (firstUnclaimed < params.prefillCount()) {
+ failed.addAndGet((int) (params.prefillCount() - firstUnclaimed));
+ }
+ logger.error("prefill worker failed to open session: {}", e.toString());
+ }
+ });
+ }
+ } finally {
+ prefillPool.shutdown();
+ if (!prefillPool.awaitTermination(5, TimeUnit.MINUTES)) {
+ prefillPool.shutdownNow();
+ }
+ }
+
+ int total = params.prefillCount();
+ int failedCount = failed.get();
+ int succeeded = total - failedCount;
+ if (sessionOpenFailures.get() == parallelism) {
+ throw new IllegalStateException(
+ "all " + parallelism + " prefill workers failed to open a session — "
+ + "check YDB connectivity and credentials"
+ );
+ }
+ if (succeeded < total * PREFILL_SUCCESS_THRESHOLD) {
+ throw new IllegalStateException(
+ "prefill completed with " + failedCount + " failed rows out of " + total
+ + " (success rate < " + (int) (PREFILL_SUCCESS_THRESHOLD * 100)
+ + "%); reads would target an empty key-space, refusing to run"
+ );
+ }
+ if (failedCount > 0) {
+ logger.warn("prefill completed with {} failed rows out of {}", failedCount, total);
+ } else {
+ logger.info("prefill completed");
+ }
+ }
+
+
+
+ public void run() throws InterruptedException {
+ long durationSeconds = params.durationSeconds();
+ long endNanos = durationSeconds > 0
+ ? System.nanoTime() + TimeUnit.SECONDS.toNanos(durationSeconds)
+ : Long.MAX_VALUE;
+
+
+
+
+ AtomicLong writesIssued = new AtomicLong();
+
+ int readWorkers = workerCount(params.readRps());
+ int writeWorkers = workerCount(params.writeRps());
+
+ if (readWorkers == 0 && writeWorkers == 0) {
+ logger.warn("both read and write RPS are <= 0, run phase has nothing to do");
+ return;
+ }
+
+ ExecutorService readPool = null;
+ ExecutorService writePool = null;
+ try {
+ if (readWorkers > 0) {
+ readPool = Executors.newFixedThreadPool(readWorkers, namedThreadFactory("slo-read-"));
+ RateLimiter readLimiter = RateLimiter.create(params.readRps());
+ for (int i = 0; i < readWorkers; i++) {
+ readPool.execute(() -> readWorkerLoop(endNanos, readLimiter, writesIssued));
+ }
+ } else {
+ logger.info("read RPS <= 0, skipping read workers");
+ }
+
+ if (writeWorkers > 0) {
+ writePool = Executors.newFixedThreadPool(writeWorkers, namedThreadFactory("slo-write-"));
+ RateLimiter writeLimiter = RateLimiter.create(params.writeRps());
+ for (int i = 0; i < writeWorkers; i++) {
+ writePool.execute(() -> writeWorkerLoop(endNanos, writeLimiter, writesIssued));
+ }
+ } else {
+ logger.info("write RPS <= 0, skipping write workers");
+ }
+
+
+ long graceNanos = TimeUnit.SECONDS.toNanos(params.shutdownTimeSeconds());
+ long waitNanos = durationSeconds > 0
+ ? Math.max(0L, endNanos - System.nanoTime()) + graceNanos
+ : Long.MAX_VALUE;
+
+ if (readPool != null) {
+ readPool.shutdown();
+ }
+ if (writePool != null) {
+ writePool.shutdown();
+ }
+
+ if (readPool != null) {
+ long started = System.nanoTime();
+ if (!readPool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) {
+ logger.warn("read pool did not drain within deadline, forcing shutdown");
+ readPool.shutdownNow();
+ }
+ waitNanos = Math.max(0L, waitNanos - (System.nanoTime() - started));
+ }
+ if (writePool != null) {
+ if (!writePool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) {
+ logger.warn("write pool did not drain within deadline, forcing shutdown");
+ writePool.shutdownNow();
+ }
+ }
+ } finally {
+ forceShutdown(readPool, "read pool");
+ forceShutdown(writePool, "write pool");
+ }
+ }
+
+
+
+ public void teardown() {
+ logger.info("dropping table {}", tablePath);
+ client.dropTable(tablePath);
+ }
+
+
+
+ private void readWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) {
+ try (KvSession session = client.openSession()) {
+ while (!Thread.currentThread().isInterrupted()) {
+ long remaining = endNanos - System.nanoTime();
+ if (remaining <= 0) {
+ return;
+ }
+
+
+
+ if (!limiter.tryAcquire(remaining, TimeUnit.NANOSECONDS)) {
+ return;
+ }
+ try {
+ readOnce(session, writesIssued.get());
+ } catch (Throwable t) {
+ logger.warn("read op threw unexpectedly: {}", t.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("read worker failed to open session: {}", e.toString());
+ }
+ }
+
+ private void writeWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) {
+ try (KvSession session = client.openSession()) {
+ while (!Thread.currentThread().isInterrupted()) {
+ long remaining = endNanos - System.nanoTime();
+ if (remaining <= 0) {
+ return;
+ }
+ if (!limiter.tryAcquire(remaining, TimeUnit.NANOSECONDS)) {
+ return;
+ }
+ try {
+ writeOnce(session, generator.generate());
+ writesIssued.incrementAndGet();
+ } catch (Throwable t) {
+ logger.warn("write op threw unexpectedly: {}", t.toString());
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("write worker failed to open session: {}", e.toString());
+ }
+ }
+
+
+
+ private void readOnce(KvSession session, long writesObserved) {
+ long upperBound = Math.max(1L, params.prefillCount() + writesObserved);
+ long id = ThreadLocalRandom.current().nextLong(upperBound);
+
+ Metrics.Span span = metrics.startOperation(Metrics.OperationType.READ);
+ OpOutcome outcome = session.read(id, params.readTimeoutMs());
+ if (outcome.isSuccess()) {
+ span.finishSuccess(outcome.retryAttempts());
+ } else {
+ span.finishError(outcome.retryAttempts(), outcome.errorKind());
+ logger.debug("read {} failed: {}", id, outcome.errorKind());
+ }
+ }
+
+ private void writeOnce(KvSession session, Row row) {
+ Metrics.Span span = metrics.startOperation(Metrics.OperationType.WRITE);
+ OpOutcome outcome = session.write(row, params.writeTimeoutMs());
+ if (outcome.isSuccess()) {
+ span.finishSuccess(outcome.retryAttempts());
+ } else {
+ span.finishError(outcome.retryAttempts(), outcome.errorKind());
+ logger.debug("write {} failed: {}", row.id(), outcome.errorKind());
+ }
+ }
+
+
+
+ private int workerCount(int rps) {
+ if (rps <= 0) {
+ return 0;
+ }
+ return Math.min(params.maxWorkers(), Math.max(1, rps));
+ }
+
+ private static ThreadFactory namedThreadFactory(String prefix) {
+ AtomicInteger counter = new AtomicInteger();
+ return r -> {
+ Thread t = new Thread(r, prefix + counter.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ };
+ }
+
+ private static void forceShutdown(ExecutorService pool, String name) {
+ if (pool == null || pool.isTerminated()) {
+ return;
+ }
+ logger.warn("{} still active in cleanup, forcing shutdown", name);
+ pool.shutdownNow();
+ try {
+ if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
+ logger.warn("{} did not terminate after shutdownNow", name);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git a/slo-workload/jdbc/Dockerfile b/slo-workload/jdbc/Dockerfile
index 943be48..1331818 100644
--- a/slo-workload/jdbc/Dockerfile
+++ b/slo-workload/jdbc/Dockerfile
@@ -5,7 +5,9 @@
# and run parameters from environment variables and pushes OTLP metrics to the
# endpoint the action provides.
#
-# Build context: the `ydb-java-examples` repository root.
+# Build context: the `ydb-java-examples` repository root. For ydb-jdbc-driver
+# CI the context may also contain `./ydb-jdbc-driver`; when present, the driver
+# is installed from source and the workload is pinned to that exact version.
#
# Optional build args:
# MAVEN_IMAGE Builder image. Defaults to `maven:3.9-eclipse-temurin-17`.
@@ -23,9 +25,26 @@ COPY . /src
ARG YDB_JDBC_VERSION=""
-# Pin the JDBC driver version under test when provided, then build only the
-# workload module (and the parent context it needs).
-RUN if [ -n "${YDB_JDBC_VERSION}" ]; then \
+# Install the JDBC driver from source when the driver checkout is present in
+# the build context. Otherwise use YDB_JDBC_VERSION when provided.
+RUN if [ -d /src/ydb-jdbc-driver ]; then \
+ cd /src/ydb-jdbc-driver && \
+ mvn -B -q \
+ -DskipTests \
+ -Dmaven.javadoc.skip=true \
+ -Dmaven.source.skip=true \
+ -Dgpg.skip=true \
+ install && \
+ mvn -B -q help:evaluate -Dexpression=project.version -DforceStdout > /tmp/ydb-jdbc.version && \
+ YDB_JDBC_VERSION="$(cat /tmp/ydb-jdbc.version)" && \
+ cd /src && \
+ echo "Pinning ydb-jdbc-driver to source-built ${YDB_JDBC_VERSION}" && \
+ mvn -B -q versions:set-property \
+ -Dproperty=ydb.jdbc.version \
+ -DnewVersion="${YDB_JDBC_VERSION}" \
+ -DgenerateBackupPoms=false \
+ -pl slo-workload ; \
+ elif [ -n "${YDB_JDBC_VERSION}" ]; then \
echo "Pinning ydb-jdbc-driver to ${YDB_JDBC_VERSION}" && \
mvn -B -q versions:set-property \
-Dproperty=ydb.jdbc.version \
diff --git a/slo-workload/jdbc/README.md b/slo-workload/jdbc/README.md
index 19ee337..f28d8c9 100644
--- a/slo-workload/jdbc/README.md
+++ b/slo-workload/jdbc/README.md
@@ -40,14 +40,21 @@ relying on server-side YQL builtins inside parameterized statements.
## Retries
-Operations are retried with exponential backoff (up to 10 attempts). An error
-is considered retryable when the driver throws a `SQLRecoverableException` or
-`SQLTransientException` (which covers the driver's
-`YdbRetryableException`, `YdbConditionallyRetryableException`,
-`YdbUnavailbaleException` and `YdbTimeoutException`). The number of retries is
-recorded in `sdk_retry_attempts_total`, and the failure reason is reported via
-the `error_kind` label on `sdk_errors_total` (using the YDB status code when
-available).
+Reads (`SELECT`) and writes (`UPSERT`) are both idempotent, so retries decide
+based on the YDB status code: a `YdbStatusable` whose
+`StatusCode.isRetryable(true)` is true is retried (this covers ABORTED,
+OVERLOADED, UNAVAILABLE, BAD_SESSION, SESSION_BUSY, UNDETERMINED on idempotent
+operations). Anything else falls back to the JDBC marker types
+`SQLRecoverableException` / `SQLTransientException`. The retry attempt cap is
+configurable via `--max-attempts` (default 10) and backoff is capped at 1s.
+Connection-level errors (`SQLRecoverableException`,
+`SQLTransientConnectionException`, `SQLNonTransientConnectionException`)
+invalidate the worker's cached connection before the next attempt opens a
+fresh one.
+
+The number of retries is recorded in `sdk_retry_attempts_total`, and the
+failure reason is reported via the `error_kind` label on `sdk_errors_total`
+(using the YDB status code when available).
## Files
@@ -58,18 +65,18 @@ jdbc/
├── README.md
└── src/main/
├── java/tech/ydb/slo/
- │ ├── Config.java Reads env vars, resolves the JDBC URL
- │ ├── Main.java Entry point
- │ ├── Metrics.java OTLP metrics + HDR histograms
- │ └── kv/
- │ ├── KvWorkload.java Setup/run/teardown loop over JDBC
- │ ├── KvWorkloadParams.java JCommander-bound CLI flags
- │ ├── Row.java Row data class
- │ └── RowGenerator.java Random payload generator
+ │ ├── Main.java Entry point, loads the JDBC driver
+ │ └── jdbc/
+ │ └── JdbcKvClient.java KvClient: connection lifecycle + retry
└── resources/
└── log4j2.xml Console logging config
```
+The shared harness (`Config`, `Metrics`, `KvWorkloadParams`, `Row`,
+`RowGenerator`, `WorkloadRunner`, `Launcher`) lives in
+[`../core`](../core/src/main/java/tech/ydb/slo/core), so every implementation
+emits the same metric contract.
+
## Building and running locally
```bash
diff --git a/slo-workload/jdbc/pom.xml b/slo-workload/jdbc/pom.xml
index c4b174e..20475df 100644
--- a/slo-workload/jdbc/pom.xml
+++ b/slo-workload/jdbc/pom.xml
@@ -11,55 +11,22 @@
../pom.xml
- jdbc
+ slo-workload-jdbcjarJDBC SLO workloadSLO workload exercising the YDB JDBC driver, compatible with ydb-slo-action
-
- tech.ydb.jdbc
- ydb-jdbc-driver
-
-
-
-
- com.beust
- jcommander
-
-
-
-
- com.google.guava
- guava
-
-
-
-
- org.hdrhistogram
- HdrHistogram
+ tech.ydb.examples
+ slo-workload-core
-
- io.opentelemetry
- opentelemetry-api
-
-
- io.opentelemetry
- opentelemetry-sdk
-
-
- io.opentelemetry
- opentelemetry-sdk-metrics
-
-
- io.opentelemetry
- opentelemetry-exporter-otlp
+ tech.ydb.jdbc
+ ydb-jdbc-driver
-
org.apache.logging.log4jlog4j-slf4j2-impl
@@ -72,18 +39,11 @@
org.apache.maven.pluginsmaven-compiler-plugin
-
- 17
-
-
-
org.apache.maven.pluginsmaven-dependency-plugin
-
org.apache.maven.pluginsmaven-jar-plugin
diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java
deleted file mode 100644
index 2b1ca3a..0000000
--- a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Config.java
+++ /dev/null
@@ -1,148 +0,0 @@
-package tech.ydb.slo;
-
-/**
- * Configuration for the JDBC SLO workload, populated from environment
- * variables provided by the YDB SLO action runtime.
- *
- *
The action sets these variables on the workload container:
- *
{@code WORKLOAD_REF} — value used as the {@code ref} label on all metrics
- *
{@code WORKLOAD_NAME} — workload name (also used as part of the table path)
- *
{@code WORKLOAD_DURATION} — workload run duration in seconds (0 = unlimited)
- *
{@code OTEL_EXPORTER_OTLP_ENDPOINT} — OTLP endpoint for pushing metrics
- *
- *
- *
Because the component under test here is the JDBC driver, the
- * YDB connection is expressed as a JDBC URL ({@code jdbc:ydb:...}). The URL is
- * resolved in this order: {@code YDB_JDBC_URL} (used verbatim), then
- * {@code YDB_CONNECTION_STRING} (prefixed with {@code jdbc:ydb:}), then
- * {@code YDB_ENDPOINT} + {@code YDB_DATABASE}.
- */
-public final class Config {
- private final String jdbcUrl;
- private final String token;
- private final String ref;
- private final String workloadName;
- private final int durationSeconds;
- private final String otlpEndpoint;
-
- private Config(
- String jdbcUrl,
- String token,
- String ref,
- String workloadName,
- int durationSeconds,
- String otlpEndpoint
- ) {
- this.jdbcUrl = jdbcUrl;
- this.token = token;
- this.ref = ref;
- this.workloadName = workloadName;
- this.durationSeconds = durationSeconds;
- this.otlpEndpoint = otlpEndpoint;
- }
-
- public String jdbcUrl() {
- return jdbcUrl;
- }
-
- public String token() {
- return token;
- }
-
- public String ref() {
- return ref;
- }
-
- public String workloadName() {
- return workloadName;
- }
-
- public int durationSeconds() {
- return durationSeconds;
- }
-
- public String otlpEndpoint() {
- return otlpEndpoint;
- }
-
- /**
- * Loads configuration from environment variables.
- *
- * @return configuration instance
- * @throws IllegalStateException if required variables are missing or invalid
- */
- public static Config fromEnv() {
- String jdbcUrl = resolveJdbcUrl();
- if (jdbcUrl == null || jdbcUrl.isEmpty()) {
- throw new IllegalStateException(
- "YDB connection is not configured: set YDB_JDBC_URL, "
- + "YDB_CONNECTION_STRING or YDB_ENDPOINT + YDB_DATABASE"
- );
- }
-
- String token = envOrDefault("YDB_TOKEN", "");
- String ref = envOrDefault("WORKLOAD_REF", "unknown");
- String workloadName = envOrDefault("WORKLOAD_NAME", "java-slo-jdbc-workload");
- int durationSeconds = parseInt(envOrDefault("WORKLOAD_DURATION", "600"), 600);
- String otlpEndpoint = envOrDefault("OTEL_EXPORTER_OTLP_ENDPOINT", "");
-
- return new Config(jdbcUrl, token, ref, workloadName, durationSeconds, otlpEndpoint);
- }
-
- private static String resolveJdbcUrl() {
- String explicit = System.getenv("YDB_JDBC_URL");
- if (explicit != null && !explicit.isEmpty()) {
- return explicit;
- }
-
- String connectionString = System.getenv("YDB_CONNECTION_STRING");
- if (connectionString != null && !connectionString.isEmpty()) {
- return toJdbcUrl(connectionString);
- }
-
- String endpoint = System.getenv("YDB_ENDPOINT");
- String database = System.getenv("YDB_DATABASE");
- if (endpoint == null || endpoint.isEmpty() || database == null || database.isEmpty()) {
- return null;
- }
- return toJdbcUrl(composeConnectionString(endpoint, database));
- }
-
- /**
- * Turns a YDB connection string ({@code grpc://host:port/database}) into a
- * JDBC URL understood by the YDB JDBC driver. If the value already starts
- * with {@code jdbc:}, it is returned unchanged.
- */
- private static String toJdbcUrl(String connectionString) {
- if (connectionString.startsWith("jdbc:")) {
- return connectionString;
- }
- return "jdbc:ydb:" + connectionString;
- }
-
- private static String composeConnectionString(String endpoint, String database) {
- // Compose a connection string in the form grpc://host:port/database.
- if (endpoint.endsWith("/") && database.startsWith("/")) {
- return endpoint + database.substring(1);
- }
- if (!endpoint.endsWith("/") && !database.startsWith("/")) {
- return endpoint + "/" + database;
- }
- return endpoint + database;
- }
-
- private static String envOrDefault(String name, String defaultValue) {
- String value = System.getenv(name);
- return (value == null || value.isEmpty()) ? defaultValue : value;
- }
-
- private static int parseInt(String value, int defaultValue) {
- try {
- return Integer.parseInt(value);
- } catch (NumberFormatException e) {
- return defaultValue;
- }
- }
-}
diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java
index 997f8b9..b370983 100644
--- a/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java
+++ b/slo-workload/jdbc/src/main/java/tech/ydb/slo/Main.java
@@ -1,160 +1,30 @@
package tech.ydb.slo;
-import java.util.Properties;
+import tech.ydb.slo.core.Launcher;
+import tech.ydb.slo.jdbc.JdbcKvClient;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.ParameterException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import tech.ydb.slo.kv.KvWorkload;
-import tech.ydb.slo.kv.KvWorkloadParams;
-
-/**
- * Entry point of the JDBC SLO workload.
- *
- *
Reads connection details and run parameters from environment variables
- * (see {@link Config}), parses workload-specific flags from the command line
- * (see {@link KvWorkloadParams}), and runs the KV workload phases — setup,
- * run, teardown — pushing metrics to the OTLP endpoint configured by the YDB
- * SLO action runtime.
- *
- *
The workload creates a partitioned table, prefills it with rows, and then
- * runs read and write loops at fixed RPS for the configured duration. Each
- * operation is timed and retried with exponential backoff; the outcome is
- * recorded into {@link Metrics} so the SLO action can compare current and
- * baseline runs.
- *
- *
Schema and queries mirror the KV workloads in the Go and JavaScript SDKs
- * so the produced metrics are directly comparable across SDKs. Unlike the
- * query-service workload, the primary-key {@code hash} column is derived on
- * the client (see {@link #numericHash(long)}) instead of via the server-side
- * {@code Digest::NumericHash}, which keeps the parameterized JDBC statements
- * free of type-inference ambiguity.
- *
- *
Concurrency model: each operation type (read / write) gets a dedicated
- * thread pool sized to the configured RPS. Every worker thread owns its own
- * JDBC {@link Connection} (the YDB driver's connections are not thread-safe),
- * pulls a permit from a shared Guava {@link RateLimiter}, and executes the
- * operation inline. There is no separate driver thread and no work queue.
- */
-public final class KvWorkload {
- private static final Logger logger = LoggerFactory.getLogger(KvWorkload.class);
-
- private static final String CREATE_TABLE_QUERY_TEMPLATE = ""
- + "CREATE TABLE IF NOT EXISTS `%s` ("
- + " hash Uint64,"
- + " id Uint64,"
- + " payload_str Utf8,"
- + " payload_double Double,"
- + " payload_timestamp Timestamp,"
- + " payload_hash Uint64,"
- + " PRIMARY KEY (hash, id)"
- + ") WITH ("
- + " UNIFORM_PARTITIONS = %d,"
- + " AUTO_PARTITIONING_BY_SIZE = ENABLED,"
- + " AUTO_PARTITIONING_PARTITION_SIZE_MB = %d,"
- + " AUTO_PARTITIONING_MIN_PARTITIONS_COUNT = %d,"
- + " AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = %d"
- + ")";
-
- private static final String DROP_TABLE_QUERY_TEMPLATE = "DROP TABLE `%s`";
-
- private static final String WRITE_QUERY_TEMPLATE = ""
- + "UPSERT INTO `%s` ("
- + " hash, id, payload_str, payload_double, payload_timestamp, payload_hash"
- + ") VALUES (?, ?, ?, ?, ?, ?)";
-
- private static final String READ_QUERY_TEMPLATE = ""
- + "SELECT id, payload_str, payload_double, payload_timestamp, payload_hash"
- + " FROM `%s`"
- + " WHERE id = ? AND hash = ?";
-
- /*
- * Hard cap on the number of worker threads spawned for a single operation
- * type. The SLO targets a few hundred RPS in CI; allowing more workers
- * than this just wastes threads on JIT-warmup contention without
- * improving throughput.
- */
- private static final int MAX_WORKERS = 64;
-
- /*
- * Maximum number of attempts (initial + retries) per operation before it
- * is recorded as a failure. Mirrors the order of magnitude of the
- * query-service SessionRetryContext default.
- */
- private static final int MAX_ATTEMPTS = 10;
-
- private static final long INITIAL_BACKOFF_MS = 10L;
- private static final long MAX_BACKOFF_MS = 1_000L;
-
- /*
- * Extra time, on top of the workload duration, given to worker pools to
- * complete their last in-flight operations before {@link #run()} forces
- * shutdown.
- */
- private static final long SHUTDOWN_GRACE_SECONDS = 30L;
-
- private final String jdbcUrl;
- private final Properties connectionProperties;
- private final Metrics metrics;
- private final KvWorkloadParams params;
- private final String tablePath;
- private final RowGenerator generator;
-
- public KvWorkload(
- String jdbcUrl,
- Properties connectionProperties,
- Metrics metrics,
- KvWorkloadParams params,
- String tablePath
- ) {
- this.jdbcUrl = jdbcUrl;
- this.connectionProperties = connectionProperties;
- this.metrics = metrics;
- this.params = params;
- this.tablePath = tablePath;
- this.generator = new RowGenerator(params.prefillCount());
- }
-
- /*
- * Creates the table (if missing) and prefills it with
- * {@code params.prefillCount()} rows using a bounded pool of worker
- * connections.
- */
- public void setup() throws InterruptedException, SQLException {
- logger.info("creating table {}", tablePath);
- try (Connection conn = openConnection();
- Statement stmt = conn.createStatement()) {
- stmt.execute(String.format(
- CREATE_TABLE_QUERY_TEMPLATE,
- tablePath,
- params.minPartitionCount(),
- params.partitionSizeMb(),
- params.minPartitionCount(),
- params.maxPartitionCount()
- ));
- }
- logger.info("table {} created", tablePath);
-
- if (params.prefillCount() <= 0) {
- logger.info("prefill count <= 0, skipping prefill");
- return;
- }
-
- logger.info("prefilling {} rows into {}", params.prefillCount(), tablePath);
- int parallelism = Math.min(MAX_WORKERS, Math.max(1, params.prefillCount()));
- ExecutorService prefillPool = Executors.newFixedThreadPool(
- parallelism, namedThreadFactory("slo-prefill-")
- );
- AtomicLong nextId = new AtomicLong(0);
- AtomicInteger failed = new AtomicInteger();
- try {
- for (int w = 0; w < parallelism; w++) {
- prefillPool.execute(() -> {
- try (WorkerConnection wc = new WorkerConnection()) {
- long id;
- while ((id = nextId.getAndIncrement()) < params.prefillCount()) {
- SQLException err = writeWithRetry(wc, RowGenerator.generate(id),
- params.writeTimeoutMs(), null);
- if (err != null) {
- int f = failed.incrementAndGet();
- if (f <= 5) {
- logger.warn("prefill row {} failed: {}", id, err.toString());
- }
- }
- }
- }
- });
- }
- } finally {
- prefillPool.shutdown();
- if (!prefillPool.awaitTermination(5, TimeUnit.MINUTES)) {
- prefillPool.shutdownNow();
- }
- }
- if (failed.get() > 0) {
- logger.warn("prefill completed with {} failed rows out of {}",
- failed.get(), params.prefillCount());
- } else {
- logger.info("prefill completed");
- }
- }
-
- /*
- * Runs the workload until the configured deadline or thread interruption.
- */
- public void run() throws InterruptedException {
- long durationSeconds = params.durationSeconds();
- long endNanos = durationSeconds > 0
- ? System.nanoTime() + TimeUnit.SECONDS.toNanos(durationSeconds)
- : Long.MAX_VALUE;
-
- // Track how many writes have completed so reads target a key-space
- // that's actually been populated. The generator was constructed with
- // nextId = prefillCount, so writes pick up where prefill left off.
- AtomicLong writesIssued = new AtomicLong();
-
- int readWorkers = workerCount(params.readRps());
- int writeWorkers = workerCount(params.writeRps());
-
- if (readWorkers == 0 && writeWorkers == 0) {
- logger.warn("both read and write RPS are <= 0, run phase has nothing to do");
- return;
- }
-
- ExecutorService readPool = null;
- ExecutorService writePool = null;
- try {
- if (readWorkers > 0) {
- readPool = Executors.newFixedThreadPool(readWorkers, namedThreadFactory("slo-read-"));
- RateLimiter readLimiter = RateLimiter.create(params.readRps());
- for (int i = 0; i < readWorkers; i++) {
- readPool.execute(() -> readWorkerLoop(endNanos, readLimiter, writesIssued));
- }
- } else {
- logger.info("read RPS <= 0, skipping read workers");
- }
-
- if (writeWorkers > 0) {
- writePool = Executors.newFixedThreadPool(writeWorkers, namedThreadFactory("slo-write-"));
- RateLimiter writeLimiter = RateLimiter.create(params.writeRps());
- for (int i = 0; i < writeWorkers; i++) {
- writePool.execute(() -> writeWorkerLoop(endNanos, writeLimiter, writesIssued));
- }
- } else {
- logger.info("write RPS <= 0, skipping write workers");
- }
-
- // Wait for workers to drain naturally as they hit the deadline.
- long graceNanos = TimeUnit.SECONDS.toNanos(SHUTDOWN_GRACE_SECONDS);
- long waitNanos = durationSeconds > 0
- ? Math.max(0L, endNanos - System.nanoTime()) + graceNanos
- : Long.MAX_VALUE;
-
- if (readPool != null) {
- readPool.shutdown();
- }
- if (writePool != null) {
- writePool.shutdown();
- }
-
- if (readPool != null) {
- long started = System.nanoTime();
- if (!readPool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) {
- logger.warn("read pool did not drain within deadline, forcing shutdown");
- readPool.shutdownNow();
- }
- waitNanos = Math.max(0L, waitNanos - (System.nanoTime() - started));
- }
- if (writePool != null) {
- if (!writePool.awaitTermination(waitNanos, TimeUnit.NANOSECONDS)) {
- logger.warn("write pool did not drain within deadline, forcing shutdown");
- writePool.shutdownNow();
- }
- }
- } finally {
- forceShutdown(readPool, "read pool");
- forceShutdown(writePool, "write pool");
- }
- }
-
- /*
- * Drops the workload table. Called from the {@code finally} block in
- * {@code Main} so the database is left clean even on failure.
- */
- public void teardown() {
- logger.info("dropping table {}", tablePath);
- try (Connection conn = openConnection();
- Statement stmt = conn.createStatement()) {
- stmt.execute(String.format(DROP_TABLE_QUERY_TEMPLATE, tablePath));
- logger.info("table {} dropped", tablePath);
- } catch (SQLException e) {
- logger.warn("failed to drop table {}: {}", tablePath, e.toString());
- }
- }
-
- // --- worker loops ------------------------------------------------------
-
- private void readWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) {
- try (WorkerConnection wc = new WorkerConnection()) {
- while (System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) {
- limiter.acquire();
- try {
- readOnce(wc, writesIssued.get());
- } catch (Throwable t) {
- logger.warn("read op threw unexpectedly: {}", t.toString());
- }
- }
- }
- }
-
- private void writeWorkerLoop(long endNanos, RateLimiter limiter, AtomicLong writesIssued) {
- try (WorkerConnection wc = new WorkerConnection()) {
- while (System.nanoTime() < endNanos && !Thread.currentThread().isInterrupted()) {
- limiter.acquire();
- try {
- writeOnce(wc, generator.generate());
- writesIssued.incrementAndGet();
- } catch (Throwable t) {
- logger.warn("write op threw unexpectedly: {}", t.toString());
- }
- }
- }
- }
-
- // --- single operations -------------------------------------------------
-
- private void readOnce(WorkerConnection wc, long writesObserved) {
- long upperBound = Math.max(1L, params.prefillCount() + writesObserved);
- long id = ThreadLocalRandom.current().nextLong(upperBound);
- long hash = numericHash(id);
-
- Metrics.Span span = metrics.startOperation(Metrics.OperationType.READ);
- int attempts = 0;
- SQLException last = null;
- while (attempts < MAX_ATTEMPTS) {
- attempts++;
- try {
- wc.read(id, hash, timeoutSeconds(params.readTimeoutMs()));
- span.finishSuccess(attempts - 1);
- return;
- } catch (SQLException e) {
- last = e;
- if (!isRetryable(e) || attempts >= MAX_ATTEMPTS) {
- break;
- }
- wc.invalidateOnConnectionError(e);
- backoff(attempts);
- }
- }
- span.finishError(attempts - 1, classifyError(last));
- logger.debug("read {} failed: {}", id, last == null ? "?" : last.toString());
- }
-
- private void writeOnce(WorkerConnection wc, Row row) {
- Metrics.Span span = metrics.startOperation(Metrics.OperationType.WRITE);
- int[] attemptsOut = new int[1];
- SQLException err = writeWithRetry(wc, row, params.writeTimeoutMs(), attemptsOut);
- if (err == null) {
- span.finishSuccess(attemptsOut[0] - 1);
- } else {
- span.finishError(Math.max(0, attemptsOut[0] - 1), classifyError(err));
- logger.debug("write {} failed: {}", row.id(), err.toString());
- }
- }
-
- /*
- * Writes a single row with retry. When {@code attemptsOut} is non-null, the
- * total number of attempts is written to its first element. Returns
- * {@code null} on success or the last {@link SQLException} on failure.
- * Used both by the run phase (with metrics handled by the caller) and
- * prefill (silent).
- */
- private SQLException writeWithRetry(WorkerConnection wc, Row row, int timeoutMs, int[] attemptsOut) {
- long hash = numericHash(row.id());
- int attempts = 0;
- SQLException last = null;
- while (attempts < MAX_ATTEMPTS) {
- attempts++;
- try {
- wc.write(row, hash, timeoutSeconds(timeoutMs));
- if (attemptsOut != null) {
- attemptsOut[0] = attempts;
- }
- return null;
- } catch (SQLException e) {
- last = e;
- if (!isRetryable(e) || attempts >= MAX_ATTEMPTS) {
- break;
- }
- wc.invalidateOnConnectionError(e);
- backoff(attempts);
- }
- }
- if (attemptsOut != null) {
- attemptsOut[0] = attempts;
- }
- return last;
- }
-
- // --- helpers -----------------------------------------------------------
-
- private Connection openConnection() throws SQLException {
- return DriverManager.getConnection(jdbcUrl, connectionProperties);
- }
-
- private static int timeoutSeconds(int timeoutMs) {
- return Math.max(1, (timeoutMs + 999) / 1000);
- }
-
- private static boolean isRetryable(SQLException e) {
- return e instanceof SQLRecoverableException || e instanceof SQLTransientException;
- }
-
- private static boolean isConnectionError(SQLException e) {
- return e instanceof SQLRecoverableException || e instanceof SQLTransientConnectionException;
- }
-
- private static String classifyError(SQLException e) {
- if (e == null) {
- return "unknown";
- }
- if (e instanceof YdbStatusable) {
- try {
- return "ydb/" + ((YdbStatusable) e).getStatus().getCode().name().toLowerCase();
- } catch (RuntimeException ignored) {
- // fall through to the generic classification
- }
- }
- return e.getClass().getSimpleName().toLowerCase();
- }
-
- private static void backoff(int attempt) {
- long delay = Math.min(MAX_BACKOFF_MS, INITIAL_BACKOFF_MS * (1L << Math.min(attempt - 1, 20)));
- try {
- Thread.sleep(delay);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Derives the primary-key {@code hash} column from {@code id} using a
- * SplitMix64-style mix. Reads and writes both call this so they always
- * target the same key. The exact function does not need to match YQL's
- * {@code Digest::NumericHash}; it only needs to be deterministic and
- * well distributed across partitions.
- * @param id row id
- * @return derived hash value
- */
- private static long numericHash(long id) {
- long z = id + 0x9E3779B97F4A7C15L;
- z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L;
- z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL;
- return z ^ (z >>> 31);
- }
-
- private static int workerCount(int rps) {
- if (rps <= 0) {
- return 0;
- }
- return Math.min(MAX_WORKERS, Math.max(1, rps));
- }
-
- private static ThreadFactory namedThreadFactory(String prefix) {
- AtomicInteger counter = new AtomicInteger();
- return r -> {
- Thread t = new Thread(r, prefix + counter.getAndIncrement());
- t.setDaemon(true);
- return t;
- };
- }
-
- private static void forceShutdown(ExecutorService pool, String name) {
- if (pool == null || pool.isTerminated()) {
- return;
- }
- logger.warn("{} still active in cleanup, forcing shutdown", name);
- pool.shutdownNow();
- try {
- if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
- logger.warn("{} did not terminate after shutdownNow", name);
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * A JDBC connection owned by a single worker thread, together with lazily
- * prepared read/write statements. On a connection-level error the holder
- * is invalidated so the next operation transparently reconnects.
- */
- private final class WorkerConnection implements AutoCloseable {
- private Connection connection;
- private PreparedStatement readStmt;
- private PreparedStatement writeStmt;
-
- private Connection connection() throws SQLException {
- if (connection == null || connection.isClosed()) {
- connection = openConnection();
- readStmt = null;
- writeStmt = null;
- }
- return connection;
- }
-
- private PreparedStatement readStmt() throws SQLException {
- Connection conn = connection();
- if (readStmt == null) {
- readStmt = conn.prepareStatement(String.format(READ_QUERY_TEMPLATE, tablePath));
- }
- return readStmt;
- }
-
- private PreparedStatement writeStmt() throws SQLException {
- Connection conn = connection();
- if (writeStmt == null) {
- writeStmt = conn.prepareStatement(String.format(WRITE_QUERY_TEMPLATE, tablePath));
- }
- return writeStmt;
- }
-
- void read(long id, long hash, int timeoutSeconds) throws SQLException {
- PreparedStatement stmt = readStmt();
- stmt.setQueryTimeout(timeoutSeconds);
- stmt.setLong(1, id);
- stmt.setLong(2, hash);
- try (ResultSet rs = stmt.executeQuery()) {
- // Touch the result set so we exercise the deserialization path.
- while (rs.next()) {
- rs.getLong("id");
- }
- }
- }
-
- void write(Row row, long hash, int timeoutSeconds) throws SQLException {
- PreparedStatement stmt = writeStmt();
- stmt.setQueryTimeout(timeoutSeconds);
- stmt.setLong(1, hash);
- stmt.setLong(2, row.id());
- stmt.setString(3, row.payloadStr());
- stmt.setDouble(4, row.payloadDouble());
- stmt.setTimestamp(5, Timestamp.from(row.payloadTimestamp()));
- stmt.setLong(6, row.payloadHash());
- stmt.executeUpdate();
- }
-
- void invalidateOnConnectionError(SQLException e) {
- if (isConnectionError(e)) {
- close();
- }
- }
-
- @Override
- public void close() {
- closeQuietly(readStmt);
- closeQuietly(writeStmt);
- closeQuietly(connection);
- readStmt = null;
- writeStmt = null;
- connection = null;
- }
-
- private void closeQuietly(AutoCloseable closeable) {
- if (closeable == null) {
- return;
- }
- try {
- closeable.close();
- } catch (Exception ignored) {
- // best-effort cleanup
- }
- }
- }
-}
diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
deleted file mode 100644
index 41f2156..0000000
--- a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/KvWorkloadParams.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package tech.ydb.slo.kv;
-
-import com.beust.jcommander.Parameter;
-
-/**
- * Tunable parameters for the KV workload.
- *
- *
Defaults match the SLO workloads in the Go and JavaScript SDKs so the
- * runs are comparable. JCommander annotations let the operator override
- * any field from the command line, e.g.
- * {@code --read-rps 500 --write-rps 50}.
- */
-@SuppressWarnings("FieldMayBeFinal")
-public final class KvWorkloadParams {
-
- @Parameter(
- names = {"--read-rps"},
- description = "Target read operations per second"
- )
- private int readRps = 1000;
-
- @Parameter(
- names = {"--write-rps"},
- description = "Target write operations per second"
- )
- private int writeRps = 100;
-
- @Parameter(
- names = {"--read-timeout-ms"},
- description = "Per-attempt read timeout in milliseconds"
- )
- private int readTimeoutMs = 10_000;
-
- @Parameter(
- names = {"--write-timeout-ms"},
- description = "Per-attempt write timeout in milliseconds"
- )
- private int writeTimeoutMs = 10_000;
-
- @Parameter(
- names = {"--prefill-count"},
- description = "Number of rows to prefill before the run phase"
- )
- private int prefillCount = 1_000;
-
- @Parameter(
- names = {"--partition-size"},
- description = "Auto-partitioning partition size in MB"
- )
- private int partitionSizeMb = 1;
-
- @Parameter(
- names = {"--min-partition-count"},
- description = "Minimum number of table partitions"
- )
- private int minPartitionCount = 6;
-
- @Parameter(
- names = {"--max-partition-count"},
- description = "Maximum number of table partitions"
- )
- private int maxPartitionCount = 1_000;
-
- @Parameter(
- names = {"--duration"},
- description = "Run duration in seconds (overrides WORKLOAD_DURATION when > 0)"
- )
- private int durationSeconds = 0;
-
- public int readRps() {
- return readRps;
- }
-
- public int writeRps() {
- return writeRps;
- }
-
- public int readTimeoutMs() {
- return readTimeoutMs;
- }
-
- public int writeTimeoutMs() {
- return writeTimeoutMs;
- }
-
- public int prefillCount() {
- return prefillCount;
- }
-
- public int partitionSizeMb() {
- return partitionSizeMb;
- }
-
- public int minPartitionCount() {
- return minPartitionCount;
- }
-
- public int maxPartitionCount() {
- return maxPartitionCount;
- }
-
- /**
- * Effective run duration. If the CLI flag was omitted (left at 0), falls
- * back to the value supplied via the {@code WORKLOAD_DURATION} environment
- * variable through {@code Config}.
- * @return Effective run duration value
- */
- public int durationSeconds() {
- return durationSeconds;
- }
-
- public void setDurationSeconds(int durationSeconds) {
- this.durationSeconds = durationSeconds;
- }
-}
diff --git a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java b/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java
deleted file mode 100644
index a53ba89..0000000
--- a/slo-workload/jdbc/src/main/java/tech/ydb/slo/kv/Row.java
+++ /dev/null
@@ -1,64 +0,0 @@
-package tech.ydb.slo.kv;
-
-import java.time.Instant;
-
-/**
- * A single row of the KV workload table.
- *
- *
The schema mirrors the one used by SLO workloads in other YDB SDKs
- * (Go, JavaScript):
- *