Rx-Java 2 : Practical examples

Rx-Java 2 : Practical examples

This article is concerned about "How to use Rx-Java" in our daily situations, there are many articles that can explain every detail about Rx-Java, but this one is to make it easier to start using it even if you do not get the concept behind what is going on (Rx-Java needs time to understand it's concepts, but we can use some parts of it now)

Before we begin with examples, an important point to mention here ... in Rx-Java, we deal with data as "Streams", where we have a bunch of data, and we do some operations on this data ... an operation after another, until we take the final out-put form of this stream, as if we have some data pushed in a pipe-line, and this pipe-line keep playing with this data along the way until the end of the pipe ... this is a totally different way of thinking other than our imperative way of programming, where we take a bunch of data, and keep looping around it or keeping some of it in variables then updating these variables and so on.

Let's start with iterating over a bunch of data ... Integers

private List<Integer> integers = Arrays.asList(1, 2, 3, 4);

now we want to print this data, doing this the old way :

for (Integer i : integers) {
    System.out.print(i);
}

we have to store every value in an "Integer" value named "i", then we print it ... while doing this in Rx-Java :

// create stream
Observable.fromIterable(integers)
        // subscribe to this stream
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer i) {
                // handle the items pushed down the stream (emitted)
                System.out.print(i);
            }
        });

we first invoke Observable.fromIterable() to create a stream (pipe-line), this stream will iterate over the items passed to it, which are the "integers", and after looping over it, it will start "emitting" those items down this stream, in other words .. it will start pushing the integers in this stream one after another ... to receive the emitted items we need to subscribe to this stream ... through the subscribe() method

the subscribe method needs a Consumer Object in it's parameter, so when it receives an emitted item from the stream, it will invoke it's Consumer.accept() method, and pass to it this emitted item

So thinking about this example in terms of Rx-Java stream, we will create a Stream that will iterate over the "integers" List, and we will subscribe to this stream with a Consumer that prints the integer passed to it, so when ever an integer is passed to the subscribed Consumer, it will be passed to Consumer.accept() which will print the value

a cleaner way to do this is as follows :

Observable.fromIterable(integers)
        .subscribe(printInteger());

Consumer<Integer> printInteger() {
    return new Consumer<Integer>() {
        @Override
        public void accept(Integer i) {
            System.out.print(i);
        }
    };
}

the above code is exactly the same, except that now our Consumer is returned by a method, which makes it re-usable in any other subscribe() method

Now let us iterate over Even numbers only

in the old way :

for (Integer i : integers) {
    if(i % 2 == 0){
        System.out.print(i);
    }
}

now the code starts to have a new level of indentation, a new level of complexity, now we store the integer value in "i", then we check if "i" is even or not ... so we keep storing our data in variables and keep comparing them against our rules, while in Rx-Java we can do this :

Observable.fromIterable(integers)
            .filter(byEvenNumbers())
            .subscribe(printInteger());


Predicate<Integer> byEvenNumbers() {
    return new Predicate<Integer>() {
        @Override
        public boolean test(Integer i){
            return i % 2 == 0;
        }
    };
}

In Rx-Java, we have a Stream of Integers started by Observable.fromIterable(), where this stream will emit every Integer of the "integers" List, one at a time ... so we add a filter() operation, this filtering operation requires a Predicate as it's parameter, where ever it receives an emitted item, it passes it to the Predicate.test() method, if this method returns false, the emitted item will not continue to the next step down the stream, in other words ... will not be passed to the Consumer in subscribe(), and if the Predicate.test() returned true, the emitted item will continue down the stream to the next step and will be passed to the Consumer in subscribe() method

Notice that i did not write the printInteger() here, since it is now an independent method that can be declared even outside this class

Multi-Threading ... here comes the true difference

Now suppose that our code is running on two threads ... this is the old way :

final List<Integer> evenNumbers = new ArrayList<>(); // shared between threads

Thread threadOne = new Thread(new Runnable() {
    @Override
    public void run() {
        for (Integer i : integers) {
            if (i % 2 == 0) {
                evenNumbers.add(i);
            }
        }

    }
});


Thread threadTwo = new Thread(new Runnable() {
    @Override
    public void run() {
        for (Integer evenNumber : evenNumbers) {
            System.out.println(evenNumber);
        }
    }
});

threadOne.start();
threadOne.join();
threadTwo.start();

Let's agree that this is a big change in implementation, and also having a common list between 2 threads can cause lots issues

Now for the Rx-Java version of the same example :

Observable.fromIterable(integers)
        .filter(byEvenNumbers())
        .subscribeOn(Schedulers.newThread()) // threadOne
        .observeOn(Schedulers.newThread())   // threadTwo
        .subscribe(printInteger())

Those 2 new lines made every thing required for multi-threading, where subscribeOn() tells the Stream to operate on a new thread, and the observeOn() tells the stream that it will emit the items to the subscribed Consumer in another thread

as Rx-Java provides Schedulers, a way to tell the stream on which thread it should operate ... in android, it is very common to see this :

Observable.fromIterable(integers)
        .filter(byEvenNumbers())
        .subscribeOn(Schedulers.io())                // IO thread
        .observeOn(AndroidSchedulers.mainThread())   // UI thread
        .subscribe(printInteger())

in this snippet, the Stream will do all the heavy stuff (iterating and filtering) in an IO thread (one of the threads that are used for server requests or any IO operations), and after the Stream finishes, it will emit the items to be printed on the UI thread

another way of doing the above code, if we were on the UI thread, we can write the above code as follows :

Observable.fromIterable(integers)
        .filter(byEvenNumbers())
        .subscribeOn(Schedulers.io())                // IO thread
        .blockingSubscribe(printInteger())           // current thread

Declarative vs Imperative styles

The Main differences between Rx-Java and our old ways is the declarative way of coding, where we can keep track of every thing we do in a stream, where we start a stream form some data, we filter those data by even numbers, we make all the stream on an IO thread, and when the stream finishes we receive the emitted items on our thread ... every step is a method call, unlike our old Imperative way, where we have to keep track in our minds of what is that "if" inside the "for" and what is the current state of the shared List between 2 threads

you may have noticed also is that Rx-Java helps writing stateless code, no variables, no shared data between threads, every method is re-usable in another stream, we can use byEvenNumbers() in any stream that emits Integers

When can we say that we need to use Rx-Java ?

When we need to manipulate some data, like data structures or collections, and for advanced usage, when ever timing the execution of code is important, when we care if a method should be executed before another one, or at the same time (multi-threading), or any case where timing the execution matters

another basic operator that you may need to look for after understanding the above code is map(), also error handling is a big subject, maybe in another article


Straight to the point (Y)

Like
Reply

بالنسبة للسؤالين اللي في الاول، انت صح في الاتنين ال predicate هي Function بترجع boolean

Like
Reply

والله انت راجل شاطر

Like
Reply

To view or add a comment, sign in

More articles by Ahmed Adel Ismail

  • Sharing data across multiple Mobile Squads - with examples

    Earlier I shared an article suggesting a solution to a common problem with teams following the "Spotify Model", which…

  • SDD - Squad Driven Design

    Working in multiple big teams I've found that we are always trying to apply our known best practices in software, but…

    4 Comments
  • Easier Testing with MVVM, MVI, MVP and Kotlin Multiplatform

    Before we start, this article requires basic knowledge about the following topics : Clean Architecture Unit Testing…

    9 Comments
  • Android - A Cleaner Clean Architecture

    It has been a while now since Clean Architecture was out, and even many of us started embracing hexagonal (ports and…

    9 Comments
  • Beyond Functional Programming

    In the Android industry, lately functional programming was the all new stuff to learn, RxJava, Kotlin, and the whole…

    7 Comments
  • Dependency Injection in Clean Architecture

    After Google's Opinionated Guide to Dependency Injection Video, Google made a clear statement that they want developers…

    18 Comments
  • Meta Programming in Android

    Year after year we are getting rid of the boilerplate code that we need to write for small and simple tasks in Android,…

    2 Comments
  • MVI Pattern For Android In 4 Steps

    Lately I wrote an article about MVI pattern, but as we are facing new problems every day and face more use-cases, we…

    7 Comments
  • Agile - Moving Fast

    We always here about Agile, and think about which methodology do we use, what practices do we have, team velocity…

    1 Comment
  • Kotlin Unit Testing with Mockito

    I've always believed that, if the code is designed to be tested, we wont need any testing framework or library ..

    17 Comments

Others also viewed

Explore content categories