Thursday, October 30, 2014

ToroDB Adventures: Adding unimplemented commands

From

> db.createCollection("test")
{ "ok" : 0, "errmsg" : "Unimplemented command: create", "code" : 1000002 }


to

> db.createCollection("test")
{ "ok" : 1 }


in five simple steps:

1.) Find out what the command is called internally. ToroDB tells you in the error message:

"Unimplemented command: create"

so it is "create".

2.) Look up the "create" command. Start in QueryCommandProcessor.java with the QueryCommandGroup enum, which contains the enums of all known commands. Since "create" is an administration command, you'll find it in AdministrationQueryCommand.java.

3.) "create" does nothing at the moment, so add some code.
I just copied it from "createIndexes", so

create,

now becomes

create {
    @Override
    public void doCall(RequestBaseMessage queryMessage, BSONDocument query, ProcessorCaller caller) throws Exception {
        caller.create(query);
    }
},


4.) Now ProcessorCaller needs to know about the new command. It is nested in QueryCommandProcessor.java, so add

public void create(@Nonnull BSONDocument document) throws Exception {
    queryCommandProcessor.create(document, messageReplier);
}

to ProcessorCaller and the prototype

public void create(@Nonnull BSONDocument document, @Nonnull MessageReplier messageReplier) throws Exception;

to the enclosing QueryCommandProcessor class to make it known.

5.) Implement the command in ToroQueryCommandProcessor.java, which is the actual implementation of QueryCommandProcessor:

@Override
public void create(BSONDocument document, MessageReplier messageReplier) {
    Map keyValues = new HashMap();
    keyValues.put("ok", MongoWP.OK);

    // Read all allowed options; nothing acts on them yet.
    String collectionName = (String) document.getValue("create");
    Boolean capped = (Boolean) document.getValue("capped");
    Boolean autoIndexId = (Boolean) document.getValue("autoIndexId");
    Boolean usePowerOf2Sizes = (Boolean) document.getValue("usePowerOf2Sizes");
    Double size = (Double) document.getValue("size");
    Double max = (Double) document.getValue("max");

    // Acknowledge the command with the "ok" reply built above.
    BSONDocument reply = new MongoBSONDocument(keyValues);
    messageReplier.replyMessageNoCursor(reply);
}


And that's pretty much it. As of now it just reads all allowed values from the command and acknowledges OK. But now everything is set to make it a "real" command if needed.
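To see the whole chain at a glance, here is a toy model of the dispatch in plain JavaScript. It is not ToroDB code: makeDispatcher, the handler table and the create function are names I made up for illustration, and the reply shapes are simply copied from the shell output at the top of this post. The only thing it borrows from MongoDB is that the command name is the first key of the command document.

```javascript
// Toy model of the dispatch chain; NOT ToroDB code, all names are hypothetical.
const UNIMPLEMENTED = 1000002; // error code copied from the shell output above

// Builds a dispatcher over a table of command name -> handler (or null).
function makeDispatcher(handlers) {
  return function dispatch(commandDoc) {
    // In MongoDB's protocol the command name is the first key of the document.
    const name = Object.keys(commandDoc)[0];
    const handler = handlers[name];
    if (typeof handler !== "function") {
      return { ok: 0, errmsg: "Unimplemented command: " + name, code: UNIMPLEMENTED };
    }
    return handler(commandDoc);
  };
}

// Counterpart of step 5: read the allowed options, then just acknowledge.
function create(commandDoc) {
  const { create: collectionName, capped, autoIndexId, usePowerOf2Sizes, size, max } = commandDoc;
  return { ok: 1 };
}

const stock = makeDispatcher({ create: null });      // step 3 not done yet
const patched = makeDispatcher({ create: create });  // after steps 3 to 5

console.log(stock({ create: "test" }).errmsg); // Unimplemented command: create
console.log(patched({ create: "test" }));      // { ok: 1 }
```

The Java version spreads the same idea over an enum constant, ProcessorCaller and the processor implementation; the sketch collapses those layers into one lookup table.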

Monday, October 27, 2014

The case for map/reduce with relational databases

I was at pgconf.eu and gave a lightning talk about map/reduce with PostgreSQL. Beforehand, I was asked "Why do you want to do that anyway?" and my initial response was something like "Because I can." :-)

But that got me thinking about the real case behind the idea. What is the heart of map/reduce? Citing from the original paper:

"MapReduce is a programming model and an associated implementation for processing and generating large data sets." Note the word "sets"?

"Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.", or, expressed in a more formal way

map (k1,v1)                -> list(k2,v2)
reduce (k2,list(v2))    -> list(v2)


Please note that this is a bit more precise than the initial definition. The paper explains:

"I.e., the input keys and values are drawn from a different domain than the output keys and values. Furthermore, the intermediate keys and values are from the same domain as the output keys and values."

This is important, because the authors are introducing a domain transformation of the input data here. That is, in my opinion, already the heart of map/reduce.

Going back to the initial definition, this is basically what RDBMS already do when processing parallel queries, be it by built-in ability or bolted on, as with PL/Proxy + PostgreSQL. In the first step the input set is broken down into partitions, then the query runs on those partitions in parallel and produces intermediate result sets, and finally those intermediate result sets are aggregated into the final result set. But the formal definition above adds a little twist: the domain transformation.

To clarify this, I'll use the canonical example, counting words in a text. The map function converts semi-structured data, a text with lines of arbitrary length, into a well-structured set of key (a word) and value (its count) tuples. This is the difference and the key to the power of map/reduce: the ability to handle semi-structured data, which the relational model usually does not handle very well. (And I won't say unstructured data. Truly unstructured data is statistical noise.)
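To make the signatures concrete, here is a minimal sketch of the word count in plain JavaScript, outside of any database. The choice of k1 = line number and v1 = line text, the lowercasing, and the little mapReduce driver are my assumptions for illustration; a real framework would also distribute the work, which this does not.

```javascript
// map (k1, v1) -> list(k2, v2)
// k1 is a line number, v1 the line text; k2 is a word, v2 a count.
function map(lineNo, line) {
  return line
    .toLowerCase()
    .split(/\W+/)
    .filter((word) => word !== "")
    .map((word) => [word, 1]);
}

// reduce (k2, list(v2)) -> list(v2)
function reduce(word, counts) {
  return [counts.reduce((sum, c) => sum + c, 0)];
}

// Minimal single-process driver: map every input pair, group by k2, reduce each group.
function mapReduce(lines) {
  const groups = new Map();
  lines.forEach((line, lineNo) => {
    for (const [word, count] of map(lineNo, line)) {
      if (!groups.has(word)) groups.set(word, []);
      groups.get(word).push(count);
    }
  });
  const result = new Map();
  for (const [word, counts] of groups) {
    result.set(word, reduce(word, counts)[0]);
  }
  return result;
}

console.log(mapReduce(["In the beginning", "the end."]));
```

Note how the input domain (line numbers and free text) differs from the output domain (words and integers), while the intermediate pairs already live in the output domain.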

But modern RDBMS, especially PostgreSQL, often already have functions to transform semi-structured data into relations and/or allow user-defined functions to extend their capabilities, and that allows running a map/reduce type job inside an RDBMS. Still, why would somebody want to do this?

1.) Integration
An awful lot of data is stored in relational models and will stay there. Simultaneously, especially for analytical workloads which become more and more important, the need for integrating relational and semi-structured data grows. Why handle them in different systems when one will do?
This decision of course heavily depends on the real world requirements. But rest assured that the datacenter guys who have to run the show will like to operate one database better than 2..n.

2.) Sets
Remember the word "sets" from the initial definition? And now the definition of a "relation" in an RDBMS:

"R is a relation on these n domains if it is a set of elements of the form (d1, d2, ..., dn) where dj ∈ Dj for each j=1,2,...,n." (E. F. Codd (Oct 1972). "Further normalization of the database relational model". "Data Base Systems". Courant Institute: Prentice-Hall. ISBN 013196741X.)

If a relation is a set of tuples with values from some domain D and map/reduce does domain transformation on key/value pairs (a.k.a. tuples), what does that call for? Right, a very efficient set processor. Since relational DBMS are very efficient set processors by nature, they allow for writing compact map/reduce functions that are also less error-prone due to the declarative nature of SQL.

To clarify what I mean, take a look at the following map and reduce functions for wordcount, written for MongoDB in JavaScript, from here:

var map = function() { 
    var summary = this.summary;
    if (summary) {
        // quick lowercase to normalize per your requirements
        summary = summary.toLowerCase().split(" ");
        for (var i = summary.length - 1; i >= 0; i--) {
            // might want to remove punctuation, etc. here
            if (summary[i])  {      // make sure there's something
               emit(summary[i], 1); // store a 1 for each word
            }
        }
    }
};

var reduce = function(key, values) {
    var count = 0;
    values.forEach(function(v) {
        count += v;
    });
    return count;
};

db.so.mapReduce(map, reduce, {out: "word_count"})


PostgreSQL:

For the code, see my previous post.

Well, while it seems to require more code than MongoDB, there is a subtle difference. Most of the PostgreSQL code is standard boilerplate to write a set-returning function and to make PL/Proxy work. Once you get that right, you usually never have to look back. The actual work is done in two lines of SQL:

SELECT TRIM(both from word),count(1) FROM (SELECT regexp_split_to_table(line, E'\\W+') as word FROM kjb) w GROUP BY word

and 

SELECT word,sum(count) FROM map_kjb() AS (word text, count bigint) WHERE word != '' GROUP BY word
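Read together, the two statements are a two-level aggregation: every worker counts the words in its own partition, and the head node sums those partial counts. Here is a minimal sketch of that split in JavaScript, with in-memory arrays standing in for the kjb tables on the workers. The function names mapPartition and reducePartials are made up, and JavaScript's \W only approximates what PostgreSQL's regex engine treats as a non-word character.

```javascript
// Stand-in for the worker-side query: split each line on non-word characters,
// trim, and count per word (GROUP BY word). Empty strings are kept here,
// just as the first SQL statement keeps them.
function mapPartition(lines) {
  const counts = new Map();
  for (const line of lines) {
    for (const word of line.split(/\W+/)) {
      const key = word.trim();
      counts.set(key, (counts.get(key) || 0) + 1);
    }
  }
  return [...counts]; // rows of [word, count]
}

// Stand-in for the head-side query: drop empty words, sum the partial counts.
function reducePartials(partials) {
  const totals = new Map();
  for (const rows of partials) {
    for (const [word, count] of rows) {
      if (word === "") continue; // WHERE word != ''
      totals.set(word, (totals.get(word) || 0) + count);
    }
  }
  return totals;
}

// Two hypothetical workers, each holding its own lines.
const workers = [["the cat", "the dog."], ["the end"]];
const totals = reducePartials(workers.map(mapPartition));
console.log(totals.get("the")); // 3
```

Because addition is associative, summing per-partition counts gives the same totals as counting over all lines at once, which is why only small intermediate rows have to travel to the head node.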

IMHO, the ability to express this with extensively tried and tested functions instead of having to implement them yourself, combined with strong typing, is worth so much that one should give it a try before looking somewhere else. Granted, raw performance may become so paramount that an alternative technology might be called for, but if you already use PostgreSQL, there is now another reason to like it and not to leave it.

And this is it, the long answer I didn't have ready at the conference.

P.S. It also allows you to move computation around instead of data.

Monday, October 20, 2014

pgconf.eu 2014

On the worker nodes:

CREATE TABLE kjb
(
  line text
);



CREATE OR REPLACE FUNCTION map_kjb()
  RETURNS SETOF record AS
$BODY$
DECLARE r record;
BEGIN
FOR r IN (SELECT TRIM(both from word),count(1) FROM (SELECT regexp_split_to_table(line, E'\\W+') as word FROM kjb) w GROUP BY word) LOOP
RETURN NEXT r;
END LOOP;
RETURN;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 1000
  ROWS 1000;


On the head node:

CREATE TYPE t_mr_wc AS (word text, count bigint);

CREATE OR REPLACE FUNCTION map_kjb()
  RETURNS SETOF record AS
$BODY$
CLUSTER 'head';
RUN ON ALL;
$BODY$
  LANGUAGE plproxy VOLATILE
  COST 1000
  ROWS 1000;


CREATE OR REPLACE FUNCTION reduce_kjb()
  RETURNS SETOF t_mr_wc AS
$BODY$
DECLARE result public.t_mr_wc;
BEGIN
FOR result IN SELECT word, sum(count) FROM map_kjb() AS (word text, count bigint) WHERE word != '' GROUP BY word LOOP
RETURN NEXT result;
END LOOP;
END;
$BODY$
  LANGUAGE plpgsql VOLATILE
  COST 100
  ROWS 100;


1. Load data into tables.

2. Get word count:

SELECT * FROM reduce_kjb();

3. Get top 21 word counts: 

SELECT * FROM reduce_kjb() ORDER BY 2 DESC LIMIT 21;