Ziggurat
Edit page
HomeGetting StartedConceptsMiddlewareRetries and QueuesStreaming Frameworks ComparisonUpgrade GuideRelease Notes
Use-Cases

Getting Started


Before we get started with writing a service using Ziggurat, We need to ensure the dependencies are in place. You will need:

  1. Clojure ( >= v1.10.0)

    • You can follow Clojure Documentation for installing clojure
  2. Leiningen

  3. Kafka and Zookeeper

  4. Rabbitmq


In this tutorial we'll make a service that turns on (or off) the air-conditioner by processing a stream of temperature readings. Lets call the service inspector-cool. So what we need the service to do is

  • read the temperature stream from kafka
  • check the temperature against some threshold temperatures and maintain the temperature of AC accordingly.

Lets create a service with leiningen and cd into the service. Leiningen scaffolds the code with a structure as shown below.

> lein new app inspector_cool
> cd inspector_cool
> tree .
├── CHANGELOG.md
├── LICENSE
├── README.md
├── doc
│ └── intro.md
├── project.clj
├── resources
├── src
│ └── inspector_cool
│ └── core.clj
└── test
└── inspector_cool
└── core_test.clj
6 directories, 7 files

The first thing that we'll do is add ziggurat as the dependency in our project.clj. Add [tech.gojek/ziggurat "2.7.2"] to the dependencies vector in project.clj. Your project.clj should resemble this

(defproject inspector_cool "0.1.0-SNAPSHOT"
:description "FIXME: write description"
:url "http://example.com/FIXME"
:license {:name "EPL-2.0 OR GPL-2.0-or-later WITH Classpath-exception-2.0"
:url "https://www.eclipse.org/legal/epl-2.0/"}
:dependencies [[org.clojure/clojure "1.9.0"]
[tech.gojek/ziggurat "2.7.2"]]
:main ^:skip-aot inspector-cool.core
:target-path "target/%s"
:profiles {:uberjar {:aot :all}})

Note: The latest version of ziggurat can be fetched from clojars.

Now that the project.clj is in place, lets fetch the dependencies

lein deps

Now lets start reading the temperature data from kafka. To do that we'll first need to edit the config to point our service to the correct kafka brokers and the proto class to correctly dereserialize the message. Create a file config.edn and add this to it.

{:ziggurat {:app-name "inspector_cool"
:env [:dev :keyword]
:stream-router {:temperature {:application-id "inspector_cool_test"
:bootstrap-servers "localhost:6667"
:stream-threads-count [1 :int]
:origin-topic "temperature-stream"
:proto-class "temperature-proto"}}}}

Here :proto-class is the the class-name of the protobuf that is used to serialize the messages in the topic. We are working on the feature to add support for other methods of serialization as well. This issue is tracking the updates

With the config in place, we can now define the mapper-function (the function that will be applied on every message that is read from the stream). In the core.clj file, write this down

(ns inspector-cool.core
(:require [ziggurat.init :as ziggurat]))
(defn start-function []
)
(defn stop-function []
)
(defn mapper-function [message]
(println message))
(defn -main
[& args]
(ziggurat/main start-function stop-function {:temperature {:handler-fn mapper-function}}))

Notice how we are passing functions to the ziggurat interface on the last line in core.clj. The start and stop functions are used for state-management (initializing and graceful shutdown of the application) and the mapper-function is mapped over every message the the application recieves from the stream. So in this case we are printing all the messages that we get from kafka onto the console.

The start-function can be used to initialize connection pools for api-calls or databases, and the stop function will then release these connections (by shutting down the connection pool).

With the config and core.clj in place lets see if we are able to read messages off of kafka. Run

lein run

This should initialize the streams and start printing the messages from kafka onto the console.

Now that we have successfully started reading the messages from kafka, lets write the logic turning the Air conditioner on (or off) based on some threshold temperatures. Lets write down the logic in core.clj for the same.

(ns inspector-cool.core
(:require [ziggurat.init :as ziggurat]
[ziggurat.config :refer [config]]
[inspector-cool.aircon :as aircon]))
(defn start-function []
)
(defn stop-function []
)
(defn mapper-function [message]
(cond
(< (:temperature message) (:temperature-threshold-low config)) (aircon/switch-on)
(> (:temperature message) (:temperature-threshold-high config)) (aircon/switch-off)
:else :skip))
(defn -main
[& args]
(ziggurat/main start-function stop-function {:temperature {:handler-fn mapper-function}}))

Note that we are returning :skip in the :else condition in the cond. Ziggurat expects the mapper-function to return any of the keywords :success, :skip, :retry. So the functions aircon/switch-on and aircon/switch-off should return :success when the api-call succeeds. If the API call fails, it should return :retry, so the message will be retried. The logic as to how the messages are retried is out of the scope of this guide.

aircon/switch-on and aircon/switch-off are basic http call implementations that will make the api-call to the airconditioner.

And yes, if you are wondering how the config will be defined for temperature-threshold-low and temperature-threshold-high, here is what the config.edn looks like.

{:ziggurat {:app-name "inspector_cool"
:env [:dev :keyword]
:stream-router {:temperature {:application-id "inspector_cool_test"
:bootstrap-servers "localhost:6667"
:stream-threads-count [1 :int]
:origin-topic "temperature-stream"
:proto-class "temperature-proto"}}}
:temperature-threshold {:low 18
:high 25}}