Thursday, December 12, 2019

Important Configuration Parameters of Kafka Producers

The list of Kafka configurations for the Producer is quite large. The good news is that you are not forced to configure all of them; Kafka provides sensible defaults for most, which work well in many cases. But if you care about performance, reliability, throughput, or latency, it's worth revisiting them and customizing them for your specific needs.

In this post, I will cover some of the important configurations.
Kafka Reference - here.


compression.type

Default value = none (i.e. no compression).
Available values = none, gzip, snappy, lz4 (and zstd since Kafka 2.1)

This is the algorithm used by the producer (sitting in your application) to compress data before sending it to the brokers. When multiple messages are batched together before sending, compression is particularly effective. Enabling compression reduces network utilization and storage. Snappy (developed by Google) provides decent compression ratios with low CPU overhead. Gzip typically provides a better compression ratio but uses more CPU. So if network bandwidth is limited, choose Gzip; otherwise go for Snappy.

batch.size

Default value = 16384 (i.e. 16K bytes)

The Kafka producer batches messages for each partition before sending them to that partition. This parameter controls the amount of memory (in bytes) used for each batch. The producer uses the batch size and the timeout (linger.ms) to decide when to send. It will try to accumulate as many messages as possible (up to batch.size) and then send them all in one go. If the batch size is very small, the producer will send messages more frequently (a value of 0 disables batching). A larger batch size may waste some memory, as the allocated memory might not get fully utilized.


linger.ms

Default value = 0

This value allows the producer to group records/messages together before they get sent to the broker. It is the amount of time, in milliseconds, for which the producer will wait to accumulate messages into a batch. If this value is not set (the default is 0), the producer sends messages as soon as they arrive, so latency is at its minimum. Setting this value to, say, 5 will increase latency, but at the same time it will also increase throughput (you send more messages in one go, so there is less overhead per message). Under light load, setting it to 5 can increase latency by up to 5 ms.


acks

Default value = 1

This controls the number of acknowledgments the producer requires the leader to have received before considering a request complete. This affects the durability of the message.

acks=0

The message is considered to be written successfully to Kafka if the producer managed to send it over the network. The producer does not wait for any acknowledgment from the broker, so a message can be lost without the producer ever knowing.

acks=1

The message is considered written successfully once the leader has written it to its local log. The producer does not wait for the followers to replicate it, so the message can still be lost if the leader crashes before replication.

acks=all

The message is considered written successfully only after all in-sync replicas have acknowledged it. This gives the strongest durability guarantee, at the cost of higher latency.
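Putting these parameters together, below is a minimal sketch of a producer configured with the settings discussed above (the broker address, topic name, and the chosen values are placeholders for illustration, not recommendations):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerConfigDemo {
 public static void main(String[] args) {
  Properties props = new Properties();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy"); // compression.type
  props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);          // batch.size, in bytes
  props.put(ProducerConfig.LINGER_MS_CONFIG, 5);               // linger.ms
  props.put(ProducerConfig.ACKS_CONFIG, "all");                // acks

  try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
   producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
  }
 }
}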


Resolving ClassNotFoundException in Java

ClassNotFoundException is a checked exception and a subclass of java.lang.Exception. Recall that checked exceptions need to be handled either by providing a try/catch block or by throwing them to the caller. You get this exception when the JVM is unable to load a class from the classpath. So, troubleshooting this exception requires an understanding of how classes get loaded and the significance of the classpath.

Root Cause of java.lang.ClassNotFoundException

The Javadoc of java.lang.ClassNotFoundException puts it quite clearly. It gets thrown when the class definition cannot be found while trying to:
  • Load a class using the forName method of class Class
  • Load a class using the findSystemClass method of ClassLoader
  • Load a class using the loadClass method of ClassLoader
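Here is a minimal sketch that triggers and handles the exception (com.example.MissingDriver is a made-up class name that is deliberately not on the classpath):

public class CnfeDemo {
 public static void main(String[] args) {
  try {
   // com.example.MissingDriver does not exist on the classpath
   Class<?> clazz = Class.forName("com.example.MissingDriver");
   System.out.println("Loaded: " + clazz.getName());
  } catch (ClassNotFoundException e) {
   // checked exception: it must be caught or declared in the throws clause
   System.err.println("Class not found on classpath: " + e.getMessage());
  }
 }
}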
References:
http://javaeesupportpatterns.blogspot.in/2012/11/javalangclassnotfoundexception-how-to.html 

Distributed Data System Patterns

I have published the post on Medium, here.



Monday, December 2, 2019

Performance Parameters for a System

Performance is characterized by the amount of useful work accomplished by a computer system compared to the time and resources used.

Depending on the context, this may involve achieving one or more of the following:
  • Short response time/low latency for a given piece of work
  • High throughput (rate of processing work)
  • Low utilization of computing resource(s)

Response Time / Latency

The time between a client sending a request and receiving the response. The response time is what the client sees; it includes the service time of the request plus network and queueing delays.

Even if you make the same request again and again, you will see a varying response time on every try. In practice, for a service or application handling a variety of requests, the response time can vary a lot. One obvious reason is that a request from a user with a lot of data will be slower than one from a user who doesn't have much data. Other reasons include random additional latency: loss of a network packet during TCP transmission, a GC pause, a page fault forcing a read from disk, or other mechanical and network faults. That's why we need to think of response time not as a single number but as a distribution of values.

If the 95th percentile (p95) response time is 1.5 seconds, that means 95 out of 100 requests take less than 1.5 seconds, and 5 out of 100 requests take 1.5 seconds or more.
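To make the arithmetic concrete, here is a small sketch that computes p95 from a sample of response times using the nearest-rank method (the sample values are made up):

import java.util.Arrays;

public class PercentileDemo {
 // nearest-rank percentile: the smallest sample at or below which p% of samples fall
 static double percentile(double[] samples, double p) {
  double[] sorted = samples.clone();
  Arrays.sort(sorted);
  int rank = (int) Math.ceil((p / 100.0) * sorted.length);
  return sorted[rank - 1];
 }

 public static void main(String[] args) {
  double[] responseTimesMs = {120, 130, 145, 160, 210, 250, 300, 400, 900, 1500};
  System.out.println("p95 = " + percentile(responseTimesMs, 95) + " ms");
 }
}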

Low latency - achieving a short response time - is the most interesting aspect of performance, because it has a strong connection with physical (rather than financial) limitations.

In a distributed system, there is a minimum latency that cannot be overcome: the speed of light limits how fast information can travel, and hardware components have a minimum latency cost incurred per operation (think RAM and hard drives but also CPUs).


Throughput

The number of requests or records which can be processed per second, or the total time it takes to run a job on a dataset of a certain size.

There are tradeoffs involved in optimizing for any of these outcomes. For example, a system may achieve higher throughput by processing larger batches of work, thereby reducing operational overhead. The tradeoff is longer response times for individual pieces of work due to batching.

Resource Utilization

We want optimal usage of hardware resources, which include CPU, RAM, and network bandwidth. Or, in other words, do more with fewer resources. This helps in scaling the system.

Saturday, July 20, 2019

Good Practices for Accessing Couchbase Programmatically

Couchbase is one of the most popular distributed, fault-tolerant, and highly performant document-oriented as well as key-value stores. Like any other database (relational or NoSQL), Couchbase provides language-specific client SDKs to access the DB and perform CRUD operations. You can also access Couchbase through the command-line tool, cbc, or from the Couchbase web console.

This post will focus on some of the good practices for accessing Couchbase. I will be using the Java SDK in this post; the concepts are applicable to any language though.

Initialize the Connection

All operations performed against the Couchbase cluster go through a Bucket instance. From a relational-world perspective, a Bucket is the database instance. And to create a Bucket instance, we first need an instance of the Cluster.

The important point to note is that we should create only ONE connection to the Couchbase cluster and ONE connection to each bucket, and then statically reference those connections across the application. Reusing connections ensures that the underlying resources are utilized to the fullest; a sketch of such a connection holder appears after the tips below.

// connects to cluster running on localhost
Cluster cluster = CouchbaseCluster.create();   
// Connects to cluster on 10.0.0.1 and, if that fails, tries 10.0.0.2
Cluster cluster = CouchbaseCluster.create("10.0.0.1", "10.0.0.2");

Now that the Cluster instance is created, we can create a Bucket instance to complete the initialization.

// Opens the default bucket
Bucket bucket = cluster.openBucket();   
// Opens connections to demo bucket
Bucket bucket = cluster.openBucket("demo");   
// Opens connections to SECURED demo bucket
Bucket bucket = cluster.openBucket("demo", "p@ssword");   

Tips
It's good practice to pass at least two IP addresses to the create method. If, at the time of this call, the first host is down (for some reason), the 2nd host will be tried. These IPs are used only during initialization. If there is only one host and it's down, then you are out of luck and the bucket instance will not be created!
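Following the one-connection rule above, here is a minimal sketch of a statically referenced connection holder (the class name, host IPs, bucket name, and password are placeholders):

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;

public class CouchbaseConnectionManager {
 // ONE cluster connection and ONE bucket connection, created once and reused
 private static final Cluster CLUSTER = CouchbaseCluster.create("10.0.0.1", "10.0.0.2");
 private static final Bucket BUCKET = CLUSTER.openBucket("demo", "p@ssword");

 private CouchbaseConnectionManager() {
 }

 public static Bucket getBucket() {
  return BUCKET;
 }
}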

Thoughts on GC-friendly Programming

Garbage collection in Java gets triggered automatically to reclaim occupied memory by freeing up dead objects. The HotSpot VM divides the heap into different memory segments to optimize the garbage collection cycle. It mainly separates objects into two segments - the young generation and the old generation.

Objects initially get created in the young gen. The young gen is quite small, and thus minor garbage collections run frequently on it. If objects survive minor GCs, they get moved to the old gen. So it's better to use short-lived, immutable objects than long-lived, mutable ones.
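To make this concrete, here is a tiny illustrative sketch (the class and names are made up for this post): the loop variable is short-lived and dies cheaply in the young gen, while entries added to the static cache are long-lived and eventually get promoted to the old gen.

import java.util.ArrayList;
import java.util.List;

public class GcDemo {
 // long-lived: survives many minor GCs and gets promoted to the old generation
 private static final List<String> CACHE = new ArrayList<>();

 public static void main(String[] args) {
  for (int i = 0; i < 1_000_000; i++) {
   // short-lived, immutable: typically reclaimed cheaply by a minor GC
   String temp = "row-" + i;
   if (i % 100_000 == 0) {
    CACHE.add(temp); // escapes the loop; promoted over time, adding old-gen pressure
   }
  }
  System.out.println("cached entries: " + CACHE.size());
 }
}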

Minor GC is quite fast (as it runs on a smaller memory segment) and hence less disruptive. The ideal scenario is that GC never has to compact the old gen. So if full GC can be avoided, you will achieve the best performance.

So a lot depends on how you have configured your heap memory; another important factor is whether you write code keeping these aspects in mind.

References:
http://www.ibm.com/developerworks/library/j-leaks/
http://stackoverflow.com/questions/6470651/creating-a-memory-leak-with-java/6471947#6471947

Graph DFS traversal

In this post, I will be focusing on depth-first traversal.

The difference between BFS and DFS traversal lies in the order in which vertices are explored, and this mainly comes from the data structure used for traversal: BFS uses a queue whereas DFS uses a stack. The diagram below illustrates the order of traversal.


DFS Progress

DFS Implementation

The DFS implementation uses UndirectedGraph.java from the BFS post. The class below has two implementations of DFS traversal (recursive and iterative). The method traverse(..) is the stack-based implementation whereas traverseRecur(..) is the recursive one.

package graph.algo;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/**
 * Depth-First Traversal of a graph represented as adjacency list
 * 
 * @author Siddheshwar
 * 
 */
public class DFS<V> {
 private Graph<V> graph; // graph on which DFS will be performed
 private Deque<V> stack; // Deque used to implement stack
 private Set<V> visited; // stores set of visited nodes

 public DFS(Graph<V> graph) {
  this.graph = graph;
  stack = new ArrayDeque<>();
  visited = new LinkedHashSet<>(); // to maintain the insertion order
 }

 /**
  * Iterative/stack based DFS implementation
  * 
  * @param source
  */
 public void traverse(V source) {
  Objects.requireNonNull(source, "source is mandatory!");
  if (this.graph == null || this.graph.isEmpty()) {
   throw new IllegalStateException(
     "Valid graph object is required !!!");
  }

  stack.push(source);
  this.markAsVisited(source);
  System.out.print("  " + source);

  while (!stack.isEmpty()) {
   V stackTopVertex = stack.peek();
   List<V> neighbors = graph.getAdjacentVertices(stackTopVertex);

   if (hasUnvisitedNeighbor(neighbors, visited)) {
    // descend into the first unvisited neighbor
    for (V a : neighbors) {
     if (!this.isVertexVisited(a)) {
      System.out.print("  " + a);
      this.markAsVisited(a);
      stack.push(a);
      break;
     }
    }
   } else {
    // all neighbors visited; backtrack by popping the vertex
    stack.pop();
   }
  }
 }

 /**
  * Recursive implementation of DFS
  * 
  * @param source
  */
 public void traverseRecur(V source) {
  Objects.requireNonNull(source, "source is mandatory!");

  this.markAsVisited(source);
  System.out.print(" " + source);

  // get neighbors from the adjacency list
  List<V> neighbors = this.graph.getAdjacentVertices(source);

  for (V n : neighbors) {
   if (!this.isVertexVisited(n)) {
    traverseRecur(n);
   }
  }
 }

 /**
  * checks if any of the neighbors is unvisited
  * 
  * @param neighbors
  * @param visited
  * @return
  */
 private boolean hasUnvisitedNeighbor(List<V> neighbors, Set<V> visited) {
  for (V i : neighbors) {
   if (!visited.contains(i))
    return true;
  }
  return false;
 }

 /**
  * Returns true if vertex is already visited
  * 
  * @param i
  * @return
  */
 private boolean isVertexVisited(V i) {
  return this.visited.contains(i);
 }

 /**
  * Mark a vertex visited
  * 
  * @param i
  */
 private void markAsVisited(V i) {
  this.visited.add(i);
 }

 // test method
 public static void main(String[] args) {
  Graph<Integer> graph = new Graph<>();
  graph.addEdge(1, 2);
  graph.addEdge(1, 5);
  graph.addEdge(1, 6);
  graph.addEdge(2, 3);
  graph.addEdge(2, 5);
  graph.addEdge(3, 4);
  graph.addEdge(5, 4);

  // for undirected graph
  graph.addEdge(2, 1);
  graph.addEdge(5, 1);
  graph.addEdge(6, 1);
  graph.addEdge(3, 2);
  graph.addEdge(5, 2);
  graph.addEdge(4, 3);
  graph.addEdge(4, 5);

  System.out.print("DFS -->");
  DFS<Integer> dfs = new DFS<>(graph);

  /**
   * stack based DFS traversal
   */
  dfs.traverse(1);

  /**
   * Recursive DFS traversal
   */
  // dfs.traverseRecur(1);

  // validation
  /**
   * after traversal; stack should be empty. And visited should have
   * vertices in DFS order
   */
  System.out.println("\nIs Stack is empty :"
    + (dfs.stack.isEmpty() ? "yes" : "no"));
  System.out.println("visited :" + dfs.visited);

 }

}

Output:
DFS -->  1  2  3  4  5  6
Is stack empty: yes
visited: [1, 2, 3, 4, 5, 6]



Time Complexity

Assuming the above adjacency-list representation of the graph, where |V| is the number of vertices and |E| is the number of edges, DFS visits every vertex once and examines every edge a constant number of times.
So the complexity is O(|V| + |E|).

--
happy learning !!!