Kafka Module
The Kafka module provides integration with Apache Kafka, a distributed event streaming platform. This module allows you to manage Kafka brokers, topics, and messages in your Dagger pipelines.
Features
- Kafka broker management
- Topic creation and management
- Message production and consumption
- ZooKeeper integration
- Multi-broker support
- Custom configuration
- Security settings
- Monitoring capabilities
Installation
To use the Kafka module in your Dagger pipeline:
import (
"dagger.io/dagger"
"github.com/felipepimentel/daggerverse/libraries/kafka"
)
Usage Examples
Basic Kafka Broker Setup
func (m *MyModule) Example(ctx context.Context) (*Service, error) {
kafka := dag.Kafka().New()
// Start Kafka broker
return kafka.Broker(
ctx,
"2.13-3.5.0", // version
9092, // port
nil, // custom config
)
}
Topic Management
func (m *MyModule) ManageTopics(ctx context.Context) error {
kafka := dag.Kafka().New()
// Create and configure topics
return kafka.CreateTopic(
ctx,
"my-topic",
3, // partitions
2, // replication factor
map[string]string{
"cleanup.policy": "delete",
"retention.ms": "604800000",
},
)
}
Message Operations
func (m *MyModule) HandleMessages(ctx context.Context) error {
kafka := dag.Kafka().New()
// Produce and consume messages
return kafka.
ProduceMessage(
ctx,
"my-topic",
"key",
"value",
).
ConsumeMessages(
ctx,
"my-topic",
"my-group",
10, // max messages
)
}
GitHub Actions Integration
You can use this module in your GitHub Actions workflows:
name: Kafka Operations
on: [push]
jobs:
kafka:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Kafka
uses: dagger/dagger-action@v1
with:
module: github.com/felipepimentel/daggerverse/libraries/kafka
args: |
do -p '
kafka := Kafka().New()
broker := kafka.Broker(
ctx,
"2.13-3.5.0",
9092,
nil,
)
kafka.CreateTopic(
ctx,
"my-topic",
3,
2,
nil,
)
'
API Reference
Kafka
Main module struct that provides access to Kafka functionality.
Constructor
New() *Kafka
- Creates a new Kafka instance
- Default version: “2.13-3.5.0”
- Default platform: “linux/amd64”
Methods
Broker(ctx context.Context, version string, port int, config map[string]string) (*Service, error)
- Starts a Kafka broker
- Parameters:
version
: Kafka versionport
: Broker portconfig
: Custom broker configuration
CreateTopic(ctx context.Context, name string, partitions int, replicationFactor int, config map[string]string) error
- Creates a Kafka topic
- Parameters:
name
: Topic namepartitions
: Number of partitionsreplicationFactor
: Replication factorconfig
: Topic configuration
ProduceMessage(ctx context.Context, topic string, key string, value string) error
- Produces a message to a topic
- Parameters:
topic
: Target topickey
: Message keyvalue
: Message value
ConsumeMessages(ctx context.Context, topic string, group string, maxMessages int) error
- Consumes messages from a topic
- Parameters:
topic
: Source topicgroup
: Consumer groupmaxMessages
: Maximum messages to consume
Best Practices
- Broker Configuration
- Use appropriate replication factors
- Configure proper retention policies
- Monitor broker health
- Topic Management
- Plan partitioning strategy
- Set appropriate configs
- Monitor topic performance
- Message Handling
- Use consistent serialization
- Handle errors appropriately
- Monitor consumer lag
- Security
- Enable authentication
- Use encryption in transit
- Follow security best practices
Troubleshooting
Common issues and solutions:
- Connection Issues
Error: broker not available Solution: Check broker status and network connectivity
- Topic Creation Failures
Error: invalid replication factor Solution: Ensure replication factor <= number of brokers
- Message Problems
Error: message too large Solution: Check message size limits and broker configuration
Configuration Example
# server.properties
broker.id=0
listeners=PLAINTEXT://:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=localhost:2181
num.partitions=3
default.replication.factor=2
log.retention.hours=168
Advanced Usage
Multi-Broker Setup
func (m *MyModule) MultiNode(ctx context.Context) error {
kafka := dag.Kafka().New()
// Start multiple brokers
broker1 := kafka.Broker(ctx, "2.13-3.5.0", 9092, map[string]string{
"broker.id": "1",
})
broker2 := kafka.Broker(ctx, "2.13-3.5.0", 9093, map[string]string{
"broker.id": "2",
})
broker3 := kafka.Broker(ctx, "2.13-3.5.0", 9094, map[string]string{
"broker.id": "3",
})
return nil
}
Custom ZooKeeper Configuration
func (m *MyModule) CustomZK(ctx context.Context) (*Service, error) {
kafka := dag.Kafka().New()
// Configure with custom ZooKeeper
return kafka.WithZooKeeper(
ctx,
"3.8.0", // ZK version
2181, // ZK port
map[string]string{
"tickTime": "2000",
"initLimit": "10",
},
).Broker(ctx, "2.13-3.5.0", 9092, nil)
}