Intro to Lua and Openresty, Part 6: Data Processing Sink
In Part 5 of this series, we connected to redis and wrote some JSON data to a table in it. This post demonstrates how to write a simple (but fast and reliable) data processing sink, using redis as the queue that stores messages before they are processed.
The original plan for this example was to use the resty.redis Lua module; as described below, we end up using hiredis (installed as lua-hiredis) instead.
If you are following along in the code, we are here.
Requirements for the basic data processing sink
- Basic
- stand-alone (no openresty or nginx)
- connect to redis
- watch queue for new items
- for each item in the queue, do something with it (print it to stdout for now)
- if there’s no work, just sit idle
This exercise is getting further into the unknown, and it might take a few steps before we have a working solution.
I got stuck iterating over various redis client libraries to find one that would work:
- resty.redis (which I used in exercise 6) is not available outside the openresty environment (and I’m not sure how to run lua in that env without nginx, or at least without the ngx variable, if not running in nginx).
- moon-redis is a data modeling library
- redis-lua appeared to have a dependency on resty.redis (at least I believe so)
- hiredis appears to work, but is a pile of C that hasn’t been updated since 2014, so there’s that (but supposedly used in hi-volume production).
Testing hiredis
…
Let’s test the connection. We’ll work in examples/07-redis-data-sink/tests/ for this. Here is our code (ping.lua):
local cjson = require "cjson"
local hiredis = require "hiredis"
local client, err, err_code = hiredis.connect("127.0.0.1", 6379)
if not client then
  print("failed to connect to redis..")
  print("error: " .. err)
  print("code: " .. err_code)
  return
end
-- send a PING and print whether the reply was PONG
print("PONG:")
print(client:command("PING") == hiredis.status.PONG)
client:close()
Here’s our Makefile:
build:
	docker build --tag=ping-redis:7 --rm=true .
run:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
fail:
	docker run -it --rm --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
dev:
	docker run -it --rm -v `pwd`:/src ping-redis:7
dev-redis:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run --rm -it --name ping-redis --net host -v `pwd`:/src ping-redis:7
clean:
	docker stop redis || true
	docker stop ping-redis || true
	docker rm redis || true
	docker rm ping-redis || true
Here’s our Dockerfile:
FROM openresty/openresty:alpine-fat
RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh
Build that image:
ᐅ make build
docker build --tag=ping-redis:7 --rm=true .
Sending build context to Docker daemon 4.608 kB
Step 1 : FROM openresty/openresty:alpine-fat
---> 366babf2b04d
Step 2 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
---> Using cache
---> 9f523055586e
Step 3 : ADD *.lua /src/
---> 2dd66d628a62
Removing intermediate container d5a31e9154d6
Step 4 : WORKDIR /src/
---> Running in 6a2c97caef1c
---> 4070a74cbaaa
Removing intermediate container 6a2c97caef1c
Step 5 : ENTRYPOINT /bin/sh
---> Running in 7b9f72e426e2
---> 101646a10ba3
Removing intermediate container 7b9f72e426e2
Successfully built 101646a10ba3
First test, with no redis available:
ᐅ make fail
docker run -it --rm --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
failed to connect to redis..
error: Connection refused
code: 1
Test with redis available:
ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
39980a15a0465e637019bb29c3a531398a3362cf35b9623a29c712e3d8f49208
docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit ping-redis:7 ping.lua
PONG:
true
YAY!
Let’s test some push /// pop /// RPOPLPUSH..
For this test, we’ll write another stand-alone script that simulates a few steps in the general workflow the worker goes through when processing a single item on the queue. This will do the following:
- connect to redis
- left push a few values to a list (3 or 4)
- use RPOPLPUSH to pop one value from the enqueued list, and push that value to a processing list
- do something with that value (print it)
- use LREM to remove the item from the processing list.
  - this should work fine for basic situations, but would need more thorough testing to guard against race conditions across multiple workers (though it ought to be ok if the keys are unique)
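The steps above can be tried without a redis server. The sketch below is a toy in-memory stand-in: the FakeRedis table and its method names are hypothetical (they are not part of hiredis) and only mimic the list behaviour of LPUSH, RPOPLPUSH and LREM with a count of 1, to show how one item travels between the two lists.

```lua
-- Toy in-memory model of the two redis lists (hypothetical helper, not hiredis).
-- Index 1 is the head (left) of each list, matching LRANGE order.
local FakeRedis = {}
FakeRedis.__index = FakeRedis

local function new_fake()
  return setmetatable({ lists = {} }, FakeRedis)
end

-- return the named list, creating it on first use
function FakeRedis:list(name)
  self.lists[name] = self.lists[name] or {}
  return self.lists[name]
end

-- LPUSH: insert at the head, return the new length
function FakeRedis:lpush(name, value)
  local l = self:list(name)
  table.insert(l, 1, value)
  return #l
end

-- RPOPLPUSH: pop the tail of src and push it onto the head of dst
function FakeRedis:rpoplpush(src, dst)
  local value = table.remove(self:list(src)) -- nil when src is empty
  if value ~= nil then
    table.insert(self:list(dst), 1, value)
  end
  return value
end

-- LREM with count = 1: remove the first match, return how many were removed
function FakeRedis:lrem_one(name, value)
  local l = self:list(name)
  for i, v in ipairs(l) do
    if v == value then
      table.remove(l, i)
      return 1
    end
  end
  return 0
end

local r = new_fake()
for _, v in ipairs({ "a", "b", "c", "d" }) do
  r:lpush("enqueued", v)
end

local item = r:rpoplpush("enqueued", "processing")
print("processing:", item)
print("removed:", r:lrem_one("processing", item))
print("left in queue:", table.concat(r:list("enqueued"), " "))
```

Because the values are pushed on the left and popped from the right, the first value pushed is the first one processed, which is the queue behaviour we want from the real lists.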
The push/pop workflow looks like:
+------+
| JSON |
+--+---+
   |
   |
   V
+--------+
|producer|
+---+----+
    |
    |
    |  LPUSH    +---+---+---+---+  RPOPLPUSH      +---+---+---+---+
    +---------> | d | c | b | a +->-->---+--->--->+ x | . | . |   |
                +---+---+---+---+        |        +---+---+---+---+
                                         |                |
                                         V                V LREM when done
                                     +---+----+
                                     |Consumer|
                                     +--------+
Relevant docs for these redis operations:
Note that it’s also worth understanding the difference between RPOPLPUSH and BRPOPLPUSH (the blocking variant).
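In short, RPOPLPUSH returns straight away with a nil reply when the source list is empty, while BRPOPLPUSH takes an extra timeout argument (in seconds, 0 meaning wait forever) and blocks until an item arrives or the timeout expires. A minimal sketch of a fetch helper built on the blocking form follows. The helper name next_item and the stub client are hypothetical, and it assumes a nil reply arrives as a table whose name field is "NIL", which is how the worker later in this post detects it.

```lua
-- Fetch the next item, blocking for up to `timeout` seconds when the source
-- list is empty. `client` is anything with a :command(...) method, such as
-- the connection returned by hiredis.connect in this post.
local function next_item(client, src, dst, timeout)
  local reply, err = client:command("BRPOPLPUSH", src, dst, timeout)
  if reply == nil then
    return nil, err -- connection-level failure
  end
  -- Assumption: an empty (nil) reply shows up as a table named "NIL"
  if type(reply) == "table" and reply.name == "NIL" then
    return nil, "timeout"
  end
  return reply
end

-- Hypothetical stub standing in for a real connection, for illustration only.
-- It ignores the command name and never actually blocks.
local stub = { queue = { "a" } }
function stub:command(cmd, src, dst, timeout)
  local v = table.remove(self.queue)
  if v == nil then
    return { name = "NIL" }
  end
  return v
end

print(next_item(stub, "enqueued", "processing", 5)) -- first call yields the item
print(next_item(stub, "enqueued", "processing", 5)) -- queue is now empty
```

With a real connection, the blocking call lets a worker sit idle inside redis instead of polling in a loop.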
The code for our test is simple but a tad verbose:
local hiredis = require "hiredis"

-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. err)
    print("code: " .. err_code)
    os.exit(1)
  else
    return rc
  end
end

-- send a PING to redis and print true/false for PONG as result
ping_pong = function(client)
  print("PONG:")
  print(client:command("PING") == hiredis.status.PONG)
end

-- push a few test keys simulating a writing producer
push_keys = function(client)
  client:command("LPUSH", "enqueued", "a")
  client:command("LPUSH", "enqueued", "b")
  client:command("LPUSH", "enqueued", "c")
  client:command("LPUSH", "enqueued", "d")
end

-- return the lua table that is the redis list, in full
get_list = function(client, list)
  return client:command("LRANGE", list, 0, -1)
end

-- for the lua table t, print the key/value pairs (one level)
print_table = function(t)
  for k,v in pairs(t) do
    print(k, v)
  end
end

-- wrap redis RPOPLPUSH (uses the global client rc set in MAIN)
rpoplpush = function(q, p)
  return rc:command("RPOPLPUSH", q, p)
end

-- wrap redis LREM (uses the global client rc set in MAIN)
drop = function(tbl, key)
  return rc:command("LREM", tbl, 1, key)
end

--
-- MAIN
rc = connect("127.0.0.1")
ping_pong(rc)
-- push some keys to the q
push_keys(rc)
-- print out those keys
print("LRANGE enqueued:")
q = get_list(rc, "enqueued")
print_table(q)
-- RPOPLPUSH one key over to processing
pop = rpoplpush("enqueued", "processing")
print("pop the queue, now processing: " .. pop)
-- retrieve and print the two lists as they are now..
q = get_list(rc, "enqueued")
print("queue is now:")
print_table(q)
--
p = get_list(rc, "processing")
print("processing is now:")
print_table(p)
-- ok, we're done with the key, let's drop it
print("done with:")
print(pop)
print("drop from processing..")
print(drop("processing", pop))
-- retrieve and print the two lists as they are now..
q = get_list(rc, "enqueued")
print("queue:")
print_table(q)
--
p = get_list(rc, "processing")
print("processing:")
print_table(p)
-- goodbye redis
rc:close()
This is our first time defining functions. They take the form:
foo = function(args)
  stmt
end
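For comparison, here is a small sketch of the same idea in the other spellings Lua offers. The function names are made up for illustration. The first two forms create globals; the third keeps the name local to the file or block, which is usually the safer default in larger scripts.

```lua
-- The form used in this post: assign an anonymous function to a (global) name.
square = function(x)
  return x * x
end

-- Equivalent sugar for the same kind of global assignment.
function cube(x)
  return x * x * x
end

-- A local function is only visible in the enclosing scope (file or block),
-- which avoids leaking names into the global table.
local function add(a, b)
  return a + b
end

print(square(3), cube(2), add(1, 2)) -- prints 9, 8 and 3 separated by tabs
```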
Here is our Makefile:
build:
	docker build --tag=push-pop:7 --rm=true .
run:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit push-pop:7 queue-test.lua
dev:
	docker run -it --rm -v `pwd`:/src push-pop:7
dev-redis:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run --rm -it --name push-pop --net host -v `pwd`:/src push-pop:7
clean:
	docker stop redis || true
	docker stop push-pop || true
	docker rm redis || true
	docker rm push-pop || true
Here is our Dockerfile:
FROM openresty/openresty:alpine-fat
RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh
Let’s build the image:
ᐅ make build
docker build --tag=push-pop:7 --rm=true .
Sending build context to Docker daemon 18.94 kB
Step 1 : FROM openresty/openresty:alpine-fat
---> 366babf2b04d
Step 2 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
---> Using cache
---> 9f523055586e
Step 3 : ADD *.lua /src/
---> ffd368d4359d
Removing intermediate container cd9f8b513598
Step 4 : WORKDIR /src/
---> Running in bc6b44a78bcc
---> df60616db41a
Removing intermediate container bc6b44a78bcc
Step 5 : ENTRYPOINT /bin/sh
---> Running in 5eca9952a6c1
---> f57567ee339a
Removing intermediate container 5eca9952a6c1
Successfully built f57567ee339a
Run the quick tests:
ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
f898eff2d7ff61368757992277e58e427d70504eb2f4f128e324bcc8d4255c43
docker run -it --rm --net host --entrypoint /usr/local/openresty/luajit/bin/luajit push-pop:7 queue-test.lua
PONG:
true
LRANGE enqueued:
1 d
2 c
3 b
4 a
pop the queue, now processing: a
queue is now:
1 d
2 c
3 b
processing is now:
1 a
done with:
a
drop from processing..
1
queue:
1 d
2 c
3 b
processing:
OK, Let’s get more serious..
We’ll need three pieces to this puzzle:
- redis - docker image, easy
- webapp (nginx/openresty) - accepts POST and writes to redis
- worker (stand-alone Lua script) - attempts to pop from queue and process data
The worker’s logic would look like:
while true
  item = RPOPLPUSH(q, p)
  process(item)
  donedrop(item)
  sleep(delay)
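The loop above can be sketched as runnable Lua by passing the redis calls in as plain functions, so the control flow can be exercised without a server. Everything named here (work_once and its parameters) is hypothetical, and the sketch makes one small variation on the pseudocode: it only sleeps when there was nothing to fetch.

```lua
-- One pass of the worker loop. fetch() returns the next item or nil,
-- process(item) does the work, done(item) drops it from the processing list,
-- and sleep(delay) pauses. Returns true when an item was handled.
local function work_once(fetch, process, done, sleep, delay)
  local item = fetch()
  if item == nil then
    sleep(delay) -- nothing queued: idle briefly instead of spinning
    return false
  end
  process(item)
  done(item)
  return true
end

-- In-memory stand-ins for the redis-backed functions, for illustration only.
local queue, processed, naps = { "b", "a" }, {}, 0
local function fetch() return table.remove(queue) end
local function process(item) processed[#processed + 1] = item end
local function done(item) end
local function sleep(_) naps = naps + 1 end

-- Three passes: two items to handle, then one idle pass.
for _ = 1, 3 do
  work_once(fetch, process, done, sleep, 0.001)
end
print(table.concat(processed, " "), naps)
```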
To run tests on this stack, we will also want a 4th component, a producer.lua that fills redis with some keys for the worker to process.
Having run the simpler tests in this exercise, we can now complete the primary goals for this exercise:
- connect to redis
- watch the queue
- process an item when one is available (print it)
- sit idle while there are no items on the queue
- It should be easy to load new values onto the queue
Let’s start with the Dockerfile:
FROM openresty/openresty:alpine-fat
ENV REDIS_HOST 127.0.0.1
RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
RUN /usr/local/openresty/luajit/bin/luarocks install lua-cjson
ADD *.lua /src/
WORKDIR /src/
ENTRYPOINT /bin/sh
The Makefile:
build:
	docker build --tag=sink:7 --rm=true .
run:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run -d --name sink --net host --entrypoint /usr/local/openresty/luajit/bin/luajit sink:7 worker.lua
dev:
	docker run -it --rm --entrypoint /bin/sh -v `pwd`:/src sink:7
dev-redis:
	docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
	docker run --rm -it --name sink --net host --entrypoint /bin/sh -v `pwd`:/src sink:7
clean:
	docker stop redis || true
	docker stop sink || true
	docker rm redis || true
	docker rm sink || true
logs:
	docker logs -f sink
cat-posts:
	docker exec -it redis redis-cli -c LRANGE enqueued 0 -1
load-redis:
	docker exec sink /usr/local/openresty/luajit/bin/luajit /src/producer.lua
app-shell:
	docker exec -it sink /bin/sh
redis-shell:
	docker exec -it redis redis-cli
Here is our producer.lua:
local cjson = require "cjson"
local hiredis = require "hiredis"
-- names for our lists in redis
local q = "enqueued"
-- timestamp format used in each message
local date_fmt = "%m-%d-%Y--%H-%M-%S"

-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. err)
    print("code: " .. err_code)
    os.exit(1)
  else
    return rc
  end
end

-- push a key to the queue
enqueue = function(key)
  rc:command("LPUSH", q, key)
end

--
-- MAIN
rc = connect(os.getenv("REDIS_HOST"))
assert(rc)
-- the numeric for loop advances l on its own
for l = 1, 100000 do
  print("enqueue: " .. l)
  enqueue(cjson.encode({timestamp = os.date(date_fmt), msg = "hi! this is " .. l}))
end
..and the worker.lua:
local cjson = require "cjson"
-- install lua-hiredis
local hiredis = require "hiredis"
-- names for our lists in redis
local q = "enqueued"
local p = "processing"
local delay = 0.001

-- return redis client, or fail and exit
connect = function (host)
  local rc, err, err_code = hiredis.connect(host, 6379)
  if not rc then
    print("failed to connect to redis..")
    print("error: " .. err)
    print("code: " .. err_code)
    os.exit(1)
  else
    return rc
  end
end

-- retrieve work from redis, store it in "processing" table
get_work = function()
  return rc:command("RPOPLPUSH", q, p)
end

-- "do" the work
process = function (i)
  print(i)
end

-- work is done, drop it from the processing table
donedrop = function (i)
  return rc:command("LREM", p, 1, i)
end

-- pause for a moment..
-- could also use socket.sleep(sec) from the "socket" library
sleep = function(t)
  os.execute("sleep " .. tonumber(t))
end

--
-- MAIN
rc = connect(os.getenv("REDIS_HOST"))
assert(rc)
-- loop doing work until you can't
while true do
  item, err, code = get_work()
  -- an empty queue comes back as a NIL reply object rather than a string
  if item.name == "NIL" then
    -- pass
  else
    --print("got item!")
    process(item)
    donedrop(item)
    sleep(delay)
  end
end
rc:close()
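One thing this worker does not do is look at the processing list when it starts. If a worker dies between RPOPLPUSH and LREM, the item it was holding stays in processing, which is the whole point of using two lists. A minimal sketch of a start-up step that moves such leftovers back onto the queue is below. The name requeue_orphans and the stub client are hypothetical, the NIL check assumes the same reply shape the worker tests for, and as written it is only safe when a single worker owns the processing list.

```lua
-- Move anything left in the processing list back onto the queue.
-- Returns the number of items moved, or nil plus an error message.
local function requeue_orphans(client, processing, queue)
  local moved = 0
  while true do
    local reply, err = client:command("RPOPLPUSH", processing, queue)
    if reply == nil then
      return nil, err -- connection-level failure
    end
    -- Assumption: an empty list yields a reply table named "NIL"
    if type(reply) == "table" and reply.name == "NIL" then
      return moved
    end
    moved = moved + 1
  end
end

-- Hypothetical in-memory stub standing in for a real connection.
local stub = { lists = { processing = { "b", "a" }, enqueued = { "d", "c" } } }
function stub:command(cmd, src, dst)
  assert(cmd == "RPOPLPUSH")
  local v = table.remove(self.lists[src]) -- pop the tail of src
  if v == nil then
    return { name = "NIL" }
  end
  table.insert(self.lists[dst], 1, v) -- push onto the head of dst
  return v
end

print("requeued:", requeue_orphans(stub, "processing", "enqueued"))
print("queue is now:", table.concat(stub.lists.enqueued, " "))
```

Note that requeued items land at the head of the queue, so with this approach they are processed after everything that was already waiting.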
Let’s build the docker image:
ᐅ make build
docker build --tag=sink:7 --rm=true .
Sending build context to Docker daemon 69.12 kB
Step 1 : FROM openresty/openresty:alpine-fat
---> 366babf2b04d
Step 2 : ENV REDIS_HOST 127.0.0.1
---> Using cache
---> de5f965dde03
Step 3 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-hiredis
---> Using cache
---> 7c775cdb7262
Step 4 : RUN /usr/local/openresty/luajit/bin/luarocks install lua-cjson
---> Using cache
---> ae1368fbc83c
Step 5 : ADD *.lua /src/
---> df004e1f873f
Removing intermediate container f99ad0182f1e
Step 6 : WORKDIR /src/
---> Running in c83514aa60cf
---> 532d74acb351
Removing intermediate container c83514aa60cf
Step 7 : ENTRYPOINT /bin/sh
---> Running in 779030fb4864
---> f487b5262d67
Removing intermediate container 779030fb4864
Successfully built f487b5262d67
Run redis and the worker:
ᐅ make run
docker run -d --name redis --net host -p 127.0.0.1:6379:6379 redis:alpine
57f9820ed7091761dddfd2547391c6509e4c065d36ef0f2c989ff00fdc4e8950
docker run -d --name sink --net host --entrypoint /usr/local/openresty/luajit/bin/luajit sink:7 worker.lua
2bf77a4d780c516d6685c5c1ef6c8e5a1b7be9ec07ae7c1d6a8618bd0cccde68
We should see them with docker ps:
ᐅ docker ps
CONTAINER ID   IMAGE          COMMAND                  CREATED          STATUS          PORTS   NAMES
2bf77a4d780c   sink:7         "/usr/local/openresty"   32 seconds ago   Up 32 seconds           sink
57f9820ed709   redis:alpine   "docker-entrypoint.sh"   32 seconds ago   Up 32 seconds           redis
In one shell/terminal, watch the logs..
ᐅ make logs
docker logs -f sink
Load up redis with a bunch of data (100k keys):
ᐅ make load-redis | tail
enqueue: 99991
enqueue: 99992
enqueue: 99993
enqueue: 99994
enqueue: 99995
enqueue: 99996
enqueue: 99997
enqueue: 99998
enqueue: 99999
enqueue: 100000
As soon as that starts, you should see log activity from the worker, something like:
...
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99995"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99996"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99997"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99998"}
{"timestamp":"03-05-2017--03-06-08","msg":"hi! this is 99999"}