
How to Use RxJava: Basic Guide


Introduction

Reactive eXtensions (Rx) is an API for creating and reacting to streams of data in real time. It addresses limitations of the Observer pattern and of asynchronous programming, such as memory leaks and concurrency issues.

But what is RxJava?

RxJava is an open-source project that originated at Netflix as a Java implementation of Reactive eXtensions.

In this article, let us get started with practical RxJava.

1. Dependency management

Add the dependency below to your pom.xml to use RxJava in your project. The examples in this article use the 1.x API:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.2.1</version>
</dependency>

2. Observable and Subscriber

The Observer design pattern is one of the basic building blocks of RxJava.

The two main components of RxJava are Observable and Subscriber. As the names suggest, an Observable is a class that emits a stream of data or events, and a Subscriber is a class that watches an Observable by subscribing to it. The Subscriber is notified when the Observable emits a value or event, when an error occurs, or when there are no more events to emit. A well-behaved Observable calls the Subscriber’s onNext() method zero or more times and then calls either onCompleted() or onError() exactly once.

An Observable like the one below emits its data only when a Subscriber subscribes to it. The Observable class has many static factory methods, called operators, for creating Observable objects. The snippet below calls the just() operator:

Observable<String> sampleObservable = Observable.just("Welcome to RxJava");

In the code above, we created a new Observable called sampleObservable. As no Subscriber has subscribed to it yet, it does not emit anything.

The next step is to create a Subscriber (a class that implements the Observer interface) and subscribe it to sampleObservable in order to receive the events:

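// Requires: import rx.Subscriber;
Subscriber<String> sampleSubscriber = new Subscriber<String>() {

    @Override
    public void onCompleted() {
        System.out.println("Emitting Complete!");
    }

    @Override
    public void onError(Throwable e) {
        // Report the failure instead of silently swallowing it.
        System.err.println("onError method: " + e.getMessage());
    }

    @Override
    public void onNext(String value) {
        System.out.println("onNext method: " + value);
    }
};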

Finally, subscribe to the Observable:

sampleObservable.subscribe(sampleSubscriber);

The Subscriber class implements the Observer interface and provides some useful methods in addition to onNext(), onCompleted() and onError(). Some of them are:

  • onStart(), a method which is invoked once the Subscriber has been connected to the Observable but before the Observable has emitted any data.
  • isUnsubscribed(), a method that indicates whether this Subscriber has unsubscribed from the Observable.
  • unsubscribe(), a method that stops the receipt of notifications.
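
The three methods above can be seen together in a small sketch. It assumes RxJava 1.x on the classpath; the class name SubscriberLifecycleExample and the event strings are illustrative only and do not come from the library. The Subscriber records each callback and unsubscribes after the second item.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.Subscriber;

class SubscriberLifecycleExample {

    // Subscribes to a three-item Observable, unsubscribes after the second
    // item, and returns every callback that was observed, in order.
    static List<String> run() {
        final List<String> events = new ArrayList<>();

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onStart() {
                events.add("onStart");
            }

            @Override
            public void onNext(String value) {
                events.add("onNext: " + value);
                if ("b".equals(value)) {
                    unsubscribe(); // stop receiving further notifications
                }
            }

            @Override
            public void onCompleted() {
                events.add("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                events.add("onError: " + e);
            }
        };

        Observable.just("a", "b", "c").subscribe(subscriber);
        events.add("isUnsubscribed: " + subscriber.isUnsubscribed());
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

With a synchronous source such as just(), the recorded events should be onStart, onNext: a, onNext: b and isUnsubscribed: true. The item "c" and onCompleted() are never delivered because the Subscriber has already unsubscribed.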

3. Observable operators

In this section, we will look at some of the operators that the RxJava API provides by default to create, transform, and otherwise operate on an Observable. Any Observable can have its output transformed, and multiple operators can be chained onto the same Observable.

Operators on Observable are broadly classified by the operation they perform and some of them are:

3.1. Transformational

Some of the transformational operators are:

  • map(), a method which transforms the items emitted by an Observable by applying a function to each of them.
  • buffer(), a method which periodically gathers items from an Observable into bundles and emits these bundles rather than emitting the items one at a time.
  • scan(), a method which applies a function to each item emitted by an Observable, sequentially, and emits each successive value.
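
A minimal sketch of the three operators above, assuming RxJava 1.x and Java 8 lambdas. The class and method names are hypothetical, and toList().toBlocking().single() is used only to collect the results for inspection. Note that buffer(2) is the count-based variant of buffer().

```java
import java.util.List;

import rx.Observable;

class TransformExample {

    // map(): doubles every item.
    static List<Integer> mapped() {
        return Observable.just(1, 2, 3, 4)
                .map(x -> x * 2)
                .toList().toBlocking().single();
    }

    // buffer(): groups items into bundles of at most two.
    static List<List<Integer>> buffered() {
        return Observable.just(1, 2, 3, 4, 5)
                .buffer(2)
                .toList().toBlocking().single();
    }

    // scan(): emits the running sum after each item.
    static List<Integer> scanned() {
        return Observable.just(1, 2, 3, 4)
                .scan((sum, x) -> sum + x)
                .toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(mapped());
        System.out.println(buffered());
        System.out.println(scanned());
    }
}
```

Running main should print [2, 4, 6, 8], then [[1, 2], [3, 4], [5]], then [1, 3, 6, 10].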

3.2. Utility Operators

Some of the utility operators are:

  • timestamp(), a method which attaches a timestamp to every item emitted by an Observable.
  • serialize(), a method which forces an Observable to make serialized calls and to be well-behaved.
  • observeOn(), a method which specifies on which Scheduler a Subscriber should observe the Observable.
  • doOnTerminate(), a method which registers an action to take when an Observable completes, either successfully or with an error.
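
The sketch below exercises timestamp(), doOnTerminate() and observeOn(), assuming RxJava 1.x. The class and method names are hypothetical, and the choice of Schedulers.computation() is just one possible Scheduler.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.schedulers.Schedulers;

class UtilityExample {

    // timestamp() wraps each item in a Timestamped value;
    // doOnTerminate() runs its action just before the stream terminates.
    static List<String> timestampAndTerminate() {
        List<String> log = new ArrayList<>();
        Observable.just("x", "y")
                .timestamp()
                .doOnTerminate(() -> log.add("terminated"))
                .subscribe(stamped -> log.add("value " + stamped.getValue()));
        return log;
    }

    // observeOn() moves downstream processing onto the given Scheduler,
    // so the map() function below runs on a scheduler thread.
    static String observingThread() {
        return Observable.just(1)
                .observeOn(Schedulers.computation())
                .map(ignored -> Thread.currentThread().getName())
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(timestampAndTerminate());
        System.out.println("caller thread:   " + Thread.currentThread().getName());
        System.out.println("observer thread: " + observingThread());
    }
}
```

The first method should return [value x, value y, terminated]. The thread name reported by the second method depends on the scheduler implementation, but it should differ from the calling thread.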

3.3. Filtering

Some of the filtering operators are:

  • filter(), a method which emits only those items from an Observable that pass a predicate test.
  • last(), a method which emits only the last item emitted by an Observable.
  • distinct(), a method which suppresses duplicate items emitted by the source Observable.
  • timeout(), a method which emits items from a source Observable but issues an error notification if no item is emitted within a specified timespan.
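
The filtering operators above can be sketched as follows, assuming RxJava 1.x. The class and method names are hypothetical. onErrorReturn() is not covered in this article; it is used here only to turn the timeout error into a value that can be returned.

```java
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Observable;

class FilterExample {

    // distinct() drops the repeated 2, then filter() keeps the odd values.
    static List<Integer> distinctOdds() {
        return Observable.just(1, 2, 2, 3, 5)
                .distinct()
                .filter(x -> x % 2 == 1)
                .toList().toBlocking().single();
    }

    // last() emits only the final item of the stream.
    static Integer lastItem() {
        return Observable.just(10, 20, 30)
                .last()
                .toBlocking().single();
    }

    // never() emits nothing, so timeout() signals an error after 100 ms.
    static String timedOut() {
        return Observable.<String>never()
                .timeout(100, TimeUnit.MILLISECONDS)
                .onErrorReturn(e -> "timeout: " + e.getClass().getSimpleName())
                .toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(distinctOdds());
        System.out.println(lastItem());
        System.out.println(timedOut());
    }
}
```

Running main should print [1, 3, 5], then 30, then timeout: TimeoutException.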
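
3.4. String

Some of the string operators, which are provided by the StringObservable class of the separate rxjava-string module rather than by the core library, are: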

  • byLine(), a method which converts an Observable of Strings into an Observable of Lines by treating the source sequence as a stream and splitting it on line-endings.
  • join(), a method which converts an Observable that emits a sequence of strings into an Observable that emits a single string that concatenates them all, separating them by a specified string.
  • stringConcat(), a method which converts an Observable that emits a sequence of strings into an Observable that emits a single string that concatenates them all.
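
3.5. Combining

Some of the combining operators are:

  • merge(), a method which combines multiple Observables into one.
  • zip(), a method which combines the items emitted by multiple Observables through a specified function and emits the results.

Apart from the provided operators, the RxJava API also allows us to create custom operators suited to our requirements.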
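
As a minimal sketch of merge() and zip(), assuming RxJava 1.x (the class and method names are hypothetical):

```java
import java.util.List;

import rx.Observable;

class CombineExample {

    // merge(): interleaves the items of both sources into one stream.
    static List<Integer> merged() {
        Observable<Integer> first = Observable.just(1, 2);
        Observable<Integer> second = Observable.just(3, 4);
        Observable<Integer> all = Observable.merge(first, second);
        return all.toList().toBlocking().single();
    }

    // zip(): pairs the n-th item of each source through a function.
    static List<String> zipped() {
        Observable<String> letters = Observable.just("a", "b", "c");
        Observable<Integer> numbers = Observable.just(1, 2, 3);
        Observable<String> pairs =
                Observable.zip(letters, numbers, (letter, number) -> letter + number);
        return pairs.toList().toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(merged());
        System.out.println(zipped());
    }
}
```

With these synchronous sources, merged() should return [1, 2, 3, 4] and zipped() should return [a1, b2, c3]. When the sources emit on different threads, merge() gives no ordering guarantee between them.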

4. Subject

There are scenarios where an object needs to act both as an Observable for some Observers and as an Observer of another Observable.

RxJava models this with the Subject class. For example, a ReplaySubject, one of its subclasses, can be created as shown below:

ReplaySubject<Object> subject = ReplaySubject.create();

ReplaySubject is a class inherited from Subject that buffers all items it observes and replays them to any Observer that subscribes.
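
Both roles of a Subject can be seen in the following sketch, which assumes RxJava 1.x. The class name and the recorded strings are illustrative. The ReplaySubject first observes an upstream Observable, and a subscriber that arrives afterwards still receives everything.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.ReplaySubject;

class ReplaySubjectExample {

    static List<String> run() {
        ReplaySubject<String> subject = ReplaySubject.create();

        // Role 1: the subject is an Observer of an upstream Observable.
        Observable.just("one", "two").subscribe(subject);

        // Role 2: the subject is an Observable for a late subscriber,
        // which gets the buffered items replayed to it.
        List<String> received = new ArrayList<>();
        subject.subscribe(
                value -> received.add(value),
                error -> received.add("error: " + error),
                () -> received.add("completed"));
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Although the subscriber is attached only after the upstream has finished, run() should return [one, two, completed].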

5. Error handling

As we have seen, Subscriber has an onError() method that is invoked when an error occurs. Rather than throwing exceptions, an Observable generally notifies its Observers by calling onError(). We can handle different exceptions by responding to these onError() notifications.
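
A short sketch of this behaviour, assuming RxJava 1.x (the class name and event strings are hypothetical): a division by zero inside map() is not thrown to the caller but is delivered to the onError callback, and the stream ends there.

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

class ErrorHandlingExample {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Observable.just(4, 0, 2)
                .map(divisor -> 100 / divisor) // throws ArithmeticException for 0
                .subscribe(
                        value -> events.add("onNext: " + value),
                        error -> events.add("onError: " + error.getClass().getSimpleName()),
                        () -> events.add("onCompleted"));
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

run() should return [onNext: 25, onError: ArithmeticException]. The third item is never processed and onCompleted() is not called, because an Observable terminates with either onError() or onCompleted(), never both.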

Conclusion

In this article, we’ve touched on the basics of RxJava, along with some of the key operators provided in the API, to help you get started. Code examples for some of the operators discussed above can be found here.

Author’s Bio

Kiran Gonela is an accomplished Big Data Solution Architect and evolving Machine Learning expert with around 11 years of professional experience. He has a proven track record of architecting and delivering widely distributed and highly scalable solutions in cloud as well as on-premise. He focuses on adding value to the business with specialties that include Business Development, Big Data Architecture, Big Data/Hadoop implementations, Hadoop Administrator, RFP/RFI proposal, and evangelizing new cutting edge technologies.



