Einführung Apache Kafka mit Docker und Spring Boot

Apache Kafka ist eine distributed streaming platform – generell heißt das, Apache Kafka unterstützt bei der Erstellung einer Publisher-Subscriber Umgebung in dem Kafka eine Streaming Pipeline zwischen den Teilnehmern aufbaut. Diese Platform erfüllt die Kriterien bezüglich Verteilung, Echtzeitfähigkeit und Zuverlässigkeit.

Diese Einführung betrachtet dabei die Einrichtung von Kafka mit Docker und den gängigsten Anwendungsfällen in Spring Boot realisiert.

Einführung

Mit Apache Kafka können also Streams zwischen den Teilnehmern aufgebaut werden. Dabei muss Kafka gemäß den Publisher-Subscriber-Prinzipt die Konsumenten nicht kennen. Ein Stream besteht in Kafka aus mehreren Nachrichten (Records). Wobei solch ein Record einer Kategorie angehört (Topic) und aus Werte-Paar mit Zeitstempel besteht.

Jeder Record hat intern eine eindeutige Sequenz-Id (Offset), welche u.a. die Reihenfolge der Records vorgibt. Mit diesem Offset wird pro Konsument ermittelt, welche Records ihm noch zur Verfügung stehen.

Kafka persistiert die Records für eine konfigurierbare Zeit und räumt die alten Records auf – unabhängig davon ob diese konsumiert wurden oder nicht.

Ein Konsument kann sich an den Records entlanghangeln und selbständig entscheiden, ob er das nächste Record haben möchte, zu einem alten Offset springen will oder nur den aktuellsten.

Um die Ausfallsicherheit zu erhöhen, werden die Records eines Topics auf mehrere Partitions, welche wiederum auf unterschiedliche Server liegen sollten, verteilt. Dabei werden die Partitions im Cluster repliziert. Pro Partition für ein Record wird ein Leader definiert, der für die Lese- und Schreib-Aktivitäten zuständig ist. Zusätzlich gibt es pro Record mehrere Follower welche eine Replikation erhalten. Beim Ausfall des Leaders für eine Partition wird ein neuer von den Follower ermittelt.

Die Verteilung der Records eines Topics auf den Partitions kann mittels Round-Robin oder nach einem fachlichen Schlüssel durchgeführt werden.

Hier sei angemerkt, dass Kafka die Reihenfolge nur innerhalb einer Partition zusichern kann und nicht über alle Partitions einer Topics. Wenn in einem Topic die Reihenfolge aller Records ein wichtiges Kriterium ist, dann muss ein Topic mit nur einer Partition erstellt werden.

Dies wird deutlich, bei der Betrachtung des Konsumenten-Konzept von Kafka. Ein Konsument (Consumer Instanz) gehört einer Consumer-Group an, welche mehrere Instanzen eines Consumers umfasst. Damit wird auch die Ausfallsicherheit von Konsumenten (Subscriber) gewährleistet. Kafka verteilt die Partitions auf die Consumer-Groups und dabei bekommt jede Consumer Instanz aus der Consumer-Group ein Record von der zugewiesenen Partition. Alle Consumer-Groups bekommen alle Records aus dem Topic, jedoch bekommt nicht jede Consumer Instanz jeden Record.

Kafka Topics 300x278

Skizzenhafte Darstellung der Beziehung zwischen Topic und Consumer Groups

Kafka versucht mit dem Konzept die zwei Modelle Queue und Publish-Subscrib zu vereinen. In dem Queue-Modell können mehrere Prozesse die Nachrichten abarbeiten, jedoch ist dieses Modell nicht multi-subscriber fähig. D.h. die ermittelten Nachrichten stehen anderen Konsumenten nicht mehr zur Verfügung. Beim reinen Publish-Subscrib werden alle Nachrichten per Broadcast an alle Konsumenten verteilt. D.h. bei mehreren fachlichen identischen Konsumenten – wegen der Ausfallsicherheit – bekommen diese auch die Nachrichten und eine interne Koordination der Abarbeitung muss realisiert werden.

Mit dem Consumer Groups in Kafka werde die Nachrichten an fachlich zusammenhängende Gruppen verteilt (publish-subscribe) und innerhalb dieser Gruppe können einzelne Instanzen die Nachrichten abarbeiten (Queue).

Mehr Details bietet die gute Kafka Dokumentation [1].

Einrichtung

Für die Einrichtung von Kafka wird auf das Docker Image von Confluent.io [2] zurückgegriffen. Confluent.io ist die neu Firma, mit den Gründern von Apache Kafka, die nun Enterprise- und Cloud-Lösungen basierend auf Kafka anbietet.

Neben kostenpflichten Produkten bietet Confluent.io auch kostenlose Open Source Lösungen an, wie z.B. verschiedene Docker Images [3].

Für die Einrichtung wird folgende Docker Image verwendet: confluentinc/cp-kafka

Alternativ kann Apache Kafka direkt installiert werden, siehe hierzu [4].

Die Confluent Open Source Variante besitzt ein paar weitere Features wie

  • REST Schnittstelle zur Verwaltung von Apache Kafka
  • Ein Schema Registry
  • KSQL, eine SQL ähnliche Schnittstelle zur Abfrage von Streams

Confluent.io bietet bei Github ein paar Docker Compose Dateien an

https://github.com/confluentinc/cp-docker-images/tree/master/examples

Für die weitere Einrichtung wird eine Kafka Umgebung mit einem Knoten aufgebaut.

Dabei wird das Docker Compose vom Confluent.io verwendet und die Umgebung kann wie folgt gestartet werden:

docker-compose up -d

Anschließend steht Zookeeper und Kafka selbst unter localhost:29092 zur Verfügung.

$ docker ps
CONTAINER ID        IMAGE                              COMMAND                  CREATED              STATUS              PORTS               NAMES
0e63357fa32e        confluentinc/cp-kafka:latest       "/etc/confluent/dock…"   About a minute ago   Up About a minute                       docker_kafka_1
46356a0045d0        confluentinc/cp-zookeeper:latest   "/etc/confluent/dock…"   About a minute ago   Up About a minute                       docker_zookeeper_1

Als nächstes wird eine neue Topic angelegt, falls nicht schon vorhanden:

docker-compose -f docker-compose-single.yml exec kafka  \
   kafka-topics --create --topic a-topic.v1.case1 \
   --partitions 1 --replication-factor 1 \
   --if-not-exists \
   --zookeeper localhost:32181

Created topic "a-topic.v1.case1".

Es wird dabei nur eine Partition angelegt, da wir auch nur einen Knoten haben.

Details lassen sich wie folgt anzeigen:

# List existing topics
docker-compose -f docker-compose-single.yml exec kafka  \
     kafka-topics --list --zookeeper localhost:32181

__confluent.support.metrics
a-topic.v1.case1


# Display details to a topic
docker-compose -f docker-compose-single.yml exec kafka  \
     kafka-topics --describe --topic a-topic.v1.case1 --zookeeper localhost:32181


Topic:a-topic.v1.case1	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: a-topic.v1.case1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

Producer und Consumer

Mit Spring Boot lässt sich einfach ein Producer und Consumer konfigurieren. Für den Versand wird ein KafkaTemplate angeboten und für den Empfang steht ein @KafkaListener zur Verfügung.

@Bean
	ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
		final ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		final Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}

	@Bean
	public ProducerFactory<Integer, String> producerFactory() {
		return new DefaultKafkaProducerFactory<>(producerConfigs());
	}

	@Bean
	public Map<String, Object> producerConfigs() {
		final Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, host);
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
		return props;
	}

	@Bean
	public KafkaTemplate<Integer, String> kafkaTemplate() {
		return new KafkaTemplate<Integer, String>(producerFactory());
	}

Ein Fragment für den Versand mit dem KafkaTemplate kann wie folgt aussehen:

public void send(final String topic, final String message) {
		LOG.info("sending message='{}' to topic='{}'", message, topic);

		final ListenableFuture<SendResult<Integer, String>> result = kafkaTemplate.send(topic, message);

		SendResult<Integer, String> sendResult;
		try {
			sendResult = result.get();
			final RecordMetadata metaData = sendResult.getRecordMetadata();

			LOG.info("recordMetadata: {}", metaData);
		} catch (InterruptedException | ExecutionException e) {
			LOG.error("Waiting for result failed.", e);
		}
	}

Der Empfang wird mittels dem Listener automatisch aufgerufen:

@KafkaListener(topics = "${app.kafka.topic.atopic}")
	public void receive(@Payload final String message, @Headers final MessageHeaders headers) {
		LOG.info("received message='{}'", message);

		headers.keySet().forEach(key -> LOG.info("{}: {}", key, headers.get(key)));
	}

Fazit

Apache Kafka + Docker + Spring Boot, eine Kombination um sehr schnell ein Szenario basierend auf Kafka aufzubauen.

Der Source Code ist in GitHub zu finden.

Referenzen

comment

Comments

arrow_back

Previous

Showcase: Node.js and App ID

Next

OpenShift Einführung mit automatischen Deployments
arrow_forward