In the previous article we discussed using hll counters to find the number of unique values in a collection. In this article we will see how to provide the same functionality inside Cassandra itself, using user defined functions (UDF) and aggregates (UDA).

Important

This approach works only with Cassandra 2.2.x. Cassandra 3.x places more restrictions on UDFs, so it is impossible to use external libraries inside user functions. There may be a way to turn off these restrictions in the future (see CASSANDRA-9892), but for now it is not possible.

Software

For this example I use:

  • docker
  • docker-machine
  • docker-compose
  • cassandra:2.2.8
  • hll-library - hyperloglog
  • sbt
  • Datastax DevCenter - for testing CQL queries. You can use cqlsh if it is more convenient for you

Steps

  • Install the necessary software
  • Check out the example code from here: https://github.com/coffius/koffio-uda-hll
  • sbt assembly - build the jar file with the hll functions
  • docker build - build a docker image for Cassandra with our library inside
  • docker-compose up - start the created Cassandra container inside docker
  • Test the hll counters - hll_uda.cql and test_data.cql contain example queries for Cassandra

You can find more information about how exactly it works below.

Simple example of UDA

At first let’s take a look at how to define a very simple custom aggregate function:

-- A simple example of creating functions for a user defined aggregate
CREATE OR REPLACE FUNCTION udf.avg_state ( state tuple<int,bigint>, val int ) CALLED ON NULL INPUT RETURNS tuple<int,bigint> LANGUAGE java AS
  'if (val !=null) { state.setInt(0, state.getInt(0)+1); state.setLong(1, state.getLong(1)+val.intValue()); } return state;';

CREATE OR REPLACE FUNCTION udf.avg_final ( state tuple<int,bigint> ) CALLED ON NULL INPUT RETURNS double LANGUAGE java AS
  'double r = 0; if (state.getInt(0) == 0) return null; r = state.getLong(1); r/= state.getInt(0); return Double.valueOf(r);';

CREATE AGGREGATE IF NOT EXISTS udf.avg ( int )
SFUNC avg_state STYPE tuple<int,bigint> FINALFUNC avg_final INITCOND (0,0);

For one aggregate we need two functions - a state function and a final function. The first accumulates the state row by row and the second makes the final calculation using the accumulated state. You can find more information here in the official docs.
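Conceptually, Cassandra evaluates such an aggregate as a fold over the selected rows: it starts with INITCOND, applies the state function once per row, and applies the final function once at the end. Here is a rough Java sketch of this model - an illustration only, not Cassandra's actual code:

import java.util.Arrays;

public final class AvgModel {
	// A rough model of UDA evaluation: INITCOND -> state function per row -> final function.
	// This is an illustration of the idea only, not Cassandra's actual code.
	public static Double avg(Iterable<Integer> rows) {
		int count = 0; // tuple element 0 of the state
		long sum = 0;  // tuple element 1 of the state
		for (Integer val : rows) { // avg_state, called for every row
			if (val != null) {
				count++;
				sum += val;
			}
		}
		// avg_final, called once with the accumulated state
		return count == 0 ? null : Double.valueOf((double) sum / count);
	}

	public static void main(String[] args) {
		System.out.println(avg(Arrays.asList(2, 4, 6))); // prints 4.0
	}
}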

Prepare HLL for Cassandra

The first thing we have to do is implement the two functions that we will use in the Cassandra aggregate. We cannot implement the necessary logic directly in a CQL query like the one above because it would be too complex. For example, it is impossible to import classes right in CQL code, so we would have to use fully qualified class names like io.koff.full.path.to.Clazz everywhere. So we implement the logic in Java and then add the resulting jar to the Cassandra classpath in the docker container.

Example is here: HLL.java

This is our state function:

/**
 * State function. Aggregates values to hll counter
 * @param state byte buffer with serialized HLL counter
 * @param value the value to add to counter
 * @return hll counter with the new value
 */
public final static ByteBuffer stateFunc(ByteBuffer state, int value) {
	int capacity = state.capacity();
	// if it is the first call then state buffer should be empty
	if(capacity == 0) {
		HyperLogLog.HyperLogLogBuilder builder = new HyperLogLog.HyperLogLogBuilder();
		HyperLogLog hll = builder.build();
		hll.addInt(value);
		return serialize(hll);
	} else {
		// if there is data in state buffer then we need to use it
		HyperLogLog hll = deserialize(state);
		hll.addInt(value);
		return serialize(hll);
	}
}

And this is our final function:

/**
 * Final function. Estimates the number of unique values in the hll counter
 * @param state byte buffer with the serialized HLL counter
 * @return the estimated number of unique values
 */
public final static long finalFunc(ByteBuffer state) {
	HyperLogLog hll = deserialize(state);
	return hll.count();
}
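The serialize and deserialize helpers are omitted above; their actual implementation is in HLL.java. As a rough idea, they might look like the sketch below, which assumes the hyperloglog library exposes stream-based helpers such as HyperLogLogUtils.serializeHLL / deserializeHLL - these names are an assumption, so check HLL.java and the library's API for the real ones:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

// A minimal sketch of the helpers inside the HLL class.
// HyperLogLogUtils.serializeHLL/deserializeHLL are assumptions about the
// hyperloglog library's API - check the actual method names before use.
private static ByteBuffer serialize(HyperLogLog hll) {
	try {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		HyperLogLogUtils.serializeHLL(out, hll);
		return ByteBuffer.wrap(out.toByteArray());
	} catch (IOException e) {
		throw new RuntimeException("Failed to serialize HLL counter", e);
	}
}

private static HyperLogLog deserialize(ByteBuffer state) {
	try {
		// copy the buffer's contents into a byte array for the stream API
		byte[] bytes = new byte[state.remaining()];
		state.duplicate().get(bytes);
		return HyperLogLogUtils.deserializeHLL(new ByteArrayInputStream(bytes));
	} catch (IOException e) {
		throw new RuntimeException("Failed to deserialize HLL counter", e);
	}
}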

Important note - both functions must depend only on their parameters (see pure functions).
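For example, a state function like the hypothetical sketch below would break the aggregate, because its result depends on something other than its arguments:

import java.nio.ByteBuffer;

public final class BadExample {
	// NOT pure: the result depends on the current time, a hidden input.
	// Two calls with the same arguments can return different states,
	// so the aggregate result would become non-deterministic.
	public static ByteBuffer badStateFunc(ByteBuffer state, int value) {
		ByteBuffer result = ByteBuffer.allocate(12);
		result.putLong(System.currentTimeMillis()); // hidden input
		result.putInt(value);
		result.flip();
		return result;
	}
}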

Deploy code

The next thing we should do is add our code to Cassandra and enable UDF in the Cassandra config in order to make it possible to use our aggregates.

At first we build a fat jar (a jar with all dependencies) using the sbt-assembly plugin.

in build.sbt:

assemblyJarName in assembly := "hll.jar"
// we do not need test classes in the resulting jar
test in assembly := {}
// put the resulting jar in the project root folder - a bit easier for docker to find it there
target in assembly := new java.io.File(".")
// we do not need the scala library in the resulting jar
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// compile java code with JDK 7 compatibility, since Cassandra inside Docker runs on JDK 7
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")

in project/assembly.sbt:

// use sbt-assembly for building a fat jar
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")

Then we add the jar file to the docker container along with the patched Cassandra config where the flag enable_user_defined_functions is set to true:

Dockerfile

FROM cassandra:2.2.8

# copy the cassandra config with the permission for user defined functions
COPY ./cassandra.yaml /etc/cassandra/cassandra.yaml

# copy the jar with our implementation of hll functions
COPY ./hll.jar /usr/share/cassandra/lib/
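docker-compose up (see the steps above) then starts a container from this image. The compose file lives in the example repository; a minimal sketch of what such a file can look like (the actual docker-compose.yml in the repo may differ):

# a minimal sketch - the actual docker-compose.yml in the repository may differ
version: '2'
services:
  cassandra:
    build: .
    ports:
      - "9042:9042" # the native CQL port used by DevCenter/cqlsh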

Create UDA for HLL

The last step is to create a UDA which uses our functions:

hll_uda.cql

-- Keyspace definition
CREATE KEYSPACE IF NOT EXISTS udf
WITH replication = {
	'class' : 'SimpleStrategy',
	'replication_factor' : 1
};

-- Functions for hll counters
CREATE OR REPLACE FUNCTION udf.hll_state ( state blob, val int ) CALLED ON NULL INPUT RETURNS blob LANGUAGE java AS
  'return io.koff.udf.HLL.stateFunc(state, val);';

CREATE OR REPLACE FUNCTION udf.hll_final ( state blob ) CALLED ON NULL INPUT RETURNS bigint LANGUAGE java AS
  'return io.koff.udf.HLL.finalFunc(state);';

-- we need an empty ByteBuffer at the first step so we make it from the empty string
CREATE AGGREGATE IF NOT EXISTS udf.hll ( int ) SFUNC hll_state STYPE blob FINALFUNC hll_final INITCOND textAsBlob('');

Note that we have to use fully qualified names for our Java functions here because, as mentioned above, imports are impossible in CQL code.
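You can check that the functions and the aggregate were registered by querying the system schema tables. In Cassandra 2.2 this metadata lives in the system keyspace (in 3.x it moved to system_schema); the table and column names below are from the 2.2 schema:

SELECT function_name FROM system.schema_functions WHERE keyspace_name = 'udf';
SELECT aggregate_name FROM system.schema_aggregates WHERE keyspace_name = 'udf';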

Test

Now we are ready to test it :)

Let’s make a simple table where we will store events about user visits for different wares and add some data to it:

CREATE TABLE IF NOT EXISTS udf.ware_hits (
	ware_id int,
	date_time timestamp,
	visitor_id int,
	PRIMARY KEY (ware_id, date_time)
);

INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 1);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 2);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 3);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 1);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 2);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 3);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 4);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 5);
INSERT INTO udf.ware_hits (ware_id, date_time, visitor_id) VALUES (1, toTimestamp(now()), 6);

So now there are 9 events for ware[id: 1] with 6 unique users. We can get these numbers using the following queries:

SELECT count(visitor_id) as total_count FROM udf.ware_hits WHERE ware_id = 1; -- total_count = 9
SELECT udf.hll(visitor_id) as unique_users FROM udf.ware_hits WHERE ware_id = 1; -- our aggregate returns unique_users = 6
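The same queries can of course be run from application code. Below is a rough sketch using the DataStax Java driver; the contact point is an assumption, so use the address of your docker-machine instead:

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public final class HllQueryExample {
	public static void main(String[] args) {
		// the contact point is an assumption - use the address of your docker-machine
		try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
		     Session session = cluster.connect()) {
			Row row = session.execute(
					"SELECT udf.hll(visitor_id) AS unique_users " +
					"FROM udf.ware_hits WHERE ware_id = 1").one();
			System.out.println("unique users: " + row.getLong("unique_users"));
		}
	}
}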

Afterword

We have shown how to implement custom aggregates for Cassandra using hll counters as an example. But before using this approach you also need to understand how aggregates work in Cassandra in a distributed environment. You can find this information in the articles here and here. Maybe Spark will be a better choice in your case.

