Codementor Events

RxJava | Introduction to `using` operator

Published Aug 03, 2018
RxJava | Introduction to `using` operator

Hello guys!

This is my first article about Reactive Programming, using RxJava 2.x.

You know that Reactive Programming is well-known and being extremely contributed nowadays. It has a lot of applications in real-world contexts. Many bloggers and contributors research, write, test, and apply it every day. Some of practical applications are:

  • Handle multiple continuous request and response without making callback hell (ex: logging in by username and password to a network endpoint, then caching the received token to SharePreferences, then fetching user profile with this token and save profile into database, finally finish the flow).
  • Observe any changes of anything (ex: you want to listen into database whenever it has changes with a new record has been added, updated or removed; or listen into presenter (MVP) / view model (MVVM) whenever it has changed data to notify to UI.
  • Easy to test, maintain and extend

Suppose that all you guys have already known about RxJava, at least Observable, Subscription, Disposable, some basic Operators (create, defer, just, fromCallable, flatMap, map, …), Transformer, Error Handling, Subject, … If you have never known them before, please take a look at some resources below (I will write an article to introduce you guys about Reactive Programming in action soon):

In this article, I just want to show you my practical approach when using an operator in RxJava. So I take using operator for example.

What is using operator?

To understand it, I follow my steps below:

1. Definition

I take a quick look at a definition in http://reactivex.io/documentation/operators/using.html.

You can see that creating a using operator is creating a disposable resource that has the same lifespan as the Observable. I can imagine basically that it can create Observable having a resource (can be UI resource, data resource, …) which can be disposable automatically within its own observable.

2. Source

I take a deep look at RxJava repository and find this operator’s implementation: https://github.com/ReactiveX/RxJava/blob/2.x/src/main/java/io/reactivex/internal/operators/observable/ObservableUsing.java.
In this source code, I found that it has 4 parameters to be constructed, its function prototype looks like

ObservableUsing(Callable<? extends D> resourceSupplier,
 Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier,
 Consumer<? super D> disposer,
 boolean eager)

How does Observable class define usage for this operator ? Hmm…

public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier, Consumer<? super D> disposer) {
 return using(resourceSupplier, sourceSupplier, disposer, true);
 }

Ignore about T and D first (it just a generic type of which class extend to or super from), we can see that:

  • resourceSupplier is the factory function to create a resource object that depends on the ObservableSource
  • sourceSupplier is the factory function to create an ObservableSource
  • disposer is the function that will dispose of the resource

So this method prototype can construct an ObservableSource that creates a dependent resource object which is disposed of when the downstream calls dispose().

Then I think that if I use using operator, I can create an common Observable, which can be provided a resource, process by a ObservableSource from this resource and automatically dispose by a prepared disposer.

3. Test

Almost source code of RxJava are done with Unit Test. I then take a look at https://github.com/ReactiveX/RxJava/blob/2.x/src/test/java/io/reactivex/internal/operators/observable/ObservableUsingTest.java to see how it works deeply.

Ok, suppose that I have a resource (built from Callable interface), a function to process this resource from an ObservableSource (built from Function interface) and a callback to release this resource when my observable is disposed or unsubscribed.

You can see a simple unit test they have written:

final Resource resource = mock(Resource.class);
 when(resource.getTextFromWeb()).thenReturn(“Hello world!”);

Callable<Resource> resourceFactory = new Callable<Resource>() {
 [@Override](http://twitter.com/Override "Twitter profile for @Override")
 public Resource call() {
 return resource;
 }
 };

Function<Resource, Observable<String>> observableFactory = new Function<Resource, Observable<String>>() {
 [@Override](http://twitter.com/Override "Twitter profile for @Override")
 public Observable<String> apply(Resource res) {
 return Observable.fromArray(res.getTextFromWeb().split(“ “));
 }
 };

Observer<String> observer = TestHelper.mockObserver();

Observable<String> o = Observable.using(resourceFactory, observableFactory,
 new DisposeAction(), disposeEagerly);
 o.subscribe(observer);

InOrder inOrder = inOrder(observer);
 inOrder.verify(observer, times(1)).onNext(“Hello”);
 inOrder.verify(observer, times(1)).onNext(“world!”);
 inOrder.verify(observer, times(1)).onComplete();
 inOrder.verifyNoMoreInteractions();

// The resouce should be closed
 verify(resource, times(1)).dispose();

In this case, we have a resource having abilities to get text from web then return ” Hello World ” and dispose function. Also we have an Observable source can split this text got from web by “ ” character. So the unit test must verify that the text received in onNext callback should be correspondingly Hello , World , onComplete callback must be called, and then our resource have to be disposed.

4. Application

From three steps above, we can imagine that what exactly using operator does. Let’s think about real-world application if we use it. Hmm, what’s your thought ?

Summary again, we can build a resource, process it by an Observable source then make it dispose automatically. So just think simply that we have an Observable doing a task (ex: login to server, save a record to database, …), we want to hook a ProgressDialog into it for blocking the UI, when our Observable finish its task, we hide this ProgressDialog.

Is it a real-world application and practical for us to apply ? We should not think anymore and let’s make some unit test for fun ahihi. I use Mockito framework in order to mock some UI class, RxJava also has many related testing classes, so you guys should not worry about it, just do it.

Firstly, we want to make a test that our Observable taking 2 seconds to emit a Hello World string. Please see my code below:

// observable
Observable<String> observable = Observable.timer(2000, TimeUnit.MILLISECONDS)
 .map(time -> “Hello World”);

// resource supplier
ProgressDialog mockDialog = mock(ProgressDialog.class);
doNothing().when(mockDialog).dismiss();
Callable<ProgressDialog> resourceSupplier = () -> mockDialog;

// source supplier
Function<ProgressDialog, Observable<String>> sourceSupplier = progressDialog -> Observable.create(emitter -> {
 progressDialog.setOnCancelListener(dialog -> emitter.onComplete());
 progressDialog.setOnDismissListener(dialog -> emitter.onComplete());
 observable.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
 });

// disposer
Consumer<ProgressDialog> disposer = Dialog::dismiss;

// test
TestObserver<String> testObserver = TestObserver.create();
Observable.using(resourceSupplier, sourceSupplier, disposer).subscribeWith(testObserver);
if (testObserver.awaitTerminalEvent()) {
 testObserver.assertValue(“Hello World”);
 testObserver.assertNoErrors();
 testObserver.assertComplete();

verify(mockDialog, times(1)).dismiss();
}

Secondly, we want to make a test that our Observable meets an exception when doing its task. Please see my code below:

// observable
Observable<String> observable = Observable.timer(2000, TimeUnit.MILLISECONDS)
 .flatMap(time -> Observable.error(new IllegalArgumentException(“Goodbye World”)));

// resource supplier
ProgressDialog mockDialog = mock(ProgressDialog.class);
doNothing().when(mockDialog).dismiss();
Callable<ProgressDialog> resourceSupplier = () -> mockDialog;

// source supplier
Function<ProgressDialog, Observable<String>> sourceSupplier = progressDialog -> Observable.create(emitter -> {
 progressDialog.setOnCancelListener(dialog -> emitter.onComplete());
 progressDialog.setOnDismissListener(dialog -> emitter.onComplete());
 observable.subscribe(emitter::onNext, emitter::onError, emitter::onComplete);
});

// disposer
Consumer<ProgressDialog> disposer = Dialog::dismiss;

// test
TestObserver<String> testObserver = TestObserver.create();
Observable.using(resourceSupplier, sourceSupplier, disposer).subscribeWith(testObserver);
if (testObserver.awaitTerminalEvent()) {
 testObserver.assertSubscribed();
 testObserver.assertNoValues();
 testObserver
 .assertError(throwable -> throwable instanceof IllegalArgumentException && throwable.getMessage().equals(“Goodbye World”));
 testObserver.assertNotComplete();

verify(mockDialog, times(1)).dismiss();
}

We have two simple test cases in order to understand and apply using operator practically. Actually, we have to make more unit test to ensure that our code are correct. My examples below only use Observable source to demonstrate, we can make more unit test on Single, Completable, Maybe or Flowable.

My real-world application using this operator is located at: https://github.com/tranngoclam/rx-progress-dialog.

If you guys are interested about my approach for understanding a thing of Reactive Programming, we can discuss more.

Thanks for reading!!!
Have a nice day and happy coding 😉

Discover and read more posts from Lam Tran
get started
post commentsBe the first to share your opinion
Show more replies