EECS-4415M Project #3 Scan solution



Pipeline

Pipelining is the other side of the coin of streaming. When an operation is pipelined, it only requires a single scan of the data. Thus, a pipelined operation can be potentially used to handle data-stream processing.

This assignment is to write a user-defined aggregate to handle a running-average aggregation in a one-pass manner (“streaming”). We shall use the database system PostgreSQL (often called just Postgres, for short).

The Stream

In this case, the “stream” will be a table. Your query will go through the table effectively in a single scan, computing the running aggregate of average for each group. A database engine, of course, can do more complicated things: sorting the table various times, making multiple passes, etc. And we could set things up to stream data into a query for the database engine to handle, rather than have the query operate over a table. We aim to write our query and user-defined aggregate in such a way that the query is executed in a purely pipelined way, needing to scan the table only a single time.

Let us define the table for the assignment as

create table Stream (
    id      int,
    grp     int,
    measure int,
    constraint streamPK
        primary key (id),
    constraint idNotNeg
        check (id >= 0),
    constraint grpNotNeg
        check (grp >= 0)
);

The table Stream will be populated with tuples with ids ascending, starting with 0. These are like timestamps for the arrival of the tuples. Adjacent tuples may be in the same group (grp). E.g.,

insert into Stream (id, grp, measure)
values
    ( 0, 0,  2),
    ( 1, 0,  3),
    ( 2, 1,  5),
    ( 3, 1,  7),
    ( 4, 1, 11),
    ( 5, 0, 13),
    ( 6, 0, 17),
    ( 7, 0, 19),
    ( 8, 0, 23),
    ( 9, 2, 29),
    (10, 2, 31),
    (11, 5, 37),
    (12, 3, 41),
    (13, 3, 43);

Of course, Stream could be very large. A “group” of tuples is a maximal run of tuples that are adjacent by id and share the same grp value. Thus, there are six groups above. (Group value 0 is used twice, but for two different groups; group value 1 breaks the adjacency of the 0s.) The grp values are just labels, so the order of the grp labels is immaterial (unlike the ids). And, of course, a single group’s sequence can be arbitrarily long.

We want to report for each tuple in the stream the running average of measure in its group. For the first tuple of a group, the average is just that tuple’s measure value itself. For the second tuple of the group, it is the average of the first and second tuples’ measure values. And so forth.

Run on Stream above, our SQL “stream-processing program” should produce the answer table

 id | grp | measure |     average      
----+-----+---------+------------------
  0 |   0 |       2 |                2
  1 |   0 |       3 |              2.5
  2 |   1 |       5 |                5
  3 |   1 |       7 |                6
  4 |   1 |      11 | 7.66666666666667
  5 |   0 |      13 |               13
  6 |   0 |      17 |               15
  7 |   0 |      19 | 16.3333333333333
  8 |   0 |      23 |               18
  9 |   2 |      29 |               29
 10 |   2 |      31 |               30
 11 |   5 |      37 |               37
 12 |   3 |      41 |               41
 13 |   3 |      43 |               42
(14 rows)

For this assignment, we shall use PostgreSQL, a truly excellent leading object-relational database system that is free and open-source.

Postgres is ubiquitous, and you will find it in use everywhere. Feel free to install it on your own box, if you want. It is reasonably straightforward. We use it for various purposes in the EECS department, too.

For convenience in this assignment, we shall use Docker again, as we did for Project #2, and a Postgres container. Or you can use the Postgres server on Prism (the EECS computing services).

PostgreSQL by Container

Installation: Docker & the Postgres Image

Install Docker on your box (your computer), the community edition, if you have not already. (Refer to the FAQ in the previous assignment.) The Postgres image we shall use is simply called postgres. This is the officially endorsed PostgreSQL image from the Docker library. Install the image for Docker by

mybox% docker pull postgres

You can also find the postgres image on GitHub (docker-library/postgres: Docker Official Image packaging for Postgres). Documentation for working with the postgres image is at the Docker library site (postgres | Docker Documentation).

Postgres Container

Create a Postgres container in which you will run your job.

mybox% docker run --name Postgres -e POSTGRES_PASSWORD=Bleah -d postgres

The --name flag above gives a name to your container, to make it easier to access. You can give it whatever name you want; “Postgres” is a reasonable name. The -e flag sets an environment variable within the container. Set the postgres password to what you want; you will need it if you start a psql shell within the container. The -d flag means the container will run in the background as a “daemon” / service.

To stop your container running,

mybox% docker stop Postgres

And to start it up again,

mybox% docker start Postgres

The Postgres system is a client-server model. Running in the container is a Postgres database system. The utility psql is a command-line client — a shell — for Postgres. The container has psql installed.

Unlike the Hadoop container we used in Project #2, this Postgres container is not meant as a skeleton Linux environment that we enter and work inside through a bash shell. It is meant to provide an operational Postgres server that applications (clients) from the “outside” can access. In fact, if you installed psql natively on your box, or, say, a popular GUI client for Postgres such as pgAdmin, and you configured the container correctly, then you could run applications natively on your host machine that would interact with the Postgres server “in the bottle” (that is, within the running container). (Note that such configuration is more involved than I am willing to document for this assignment.)

We can also use the psql shell available within the container. That is what we will do here. To invoke an interactive psql shell,

mybox% docker run -it --rm --link Postgres:postgres postgres psql -h postgres -U postgres

(Note it is “\q” to quit out of the shell.)

We can also put what we want into a file and “run” that file using psql. This will be our general approach. It takes two steps. First, we have to put the file in the container’s file system where the container’s psql can find it. Second, we invoke the container’s psql to run the file.

Say you have your SQL in myfile.sql on your (host) machine. We use docker to copy it into the container.

mybox% docker cp ./myfile.sql Postgres:/docker-entrypoint-initdb.d/myfile.sql

Then to execute it via psql:

mybox% docker exec -u postgres Postgres psql postgres postgres -f /docker-entrypoint-initdb.d/myfile.sql

(Lots of occurrences of “postgres” in that command! Can you parse out what each is doing?)

PostgreSQL on Prism

We run a Postgres server named db in Prism. You are welcome to use it for this assignment.

Each student in the class has had an account created on db. A student’s db account name is the same as their Prism account name. The password has been set as the student’s student ID.


Setting up your own PostgreSQL Server

You are welcome to install Postgres on your own machine — or may have already — and use that for the assignment.

Just ensure that your solution works either in the containerized version as above or on db, the PostgreSQL server on Prism, before submitting it.


Strategy

Make an SQL script that

  1. creates the stream, which is

    a. a table Stream with columns id, grp, and measure (like in the example above in §Pipeline ¶The Stream),

    b. populated with a few tuples for testing purposes,

  2. defines a user-defined aggregate function to handle the running average (call it, say, runningAvg), and
  3. issues an SQL query that exercises your aggregate runningAvg, and that does it in a way that is guaranteed to be one pass.

An example might help: see §Example: Running Sum below.


We want to ensure two things. First, our approach must be pipelined; that is, Stream can be processed in a single pass. Second, we have to compute the running average in a safe way.

Thus, there are two things to avoid:

  1. anything in the query that would force multiple passes (e.g., group by or partition by);
  2. potential overflow computing the running average.

There is a 20% penalty each for violating these!

For the first, it is easy to be safe. Just stick closely to the “template” query in the example.

For the second, we want to compute the running average in such a way that we would not run into overflow problems or bad error accumulation for long running averages.

Computing mean incrementally

The “standard” way to accumulate the running mean (average) step by step is to keep the running sum, the sum of the numbers ($x_1, x_2, \ldots$) that we have seen so far, and the running count, how many numbers we have seen so far, at a given step $i$. For the running sum:

$$s_i = s_{i-1} + x_i$$

We set $s_0 = 0$ to start things off. Our count is just $i$. For our state at step $i$, we need to keep $s_i$ and $i$. Then, the mean at step $i$ is $s_i / i$.

However, imagine that our sequence is very long. The sum could grow to be very large, so large that we would overflow our int register.
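To make the overflow risk concrete, here is a small sketch, assuming a stock PostgreSQL session. The literal values are chosen only for illustration; they are not part of the assignment.

```sql
-- int in PostgreSQL is a 32-bit signed integer; its maximum is 2147483647.
select cast(2147483647 as int) as largest_int;

-- Adding 1 in int arithmetic does not wrap around. PostgreSQL raises
--   ERROR:  integer out of range
select cast(2147483647 as int) + cast(1 as int);
```

A running sum held in an int state by a hand-written state function would hit this error as soon as the total passed that bound.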

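Instead, we can calculate the mean, $m_i$, directly (for $i > 0$) at each step. (Let $m_0 = 0$.)

$$m_i = \frac{(i-1)\,m_{i-1} + x_i}{i}$$

Note that, in this case, for our state at step $i$, we need to keep $m_i$ and $i$. This looks better; it is a clear formulation for iteratively computing the average. But it really does nothing to address our overflow issue: the product $(i-1)\,m_{i-1}$ is just the running sum $s_{i-1}$ again. Let us reformulate then.

$$m_i = m_{i-1} + \frac{x_i - m_{i-1}}{i}$$

Now, it is not a problem: the state $m_i$ never leaves the range of the values seen so far.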
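As a check on the incremental form, assume the update $m_i = m_{i-1} + (x_i - m_{i-1})/i$ with $m_0 = 0$ (the usual incremental-mean identity). For the group with measures 5, 7, 11 from the example stream, it works out as:

```latex
\begin{align*}
m_1 &= 0 + \frac{5 - 0}{1} = 5 \\
m_2 &= 5 + \frac{7 - 5}{2} = 6 \\
m_3 &= 6 + \frac{11 - 6}{3} = 7.\overline{6}
\end{align*}
```

These agree with the rows for ids 2 to 4 in the answer table. Each $m_i$ lies between the smallest and largest measure seen so far in the group, so the state stays bounded however long the group runs.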

User-defined aggregate functions

A user-defined aggregate function needs two functions defined.

  1. state function

    This is called iteratively, once for each tuple that is part of the aggregation. It takes two arguments:

      a. state
      b. value

    The value is from the tuple, and is what we are aggregating with respect to. The state is an internal value, or record of values (accomplished via a composite type), used for the aggregate to hold information it needs.

    The return value of the state function is a new state.

  2. final function

    This extracts from the state the aggregate value representing the aggregation for return.

    It takes one argument: state.

    Note that the return value (the aggregate) must be the same type as what the aggregate function is aggregating over.

This last bit about the return type needing to be the same as the type that is being aggregated over is not always so convenient. Note that, for this assignment, the running average is over integer, but it returns a float for the aggregation value. To accommodate this, feel free to cast the integer measure to float, and write your aggregate function in terms of float.
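The two pieces can be seen together in a minimal sketch of an unrelated aggregate, written in terms of float as suggested above. The names rss_state, rss_final, and rootSumSq are hypothetical, invented for this illustration; the aggregate computes the square root of the sum of squares and is not the running average.

```sql
-- State function: fold one value into the state (the sum of squares so far).
-- The state starts out null, so coalesce treats a null state as 0.
create function rss_state(float8, float8)
returns float8
language sql
as $f$
    select coalesce($1, 0) + coalesce($2 * $2, 0);
$f$;

-- Final function: turn the state into the value the aggregate returns.
create function rss_final(float8)
returns float8
language sql
as $f$
    select sqrt($1);
$f$;

-- The aggregate ties the two functions together.
create aggregate rootSumSq(float8) (
    sfunc     = rss_state,
    stype     = float8,
    finalfunc = rss_final
);

-- Usage: sqrt(3*3 + 4*4) = 5
select rootSumSq(cast(x as float8)) as rss
from (values (3), (4)) as t(x);
```

The state here is a single float8. An aggregate that needs to remember more than one value between calls would use a composite type for its state.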


The SQL standards include what are often called the “OLAP” extensions. These include window aggregation. What window aggregation allows one to do is to compute an aggregate per tuple with respect to a neighbourhood of that tuple. These are incredibly powerful extensions to SQL. For those interested to see more, look over Graeme Birchall’s SQL Cookbook, which is superb. His manual is for IBM DB2, but the SQL is standard, and so applies to Postgres too.

This is convenient in that one does not need to do an aggregation with respect to a group by, and then to join the results back with the initial “table”. In fact, a join could be fatal for our application; it could break the pipeline.

We can use window aggregation to “look” at the tuple before (or after) the “current” tuple to check its value (with respect to some ordering of the tuples, order by id here). For instance, we can use this to see whether the current tuple is in the same group (grp) as the previous tuple. The example for running sum below uses this technique.
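As a small sketch of this “look left” idea, the query below uses PostgreSQL’s built-in lag window function over an inline five-row table. The running-sum example uses max over a one-row frame for the same purpose; lag is just one alternative, and the inline values are made up for illustration.

```sql
-- For each tuple, fetch the grp of the previous tuple (by id) and flag
-- whether the current tuple starts a new group.
select  id,
        grp,
        lag(grp) over (order by id) as lft,
        grp is distinct from lag(grp) over (order by id) as start
from (values (0, 0), (1, 0), (2, 1), (3, 1), (4, 0)) as s(id, grp)
order by id;
-- start is true for ids 0, 2, and 4, and false for ids 1 and 3
```

For the first tuple there is no previous row, so lag returns null. The is distinct from comparison treats that null as “different”, so the first tuple is flagged as a group start.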

Example: Running Sum

Let us look at an SQL script that follows the three steps enumerated in §Strategy for computing a running sum. The script file: runningSum.sql. (You are welcome to use this as a starting template for your assignment.)

Create the stream

We mimic a stream with a table named Stream.

create table Stream (
    id      int,
    grp     int,
    measure int,
    constraint streamPK
        primary key (id),
    constraint idNotNeg
        check (id >= 0),
    constraint grpNotNeg
        check (grp >= 0)
);

And populate it with a simple example stream.

insert into Stream (id, grp, measure)
values
    ( 0, 0,  2),
    ( 1, 0,  3),
    ( 2, 1,  5),
    ( 3, 1,  7),
    ( 4, 1, 11),
    ( 5, 0, 13),
    ( 6, 0, 17),
    ( 7, 0, 19),
    ( 8, 0, 23),
    ( 9, 2, 29),
    (10, 2, 31),
    (11, 5, 37),
    (12, 3, 41),
    (13, 3, 43);

(Yes, this is the very same as above in §Pipeline ¶The Stream.)

You can, of course, test your code with a much longer stream. But you need not get carried away. A small sample run like this suffices.
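If you do want a longer stream to test against, one way to generate it is with PostgreSQL’s generate_series. This is only a sketch: the table name StreamBig, the run length of 1000, and the arithmetic used to make up grp and measure values are all arbitrary choices for illustration.

```sql
-- Hypothetical larger test stream with the same columns as Stream.
create temporary table StreamBig (
    id      int primary key,
    grp     int,
    measure int
);

-- 100000 tuples. The grp label changes every 1000 ids and cycles through
-- 0..6, so each group is a run of 1000 adjacent tuples.
insert into StreamBig (id, grp, measure)
select  i,
        (i / 1000) % 7,
        (i * 37) % 101
from generate_series(0, 99999) as g(i);

-- Sanity check: number of tuples and range of ids.
select count(*), min(id), max(id) from StreamBig;
```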

The user-defined aggregate function

The running-sum aggregate function uses a composite type as the type being aggregated over.

create type intRec as (
    number  int,
    restart boolean
);

The field number is the measure we are summing. The field restart indicates whether this is the start of a tuple group.

The state function then is

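create function runningSum_state(int, intRec)
returns int
language plpgsql
as $f$
    declare i alias for $1;  -- state: the running sum so far
    declare a alias for $2;  -- value: the current (number, restart) record
    declare j int;           -- the new state
    begin
        if a.restart or i is null then
            -- start of a group (or very first tuple): restart the sum
            j := a.number;
        elsif a.number is null then
            -- null measure: carry the sum forward unchanged
            j := i;
        else
            j := a.number + i;
        end if;
        return j;
    end
$f$;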

The aggregate function’s internal state in this case is simply an integer (int). It holds the sum up through the present tuple.

The final function converts the state into an intRec. Recall that the output type of an aggregator has to be the same as the type being aggregated. For this, the value of the restart flag is immaterial.

create function runningSum_final(int)
returns intRec
language sql
as $f$
    select cast(($1, false) as intRec);
$f$;

Finally, the aggregate function is declared.

create aggregate runningSum(intRec) (
    sfunc     = runningSum_state,
    stype     = int,
    finalfunc = runningSum_final
);

The stype parameter declares the type of the internal state.

The SQL query that pipelines Stream to compute the running sum is as follows.

with
    -- look at the neighbour tuple to the left to fetch its grp value
    CellLeft (id, grp, measure, lft) as (
        select  id,
                grp,
                measure,
                coalesce(
                    max(grp) over (
                        order by id
                        rows between
                        1 preceding
                            and
                        1 preceding ),
                    -1 )
        from Stream
    ),
    -- determine whether current tuple is start of a group
    CellStart(id, grp, measure, start) as (
        select  id,
                grp,
                measure,
                cast(
                    case
                    when grp = lft then 0
                    else                1
                    end
                as boolean)
        from CellLeft
    ),
    -- bundle the measure and start-flag into an intRC
    CellFlag(id, grp, intRC) as (
        select  id,
                grp,
                cast((measure, start) as intRec)
        from CellStart
    ),
    -- call our runningSum aggregator
    CellRun(id, grp, measure, runningRC) as (
        select  id,
                grp,
                (intRC).number,
                runningSum(intRC)
                    over (order by id)
        from CellFlag
    ),
    -- extract the running sum from the composite
    CellAggr(id, grp, measure, running) as (
        select  id,
                grp,
                measure,
                (runningRC).number
        from CellRun
    )
-- report
select id, grp, measure, running
from CellAggr
order by id;

Create a file named runningAvg.sql, similar to the example of runningSum.sql, that implements the three steps enumerated in §Strategy, but for computing a running average.


Use the “submit” command on a Prism machine to turn in your program.

% submit 4415 scan runningAvg.sql

Due: before midnight Monday 1 March.


My host OS is Windows. Will I have the headache of the “curse of DOS” again with the \r\n EOLs?!

Fortunately, no. Postgres and psql seem to handle DOS format fine.

If measure were to be null for a record, how should it be handled?

For each tuple in the stream, you output a tuple with the group’s running average. If there have only been nulls for the measures, you would output a null. But once there is a non-null, you are effectively reporting the average of the non-nulls seen in that group so far.
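The intended null behaviour matches what PostgreSQL’s built-in avg does when used as a window aggregate. The sketch below uses it over a single made-up group, purely to show the expected semantics; it is not the user-defined aggregate the assignment asks for.

```sql
-- One group with measures null, 4, null, 6 (inline, made-up data).
select  id,
        measure,
        avg(measure) over (order by id) as average
from (values (0, cast(null as int)),
             (1, 4),
             (2, null),
             (3, 6)) as s(id, measure)
order by id;
-- average by row: null, 4, 4, 5 (avg over int is reported as numeric)
```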
