diff --git a/examples/pom.xml b/examples/pom.xml index 5a97c95f..5b10c042 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -381,6 +381,28 @@ + + accumulator-blackhole + package + + dockerBuild + + + + amazoncorretto:11 + + + + io.numaproj.numaflow.examples.accumulator.blackhole.BlackholeFactory + + + + + numaflow-java-examples/accumulator-blackhole:${docker.tag} + + + + on-success-sink package diff --git a/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java new file mode 100644 index 00000000..4b0b3a2a --- /dev/null +++ b/examples/src/main/java/io/numaproj/numaflow/examples/accumulator/blackhole/BlackholeFactory.java @@ -0,0 +1,62 @@ +package io.numaproj.numaflow.examples.accumulator.blackhole; + +import io.numaproj.numaflow.accumulator.Server; +import io.numaproj.numaflow.accumulator.model.Accumulator; +import io.numaproj.numaflow.accumulator.model.AccumulatorFactory; +import io.numaproj.numaflow.accumulator.model.Datum; +import io.numaproj.numaflow.accumulator.model.Message; +import io.numaproj.numaflow.accumulator.model.OutputStreamObserver; +import lombok.extern.slf4j.Slf4j; + +/** + * Blackhole is an accumulator that intentionally discards every datum it receives without + * forwarding any data downstream. + * + *

A naive implementation would simply read the input stream and emit nothing. However, an + * accumulator that never emits anything for the datums it consumes leaves the framework unable to + * release the per-datum tracked state, leading to unbounded memory growth. + * + *

Instead, this example emits a drop message for every datum using {@link Message#toDrop(Datum)}. + * A drop message is not forwarded to the next vertex, but it still allows the framework to advance + * the watermark and release the tracked state for that datum - giving us "blackhole" semantics + * without leaking memory. This pattern is useful for multiplexer-, cross-join-, or filter-style + * accumulators that legitimately need to omit some (or all) of their inputs. + */ +@Slf4j +public class BlackholeFactory extends AccumulatorFactory { + + public static void main(String[] args) throws Exception { + log.info("Starting blackhole accumulator server.."); + Server server = new Server(new BlackholeFactory()); + + // Start the server + server.start(); + + // wait for the server to shut down + server.awaitTermination(); + log.info("Blackhole accumulator server exited.."); + } + + @Override + public Blackhole createAccumulator() { + return new Blackhole(); + } + + public static class Blackhole extends Accumulator { + @Override + public void processMessage(Datum datum, OutputStreamObserver outputStream) { + log.info( + "Dropping datum with event time: {}, watermark: {}", + datum.getEventTime().toEpochMilli(), + datum.getWatermark().toEpochMilli()); + // Emit a drop message: nothing is forwarded downstream, but the framework still + // advances the watermark and releases the tracked state for this datum. 
+ outputStream.send(Message.toDrop(datum)); + } + + @Override + public void handleEndOfStream(OutputStreamObserver outputStreamObserver) { + log.info("End of stream received, nothing to flush for blackhole accumulator"); + } + } +} diff --git a/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java b/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java index 8e06087c..ae63c80f 100644 --- a/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java +++ b/src/main/java/io/numaproj/numaflow/accumulator/model/Message.java @@ -7,6 +7,8 @@ /** Message is used to wrap the data returned by Accumulator functions. */ @Getter public class Message { + private static final String[] DROP_TAGS = {"U+005C__DROP__"}; + private final Instant eventTime; private final Instant watermark; private final Map headers; @@ -32,6 +34,24 @@ public Message(Datum datum) { this.tags = null; } + /** + * Creates a Message from the given Datum with drop tags set, so the message is not forwarded to + * the next vertex but still allows the accumulator to advance the watermark and release the + * tracked state. It is advised to use the incoming Datum to construct the message, because event + * time, watermark, id and headers of the message are derived from the Datum. Only use a custom + * implementation of the Datum if you know what you are doing. 
+ * + * @param datum {@link Datum} object to drop the results for + * @return a {@link Message} tagged to be dropped + */ + public static Message toDrop(Datum datum) { + Message message = new Message(datum); + message.setValue(new byte[0]); + // Defensive copy: @Getter exposes the array, so never hand out the shared constant. + message.setTags(DROP_TAGS.clone()); + return message; + } + /* * sets the value of the message * diff --git a/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java b/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java new file mode 100644 index 00000000..57640cd5 --- /dev/null +++ b/src/test/java/io/numaproj/numaflow/accumulator/model/MessageTest.java @@ -0,0 +1,109 @@ +package io.numaproj.numaflow.accumulator.model; + +import org.junit.Test; + +import java.time.Instant; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +public class MessageTest { + + @Test + public void testMessageFromDatum() { + Datum datum = buildDatum(); + Message message = new Message(datum); + + assertArrayEquals("hello".getBytes(), message.getValue()); + assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys()); + assertArrayEquals(null, message.getTags()); + assertEquals("test-id", message.getId()); + assertEquals(Instant.ofEpochMilli(60000), message.getEventTime()); + assertEquals(Instant.ofEpochMilli(59000), message.getWatermark()); + } + + @Test + public void testToDrop() { + Datum datum = buildDatum(); + Message message = Message.toDrop(datum); + + // The DROP tag must be set so the message is not forwarded downstream. + String[] dropTags = {"U+005C__DROP__"}; + assertArrayEquals(dropTags, message.getTags()); + // No value is forwarded, but the identifying/watermark fields are carried over so the + // accumulator can advance the watermark and release the tracked state. 
+ assertArrayEquals(new byte[0], message.getValue()); + assertArrayEquals(new String[]{"key1", "key2"}, message.getKeys()); + assertEquals("test-id", message.getId()); + assertEquals(Instant.ofEpochMilli(60000), message.getEventTime()); + assertEquals(Instant.ofEpochMilli(59000), message.getWatermark()); + } + + private Datum buildDatum() { + Map headers = new HashMap<>(); + headers.put("x", "y"); + return new TestDatum( + new String[]{"key1", "key2"}, + "hello".getBytes(), + Instant.ofEpochMilli(59000), + Instant.ofEpochMilli(60000), + headers, + "test-id"); + } + + private static class TestDatum implements Datum { + private final String[] keys; + private final byte[] value; + private final Instant watermark; + private final Instant eventTime; + private final Map headers; + private final String id; + + TestDatum( + String[] keys, + byte[] value, + Instant watermark, + Instant eventTime, + Map headers, + String id) { + this.keys = keys; + this.value = value; + this.watermark = watermark; + this.eventTime = eventTime; + this.headers = headers; + this.id = id; + } + + @Override + public byte[] getValue() { + return value; + } + + @Override + public String[] getKeys() { + return keys; + } + + @Override + public Instant getEventTime() { + return eventTime; + } + + @Override + public Instant getWatermark() { + return watermark; + } + + @Override + public Map getHeaders() { + return headers; + } + + @Override + public String getID() { + return id; + } + } +}