Message Brokers
Apache Kafka
Convoy supports ingesting events from Kafka topics. This is currently only supported in outgoing projects.
Connecting to your Kafka topic
To connect your Kafka topic, you need to supply:
- The Kafka cluster hostname and port (e.g., localhost:9092)
- The topic name (e.g., audit-log-events)
- The number of workers, which should match the number of partitions for the topic
- The consumer group ID. It defaults to an empty string, but setting one is recommended.
- The authentication settings configured on the cluster
Authentication
- Auth types: PLAIN and SCRAM
- Hash types (for SCRAM): SHA256 and SHA512
- TLS: Enabled and Disabled
Ingestion Options
There are two ways to ingest events into Convoy from your Kafka topic:
- Format the payload using the structure below as a guide and write it to your topic.

  Reference payload:

      {
        "event_type": "string, required",
        "data": "object, required",
        "custom_headers": { // optional
          "x-convoy-message-type": "single",
          "sample-header": "sample-value"
        },
        "idempotency_key": "string, optional",
        "owner_id": "string, optional", // if included, the event is sent to all endpoints with the owner-id
        "endpoint_id": "string, optional" // if included, the event is sent to a single endpoint
      }

  If `x-convoy-message-type` is set to broadcast, the event will be sent to all endpoints in the project, ignoring both the `endpoint_id` and `owner_id` values.

  For a full list of reference payloads, see our guide on ingesting events.
- Send your arbitrarily formatted event payloads to your topic and use Convoy’s transform functions to mutate them at the point of ingestion.

  Transform function:

      function transform(payload) {
        return {
          "endpoint_id": "",
          "owner_id": "",
          "event_type": "sample",
          "data": payload, // payload contains arbitrary message data read from your topic
          "custom_headers": {
            "sample-header": "sample-value"
          },
          "idempotency_key": ""
        };
      }
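As a rough illustration of the second option, the sketch below maps a made-up message onto the reference payload structure and checks the result locally before the function is pasted into Convoy. The message shape (`type`, `id`, `customer`, `amount`), the `x-source-topic` header, and the `checkEvent` helper are all assumptions for the example and are not part of Convoy.

```javascript
// Hypothetical message as it might appear on the topic.
const sampleMessage = {
  type: "invoice.paid",
  id: "inv_123",
  customer: "cust_42",
  amount: 1999
};

// Transform function: takes the raw payload read from the topic and
// returns an object that follows the reference payload structure.
function transform(payload) {
  return {
    "owner_id": payload.customer,   // route to all endpoints with this owner id
    "event_type": payload.type,
    "data": payload,                // forward the original message unchanged
    "custom_headers": { "x-source-topic": "audit-log-events" },
    "idempotency_key": payload.id
  };
}

// Local helper (not a Convoy API): checks the two required fields.
function checkEvent(evt) {
  const errors = [];
  if (typeof evt.event_type !== "string" || evt.event_type === "") {
    errors.push("event_type must be a non-empty string");
  }
  if (evt.data === null || typeof evt.data !== "object") {
    errors.push("data must be an object");
  }
  return errors;
}

const convoyEvent = transform(sampleMessage);
const errors = checkEvent(convoyEvent);
console.log(errors.length === 0 ? "ok" : errors.join("; ")); // prints "ok"
```

Running a check like this against a few representative messages can catch a missing `event_type` or a non-object `data` value before any events reach the topic.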
Things to note
- Each worker reads messages off its partition one at a time.
- Acknowledgements are done per message.
- Ingestion is rate-limited (50 per second by default); the limit can be configured by setting the `CONVOY_INGEST_RATE` environment variable.
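To get a feel for what the rate limit means for a backlog, a back-of-the-envelope estimate can help. The sketch below assumes the limit applies to total ingestion and is sustained at a steady rate; `drainSeconds` is a hypothetical helper, not something Convoy provides.

```javascript
// Estimate how many seconds it takes to ingest a backlog at a fixed rate.
// ratePerSecond defaults to 50, the rate quoted above.
function drainSeconds(backlog, ratePerSecond = 50) {
  if (ratePerSecond <= 0) {
    throw new RangeError("ratePerSecond must be positive");
  }
  return Math.ceil(backlog / ratePerSecond);
}

console.log(drainSeconds(10000));      // 200
console.log(drainSeconds(10000, 200)); // 50
```

Under these assumptions, a backlog of 10,000 messages takes roughly 200 seconds to ingest at the quoted rate, and about 50 seconds if the rate is raised to 200 per second.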