Thursday, March 13, 2014

Playing With Some Java Code To Interact with Cassandra

In earlier posts (here and here), I introduced a problem that I am trying to solve with Cassandra, as well as some of the tools I am using.

My prototype is still a work in progress and I will provide the whole thing shortly, but for now I will just toss out some of the Java code that I have written so far.

Recall:
  • I am using Cassandra 2.0.5
  • I am using CCM to administer a local cluster.
  • In addition to cqlsh, I am using DataStax DevCenter for CQL stuff

The sample code for this post and the companion posts is on github at https://github.com/fwelland/CassandraStatementTools.

The tools I am using for building this code are: 
  • Java 1.7.0_51
  • NetBeans 7.4
  • Gradle 1.10 and the Gradle 1.3.0 plugin for NetBeans
  • DataStax Java Driver 2.0.0 rc2

Finally, assume the following class-level attributes:

    private Cluster cluster;
    private Session session;

Here are some of the basic imports too:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Cluster.Builder;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
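import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;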

Connection To a Cassandra Cluster

So here is the connect code:

    public void connect()
    {
        Builder bob = Cluster.builder(); 
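        for(HostPort node : nodes)
        {
            bob.addContactPoint(node.getHost());
            Integer p = node.getPort(); 
            // Note: withPort() sets one port for the whole cluster, not per node,
            // so the last non-null port seen in this loop wins.
            if(null != p)
                bob.withPort(p);
        }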
        cluster = bob.build();
        Metadata metadata = cluster.getMetadata();
        verbosef("Connected to cluster: %s\n", metadata.getClusterName());
        for (Host host : metadata.getAllHosts())
        {
            verbosef("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
        }
        session = cluster.connect();
    }

Weed through some of the fluff and there are really three steps here:
  1. Add one or more contact points to a builder
  2. Have the builder make a Cluster connection
  3. From the cluster connection, make a session.

It is the session that will be used for running queries and everything else that talks to the cluster.
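
The HostPort type that connect() loops over is not shown in this post. As a rough sketch only, assuming it is nothing more than a host name plus an optional (nullable) port, something like the following would fit that loop. The class body, the parse helpers, and the "host:port" input format are all my assumptions for illustration here, not necessarily what is in the repository.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the HostPort type used by connect(). */
final class HostPort {
    private final String host;
    private final Integer port; // null means "let the driver use its default port"

    HostPort(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    String getHost() { return host; }

    Integer getPort() { return port; }

    /** Parses "host" or "host:port" (assumed format; IPv6 literals are not handled). */
    static HostPort parse(String spec) {
        String s = spec.trim();
        int colon = s.lastIndexOf(':');
        if (colon < 0) {
            return new HostPort(s, null);
        }
        return new HostPort(s.substring(0, colon), Integer.valueOf(s.substring(colon + 1)));
    }

    /** Parses a comma-separated list such as "127.0.0.1,127.0.0.2:9042". */
    static List<HostPort> parseAll(String csv) {
        List<HostPort> nodes = new ArrayList<HostPort>();
        for (String spec : csv.split(",")) {
            if (!spec.trim().isEmpty()) {
                nodes.add(parse(spec));
            }
        }
        return nodes;
    }
}
```

With something like this, the nodes collection could be filled from a command-line argument, e.g. HostPort.parseAll("127.0.0.1,127.0.0.2").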

Putting A Record Into A Table

So here is the code for inserting a record:

    private void addStatement()
            throws IOException
    {
        Calendar c = Calendar.getInstance();
        c.setTime(statementDate);
        int year = c.get(Calendar.YEAR);
        int day = c.get(Calendar.DAY_OF_MONTH);
        // Calendar.MONTH is zero-based (January == 0), so add 1 to store 1-12
        int month = c.get(Calendar.MONTH) + 1;
        String fileName = statementFile.getName();
        
        ByteBuffer buffer;
        try (RandomAccessFile aFile = new RandomAccessFile(statementFile, "r"); FileChannel inChannel = aFile.getChannel())
        {
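            long fileSize = inChannel.size();
            buffer = ByteBuffer.allocate((int) fileSize);
            // read() may return before the buffer is full, so loop until done or EOF
            while(buffer.hasRemaining() && inChannel.read(buffer) != -1)
            {
            }
            buffer.flip();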
        }            
        Insert i = QueryBuilder.insertInto(keyspace,table);
        i.value("archived_statement_id", UUID.randomUUID());
        i.value("customer_id", customerId);
        i.value("day", day);
        i.value("month", month);
        i.value("year", year); 
        i.value("statement_type", statementType); 
        i.value("statement_filename", fileName);
        i.value("statement", buffer);
        i.setConsistencyLevel(clevel); 
        session.execute(i);        
    }

Here are some highlights of this hunk of code:
  • This code uses a fluent-style insert builder.  I could have, for example, chained the calls together as value().value().value().
  • As mentioned earlier, the session is the object used to do query interactions.
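
Two bits of plain JDK plumbing in that method are easy to get subtly wrong: pulling the year/month/day out of a Date, and slurping a file into a ByteBuffer. Here is a minimal, driver-free sketch of both. The class and method names (StatementInputs, dateParts, readFully) are made up for this example, and I am assuming the month should be stored as 1-12.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Calendar;
import java.util.Date;

/** Hypothetical helpers; names are illustrative only. */
final class StatementInputs {
    private StatementInputs() {}

    /** Returns {year, month (1-12), day of month} in the JVM's default time zone. */
    static int[] dateParts(Date date) {
        Calendar c = Calendar.getInstance();
        c.setTime(date);
        // Calendar.MONTH is zero-based (January == 0), hence the + 1.
        return new int[] {
            c.get(Calendar.YEAR),
            c.get(Calendar.MONTH) + 1,
            c.get(Calendar.DAY_OF_MONTH)
        };
    }

    /** Reads a whole file into a ByteBuffer that is positioned at 0 and ready to be read. */
    static ByteBuffer readFully(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r");
             FileChannel channel = raf.getChannel()) {
            long size = channel.size();
            if (size > Integer.MAX_VALUE) {
                throw new IOException("File too large for a single ByteBuffer: " + size);
            }
            ByteBuffer buffer = ByteBuffer.allocate((int) size);
            // A single read() may return fewer bytes than requested,
            // so keep going until the buffer is full or we hit end of file.
            while (buffer.hasRemaining() && channel.read(buffer) != -1) {
                // nothing to do; read() advances the buffer position
            }
            buffer.flip(); // limit = bytes actually read, position = 0
            return buffer;
        }
    }
}
```

The buffer returned by readFully could then be handed to the "statement" column as-is. Note that this holds the whole file in memory, which is fine for small statement files but probably not for anything large.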

Fetching A Record From a Table

And here is some fetch code:

    public void selectStatements()
            throws Exception
    {
        Select q = QueryBuilder.select("archived_statement_id", "customer_id", "day", "month","year","statement_type", "statement_filename").from(keyspace, table);
        if(null != statementUUID)
        {
            q.where(eq("archived_statement_id", statementUUID));
        }
        
        if(null != clevel)
        {
            q.setConsistencyLevel(clevel);            
        }
        
        ResultSet rs = session.execute(q);
        for(Row r : rs)
        {
            System.out.println("uuid:  " + r.getUUID("archived_statement_id").toString());
        }
    }

And the salient points of this code:

  • This is a better example of the fluent query API!
  • The if/statementUUID stuff will make more sense when I add the stuff for selecting on a few different criteria -- for now this code only supports PK selects.
  • The consistency level stuff will come into play later.
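
If the fluent style is new to you, the trick is simply that each builder method returns the builder, so calls can be chained or applied one at a time (for example inside an if). Below is a toy builder to show the idea. To be clear, ToySelect is something I made up for this illustration: it is not the DataStax QueryBuilder, and the text it renders is not necessarily the CQL the driver would generate.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy fluent builder for illustration only; NOT the DataStax QueryBuilder. */
final class ToySelect {
    private final List<String> columns = new ArrayList<String>();
    private final List<String> conditions = new ArrayList<String>();
    private String table;

    private ToySelect() {}

    static ToySelect select(String... cols) {
        ToySelect s = new ToySelect();
        for (String c : cols) {
            s.columns.add(c);
        }
        return s;
    }

    ToySelect from(String keyspace, String tableName) {
        this.table = keyspace + "." + tableName;
        return this; // returning this is what makes chaining possible
    }

    ToySelect where(String condition) {
        conditions.add(condition); // mutates the builder, so it also works inside an if
        return this;
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("SELECT ");
        sb.append(join(columns, ", ")).append(" FROM ").append(table);
        if (!conditions.isEmpty()) {
            sb.append(" WHERE ").append(join(conditions, " AND "));
        }
        return sb.toString();
    }

    private static String join(List<String> parts, String sep) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < parts.size(); i++) {
            if (i > 0) {
                sb.append(sep);
            }
            sb.append(parts.get(i));
        }
        return sb.toString();
    }
}
```

Usage mirrors the shape of selectStatements(): build the base query in one chained expression, then conditionally call where(...) on the same object before using it.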

That's it for now. I will be filling in some blanks in upcoming posts.
