RxJava has many uses, especially for applications requiring any amount of asynchronous operations. If you do not have experience with RxJava (or any of its sister implementations), I recommend checking it out and seeing if it can help improve your applications. I will not be covering the basics of RxJava in this blog, so a familiarity with the core concepts is recommended.
In this blog I will be covering how to create a simple bus implementation using RxJava in a pure Java application. This implementation can be ported to other Java-based platforms, such as Android. There are a few particulars I will cover before jumping into the implementation.
Subjects
A Subject is an Object in RxJava that has the properties of both an Observable and an Observer/Subscriber. It can both subscribe to Observables to receive data, as well as emit data to Subscribers who subscribe to it. It can also emit data by directly calling the onNext method. In this manner, it can be used as a middle-man who receives data, manipulates it in some way, then emits it to its own subscribers.
There are two type parameters for a Subject: T and R. T is type of data it will emit (from the Observer interface), while R is the type of data it will receive (From the Observable class). Typically, these will be the same type unless data manipulation is taking place.
There are a few types of specialized Subjects that are worth noting for this implementation. The most basic Subject implementation is PublishSubject. PublishSubject will simply emit data to each subscriber each time its onNext method is called (either manually or from an Observable).
A SerializedSubject can be used to wrap another Subject to make calls to its methods thread-safe.
Implementation
A simple bus implementation has the ability to register for events and send events. In this implementation, an event can be any Object with any level of complexity.
So to start out, we will need a singleton bus class which I will call RxBus.
public class RxBus { private static final RxBus INSTANCE = new RxBus(); public static RxBus getInstance() { return INSTANCE; }}
Next we’ll add a PublishSubject wrapped in a SerializedSubject to the class. The PublishSubject will be used to emit events to subscribers listening for those events. The type parameters will be Object to allow any class to be an event.
public class RxBus {
private static final RxBus INSTANCE = new RxBus();
private final Subject<Object, Object> mBusSubject = new SerializedSubject<>(PublishSubject.create());
public static RxBus getInstance() {
return INSTANCE;
}
}
We’ll start with adding a register method. This method will take in a Class type of the event we’re interested in, as well the action to take when receiving an event. The resulting Subscription will be returned to the caller so they can unsubscribe whenever they deem appropriate.
public class RxBus {
private static final RxBus INSTANCE = new RxBus();
private final Subject<Object, Object> mBusSubject = new SerializedSubject<>(PublishSubject.create());
public static RxBus getInstance() {
return INSTANCE;
}
public <T> Subscription register(final Class<T> eventClass, Action1<T> onNext) {
return mBusSubject
.filter(event -> event.getClass().equals(eventClass))
.map(obj -> (T) obj)
.subscribe(onNext);
}
}
Using the PublishSubject, we filter out any event that is not the event type we’re interested in, map it to T (Which is defined by the caller), then subscribe to it using the onNext implementation passed in through the method.
Lastly, we need a method for posting events. This is simply passing the event to the onNext method of the Subject.
public class RxBus {
private static final RxBus INSTANCE = new RxBus();
private final Subject<Object, Object> mBusSubject = new SerializedSubject<>(PublishSubject.create());
public static RxBus getInstance() {
return INSTANCE;
}
public <T> Subscription register(final Class<T> eventClass, Action1<T> onNext) {
return mBusSubject
.filter(event -> event.getClass().equals(eventClass))
.map(obj -> (T) obj)
.subscribe(onNext);
}
public void post(Object event) {
mBusSubject.onNext(event);
}
}
That’s it! Here is a sample using the above implementation (Event classes are empty implementations):
public class Main {
private static WorkerThread sWorkerThread = new WorkerThread();
public static void main(String[] args) throws IOException {
sWorkerThread.start();
System.out.println("Press the enter key to start and stop work!");
for (; ; ) {
System.in.read();
if (!sWorkerThread.isRunning()) {
RxBus.getInstance().post(new StartWorkEvent());
} else {
RxBus.getInstance().post(new StopWorkEvent());
}
}
}
private static class WorkerThread extends Thread {
private Subscription mStartWorkSubscription;
private Subscription mStopWorkSubscription;
private boolean mRunning;
@Override
public void run() {
mStartWorkSubscription = RxBus.getInstance().register(StartWorkEvent.class, event -> {
mRunning = true;
synchronized (this) {
notify();
}
});
mStopWorkSubscription = RxBus.getInstance().register(StopWorkEvent.class, event -> {
mRunning = false;
});
for (; ; ) {
if (Thread.interrupted()) {
cleanUp();
return;
}
if (!mRunning) {
System.out.println("Stopping work");
try {
synchronized (this) {
wait();
}
} catch (InterruptedException e) {
cleanUp();
return;
}
System.out.println("Starting work");
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
cleanUp();
return;
}
doWork();
}
}
private void doWork() {
System.out.print(".");
}
private void cleanUp() {
mStartWorkSubscription.unsubscribe();
mStopWorkSubscription.unsubscribe();
}
public boolean isRunning() {
return mRunning;
}
}
}
You can find the source code online here!