Rx-Java 2 : Practical examples
This article is concerned about "How to use Rx-Java" in our daily situations, there are many articles that can explain every detail about Rx-Java, but this one is to make it easier to start using it even if you do not get the concept behind what is going on (Rx-Java needs time to understand it's concepts, but we can use some parts of it now)
Before we begin with examples, an important point to mention here ... in Rx-Java, we deal with data as "Streams", where we have a bunch of data, and we do some operations on this data ... an operation after another, until we take the final out-put form of this stream, as if we have some data pushed in a pipe-line, and this pipe-line keep playing with this data along the way until the end of the pipe ... this is a totally different way of thinking other than our imperative way of programming, where we take a bunch of data, and keep looping around it or keeping some of it in variables then updating these variables and so on.
Let's start with iterating over a bunch of data ... Integers
private List<Integer> integers = Arrays.asList(1, 2, 3, 4);
now we want to print this data, doing this the old way :
for (Integer i : integers) {
System.out.print(i);
}
we have to store every value in an "Integer" value named "i", then we print it ... while doing this in Rx-Java :
// create stream
Observable.fromIterable(integers)
// subscribe to this stream
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer i) {
// handle the items pushed down the stream (emitted)
System.out.print(i);
}
});
we first invoke Observable.fromIterable() to create a stream (pipe-line), this stream will iterate over the items passed to it, which are the "integers", and after looping over it, it will start "emitting" those items down this stream, in other words .. it will start pushing the integers in this stream one after another ... to receive the emitted items we need to subscribe to this stream ... through the subscribe() method
the subscribe method needs a Consumer Object in it's parameter, so when it receives an emitted item from the stream, it will invoke it's Consumer.accept() method, and pass to it this emitted item
So thinking about this example in terms of Rx-Java stream, we will create a Stream that will iterate over the "integers" List, and we will subscribe to this stream with a Consumer that prints the integer passed to it, so when ever an integer is passed to the subscribed Consumer, it will be passed to Consumer.accept() which will print the value
a cleaner way to do this is as follows :
Observable.fromIterable(integers)
.subscribe(printInteger());
Consumer<Integer> printInteger() {
return new Consumer<Integer>() {
@Override
public void accept(Integer i) {
System.out.print(i);
}
};
}
the above code is exactly the same, except that now our Consumer is returned by a method, which makes it re-usable in any other subscribe() method
Now let us iterate over Even numbers only
in the old way :
for (Integer i : integers) {
if(i % 2 == 0){
System.out.print(i);
}
}
now the code starts to have a new level of indentation, a new level of complexity, now we store the integer value in "i", then we check if "i" is even or not ... so we keep storing our data in variables and keep comparing them against our rules, while in Rx-Java we can do this :
Observable.fromIterable(integers)
.filter(byEvenNumbers())
.subscribe(printInteger());
Predicate<Integer> byEvenNumbers() {
return new Predicate<Integer>() {
@Override
public boolean test(Integer i){
return i % 2 == 0;
}
};
}
In Rx-Java, we have a Stream of Integers started by Observable.fromIterable(), where this stream will emit every Integer of the "integers" List, one at a time ... so we add a filter() operation, this filtering operation requires a Predicate as it's parameter, where ever it receives an emitted item, it passes it to the Predicate.test() method, if this method returns false, the emitted item will not continue to the next step down the stream, in other words ... will not be passed to the Consumer in subscribe(), and if the Predicate.test() returned true, the emitted item will continue down the stream to the next step and will be passed to the Consumer in subscribe() method
Notice that i did not write the printInteger() here, since it is now an independent method that can be declared even outside this class
Multi-Threading ... here comes the true difference
Now suppose that our code is running on two threads ... this is the old way :
final List<Integer> evenNumbers = new ArrayList<>(); // shared between threads
Thread threadOne = new Thread(new Runnable() {
@Override
public void run() {
for (Integer i : integers) {
if (i % 2 == 0) {
evenNumbers.add(i);
}
}
}
});
Thread threadTwo = new Thread(new Runnable() {
@Override
public void run() {
for (Integer evenNumber : evenNumbers) {
System.out.println(evenNumber);
}
}
});
threadOne.start();
threadOne.join();
threadTwo.start();
Let's agree that this is a big change in implementation, and also having a common list between 2 threads can cause lots issues
Now for the Rx-Java version of the same example :
Observable.fromIterable(integers)
.filter(byEvenNumbers())
.subscribeOn(Schedulers.newThread()) // threadOne
.observeOn(Schedulers.newThread()) // threadTwo
.subscribe(printInteger())
Those 2 new lines made every thing required for multi-threading, where subscribeOn() tells the Stream to operate on a new thread, and the observeOn() tells the stream that it will emit the items to the subscribed Consumer in another thread
as Rx-Java provides Schedulers, a way to tell the stream on which thread it should operate ... in android, it is very common to see this :
Observable.fromIterable(integers)
.filter(byEvenNumbers())
.subscribeOn(Schedulers.io()) // IO thread
.observeOn(AndroidSchedulers.mainThread()) // UI thread
.subscribe(printInteger())
in this snippet, the Stream will do all the heavy stuff (iterating and filtering) in an IO thread (one of the threads that are used for server requests or any IO operations), and after the Stream finishes, it will emit the items to be printed on the UI thread
another way of doing the above code, if we were on the UI thread, we can write the above code as follows :
Observable.fromIterable(integers)
.filter(byEvenNumbers())
.subscribeOn(Schedulers.io()) // IO thread
.blockingSubscribe(printInteger()) // current thread
Declarative vs Imperative styles
The Main differences between Rx-Java and our old ways is the declarative way of coding, where we can keep track of every thing we do in a stream, where we start a stream form some data, we filter those data by even numbers, we make all the stream on an IO thread, and when the stream finishes we receive the emitted items on our thread ... every step is a method call, unlike our old Imperative way, where we have to keep track in our minds of what is that "if" inside the "for" and what is the current state of the shared List between 2 threads
you may have noticed also is that Rx-Java helps writing stateless code, no variables, no shared data between threads, every method is re-usable in another stream, we can use byEvenNumbers() in any stream that emits Integers
When can we say that we need to use Rx-Java ?
When we need to manipulate some data, like data structures or collections, and for advanced usage, when ever timing the execution of code is important, when we care if a method should be executed before another one, or at the same time (multi-threading), or any case where timing the execution matters
another basic operator that you may need to look for after understanding the above code is map(), also error handling is a big subject, maybe in another article
Straight to the point (Y)
Great work!!
بالنسبة للسؤالين اللي في الاول، انت صح في الاتنين ال predicate هي Function بترجع boolean
والله انت راجل شاطر