Intro to Lua and Openresty, Part 6: Data Processing Sink

Posted on March 6, 2017

In Part 5 of this series, we connected to redis and wrote some JSON data to it. This next post will demonstrate how to write a simple (but fast and reliable) data processing sink, using a redis list as the queue that stores messages until they are processed.

For this example, we will use the lua-hiredis module (installed with luarocks and loaded with require "hiredis").

If you are following along in the code, we are here.

Requirements for the basic data processing sink

This exercise is getting further into the unknown, and it might take a few steps before we have a working solution.

We got stuck for a while iterating over various redis client libraries to find one that would work, and settled on lua-hiredis.

Testing hiredis

Let’s test the connection. We’ll work in examples/07-redis-data-sink/tests/ for this. Here is our code (ping.lua):

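-- ping.lua: connect to redis with lua-hiredis and send a PING
local hiredis = require "hiredis"
local client, err, err_code = hiredis.connect("127.0.0.1", 6379)
if not client then
  print("failed to connect to redis..")
  print("error: " .. tostring(err))
  print("code:  " .. tostring(err_code))
  return
end
-- PING should come back as the PONG status reply
print("PONG:")
print(client:command("PING") == hiredis.status.PONG)
client:close()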

Here’s our Makefile:

build:
        docker build --tag=ping-redis:7 --rm=true .

run:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua

fail:
        docker run -it --rm --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua

dev:
        docker run -it --rm -v `pwd`:/src ping-redis:7

dev-redis:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run --rm -it --name ping-redis  --net host -v `pwd`:/src ping-redis:7

clean:
        docker stop redis       || true
        docker stop ping-redis  || true
        docker rm   redis       || true
        docker rm   ping-redis  || true

Here’s our Dockerfile:

FROM openresty/openresty:alpine-fat

RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh

Build that image:

ᐅ make build
docker build --tag=ping-redis:7 --rm=true .
Sending build context to Docker daemon 4.608 kB
Step 1 : FROM openresty/openresty:alpine-fat
 ---> 366babf2b04d
Step 2 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
 ---> Using cache
 ---> 9f523055586e
Step 3 : ADD *.lua /src/
 ---> 2dd66d628a62
Removing intermediate container d5a31e9154d6
Step 4 : WORKDIR /src/
 ---> Running in 6a2c97caef1c
 ---> 4070a74cbaaa
Removing intermediate container 6a2c97caef1c
Step 5 : ENTRYPOINT /bin/sh
 ---> Running in 7b9f72e426e2
 ---> 101646a10ba3
Removing intermediate container 7b9f72e426e2
Successfully built 101646a10ba3

First test, with no redis available:

ᐅ make fail
docker run -it --rm --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
failed to connect to redis..
error: Connection refused
code:  1

Test with redis available:

ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
39980a15a0465e637019bb29c3a531398a3362cf35b9623a29c712e3d8f49208
docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
PONG:
true

YAY!

Let’s test some push / pop / RPOPLPUSH..

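For this test, we’ll write another stand-alone script that simulates the steps the worker goes through when processing a single item on the queue: LPUSH a few items onto the enqueued list, move one over to the processing list with RPOPLPUSH, and drop it from processing with LREM once we are done with it.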

The push/pop workflow looks like:

 +------+
 | JSON |
 +--+---+
    |
    |
    V
+--------+
|producer|
+---+----+
    |
    |
    |  LPUSH    +---+---+---+---+    RPOPLPUSH    +---+---+---+---+
    +---------> | d | c | b | a +->-->---+--->--->+ x | . | . |   |
                +---+---+---+---+        |        +---+---+---+---+
                                         |          |
                                         V          V LREM when done
                                     +---+----+
                                     |Consumer|
                                     +--------+

Relevant docs for these redis operations: the LPUSH, LRANGE, RPOPLPUSH and LREM pages in the redis command reference.

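Note that it’s also worth understanding the difference between RPOPLPUSH and its blocking variant, BRPOPLPUSH: when the source list is empty, RPOPLPUSH returns nil right away, while BRPOPLPUSH waits (up to a timeout you pass in) for an item to arrive.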
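To make the list semantics concrete, here is a small stand-alone sketch that models the three commands with plain Lua arrays instead of redis. It is only an illustration: lpush, rpoplpush and lrem are hypothetical helpers written for this sketch, not part of lua-hiredis, and index 1 plays the role of the head (left end) of a list, matching the order LRANGE prints.

```lua
-- Hypothetical in-memory model of the redis list commands used above.
-- Each list is a Lua array; index 1 is the head (left end), as in LRANGE output.

-- LPUSH: insert value at the head of the list, return the new length
local function lpush(list, value)
  table.insert(list, 1, value)
  return #list
end

-- RPOPLPUSH: pop from the tail (right end) of src and push onto the head of dst.
-- Returns nil when src is empty, mirroring redis's nil reply.
local function rpoplpush(src, dst)
  local value = table.remove(src)
  if value ~= nil then
    table.insert(dst, 1, value)
  end
  return value
end

-- LREM with a positive count: remove up to count matching values, head to tail,
-- and return how many were removed
local function lrem(list, count, value)
  local removed, i = 0, 1
  while i <= #list and removed < count do
    if list[i] == value then
      table.remove(list, i)
      removed = removed + 1
    else
      i = i + 1
    end
  end
  return removed
end

local enqueued, processing = {}, {}
for _, v in ipairs({"a", "b", "c", "d"}) do
  lpush(enqueued, v)
end
-- enqueued is now d, c, b, a: the same order LRANGE printed in the test run

local item = rpoplpush(enqueued, processing)   -- "a", the oldest item
print(item, table.concat(enqueued, ","), table.concat(processing, ","))

local removed = lrem(processing, 1, item)      -- done with "a"
print(removed, #processing)
```

The key property is that RPOPLPUSH moves the item in one step, so at every moment it lives in exactly one of the two lists.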

The code for our test is simple but a tad verbose:

local hiredis = require "hiredis"
-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. err)
    print("code:  " .. err_code)
    os.exit(1)
  else
    return rc
  end
end
-- send a PING to redis and print true/false for PONG as result
ping_pong = function(client)
  print("PONG:")
  print(client:command("PING") == hiredis.status.PONG)
end
-- push a few test keys simulating a writing producer
push_keys = function(client)
  client:command("LPUSH", "enqueued", "a")
  client:command("LPUSH", "enqueued", "b")
  client:command("LPUSH", "enqueued", "c")
  client:command("LPUSH", "enqueued", "d")
end
-- return the lua table that is the redis list, in full
get_list = function(client, list)
  return client:command("LRANGE", list, 0, -1)
end
-- for the lua table t, print the key/value pairs (one level)
print_table = function(t)
  for k,v in pairs(t) do
    print(k, v)
  end
end
-- wrap redis RPOPLPUSH (uses the global client rc set in MAIN)
rpoplpush = function(q, p)
  return rc:command("RPOPLPUSH", q, p)
end
-- wrap redis LREM: remove one occurrence of key from the list tbl
drop = function(tbl, key)
  return rc:command("LREM", tbl, 1, key)
end
--
-- MAIN
rc = connect("127.0.0.1")
ping_pong(rc)
-- push some keys to the q
push_keys(rc)
-- print out those keys
print("LRANGE enqueued:")
q = get_list(rc, "enqueued")
print_table(q)
-- RPOPLPUSH one key over to processing
pop = rpoplpush("enqueued", "processing")
print("pop the queue, now processing: " .. pop)
-- retrieve and print the two lists as they are now..
q = get_list(rc, "enqueued")
print("queue is now:")
print_table(q)
--
p = get_list(rc, "processing")
print("processing is now:")
print_table(p)
-- ok, we're done with the key, let's drop it
print("done with:")
print(pop)
print("drop from processing..")
print(drop("processing", pop))
-- retrieve and print the two lists as they are now..
q = get_list(rc, "enqueued")
print("queue:")
print_table(q)
--
p = get_list(rc, "processing")
print("processing:")
print_table(p)
-- goodbye redis
rc:close()

This is our first time defining functions; they take the form:

foo = function(args)
  stmt
end
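For reference, here is a small stand-alone sketch of the ways Lua lets you define a function. The foo = function(args) form assigns an anonymous function to a global variable; function foo(args) is syntactic sugar for exactly that, and local function foo(args) keeps the name local to the enclosing chunk or block. The names greet, greet2 and greet3 are made up for this example.

```lua
-- 1. Assign an anonymous function to a global (the form used in our scripts)
greet = function(name)
  return "hi, " .. name
end

-- 2. Sugar for the same thing: greet2 = function(name) ... end
function greet2(name)
  return "hi, " .. name
end

-- 3. A local function: visible only in this chunk, and it does not touch
--    the global table _G (local variables are also faster to access)
local function greet3(name)
  return "hi, " .. name
end

print(greet("redis"), greet2("redis"), greet3("redis"))
print(_G.greet ~= nil, _G.greet3 == nil)
```

Our test scripts stick with global functions for simplicity, but in larger programs the local form avoids accidental name clashes between files.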

Here is our Makefile:

build:
        docker build --tag=push-pop:7 --rm=true .

run:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit push-pop:7 queue-test.lua

dev:
        docker run -it --rm -v `pwd`:/src push-pop:7

dev-redis:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run --rm -it --name push-pop  --net host -v `pwd`:/src push-pop:7

clean:
        docker stop redis     || true
        docker stop push-pop  || true
        docker rm   redis     || true
        docker rm   push-pop  || true

Here is our Dockerfile:

FROM openresty/openresty:alpine-fat

RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh

Let’s build the image:

ᐅ make build
docker build --tag=push-pop:7 --rm=true .
Sending build context to Docker daemon 18.94 kB
Step 1 : FROM openresty/openresty:alpine-fat
 ---> 366babf2b04d
Step 2 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
 ---> Using cache
 ---> 9f523055586e
Step 3 : ADD *.lua /src/
 ---> ffd368d4359d
Removing intermediate container cd9f8b513598
Step 4 : WORKDIR /src/
 ---> Running in bc6b44a78bcc
 ---> df60616db41a
Removing intermediate container bc6b44a78bcc
Step 5 : ENTRYPOINT /bin/sh
 ---> Running in 5eca9952a6c1
 ---> f57567ee339a
Removing intermediate container 5eca9952a6c1
Successfully built f57567ee339a

Run the quick tests:

ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
f898eff2d7ff61368757992277e58e427d70504eb2f4f128e324bcc8d4255c43
docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit push-pop:7 queue-test.lua
PONG:
true
LRANGE enqueued:
1       d
2       c
3       b
4       a
pop the queue, now processing: a
queue is now:
1       d
2       c
3       b
processing is now:
1       a
done with:
a
drop from processing..
1
queue:
1       d
2       c
3       b
processing:

OK, let’s get more serious..

We’ll need three pieces to this puzzle:

The worker’s logic would look like:

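while true do
  item = RPOPLPUSH(q, p)   -- move the oldest item from the queue to "processing"
  process(item)
  donedrop(item)           -- LREM the item from "processing" once it is handled
  sleep(delay)
end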
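To check the order this loop handles items in, here is a hypothetical in-memory version of it in plain Lua. The two arrays stand in for the redis lists, and get_work, process and donedrop mimic RPOPLPUSH, the work itself, and LREM. Unlike the real worker, this sketch stops when the queue is empty so it can run to completion.

```lua
-- Hypothetical in-memory model of the worker loop (no redis involved).
-- Lists are Lua arrays with index 1 as the head (left end), like LRANGE.
local enqueued   = {"d", "c", "b", "a"}  -- state after LPUSH a, b, c, d
local processing = {}
local handled    = {}

-- RPOPLPUSH: take the oldest item (tail of enqueued) and park it in processing
local function get_work()
  local item = table.remove(enqueued)
  if item ~= nil then
    table.insert(processing, 1, item)
  end
  return item
end

-- "do" the work: here we just record the item
local function process(item)
  handled[#handled + 1] = item
end

-- LREM processing 1 item: forget the item once it has been handled
local function donedrop(item)
  for i, v in ipairs(processing) do
    if v == item then
      table.remove(processing, i)
      return 1
    end
  end
  return 0
end

-- Same shape as the worker loop, but stop once the queue is drained
while true do
  local item = get_work()
  if item == nil then break end
  process(item)
  donedrop(item)
end

-- handled in FIFO order: a,b,c,d; both lists end up empty
print(table.concat(handled, ","), #enqueued, #processing)
```

Because the producer pushes on the left and the worker pops from the right, messages come out in the order they were produced.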

To run tests on this stack, we will also want a fourth component: producer.lua, which fills redis with keys for the worker to process.

Having run the simpler tests, we can now move on to the primary goals for this exercise:

Let’s start with the Dockerfile:

FROM openresty/openresty:alpine-fat

ENV REDIS_HOST 127.0.0.1
RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
RUN /usr/local/openresty/luajit/bin/luarocks install lua-cjson
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh

The Makefile:

build:
        docker build --tag=sink:7 --rm=true .

run:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run -d --name sink  --net host --entrypoint /usr/local/openresty/luajit/bin/luajit sink:7 worker.lua

dev:
        docker run -it --rm --entrypoint /bin/sh -v `pwd`:/src sink:7

dev-redis:
        docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
        docker run --rm -it --name sink --net host --entrypoint /bin/sh -v `pwd`:/src sink:7

clean:
        docker stop redis || true
        docker stop sink  || true
        docker rm   redis || true
        docker rm   sink  || true

logs:
        docker logs -f sink

cat-posts:
        docker exec -it redis redis-cli -c LRANGE enqueued 0 -1

load-redis:
        docker exec sink /usr/local/openresty/luajit/bin/luajit /src/producer.lua

app-shell:
        docker exec -it sink  /bin/sh

redis-shell:
        docker exec -it redis redis-cli

Here is our producer.lua:

local cjson   = require "cjson"
local hiredis = require "hiredis"
-- name of our queue (a list) in redis
local q        = "enqueued"
local date_fmt = "%m-%d-%Y--%H-%M-%S"
-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. tostring(err))
    print("code:  " .. tostring(err_code))
    os.exit(1)
  else
    return rc
  end
end
-- push a key onto the head (left end) of the queue
enqueue = function(key)
  rc:command("LPUSH", q, key)
end
--
-- MAIN
rc = connect(os.getenv("REDIS_HOST") or "127.0.0.1")
-- the numeric for loop increments l itself; no manual l = l + 1 needed
for l = 1, 100000 do
  print("enqueue: " .. l)
  enqueue(cjson.encode({timestamp = os.date(date_fmt), msg = "hi! this is " .. l}))
end
rc:close()

..and the worker.lua:

local cjson   = require "cjson"    -- not used yet; handy once process() decodes the JSON
local hiredis = require "hiredis"  -- luarocks install lua-hiredis
-- names for our lists in redis
local q     = "enqueued"
local p     = "processing"
local delay = 0.001
-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. tostring(err))
    print("code:  " .. tostring(err_code))
    os.exit(1)
  else
    return rc
  end
end
-- atomically move the oldest item from q to p, and return it
get_work = function()
  return rc:command("RPOPLPUSH", q, p)
end
-- "do" the work
process = function (i)
  print(i)
end
-- work is done, drop it from the processing list
donedrop = function (i)
  return rc:command("LREM", p, 1, i)
end
-- pause for a moment..
-- os.execute forks a shell on every call; socket.sleep(sec) from the
-- "socket" library would avoid that
sleep = function(t)
  os.execute("sleep " .. tonumber(t))
end
--
-- MAIN
rc = connect(os.getenv("REDIS_HOST") or "127.0.0.1")
-- loop doing work until you can't
while true do
  local item, err = get_work()
  if not item then
    -- the command itself failed (e.g. the connection dropped): stop
    print("redis error: " .. tostring(err))
    break
  elseif item.name == "NIL" then
    -- queue is empty: back off instead of hammering redis with RPOPLPUSH
    sleep(delay)
  else
    process(item)
    donedrop(item)
    sleep(delay)
  end
end
rc:close()
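The point of parking each item in processing is that if the worker dies between RPOPLPUSH and LREM, the item is still in redis rather than lost. The worker above never looks at those leftovers; one common way to handle them (an addition, not something this post’s code does) is to move them back onto the queue when a worker starts. Here is a hypothetical sketch of that step, again with Lua arrays in place of redis; requeue_leftovers is a made-up name, and with lua-hiredis the same idea would amount to repeating RPOPLPUSH processing enqueued until redis replies with nil.

```lua
-- Hypothetical recovery step, modeled with Lua arrays (index 1 = head/left).
-- Repeats RPOPLPUSH(processing, enqueued) until processing is empty.
local function requeue_leftovers(processing, enqueued)
  local moved = 0
  while true do
    local item = table.remove(processing)  -- RPOP from processing
    if item == nil then break end           -- redis would reply nil here
    table.insert(enqueued, 1, item)         -- LPUSH onto enqueued
    moved = moved + 1
  end
  return moved
end

-- Simulate a worker that died after moving "a" but before LREM-ing it
local enqueued   = {"d", "c", "b"}
local processing = {"a"}

local n = requeue_leftovers(processing, enqueued)
print(n, table.concat(enqueued, ","), #processing)
```

Two caveats with this approach: it is only safe when no other worker is mid-item, since it would requeue items still being worked on, and the recovered items land at the left end of enqueued, so they are picked up after everything already waiting.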

Let’s build the docker image:

ᐅ make build
docker build --tag=sink:7 --rm=true .
Sending build context to Docker daemon 69.12 kB
Step 1 : FROM openresty/openresty:alpine-fat
 ---> 366babf2b04d
Step 2 : ENV REDIS_HOST 127.0.0.1
 ---> Using cache
 ---> de5f965dde03
Step 3 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
 ---> Using cache
 ---> 7c775cdb7262
Step 4 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-cjson
 ---> Using cache
 ---> ae1368fbc83c
Step 5 : ADD *.lua /src/
 ---> df004e1f873f
Removing intermediate container f99ad0182f1e
Step 6 : WORKDIR /src/
 ---> Running in c83514aa60cf
 ---> 532d74acb351
Removing intermediate container c83514aa60cf
Step 7 : ENTRYPOINT /bin/sh
 ---> Running in 779030fb4864
 ---> f487b5262d67
Removing intermediate container 779030fb4864
Successfully built f487b5262d67

Run redis and the worker:

ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
57f9820ed7091761dddfd2547391c6509e4c065d36ef0f2c989ff00fdc4e8950
docker run -d --name sink  --net host --entrypoint /usr/local/openresty/luajit/bin/luajit sink:7 worker.lua
2bf77a4d780c516d6685c5c1ef6c8e5a1b7be9ec07ae7c1d6a8618bd0cccde68

We should see them with docker ps:

ᐅ docker ps
CONTAINER ID  IMAGE        COMMAND                CREATED        STATUS   PORTS NAMES
2bf77a4d780c  sink:7       "/usr/local/openresty" 32 seconds ago Up 32 seconds  sink
57f9820ed709  redis:alpine "docker-entrypoint.sh" 32 seconds ago Up 32 seconds  redis

In one shell/terminal, watch the logs..

ᐅ make logs
docker logs -f sink

In another shell, load up redis with a bunch of data (100k keys):

ᐅ make load-redis | tail
enqueue: 99991
enqueue: 99992
enqueue: 99993
enqueue: 99994
enqueue: 99995
enqueue: 99996
enqueue: 99997
enqueue: 99998
enqueue: 99999
enqueue: 100000

As soon as that starts, you should see log activity from the worker, something like:

...
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99995"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99996"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99997"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99998"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99999"}

Continue on to (another) interlude, Exploring package.path