Streaming with Kafka

Why we build it

In the simplest case, two applications can communicate with each other via plain REST APIs. This works well most of the time, but it has downsides once the requirements deviate from basic request-response operations:

For example, when communicating via REST, both servers have to be available and ready at the time of the request. If they are, everything works as expected. Assuming both sides expose the right endpoints, you can even implement bi-directional communication, and as long as a sending server knows all of its consumers, it can distribute data to multiple recipients.

Kafka covers cases where certain requirements matter that a traditional REST API landscape can only meet under specific circumstances, if at all. Some examples are summarized below (compiled with the help of ChatGPT):

1. High-Throughput Data Processing

  • Kafka: Exceptionally well-suited for handling high volumes of data, Kafka can efficiently process millions of messages per second.

  • REST: Typically not designed for such high throughput, REST APIs may struggle with large-scale data processing.

2. Real-Time Data Streaming

  • Kafka: Kafka shines in real-time data streaming scenarios. It's designed to handle continuous data streams, making it ideal for real-time analytics, monitoring, and messaging.

  • REST: REST is more request-response oriented and isn't natively built for continuous data streams.

3. Distributed Systems and Scalability

  • Kafka: It supports distributed system environments natively, allowing for scalable and fault-tolerant architectures.

  • REST: While REST APIs can be scaled, they require additional infrastructure and management to achieve similar levels of scalability and fault tolerance.

4. Data Persistence

  • Kafka: Offers data persistence, allowing messages to be stored and replayed. This is crucial for scenarios like event sourcing or recovering from system failures.

  • REST: Typically doesn’t involve data persistence; once a response is sent, the data isn’t stored by the REST API itself.

5. Decoupling of Data Producers and Consumers

  • Kafka: Allows producers and consumers to operate independently. Consumers can read data at their own pace without being online at the time of data production.

  • REST: Generally requires a more synchronous interaction between client and server.

6. Event-Driven Architecture

  • Kafka: Ideal for event-driven architectures, where the focus is on reacting to events (data changes) in real-time.

  • REST: While it can be used in event-driven architectures, REST is more suited for direct service-to-service communication and state transfer.

7. Log Aggregation and Monitoring

  • Kafka: Widely used for log aggregation from distributed systems, enabling centralized logging and monitoring.

  • REST: Not typically used for log aggregation; it's more focused on exposing and consuming services or data.

8. Fault Tolerance and Reliability

  • Kafka: Provides strong durability guarantees and fault tolerance, ensuring data isn't lost in case of system failures.

  • REST: Does not inherently provide these features; they must be implemented at the application or infrastructure level.

Now to be clear, not every one of these use cases implies the need for Kafka, and certainly not all of them have to apply to qualify Kafka as the right solution. Nonetheless, it is a widely known and widely used technology choice, and it gets the job done.



What we build

Assume the following use case:

You subscribe to a data provider like Bloomberg, which delivers a feed of price data into your system landscape. Inside your landscape, you may want to use that data in multiple applications:

  1. A data warehouse for data analysis

  2. A position valuation service for risk management and accounting

  3. A client-facing application providing price data to end users before they decide to buy a stock.

If each of these applications fetched that data from the external data provider directly, you would unnecessarily increase network traffic and likely exceed the usage quotas specified in your SLA. Speaking of SLAs: this example is entirely theoretical, of course. In reality, any consumer of a data feed is well advised to check whether the way they use it complies with the terms and conditions they signed.

In our example, we will instead implement 3 services:

  1. A producer, generating random price data twice a second

  2. Two consumers, reading that price data feed.

How we build it

Installing Kafka

For the installation of Apache Kafka, we will for now focus on setting up a local cluster. Java T Point has created an excellent tutorial explaining the installation of Kafka and Zookeeper on a Windows machine: Installation of Apache Kafka - javatpoint

Start Zookeeper

Zookeeper has to be running before the Kafka broker can connect to it, so we bring it up first:

# ensure you run the command inside the directory where Kafka is installed
zookeeper-server-start.bat config\zookeeper.properties

Start Kafka Server

# ensure you run the command inside the directory where Kafka is installed
kafka-server-start.bat config\server.properties

Create Kafka Topic

Now that we have installed Kafka and Zookeeper and both are running on our local machine, we can proceed to create a Kafka topic:

kafka-topics.bat --create --topic market-data --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4

The creation will be confirmed in the CLI:

PS C:\Users\danie> kafka-topics.bat --create --topic market-data --bootstrap-server localhost:9092 --replication-factor 1 --partitions 4
Created topic market-data.
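
A replication factor of 1 is the only option on a single local broker, and the 4 partitions leave room for parallel consumers later on. If you prefer doing this from code rather than the CLI, the AdminClient API that ships with kafka-clients can create the same topic. Below is a minimal sketch, assuming the broker runs on localhost:9092 as above (the class name is made up for this sketch):

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateMarketDataTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // try-with-resources closes the AdminClient when we are done
        try (AdminClient admin = AdminClient.create(props)) {
            // same settings as the CLI call: 4 partitions, replication factor 1
            NewTopic topic = new NewTopic("market-data", 4, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}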

Write to Kafka Topic

Writing to a Kafka Topic is equally simple:

kafka-console-producer.bat --bootstrap-server localhost:9092 --topic market-data --property parse.key=true --property "key.separator=|"

The command opens a console input prompt, and we can start writing keyed messages to the topic. With the separator configured above, an input line such as test|hello world publishes a message with the key test and the value hello world. Once we are done, we press Ctrl+C and confirm with 'Y' to exit.

Read from a Kafka Topic

Now that we have a topic and wrote a message to it, let us try to fetch the message from the topic:

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic market-data --from-beginning

It is important to add the argument "--from-beginning" here, to ensure that even those messages are returned that were written before we started the consumer (conceptually similar to the auto.offset.reset=earliest setting we will use later in the Java consumer).

PS C:\Users\danie> kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic market-data --from-beginning
hello world
message 2

The producer

Creating the Spring Boot application

As a first step, we want to generate data that we can actually feed into the Kafka cluster. To get this up and running, we head to Spring Initializr and specify a new project. Keeping things simple, we only add Spring Web, Spring for Apache Kafka and Lombok for now.

Once the package is downloaded, we import it into our IDE, trigger the Maven build, and create a run configuration to verify the setup. The project is versioned on GitHub: Danielmethner/kafka-producer: A sample Kafka producer (github.com)

Generating messages

We want to generate messages automatically, with a scheduled method that is executed every 500 milliseconds.

To do so, we annotate the entry point of our application with "@EnableScheduling" (a sketch of that entry point follows right after the snippet below). As the name suggests, this enables scheduled methods in Spring Boot. The generator method itself is then annotated with "@Scheduled", producing two random numbers per second:

@Component
public class DemoDataRunner {

    private static final Logger log = LoggerFactory.getLogger(DemoDataRunner.class);
    private final Random random = new Random();

    @Scheduled(fixedRate = 500) // runs every 500 ms, i.e. twice per second
    public void generateRandomNumber() {
        // random price between 0.7 and 1.3, rounded to four decimal places
        double randomNumber = (random.nextDouble() * 0.6) + 0.7;
        randomNumber = Math.round(randomNumber * 10000.0) / 10000.0;
        log.info("Generated random number: {}", randomNumber);
    }

}
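
For reference, the annotated entry point might look like the following minimal sketch. The class name is assumed to be the one generated by Spring Initializr and may differ in your project:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // enables detection of @Scheduled methods such as generateRandomNumber()
public class KafkaProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}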

If we run the application now, we can read the generated numbers in the console.

We will later use these numbers as the message content to be read by our consumer applications.

Now it is time to start the fun part:

Create the Kafka producer

We will modify our DemoDataRunner to implement CommandLineRunner, so that we can override the run() method, which is executed right after application startup. The run() method will configure our KafkaProducer, which we keep in this class.

Our random number generator method then gets a little upgrade to make sure it sends the generated numbers to our Kafka topic.

We do of course also want to make sure that the producer gets cleaned up upon termination. We therefore add a method annotated with @PreDestroy that closes the KafkaProducer:

package com.danielmethner.kafkaproducer.demodata;

import jakarta.annotation.PreDestroy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Properties;
import java.util.Random;

@Component
public class DemoDataRunner implements CommandLineRunner {

    private static final Logger log = LoggerFactory.getLogger(DemoDataRunner.class);
    private final Random random = new Random();

    private KafkaProducer<String, String> kafkaProducer;

    @Scheduled(fixedRate = 500) // runs every 500 ms, i.e. twice per second
    public void generateRandomNumber() {
        // random price between 0.7 and 1.3, rounded to four decimal places
        double randomNumber = (random.nextDouble() * 0.6) + 0.7;
        randomNumber = Math.round(randomNumber * 10000.0) / 10000.0;
        if (kafkaProducer != null) {
            // send the price as a String value with the fixed key "random-number" to the "market-data" topic
            ProducerRecord<String, String> kafkaRecord = new ProducerRecord<>("market-data", "random-number", String.valueOf(randomNumber));
            log.info("Sending random number: {}", randomNumber);
            kafkaProducer.send(kafkaRecord);
            kafkaProducer.flush();
        }
        log.info("Generated random number: {}", randomNumber);
    }

    @Override
    public void run(String... args) throws Exception {
        log.info("Starting DemoDataRunner");
        // producer configuration: broker address plus String serializers for key and value
        String bootstrapServers = "127.0.0.1:9092";
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        this.kafkaProducer = new KafkaProducer<>(properties);
    }

    @PreDestroy
    public void close() {
        log.info("Closing DemoDataRunner");
        if (kafkaProducer != null) {
            kafkaProducer.close();
            log.info("Closed KafkaProducer");
        }
    }
}
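
One detail worth pointing out: the producer always uses the same key, "random-number", so Kafka's default partitioner will hash every message onto the same one of our 4 partitions. That is fine for this proof of concept, but if we keyed the records by an instrument identifier instead, the load would spread across partitions while ordering per instrument is preserved. A sketch of such a change inside DemoDataRunner is shown below; the ticker symbols are made up purely for illustration and are not part of the repository:

    // Sketch only: key each record by an instrument symbol instead of a fixed key.
    // The default partitioner hashes the key, so every symbol consistently maps to one
    // partition, preserving per-symbol ordering while using all four partitions.
    private static final String[] SYMBOLS = {"AAPL", "MSFT", "GOOG", "AMZN"};

    private void sendPrice(double price) {
        String symbol = SYMBOLS[random.nextInt(SYMBOLS.length)];
        ProducerRecord<String, String> kafkaRecord =
                new ProducerRecord<>("market-data", symbol, String.valueOf(price));
        kafkaProducer.send(kafkaRecord);
    }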

Assuming that everything works as expected, we can now run the application and observe the generated messages in the consumer CLI, right after the test messages we sent earlier.

Creating the Consumer

For the consumer, we will create a separate GitHub repository, also containing a Spring Boot application: Danielmethner/kafka-consumer (github.com)

The setup is very straightforward and similar to the producer, except that we now initialize a consumer and poll the topic we used in the other application.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.logging.Logger;

@Component
public class ConsumerRunner implements CommandLineRunner {

    private final Logger logger = Logger.getLogger(ConsumerRunner.class.getName());

    private KafkaConsumer<String, String> kafkaConsumer;

    @Override
    public void run(String... args) throws Exception {
        logger.info("Initialize Runner!");

        // consumer configuration: broker address, String deserializers, consumer group and offset reset behaviour
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-first-consumer-group");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        this.kafkaConsumer = new KafkaConsumer<>(properties);

        // subscribe to the topic our producer writes to
        kafkaConsumer.subscribe(Arrays.asList("market-data"));

        // poll in an endless loop and log every record we receive
        while (true) {
            kafkaConsumer.poll(Duration.ofMillis(100))
                    .forEach(record -> logger.info("Received record: " + record.value()));
        }
    }
}
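
Unlike the producer, this consumer is never closed: the while(true) loop simply blocks the run() method forever. If we want a cleaner shutdown, the KafkaConsumer documentation recommends calling wakeup() from another thread (it is the only method that is safe to call concurrently), which makes the blocking poll() throw a WakeupException. A minimal sketch of how that could be added to the ConsumerRunner:

    // Sketch: cooperative shutdown for the polling loop above.
    @PreDestroy
    public void shutdown() {
        if (kafkaConsumer != null) {
            // wakeup() is the only KafkaConsumer method that may safely be called from
            // another thread; it forces the blocking poll() to throw a WakeupException.
            kafkaConsumer.wakeup();
        }
    }

The polling loop would then be wrapped in a try block that catches the WakeupException and closes the consumer in a finally block.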



Assuming the configuration is correct, you can now start both the producer and the consumer, and the data comes through accordingly.

That is already it. We now have a functioning proof of concept for a Kafka producer and consumer, written with Spring Boot, with the Kafka instance running on a Windows machine.

Next

Creating a Trading Simulation Platform