RxJava8

Implementation of reactive extension support using Java 8

Welcome to RxJava.

An implementation of the core features of Reactive Extensions (Rx) using Java 8. It is inspired by Microsoft's Rx library at https://rx.codeplex.com/, but it does not support all of that library's APIs.

Building

git clone git@github.com:bhatti/RxJava8.git
./gradlew jar

Version

License

How To Guide

Creating Observable from Array of objects

   Observable.from("Erica", "Matt", "John", "Mike").subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("done"));

Creating Observable from Collection

   List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("done"));

Creating Observable from Stream

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   // note third argument for onComplete is optional
   Observable.from(names).subscribe(name -> System.out.println(name), 
      error -> error.printStackTrace());

Creating Observable from Iterator

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names.iterator()).subscribe(name -> System.out.println(name), 
      error -> error.printStackTrace());

Creating Observable from Spliterator

   List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names.spliterator()).subscribe(System.out::println, 
      Throwable::printStackTrace);

Creating Observable from a single object

   Observable.just("value").subscribe(v -> System.out.println(v), 
      error -> error.printStackTrace());
   // if the single object is a collection, it is treated as one item, e.g.
   Observable.just(Arrays.asList(1, 2, 3)).subscribe( num -> System.out.println(num), 
      error -> error.printStackTrace());

Creating Observable for an error

   Observable.throwing(new Error("test error")).subscribe(System.out::println, 
      error -> System.err.println(error));
   // this will print error 

Creating Observable from a consumer function

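   List<String> names = Arrays.asList("Erica", "Matt", "John", "Mike");
   Observable.create(observer -> {
      // push each name to the observer, then signal completion
      for (String name : names) {
         observer.onNext(name);
      }
      observer.onCompleted();
   }).subscribe(System.out::println, Throwable::printStackTrace);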

Creating Observable from range

   // Creates a range of numbers starting at the first argument (inclusive) and ending at the second (exclusive)
   Observable.range(4, 8).subscribe(num -> System.out.println(num), 
      error -> error.printStackTrace());
   // will print 4, 5, 6, 7

Creating Observable for integer numbers

   // Creates infinite integers starting at given number and incremented by 1
   Observable.integers(4).limit(4).subscribe(num -> System.out.println(num), 
      error -> error.printStackTrace());
   // will print 4, 5, 6, 7
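
For comparison, the half-open range and the truncated infinite sequence above behave like their counterparts in the standard java.util.stream API. The sketch below uses only the JDK, not this library; `RangeSketch`, `range` and `integers` are hypothetical names chosen for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class RangeSketch {
    // Half-open range: includes 'from', excludes 'to'.
    static List<Integer> range(int from, int to) {
        return IntStream.range(from, to).boxed().collect(Collectors.toList());
    }

    // Infinite sequence starting at 'start', cut off after 'count' elements.
    static List<Integer> integers(int start, int count) {
        return Stream.iterate(start, n -> n + 1)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(range(4, 8));    // [4, 5, 6, 7]
        System.out.println(integers(4, 4)); // [4, 5, 6, 7]
    }
}
```

Without the limit, the infinite sequence would never complete, which is why the example above truncates it.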

Creating empty Observable - it calls onCompleted right away

   Observable.empty().subscribe(System.out::println, 
      Throwable::printStackTrace, () -> System.out.println("Completed"));

Creating never Observable - it does not call any of the callback methods

   Observable.never().subscribe(System.out::println, Throwable::printStackTrace);

Changing Scheduler

By default, an Observable notifies its observer asynchronously using the thread-pool scheduler, but you can change the scheduler as follows:

Using thread-pool scheduler

   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newThreadPoolScheduler()).
      subscribe(System.out::println, Throwable::printStackTrace);

Using new-thread scheduler - it will create new thread

   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newNewThreadScheduler()).
      subscribe(System.out::println, Throwable::printStackTrace);

Using timer thread with interval - it will notify at each interval

   Observable.from("Erica", "Matt", "John").subscribeOn(Scheduler.newTimerSchedulerWithMilliInterval(1000)).
      subscribe(System.out::println, Throwable::printStackTrace);
   // this will print each name every second

Using immediate scheduler

This scheduler invokes the callback functions right away on the calling thread. Use it for small amounts of data that you want to consume synchronously. However, you cannot unsubscribe, because everything runs on the same thread.

   Observable.from("Erica", "Matt", "John").
      subscribeOn(Scheduler.newImmediateScheduler()).
      subscribe(System.out::println, Throwable::printStackTrace);
   // this will print each name right away on the calling thread
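
The difference between same-thread and thread-pool notification can be pictured with plain java.util.concurrent executors. This is a sketch of the idea only, not how this library implements its schedulers; `SchedulerSketch`, `deliver`, `immediateThreads` and `pooledThreads` are hypothetical names used for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class SchedulerSketch {
    // Hands each item to the callback through the given executor.
    static <T> void deliver(Executor executor, List<T> items, Consumer<T> onNext) {
        for (T item : items) {
            executor.execute(() -> onNext.accept(item));
        }
    }

    // "Immediate" delivery: the executor runs each task on the calling thread.
    static List<String> immediateThreads(List<String> items) {
        List<String> threads = new CopyOnWriteArrayList<>();
        Executor immediate = Runnable::run;
        deliver(immediate, items, item -> threads.add(Thread.currentThread().getName()));
        return threads;
    }

    // Pooled delivery: tasks run on worker threads, so the caller must wait for them.
    static List<String> pooledThreads(List<String> items) {
        List<String> threads = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        deliver(pool, items, item -> threads.add(Thread.currentThread().getName()));
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threads;
    }

    public static void main(String[] args) {
        List<String> names = java.util.Arrays.asList("Erica", "Matt", "John");
        System.out.println("immediate: " + immediateThreads(names));
        System.out.println("pooled:    " + pooledThreads(names));
    }
}
```

With the same-thread executor, every callback has finished by the time `deliver` returns, which matches the note above that there is no opportunity to unsubscribe.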

Transforming

Observables hold a sequence of items as a stream, and they support the map and flatMap operations offered by the standard Stream class, e.g.

Map

   Observable.from("Erica", "Matt", "John").map(name -> name.hashCode()).
      subscribe(System.out::println, Throwable::printStackTrace);

FlatMap

FlatMap merges a list of lists into a single list as part of the transformation, e.g.

   Stream<List<Integer>> integerListStream = Stream.of( Arrays.asList(1, 2), 
      Arrays.asList(3, 4), Arrays.asList(5));
   Observable.from(integerListStream).flatMap(integerList -> integerList.stream()).
      subscribe(System.out::println, Throwable::printStackTrace);
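
Since map and flatMap mirror the standard Stream operations, it can help to see the same two transformations written against java.util.stream alone. This is a JDK-only sketch for comparison; `TransformSketch`, `lengths` and `flatten` are hypothetical names, and it maps to string lengths rather than hash codes so the output is easy to read.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class TransformSketch {
    // map: one output element per input element.
    static List<Integer> lengths(List<String> names) {
        return names.stream().map(String::length).collect(Collectors.toList());
    }

    // flatMap: each inner list becomes a stream, and the streams are joined in order.
    static List<Integer> flatten(List<List<Integer>> lists) {
        return lists.stream().flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(lengths(Arrays.asList("Erica", "Matt", "John"))); // [5, 4, 4]
        System.out.println(flatten(Arrays.asList(
                Arrays.asList(1, 2), Arrays.asList(3, 4), Arrays.asList(5)))); // [1, 2, 3, 4, 5]
    }
}
```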

Filtering

Observables support the basic filtering operations provided by Java Streams, e.g.

Filter

   Observable.from("Erica", "Matt", "John", "Mike", "Scott", 
      "Alex", "Jeff", "Brad").filter(name -> name.startsWith("M")).
      subscribe(System.out::println, Throwable::printStackTrace);
   // This will only print Matt and Mike

Skip - skips given number of elements

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).skip(2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will skip Erica and Matt

Limit

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Mike", 
      "Scott", "Alex", "Jeff", "Brad"); 
   Observable.from(names).limit(2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will only print first two names

Distinct - removes duplicates

   Stream<String> names = Stream.of("Erica", "Matt", "John", "Erica");
   Observable.from(names).distinct().subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will print Erica only once
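
The four filtering operations above have direct equivalents in java.util.stream, which makes their expected results easy to check. The following is a JDK-only sketch for comparison, not code from this library; `FilterSketch` and its method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class FilterSketch {
    static final List<String> NAMES = Arrays.asList(
            "Erica", "Matt", "John", "Mike", "Scott", "Alex", "Jeff", "Brad");

    // filter: keep only the names with the given prefix.
    static List<String> startingWith(String prefix) {
        return NAMES.stream().filter(n -> n.startsWith(prefix)).collect(Collectors.toList());
    }

    // skip: drop the first n names.
    static List<String> skipFirst(long n) {
        return NAMES.stream().skip(n).collect(Collectors.toList());
    }

    // limit: keep only the first n names.
    static List<String> firstN(long n) {
        return NAMES.stream().limit(n).collect(Collectors.toList());
    }

    // distinct: drop repeated values, keeping the first occurrence of each.
    static List<String> distinct(List<String> input) {
        return input.stream().distinct().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(startingWith("M")); // [Matt, Mike]
        System.out.println(skipFirst(2));      // [John, Mike, Scott, Alex, Jeff, Brad]
        System.out.println(firstN(2));         // [Erica, Matt]
        System.out.println(distinct(Arrays.asList("Erica", "Matt", "John", "Erica"))); // [Erica, Matt, John]
    }
}
```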

Merge - concatenates the data of two observables

   Observable<Integer> observable1 = Observable.from(Stream.of(1, 2, 3));
   Observable<Integer> observable2 = Observable.from(Stream.of(4, 5, 6));
   observable1.merge(observable2).subscribe(System.out::println, 
      Throwable::printStackTrace);
   // This will print 1, 2, 3, 4, 5, 6

Parallel - processes the internal stream in parallel

   Observable.range(1, 101)
      .subscribeOn(Scheduler.newNewThreadScheduler())
      .parallel()
      .subscribe(System.out::println, Throwable::printStackTrace);
   // This will print 1, 2, 3, ... 100

toList - returns internal objects as list

   List<Integer> list = Observable.from(1, 2).merge(Observable.from(3, 4)).toList();
   // This will return list of 1, 2, 3, 4

toSet - returns internal objects as set

   Set<Integer> set = Observable.from(1, 2).merge(Observable.from(3, 4)).merge(Observable.just(3)).toSet();
   // This will return set containing 1, 2, 3, 4 (unordered and without any duplicates)
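
The merge, toList and toSet results above can be reproduced with Stream.concat and the standard collectors. This is a JDK-only sketch for comparison and makes no claim about how this library implements these operations; `CollectSketch`, `concatToList` and `concatToSet` are hypothetical names.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class CollectSketch {
    // Concatenates two streams and collects the items in encounter order.
    static List<Integer> concatToList() {
        return Stream.concat(Stream.of(1, 2), Stream.of(3, 4))
                .collect(Collectors.toList());
    }

    // Collecting into a set drops the duplicate 3; iteration order is not guaranteed.
    static Set<Integer> concatToSet() {
        Stream<Integer> firstFour = Stream.concat(Stream.of(1, 2), Stream.of(3, 4));
        return Stream.concat(firstFour, Stream.of(3))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println(concatToList());       // [1, 2, 3, 4]
        System.out.println(concatToSet().size()); // 4
    }
}
```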

Support or Contact

Email bhatti AT plexobject DOT com for any questions or suggestions.