In the announcement of PipelineDB, Josh Berkus gave a hint that, and how, similar functionality might be achieved with stock PostgreSQL. Let's see if we can analyze a continuous Twitter stream too...
First, we need a fast table to act as the ringbuffer:
CREATE UNLOGGED TABLE tstest.tstream
(
  rec timestamp with time zone NOT NULL DEFAULT clock_timestamp(),
  tevent jsonb NOT NULL,
  CONSTRAINT pk_tstream PRIMARY KEY (rec)
)
WITH (
  OIDS=FALSE
);
To inject new tweets into the buffer and trim the outdated ones, I used this little injector, written in Java using the hosebird client and the example code provided there.
Once it runs in its infinite loop, the table is populated with new tweets, while tweets older than one hour are evicted every five minutes, courtesy of a fixed-rate timer task. So, in the worst case, the buffer table holds at most one hour and five minutes' worth of tweets.
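The injector's SQL isn't shown in this post, so here is only a rough sketch of what its two statements could look like. It assumes the injector relies on the column default for rec and trims with a plain DELETE; the sample JSON document is made up for illustration.

```sql
-- Hypothetical statements; the injector's actual SQL is not shown here.

-- Per incoming tweet: store the raw JSON and let the column default
-- (clock_timestamp()) fill in rec.
INSERT INTO tstest.tstream (tevent)
VALUES ('{"entities": {"hashtags": [{"text": "postgresql"}]}}'::jsonb);

-- From the timer task, every five minutes: evict everything
-- older than one hour.
DELETE FROM tstest.tstream
WHERE rec < clock_timestamp() - interval '1 hour';
```

Because the DELETE only runs every five minutes, rows up to one hour and five minutes old can still be in the table just before the next run, which is where the worst-case bound comes from.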
The analysis view is taken verbatim from the PipelineDB example:
CREATE OR REPLACE VIEW tstest.tagstream AS
SELECT jsonb_array_elements(tevent #> ARRAY['entities','hashtags']) ->> 'text' AS tag
FROM tstest.tstream
WHERE rec > (clock_timestamp() - interval '1 hour');
Plus the missing ')' before ->> 'text'. :-)
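To see what the view's expression actually does, it can be tried on a single hand-written document instead of the table. This is just a sketch; the JSON literal below is a made-up, heavily shortened stand-in for a real tweet.

```sql
-- #> follows the path entities -> hashtags and yields the JSON array,
-- jsonb_array_elements() expands that array into one row per element,
-- and ->> 'text' pulls the hashtag text out of each element.
SELECT jsonb_array_elements(
         '{"entities": {"hashtags": [{"text": "postgresql"}, {"text": "jsonb"}]}}'::jsonb
         #> ARRAY['entities','hashtags']
       ) ->> 'text' AS tag;
```

This returns two rows, postgresql and jsonb, so a tweet with several hashtags contributes one row per hashtag to the view.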
The query for the most popular tags is taken verbatim as well:
SELECT tag, count(*) AS tag_count
FROM tstest.tagstream
GROUP BY tag
ORDER BY tag_count DESC
LIMIT 10;
Does it work? Yes.
Since I don't have access to my development machine right now, stay tuned for Part 2 with some results.