Sunday, January 29, 2017

Applying RxJava on a Fibonacci Sequence Generator

This post shows how we can turn an existing method that returns a Collection into an event stream and then react to it using RxJava. I discussed the fundamentals of RxJava in a previous post.

Let's assume that we have a method which generates the first k Fibonacci numbers and returns them as a Collection.


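// Generates the first `count` Fibonacci numbers (0, 1, 1, 2, 3, ...)
// Needs java.util.ArrayList and java.util.List
private static List<Integer> generateFibonacci(int count) {
    List<Integer> resp = new ArrayList<>();
    for (int i = 0; i < count; i++) {
        // The first two numbers are 0 and 1; each later one is the sum of the previous two
        resp.add(i < 2 ? i : resp.get(i - 1) + resp.get(i - 2));
    }
    return resp;
}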


Reactive programming is programming with asynchronous data streams. This post shows different ways to apply RxJava constructs to the method above. The examples use the RxJava 1.x API.

Observable Creation

Observable provides methods to convert an array, a Collection, a Future, or even a normal function into an Observable.

from - Converts an Iterable sequence into an Observable that emits the items in the sequence. 

Observable.from (generateFibonacci (10) );

Note that the method returns a List, which is an Iterable, so it can be converted into an Observable that emits each number as a separate item.

fromCallable - Returns an Observable that, when an observer subscribes to it, invokes a function you specify and then emits the value returned from that function. Here the returned value is the whole list, so the subscriber receives a single List<Integer> item rather than ten separate numbers.

Observable.fromCallable (() -> generateFibonacci (10));

Transforming Observable

RxJava provides different operators which can be applied to an Observable to transform it. Let's transform the above observable so that only the Fibonacci numbers less than 15 are pushed to the subscriber.
filter - Filters items emitted by an Observable by only emitting those that satisfy a specified predicate.
//1st Approach
Predicate<Integer> predicate = new Predicate<Integer>() {
    @Override
    public boolean test(Integer integer) {
        return integer < 15;
    }
};

Observable.from(generateFibonacci(10))
        .filter(i -> predicate.test(i));

//2nd Approach
Observable.from(generateFibonacci(10))
        .filter(integer -> {
            return integer < 15;
        });

//3rd Approach
Observable.from(generateFibonacci(10))
        .filter((integer) -> integer < 15);

Subscribing Observable

Observables emit events and subscribers react to them. The Observable and the Subscriber (or Observer) are independent of each other; they only agree on the contract.


Observable.from(generateFibonacci(10))
        .subscribe(System.out::println);

//Or a more detailed and formal subscriber which gives all 3 event handlers!
Observable.from(generateFibonacci(10))
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer s) { System.out.println(s); }

            @Override
            public void onCompleted() { System.out.println("Completed!"); }

            @Override
            public void onError(Throwable e) { System.out.println("Ouch!"); }
        });


--happy Rx !!!

Thursday, January 26, 2017

Reactive Programming through RxJava

Rx was originally developed as Reactive Extensions for .NET (C#). Later, Netflix ported it to the JVM platform under the Apache license and named it RxJava!

RxJava also builds on the Observer design pattern. In this post, I will focus mostly on the fundamentals and constructs of RxJava.

Rx Building Blocks

The two fundamental constructs of Rx are Observables and Subscribers. An Observable abstracts an event stream and emits its events asynchronously to the subscribers (or observers). There can be zero or more events, and the stream terminates either by completing successfully or by ending with an error. Streams can be created out of anything: a list of integers, user clicks in a UI, DB queries, Twitter feeds and what not. It's up to you; you can convert any sequence of data, input, function etc. into a stream. On top of streams, we have a functional toolkit to filter, modify, combine and merge these events/streams. We can merge two streams to form a single stream, or map values from one stream to another. Below image shows the UI click represented as events. An instance of Subscription represents the connection between an Observable and a Subscriber.



Observables: rx.Observable<T>

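Observable is the first core component of Rx. Let's see the ways in which observables can be created:

//1 - just emits its argument as is, so here the whole list is a single item
List<Integer> list = Arrays.asList(1, 2, 3, 4);
Observable<List<Integer>> listObservable = Observable.just(list);

//2 - from emits each element of the list as a separate item
Observable.from(Arrays.asList(1, 2, 3));
//from can take an Array, Iterable and Future

//3 - fromCallable invokes the function on subscription and emits its result
Callable<String> call = () -> function1();
Observable.fromCallable(call);

//an exception thrown by the function is delivered to the subscriber's onError
Observable.fromCallable(() -> { throw new RuntimeException("some error"); });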




The Observable class provides the subscribe method, through which an observer registers to receive the values the observable pushes. The subscriber is basically an implementation of the Observer interface. An Observable pushes three kinds of events, which correspond to the three methods of the Observer interface shown below.

Observer
Provides a mechanism for receiving push-based notification. This interface provides three methods which will get called accordingly.

public interface Observer<T> {
    void onCompleted();
    void onError(Throwable e);
    void onNext(T t);
}

onCompleted() notifies the observer that observable has finished sending events.
onError() notifies the observer that observable has encountered an error / unhandled exception.
onNext() provides the subscribed observer with the new item/event.

Usually, in the code, you will not be able to explicitly notice Observers and those three callback methods; thanks to lambda and some other shorthands. To subscribe to an Observable, it's not necessary to provide an instance of Observer at all. Also, it's not always necessary to provide an implementation of onNext, onCompleted and onError; you could just provide a subset of them.


Consuming Observables (i.e. Subscribing)

Consuming Observables basically means Subscribing to them. Only when you subscribe, you can receive events abstracted by observable. 

Observable
                .just(1, 2, 3)
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Completed Observable.");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("Whoops: " + throwable.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });

Output

Got: 1
Got: 2
Got: 3

Completed Observable.

A well-formed Observable will call onNext 0 or more times and then will call either onCompleted or onError exactly once.
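
The contract (zero or more onNext calls, then exactly one terminal event) can be sketched in plain Java without RxJava. This is only an illustration, not how RxJava is implemented: SimpleObserver, ContractDemo, emit and record are hypothetical names invented for this sketch, and the Consumer argument plays a role loosely similar to doOnNext.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for rx.Observer, used only for illustration.
interface SimpleObserver<T> {
    void onNext(T item);
    void onCompleted();
    void onError(Throwable e);
}

final class ContractDemo {

    // Pushes each item to the observer, then sends exactly one terminal event:
    // onCompleted if every item went through, onError if a step failed.
    static <T> void emit(Iterable<T> items, Consumer<T> sideEffect, SimpleObserver<T> observer) {
        for (T item : items) {
            try {
                sideEffect.accept(item);
            } catch (RuntimeException e) {
                observer.onError(e); // terminal event: nothing is sent after it
                return;
            }
            observer.onNext(item);
        }
        observer.onCompleted();
    }

    // Records the events as strings so that their order can be inspected.
    static List<String> record(List<Integer> items, int rejected) {
        List<String> events = new ArrayList<>();
        emit(items,
             item -> {
                 if (item == rejected) {
                     throw new IllegalStateException("I don't like " + item);
                 }
             },
             new SimpleObserver<Integer>() {
                 @Override public void onNext(Integer item) { events.add("Got: " + item); }
                 @Override public void onCompleted() { events.add("Completed"); }
                 @Override public void onError(Throwable e) { events.add("Oops: " + e.getMessage()); }
             });
        return events;
    }
}
```

Calling ContractDemo.record(Arrays.asList(1, 2, 3), 2) yields the events "Got: 1" and "Oops: I don't like 2"; with a rejected value that never occurs, all three items are followed by a single "Completed".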


Now let's simulate, through an example, how an observable can signal an error.


Observable
                .just(1, 2, 3)
                .doOnNext(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        if (integer.equals(2)) {
                            throw new RuntimeException("I don't like 2");
                        }
                    }
                })
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        System.out.println("Completed Observable.");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        System.err.println("Oops: " + throwable.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });


Output

Got: 1
Oops: I don't like 2

The first value gets through without any obstacle, but the second value throws an exception, which terminates the Observable: onError is called, the value 3 is never emitted, and onCompleted is never called.

We don't always need to implement a full Subscriber.
        Observable
                .just(1, 2, 3)
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        System.out.println("Got: " + integer);
                    }
                });

    
In this case, if an error happens, there is no handler for it, so it surfaces as an exception thrown at the call site (RxJava 1.x wraps it in an OnErrorNotImplementedException). It's best to always implement an error handler right from the beginning.


Note: 
RxJava is single threaded by default.
To understand different operations visually: http://rxmarbles.com/

References:

http://reactivex.io/tutorials.html
https://gist.github.com/staltz/868e7e9bc2a7b8c1f754
http://docs.couchbase.com/developer/java-2.0/observables.html
http://blog.danlew.net/2014/09/15/grokking-rxjava-part-1/
http://blog.danlew.net/2015/12/08/error-handling-in-rxjava/



Saturday, January 21, 2017

Scaling your Data Storage

The traditional approach is to store all data in a single data source and handle all read and write operations centrally. As the load on your system grows, you keep adding memory and/or processing power (or buy a bigger, more powerful machine). This works for most cases, but a time will come when it no longer scales. Upgrading hardware doesn't always help to achieve scalability.

So the obvious solution to this problem is to start partitioning your data.
This post covers possible partitioning approaches.

Let's take an example of an online system where you are storing user profile details and photographs (of course, along with other details). 

Dividing Schema / Vertical Partitioning

One way to partition data is to store profile details on one server and photographs on another. This way each server receives only the read and write queries for its own kind of data, so the load is split between them. This approach is known as vertical partitioning; it basically divides your schema. So if you want to fetch the complete data of a given user, you need to fetch it from two different data sources.
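
As a minimal sketch of the idea, the two maps below stand in for the two servers. Everything here is hypothetical and in-memory (VerticalPartitionDemo, profileStore, photoStore); a real system would talk to separate databases or services instead.

```java
import java.util.HashMap;
import java.util.Map;

final class VerticalPartitionDemo {
    // Each map stands in for a separate server (hypothetical, in-memory only).
    private final Map<Long, String> profileStore = new HashMap<>();
    private final Map<Long, byte[]> photoStore = new HashMap<>();

    // A full save writes to both stores, one write per kind of data.
    void save(long userId, String profile, byte[] photo) {
        profileStore.put(userId, profile);
        photoStore.put(userId, photo);
    }

    // A profile-only read never touches the photo store.
    String profileOf(long userId) {
        return profileStore.get(userId);
    }

    // The complete view of a user needs one read from each store.
    String describe(long userId) {
        String profile = profileStore.get(userId);
        byte[] photo = photoStore.get(userId);
        int photoBytes = (photo == null) ? 0 : photo.length;
        return profile + " (photo: " + photoBytes + " bytes)";
    }
}
```

The describe method shows the cost mentioned above: assembling the complete data of a user takes two lookups against two different data sources.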

Replicating Schema / Sharding / Scale Out

In this approach, you replicate the schema and then decide what data goes where. So all instances have exactly the same schema, and each one holds a different subset of the users. The profile details as well as the photograph of a given user will be in a single instance.

Advantages:
  • If an instance goes down, only some of the users are affected (high availability), so the system as a whole keeps working.
  • There is no single master that every write must go through, so writes can be performed in parallel. Writing is a major bottleneck for many systems.
  • We can load balance our web servers and access the shard instances over different paths, which leads to faster queries.
  • Data which is used together is stored together. This leads to data denormalization, but keep in mind that the data is still separated logically (so we are still storing profile and photographs logically separate). So there are no complex joins; data is read and written in one operation.
  • Segregating data into smaller shards helps performance, as a shard is more likely to fit in cache. It also makes managing data easier, with faster backup and restore.
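
Deciding "what data goes where" needs a routing rule. The sketch below assumes the simplest possible one, user id modulo the number of shards; ShardRouter and shardFor are hypothetical names, and the post itself does not prescribe any particular rule.

```java
final class ShardRouter {
    private final int shardCount;

    ShardRouter(int shardCount) {
        if (shardCount <= 0) {
            throw new IllegalArgumentException("shardCount must be positive");
        }
        this.shardCount = shardCount;
    }

    // Maps a user id to a shard index in the range [0, shardCount).
    // floorMod keeps the result non-negative even for negative ids.
    int shardFor(long userId) {
        return (int) Math.floorMod(userId, (long) shardCount);
    }
}
```

All reads and writes for one user go to the same shard, which is what keeps a user's profile and photograph together. A plain modulo rule reshuffles most users when the number of shards changes, so treat it as an illustration rather than a recommendation.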


Function as First Class Citizens in Java

Java (in version 8) added support for treating functions as first-class citizens. Before this, Java programmers had to wrap standalone functions in a Runnable or some other anonymous class implementing an interface to be able to refer to them.

Function as first-class citizen/type/object/entity/value:
Functions can be passed around like any other value!

What this basically means is that-

  •  functions can be passed as arguments to other functions, 
  • functions can be returned as values from other functions, 
  • functions can be assigned to variables 
  • functions can be stored in data structures. 

Before Java 8, only objects and primitive data types were treated as first-class citizens (classes and functions were second class, as they were not values). But functions are just as valuable to manipulate and reuse. Java borrowed this feature from languages that already supported functional programming (like JavaScript and Scala). It helps to improve productivity, as programmers can now code at a different level of abstraction.

Read more in detail about what is first class, here


A lambda expression lets you provide the implementation of a functional interface (an interface which has exactly one abstract method) directly inline; the whole expression is treated as an instance of that functional interface. Below, we will see how lambdas enable functional interfaces to be treated as first-class objects.


Function As Value

java.util.function package (ref) provides multipurpose functional interfaces which come in handy.


import java.util.function.BiFunction;
import java.util.function.Predicate;

/**
 * Creates inline lambda functions and stores them in variables. The function can then be called directly through the variable.
 */
public class FunctionAsValue {
    public static void main(String[] args){
        //&& (not ||) so that isEmpty() is never called on a null reference
        Predicate<String> predicate = (s) -> s != null && !s.isEmpty ();
        System.out.println(predicate.test ("geekrai"));

        //a static method which conforms to the Predicate method signature can be stored and invoked
        Predicate<String> predicateRef = FunctionAsValue::isNotEmpty;
        System.out.println(predicateRef.test ("geekrai"));

        //BiFunction accepts two arguments and produces a result
        BiFunction<String, String, String> concat = (s, t) -> s+t;
        System.out.println(concat.apply ("hello", " world"));
    }

    private static boolean isNotEmpty(String s){
        return s != null && !s.isEmpty ();
    }
}


Output:

true
true
hello world


Function As Parameter




import java.util.function.Function;

/**
 * Illustrates how to pass function as parameter
 */
public class FunctionAsParam {
    public static void main(String[] args){
        //pass function as parameter to transform method
        System.out.println(transform ("geek", "Rai", s -> s.toUpperCase ()));

        Function<String, String> toLower = s -> s.toLowerCase ();

        System.out.println(transform("geek", "Rai", toLower));
    }

    //fn can be any String-to-String function, not only an upper-casing one
    private static String transform(String a, String b, Function<String,String> fn){
        if(fn != null){
            a = fn.apply (a);
            b = fn.apply (b);
        }
        return a + b;
    }
}

Output:
GEEKRAI
geekrai
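
The examples above cover functions as values and as parameters. The two remaining points, returning a function from another function and storing functions in a data structure, could be sketched as follows. The class and method names (FunctionAsReturnValue, prefixer, registry) are made up for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Illustrates returning a function from a method and storing functions in a Map.
 */
final class FunctionAsReturnValue {

    // Returns a new function which adds the given prefix to its input.
    static Function<String, String> prefixer(String prefix) {
        return s -> prefix + s;
    }

    // Stores functions in a data structure, keyed by a name.
    static Map<String, Function<String, String>> registry() {
        Map<String, Function<String, String>> ops = new LinkedHashMap<>();
        ops.put("upper", String::toUpperCase);
        ops.put("greet", prefixer("hello "));
        return ops;
    }

    public static void main(String[] args) {
        Map<String, Function<String, String>> ops = registry();
        // Look a function up by name and call it like any other value.
        System.out.println(ops.get("upper").apply("geekrai"));
        System.out.println(ops.get("greet").apply("geekrai"));
    }
}
```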

--happy functional programming !!!


Friday, January 6, 2017

Pure Functions

This post discusses one of the important aspects of functional programming, pure functions, using a Java example.

Pure functions are side-effect free, stateless functions. The output of a pure function depends purely (or completely) on its input. Side-effect free basically means that a pure function should not modify its arguments or any other state variables.

Now, the obvious question is why this is so important. We will see later how it's one of the important ingredients of functional and reactive programming. Let's take a simple Java class to understand pure functions.


package functional;

import java.util.Random;

public class PureFunction {
 private int state = 420;

 // pure function
 public int addTwo(int x) {
  return x + 2;
 }

 // Impure function
 public int addState(int x) {
  return x + state;
 }

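 // Impure function: the result depends on a random value, not only on x.
 // Random is left unseeded on purpose; a fixed seed such as new Random(1000)
 // would produce the same value on every call.
 public int randomizeArgument(int x) {
  Random r = new Random();
  return x + r.nextInt(1000);
 }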
}

The method addTwo depends purely on its input/argument. No matter how many times you call it, it behaves in a predictable manner (i.e. it just adds 2 to the argument and returns the sum). The response of the other two methods (addState and randomizeArgument) is not guaranteed to be consistent, even if they are called with the same argument multiple times. The response of addState depends on an instance variable of the class; if that variable gets mutated somewhere else, the response of the method changes. Similarly, the response of randomizeArgument varies depending on the random value.

Let's cover some of the important points

  • If a pure function references any instance variable of the class, then that instance variable should be declared as final.
  • Pure functions help in highly concurrent and scalable environments by making the behavior predictable.
  • You get parallelism almost free of cost if your method doesn't access shared mutable data to perform its job.
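
The last point can be illustrated with a parallel stream. This is a minimal sketch with hypothetical names (PureParallelDemo, sumSequential, sumParallel): because addTwo touches no shared state, the same pipeline can run sequentially or in parallel and give the same result.

```java
import java.util.stream.IntStream;

final class PureParallelDemo {

    // Pure: the result depends only on the argument.
    static int addTwo(int x) {
        return x + 2;
    }

    // Applies addTwo to 1..n on the calling thread and sums the results.
    static int sumSequential(int n) {
        return IntStream.rangeClosed(1, n)
                .map(PureParallelDemo::addTwo)
                .sum();
    }

    // Same pipeline, but the work may be split across several threads.
    // No locking is needed because addTwo reads and writes no shared state.
    static int sumParallel(int n) {
        return IntStream.rangeClosed(1, n)
                .parallel()
                .map(PureParallelDemo::addTwo)
                .sum();
    }
}
```

Swapping addTwo for a method like addState would make the result depend on when each call happens relative to changes of the shared field, which is exactly the unpredictability the points above warn about.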

--happy learning !!!