Einführung Apache Kafka mit Docker und Spring Boot
Published on 30/06/2018
6 min read
In category
common
Apache Kafka ist eine distributed streaming platform – generell heißt das, Apache Kafka unterstützt bei der Erstellung einer Publisher-Subscriber Umgebung in dem Kafka eine Streaming Pipeline zwischen den Teilnehmern aufbaut. Diese Platform erfüllt die Kriterien bezüglich Verteilung, Echtzeitfähigkeit und Zuverlässigkeit.
Diese Einführung betrachtet dabei die Einrichtung von Kafka mit Docker und den gängigsten Anwendungsfällen in Spring Boot realisiert.
Einführung
Mit Apache Kafka können also Streams zwischen den Teilnehmern aufgebaut werden. Dabei muss Kafka gemäß den Publisher-Subscriber-Prinzipt die Konsumenten nicht kennen. Ein Stream besteht in Kafka aus mehreren Nachrichten (Records). Wobei solch ein Record einer Kategorie angehört (Topic) und aus Werte-Paar mit Zeitstempel besteht.
Jeder Record hat intern eine eindeutige Sequenz-Id (Offset), welche u.a. die Reihenfolge der Records vorgibt. Mit diesem Offset wird pro Konsument ermittelt, welche Records ihm noch zur Verfügung stehen.
Kafka persistiert die Records für eine konfigurierbare Zeit und räumt die alten Records auf – unabhängig davon ob diese konsumiert wurden oder nicht.
Ein Konsument kann sich an den Records entlanghangeln und selbständig entscheiden, ob er das nächste Record haben möchte, zu einem alten Offset springen will oder nur den aktuellsten.
Um die Ausfallsicherheit zu erhöhen, werden die Records eines Topics auf mehrere Partitions, welche wiederum auf unterschiedliche Server liegen sollten, verteilt. Dabei werden die Partitions im Cluster repliziert. Pro Partition für ein Record wird ein Leader definiert, der für die Lese- und Schreib-Aktivitäten zuständig ist. Zusätzlich gibt es pro Record mehrere Follower welche eine Replikation erhalten. Beim Ausfall des Leaders für eine Partition wird ein neuer von den Follower ermittelt.
Die Verteilung der Records eines Topics auf den Partitions kann mittels Round-Robin oder nach einem fachlichen Schlüssel durchgeführt werden.
Hier sei angemerkt, dass Kafka die Reihenfolge nur innerhalb einer Partition zusichern kann und nicht über alle Partitions einer Topics. Wenn in einem Topic die Reihenfolge aller Records ein wichtiges Kriterium ist, dann muss ein Topic mit nur einer Partition erstellt werden.
Dies wird deutlich, bei der Betrachtung des Konsumenten-Konzept von Kafka. Ein Konsument (Consumer Instanz) gehört einer Consumer-Group an, welche mehrere Instanzen eines Consumers umfasst. Damit wird auch die Ausfallsicherheit von Konsumenten (Subscriber) gewährleistet. Kafka verteilt die Partitions auf die Consumer-Groups und dabei bekommt jede Consumer Instanz aus der Consumer-Group ein Record von der zugewiesenen Partition. Alle Consumer-Groups bekommen alle Records aus dem Topic, jedoch bekommt nicht jede Consumer Instanz jeden Record.
Skizzenhafte Darstellung der Beziehung zwischen Topic und Consumer Groups
Kafka versucht mit dem Konzept die zwei Modelle Queue und Publish-Subscrib zu vereinen. In dem Queue-Modell können mehrere Prozesse die Nachrichten abarbeiten, jedoch ist dieses Modell nicht multi-subscriber fähig. D.h. die ermittelten Nachrichten stehen anderen Konsumenten nicht mehr zur Verfügung. Beim reinen Publish-Subscrib werden alle Nachrichten per Broadcast an alle Konsumenten verteilt. D.h. bei mehreren fachlichen identischen Konsumenten – wegen der Ausfallsicherheit – bekommen diese auch die Nachrichten und eine interne Koordination der Abarbeitung muss realisiert werden.
Mit dem Consumer Groups in Kafka werde die Nachrichten an fachlich zusammenhängende Gruppen verteilt (publish-subscribe) und innerhalb dieser Gruppe können einzelne Instanzen die Nachrichten abarbeiten (Queue).
Mehr Details bietet die gute Kafka Dokumentation [1].
Einrichtung
Für die Einrichtung von Kafka wird auf das Docker Image von Confluent.io [2] zurückgegriffen. Confluent.io ist die neu Firma, mit den Gründern von Apache Kafka, die nun Enterprise- und Cloud-Lösungen basierend auf Kafka anbietet.
Neben kostenpflichten Produkten bietet Confluent.io auch kostenlose Open Source Lösungen an, wie z.B. verschiedene Docker Images [3].
Für die Einrichtung wird folgende Docker Image verwendet: confluentinc/cp-kafka
Alternativ kann Apache Kafka direkt installiert werden, siehe hierzu [4].
Die Confluent Open Source Variante besitzt ein paar weitere Features wie
- REST Schnittstelle zur Verwaltung von Apache Kafka
- Ein Schema Registry
- KSQL, eine SQL ähnliche Schnittstelle zur Abfrage von Streams
Confluent.io bietet bei Github ein paar Docker Compose Dateien an
https://github.com/confluentinc/cp-docker-images/tree/master/examples
Für die weitere Einrichtung wird eine Kafka Umgebung mit einem Knoten aufgebaut.
Dabei wird das Docker Compose vom Confluent.io verwendet und die Umgebung kann wie folgt gestartet werden:
docker-compose up -d
Anschließend steht Zookeeper und Kafka selbst unter localhost:29092 zur Verfügung.
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
0e63357fa32e confluentinc/cp-kafka:latest "/etc/confluent/dock…" About a minute ago Up About a minute docker_kafka_1
46356a0045d0 confluentinc/cp-zookeeper:latest "/etc/confluent/dock…" About a minute ago Up About a minute docker_zookeeper_1
Als nächstes wird eine neue Topic angelegt, falls nicht schon vorhanden:
docker-compose -f docker-compose-single.yml exec kafka \
kafka-topics --create --topic a-topic.v1.case1 \
--partitions 1 --replication-factor 1 \
--if-not-exists \
--zookeeper localhost:32181
Created topic "a-topic.v1.case1".
Es wird dabei nur eine Partition angelegt, da wir auch nur einen Knoten haben.
Details lassen sich wie folgt anzeigen:
# List existing topics
docker-compose -f docker-compose-single.yml exec kafka \
kafka-topics --list --zookeeper localhost:32181
__confluent.support.metrics
a-topic.v1.case1
# Display details to a topic
docker-compose -f docker-compose-single.yml exec kafka \
kafka-topics --describe --topic a-topic.v1.case1 --zookeeper localhost:32181
Topic:a-topic.v1.case1 PartitionCount:1 ReplicationFactor:1 Configs:
Topic: a-topic.v1.case1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Producer und Consumer
Mit Spring Boot lässt sich einfach ein Producer und Consumer konfigurieren. Für den Versand wird ein KafkaTemplate angeboten und für den Empfang steht ein @KafkaListener zur Verfügung.
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
final Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
Ein Fragment für den Versand mit dem KafkaTemplate kann wie folgt aussehen:
public void send(final String topic, final String message) {
LOG.info("sending message='{}' to topic='{}'", message, topic);
final ListenableFuture<SendResult<Integer, String>> result = kafkaTemplate.send(topic, message);
SendResult<Integer, String> sendResult;
try {
sendResult = result.get();
final RecordMetadata metaData = sendResult.getRecordMetadata();
LOG.info("recordMetadata: {}", metaData);
} catch (InterruptedException | ExecutionException e) {
LOG.error("Waiting for result failed.", e);
}
}
Der Empfang wird mittels dem Listener automatisch aufgerufen:
@KafkaListener(topics = "${app.kafka.topic.atopic}")
public void receive(@Payload final String message, @Headers final MessageHeaders headers) {
LOG.info("received message='{}'", message);
headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
}
Fazit
Apache Kafka + Docker + Spring Boot, eine Kombination um sehr schnell ein Szenario basierend auf Kafka aufzubauen.
Der Source Code ist in GitHub zu finden.
Referenzen
- [1] Apache Kafka Intro: https://kafka.apache.org/intro.html
- [2] Confluent.io: https://www.confluent.io/
- [3] Confluent Docker Hub: https://hub.docker.com/u/confluentinc/
- [4] Apache Kafka Quickstart/Installation: https://kafka.apache.org/quickstart