import * as React from 'react'
  /* @jsx mdx */
import { mdx } from '@mdx-js/react';
/* @jsxRuntime classic */

/* @jsx mdx */

import DefaultLayout from "/home/runner/work/ziggurat-web/ziggurat-web/node_modules/gatsby-theme-docz/src/base/Layout.js";
// Frontmatter metadata exported for this page; it is empty here.
export const _frontmatter = {};

// Props spread onto the layout before any caller-supplied props.
const layoutProps = { _frontmatter: _frontmatter };

// Layout component used to wrap the page content.
const MDXLayout = DefaultLayout;
export default function MDXContent({
  components,
  ...props
}) {
  return <MDXLayout {...layoutProps} {...props} components={components} mdxType="MDXLayout">


    <h1 {...{
      "id": "retries-and-queues"
    }}>{`Retries and Queues`}</h1>
    <hr></hr>
    <h2 {...{
      "id": "content"
    }}>{`Content`}</h2>
    <ol>
      <li parentName="ol"><a parentName="li" {...{
          "href": "#introduction"
        }}>{`Introduction`}</a></li>
      <li parentName="ol"><a parentName="li" {...{
          "href": "#topology-of-queues"
        }}>{`Topology of the Queues`}</a></li>
      <li parentName="ol"><a parentName="li" {...{
          "href": "#queue-stats"
        }}>{`Queue Stats`}</a></li>
      <li parentName="ol"><a parentName="li" {...{
          "href": "#peaking-replaying-and-deleting-deadset"
        }}>{`Peaking & Replaying DeadSet`}</a></li>
      <li parentName="ol"><a parentName="li" {...{
          "href": "#what-happens-when-the-actor-goes-down"
        }}>{`What happens when the actor goes down`}</a></li>
    </ol>
    <h2 {...{
      "id": "introduction"
    }}>{`Introduction`}</h2>
    <p>{`When processing a stream of events, there are many times where the processing might fail.
Either because of the issue in the function logic or some issue downstream.`}</p>
    <p>{`In the first case, we don't want to lose the message. In second, we
possibly want to retry the message and expect that the downstream
service might be down intermittently.`}</p>
    <p>{`This acts as a foundation of the requirement of a queue to fall back to.`}</p>
    <p>{`Ziggurat provides Retry-As-A-Service, which means your messages
automatically get queued on failures.`}</p>
    <p>{`Let's take a deep dive`}</p>
    <h2 {...{
      "id": "topology-of-queues"
    }}>{`Topology of Queues`}</h2>
    <p>{`Every application using ziggurat creates 3 dedicated queues in Rabbitmq:`}</p>
    <ol>
      <li parentName="ol">
        <p parentName="li"><strong parentName="p">{`Delayed Queue:`}</strong>{`
When an error occurs for a message in the mapper-function. The message is put in the delay queue with a TTL (can be set in the config).
They wait in the queue until the TTL expires. Then they are put into the instant queue.`}</p>
      </li>
      <li parentName="ol">
        <p parentName="li"><strong parentName="p">{`Instant Queue:`}</strong>{`
The retry logic in ziggurat reads messages from the instant queue and retries them.
If the retry fails again it puts the message back into the delay queue with the predefined TTL.
This happens 3(or however many times the RETRY_COUNT config defines it). If the message still does not succeed it is put into the Dead-set queue.`}</p>
      </li>
    </ol>
    <p><strong parentName="p">{`This results in a linear backoff for the retires of the same message.`}</strong></p>
    <ol {...{
      "start": 3
    }}>
      <li parentName="ol"><strong parentName="li">{`Dead Set Queue:`}</strong>{`
Messages are put into the dead set when they fail to successfully process even after however many times the RETRY_COUNT defines it.
You can retry messages from the dead set queue by making an API call to the actor.`}</li>
    </ol>
    <h2 {...{
      "id": "peaking-replaying-and-deleting-deadset"
    }}>{`Peaking, Replaying and Deleting DeadSet`}</h2>
    <p>{`DeadSet is the last place a message can reach in its life cycle because of failures.
Ziggurat does not automatically retry the messages in the dead set as it could be due to a bug in the mapper-function and the
user can trigger the retry of messages once the problem has been fixed.`}</p>
    <p>{`There are three built-in APIs to view and trigger retry on the DeadSet:`}</p>
    <p>{`  Assuming your stream routes are defined as follows:`}</p>
    <pre><code parentName="pre" {...{
        "className": "language-clojure"
      }}>{`  (ziggurat/main start-fn stop-fn {:stream-id-1 {:handler-fn main-fn}
                                   :stream-id-2 {:handler-fn (fn [] :outbound-channel)
                                                 :outbound-channel main-fn}})
`}</code></pre>
    <ol>
      <li parentName="ol">{`Peek: A GET API where you can fetch `}<inlineCode parentName="li">{`N`}</inlineCode>{` messages from the deadset to see which message went in.`}</li>
    </ol>
    <ul>
      <li parentName="ul">{`For multi-stream applications (that read from multiple kafka streams)`}
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X GET \\
  'http://localhost:8010/v1/dead_set?count=10&topic-entity=stream-id-1'
`}</code></pre>
      </li>
      <li parentName="ul">{`For multi-stream applications with channels`}
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X GET \\
  'http://localhost:8010/v1/dead_set?count=10&topic-entity=stream-id-2&channel=outbound-channel'
`}</code></pre>
        {`where`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`10`}</inlineCode>{` is the number of messages you wish to view`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`8010`}</inlineCode>{` is the port that is listening to the HTTP requests`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`stream-id-*`}</inlineCode>{` is the topic entity of your actor`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`outbound-channel`}</inlineCode>{` is the channel on topic-entity `}<inlineCode parentName="li">{`stream-id-2`}</inlineCode></li>
    </ul>
    <ol {...{
      "start": 2
    }}>
      <li parentName="ol">{`Replay: A POST API where you can select `}<inlineCode parentName="li">{`N`}</inlineCode>{` messages to be retried from the DeadSet.`}</li>
    </ol>
    <ul>
      <li parentName="ul">{`For multi-stream applications (that read from multiple kafka streams)`}
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X POST \\
  http://localhost:8010/v1/dead_set/replay \\
  -H 'content-type: application/json' \\
  -d '{"count":"10", "topic_entity":"stream-id-1"}'
`}</code></pre>
      </li>
      <li parentName="ul">{`For multi-stream applications with channels`}
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X POST \\
  http://localhost:8010/v1/dead_set/replay \\
  -H 'content-type: application/json' \\
  -d '{"count":"10", "topic_entity":"stream-id-2", "channel": "outbound-channel"}'
`}</code></pre>
        {`where`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`10`}</inlineCode>{` is the number of messages you wish to retry`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`8010`}</inlineCode>{` is the port that is listening to the HTTP requests`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`stream-id-*`}</inlineCode>{` is the topic entity of your actor`}</li>
      <li parentName="ul"><inlineCode parentName="li">{`outbound-channel`}</inlineCode>{` is the channel on topic-entity `}<inlineCode parentName="li">{`stream-id-2`}</inlineCode></li>
    </ul>
    <ol {...{
      "start": 3
    }}>
      <li parentName="ol">{`Delete: A DELETE API where you delete the last `}<inlineCode parentName="li">{`N`}</inlineCode>{` messages from the
DeadSet queue.`}</li>
    </ol>
    <ul>
      <li parentName="ul">
        <p parentName="li"><em parentName="p">{`This feature is available in ziggurat version >= `}<inlineCode parentName="em">{`2.8.0`}</inlineCode></em></p>
      </li>
      <li parentName="ul">
        <p parentName="li">{`For deleting messages from an actor without channels`}</p>
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X DELETE \\
http://localhost:8010/v1/dead_set \\
-H 'content-type: application/json' \\
-d '{"count":"10", "topic-entity":"booking"}'
`}</code></pre>
      </li>
      <li parentName="ul">
        <p parentName="li">{`For deleteing messages from an actor with channels`}</p>
        <pre parentName="li"><code parentName="pre" {...{}}>{`curl -X DELETE \\
http://localhost:8010/v1/dead_set \\
-H 'content-type: application/json' \\
-d '{"count":"10", "topic-entity":"booking", "channel": "dss-outbound"}'
`}</code></pre>
        <p parentName="li">{`where`}</p>
        <ul parentName="li">
          <li parentName="ul"><inlineCode parentName="li">{`10`}</inlineCode>{` is the number of messages you wish to retry`}</li>
          <li parentName="ul"><inlineCode parentName="li">{`8010`}</inlineCode>{` is the port that is listening to the HTTP requests`}</li>
          <li parentName="ul"><inlineCode parentName="li">{`booking`}</inlineCode>{` is the topic entity of your actor`}</li>
          <li parentName="ul"><inlineCode parentName="li">{`dss-outbound`}</inlineCode>{` is the channel on topic-entity `}<inlineCode parentName="li">{`booking`}</inlineCode></li>
        </ul>
      </li>
    </ul>
    <p><strong parentName="p">{`You cannot get/replay/delete a specific message`}</strong></p>
    <h2 {...{
      "id": "what-happens-when-the-actor-goes-down"
    }}>{`What happens when the actor goes down`}</h2>
    <p>{`When the actor goes down all the processing stops. So there won't be any new messages getting read from Kafka.
No new messages will come into any of the queues.`}</p>

    </MDXLayout>;
}
// Static marker set on the page component function.
MDXContent.isMDXComponent = true;
      