This post shows how can we marry an exiting method which generates a Collection into an Event Stream and then React to it using RxJava. I have discussed fundamentals of RxJava in this previous post.
Let's assume that we have a method which generates first k Fibonacci sequences and returns them as a Collection.
Reactive Programming is programming with Asynchronous Data Streams. In this previous post, I have discussed fundamentals of RxJava. This post will show different ways to apply RxJava constructs on above method.
--happy Rx !!!
Let's assume that we have a method which generates first k Fibonacci sequences and returns them as a Collection.
//Generates first k fibonacci numbers
private static List<Integer> generateFibonacci(int count) { List<Integer> resp = new ArrayList<> (); resp.add (0); resp.add (1); for(int i=2; i< count; i++){ resp.add(resp.get (i-1) + resp.get (i-2)); } return resp; }
Reactive Programming is programming with Asynchronous Data Streams. In this previous post, I have discussed fundamentals of RxJava. This post will show different ways to apply RxJava constructs on above method.
Observable Creation
Observable provides methods to convert an array, Collection, Future, or even a normal function into Observable.
from - Converts an Iterable sequence into an Observable that emits the items in the sequence.
Observable.from (generateFibonacci (10) );
Please note that, method returns a Collection which is of type Iterable so it can be converted into an Observable.
fromCallable - Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function.
Observable.fromCallable (() -> generateFibonacci (10));
Transforming Observable
RxJava provides different operation which can be applied on Observable to transform it. Let transform above observable in such a way that only fibonacci element which are less than 15 gets finally pushed to the subscriber.filter - Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.//1st Approach Predicate<Integer> predicate = new Predicate<Integer> ( ) { @Override public boolean test(Integer integer) { return integer < 15; } }; Observable.from (generateFibonacci (10)) .filter (i -> predicate.test (i)); //2nd Approach Observable.from (generateFibonacci (10)) .filter (integer -> { return integer < 15; }); //3rd Approach Observable.from (generateFibonacci (10)) .filter ((integer) -> integer < 15);
Subscribing Observable
Observables emit events and subscribers react to it. Both Observable and Subscriber/Observers are independent and they just agree to the contract.
Observable.from(generateFibonacci(10))
.subscribe (System.out::println);
//Or a more detailed and formal subscriber which gives all 3 event handlers!
Observable.from (generateFibonacci (10))
.subscribe(new Subscriber<Integer> () {
@Override
public void onNext(Integer s) { System.out.println(s); }
@Override
public void onCompleted() { System.out.println("Completed!"); }
@Override
public void onError(Throwable e) { System.out.println("Ouch!"); }
});
--happy Rx !!!