Reading data from Amazon Kinesis Data Streams in Spring Boot

After putting data into the Amazon Kinesis Data Stream, you also need to read and process those data for further use. To achieve this, we need a consumer application which is used to retrieve and process all data from a Kinesis Data Stream. So, in this tutorial, we will build a consumer application for reading data from Amazon Kinesis Data Streams and saving those data in a MongoDB database using Java Spring Boot.

Follow the steps below to create a sample Kinesis Consumer application in Spring Boot:

  1. Visit Spring Initializr website at https://start.spring.io.
  2. Create a Spring Boot application with details as follows:
    • Project: Choose the project type (Maven or Gradle).
    • Language: Set the language to Java.
    • Spring Boot: Specify the Spring Boot version. The default selection is the latest stable version of Spring Boot, so you can leave it unchanged.
    • Project Metadata: Enter a Group and Artifact name for your project. The group name is the id of the project. Artifact is the name of your project. Add any necessary project metadata (description, package name, etc.)
    • Choose between packaging as a JAR (Java Archive) or a WAR (Web Application Archive) depends on how you plan to deploy your Spring Boot application. Choose JAR packaging if you want a standalone executable JAR file and WAR packaging if you intend to deploy your application to a Java EE application server or servlet container. When you package your Spring Boot application as a JAR using JAR packaging, it includes an embedded web server, such as Tomcat, by default. This means that you don't need to separately deploy your application to an external Tomcat server. Instead, you can run the JAR file directly, and the embedded Tomcat server will start and serve your application.
    • Select the Java version based on the compatibility requirements of your project. Consider the specific needs of your project, any compatibility requirements, and the Java version supported by your target deployment environment when making these choices.
  3. Add project dependencies:
    • Click on the "Add Dependencies" button.
    • Choose the following dependencies: Spring Web, Lombok, and Spring Boot DevTools.

    For example:

  4. Generate the project:
    • Click on the "Generate" button.
    • Spring Initializr will generate a zip file containing your Spring Boot project.
  5. Download and extract the generated project:
    • Download the zip file generated by Spring Initializr.
    • Extract the contents of the zip file to a directory on your local machine.
  6. Import the project into your IDE:
    • Open your preferred IDE (IntelliJ IDEA, Eclipse, or Spring Tool Suite).
    • Import the extracted project as a Maven or Gradle project, depending on the build system you chose in Spring Initializr.
  7. Add Dependency:
  8. Add Amazon Kinesis Kinesis Client Library from Maven repository:

    For Maven:

    Add the following dependency to your pom.xml file:

    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.5.2</version>
    </dependency>

    For Gradle:

    Add the following dependency to your build.gradle file:

    implementation group: 'software.amazon.kinesis', name: 'amazon-kinesis-client', version: '2.5.2'
  9. Add Configurations:
  10. Open the src/main/resources/application.properties file in your Eclipse editor and add the following configuration lines to the file:

    server.port = 8080
      
    #use your aws credentials here
    aws.access-key = your-access-key
    aws.access-secret = your-access-secret
    aws.region = us-east-1
      
    #stream name from where data must be read
    aws.stream-name = tutorialsbuddy-stream
    
    application.name = consumer
  11. Create DTO (Data Transfer Object) class:
  12. Create a DTO class named Payload:

    package com.example.app.consumer.dto;
    
    import lombok.Data;
    
    @Data
    public class Payload {
    
      private String vehicleId;
      private String driverId;
      private double speed;
      private String coordinatesLatitude;
      private String coordinatesLongitude;
      
    }
  13. Implement ShardRecordProcessor Interface:
  14. Create a class named RecordProcessor to implement the ShardRecordProcessor interface of the KCL. The ShardRecordProcessor interface contains methods to initialize and process records from a Kinesis data stream:

    package com.example.app.consumer.processor;
    
    import java.nio.ByteBuffer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.example.app.consumer.dto.Payload;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import software.amazon.kinesis.exceptions.InvalidStateException;
    import software.amazon.kinesis.exceptions.ShutdownException;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
    import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
    import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
    import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
    import software.amazon.kinesis.processor.ShardRecordProcessor;
    import software.amazon.kinesis.retrieval.KinesisClientRecord;
    
    public class RecordProcessor implements ShardRecordProcessor {
      private static final Logger LOG = LoggerFactory.getLogger(RecordProcessor.class);
    
      @Override
      public void initialize(InitializationInput initializationInput) {
        LOG.info("Initialization complete...");
      }
    
      @Override
      public void processRecords(ProcessRecordsInput processRecordsInput) {
        // Data is read here from the Kinesis data stream
        for (KinesisClientRecord record : processRecordsInput.records()) {
    
          LOG.info("Processing Record For Partition Key : {}", record.partitionKey());
    
          String originalData = "";
    
          try {
            byte[] b = new byte[record.data().remaining()];
            ByteBuffer byteBuf = record.data().get(b);
            originalData = new String(byteBuf.array(), "UTF-8");
    
            LOG.info("Data from kinesis stream : {}", originalData);
    
            ObjectMapper mapper = new ObjectMapper();
    
            Payload[] payloads = mapper.readValue(originalData, Payload[].class);
            // You can call a method to save data to database here
            for (Payload payload : payloads) {
              System.out.println("vehicleId : " + payload.getVehicleId());
              System.out.println("driverId : " + payload.getDriverId());
              System.out.println("speed : " + payload.getSpeed());
              System.out.println("CoordinatesLatitude : " + payload.getCoordinatesLatitude());
              System.out.println("CoordinatesLongitude : " + payload.getCoordinatesLongitude());
            }
    
          } catch (Exception e) {
            LOG.error("Error parsing record {}", e);
            System.exit(1);
          }
    
          try {
            /*
             * KCL assumes that the call to checkpoint means that all records have been processed,
             * records which are passed to the record processor.
             */
            processRecordsInput.checkpointer().checkpoint();
    
          } catch (Exception e) {
            LOG.error("Error during Processing of records", e);
          }
        }
      }
    
      @Override
      public void leaseLost(LeaseLostInput leaseLostInput) {
        LOG.error("leaseLost {}", leaseLostInput);
      }
    
      @Override
      public void shardEnded(ShardEndedInput shardEndedInput) {
        LOG.error("shardEnded {}", shardEndedInput);
        try {
          shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
          e.printStackTrace();
        }
      }
    
      @Override
      public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
          shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
          e.printStackTrace();
        }
      }
    
    }
  15. Implement ShardRecordProcessorFactory Interface:
  16. Create a class named RecordProcessorFactory to implement ShardRecordProcessorFactory interface of the KCL. The ShardRecordProcessorFactory interface contains one method that must be implemented by returning an instance of ShardRecordProcessor implementation class as shown in code below:

    package com.example.app.consumer.processor;
    
    import org.springframework.stereotype.Component;
    import software.amazon.kinesis.processor.ShardRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
    
    @Component
    public class RecordProcessorFactory implements ShardRecordProcessorFactory {
    
      @Override
      public ShardRecordProcessor shardRecordProcessor() {
        return new RecordProcessor();
      }
    
    }
  17. Configure Consumer:
  18. Create a class named ConsumerConfig to configure the Consumer:

    package com.example.app.consumer.config;
    
    import java.util.UUID;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    import com.example.app.consumer.processor.RecordProcessorFactory;
    import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentials;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.kinesis.common.ConfigsBuilder;
    import software.amazon.kinesis.common.KinesisClientUtil;
    
    @Component
    public class ConsumerConfig {
      private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);
    
      @Value(value = "${aws.stream-name}")
      private String streamName;
    
      @Value(value = "${application.name}")
      private String applicationName;
    
      @Value(value = "${aws.region}")
      private String awsRegion;
    
      @Value(value = "${aws.access-key}")
      private String accessKey;
    
      @Value(value = "${aws.access-secret}")
      private String secretKey;
    
      public ConfigsBuilder getConfigBuilder() {
        logger.info("Getting client configucation");
    
        AwsCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);
    
        AwsCredentialsProvider awsCredential = StaticCredentialsProvider.create(awsCreds);
    
        Region region = Region.of(awsRegion);
    
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
            KinesisAsyncClient.builder().credentialsProvider(awsCredential).region(region));
    
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
    
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    
        return new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient,
            cloudWatchClient, UUID.randomUUID().toString(), new RecordProcessorFactory());
      }
    
    }
  19. Set up a scheduler for checking and consuming data:
  20. Next, we will implement the CommandLineRunner interface in our Spring application's main class. The CommandLineRunner interface has a run method that is called automatically after the application context is loaded. Within this run method, we will set up a Scheduler class (also known as a worker in earlier versions of the KCL). The Scheduler class serves as an entry point to the KCL:

    package com.example.app;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.ApplicationContext;
    import com.example.app.consumer.config.ConsumerConfig;
    import software.amazon.kinesis.common.ConfigsBuilder;
    import software.amazon.kinesis.coordinator.Scheduler;
    
    @SpringBootApplication
    public class AwsKinesisConsumerExampleApplication implements CommandLineRunner {
    
      public static void main(String[] args) {
        SpringApplication.run(AwsKinesisConsumerExampleApplication.class, args);
      }
    
      @Autowired
      private ApplicationContext context;
    
      @Override
      public void run(String... args) throws Exception {
    
        ConsumerConfig consumerConfig = context.getBean(ConsumerConfig.class);
    
        ConfigsBuilder configsBuilder = consumerConfig.getConfigBuilder();
    
        /**
         * The Scheduler is the entry point to the KCL. This instance is configured with defaults
         * provided by the ConfigsBuilder.
         */
        Scheduler scheduler =
            new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(), configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig().maxListShardsRetryAttempts(5));
    
    
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    
      }
    
    }