On the worker nodes:
CREATE TABLE kjb(
line text
);
CREATE OR REPLACE FUNCTION map_kjb()
RETURNS SETOF record AS
$BODY$
DECLARE r record;
BEGIN
FOR r IN (SELECT TRIM(both from word),count(1) FROM (SELECT regexp_split_to_table(line, E'\\W+') as word FROM kjb) w GROUP BY word) LOOP
RETURN NEXT r;
END LOOP;
RETURN;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 1000
ROWS 1000;
On the head node:
CREATE TYPE t_mr_wc AS (word text, count bigint);CREATE OR REPLACE FUNCTION map_kjb()
RETURNS SETOF record AS
$BODY$
CLUSTER 'head';
RUN ON ALL;
$BODY$
LANGUAGE plproxy VOLATILE
COST 1000
ROWS 1000;
CREATE OR REPLACE FUNCTION reduce_kjb()
RETURNS SETOF t_mr_wc AS
$BODY$ DECLARE result public.t_mr_wc; BEGIN
FOR result IN select word,sum(count) from map_kjb() AS (word text,count bigint) where word != '' group by word LOOP RETURN NEXT result; END LOOP; END; $BODY$
LANGUAGE plpgsql VOLATILE
COST 100
ROWS 100;
1. Load data into tables.
2. Get word count:
SELECT * FROM reduce_kjb();
3. Get top 21 word counts:
SELECT * FROM reduce_kjb() ORDER BY 2 DESC LIMIT 21;
No comments:
Post a Comment