Wednesday, February 12, 2014

Arbitrary parallel (well, almost) ad-hoc queries with PostgreSQL

PostgreSQL currently cannot run a single query on multiple cores or nodes, i.e. it lacks built-in parallel query execution and automatic horizontal scaling. While this seems to be under development, what can be done today?

PL/Proxy supports database partitioning, and its RUN ON ALL directive executes a function on all nodes simultaneously. PL/Proxy deliberately limits itself to the partitioned execution of functions, and has good reasons for this design. But PostgreSQL can execute dynamic SQL within functions, so let's see how far we can get.

Worker function (on all worker nodes):

CREATE OR REPLACE FUNCTION parallel_query(statement text)
  RETURNS SETOF record AS
$BODY$
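DECLARE
  r record;  -- one result row of the dynamic query
BEGIN
  -- Crude guard: only statements that start with SELECT are executed.
  IF lower($1) LIKE 'select%' THEN
    -- Run the statement text as dynamic SQL and stream its rows back.
    FOR r IN EXECUTE $1 LOOP
      RETURN NEXT r;
    END LOOP;
  ELSE
    RAISE EXCEPTION 'Only queries allowed';
  END IF;
END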
$BODY$
  LANGUAGE plpgsql VOLATILE;


Proxy function (on all head nodes):

CREATE OR REPLACE FUNCTION parallel_query(statement text)
  RETURNS SETOF record AS
$BODY$
 CLUSTER 'head'; RUN ON ALL;
$BODY$
  LANGUAGE plproxy VOLATILE;
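
The proxy function refers to a cluster named 'head', which PL/Proxy has to resolve to a list of partitions on the head node. A minimal sketch of that configuration, using the three configuration functions PL/Proxy looks up in the plproxy schema, might look like this. The database names worker0 and worker1, the host and the connection lifetime are assumptions for illustration, not taken from the setup described here.

```sql
-- Assumed setup: two worker databases on the local server (hypothetical names).
CREATE SCHEMA IF NOT EXISTS plproxy;

-- Returns one connection string per partition; the count must be a power of two.
CREATE OR REPLACE FUNCTION plproxy.get_cluster_partitions(cluster_name text)
  RETURNS SETOF text AS
$BODY$
BEGIN
  IF cluster_name = 'head' THEN
    RETURN NEXT 'dbname=worker0 host=127.0.0.1';
    RETURN NEXT 'dbname=worker1 host=127.0.0.1';
    RETURN;
  END IF;
  RAISE EXCEPTION 'Unknown cluster: %', cluster_name;
END
$BODY$
  LANGUAGE plpgsql;

-- Increase the returned number whenever the partition list changes.
CREATE OR REPLACE FUNCTION plproxy.get_cluster_version(cluster_name text)
  RETURNS int4 AS
$BODY$
BEGIN
  IF cluster_name = 'head' THEN
    RETURN 1;
  END IF;
  RAISE EXCEPTION 'Unknown cluster: %', cluster_name;
END
$BODY$
  LANGUAGE plpgsql;

-- Optional tuning parameters, returned as key/value pairs.
CREATE OR REPLACE FUNCTION plproxy.get_cluster_config(
  IN cluster_name text, OUT key text, OUT val text)
  RETURNS SETOF record AS
$BODY$
BEGIN
  key := 'connection_lifetime';
  val := (30 * 60)::text;  -- seconds; an arbitrary example value
  RETURN NEXT;
  RETURN;
END
$BODY$
  LANGUAGE plpgsql;
```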


Table (on all worker nodes):

CREATE TABLE users
(
  username text NOT NULL,
  CONSTRAINT users_pkey PRIMARY KEY (username)
)
WITH (
  OIDS=FALSE
);


With 10000 rows on two nodes, partitioned by username hash (~5000 on each node), the query

select * from parallel_query('select * from users') as (username text);

returns all 10000 rows. Since the nodes can be databases within the same server, there is no need for additional hardware, server installations etc. But if more performance is required in the future, adding more boxes is possible.
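
How the rows end up partitioned by username hash is not shown above. One common way to do it with PL/Proxy, sketched here as an assumption, is a pair of insert functions where the proxy side routes each call with RUN ON hashtext(...). The function name insert_user and the user_name_N numbering in the usage line are hypothetical.

```sql
-- Worker function (on all worker nodes): stores the row locally.
CREATE OR REPLACE FUNCTION insert_user(i_username text)
  RETURNS integer AS
$BODY$
  INSERT INTO users (username) VALUES ($1);
  SELECT 1;
$BODY$
  LANGUAGE sql;

-- Proxy function (on the head node): picks one partition by the hash of the username.
CREATE OR REPLACE FUNCTION insert_user(i_username text)
  RETURNS integer AS
$BODY$
  CLUSTER 'head';
  RUN ON hashtext(i_username);
$BODY$
  LANGUAGE plproxy;

-- Usage on the head node: load 10000 test users (assumed naming scheme).
select insert_user('user_name_' || i) from generate_series(0, 9999) as i;
```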

All it takes is logical partitioning and a bit of PL/pgSQL if you really need to run parallel queries.

There are some differences though. Take the following query:

select * from parallel_query('select max(username) from users') as (username text);

"user_name_9995"
"user_name_9999"

It now returns two maxima, one for each partition, because every worker aggregates only its own rows. To get the expected result, a second aggregation stage is needed on the head node:

select max(username) from parallel_query('select max(username) from users') as (username text);

"user_name_9999"

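The same applies to other aggregate functions, but not all of them can be combined by simply applying the same function again. max() and min() can; count() needs sum() in the second stage; and avg() has to be rebuilt from per-partition sum() and count(), since an average of averages is only correct when all partitions hold the same number of rows.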
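
As a rough sketch of such a second stage, using the parallel_query function defined above: the users table has only a text column, so the average below is taken over length(username) purely for illustration, and the column aliases c, s and a are made up.

```sql
-- count(): sum the per-partition counts.
select sum(c) from parallel_query('select count(*) from users') as (c bigint);

-- avg(), naive: averages the per-partition averages. This is only correct
-- if every partition holds exactly the same number of rows.
select avg(a) from parallel_query('select avg(length(username)) from users') as (a numeric);

-- avg(), two-stage: combine per-partition sums and counts instead.
select sum(s) / nullif(sum(c), 0)
from parallel_query('select sum(length(username)), count(*) from users') as (s bigint, c bigint);
```

The column definition list has to match the types the workers return: count(*) and sum() over an integer expression yield bigint, avg() over an integer expression yields numeric.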

Finally, the proxy function can be hidden behind a VIEW on the head node:

CREATE OR REPLACE VIEW "users" AS select * from parallel_query('select * from users') as (username text);
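
One caveat with such a view, sketched here with a hypothetical user name: the statement text passed to parallel_query is fixed, so conditions applied to the view are evaluated on the head node only after every partition has returned all of its rows. Putting the condition into the statement text lets each worker filter locally.

```sql
-- Filter applied on the head node, after all rows were fetched from every partition:
select * from users where username = 'user_name_42';

-- Filter applied on each worker, so only matching rows travel to the head node:
select * from parallel_query('select * from users where username = ''user_name_42''') as (username text);
```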