Using Gatherers to Solve a LeetCode Problem and Beyond
A couple of weeks ago, I read an article by Grzegorz Piwowarek on how to write an efficient gatherer that retrieves the last n elements of a stream (https://4comprehension.com/java-last-gatherer/).
Grzegorz's article reminded me of a LeetCode problem I once saw, Kth Largest Element in an Array (https://leetcode.com/problems/kth-largest-element-in-an-array/description/). If the array is sorted in ascending order, the gatherer from the article almost solves it already: the last k elements are the k largest, and the first of them is the kth largest.
For an array sorted in ascending order, we could use LastGatherer from the article and take the first of the k elements it returns:
// data is a T[] sorted in ascending order
List<T> resList = Stream.of(data)
        .gather(new LastGatherer<>(k))
        .toList();
return resList.get(0);
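A minimal runnable sketch of the sorted-array idea, assuming JDK 24 or later (where Gatherers are final). The article's LastGatherer is not reproduced here; `lastK` is a hypothetical stand-in that buffers the last k elements in an ArrayDeque. Since LeetCode passes an int[], the sketch boxes it first: Stream.of on an int[] would produce a Stream<int[]> with a single element.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Gatherer;

public class SortedKthLargest {

    // Hypothetical stand-in for the article's LastGatherer (not its actual code):
    // keeps only the last k elements seen, using an ArrayDeque as a sliding buffer.
    static <T> Gatherer<T, ArrayDeque<T>, T> lastK(int k) {
        return Gatherer.ofSequential(
                ArrayDeque::new,
                Gatherer.Integrator.ofGreedy((deque, element, _) -> {
                    deque.addLast(element);
                    if (deque.size() > k) {
                        deque.removeFirst(); // drop the oldest element
                    }
                    return true;
                }),
                (deque, downstream) -> {
                    // Emit the buffered elements in encounter order,
                    // stopping early if the downstream stops accepting.
                    for (T elem : deque) {
                        if (downstream.isRejecting() || !downstream.push(elem)) {
                            break;
                        }
                    }
                });
    }

    static int kthLargest(int[] nums, int k) {
        List<Integer> largest = Arrays.stream(nums)
                .sorted()  // ascending order, so the last k are the k largest
                .boxed()   // gather is defined on Stream<T>, not on IntStream
                .gather(lastK(k))
                .toList();
        return largest.get(0); // smallest of the k largest = kth largest
    }

    public static void main(String[] args) {
        System.out.println(kthLargest(new int[] {3, 2, 1, 5, 6, 4}, 2)); // 5
    }
}
```

Sorting costs O(N log N) for an array of length N, which is what the heap-based version below avoids.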
However, our array is unsorted, so we need to modify the gatherer to use a data structure that gives us the same result without sorting: a binary heap, or more specifically its Java implementation, PriorityQueue. With natural ordering, PriorityQueue is a min-heap, so its head is always the smallest element. Let's make the changes.
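For comparison, here is the classic heap-based solution without gatherers; it is a sketch of the standard technique, not code from the article. A min-heap capped at k elements always evicts its smallest element, so after one pass it holds the k largest, and its head is the kth largest. For an array of length N this runs in O(N log k) time with O(k) extra space.

```java
import java.util.PriorityQueue;

public class HeapKthLargest {

    static int kthLargest(int[] nums, int k) {
        // Natural ordering makes this a min-heap: peek()/poll() return the smallest.
        PriorityQueue<Integer> heap = new PriorityQueue<>();
        for (int num : nums) {
            heap.offer(num);
            if (heap.size() > k) {
                heap.poll(); // evict the current minimum, keeping the k largest
            }
        }
        // The smallest of the k largest elements is the kth largest.
        return heap.peek();
    }

    public static void main(String[] args) {
        System.out.println(kthLargest(new int[] {3, 2, 3, 1, 2, 4, 5, 5, 6}, 4)); // 4
    }
}
```

The gatherer below packages exactly this offer-then-poll loop as its integrator.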
The initializer now creates an empty PriorityQueue:
@Override
public Supplier<PriorityQueue<T>> initializer() {
    return PriorityQueue::new;
}
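The integrator needs a small update because PriorityQueue has a slightly different interface: we replace removeFirst with poll. Since poll removes the head of the heap, which is the smallest element, the state never holds more than n elements and always keeps the n largest seen so far: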
@Override
public Integrator<PriorityQueue<T>, T, T> integrator() {
    return Integrator.ofGreedy((state, element, _) -> {
        state.offer(element);
        // Once we hold more than n elements, evict the smallest one
        if (state.size() > n) {
            state.poll();
        }
        return true;
    });
}
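To see why the integrator keeps exactly the n largest elements, the sketch below drives the same logic by hand, outside any stream, and records a sorted snapshot of the heap after each element. The `boundedHeap` and `trace` helpers are hypothetical names for illustration, and the no-op downstream works only because this integrator never pushes anything (assuming JDK 24+).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Gatherer;

public class IntegratorTrace {

    // The integrator logic from the text, written as a standalone factory method.
    static <T> Gatherer.Integrator<PriorityQueue<T>, T, T> boundedHeap(int n) {
        return Gatherer.Integrator.ofGreedy((state, element, _) -> {
            state.offer(element);
            if (state.size() > n) {
                state.poll(); // evict the current minimum
            }
            return true;
        });
    }

    // Feeds the elements to the integrator one by one and records
    // a sorted snapshot of the heap after each step.
    static List<List<Integer>> trace(int n, int... elements) {
        Gatherer.Integrator<PriorityQueue<Integer>, Integer, Integer> integrator = boundedHeap(n);
        PriorityQueue<Integer> state = new PriorityQueue<>();
        // The integrator never pushes, so a downstream that accepts everything suffices.
        Gatherer.Downstream<Integer> noop = _ -> true;
        List<List<Integer>> snapshots = new ArrayList<>();
        for (int element : elements) {
            integrator.integrate(state, element, noop);
            List<Integer> snapshot = new ArrayList<>(state);
            snapshot.sort(null); // heap iteration order is unspecified, so sort it
            snapshots.add(snapshot);
        }
        return snapshots;
    }

    public static void main(String[] args) {
        for (List<Integer> step : trace(3, 7, 1, 9, 4, 8, 2)) {
            System.out.println(step);
        }
    }
}
```

For the input 7, 1, 9, 4, 8, 2 with n = 3, the snapshots are [7], [1, 7], [1, 7, 9], [4, 7, 9], [7, 8, 9], [7, 8, 9]: once the heap is full, each new element either displaces the current minimum or is evicted itself.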
The finisher mostly stays the same. As Viktor Klang pointed out, the downstream may be rejecting elements, so we need to check for that before we start pushing and after each push. Note that poll drains the heap in ascending order, so the n largest elements are emitted from smallest to largest:
@Override
public BiConsumer<PriorityQueue<T>, Downstream<? super T>> finisher() {
    return (state, downstream) -> {
        // The downstream might be rejecting, so we check
        // before starting, and on each push.
        boolean accepting = !downstream.isRejecting();
        if (accepting) {
            T elem = state.poll();
            while (elem != null) {
                accepting = downstream.push(elem);
                if (!accepting) { break; }
                elem = state.poll();
            }
        }
        // If the downstream stopped accepting at some point, clear the state.
        if (!accepting) {
            state.clear();
        }
    };
}
That's it. The input can now be any unsorted sequence of integers (or of any other Comparable type), and the first element the gatherer emits is the kth largest.
To make it easier for the reader to try, here is the complete code of the new gatherer:
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
// Declared inside an enclosing class: a top-level record cannot be private.
// T must be Comparable, because the PriorityQueue uses natural ordering.
private record MaxNElements<T>(int n) implements Gatherer<T, PriorityQueue<T>, T> {
    @Override
    public Supplier<PriorityQueue<T>> initializer() {
        return PriorityQueue::new;
    }
    @Override
    public Integrator<PriorityQueue<T>, T, T> integrator() {
        return Integrator.ofGreedy((state, element, _) -> {
            state.offer(element);
            // Keep only the n largest elements by evicting the smallest
            if (state.size() > n) {
                state.poll();
            }
            return true;
        });
    }
    @Override
    public BiConsumer<PriorityQueue<T>, Downstream<? super T>> finisher() {
        return (state, downstream) -> {
            // The downstream might be rejecting, so we need to check the condition before
            // starting, as well as the result of each push.
            boolean accepting = !downstream.isRejecting();
            if (accepting) {
                T elem = state.poll();
                while (elem != null) {
                    accepting = downstream.push(elem);
                    if (!accepting) { break; }
                    elem = state.poll();
                }
            }
            // If the downstream stopped accepting at some point, clear the state.
            if (!accepting) {
                state.clear();
            }
        };
    }
}
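With the gatherer in place, the LeetCode problem reduces to taking the first emitted element. Below is a self-contained sketch, assuming JDK 24+; it repeats a condensed copy of MaxNElements so that it compiles on its own, and `findKthLargest` is our own name for the solution method. Since findFirst() is short-circuiting, the downstream may start rejecting after the first element, which is the situation the finisher's checks are there to handle.

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class KthLargest {

    // Condensed copy of MaxNElements (finisher written in the compact loop form).
    record MaxNElements<T>(int n) implements Gatherer<T, PriorityQueue<T>, T> {
        @Override
        public Supplier<PriorityQueue<T>> initializer() {
            return PriorityQueue::new;
        }

        @Override
        public Integrator<PriorityQueue<T>, T, T> integrator() {
            return Integrator.ofGreedy((state, element, _) -> {
                state.offer(element);
                if (state.size() > n) {
                    state.poll();
                }
                return true;
            });
        }

        @Override
        public BiConsumer<PriorityQueue<T>, Downstream<? super T>> finisher() {
            return (state, downstream) -> {
                // Stop pushing as soon as the downstream rejects an element.
                if (!downstream.isRejecting()) {
                    for (T e = state.poll(); e != null && downstream.push(e); e = state.poll()) { }
                }
                state.clear(); // discard anything that was not pushed
            };
        }
    }

    // The gatherer emits the k largest elements in ascending order,
    // so the first element it emits is the kth largest.
    static int findKthLargest(int[] nums, int k) {
        return IntStream.of(nums)
                .boxed() // gather is defined on Stream<T>, not on IntStream
                .gather(new MaxNElements<>(k))
                .findFirst()
                .orElseThrow();
    }

    public static void main(String[] args) {
        System.out.println(findKthLargest(new int[] {3, 2, 1, 5, 6, 4}, 2)); // 5
        List<Integer> top3 = Stream.of(5, 1, 4, 2, 3).gather(new MaxNElements<>(3)).toList();
        System.out.println(top3); // [3, 4, 5]
    }
}
```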
Combining this gatherer with the built-in ones from Gatherers enables elegant solutions to other problems. Suppose we have a stream of integers and want to keep only the top n elements of each consecutive group of m elements. For example, a sensor produces 10 measurements each minute, and we want to use only the top 3 measurements from each minute (m = 10, n = 3). Gatherers.windowFixed(m) splits the stream into such groups, and MaxNElements picks the top n of each:
List<Integer> list = Stream.of(data)
        .gather(Gatherers.windowFixed(m))
        .flatMap(window -> window.stream()
                .gather(new MaxNElements<>(n)))
        .toList();
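A self-contained version of the sensor example, assuming JDK 24+. The readings are made-up sample values, `topNPerWindow` is a hypothetical helper name, and MaxNElements is repeated in condensed form so the block compiles on its own. Gatherers.windowFixed emits a shorter final window when the input size is not a multiple of m; if that window has fewer than n elements, all of them are kept.

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;

public class SensorTopN {

    // Condensed copy of MaxNElements: keeps the n largest elements and
    // emits them in ascending order when its input ends.
    record MaxNElements<T>(int n) implements Gatherer<T, PriorityQueue<T>, T> {
        @Override
        public Supplier<PriorityQueue<T>> initializer() {
            return PriorityQueue::new;
        }

        @Override
        public Integrator<PriorityQueue<T>, T, T> integrator() {
            return Integrator.ofGreedy((state, element, _) -> {
                state.offer(element);
                if (state.size() > n) {
                    state.poll();
                }
                return true;
            });
        }

        @Override
        public BiConsumer<PriorityQueue<T>, Downstream<? super T>> finisher() {
            return (state, downstream) -> {
                if (!downstream.isRejecting()) {
                    for (T e = state.poll(); e != null && downstream.push(e); e = state.poll()) { }
                }
                state.clear();
            };
        }
    }

    // Splits the measurements into consecutive windows of m elements
    // and keeps the top n of each window, in ascending order per window.
    static List<Integer> topNPerWindow(List<Integer> measurements, int m, int n) {
        return measurements.stream()
                .gather(Gatherers.windowFixed(m))
                .flatMap(window -> window.stream().gather(new MaxNElements<>(n)))
                .toList();
    }

    public static void main(String[] args) {
        // Two minutes of made-up readings, 10 per minute.
        List<Integer> readings = List.of(
                12, 15, 11, 19, 14, 10, 17, 13, 16, 18,
                21, 25, 20, 22, 29, 24, 23, 28, 26, 27);
        System.out.println(topNPerWindow(readings, 10, 3)); // [17, 18, 19, 27, 28, 29]
    }
}
```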
These examples show how gatherers let us package stateful logic, such as a bounded heap, into reusable stream operations while keeping the code efficient and readable.
The same finisher logic can also be written more compactly. Since the downstream might be rejecting elements in the finisher, it is usually a good idea to check !isRejecting() before pushing, and then condition each push on the result of the previous one, which would look something like:
if (!downstream.isRejecting()) {
    for (T elem = heap.poll(); elem != null && downstream.push(elem); elem = heap.poll()) {}
}
// optionally clear the heap in an else-branch here
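As a sketch of that compact loop in context, the same gatherer can also be built with the Gatherer.ofSequential factory instead of a record (assuming JDK 24+; `maxN` is a hypothetical name). It adds a Comparable bound so that non-comparable element types fail at compile time rather than with a ClassCastException from the PriorityQueue at runtime. It also clears the heap unconditionally after the loop, which covers both a downstream that rejects from the start and one that stops accepting partway through.

```java
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class CompactMaxN {

    // Hypothetical factory-method variant of MaxNElements built with Gatherer.ofSequential.
    static <T extends Comparable<? super T>> Gatherer<T, PriorityQueue<T>, T> maxN(int n) {
        return Gatherer.ofSequential(
                PriorityQueue::new,
                Gatherer.Integrator.ofGreedy((heap, element, _) -> {
                    heap.offer(element);
                    if (heap.size() > n) {
                        heap.poll(); // keep only the n largest
                    }
                    return true;
                }),
                (heap, downstream) -> {
                    if (!downstream.isRejecting()) {
                        // Each push is conditioned on the previous one succeeding.
                        for (T elem = heap.poll(); elem != null && downstream.push(elem); elem = heap.poll()) { }
                    }
                    // Drop whatever was not pushed (a no-op if everything was).
                    heap.clear();
                });
    }

    public static void main(String[] args) {
        List<Integer> top2 = Stream.of(4, 9, 1, 7, 3).gather(maxN(2)).toList();
        System.out.println(top2); // [7, 9]
    }
}
```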