Event Store logo

Menu

Documentation

Show Table of Contents

Competing Consumers

This document walks through the HTTP API for setting up and consuming competing consumer subscription groups. For an overview on competing consumers and how they relate to other subscription types please see the overview document.

The Administration UI includes a Competing Consumers section where a user is able to create, update, delete and view subscriptions and their statusses.

Creating a Persistent Subscription

The first thing to do before attempting to interact with a subscription group is to create one. You will receive an error if you attempt to create a subscription group more than once. This requires admin permissions.

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name} application/json PUT

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.

Body

Parameter Description
resolveLinktos Tells the subscription to resolve link events.
startFrom Start the subscription from the position-th event in the stream.
extraStatistics Tells the backend to measure timings on the clients so statistics will contain histograms of them.
checkPointAfterMilliseconds The amount of time the system should try to checkpoint after.
liveBufferSize The size of the live buffer (in memory) before resorting to paging.
readBatchSize The size of the read batch when in paging mode.
bufferSize The number of messages that should be buffered when in paging mode.
maxCheckPointCount The maximum number of messages not checkpointed before forcing a checkpoint.
maxRetryCount Sets the number of times a message should be retried before being considered a bad message.
maxSubscriberCount Sets the maximum number of allowed subscribers
messageTimeoutMilliseconds Sets the timeout for a client before the message will be retried.
minCheckPointCount The minimum number of messages to write a checkpoint for.
namedConsumerStrategy RoundRobin/DispatchToSingle/Pinned

Updating a Persistent Subscription

You can edit the settings of an existing subscription while it is running. This will however drop the current subscribers and will reset the subscription internally. This requires admin permissions.

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name} application/json POST

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.

Body

Same parameters as Creating a Persistent Subscription

Deleting a Persistent Subscription

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name} application/json DELETE

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.

Reading a stream via a Persistent Subscription

By default, reading a stream via a persistent subscription will return a single event per request and will not embed the event properties as part of the response.

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name} /subscriptions/{stream}/{subscription_name}?embed={embed} /subscriptions/{stream}/{subscription}/{count}?embed={embed} application/vnd.eventstore.competingatom+xml application/vnd.eventstore.competingatom+json GET

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.
count How many events to return for the request.
embed None, Content, Rich, Body, PrettyBody, TryHarder

See the Reading Streams for information regarding the different embed levels

Response

{
  "title": "All Events Persistent Subscription",
  "id": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1",
  "updated": "2015-12-02T09:17:48.556545Z",
  "author": {
    "name": "EventStore"
  },
  "headOfStream": false,
  "links": [
    {
      "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/ack%3Fids=c322e299-cb73-4b47-97c5-5054f920746f",
      "relation": "ackAll"
    },
    {
      "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/nack%3Fids=c322e299-cb73-4b47-97c5-5054f920746f",
      "relation": "nackAll"
    },
    {
      "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/1%3Fembed=None",
      "relation": "previous"
    },
    {
      "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1",
      "relation": "self"
    }
  ],
  "entries": [
    {
      "title": "1@newstream",
      "id": "http://localhost:2113/streams/newstream/1",
      "updated": "2015-12-02T09:17:48.556545Z",
      "author": {
        "name": "EventStore"
      },
      "summary": "SomeEvent",
      "links": [
        {
          "uri": "http://localhost:2113/streams/newstream/1",
          "relation": "edit"
        },
        {
          "uri": "http://localhost:2113/streams/newstream/1",
          "relation": "alternate"
        },
        {
          "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/ack/c322e299-cb73-4b47-97c5-5054f920746f",
          "relation": "ack"
        },
        {
          "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/nack/c322e299-cb73-4b47-97c5-5054f920746f",
          "relation": "nack"
        }
      ]
    }
  ]
}

Acknowledgements

Clients must acknowledge (or not acknowledge) messages in the competing consumer model. If the client fails to respond in the given timeout period, the message will be retried.

Note that you should be using the rel links in the feed for acknowledgements (not bookmark uris as they are subject to change in future versions. EG:

{
  "uri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/ack/c322e299-cb73-4b47-97c5-5054f920746f",
  "relation": "ack"
},

Ack multiple messages

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/ack?ids={messageids} application/json POST

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.
messageids The ids of the messages that needs to be acked

Ack a single message

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/ack/{messageid} application/json POST

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.
messageid The id of the message that needs to be acked

Nack multiple messages

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/nack?ids={messageids}?action={action} application/json POST

Query Parameters

Parameter Description
stream The stream to the persistent subscription is on.
subscription_name The name of the subscription group.
action
  • Park: Don’t retry the message, park it until a request is sent to replay the parked messages
  • Retry: Retry the message
  • Skip: Discard the message
  • Stop: Stop the subscription
messageid The id of the message that needs to be acked

Nack a single message

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/nack/{messageid}?action={action} application/json POST

Replaying parked messages

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/replayParked application/json POST

Getting information for all subscriptions

URI Supported Content Types Method
/subscriptions application/json POST

Response

[
  {
    "links": [
      {
        "href": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/info",
        "rel": "detail"
      }
    ],
    "eventStreamId": "newstream",
    "groupName": "competing_consumers_group1",
    "parkedMessageUri": "http://localhost:2113/streams/$persistentsubscription-newstream::competing_consumers_group1-parked",
    "getMessagesUri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/1",
    "status": "Live",
    "averageItemsPerSecond": 0.0,
    "totalItemsProcessed": 0,
    "lastProcessedEventNumber": -1,
    "lastKnownEventNumber": 5,
    "connectionCount": 0,
    "totalInFlightMessages": 0
  },
  {
    "links": [
      {
        "href": "http://localhost:2113/subscriptions/another_newstream/competing_consumers_group1/info",
        "rel": "detail"
      }
    ],
    "eventStreamId": "another_newstream",
    "groupName": "competing_consumers_group1",
    "parkedMessageUri": "http://localhost:2113/streams/$persistentsubscription-another_newstream::competing_consumers_group1-parked",
    "getMessagesUri": "http://localhost:2113/subscriptions/another_newstream/competing_consumers_group1/1",
    "status": "Live",
    "averageItemsPerSecond": 0.0,
    "totalItemsProcessed": 0,
    "lastProcessedEventNumber": -1,
    "lastKnownEventNumber": -1,
    "connectionCount": 0,
    "totalInFlightMessages": 0
  }
]

Getting information about the subscriptions for a stream

URI Supported Content Types Method
/subscriptions/{stream} application/json GET

Response

[
  {
    "links": [
      {
        "href": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/info",
        "rel": "detail"
      }
    ],
    "eventStreamId": "newstream",
    "groupName": "competing_consumers_group1",
    "parkedMessageUri": "http://localhost:2113/streams/$persistentsubscription-newstream::competing_consumers_group1-parked",
    "getMessagesUri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/1",
    "status": "Live",
    "averageItemsPerSecond": 0.0,
    "totalItemsProcessed": 0,
    "lastProcessedEventNumber": -1,
    "lastKnownEventNumber": 5,
    "connectionCount": 0,
    "totalInFlightMessages": 0
  }
]

Getting information about a specific subscription

URI Supported Content Types Method
/subscriptions/{stream}/{subscription_name}/info application/json GET

Response

{
  "links": [
    {
      "href": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/info",
      "rel": "detail"
    },
    {
      "href": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/replayParked",
      "rel": "replayParked"
    }
  ],
  "config": {
    "resolveLinktos": false,
    "startFrom": 0,
    "messageTimeoutMilliseconds": 10000,
    "extraStatistics": false,
    "maxRetryCount": 10,
    "liveBufferSize": 500,
    "bufferSize": 500,
    "readBatchSize": 20,
    "preferRoundRobin": true,
    "checkPointAfterMilliseconds": 1000,
    "minCheckPointCount": 10,
    "maxCheckPointCount": 500,
    "maxSubscriberCount": 10,
    "namedConsumerStrategy": "RoundRobin"
  },
  "eventStreamId": "newstream",
  "groupName": "competing_consumers_group1",
  "status": "Live",
  "averageItemsPerSecond": 0.0,
  "parkedMessageUri": "http://localhost:2113/streams/$persistentsubscription-newstream::competing_consumers_group1-parked",
  "getMessagesUri": "http://localhost:2113/subscriptions/newstream/competing_consumers_group1/1",
  "totalItemsProcessed": 0,
  "countSinceLastMeasurement": 0,
  "lastProcessedEventNumber": -1,
  "lastKnownEventNumber": 5,
  "readBufferCount": 6,
  "liveBufferCount": 5,
  "retryBufferCount": 0,
  "totalInFlightMessages": 0,
  "connections": []
}

Event Store docs are hosted on GitHub. The repository is public and it’s open to issues and pull requests. Contributions, corrections and feedback are all welcome.