Saturday, July 25, 2015

HBase Spring Data Sample

Document  Version 1.0

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:hdp="http://www.springframework.org/schema/hadoop"
    xmlns:p="http://www.springframework.org/schema/p"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
    http://www.springframework.org/schema/hadoop http://www.springframework.org/schema/hadoop/spring-hadoop.xsd">

    <context:component-scan base-package="springdt" />
    <context:property-placeholder location="hbase.properties"/>

    <hdp:configuration id="hadoopConfiguration">
      fs.defaultFS=hdfs://127.0.0.1:9000
    </hdp:configuration>

    <hdp:hbase-configuration configuration-ref="hadoopConfiguration" zk-quorum="127.0.0.1" zk-port="2181"/>

    <bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate">
        <property name="configuration" ref="hbaseConfiguration"/>
    </bean>
    <bean id="hBaseService" class="springdt.HBaseService"/>

</beans>


"HBaseService" (bean id "hBaseService") is the only bean of our own in this sample. The other beans come from Spring for Apache Hadoop. We will show its code right away.

4. Write code accessing HBase

In this sample we will create a table in HBase called "report" and store some data in it. The table has one column family called "data" with only one column, "file". The value of "file" is a file name, which in real life could be the name of a report.
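Conceptually, an HBase table can be pictured as a sorted map: row key → column ("family:qualifier") → value. The plain-Java sketch below does not use the HBase client at all. It builds that picture for the "report" table, using the same row keys and values that addData() writes in the code that follows. The class name ReportTableModel is hypothetical, and the model ignores cell timestamps and versions.

```java
import java.util.ArrayList;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of the "report" table: rows sorted by key,
 * each row holding cells addressed as "family:qualifier".
 * Illustration only, not the HBase client API; timestamps/versions are ignored.
 */
class ReportTableModel {

    // Same names as in HBaseService
    static final String COLUMN_FAMILY = "data";
    static final String QUALIFIER = "file";

    // row key -> ("family:qualifier" -> value); TreeMap keeps the keys sorted,
    // which for ASCII keys matches the byte order HBase uses for row keys
    final NavigableMap<String, NavigableMap<String, String>> rows = new TreeMap<>();

    void put(String rowKey, String family, String qualifier, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>())
            .put(family + ":" + qualifier, value);
    }

    /** Mirrors the loop in addData(): 1000 rows with one cell each. */
    static ReportTableModel sampleData() {
        ReportTableModel table = new ReportTableModel();
        for (int i = 0; i < 1000; i++) {
            table.put("row" + i, COLUMN_FAMILY, QUALIFIER, "report24.csv-" + i);
        }
        return table;
    }

    public static void main(String[] args) {
        ReportTableModel table = sampleData();
        System.out.println(table.rows.size());        // 1000
        System.out.println(table.rows.get("row7"));   // {data:file=report24.csv-7}
        // first four keys in sorted order
        System.out.println(new ArrayList<>(table.rows.keySet()).subList(0, 4)); // [row0, row1, row10, row100]
    }
}
```

HBase compares row keys as raw bytes. For ASCII keys like these, Java's String order gives the same result, so "row10" sorts right after "row1". A scan of the table returns the rows in that same order.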

package springdt;

import javax.inject.Inject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.data.hadoop.hbase.HbaseTemplate;
import org.springframework.data.hadoop.hbase.TableCallback;
import org.springframework.stereotype.Service;

/**
 * Demonstrating how to access HBase using Spring Data for Hadoop.
 *
 * @author swang
 *
 */
@Service
public class HBaseService {

    // HBase client configuration defined by <hdp:hbase-configuration> in SpringBeans.xml
    @Autowired
    private Configuration hbaseConfiguration;

    // HbaseTemplate bean defined in SpringBeans.xml
    @Inject
    private HbaseTemplate hbTemplate;

    // Table info
    final String tableName = "report";
    final String columnFamilyData = "data";
    final String colFile = "file";
    final String rowNamePattern = "row";
    final String value = "report24.csv-";

    /**
     * Creates the table and fills it with sample data.
     *
     * @throws Exception if HBase cannot be reached or the table cannot be created
     */
    public void run() throws Exception {
        // 1. create table
        createTable();
        // 2. add data entries
        addData();
    }

    /**
     * Creates the HBase table, dropping an existing table with the same name first.
     *
     * @throws Exception
     */
    public void createTable() throws Exception {
        HBaseAdmin admin = new HBaseAdmin(hbaseConfiguration);
        try {
            if (admin.tableExists(tableName)) {
                // a table must be disabled before it can be deleted
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }

            HTableDescriptor tableDes = new HTableDescriptor(tableName);
            HColumnDescriptor cf1 = new HColumnDescriptor(columnFamilyData);
            tableDes.addFamily(cf1);
            admin.createTable(tableDes);
        } finally {
            // release the connection held by the admin
            admin.close();
        }
    }

    /**
     * Adds 1000 data entries for reports.
     */
    private void addData() {
        hbTemplate.execute(tableName, new TableCallback<Boolean>() {

            public Boolean doInTable(HTableInterface table) throws Throwable {
                for (int i = 0; i < 1000; i++) {
                    // row key "row<i>", column data:file, value "report24.csv-<i>"
                    Put p = new Put(Bytes.toBytes(rowNamePattern + i));
                    p.add(Bytes.toBytes(columnFamilyData),
                            Bytes.toBytes(colFile), Bytes.toBytes(value + i));
                    table.put(p);
                }
                return Boolean.TRUE;
            }
        });
    }

    public static void main(String[] args) throws Exception {
        AbstractApplicationContext ctx = new ClassPathXmlApplicationContext(
                "SpringBeans.xml");
        try {
            HBaseService hBaseService = (HBaseService) ctx.getBean("hBaseService");
            hBaseService.run();
        } finally {
            ctx.close();
        }
    }
}

Put the "SpringBeans.xml" file we created in step 3 in "src/main/resources". That places it on the classpath, so it can be found at runtime.

5. Run the sample

We assume you have HBase installed on top of Hadoop. Make sure the HBase version is compatible with your Hadoop version. In my case I have "hbase-1.0.1.1" and "hadoop-2.7.0".
Now start the HBase server.
Run "HBaseService" in Eclipse as a Java application. In the Eclipse console you will see output like this:




2015-07-25 15:10:52 INFO  ClientCnxn:852 - Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
2015-07-25 15:10:52 INFO  ClientCnxn:1235 - Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x14ec4b8cf30000e, negotiated timeout = 90000
2015-07-25 15:10:53 INFO  HBaseAdmin:978 - Started disable of report
2015-07-25 15:10:55 INFO  HBaseAdmin:1033 - Disabled report
2015-07-25 15:10:55 INFO  HBaseAdmin:738 - Deleted report


"HBaseService" should have created 1000 entries in the HBase table "report". Check this with the HBase shell:



hbase(main):003:0> scan "report"
ROW          COLUMN+CELL
row0          column=data:file, timestamp=1437823593865, value=report24.csv-0
row1          column=data:file, timestamp=1437823593897, value=report24.csv-1
...
row999     column=data:file, timestamp=1437823603565, value=report24.csv-999 
1000 row(s) in 2.0600 seconds



Now we are done with the simple example. Have fun!


Friday, September 13, 2013

ActiveMQ Configuration for persistent Message Delivery



Copyright © 2012-2013 beijing.beijing.012@gmail.com


Whether a message is delivered persistently depends on both the message producer (i.e. the client) and the configuration of the server, which in our case is ActiveMQ.

1. On the Client Side 


A message producer can tell ActiveMQ to deliver messages in persistent mode in 2 ways:

- with MessageProducer#setDeliveryMode(..), which sets the default delivery mode for all messages sent with this producer;
- with MessageProducer#send(..), passing the delivery mode as an argument, which sets the delivery mode at message level, i.e. for each message individually.


2. On the Server Side 


If persistent delivery is not enabled on ActiveMQ, messages will not be delivered in persistent mode even if the message producer sets the delivery mode to "PERSISTENT".
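The small plain-Java model below makes the combined client/server rule explicit. It is a sketch, not ActiveMQ code. The class PersistenceRule and its method are hypothetical, and the two constants only copy the values of javax.jms.DeliveryMode. In real JMS code, the producer default is set with producer.setDeliveryMode(DeliveryMode.PERSISTENT), and a per-message mode is passed with send(message, deliveryMode, priority, timeToLive). If neither is used, JMS producers default to PERSISTENT.

```java
/**
 * Hypothetical model of when a message ends up in the broker's persistence store.
 * Not ActiveMQ code; the constants mirror the values of javax.jms.DeliveryMode.
 */
class PersistenceRule {

    static final int NON_PERSISTENT = 1; // javax.jms.DeliveryMode.NON_PERSISTENT
    static final int PERSISTENT = 2;     // javax.jms.DeliveryMode.PERSISTENT

    /**
     * @param producerDefault  mode set with MessageProducer#setDeliveryMode(..)
     * @param perMessageMode   mode passed to MessageProducer#send(..), or null if
     *                         the plain send(message) overload was used
     * @param brokerPersistent value of the broker's persistent attribute in activemq.xml
     * @return true if the message would be written to the persistence store
     */
    static boolean isStored(int producerDefault, Integer perMessageMode, boolean brokerPersistent) {
        // a mode passed to send(..) overrides the producer's default
        int effectiveMode = (perMessageMode != null) ? perMessageMode : producerDefault;
        // both the client and the broker must ask for persistence
        return brokerPersistent && effectiveMode == PERSISTENT;
    }

    public static void main(String[] args) {
        // producer PERSISTENT, broker persistent="true"
        System.out.println(isStored(PERSISTENT, null, true));           // true
        // producer PERSISTENT, broker persistent="false"
        System.out.println(isStored(PERSISTENT, null, false));          // false
        // per-message NON_PERSISTENT overrides the producer default
        System.out.println(isStored(PERSISTENT, NON_PERSISTENT, true)); // false
    }
}
```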

2.1 Set persistent mode

With ActiveMQ, persistent mode is set on the broker tag of activemq.xml.

The default:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

When no persistent attribute is set for the broker, the default is persistent="true", i.e.:


<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true" >


2.2 Configure Persistence Adapter


When the broker is configured to run in persistent mode, it requires a "persistenceAdapter", for example:


     <persistenceAdapter>
            <kahaDB directory="${activemq.base}/data/kahadb"/>
        </persistenceAdapter>

This is also the default configuration if no persistence adapter is configured in activemq.xml.

2.3 Persistence Store Usage

With "kahaDB" configured as the persistence adapter, the next step is to tell ActiveMQ how much disk space "kahaDB" may use to store messages:

   <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="64 mb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/> <!-- default was 50 gb -->
                </tempUsage>
            </systemUsage>
        </systemUsage>

When no "systemUsage" section is configured, ActiveMQ uses a default storeUsage of 100 gb for "kahaDB".
When there is not enough space available on the disk, ActiveMQ shows an ERROR/WARN message at startup like:

2013-09-13 09:05:15,136 | WARN  | Store limit is 102400 mb, whilst the data directory: /activemq/data/localhost/KahaDB only has 20603 mb of usable space | org.apache.activemq.broker.BrokerService | main

ActiveMQ can start and keep running despite such errors/warnings. On a production system, however, check whether you really need more disk space.
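You can run the same comparison yourself to see in advance whether a store limit fits on the disk. The sketch below is a hypothetical helper, not ActiveMQ code. It converts a limit such as "100 gb" to bytes in the way the log reports it (1 gb = 1024 mb, so 100 gb = 102400 mb). It then compares that value with File#getUsableSpace() for the data directory. The parser handles only the kb/mb/gb suffixes used in this post.

```java
import java.io.File;
import java.util.Locale;

/**
 * Hypothetical helper: compares a configured store limit with the usable
 * space of a data directory, similar to the startup WARN message above.
 */
class StoreLimitCheck {

    /** Converts limits such as "64 mb" or "100 gb" to bytes (1 gb = 1024 mb). */
    static long toBytes(String limit) {
        String s = limit.trim().toLowerCase(Locale.ROOT).replace(" ", "");
        long factor;
        if (s.endsWith("gb")) {
            factor = 1024L * 1024 * 1024;
        } else if (s.endsWith("mb")) {
            factor = 1024L * 1024;
        } else if (s.endsWith("kb")) {
            factor = 1024L;
        } else {
            return Long.parseLong(s); // plain number of bytes
        }
        return Long.parseLong(s.substring(0, s.length() - 2)) * factor;
    }

    /** True if the directory's partition has at least limitBytes of usable space. */
    static boolean fits(File dataDirectory, long limitBytes) {
        // getUsableSpace() returns 0 if the path does not name a partition
        return dataDirectory.getUsableSpace() >= limitBytes;
    }

    public static void main(String[] args) {
        long storeLimit = toBytes("100 gb");
        System.out.println(storeLimit / (1024 * 1024) + " mb"); // 102400 mb, as in the log
        File dir = new File(args.length > 0 ? args[0] : ".");
        long usableMb = dir.getUsableSpace() / (1024 * 1024);
        System.out.println("usable: " + usableMb + " mb, fits: " + fits(dir, storeLimit));
    }
}
```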

2.4 Running in "NON_PERSISTENT" mode


When the broker is configured with persistent="false", ActiveMQ persists no messages at all in the DB, even if the client sets the delivery mode to "PERSISTENT".

When running with persistent="false", ActiveMQ uses the default MemoryPersistenceAdapter, and all messages reside in ActiveMQ's system memory. When the memory limit of the system usage is reached, messages are stored in the "tempStore" on the file system.



<systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage limit="10 gb"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/> <!-- default was 50 gb -->
                </tempUsage>
            </systemUsage>
        </systemUsage>


Here "memoryUsage" refers to system memory, i.e. RAM. It does not cover the whole memory used by ActiveMQ, only the memory used for storing messages. The messages are kept in the JVM heap, so ActiveMQ compares this limit with the maximum memory available to the JVM. When the configured limit is larger than that maximum, ActiveMQ can still start and work, but it logs error info like:


2013-09-13 09:49:19,078 | ERROR | Memory Usage for the Broker (10240 mb) is more than the maximum available for the JVM: 981 mb | org.apache.activemq.broker.BrokerService | main


My test machine has only 8gb RAM, and the JVM running ActiveMQ may use at most 981 mb (see the log above). The configuration above, however, requires 10gb for ActiveMQ's in-memory store.
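The ERROR message compares the configured memoryUsage limit with the maximum memory of the JVM. Java exposes that value as Runtime#maxMemory(), and it is set with the -Xmx option. The hypothetical helper below performs the same kind of comparison, so you can check a limit against the heap of the JVM it runs in. It is a sketch, not ActiveMQ's own check.

```java
/**
 * Hypothetical helper: compares a memoryUsage limit with the JVM's maximum heap.
 */
class MemoryLimitCheck {

    static final long MB = 1024L * 1024;

    /** Maximum heap the JVM will attempt to use (Long.MAX_VALUE if unlimited). */
    static long jvmMaxHeap() {
        return Runtime.getRuntime().maxMemory();
    }

    static boolean exceedsJvmHeap(long memoryUsageLimitBytes) {
        return memoryUsageLimitBytes > jvmMaxHeap();
    }

    public static void main(String[] args) {
        long configured = 10L * 1024 * MB; // <memoryUsage limit="10 gb"/>
        System.out.println("configured: " + configured / MB + " mb"); // configured: 10240 mb
        System.out.println("JVM max heap: " + jvmMaxHeap() / MB + " mb");
        if (exceedsJvmHeap(configured)) {
            System.out.println("memoryUsage limit is larger than the JVM heap; raise -Xmx or lower the limit");
        }
    }
}
```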


Besides the above error, my ActiveMQ also shows the following error info at startup:


  2013-09-13 09:49:19,078 | ERROR | Temporary Store limit is 51200 mb, whilst the temporary data directory: /activemq/data/localhost/tmp_storage only has 20570 mb of usable space | 


This is because ActiveMQ is configured to use 50gb of temp space on disk, but my machine has only about 20gb free.


The "storeUsage" configuration here is not used by ActiveMQ, since no data is written to the DB in non-persistent mode.




Monday, July 15, 2013

HTML5 WebSocket Sample with Java Servlet on Server Side




Besides Server-Sent Events, WebSocket is another important HTML5 feature that lets the server update web page content in the browser automatically. The most important differences between WebSocket and Server-Sent Events are:

  • Server-Sent Events is one-way communication, whereas WebSocket uses a full-duplex channel.
  • Server-Sent Events runs over HTTP, whereas WebSocket introduces a new protocol, "ws".

I am not going to compare WebSocket and Server-Sent Events in detail here. I will just show you a simple WebSocket sample to give you a feel for it.


In the example below, the browser opens a window and sends text messages to the server. The server just sends the messages back to the browser, which shows them in the window. (By the way, the sample could be extended with little effort to work as a chat application ...)


The client is just an HTML page with embedded JavaScript. The server is a Java Servlet web application running on Tomcat 7.0.29. (Tomcat supports WebSocket and the "ws" protocol since version 7.0.24.)



1. Create a simple web application "WebSocketSample"


Use your favorite IDE (I use Eclipse) to create a web application. Be sure to use the Servlet 3.x style.
The sample web application consists of:
  • two Java classes, which are the server-side code
  • one HTML page, which is the client


2. MyWebSocketServlet.java



The sample uses Tomcat's implementation of "WebSocket", so "catalina.jar" must be on the class path.
Add servlet-api.jar to the class path as well.


package sample;

import java.util.concurrent.ConcurrentHashMap;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpSession;

import org.apache.catalina.websocket.StreamInbound;
import org.apache.catalina.websocket.WebSocketServlet;

/**
 * WebSocketServlet is contained in catalina.jar. It also needs servlet-api.jar
 * on the build path.
 *
 * @author wangs
 *
 */
@WebServlet("/wsocket")
public class MyWebSocketServlet extends WebSocketServlet {

    private static final long serialVersionUID = 1L;

    // connected clients: <sessionId, streamInBound>
    private static ConcurrentHashMap<String, StreamInbound> clients = new ConcurrentHashMap<String, StreamInbound>();

    @Override
    protected StreamInbound createWebSocketInbound(String protocol,
            HttpServletRequest httpServletRequest) {

        HttpSession session = httpServletRequest.getSession();

        // A StreamInbound serves exactly one WebSocket connection, so create a new
        // one for every connection instead of reusing an existing one. It replaces
        // any older entry for the same HTTP session.
        StreamInbound client = new MyInBound(httpServletRequest);
        clients.put(session.getId(), client);

        return client;
    }

    public StreamInbound getClient(String sessionId) {
        return clients.get(sessionId);
    }

    public void addClient(String sessionId, StreamInbound streamInBound) {
        clients.put(sessionId, streamInBound);
    }
}




3. MyInBound.java


Tomcat's WsOutbound depends indirectly on "tomcat-coyote.jar". Add this jar to the class path to resolve compile errors like "the hierarchy of the type ... is inconsistent ...".


package sample;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;

import javax.servlet.http.HttpServletRequest;

import org.apache.catalina.websocket.MessageInbound;
import org.apache.catalina.websocket.WsOutbound;

/**
 * Needs tomcat-coyote.jar on the class path, otherwise there is a compile error
 * "the hierarchy of the type ... is inconsistent".
 *
 * @author wangs
 *
 */
public class MyInBound extends MessageInbound {

    // name of the user, taken from the first text message of the connection
    private String name;

    private WsOutbound myoutbound;

    public MyInBound(HttpServletRequest httpServletRequest) {
        // the request is not used in this sample
    }

    @Override
    public void onOpen(WsOutbound outbound) {
        System.out.println("on open..");
        this.myoutbound = outbound;
        try {
            // greet the client and ask for its name
            this.myoutbound.writeTextMessage(CharBuffer.wrap("hi, what's your name?"));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onClose(int status) {
        System.out.println("Close client");
        // TODO: remove this client from MyWebSocketServlet's client map
    }

    @Override
    protected void onBinaryMessage(ByteBuffer arg0) throws IOException {
        // binary messages are ignored in this sample
    }

    @Override
    protected void onTextMessage(CharBuffer inChar) throws IOException {
        System.out.println("Accept msg");

        if (name != null) {
            // echo the message back as one WebSocket text message
            // (each writeTextMessage(..) call sends a separate message)
            CharBuffer reply = CharBuffer.wrap("- " + this.name + " says : " + inChar);
            this.myoutbound.writeTextMessage(reply);
        } else {
            // the first message is taken as the user's name
            this.name = inChar.toString();
            CharBuffer welcome = CharBuffer.wrap("== Welcome " + this.name + "!");
            this.myoutbound.writeTextMessage(welcome);
        }

        this.myoutbound.flush();
    }
}
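The onClose(..) method leaves removing the client as a to-do. The chat-application idea from the introduction also needs a way to reach every connected client. The plain-Java sketch below shows one possible shape for that bookkeeping. ClientRegistry and its methods are hypothetical names. In this sample T would be MyInBound, and broadcasting would call writeTextMessage(..) on each client. Removal uses ConcurrentHashMap#remove(key, value), which drops the entry only if it still belongs to the closing connection. That way, a late onClose(..) from an old connection cannot remove a newer connection from the same session.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/**
 * Hypothetical registry of open WebSocket connections, keyed by HTTP session id.
 * T is the per-connection object (MyInBound in the sample).
 */
class ClientRegistry<T> {

    private final ConcurrentHashMap<String, T> clients = new ConcurrentHashMap<>();

    /** Registers a connection, replacing any older one for the same session. */
    void register(String sessionId, T client) {
        clients.put(sessionId, client);
    }

    /** Removes the entry only if it still maps to this connection (atomic). */
    boolean unregister(String sessionId, T client) {
        return clients.remove(sessionId, client);
    }

    /** Applies an action, e.g. sending a text message, to every registered connection. */
    void broadcast(Consumer<? super T> action) {
        clients.values().forEach(action);
    }

    int size() {
        return clients.size();
    }

    public static void main(String[] args) {
        ClientRegistry<String> registry = new ClientRegistry<>();
        registry.register("session-1", "first connection");
        registry.register("session-1", "second connection"); // page reloaded
        // the late close of the first connection does not remove the second one
        System.out.println(registry.unregister("session-1", "first connection")); // false
        registry.broadcast(client -> System.out.println("send to " + client));    // send to second connection
    }
}
```

In MyInBound, this would mean keeping the session id (for example from httpServletRequest.getSession().getId() in the constructor) and calling registry.unregister(sessionId, this) in onClose(..).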




4. wsocket.html




<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<title>Tomcat web socket</title>
<script type="text/javascript">
    var ws = new WebSocket("ws://localhost:8080/WebSocketSample/wsocket");

    ws.onopen = function () {
    };

    // append every message from the server to the text area
    ws.onmessage = function (message) {
        document.getElementById("msgArea").textContent += message.data + "\n";
    };

    // send the text of the input field to the server and clear the field
    function postToServer() {
        ws.send(document.getElementById("msg").value);
        document.getElementById("msg").value = "";
    }

    function closeConnect() {
        ws.close();
    }
</script>
</head>

<body>
    <div>
        <textarea rows="4" cols="100" id="msgArea" readonly></textarea>
    </div>
    <div>
        <input id="msg" type="text"/>
        <button type="submit" id="sendButton" onclick="postToServer()">Send</button>
    </div>
</body>
</html>


5. Export ".war" and deploy it on a running Tomcat



Export "WebSocketSample.war" and deploy it on a running Tomcat server.

Open the link below in a browser:

http://localhost:8080/WebSocketSample/wsocket.html

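You will see the server's greeting "hi, what's your name?" in the text area.

Now type your name and send it. The server answers with "== Welcome <your name>!", and every further message you send is echoed back after "- <your name> says : ".

So it works!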


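6. The "WebSocket" Header


Inspect the request and response headers, e.g. in the browser's developer tools, and you will see the opening handshake. The browser sends an HTTP GET request with the headers "Upgrade: websocket" and "Connection: Upgrade", and Tomcat answers with "HTTP/1.1 101 Switching Protocols".

From then on, the browser no longer speaks "http" with the server over this connection. It speaks the "websocket" protocol instead.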
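Part of that handshake is a small proof that the server understood the WebSocket request. The browser sends a random Sec-WebSocket-Key header, and the server must answer with a Sec-WebSocket-Accept header computed as Base64(SHA-1(key + fixed GUID)), as specified in RFC 6455. Tomcat does this for us. The sketch below only reproduces the computation in plain Java (the class name is hypothetical) and checks it against the example key from the RFC.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

/**
 * Hypothetical helper: computes the Sec-WebSocket-Accept response header
 * from the client's Sec-WebSocket-Key, following RFC 6455.
 */
class WebSocketAccept {

    // Fixed GUID defined by RFC 6455
    private static final String GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    static String accept(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey.trim() + GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // every Java platform is required to support SHA-1
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }

    public static void main(String[] args) {
        // example key from RFC 6455
        System.out.println(accept("dGhlIHNhbXBsZSBub25jZQ==")); // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
    }
}
```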