🗓️ 20012025 1436
📎
flink_state
Why is Flink Involved in Managing State?
Basically, because Flink has some good features:
- Local:
- Flink state is kept local to the machine that processes it
- Can be accessed at memory speed
- Durable: Flink state is fault-tolerant
- Automatically checkpointed at regular intervals
- Restored upon failure
- Vertically scalable
- Flink state can be kept in embedded RocksDB instances
- Scale by adding more local disk
- Horizontally scalable
- Flink state is redistributed as your cluster grows and shrinks
Rich Functions
- There are "rich" variants of Flink's function interfaces
- Contain additional methods
Method | Description |
---|---|
open(Configuration c) | Called once during operator initialization (e.g., to load static data or establish connections) |
close() | Called at the end of the operator's lifecycle |
getRuntimeContext() | Provides access to Flink’s runtime context, including state management |
Keyed State
State that is partitioned by key when working with a keyed stream (e.g., using keyBy
)
INFO
Flink maintains a separate key/value store for each key, allowing state to be tied to specific keys in the stream.
ValueState
A simple form of keyed state where one value per key is stored (there are other forms)
How it Works
- Initialization
- A
ValueStateDescriptor
is used to define the name and type of the state. - Done in
open()
- State is accessed via
getRuntimeContext().getState(descriptor)
.
- Access and Update
- During processing, Flink dynamically associates the state with the key of the current event.
- Methods like
value()
andupdate()
allow reading and updating the state.
Distributed State Management
- Keyed state (including
ValueState
) is sharded across Flink nodes, with each parallel instance managing state for its assigned keys - The state for a key is local to the parallel instance handling that key and is not shared across nodes.
public static class Deduplicator extends RichFlatMapFunction<Event, Event> {
ValueState<Boolean> keyHasBeenSeen;
@Override
public void open(Configuration conf) {
ValueStateDescriptor<Boolean> desc = new ValueStateDescriptor<>("keyHasBeenSeen", Types.BOOLEAN);
keyHasBeenSeen = getRuntimeContext().getState(desc);
}
@Override
public void flatMap(Event event, Collector<Event> out) throws Exception {
if (keyHasBeenSeen.value() == null) {
out.collect(event);
keyHasBeenSeen.update(true);
}
}
}
Clearing State
If keyed state is not cleared, it will keep growing for every distinct key encountered
Therefore, keyed state should be cleared when it's no longer needed
This can be done through:
- Manual clearing
keyHasBeenSeen.clear();
- Timers: Clear state after a period of inactivity (e.g., with ProcessFunction).
- State Time-to-Live (TTL)
Non-keyed (Operator) State
NOTE
Rarely needed in user-defined functions
- Primarily used in implementation of sources and sinks
- Operates different
Connected Streams
- Pattern for allowing a single operator to process two input streams simultaneously
- Enable dynamic transformations by streaming in thresholds, rules, or parameters
- Common use cases:
- Streaming joins
- Advanced processing requiring interaction between two streams
Simple transformation
Connected stream
Keying Requirement
- Both streams must be keyed in a compatible way (same keying logic and keyspace)
- Ensures that events with the same key from both streams are processed by the same parallel instance
Features
State Sharing:
- Connected streams can share keyed state.
- For example, one stream can update state, while the other uses it for processing.
RichCoFlatMapFunction:
- A special function that processes connected streams.
- Provides
flatMap1
for the first stream andflatMap2
for the second stream. - Supports stateful processing via the rich function interface.
Considerations
- Race Conditions:
- The order of
flatMap1
andflatMap2
calls is not guaranteed. - Flink processes events from the two streams independently, which can lead to race conditions.
- Use managed state to buffer events if ordering is critical.
- The order of
- Custom Operators:
- For precise control over input consumption, use a custom operator implementing
InputSelectable
.
- For precise control over input consumption, use a custom operator implementing
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> control = env
.fromElements("DROP", "IGNORE")
.keyBy(x -> x);
DataStream<String> streamOfWords = env
.fromElements("Apache", "DROP", "Flink", "IGNORE")
.keyBy(x -> x);
control
.connect(streamOfWords)
.flatMap(new ControlFunction())
.print();
env.execute();
}
public static class ControlFunction extends RichCoFlatMapFunction<String, String, String> {
private ValueState<Boolean> blocked;
@Override
public void open(Configuration config) {
blocked = getRuntimeContext()
.getState(new ValueStateDescriptor<>("blocked", Boolean.class));
}
@Override
public void flatMap1(String control_value, Collector<String> out) throws Exception {
blocked.update(Boolean.TRUE);
}
@Override
public void flatMap2(String data_value, Collector<String> out) throws Exception {
if (blocked.value() == null) {
out.collect(data_value);
}
}
}