The Rx Way of Doing Things

Published Jan 10, 2018. Last updated Jan 17, 2018.
The reactive way of programming makes many complex problems much easier to solve. This post contrasts the reactive and imperative ways of solving the same problem. I will try to explain how reactive programming makes a developer’s life easier by doing more with less code.

Problem Statement

Consider the following problem statement:

  1. You have to make an API call that returns a map of ObjectId to Object.
  2. You need to convert this map to a list of objects that contain object-id.
  3. Now you need to filter some objects from this list.
  4. For each of these filtered objects, you need to do another API call.
  5. The above API calls should be done sequentially. Each subsequent API call should be made after a delay of 100 milliseconds.

This is a real-life scenario. I wrote an implementation for this in one of our Android apps. We use Retrofit for networking, RxJava-2 for reactive programming, and Retrolambda for Java 8-like syntax and boilerplate reduction.

Now, think about how you will implement this in an imperative world.

The Imperative Way

Let me help you. In an imperative world:

  1. You will probably do an API call with the help of Retrofit (if you are using Retrofit) or in an AsyncTask.
  2. When this API returns, you will iterate the map and convert it to a list on the main thread (of course, you can do it in a different thread, but I want to keep things simple).
  3. You would then do a second iteration for filtering out desired objects.
  4. Now, things get interesting because you need to perform API calls for each of these remaining objects on the list. You probably will need to write a Service or AsyncTask that now performs synchronous API calls after a sleep time of 100 ms.

That is a lot of work, and for the sake of simplicity I am not even considering edge cases and error handling. Think of the case when an API returns a large list of objects and you can’t afford to process it on the main thread.
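To make the steps above concrete, here is a minimal plain-Java sketch of the imperative flow. Everything in it is an assumption for illustration: `fetchObjects` is a hypothetical stand-in for the first API call, the nested `MyObject` class is a simplified model, and "deleting" only records the id instead of hitting a network. In a real app this code would also need to run off the main thread.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ImperativeSketch {

    // Hypothetical, simplified model class (not the production one).
    static final class MyObject {
        final String id;
        final String name;

        MyObject(String id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Step 1: hypothetical stand-in for the API call that returns id -> name.
    static Map<String, String> fetchObjects() {
        Map<String, String> response = new LinkedHashMap<>();
        response.put("1", "object1");
        response.put("2", "object2");
        response.put("3", "object1");
        return response;
    }

    // Steps 2 and 3: first pass converts the map to a list, second pass filters by name.
    static List<MyObject> selectByName(Map<String, String> response, String wantedName) {
        List<MyObject> all = new ArrayList<>();
        for (Map.Entry<String, String> entry : response.entrySet()) {
            all.add(new MyObject(entry.getKey(), entry.getValue()));
        }
        List<MyObject> selected = new ArrayList<>();
        for (MyObject object : all) {
            if (object.name.equals(wantedName)) {
                selected.add(object);
            }
        }
        return selected;
    }

    // Step 4: one "call" per object, sleeping before each one.
    // The id is recorded in place of a real network request.
    static List<String> deleteSequentially(List<MyObject> objects, long delayMillis) {
        List<String> deletedIds = new ArrayList<>();
        for (MyObject object : objects) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop early if interrupted
                break;
            }
            deletedIds.add(object.id);
        }
        return deletedIds;
    }

    public static void main(String[] args) {
        List<MyObject> selected = selectByName(fetchObjects(), "object1");
        System.out.println(deleteSequentially(selected, 100)); // prints [1, 3]
    }
}
```

Even this stripped-down version needs two loops, manual sleeping, and interruption handling, and it still has no threading or error handling around the network calls.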

Well, Rx does all of this heavy lifting for you. Let's see how.

The Rx Way

Reactive programming is a totally different approach towards writing code. Everything in a reactive world is considered a stream. These streams are immutable and there are a plethora of operators available that operate on these streams. The real power of reactive programming lies in these tried and tested operators.

Rx makes switching between threads as easy as pie. The problem statement above can be solved with RxJava in little more than 10 lines of code.

Here is a sample from the production code:

APIProvider.getInstance()
    .getService()
    .getMyObjects(userName)
    .subscribeOn(Schedulers.io())
    .map(MyObject::toList)
    .flatMap(rx.Observable::from)
    .filter(myObject -> myObject.getName().equals(Constants.SOME_NAME))
    // adding a 100 ms delay before subsequent API calls
    .flatMap(myObject -> Observable.timer(100, TimeUnit.MILLISECONDS).map(y -> myObject))
    .doOnNext(myObject -> {
        // deleting my objects one by one
        APIProvider.getInstance()
            .getService()
            .deleteMyObject(userName, myObject.getId())
            .subscribe();
    })
    .subscribe();

Let's understand this code line by line:

APIProvider.getInstance().getService().getMyObjects(userName)

It's Retrofit's way of doing an API call. This API returns a map of MyObjectId to MyObject. An example JSON looks like this:

{"1":{"name":"object1"},"2":{"name":"object2"}}

.subscribeOn(Schedulers.io())

subscribeOn is an Rx operator that directs RxJava to start processing on a scheduler thread (a background thread). Just this line lets you switch your processing to a background thread. How cool is that!

.map(MyObject::toList)

map is a transformation operator that converts each event into something else. Here, we are converting the Map to a List. toList is a static method written in the MyObject class. It converts a Map<ObjectId, MyObject> to a List<MyObject>, and in the list version each object carries its own id. MyObject::toList is a method reference, the Java 8 shorthand for calling this method. The transformed data structure now looks like this:
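The production toList method is not shown in this post, so the following is only a guess at its shape. It assumes ids are plain strings (as in the JSON keys) and that MyObject has just an id and a name; both are assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified version of the model class.
final class MyObject {
    private String id;          // absent from the raw payload, filled in by toList
    private final String name;

    MyObject(String name) {
        this.name = name;
    }

    String getId() {
        return id;
    }

    String getName() {
        return name;
    }

    // Copies each map key into its value and returns the values as a list.
    static List<MyObject> toList(Map<String, MyObject> byId) {
        List<MyObject> result = new ArrayList<>(byId.size());
        for (Map.Entry<String, MyObject> entry : byId.entrySet()) {
            MyObject object = entry.getValue();
            object.id = entry.getKey();
            result.add(object);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, MyObject> byId = new LinkedHashMap<>();
        byId.put("1", new MyObject("object1"));
        byId.put("2", new MyObject("object2"));
        for (MyObject object : toList(byId)) {
            System.out.println(object.getId() + " -> " + object.getName());
        }
    }
}
```

Because the method takes one argument and returns one value, it can be passed to map as a method reference without any adapter code.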

[{"id":"1","name":"object1"},{"id":"2","name":"object2"}]

.flatMap(rx.Observable::from)

This statement lets us receive each MyObject as a separate event on the observable stream. Before this, the whole List or Map was a single event on the stream. To perform filtering and API calls on each object separately, we need them as independent events. (rx.Observable::from is the RxJava 1 name; in RxJava 2 the equivalent is Observable.fromIterable.)

.filter(myObject->myObject.getName().equals(Constants.SOME_NAME))

This one is pretty obvious. The filter operator drops events that do not match the filtering criteria. Any object whose name does not match Constants.SOME_NAME is not allowed to propagate further down the stream.
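If the flatten-then-filter idea is new to you, the same shape exists in java.util.stream. The sketch below uses plain Java streams as an analogy, not RxJava: streams are pulled synchronously, while Rx observables push events over time. The class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlattenAndFilterSketch {

    // Starts with one element (the whole list), flattens it into one element
    // per name, then keeps only the names equal to `wanted`.
    static List<String> flattenAndFilter(List<String> names, String wanted) {
        return Stream.of(names)            // a stream holding a single element: the list
                .flatMap(List::stream)     // now one element per name
                .filter(wanted::equals)    // drop everything that does not match
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("object1", "object2", "object1");
        System.out.println(flattenAndFilter(names, "object1")); // prints [object1, object1]
    }
}
```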

.flatMap(myObject -> Observable.timer(100, TimeUnit.MILLISECONDS).map(y -> myObject))

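This interesting piece of code is meant to add a delay of 100 ms between subsequent results. Inside flatMap, we create another stream using the timer operator. As per the documentation, timer returns an Observable that emits a single number zero after a delay period you specify.

Since we do not want to lose our MyObjects, we map this zero back to the MyObject. The outer flatMap merges the events from these inner streams back into the current stream. One caveat: flatMap subscribes to every inner timer as soon as its object arrives, so when the objects arrive together, their timers also fire together, roughly 100 ms later. If the calls must be strictly spaced out one after another, concatMap is the operator to reach for, because it subscribes to the next inner stream only after the previous one completes.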
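It helps to work out when each object would come out of the delay step. The small model below does only the arithmetic and does not use RxJava. It assumes all filtered objects arrive at time zero, which is roughly what happens after a list is flattened. The "merged" case models an operator that starts every timer on arrival (flatMap-like), and the "sequential" case models one that starts the next timer only when the previous one has fired (concatMap-like). The class and method names are invented for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class DelayTimingModel {

    // Every timer starts at t = 0, so every item is released at the same instant.
    static List<Long> mergedEmissionTimes(int itemCount, long delayMillis) {
        List<Long> times = new ArrayList<>();
        for (int i = 0; i < itemCount; i++) {
            times.add(delayMillis);
        }
        return times;
    }

    // Each timer starts when the previous one fires, so the delays add up.
    static List<Long> sequentialEmissionTimes(int itemCount, long delayMillis) {
        List<Long> times = new ArrayList<>();
        long clock = 0;
        for (int i = 0; i < itemCount; i++) {
            clock += delayMillis;
            times.add(clock);
        }
        return times;
    }

    public static void main(String[] args) {
        System.out.println(mergedEmissionTimes(3, 100));     // prints [100, 100, 100]
        System.out.println(sequentialEmissionTimes(3, 100)); // prints [100, 200, 300]
    }
}
```

Only the second pattern gives a 100 ms gap between consecutive API calls, which is what the problem statement asks for.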

.doOnNext(myObject -> {
    // deleting my objects one by one
    APIProvider.getInstance().getService()
        .deleteMyObject(userName, myObject.getId())
        .subscribe();
})

Finally, since we have a nice stream of MyObjects, including a delay of 100ms, we can perform our API calls. That is what is happening inside the doOnNext operator. The doOnNext operator allows us to add a hook on each event in a stream.

.subscribe();

Fire! That is what this operator does: nothing in the chain runs until something subscribes to it. We have used the simplest version of the subscribe operator here. You can read more about it in the docs.

As you may have noticed, all of this work happens on a background thread without us having to worry about anything. In my case, I had no need to process API results on the Android main thread. But in case you do need to, you can use the observeOn operator to switch threads at any point in the chain.
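For readers who have not used schedulers before, the idea of "start the work on one thread, deliver the result on another" can be sketched with the JDK's CompletableFuture. This is an analogy only, not RxJava: the first stage plays the role of subscribeOn and the second stage plays the role of observeOn. The executor names and the `ThreadHopSketch` class are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadHopSketch {

    // Runs the first stage on a thread named "background" and the second
    // stage on a thread named "main-like", and returns both thread names.
    static String[] runStages() {
        ExecutorService background =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        ExecutorService mainLike =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        try {
            return CompletableFuture
                    // the work itself starts on the background executor
                    .supplyAsync(() -> Thread.currentThread().getName(), background)
                    // the result is handed over to the other executor
                    .thenApplyAsync(
                            workThread -> new String[] {workThread, Thread.currentThread().getName()},
                            mainLike)
                    .join();
        } finally {
            background.shutdown();
            mainLike.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(runStages())); // prints [background, main-like]
    }
}
```

In RxJava the same hop is a single operator in the chain, and on Android the target would typically be the main-thread scheduler rather than a hand-made executor.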

So, what do you think about it? Let me know in the comments.

Happy coding!!

Discover and read more posts from Abhishek Bansal