Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run!",
"modification": 7,
"modification": 1,
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -93,6 +94,11 @@
}) // TODO(https://github.com/apache/beam/issues/20497)
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {

// Experiments guarding optimizations in development. No backward compatibility guarantees.
public static final String UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION =
"unstable_not_update_compatible_new_window_optimization";
public static final String UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION =
"unstable_disable_watermark_known_empty_optimization";
/**
* The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}.
*
Expand Down Expand Up @@ -211,6 +217,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
*/
private final NonEmptyPanes<K, W> nonEmptyPanes;

private final boolean useNewWindowOptimization;
private final boolean disableWatermarkKnownEmptyOptimization;

public ReduceFnRunner(
K key,
WindowingStrategy<?, W> windowingStrategy,
Expand All @@ -237,6 +246,15 @@ public ReduceFnRunner(

this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, this.reduceFn);

this.useNewWindowOptimization =
windowingStrategy.getWindowFn().isNonMerging()
&& ExperimentalOptions.hasExperiment(
options, UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION);

this.disableWatermarkKnownEmptyOptimization =
ExperimentalOptions.hasExperiment(
options, UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION);

// Note this may incur I/O to load persisted window set data.
this.activeWindows = createActiveWindowSet();

Expand All @@ -256,7 +274,8 @@ public ReduceFnRunner(
new TriggerStateMachineRunner<>(
triggerStateMachine,
new TriggerStateMachineContextFactory<>(
windowingStrategy.getWindowFn(), stateInternals, activeWindows));
windowingStrategy.getWindowFn(), stateInternals, activeWindows),
this.useNewWindowOptimization);
}

private ActiveWindowSet<W> createActiveWindowSet() {
Expand All @@ -275,6 +294,16 @@ boolean hasNoActiveWindows() {
return activeWindows.getActiveAndNewWindows().isEmpty();
}

/** Exposes the trigger runner held by this instance; intended for tests only. */
@VisibleForTesting
TriggerStateMachineRunner<W> getTriggerRunner() {
  return this.triggerRunner;
}

/** Exposes the context factory held by this instance; intended for tests only. */
@VisibleForTesting
ReduceFnContextFactory<K, InputT, OutputT, W> getContextFactory() {
  return this.contextFactory;
}

private Set<W> windowsThatAreOpen(Collection<W> windows) {
Set<W> result = new HashSet<>();
for (W window : windows) {
Expand Down Expand Up @@ -613,6 +642,20 @@ private void processElement(Map<W, W> windowToMergeResult, WindowedValue<InputT>
StateStyle.RENAMED,
value.causedByDrain());

// TODO: Make sure the NewWindowOptimization does not create unbounded trigger state
// in GlobalWindow
if (useNewWindowOptimization && triggerRunner.isNew(directContext.state())) {
// Blindly clear state to ensure Windmill doesn't do unnecessary reads.
Comment thread
arunpandianp marked this conversation as resolved.
// TODO: Instead of the clears here, we could mark these states as empty locally
// in the state cache and/or explicitly tell that the entries are non-existent
reduceFn.clearState(renamedContext);
paneInfoTracker.clear(directContext.state());
if (!disableWatermarkKnownEmptyOptimization) {
watermarkHold.setKnownEmpty(renamedContext);
}
nonEmptyPanes.clearPane(renamedContext.state());
}

nonEmptyPanes.recordContent(renamedContext.state());
scheduleGarbageCollectionTimer(directContext);

Expand Down Expand Up @@ -747,7 +790,7 @@ public void onTimers(Iterable<TimerData> timers) throws Exception {

// Perform prefetching of state to determine if the trigger should fire.
if (windowActivation.isGarbageCollection) {
triggerRunner.prefetchIsClosed(directContext.state());
triggerRunner.prefetchFinishedSet(directContext.state());
} else {
triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
}
Expand Down Expand Up @@ -926,7 +969,7 @@ private void prefetchEmit(
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
triggerRunner.prefetchShouldFire(directContext.window(), directContext.state());
triggerRunner.prefetchIsClosed(directContext.state());
triggerRunner.prefetchFinishedSet(directContext.state());
prefetchOnTrigger(directContext, renamedContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,23 @@ public void clearHolds(ReduceFn<?, ?, ?, W>.Context context) {
context.state().access(EXTRA_HOLD_TAG).clear();
}

/**
* <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
*
* <p>Permit marking the watermark holds as empty locally, without necessarily clearing them in
* the backend.
*
* <p>Emits a debug trace with the key, window and current input/output watermarks, then calls
* {@code setKnownEmpty()} on both hold states: the one behind {@code elementHoldTag} and the one
* behind {@code EXTRA_HOLD_TAG}.
*
* @param context context supplying the key, the window and the state in which the holds live
*/
public void setKnownEmpty(ReduceFn<?, ?, ?, W>.Context context) {
// Trace first so the watermarks observed at the time of the call are recorded.
WindowTracing.debug(
"WatermarkHold.setKnownEmpty: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}",
context.key(),
context.window(),
timerInternals.currentInputWatermarkTime(),
timerInternals.currentOutputWatermarkTime());
// Both holds are marked, mirroring the pair of tags handled by clearHolds.
context.state().access(elementHoldTag).setKnownEmpty();
context.state().access(EXTRA_HOLD_TAG).setKnownEmpty();
}

/** Return the current data hold, or null if none. Does not clear. For debugging only. */
public @Nullable Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context context) {
return context.state().access(elementHoldTag).read();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core.triggers;

import java.util.BitSet;
import org.checkerframework.checker.nullness.qual.Nullable;

/** A {@link FinishedTriggers} implementation based on an underlying {@link BitSet}. */
public class FinishedTriggersBitSet implements FinishedTriggers {
Expand Down Expand Up @@ -60,4 +61,17 @@ public void clearRecursively(ExecutableTriggerStateMachine trigger) {
public FinishedTriggersBitSet copy() {
return new FinishedTriggersBitSet((BitSet) bitSet.clone());
}

/**
 * Compares by content: another {@link FinishedTriggersBitSet} is equal when its underlying
 * {@link BitSet} equals this one's. Any other object, including {@code null}, is not equal.
 */
@Override
public boolean equals(@Nullable Object obj) {
  return obj instanceof FinishedTriggersBitSet
      && bitSet.equals(((FinishedTriggersBitSet) obj).bitSet);
}

/** Hash code taken from the underlying {@link BitSet}, keeping it consistent with equals. */
@Override
public int hashCode() {
  return bitSet.hashCode();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,16 @@ public class TriggerStateMachineRunner<W extends BoundedWindow> {

private final ExecutableTriggerStateMachine rootTrigger;
private final TriggerStateMachineContextFactory<W> contextFactory;
private final boolean useNewWindowOptimization;

public TriggerStateMachineRunner(
ExecutableTriggerStateMachine rootTrigger,
TriggerStateMachineContextFactory<W> contextFactory) {
TriggerStateMachineContextFactory<W> contextFactory,
boolean useNewWindowOptimization) {
checkState(rootTrigger.getTriggerIndex() == 0);
this.rootTrigger = rootTrigger;
this.contextFactory = contextFactory;
this.useNewWindowOptimization = useNewWindowOptimization;
}

private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
Expand All @@ -81,9 +84,11 @@ private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
}

@Nullable BitSet bitSet = state.read();
return bitSet == null
? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
: FinishedTriggersBitSet.fromBitSet(bitSet);
if (bitSet == null) {
return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
}

return FinishedTriggersBitSet.fromBitSet(bitSet);
}

private void clearFinishedBits(ValueState<BitSet> state) {
Expand All @@ -99,19 +104,29 @@ public boolean isClosed(StateAccessor<?> state) {
return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}

public void prefetchIsClosed(StateAccessor<?> state) {
/**
 * Reports whether the window appears to be new, i.e. the finished set is needed for the root
 * trigger and no finished-bits value is currently stored in the given state.
 *
 * <p>Always returns {@code false} when the finished set is not needed, because in that case the
 * stored finished bits are not consulted at all.
 */
public boolean isNew(StateAccessor<?> state) {
  if (!isFinishedSetNeeded()) {
    return false;
  }
  return state.access(FINISHED_BITS_TAG).read() == null;
}

/**
 * Reads the finished bits through {@code readFinishedBits} and returns the resulting {@link
 * BitSet}; intended for tests only.
 */
@VisibleForTesting
public BitSet getFinishedBits(StateAccessor<?> state) {
  ValueState<BitSet> finishedBitsState = state.access(FINISHED_BITS_TAG);
  FinishedTriggersBitSet finishedSet = readFinishedBits(finishedBitsState);
  return finishedSet.getBitSet();
}

/**
 * Requests a deferred read ({@code readLater}) of the finished bits, but only when the finished
 * set is needed; otherwise this is a no-op.
 */
public void prefetchFinishedSet(StateAccessor<?> state) {
  if (!isFinishedSetNeeded()) {
    return;
  }
  state.access(FINISHED_BITS_TAG).readLater();
}

public void prefetchForValue(W window, StateAccessor<?> state) {
prefetchIsClosed(state);
prefetchFinishedSet(state);
rootTrigger.invokePrefetchOnElement(contextFactory.createPrefetchContext(window, rootTrigger));
}

public void prefetchShouldFire(W window, StateAccessor<?> state) {
prefetchIsClosed(state);
prefetchFinishedSet(state);
rootTrigger.invokePrefetchShouldFire(contextFactory.createPrefetchContext(window, rootTrigger));
}

Expand Down Expand Up @@ -180,13 +195,23 @@ public void onFire(W window, Timers timers, StateAccessor<?> state) throws Excep
persistFinishedSet(state, finishedSet);
}

private void persistFinishedSet(
StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
@VisibleForTesting
void persistFinishedSet(StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
if (!isFinishedSetNeeded()) {
return;
}

ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);

if (useNewWindowOptimization) {
if (finishedSetState.read() == null
|| !readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
// Write a value even if the bitset was empty
finishedSetState.write(modifiedFinishedSet.getBitSet());
}
return;
}

if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
Copy link
Copy Markdown
Contributor Author

@arunpandianp arunpandianp May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

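Note: before this PR, this check always evaluated to true because FinishedTriggersBitSet did not override equals, so the default identity comparison was used. With the added equals method, we no longer write finishedSetState on every call; it is written only when the finished set has actually changed.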

if (modifiedFinishedSet.getBitSet().isEmpty()) {
finishedSetState.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
Expand Down Expand Up @@ -2343,4 +2347,58 @@ public interface TestOptions extends PipelineOptions {

void setValue(int value);
}

/**
 * Exercises the new-window optimization experiment on a non-merging (fixed) window.
 *
 * <p>Checks that the window is reported as new before any element is processed, that after the
 * first element an empty finished-bits value is readable and the window is no longer reported as
 * new, and that the two injected elements are emitted together after the second element.
 */
@Test
public void testNewWindowOptimization() throws Exception {
  WindowingStrategy<?, IntervalWindow> strategy =
      WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
          .withTrigger(AfterPane.elementCountAtLeast(2))
          .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);

  // Enable the experiment through the runner's own constant rather than a copy of the string,
  // so the test cannot silently stop exercising the optimization if the name changes.
  PipelineOptions options = PipelineOptionsFactory.create();
  options
      .as(ExperimentalOptions.class)
      .setExperiments(
          Collections.singletonList(
              ReduceFnRunner.UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION));
  ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
      ReduceFnTester.nonCombining(strategy, options);

  IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10));

  // Nothing has been processed yet, so no finished bits are stored for the window.
  assertTrue(
      "Window should be new",
      tester
          .createRunner()
          .getTriggerRunner()
          .isNew(
              tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state()));

  // 1. First element for a new window.
  tester.injectElements(TimestampedValue.of(1, new Instant(1)));

  // The trigger needs two elements, so no trigger is finished after the first one.
  BitSet bitSet =
      tester
          .createRunner()
          .getTriggerRunner()
          .getFinishedBits(
              tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state());
  assertTrue("Bitset should be empty", bitSet.isEmpty());
  assertFalse("Trigger should not be finished", bitSet.get(0));

  // Even though the bit set is empty, a value must now be stored so the window is not "new".
  assertFalse(
      "Window should no longer be new",
      tester
          .createRunner()
          .getTriggerRunner()
          .isNew(
              tester.createRunner().getContextFactory().base(window, StateStyle.DIRECT).state()));

  // 2. Second element for the same window.
  tester.injectElements(TimestampedValue.of(2, new Instant(2)));

  // Extract output.
  List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
  // Both elements arrive in a single output. The remaining matcher arguments (9, 0, 10) are
  // presumably the output timestamp and the window bounds; confirm against the matcher.
  assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
NullSideInputReader.empty());
}

/**
 * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} that buffers {@code
 * Integer} elements, using the supplied {@link PipelineOptions} and an empty side input reader.
 * The trigger state machine is derived from the strategy's own trigger.
 */
public static <W extends BoundedWindow>
    ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
        WindowingStrategy<?, W> windowingStrategy, PipelineOptions options) throws Exception {
  TriggerStateMachine triggerStateMachine =
      TriggerStateMachines.stateMachineForTrigger(
          TriggerTranslation.toProto(windowingStrategy.getTrigger()));
  return new ReduceFnTester<>(
      windowingStrategy,
      triggerStateMachine,
      SystemReduceFn.buffering(VarIntCoder.of()),
      IterableCoder.of(VarIntCoder.of()),
      options,
      NullSideInputReader.empty());
}

/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and {@link
* TriggerStateMachine}, for mocking the interactions between {@link ReduceFnRunner} and the
Expand Down
Loading
Loading