Read data from Amazon Kinesis Data Streams in Java Spring Boot Example

After putting data into an Amazon Kinesis data stream, you also need to read and process that data for further use. This requires a consumer application that retrieves and processes the records in the stream. In this tutorial, we will build a consumer application that reads data from Amazon Kinesis Data Streams with the Kinesis Client Library (KCL) and saves it in a MongoDB database using Java Spring Boot.

Follow the steps below to build this Kinesis sample Consumer application:

Create a Spring Boot Application

  1. Go to Spring Initializr at https://start.spring.io and create a Spring Boot application with details as follows:
    • Project: Choose Gradle Project or Maven Project.
    • Language: Java
    • Spring Boot: The latest stable version of Spring Boot is selected by default, so leave it as is.
    • Project Metadata: Enter a group name in the Group field (the group identifies your project or organization), the name of your project in the Artifact field, and the base package of your project in the Package name field. Then select a Java version that is installed on your computer.
    • Dependencies: Add the dependencies Spring Web, Spring Data MongoDB, and Spring Boot DevTools.

    [Image: Create Kinesis Consumer Sample Application in Java Spring Boot]
  2. Click the GENERATE button and save/download the project zip bundle.
  3. Extract the project to your preferred working directory.
  4. Import the project in your preferred Java development IDE such as Eclipse or IntelliJ IDEA.

The final project structure of our sample application will look something like this after completion in a hierarchical package presentation view:

Add Dependency

Add the KCL (Kinesis Client Library) dependency to the project. You can find the latest version of the KCL in the Maven Repository.

For Gradle

Open the default build.gradle file and add the Gradle dependency for the KCL:

kinesis-consumer-sample/build.gradle

    implementation 'software.amazon.kinesis:amazon-kinesis-client:2.3.4'

For Maven

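Add the following KCL dependency to the pom.xml file:

kinesis-consumer-sample/pom.xml

    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.3.4</version>
    </dependency>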

Add Application Configurations

Open the application.properties file and add the following configuration to it. Do not forget to replace the configuration values with those relevant to your project.

kinesis-consumer-sample/src/main/resources/application.properties

#port on which the application would run
server.port = 8000
  
#use your aws credentials here
aws.access-key = AK2ASI5XVVY4JVL46ONF
aws.access-secret = sdYwUXMeBUDqI/YvJNWoHAlzYnWQ75qRGw06jBAR
aws.region = us-east-1
  
#stream name from where data must be read
aws.stream-name = tutorialsbuddy-stream
  
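#application name used by the KCL; it is also the name of the DynamoDB table the KCL creates to track leases
#(the value below is only an example, choose your own)
application.name = kinesis-consumer-sample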
  
#Mongodb Configuration
spring.data.mongodb.host = 127.0.0.1
spring.data.mongodb.port = 27017
spring.data.mongodb.database = tutorials_buddy
spring.data.mongodb.username = buddy
spring.data.mongodb.password = buddy
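
A missing or misspelled key in application.properties only surfaces when Spring tries to resolve the corresponding placeholder. As a rough illustration, the sketch below loads properties text with plain java.util.Properties and reports which required keys are absent. The class name ConfigCheck and the list of required keys are assumptions made for this example (the keys mirror the @Value placeholders used in the ConsumerConfig class of this tutorial); it is not part of Spring or the KCL.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spring or the KCL.
class ConfigCheck {

    // Keys the consumer expects (assumed from the @Value placeholders in this tutorial).
    static final List<String> REQUIRED_KEYS = List.of(
            "aws.access-key", "aws.access-secret", "aws.region",
            "aws.stream-name", "application.name");

    // Returns the required keys that are absent or blank in the given properties text.
    static List<String> missingKeys(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String sample = "aws.region = us-east-1\naws.stream-name = tutorialsbuddy-stream\n";
        System.out.println("Missing keys: " + missingKeys(sample));
    }
}
```

A check like this could run at startup, before the consumer is configured, so that a configuration mistake is reported with the name of the offending key.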
    

Create a DTO class

Create TrackDto.java Java class with getter and setter methods. We will use this class as a DTO class (Data Transfer Object).

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/dto/TrackDto.java

package com.sample.kinesis.consumer.dto;

public class TrackDto {
  
  private String vehicleId;
  private String driverId;
  private String driverName;
     
  public String getVehicleId() {
    return vehicleId;
  }
     
  public void setVehicleId(String vehicleId) {
    this.vehicleId = vehicleId;
  }
   
  public String getDriverId() {
    return driverId;
  }
   
  public void setDriverId(String driverId) {
    this.driverId = driverId;
  }
   
  public String getDriverName() {
    return driverName;
  }
   
  public void setDriverName(String driverName) {
    this.driverName = driverName;
  }

}
    

Create a Model class

After reading data from a Kinesis data stream, we will save it in a MongoDB database. To do this, let's create a Track.java model class and annotate it with @Document(collection = "track"). The @Document annotation marks a class as a domain object that we want to persist to MongoDB. The fields of the class are mapped to the fields of the documents stored in the track collection. Note that the @CreatedDate and @LastModifiedDate fields are only filled in automatically when auditing is enabled, for example with @EnableMongoAuditing on a configuration class.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/model/Track.java

  package com.sample.kinesis.consumer.model;
  
  import java.time.LocalDateTime;
  import org.springframework.data.annotation.CreatedDate;
  import org.springframework.data.annotation.Id;
  import org.springframework.data.annotation.LastModifiedDate;
  import org.springframework.data.mongodb.core.mapping.Document;
      
  @Document(collection = "track")
  public class Track {
      
     @Id
     private String id;
     private String vehicleId;
     private String driverId;
     private String driverName;
     @CreatedDate
     protected LocalDateTime createdDateTime;
     @LastModifiedDate
     protected LocalDateTime lastModifiedDateTime;
      
     public String getId() {
       return id;
     }
      
     public void setId(String id) {
       this.id = id;
     }
      
     public String getVehicleId() {
       return vehicleId;
     }
      
     public void setVehicleId(String vehicleId) {
       this.vehicleId = vehicleId;
     }
      
     public String getDriverId() {
       return driverId;
     }
      
     public void setDriverId(String driverId) {
       this.driverId = driverId;
     }
      
     public String getDriverName() {
       return driverName;
     }
      
     public void setDriverName(String driverName) {
       this.driverName = driverName;
     }
      
     public LocalDateTime getCreatedDateTime() {
       return createdDateTime;
     }
      
     public void setCreatedDateTime(LocalDateTime createdDateTime) {
       this.createdDateTime = createdDateTime;
     }
      
     public LocalDateTime getLastModifiedDateTime() {
       return lastModifiedDateTime;
     }
      
     public void setLastModifiedDateTime(LocalDateTime lastModifiedDateTime) {
       this.lastModifiedDateTime = lastModifiedDateTime;
     }
     
  }
    

Add a Repository Interface

To save data to the MongoDB collection, we will create a TrackRepository.java interface that extends the MongoRepository interface. The MongoRepository interface provides generic CRUD operations on a repository for a specific type.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/repository/TrackRepository.java

package com.sample.kinesis.consumer.repository;

import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.stereotype.Repository;
import com.sample.kinesis.consumer.model.Track;
     
@Repository
public interface TrackRepository extends MongoRepository<Track, String> {
     
}
    

Create Service Interface

Create TrackService.java Java interface with two methods:

  1. public List<Track> addTrack(List<Track> trackDetails) - saves a list of track data retrieved from a Kinesis data stream into MongoDB.
  2. public Page<Track> getTracks(Pageable pageable) - retrieves track data from the MongoDB database.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/service/TrackService.java

package com.sample.kinesis.consumer.service;

import java.util.List;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import com.sample.kinesis.consumer.model.Track;

public interface TrackService {

    public List<Track> addTrack(List<Track> trackDetails);

    public Page<Track> getTracks(Pageable pageable);

}
    

Create Service Implementation Class

Create a TrackServiceImpl.java class and implement the methods of the TrackService interface.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/service/TrackServiceImpl.java

package com.sample.kinesis.consumer.service;

import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.stereotype.Service;
import com.sample.kinesis.consumer.model.Track;
import com.sample.kinesis.consumer.repository.TrackRepository;

@Service
public class TrackServiceImpl implements TrackService {

    @Autowired
    private TrackRepository trackRepository;

    @Override
    public List<Track> addTrack(List<Track> trackDetails) {
        return trackRepository.saveAll(trackDetails);
    }

    @Override
    public Page<Track> getTracks(Pageable pageable) {
        return trackRepository.findAll(pageable);
    }

}
    
    

Implement ShardRecordProcessor Interface

Create a RecordProcessor.java class that implements the ShardRecordProcessor interface of the KCL. The ShardRecordProcessor interface contains methods to initialize a processor, process records from a Kinesis data stream, and react to lease loss, shard end, and shutdown requests.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/kinesis/processor/RecordProcessor.java

package com.sample.kinesis.consumer.kinesis.processor;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sample.kinesis.consumer.dto.TrackDto;
import com.sample.kinesis.consumer.model.Track;
import com.sample.kinesis.consumer.service.TrackService;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

public class RecordProcessor implements ShardRecordProcessor {

    private static final Logger log = LoggerFactory.getLogger(RecordProcessor.class);

    @Autowired
    private TrackService trackService;

    @Override
    public void initialize(InitializationInput initializationInput) {
        log.info("Initialization complete");
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {

        // Data is read here from the Kinesis data stream
        for (KinesisClientRecord record : processRecordsInput.records()) {

            log.info("Processing Record For Partition Key : {}", record.partitionKey());

            String originalData = "";

            try {
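                // Copy the payload out of the buffer; array() is avoided because it
                // fails on read-only buffers and returns the whole backing array
                byte[] b = new byte[record.data().remaining()];
                record.data().get(b);
                originalData = new String(b, "UTF-8");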

                log.info("Data from kinesis stream : {}", originalData);

                ObjectMapper objectMapper = new ObjectMapper();

                TrackDto[] tracks = objectMapper.readValue(originalData, TrackDto[].class);

                List<Track> trackDetails = new ArrayList<>();

                for (TrackDto track : tracks) {

                    Track trackDetail = new Track();
                    trackDetail.setVehicleId(track.getVehicleId());
                    trackDetail.setDriverId(track.getDriverId());
                    trackDetail.setDriverName(track.getDriverName());
                    trackDetails.add(trackDetail);
                }

                trackService.addTrack(trackDetails);

            } catch (Exception e) {
                log.error("Error parsing record", e);
                System.exit(1);
            }
        }

        try {
            /*
             * A call to checkpoint() tells the KCL that all records passed to this
             * record processor have been processed, so it is made once after the loop.
             */
            processRecordsInput.checkpointer().checkpoint();

        } catch (Exception e) {
            log.error("Error while checkpointing", e);
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        log.error("LeaseLostInput {}", leaseLostInput);
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Error while checkpointing at shard end", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            log.error("Error while checkpointing at shutdown", e);
        }
    }

}
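
The payload of each record arrives as a java.nio.ByteBuffer, which may be read-only; calling array() on a read-only buffer throws ReadOnlyBufferException. As a minimal sketch, the helper below decodes such a buffer into a UTF-8 string using only JDK classes. The class and method names are made up for this illustration and are not part of the KCL.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of the KCL.
class RecordDecoder {

    // Decodes the remaining bytes of the buffer as UTF-8 without consuming the caller's buffer.
    static String toUtf8String(ByteBuffer data) {
        // duplicate() shares the content but has its own position and limit
        ByteBuffer copy = data.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer
                .wrap("[{\"vehicleId\":\"v1\"}]".getBytes(StandardCharsets.UTF_8))
                .asReadOnlyBuffer();
        System.out.println(toUtf8String(payload));
    }
}
```

Working on a duplicate keeps the original buffer's position untouched, so the same record data can still be read again afterwards.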
    
    

Implement ShardRecordProcessorFactory Interface

Create a RecordProcessorFactory.java class that implements the ShardRecordProcessorFactory interface of the KCL. The interface has one method that must be implemented; it returns a new instance of the ShardRecordProcessor implementation class, as shown in the code below:

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/kinesis/processor/RecordProcessorFactory.java

package com.sample.kinesis.consumer.kinesis.processor;

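import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

@Component
public class RecordProcessorFactory implements ShardRecordProcessorFactory, ApplicationContextAware {

    // Static so that a factory created with "new" (as in ConsumerConfig) can also reach the context
    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext context) {
        applicationContext = context;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        RecordProcessor processor = new RecordProcessor();
        // The processor is not a Spring bean, so its @Autowired fields are injected explicitly
        applicationContext.getAutowireCapableBeanFactory().autowireBean(processor);
        return processor;
    }

}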
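
Objects created with new are not Spring beans, so an @Autowired field on them is only populated if it is injected explicitly. An alternative to field injection is to let the factory own the dependency and pass it to each processor through the constructor. The sketch below shows that pattern with plain Java stand-ins: Sink, Processor, and ProcessorFactory are hypothetical simplified types used only for illustration, not the KCL or Spring interfaces.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a service such as TrackService.
interface Sink {
    void save(List<String> items);
}

// Hypothetical stand-in for a record processor; the dependency arrives via the constructor.
class Processor {
    private final Sink sink;

    Processor(Sink sink) {
        this.sink = sink;
    }

    void process(List<String> records) {
        sink.save(records);
    }
}

// The factory is the long-lived object (a Spring bean in a real application) that owns
// the dependency and hands it to every processor it creates.
class ProcessorFactory {
    private final Sink sink;

    ProcessorFactory(Sink sink) {
        this.sink = sink;
    }

    Processor create() {
        return new Processor(sink);
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        List<String> stored = new ArrayList<>();
        ProcessorFactory factory = new ProcessorFactory(stored::addAll);
        factory.create().process(List.of("a", "b"));
        System.out.println(stored); // prints [a, b]
    }
}
```

With this design the processor never needs to know about the container, which also makes it easy to test with a fake Sink.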

Create a Consumer Controller

Create a ConsumerController.java class and annotate it with @RestController to make it a REST controller and with @RequestMapping(value = "/api") to map requests to the path /api. Inside this controller, we will create a GET method with the path /tracks, so the full endpoint is /api/tracks.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/controller/ConsumerController.java

package com.sample.kinesis.consumer.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.sample.kinesis.consumer.model.Track;
import com.sample.kinesis.consumer.service.TrackService;

@RestController
@RequestMapping(value = "/api")
public class ConsumerController {

    @Autowired
    private TrackService trackService;

    @GetMapping(value = "/tracks")
    public ResponseEntity<Page<Track>> getTracks(Pageable pageable) {
        return ResponseEntity.ok(trackService.getTracks(pageable));
    }
}
    

Create Consumer Configuration Class

Create a ConsumerConfig.java class to configure the Consumer.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/kinesis/config/ConsumerConfig.java

package com.sample.kinesis.consumer.kinesis.config;

import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import com.sample.kinesis.consumer.kinesis.processor.RecordProcessorFactory;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;


@Component
public class ConsumerConfig {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerConfig.class);

    @Value(value = "${aws.stream-name}")
    private String streamName;

    @Value(value = "${application.name}")
    private String applicationName;

    @Value(value = "${aws.region}")
    private String awsRegion;

    @Value(value = "${aws.access-key}")
    private String accessKey;

    @Value(value = "${aws.access-secret}")
    private String secretKey;

    public ConfigsBuilder getConfigBuilder() {
        logger.info("Getting client configuration");

        AwsCredentials awsCreds = AwsBasicCredentials.create(accessKey, secretKey);

        AwsCredentialsProvider awsCredential = StaticCredentialsProvider.create(awsCreds);

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().credentialsProvider(awsCredential).region(region));

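        // The DynamoDB (lease table) and CloudWatch (metrics) clients use the same
        // credentials as the Kinesis client
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(awsCredential).region(region).build();

        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(awsCredential).region(region).build();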

        return new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient,
                cloudWatchClient, UUID.randomUUID().toString(), new RecordProcessorFactory());

    }

} 

Setting up Spring Boot Application Main Class

Next, we will implement the CommandLineRunner interface on our Spring application main class. The CommandLineRunner interface has a run method that is called automatically after the application context is loaded. Within this run method, we will set up a Scheduler (called Worker in earlier versions of the KCL). The Scheduler class is the entry point to the KCL.

kinesis-consumer-sample/src/main/java/com/sample/kinesis/consumer/KinesisConsumerSampleApplication.java

package com.sample.kinesis.consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import com.sample.kinesis.consumer.kinesis.config.ConsumerConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;


@SpringBootApplication
public class KinesisConsumerSampleApplication implements CommandLineRunner {

    private static final Logger LOG =
            LoggerFactory.getLogger(KinesisConsumerSampleApplication.class);

    @Autowired
    private ApplicationContext context;

    public static void main(String[] args) {
        SpringApplication.run(KinesisConsumerSampleApplication.class, args);
    }


    @Override
    public void run(String... args) throws Exception {
        LOG.info("Running consumer application!");

        ConsumerConfig consumerConfig = context.getBean(ConsumerConfig.class);

        ConfigsBuilder configsBuilder = consumerConfig.getConfigBuilder();

        /**
         * The Scheduler is the entry point to the KCL. This instance is configured with defaults
         * provided by the ConfigsBuilder.
         */
        Scheduler scheduler =
                new Scheduler(configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(),
                        configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(),
                        configsBuilder.metricsConfig(), configsBuilder.processorConfig(),
                        configsBuilder.retrievalConfig().maxListShardsRetryAttempts(5)
                                .initialPositionInStreamExtended(InitialPositionInStreamExtended
                                        .newInitialPosition(InitialPositionInStream.TRIM_HORIZON)));

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();

    }
}
    

Run and Test the Consumer Application

The consumer application starts a KCL Scheduler that continuously checks for and consumes data from the specified Kinesis data stream. As soon as there is data in the stream, the consumer reads it and saves it in the MongoDB database.

To fetch the saved data from the MongoDB database, send a GET request to the /api/tracks endpoint, for example http://localhost:8000/api/tracks when the application runs locally on the port configured above.
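
As an illustration, the request can also be issued from Java with the JDK's built-in java.net.http.HttpClient (Java 11 or later). The sketch assumes the application runs locally on port 8000, as set by server.port above, and it uses the page and size query parameters that Spring Data resolves into a Pageable by default. The class name TracksClient is made up for this example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client for illustration only.
class TracksClient {

    // Builds the URI of the /api/tracks endpoint with paging parameters.
    static URI tracksUri(String baseUrl, int page, int size) {
        return URI.create(baseUrl + "/api/tracks?page=" + page + "&size=" + size);
    }

    // Sends the GET request and returns the JSON response body as a string.
    static String fetchTracks(String baseUrl, int page, int size) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(tracksUri(baseUrl, page, size))
                .header("Accept", "application/json")
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body();
    }

    public static void main(String[] args) {
        // Only prints the URI; call fetchTracks(...) while the consumer application is running
        System.out.println(tracksUri("http://localhost:8000", 0, 10));
    }
}
```

The same request can be sent from a browser or any HTTP tool; the response is a page of Track documents serialized as JSON.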

Summary

Congratulations! You have learned how to build a Kinesis data stream consumer application.