Middleware
Ziggurat 3.0.0-alpha.3 introduced support for middleware; the feature will be part of the stable 3.0.0 release.
In older versions of Ziggurat, the mapper-function passed to (ziggurat/init) received the message as a Clojure PersistentHashMap. Ziggurat assumed that every message read from Kafka was serialized using Protobuf, so it deserialized the message internally.
As Ziggurat's user base has grown and the range of problems people tackle with it has widened, we have realized that this assumption was wrong. From this release on, Ziggurat passes the serialized message directly to the mapper-function.
We provide a default middleware that replicates the old logic (deserialization of proto), so users who wrap their mapper-function with it keep backward compatibility.
Migration from 2.x to 3.x for middleware
To migrate to the newer version, you only need to update the mapper-function. For example, change
;; 2.x
(defn main-fn
  [message]
  (println message)
  :success)

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn main-fn}})
to
;; 3.x
(defn main-fn
  [message]
  (println message)
  :success)

(def handler-fn
  (-> main-fn
      (ziggurat.middleware.default/protobuf->hash protoClass :stream-id)))

(ziggurat/main start-fn stop-fn {:stream-id {:handler-fn handler-fn}})
Defining your own Middleware
If you wish to define your own middleware function, make sure it meets the following requirements:
- The first argument should be a handler-function
- It should return a function that takes 1 argument: the message
An example of a middleware that just logs a message:
;; assumes clojure.tools.logging has been required with the alias `log`
(defn wrap-log-message
  [handler-fn]
  (fn [message]
    (log/info message)
    (handler-fn message)))
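The two requirements above do not rule out extra arguments after the handler function, which is the shape protobuf->hash has in the migration example. The following is only a minimal sketch of that pattern in plain Clojure: `wrap-label`, its `label` argument, the `seen` atom and the stand-in `mapper-fn` are all hypothetical names invented for illustration and are not part of Ziggurat.

```clojure
;; Records what the mapper receives, purely for demonstration.
(def seen (atom []))

;; Stand-in mapper function (hypothetical): stores the message and reports success.
(defn mapper-fn
  [message]
  (swap! seen conj message)
  :success)

;; Hypothetical middleware with an extra argument.
;; The handler function comes first, so the middleware works with `->`.
(defn wrap-label
  [handler-fn label]
  (fn [message]
    ;; Wrap the incoming message in a map before passing it on.
    (handler-fn {:label label :message message})))

(def handler-fn
  (-> mapper-fn
      (wrap-label :orders)))

;; The wrapped handler still takes exactly one argument: the message.
(handler-fn "payload")
```

Keeping the handler function as the first parameter is what lets `->` thread the mapper-function through each middleware in turn.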
Chaining Middleware
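You can also chain multiple middleware functions to combine different behaviours. For example, if you wish to deserialize a message and then log it, you can chain these middlewares and wrap your mapper-fn with them. Because -> wraps each middleware around the result of the previous step, the last middleware listed is the outermost one and runs first.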
;; protobuf->hash is a default middleware that Ziggurat gives to deserialize proto
(def handler-fn
  (-> mapper-fn
      (wrap-log-message)
      (ziggurat.middleware.default/protobuf->hash protoClass :stream-id)))
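The order in which chained middlewares run can be checked with a small sketch that does not depend on Ziggurat. Everything here is a hypothetical stand-in: `wrap-decode` plays the role of a deserializing middleware (it only upper-cases a string), `wrap-log` plays the role of the logging middleware, and `trace` records the order of calls.

```clojure
(require '[clojure.string :as str])

;; Collects [step message] pairs in the order the steps run.
(def trace (atom []))

;; Stand-in for a deserializing middleware: "decodes" by upper-casing.
(defn wrap-decode
  [handler-fn]
  (fn [message]
    (swap! trace conj [:decode message])
    (handler-fn (str/upper-case message))))

;; Stand-in for a logging middleware: records the message it sees.
(defn wrap-log
  [handler-fn]
  (fn [message]
    (swap! trace conj [:log message])
    (handler-fn message)))

(defn mapper-fn
  [message]
  (swap! trace conj [:mapper message])
  :success)

;; Expands to (wrap-decode (wrap-log mapper-fn)).
(def handler-fn
  (-> mapper-fn
      (wrap-log)
      (wrap-decode)))

(handler-fn "raw")
@trace
;; => [[:decode "raw"] [:log "RAW"] [:mapper "RAW"]]
```

The middleware listed last in the `->` form sees the raw message first, so the logging step and the mapper both receive the already decoded value.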