diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json index 090751435f20..e623d3373a93 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run!", - "modification": 7, + "modification": 1, } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 0721ddc4685e..4d3bfdbe4b19 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -37,6 +37,7 @@ import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.TimeDomain; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -93,6 +94,11 @@ }) // TODO(https://github.com/apache/beam/issues/20497) public class ReduceFnRunner { + // Experiments guarding optimizations in development. No backward compatibility guarantees. + public static final String UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION = + "unstable_not_update_compatible_new_window_optimization"; + public static final String UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION = + "unstable_disable_watermark_known_empty_optimization"; /** * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}. * @@ -211,6 +217,9 @@ public class ReduceFnRunner { */ private final NonEmptyPanes nonEmptyPanes; + private final boolean useNewWindowOptimization; + private final boolean disableWatermarkKnownEmptyOptimization; + public ReduceFnRunner( K key, WindowingStrategy windowingStrategy, @@ -237,6 +246,15 @@ public ReduceFnRunner( this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn); + this.useNewWindowOptimization = + windowingStrategy.getWindowFn().isNonMerging() + && ExperimentalOptions.hasExperiment( + options, UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION); + + this.disableWatermarkKnownEmptyOptimization = + ExperimentalOptions.hasExperiment( + options, UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION); + // Note this may incur I/O to load persisted window set data. this.activeWindows = createActiveWindowSet(); @@ -256,7 +274,8 @@ public ReduceFnRunner( new TriggerStateMachineRunner<>( triggerStateMachine, new TriggerStateMachineContextFactory<>( - windowingStrategy.getWindowFn(), stateInternals, activeWindows)); + windowingStrategy.getWindowFn(), stateInternals, activeWindows), + this.useNewWindowOptimization); } private ActiveWindowSet createActiveWindowSet() { @@ -275,6 +294,16 @@ boolean hasNoActiveWindows() { return activeWindows.getActiveAndNewWindows().isEmpty(); } + @VisibleForTesting + TriggerStateMachineRunner getTriggerRunner() { + return triggerRunner; + } + + @VisibleForTesting + ReduceFnContextFactory getContextFactory() { + return contextFactory; + } + private Set windowsThatAreOpen(Collection windows) { Set result = new HashSet<>(); for (W window : windows) { @@ -613,6 +642,20 @@ private void processElement(Map windowToMergeResult, WindowedValue StateStyle.RENAMED, value.causedByDrain()); + // TODO: Make sure the NewWindowOptimization does not create unbounded trigger state + // in GlobalWindow + if (useNewWindowOptimization && triggerRunner.isNew(directContext.state())) { + // Blindly clear state to ensure Windmill doesn't do unnecessary reads. + // TODO: Instead of the clears here, we could mark these states as empty locally + // in the state cache and/or explicitly tell that the entries are non-existent + reduceFn.clearState(renamedContext); + paneInfoTracker.clear(directContext.state()); + if (!disableWatermarkKnownEmptyOptimization) { + watermarkHold.setKnownEmpty(renamedContext); + } + nonEmptyPanes.clearPane(renamedContext.state()); + } + nonEmptyPanes.recordContent(renamedContext.state()); scheduleGarbageCollectionTimer(directContext); @@ -747,7 +790,7 @@ public void onTimers(Iterable timers) throws Exception { // Perform prefetching of state to determine if the trigger should fire. if (windowActivation.isGarbageCollection) { - triggerRunner.prefetchIsClosed(directContext.state()); + triggerRunner.prefetchFinishedSet(directContext.state()); } else { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); } @@ -926,7 +969,7 @@ private void prefetchEmit( ReduceFn.Context directContext, ReduceFn.Context renamedContext) { triggerRunner.prefetchShouldFire(directContext.window(), directContext.state()); - triggerRunner.prefetchIsClosed(directContext.state()); + triggerRunner.prefetchFinishedSet(directContext.state()); prefetchOnTrigger(directContext, renamedContext); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java index 15ae8dfe5f1a..b9185ccfba3f 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java @@ -466,6 +466,23 @@ public void clearHolds(ReduceFn.Context context) { context.state().access(EXTRA_HOLD_TAG).clear(); } + /** + * For internal use only; no backwards-compatibility guarantees. + * + *

Permit marking the watermark holds as empty locally, without necessarily clearing them in + * the backend. + */ + public void setKnownEmpty(ReduceFn.Context context) { + WindowTracing.debug( + "WatermarkHold.setKnownEmpty: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", + context.key(), + context.window(), + timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + context.state().access(elementHoldTag).setKnownEmpty(); + context.state().access(EXTRA_HOLD_TAG).setKnownEmpty(); + } + /** Return the current data hold, or null if none. Does not clear. For debugging only. */ public @Nullable Instant getDataCurrent(ReduceFn.Context context) { return context.state().access(elementHoldTag).read(); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java index 7eebb4474c6c..967e1ef43f08 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.triggers; import java.util.BitSet; +import org.checkerframework.checker.nullness.qual.Nullable; /** A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. */ public class FinishedTriggersBitSet implements FinishedTriggers { @@ -60,4 +61,17 @@ public void clearRecursively(ExecutableTriggerStateMachine trigger) { public FinishedTriggersBitSet copy() { return new FinishedTriggersBitSet((BitSet) bitSet.clone()); } + + @Override + public boolean equals(@Nullable Object obj) { + if (!(obj instanceof FinishedTriggersBitSet)) { + return false; + } + return bitSet.equals(((FinishedTriggersBitSet) obj).bitSet); + } + + @Override + public int hashCode() { + return bitSet.hashCode(); + } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index e3791821b728..90daa5ac75b2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -63,13 +63,16 @@ public class TriggerStateMachineRunner { private final ExecutableTriggerStateMachine rootTrigger; private final TriggerStateMachineContextFactory contextFactory; + private final boolean useNewWindowOptimization; public TriggerStateMachineRunner( ExecutableTriggerStateMachine rootTrigger, - TriggerStateMachineContextFactory contextFactory) { + TriggerStateMachineContextFactory contextFactory, + boolean useNewWindowOptimization) { checkState(rootTrigger.getTriggerIndex() == 0); this.rootTrigger = rootTrigger; this.contextFactory = contextFactory; + this.useNewWindowOptimization = useNewWindowOptimization; } private FinishedTriggersBitSet readFinishedBits(ValueState state) { @@ -81,9 +84,11 @@ private FinishedTriggersBitSet readFinishedBits(ValueState state) { } @Nullable BitSet bitSet = state.read(); - return bitSet == null - ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) - : FinishedTriggersBitSet.fromBitSet(bitSet); + if (bitSet == null) { + return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); + } + + return FinishedTriggersBitSet.fromBitSet(bitSet); } private void clearFinishedBits(ValueState state) { @@ -99,19 +104,29 @@ public boolean isClosed(StateAccessor state) { return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); } - public void prefetchIsClosed(StateAccessor state) { + /** Return true if the window is new (no trigger state has ever been persisted). */ + public boolean isNew(StateAccessor state) { + return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() == null; + } + + @VisibleForTesting + public BitSet getFinishedBits(StateAccessor state) { + return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet(); + } + + public void prefetchFinishedSet(StateAccessor state) { if (isFinishedSetNeeded()) { state.access(FINISHED_BITS_TAG).readLater(); } } public void prefetchForValue(W window, StateAccessor state) { - prefetchIsClosed(state); + prefetchFinishedSet(state); rootTrigger.invokePrefetchOnElement(contextFactory.createPrefetchContext(window, rootTrigger)); } public void prefetchShouldFire(W window, StateAccessor state) { - prefetchIsClosed(state); + prefetchFinishedSet(state); rootTrigger.invokePrefetchShouldFire(contextFactory.createPrefetchContext(window, rootTrigger)); } @@ -180,13 +195,23 @@ public void onFire(W window, Timers timers, StateAccessor state) throws Excep persistFinishedSet(state, finishedSet); } - private void persistFinishedSet( - StateAccessor state, FinishedTriggersBitSet modifiedFinishedSet) { + @VisibleForTesting + void persistFinishedSet(StateAccessor state, FinishedTriggersBitSet modifiedFinishedSet) { if (!isFinishedSetNeeded()) { return; } ValueState finishedSetState = state.access(FINISHED_BITS_TAG); + + if (useNewWindowOptimization) { + if (finishedSetState.read() == null + || !readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { + // Write a value even if the bitset was empty + finishedSetState.write(modifiedFinishedSet.getBitSet()); + } + return; + } + if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { if (modifiedFinishedSet.getBitSet().isEmpty()) { finishedSetState.clear(); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 85f6573be23e..72b379a7c71b 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -40,15 +40,19 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.BitSet; +import java.util.Collections; import java.util.List; import java.util.Random; import java.util.concurrent.ThreadLocalRandom; +import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle; import org.apache.beam.runners.core.metrics.MetricsContainerImpl; import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.metrics.MetricName; import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.ExperimentalOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.state.TimeDomain; @@ -2343,4 +2347,58 @@ public interface TestOptions extends PipelineOptions { void setValue(int value); } + + @Test + public void testNewWindowOptimization() throws Exception { + WindowingStrategy strategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(AfterPane.elementCountAtLeast(2)) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); + + PipelineOptions options = PipelineOptionsFactory.create(); + options + .as(ExperimentalOptions.class) + .setExperiments( + Collections.singletonList("unstable_not_update_compatible_new_window_optimization")); + ReduceFnTester, IntervalWindow> tester = + ReduceFnTester.nonCombining(strategy, options); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); + + assertTrue( + "Window should be new", + tester + .createRunner() + .getTriggerRunner() + .isNew( + tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state())); + + // 1. First element for a new window. + tester.injectElements(TimestampedValue.of(1, new Instant(1))); + + BitSet bitSet = + tester + .createRunner() + .getTriggerRunner() + .getFinishedBits( + tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state()); + assertTrue("Bitset should be empty", bitSet.isEmpty()); + assertFalse("Trigger should not be finished", bitSet.get(0)); + + assertFalse( + "Window should no longer be new", + tester + .createRunner() + .getTriggerRunner() + .isNew( + tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state())); + + // 2. Second element for the same window. + tester.injectElements(TimestampedValue.of(2, new Instant(2))); + + // Extract output. + List>> output = tester.extractOutput(); + // 2 elements fired at end of window. + assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 43b6a3cb0cb0..d1f137bd4936 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -129,6 +129,19 @@ ReduceFnTester, W> nonCombining( NullSideInputReader.empty()); } + public static + ReduceFnTester, W> nonCombining( + WindowingStrategy windowingStrategy, PipelineOptions options) throws Exception { + return new ReduceFnTester<>( + windowingStrategy, + TriggerStateMachines.stateMachineForTrigger( + TriggerTranslation.toProto(windowingStrategy.getTrigger())), + SystemReduceFn.buffering(VarIntCoder.of()), + IterableCoder.of(VarIntCoder.of()), + options, + NullSideInputReader.empty()); + } + /** * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and {@link * TriggerStateMachine}, for mocking the interactions between {@link ReduceFnRunner} and the diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java new file mode 100644 index 000000000000..81c807532756 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.BitSet; +import org.apache.beam.runners.core.StateAccessor; +import org.apache.beam.sdk.state.ValueState; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +@RunWith(JUnit4.class) +public class TriggerStateMachineRunnerTest { + + @Mock private StateAccessor mockState; + @Mock private ValueState mockFinishedSetState; + @Mock private TriggerStateMachineContextFactory mockContextFactory; + @Mock private TriggerStateMachine mockTriggerStateMachine; + + private ExecutableTriggerStateMachine rootTrigger; + + @Before + public void setUp() { + MockitoAnnotations.initMocks(this); + when(mockState.access(TriggerStateMachineRunner.FINISHED_BITS_TAG)) + .thenReturn((ValueState) mockFinishedSetState); + rootTrigger = ExecutableTriggerStateMachine.create(mockTriggerStateMachine); + } + + @Test + public void testPersistFinishedSet_emptyAndOptimizationEnabled() throws Exception { + when(mockFinishedSetState.read()).thenReturn(null); + + TriggerStateMachineRunner runner = + new TriggerStateMachineRunner<>( + rootTrigger, + (TriggerStateMachineContextFactory) mockContextFactory, + true /* useNewWindowOptimization */); + + FinishedTriggersBitSet modifiedFinishedSet = FinishedTriggersBitSet.emptyWithCapacity(1); + + runner.persistFinishedSet(mockState, modifiedFinishedSet); + + // Should write empty bitset because optimization is enabled + verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet()); + } + + @Test + public void testPersistFinishedSet_emptyAndOptimizationDisabled() throws Exception { + when(mockFinishedSetState.read()).thenReturn(null); + + TriggerStateMachineRunner runner = + new TriggerStateMachineRunner<>( + rootTrigger, + (TriggerStateMachineContextFactory) mockContextFactory, + false /* useNewWindowOptimization */); + + FinishedTriggersBitSet modifiedFinishedSet = FinishedTriggersBitSet.emptyWithCapacity(1); + + runner.persistFinishedSet(mockState, modifiedFinishedSet); + + // Should NOT write empty bitset because optimization is disabled and it was already empty (read + // returned null) + verify(mockFinishedSetState, never()).write(modifiedFinishedSet.getBitSet()); + } + + private void runTestPersistFinishedSet_nonEmpty(boolean useNewWindowOptimization) + throws Exception { + when(mockFinishedSetState.read()).thenReturn(null); + + TriggerStateMachineRunner runner = + new TriggerStateMachineRunner<>( + rootTrigger, + (TriggerStateMachineContextFactory) mockContextFactory, + useNewWindowOptimization); + + FinishedTriggersBitSet modifiedFinishedSet = FinishedTriggersBitSet.emptyWithCapacity(1); + modifiedFinishedSet.setFinished(rootTrigger, true); + + runner.persistFinishedSet(mockState, modifiedFinishedSet); + + // Should write non-empty bitset + verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet()); + } + + @Test + public void testPersistFinishedSet_nonEmpty() throws Exception { + runTestPersistFinishedSet_nonEmpty(false); + } + + @Test + public void testPersistFinishedSet_nonEmptyAndOptimizationEnabled() throws Exception { + runTestPersistFinishedSet_nonEmpty(true); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 613d87c127b7..492d3630ba99 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.state; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ExecutionException; @@ -37,6 +39,7 @@ "nullness" // TODO(https://github.com/apache/beam/issues/20497) }) public class WindmillWatermarkHold extends WindmillState implements WatermarkHoldState { + // The encoded size of an Instant. private static final int ENCODED_SIZE = 8; @@ -46,6 +49,7 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol private final String stateFamily; private boolean cleared = false; + private boolean knownEmpty = false; /** * If non-{@literal null}, the known current hold value, or absent if we know there are no output * watermark holds. If {@literal null}, the current hold value could depend on holds in Windmill @@ -77,6 +81,17 @@ public void clear() { localAdditions = null; } + @Override + public void setKnownEmpty() { + checkState(localAdditions == null, "setKnownEmpty called with local additions"); + checkState(!cleared, "setKnownEmpty called after clearing"); + checkState( + cachedValue == null || !cachedValue.isPresent(), + "setKnownEmpty called with a cached value"); + cachedValue = Optional.absent(); + knownEmpty = true; + } + @Override @SuppressWarnings("FutureReturnValueIgnored") public WindmillWatermarkHold readLater() { @@ -133,48 +148,67 @@ public Future persist( Future result; - if (!cleared && localAdditions == null) { - // No changes, so no need to update Windmill and no need to cache any value. - return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial()); - } - - if (cleared && localAdditions == null) { - // Just clearing the persisted state; blind delete - Windmill.WorkItemCommitRequest.Builder commitBuilder = - Windmill.WorkItemCommitRequest.newBuilder(); - commitBuilder - .addWatermarkHoldsBuilder() - .setTag(stateKey.byteString()) - .setStateFamily(stateFamily) - .setReset(true); + if (knownEmpty) { + if (localAdditions != null) { + // 1. We know it's empty, so we can just update with localAdditions + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + commitBuilder + .addWatermarkHoldsBuilder() + .setTag(stateKey.byteString()) + .setStateFamily(stateFamily) + .addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions)); - result = Futures.immediateFuture(commitBuilder.buildPartial()); - } else if (cleared && localAdditions != null) { - // Since we cleared before adding, we can do a blind overwrite of persisted state - Windmill.WorkItemCommitRequest.Builder commitBuilder = - Windmill.WorkItemCommitRequest.newBuilder(); - commitBuilder - .addWatermarkHoldsBuilder() - .setTag(stateKey.byteString()) - .setStateFamily(stateFamily) - .setReset(true) - .addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions)); + cachedValue = Optional.of(localAdditions); + result = Futures.immediateFuture(commitBuilder.buildPartial()); + } else { + // 2. State is known to be empty and there are no local additions. + // Whether 'cleared' was called or not, the desired state is empty. + // So no need to update Windmill. + result = + Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial()); + } + } else if (cleared) { + if (localAdditions == null) { + // 3. Just clearing the persisted state; blind delete + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + commitBuilder + .addWatermarkHoldsBuilder() + .setTag(stateKey.byteString()) + .setStateFamily(stateFamily) + .setReset(true); - cachedValue = Optional.of(localAdditions); + result = Futures.immediateFuture(commitBuilder.buildPartial()); + } else { + // 4. Since we cleared before adding, we can do an overwrite of persisted state + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + commitBuilder + .addWatermarkHoldsBuilder() + .setTag(stateKey.byteString()) + .setStateFamily(stateFamily) + .setReset(true) + .addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions)); - result = Futures.immediateFuture(commitBuilder.buildPartial()); - } else if (!cleared && localAdditions != null) { - // Otherwise, we need to combine the local additions with the already persisted data + cachedValue = Optional.of(localAdditions); + result = Futures.immediateFuture(commitBuilder.buildPartial()); + } + } else if (localAdditions != null) { + // 5. Otherwise, we need to combine the local additions with the already persisted data result = combineWithPersisted(); } else { - throw new IllegalStateException("Unreachable condition"); + // No changes, so no need to update Windmill and no need to cache any value. + return Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial()); } final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size(); + return Futures.lazyTransform( result, result1 -> { cleared = false; + knownEmpty = false; localAdditions = null; if (cachedValue != null) { cache.put(namespace, stateKey, WindmillWatermarkHold.this, estimatedByteSize); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index de036214b958..5601041a48f5 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3025,18 +3025,175 @@ public void testWatermarkClearBeforeRead() throws Exception { StateTag addr = StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); - WatermarkHoldState bag = underTest.state(NAMESPACE, addr); + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); - bag.clear(); - assertThat(bag.read(), Matchers.nullValue()); + hold.clear(); + assertThat(hold.read(), Matchers.nullValue()); + + hold.add(new Instant(300)); + assertThat(hold.read(), Matchers.equalTo(new Instant(300))); + + // Shouldn't need to read from windmill because the value is already available. + Mockito.verifyNoMoreInteractions(mockReader); + } - bag.add(new Instant(300)); - assertThat(bag.read(), Matchers.equalTo(new Instant(300))); + @Test + public void testWatermarkSetKnownEmptyBeforeRead() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); + + hold.setKnownEmpty(); + assertThat(hold.read(), Matchers.nullValue()); + + hold.add(new Instant(300)); + assertThat(hold.read(), Matchers.equalTo(new Instant(300))); // Shouldn't need to read from windmill because the value is already available. Mockito.verifyNoMoreInteractions(mockReader); } + @Test + public void testWatermarkSetKnownEmptyThenAddPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); + + hold.setKnownEmpty(); + hold.add(new Instant(1000)); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTest.persist(commitBuilder); + + assertEquals(1, commitBuilder.getWatermarkHoldsCount()); + + Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); + assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); + assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + + @Test + public void testNewWatermarkAddPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.add(new Instant(1000)); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(1, commitBuilder.getWatermarkHoldsCount()); + + Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); + assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); + assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + + @Test + public void testNewWatermarkClearPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.add(new Instant(1000)); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(1, commitBuilder.getWatermarkHoldsCount()); + + Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); + assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); + assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + + @Test + public void testWatermarkSetKnownEmptyThenClearPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); + + hold.setKnownEmpty(); + hold.clear(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTest.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + + @Test + public void testWatermarkSetKnownEmptyThenPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); + + hold.setKnownEmpty(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTest.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + + @Test + public void testWatermarkSetKnownEmptyThenClearThenAddPersist() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + + WatermarkHoldState hold = underTest.state(NAMESPACE, addr); + + hold.setKnownEmpty(); + hold.clear(); + hold.add(new Instant(1000)); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTest.persist(commitBuilder); + + assertEquals(1, commitBuilder.getWatermarkHoldsCount()); + + Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0); + assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag()); + assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), watermarkHold.getTimestamps(0)); + + Mockito.verifyNoInteractions(mockReader); + + assertBuildable(commitBuilder); + } + @Test public void testWatermarkPersistEarliest() throws Exception { StateTag addr = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java index 6d4183da101f..f8b09bcf03a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java @@ -38,4 +38,14 @@ public interface WatermarkHoldState extends GroupingState { @Override WatermarkHoldState readLater(); + + /** + * For internal use only; no backwards-compatibility guarantees. + * + *

Permit marking the state as empty locally, without necessarily clearing it in the backend. + * + *

This may be used by runners to optimize out unnecessary state reads. + */ + @Internal + default void setKnownEmpty() {} }