Active Object pattern. Part 1. Basics.
The Active Object pattern is a way to approach thread safety by serializing operations on an object. This pattern is also known by other names, such as “Actor” in Scala or “Task” in Ada. I decided to write a basic implementation to explore its properties.
In a nutshell, the Active Object pattern is a concurrent generalization of the Command pattern. All operations (commands) are added to a queue and executed by a single thread that is created exclusively for this purpose.
Let's look at a basic implementation that demonstrates how to set up the main processing loop, accept commands, and shut down gracefully. The shutdown logic prevents the object from receiving new commands once shutdown has started, and exits the execution loop after all previously queued commands have completed.
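The listings in this article rely on a Command type that is never shown. Judging by how it is used (task.execute() in the worker loop, and lambdas passed to accept), it is presumably a functional interface with a single no-argument method. The following is a minimal sketch under that assumption; CommandDemo is a hypothetical helper added only to show a command being created and run.

```java
// Assumed shape of the Command type used by the listings in this article.
// The article does not show it; this is inferred from task.execute() and the lambdas.
@FunctionalInterface
interface Command {
    void execute();
}

// Hypothetical helper, for illustration only.
class CommandDemo {
    // Creates two commands as lambdas, runs them in order, and returns what they logged.
    static String run() {
        StringBuilder log = new StringBuilder();
        Command first = () -> log.append("first;");
        Command second = () -> log.append("second;");
        first.execute();
        second.execute();
        return log.toString();
    }
}
```

Because the interface has exactly one abstract method, any lambda of the form () -> { ... } can be passed wherever a Command is expected.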
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

public class ActiveObject implements AutoCloseable {
    private final Thread thread;
    // The core of the implementation: a queue that serializes execution.
    private final BlockingQueue<Command> commands = new LinkedTransferQueue<>();
    // Set by the worker once all queued commands have been executed.
    private volatile boolean isReadyForShutdown;
    // Set when the object stops accepting new commands in preparation for shutdown.
    private volatile boolean prepareForShutdown;

    // The constructor creates the thread that executes the commands.
    public ActiveObject() {
        thread = Thread.ofVirtual().unstarted(() -> {
            boolean isInterrupted = false;
            // Keep running until an interrupt has arrived and the queue has been drained.
            while (!commands.isEmpty() || !isInterrupted) {
                try {
                    Command task = commands.take();
                    task.execute();
                } catch (InterruptedException e) {
                    isInterrupted = true;
                }
            }
            isReadyForShutdown = true;
            if (isInterrupted) {
                Thread.currentThread().interrupt();
            }
        });
        thread.start();
    }
    // Accepts a command and adds it to the work queue for execution.
    public void accept(Command command) {
        // New commands are rejected once shutdown has started; commands that are already queued are still executed.
        if (prepareForShutdown) {
            throw new IllegalStateException("Object is shutting down");
        }
        try {
            commands.put(command);
        } catch (InterruptedException e) {
            // Restore the caller's interrupt status. Interrupting the worker thread here would stop the active object.
            Thread.currentThread().interrupt();
        }
    }
    // Stops accepting new commands and waits for all queued commands to finish.
    public void shutdown() {
        // Stop accepting new commands.
        prepareForShutdown = true;
        // Interrupt the worker thread so that it exits once the queue is drained.
        thread.interrupt();
        // Wait uninterruptibly for the worker thread to execute all queued commands.
        // Nothing notifies this monitor; wait(100) only serves as a 100 ms polling delay.
        Object monitor = new Object();
        synchronized (monitor) {
            boolean interrupted = Thread.interrupted();
            while (!isReadyForShutdown) {
                try {
                    monitor.wait(100);
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
    // Lets ActiveObject be used in a try-with-resources block.
    @Override
    public void close() {
        shutdown();
    }
}
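The polling loop in shutdown() could arguably be replaced by Thread.join(), which returns as soon as the worker thread terminates. The sketch below shows that variation together with a small usage example; it is not the article's implementation. JoiningWorker and JoiningWorkerDemo are hypothetical names, commands are plain Runnable instances, and a platform thread stands in for the article's virtual thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;

// Hypothetical, trimmed-down worker; it illustrates join-based shutdown only.
class JoiningWorker implements AutoCloseable {
    private final BlockingQueue<Runnable> commands = new LinkedTransferQueue<>();
    private volatile boolean shuttingDown;
    private final Thread thread;

    JoiningWorker() {
        // Assumption: a platform thread, where the article uses a virtual one.
        thread = new Thread(() -> {
            boolean interrupted = false;
            // Same loop shape as the article: drain the queue, exit after an interrupt.
            while (!commands.isEmpty() || !interrupted) {
                try {
                    commands.take().run();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }, "joining-worker");
        thread.start();
    }

    void accept(Runnable command) {
        if (shuttingDown) {
            throw new IllegalStateException("Object is shutting down");
        }
        commands.add(command);
    }

    void shutdown() {
        shuttingDown = true;
        thread.interrupt();
        boolean interrupted = false;
        // join() returns once the worker has left its loop; retry if the caller is interrupted.
        while (thread.isAlive()) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void close() {
        shutdown();
    }
}

class JoiningWorkerDemo {
    // Four producers bump a plain, unsynchronized counter through the worker.
    static int run() {
        int[] counter = {0};
        try (JoiningWorker worker = new JoiningWorker()) {
            Thread[] producers = new Thread[4];
            for (int p = 0; p < producers.length; p++) {
                producers[p] = new Thread(() -> {
                    for (int i = 0; i < 1_000; i++) {
                        worker.accept(() -> counter[0]++);
                    }
                });
                producers[p].start();
            }
            for (Thread producer : producers) {
                producer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter[0];
    }
}
```

The counter is an ordinary int with no locking, yet no increments are lost, because only the worker thread ever touches it. Reading it after close() is safe because Thread.join() establishes a happens-before relationship with everything the worker did.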
Using the Active Object pattern, you can implement thread-safe classes that do not require additional synchronization. Let's implement a simple thread-safe list that supports adding, removing, and printing elements.
import java.util.ArrayList;
import java.util.List;

// SimpleList is not shown in this article; the @Override annotations below imply that it declares add, remove, and both print methods.
public class ConcurrentList<T> implements SimpleList<T> {
    // The subject to which the commands are applied.
    final List<T> list = new ArrayList<>();
    // The ActiveObject instance that executes the commands.
    final ActiveObject activeObject = new ActiveObject();

    // The remaining methods pass commands to the active object, which applies them to the subject.
    @Override
    public void add(T message) {
        activeObject.accept(() -> {
            list.add(message);
        });
    }

    @Override
    public void remove(T message) {
        activeObject.accept(() -> {
            list.remove(message);
        });
    }

    @Override
    public void print(int s) {
        activeObject.accept(() -> {
            int size = Math.min(list.size(), s);
            for (int i = 0; i < size; i++) {
                System.out.print(list.get(i) + " ");
            }
        });
    }

    @Override
    public void print() {
        activeObject.accept(() -> {
            int size = list.size();
            for (int i = 0; i < size; i++) {
                System.out.print(list.get(i) + " ");
            }
        });
    }
    // Finally, a method that gracefully shuts down the active object.
    public void shutdown() {
        activeObject.shutdown();
    }
}
This example shows how easy it is to implement a thread-safe solution in which callers do not block: LinkedTransferQueue is unbounded, so put() returns immediately. The trade-off is that the queue grows without limit if producers outpace the worker, so the practical failure mode is running out of memory (in which case something has gone wrong anyway).
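To make the blocking behavior concrete: LinkedTransferQueue has no capacity limit, so adding to it always succeeds immediately. The sketch below contrasts it with ArrayBlockingQueue, a bounded queue that this article does not use and that is included only for comparison. QueueCapacityDemo is a hypothetical helper; it uses offer(), which never blocks and returns false when a bounded queue is full.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical helper, for illustration only.
class QueueCapacityDemo {
    // Offers n elements with no consumer running and reports how many were accepted.
    static int accepted(BlockingQueue<Integer> queue, int n) {
        int count = 0;
        for (int i = 0; i < n; i++) {
            // offer() never blocks; a false return value means the queue is full.
            if (queue.offer(i)) {
                count++;
            }
        }
        return count;
    }
}
```

Calling accepted(new LinkedTransferQueue<>(), 10_000) accepts all 10,000 elements, while accepted(new ArrayBlockingQueue<>(100), 10_000) stops at 100. A bounded queue would give the active object back-pressure, at the cost of put() blocking its callers when the worker falls behind.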
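However, there are some limitations to this implementation. The most noticeable is that it only supports commands that don’t return results. Secondly, it lacks robust error handling: a command that throws an unchecked exception terminates the worker thread, after which isReadyForShutdown is never set and shutdown() waits forever.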
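One possible direction for both limitations is to hand the caller a CompletableFuture that the worker completes with either the result or the exception. The following is only a sketch of that idea, not the article's design: ResultActiveObject and submit are hypothetical names, commands are plain Runnable instances, and a daemon platform thread stands in for the article's virtual thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedTransferQueue;
import java.util.function.Supplier;

// Hypothetical variant, not the article's ActiveObject: commands may return a value.
class ResultActiveObject implements AutoCloseable {
    private final BlockingQueue<Runnable> commands = new LinkedTransferQueue<>();
    private final Thread worker;

    ResultActiveObject() {
        worker = new Thread(() -> {
            boolean interrupted = false;
            // Same loop shape as the article: drain the queue, exit after an interrupt.
            while (!commands.isEmpty() || !interrupted) {
                try {
                    commands.take().run();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
        }, "result-active-object");
        worker.setDaemon(true);
        worker.start();
    }

    // submit() is an assumed name; the article only defines accept(Command).
    <T> CompletableFuture<T> submit(Supplier<T> command) {
        CompletableFuture<T> result = new CompletableFuture<>();
        commands.add(() -> {
            try {
                result.complete(command.get());
            } catch (RuntimeException e) {
                // A failing command fails its own future instead of killing the worker.
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void close() {
        // Simplified: signals the worker to stop after draining, without waiting for it.
        worker.interrupt();
    }
}
```

A caller could then write ao.submit(() -> 6 * 7).join() to wait for a value, or chain thenAccept to stay asynchronous. A command that throws surfaces as a CompletionException on join() while later commands continue to run.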
These are some areas I’d like to explore in future posts. It would be interesting to examine how various industrial asynchronous frameworks like Akka or Vert.x handle these challenges. I hope that such an investigation will help determine when it’s worth bringing in a large, powerful, and sometimes intimidating framework, and when a simple solution like Active Object is enough.
Another interesting question post-JDK 21 is when it’s beneficial to use virtual threads as worker threads.
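Whichever answer turns out to be right, the choice is confined to the single line that creates the worker thread. The sketch below shows both options using the Thread.Builder API available since JDK 21; WorkerThreads and the thread name "active-object" are hypothetical, and the sketch makes no claim about which option performs better.

```java
// Hypothetical helper, for illustration only.
class WorkerThreads {
    // A virtual thread: cheap to create and always a daemon thread (JDK 21+).
    static Thread virtualWorker(Runnable loop) {
        return Thread.ofVirtual().name("active-object").unstarted(loop);
    }

    // A dedicated platform thread, marked daemon so it does not keep the JVM alive.
    static Thread platformWorker(Runnable loop) {
        return Thread.ofPlatform().name("active-object").daemon(true).unstarted(loop);
    }
}
```

Both methods return an unstarted Thread, so the rest of the active object (start, interrupt, the processing loop) stays the same regardless of which one is used.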
The Active Object pattern provides a lightweight way to make objects thread-safe without explicit synchronization. It's easy to implement, extend, and reason about, and in many cases it might be just what you need.
Have you ever used this pattern in your projects? What challenges or variations did you encounter? I’d love to hear your experiences.
#activeobject #multithreading #java #akka #vertx #nonblocking