Getting Started
Before we get started with writing a service using Ziggurat, We need to ensure the dependencies are in place. You will need:
Clojure ( >= v1.10.0)
- You can follow Clojure Documentation for installing clojure
Leiningen
- You can follow Leiningen Documentation for installing leiningen
Kafka and Zookeeper
- You can follow Kafka Getting Started for starting kafka brokers and zookeeper instances
Rabbitmq
- You can follow the Rabbitmq Installation Guide for installing Rabbitmq
In this tutorial we'll make a service that turns on (or off) the air-conditioner by processing a stream of temperature readings. Lets call the service inspector-cool
.
So what we need the service to do is
- read the temperature stream from kafka
- check the temperature against some threshold temperatures and maintain the temperature of AC accordingly.
Lets create a service with leiningen and cd into the service. Leiningen scaffolds the code with a structure as shown below.
> lein new app inspector_cool> cd inspector_cool> tree .├── CHANGELOG.md├── LICENSE├── README.md├── doc│ └── intro.md├── project.clj├── resources├── src│ └── inspector_cool│ └── core.clj└── test└── inspector_cool└── core_test.clj6 directories, 7 files
The first thing that we'll do is add ziggurat as the dependency in our project.clj. Add [tech.gojek/ziggurat "2.7.2"]
to the dependencies vector in project.clj. Your project.clj should resemble this
(defproject inspector_cool "0.1.0-SNAPSHOT":description "FIXME: write description":url "http://example.com/FIXME":license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0":url "https://www.eclipse.org/legal/epl-2.0/"}:dependencies [[org.clojure/clojure "1.9.0"][tech.gojek/ziggurat "2.7.2"]]:main ^:skip-aot inspector-cool.core:target-path "target/%s":profiles {:uberjar {:aot :all}})
Note: The latest version of ziggurat can be fetched from clojars.
Now that the project.clj is in place, lets fetch the dependencies
lein deps
Now lets start reading the temperature data from kafka. To do that we'll first need to edit the config to point our service to the correct kafka brokers and the proto class to correctly dereserialize the message. Create a file config.edn
and add this to it.
{:ziggurat {:app-name "inspector_cool":env [:dev :keyword]:stream-router {:temperature {:application-id "inspector_cool_test":bootstrap-servers "localhost:6667":stream-threads-count [1 :int]:origin-topic "temperature-stream":proto-class "temperature-proto"}}}}
Here :proto-class
is the the class-name of the protobuf that is used to serialize the messages in the topic. We are working on the feature to add support for other methods of serialization as well. This issue is tracking the updates
With the config in place, we can now define the mapper-function (the function that will be applied on every message that is read from the stream). In the core.clj file, write this down
(ns inspector-cool.core(:require [ziggurat.init :as ziggurat]))(defn start-function [])(defn stop-function [])(defn mapper-function [message](println message))(defn -main[& args](ziggurat/main start-function stop-function {:temperature {:handler-fn mapper-function}}))
Notice how we are passing functions to the ziggurat interface on the last line in core.clj
. The start and stop functions are used for state-management (initializing and graceful shutdown of the application) and the mapper-function is mapped over every message the the application recieves from the stream. So in this case we are printing all the messages that we get from kafka onto the console.
The start-function can be used to initialize connection pools for api-calls or databases, and the stop function will then release these connections (by shutting down the connection pool).
With the config and core.clj in place lets see if we are able to read messages off of kafka. Run
lein run
This should initialize the streams and start printing the messages from kafka onto the console.
Now that we have successfully started reading the messages from kafka, lets write the logic turning the Air conditioner on (or off) based on some threshold temperatures. Lets write down the logic in core.clj for the same.
(ns inspector-cool.core(:require [ziggurat.init :as ziggurat][ziggurat.config :refer [config]][inspector-cool.aircon :as aircon]))(defn start-function [])(defn stop-function [])(defn mapper-function [message](cond(< (:temperature message) (:temperature-threshold-low config)) (aircon/switch-on)(> (:temperature message) (:temperature-threshold-high config)) (aircon/switch-off):else :skip))(defn -main[& args](ziggurat/main start-function stop-function {:temperature {:handler-fn mapper-function}}))
Note that we are returning :skip
in the :else
condition in the cond. Ziggurat expects the mapper-function to return any of the keywords :success, :skip, :retry
. So the functions aircon/switch-on
and aircon/switch-off
should return :success
when the api-call succeeds. If the API call fails, it should return :retry
, so the message will be retried. The logic as to how the messages are retried is out of the scope of this guide.
aircon/switch-on
and aircon/switch-off
are basic http call implementations that will make the api-call to the airconditioner.
And yes, if you are wondering how the config will be defined for temperature-threshold-low
and temperature-threshold-high
, here is what the config.edn looks like.
{:ziggurat {:app-name "inspector_cool":env [:dev :keyword]:stream-router {:temperature {:application-id "inspector_cool_test":bootstrap-servers "localhost:6667":stream-threads-count [1 :int]:origin-topic "temperature-stream":proto-class "temperature-proto"}}}:temperature-threshold {:low 18:high 25}}