Writing Data to Amazon Kinesis Data Streams in Spring Boot

This tutorial shows how to write data to Amazon Kinesis Data Streams from a Java Spring Boot application. To do this, we need a producer application, that is, an application that writes data into a Kinesis data stream.

So, in this tutorial, we will learn how to build a producer application using the KPL (Kinesis Producer Library). The KPL acts as an intermediary between a producer application and the Kinesis Data Streams API. The advantage of using the KPL is that it simplifies producer development: it provides configurable buffering with an automatic retry mechanism, and it can write multiple records to multiple shards (a shard is a uniquely identified sequence of data records in a stream) in a single request using the PutRecords API operation.
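
As a preview of the pattern the rest of this tutorial builds out, the producer side boils down to handing each record to the KPL and letting it buffer, batch, and retry. The sketch below is illustrative only: the stream name, region, and partition key are placeholders, and credentials are resolved by the SDK's default provider chain.

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    
    public class KplPreview {
    
      public static void main(String[] args) {
        KinesisProducer producer =
            new KinesisProducer(new KinesisProducerConfiguration().setRegion("us-east-1"));
    
        // Each addUserRecord call is buffered; the KPL aggregates buffered records into
        // PutRecords requests and retries failed records automatically.
        ByteBuffer data = ByteBuffer.wrap("hello kinesis".getBytes(StandardCharsets.UTF_8));
        producer.addUserRecord("tutorialsbuddy-stream", "partition-key-1", data);
    
        producer.flushSync(); // block until everything buffered has been sent
        producer.destroy();   // release the KPL native process
      }
    
    }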

Follow the steps below to create a sample Kinesis Producer application:

  1. Create a Kinesis data stream:
    • Log in to the AWS Management Console and open the Amazon Kinesis console at https://console.aws.amazon.com/kinesis.
    • Choose Kinesis Data Streams and click the Create data stream button.
    • On the Create Data Stream page, do the following:
      • Enter a name of your choice in the Data stream name field. For example: tutorialsbuddy-stream.
      • Next, choose a capacity mode: On-demand or Provisioned. In On-demand mode, the data stream's capacity scales automatically, while in Provisioned mode the capacity is fixed. If you choose Provisioned mode, enter the number of shards in the Provisioned shards field; you can start with 1 shard. Keep in mind that the more shards you use, the higher the cost.
      • Click the Create data stream button.
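
    Alternatively, if you already have the AWS CLI installed and configured, you can create an equivalent provisioned-mode stream from the command line (adjust the stream name, region, and shard count to your setup):

    aws kinesis create-stream --stream-name tutorialsbuddy-stream --shard-count 1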
  2. Visit Spring Initializr website at https://start.spring.io.
  3. Create a Spring Boot application with details as follows:
    • Project: Choose the project type (Maven or Gradle).
    • Language: Set the language to Java.
    • Spring Boot: Specify the Spring Boot version. The default selection is the latest stable version of Spring Boot, so you can leave it unchanged.
    • Project Metadata: Enter a Group and Artifact name for your project. The group identifies the organization or project (typically a reverse domain name), and the artifact is the name of your project. Add any other metadata you need (description, package name, etc.).
    • Packaging: Choose JAR (Java Archive) or WAR (Web Application Archive), depending on how you plan to deploy your Spring Boot application. Choose JAR packaging if you want a standalone executable JAR file, and WAR packaging if you intend to deploy your application to a Java EE application server or servlet container. A JAR-packaged Spring Boot application includes an embedded web server, such as Tomcat, by default, so you don't need to deploy it to an external Tomcat server; you can run the JAR file directly and the embedded server will start and serve your application.
    • Java: Select the Java version based on the compatibility requirements of your project and the Java version supported by your target deployment environment.
  4. Add project dependencies:
    • Click on the "Add Dependencies" button.
    • Choose the following dependencies: Spring Web, Lombok, and Spring Boot DevTools.

  5. Generate the project:
    • Click on the "Generate" button.
    • Spring Initializr will generate a zip file containing your Spring Boot project.
  6. Download and extract the generated project:
    • Download the zip file generated by Spring Initializr.
    • Extract the contents of the zip file to a directory on your local machine.
  7. Import the project into your IDE:
    • Open your preferred IDE (IntelliJ IDEA, Eclipse, or Spring Tool Suite).
    • Import the extracted project as a Maven or Gradle project, depending on the build system you chose in Spring Initializr.
  8. Add Dependency:
  9. NOTE: The latest versions of the KPL are not supported on Windows. If you are developing on Windows, you should downgrade the KPL to version 0.13.1.

    Add the Amazon Kinesis Producer Library dependency from the Maven repository:

    For Maven:

    Add the following dependency to your pom.xml file:

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.15.7</version>
    </dependency>

    For Gradle:

    Add the following dependency to your build.gradle file:

    implementation group: 'com.amazonaws', name: 'amazon-kinesis-producer', version: '0.15.7'
  10. Add Configurations:
  11. Open the src/main/resources/application.properties file in your IDE and add the following configuration lines to the file:

    server.port = 8080
    
    #use your aws credentials here
    aws.access-key = your-access-key
    aws.access-secret-key = your-access-secret-key
    aws.region = us-east-1
    
    #use your stream name that you have created
    aws.stream-name = tutorialsbuddy-stream
  12. Configure Kinesis Producer:
  13. Create a bean for the KinesisProducer client in your Spring Boot application configuration class. You can use the @Configuration annotation to create a configuration class:

    package com.example.app.producer.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.amazonaws.auth.AWSStaticCredentialsProvider;
    import com.amazonaws.auth.BasicAWSCredentials;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    
    @Configuration
    public class ProducerConfig {
    
      @Value(value = "${aws.access-key}")
      private String awsAccessKey;
    
      @Value(value = "${aws.access-secret-key}")
      private String awsSecretKey;
    
      @Value(value = "${aws.region}")
      private String awsRegion;
    
      @Bean
      public KinesisProducer kinesisProducer() {
        BasicAWSCredentials awsCreds = new BasicAWSCredentials(awsAccessKey, awsSecretKey);
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        config.setRegion(awsRegion);
        config.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCreds));
        config.setMaxConnections(1); // maximum number of connections to the Kinesis backend
        config.setRequestTimeout(6000); // 6 seconds
        config.setRecordMaxBufferedTime(5000); // buffer records for up to 5 seconds before sending
        return new KinesisProducer(config);
      }
    
    }
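
    The configuration above reads static credentials from application.properties, which is convenient for local development. In most deployments you would let the AWS SDK resolve credentials on its own. The following is only a sketch of that variant (it would replace the bean above, not sit beside it), assuming the SDK's default provider chain is acceptable in your environment:

    package com.example.app.producer.config;
    
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
    
    @Configuration
    public class ProducerConfig {
    
      @Value(value = "${aws.region}")
      private String awsRegion;
    
      @Bean
      public KinesisProducer kinesisProducer() {
        KinesisProducerConfiguration config = new KinesisProducerConfiguration();
        config.setRegion(awsRegion);
        // The default chain looks up credentials from environment variables, Java system
        // properties, the shared ~/.aws/credentials file, and container/instance roles.
        config.setCredentialsProvider(DefaultAWSCredentialsProviderChain.getInstance());
        config.setMaxConnections(1);
        config.setRequestTimeout(6000); // 6 seconds
        config.setRecordMaxBufferedTime(5000); // 5 seconds
        return new KinesisProducer(config);
      }
    
    }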
  14. Create DTO (Data Transfer Object) classes:
  15. Create a DTO class named RequestPayload that represents the request body:

    package com.example.app.producer.dto;
    
    import lombok.Data;
    
    @Data
    public class RequestPayload {
    
      private String vehicleId;
      private String driverId;
      private double speed;
      private String coordinatesLatitude;
      private String coordinatesLongitude;
    
    }

    Create a DTO class named ResponsePayload that represents the response body:

    package com.example.app.producer.dto;
    
    import lombok.AllArgsConstructor;
    import lombok.Builder;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @Builder
    @AllArgsConstructor
    @NoArgsConstructor
    public class ResponsePayload {
    
      private int status;
      private String message;
      private boolean success;
      
    }
  16. Create a Service:
  17. Create a service interface named ProducerService that defines the contract for the producer:

    package com.example.app.producer.service;
    
    import com.example.app.producer.dto.RequestPayload;
    import com.example.app.producer.dto.ResponsePayload;
    
    public interface ProducerService {
    
      ResponsePayload write(RequestPayload requestPayload);
    
    }
  18. Create Service Implementation:
  19. Create an implementation class named ProducerServiceImpl that implements the ProducerService interface and handles the business logic:

    package com.example.app.producer.service.impl;
    
    import java.io.UnsupportedEncodingException;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicLong;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import com.amazonaws.services.kinesis.producer.Attempt;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
    import com.amazonaws.services.kinesis.producer.UserRecordResult;
    import com.example.app.producer.dto.RequestPayload;
    import com.example.app.producer.dto.ResponsePayload;
    import com.example.app.producer.service.ProducerService;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.util.concurrent.FutureCallback;
    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    
    @Service
    public class ProducerServiceImpl implements ProducerService {
    
      private static final Logger LOG = LoggerFactory.getLogger(ProducerServiceImpl.class);
    
      @Value("${aws.stream-name}")
      private String streamName;
    
      @Autowired
      private ObjectMapper mapper;
    
      @Autowired
      private KinesisProducer kinesisProducer;
    
      // The number of records that have finished (either successfully put, or failed)
      final AtomicLong completed = new AtomicLong(0);
    
      // NOTE: this key is computed once when the class is loaded, so every record shares the
      // same partition key and lands on the same shard. Use a per-record key (for example the
      // vehicleId) if you want records spread across multiple shards.
      private static final String TIMESTAMP_AS_PARTITION_KEY =
          Long.toString(System.currentTimeMillis());
    
    
      @Override
      public ResponsePayload write(RequestPayload requestPayload) {
    
        FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
          @Override
          public void onFailure(Throwable t) {
            /* Analyze and respond to the failure */
            if (t instanceof UserRecordFailedException) {
              // Cast only after the instanceof check so other failure types cannot cause a
              // ClassCastException here.
              UserRecordFailedException ex = (UserRecordFailedException) t;
              int attempts = ex.getResult().getAttempts().size() - 1;
              Attempt last = ex.getResult().getAttempts().get(attempts);
              if (attempts > 0) {
                Attempt previous = ex.getResult().getAttempts().get(attempts - 1);
                LOG.error(String.format("Failed to put record - %s : %s. Previous failure - %s : %s",
                    last.getErrorCode(), last.getErrorMessage(), previous.getErrorCode(),
                    previous.getErrorMessage()));
              } else {
                LOG.error(String.format("Failed to put record - %s : %s.", last.getErrorCode(),
                    last.getErrorMessage()));
              }
            }
            LOG.error("Exception during put", t);
          }
    
          @Override
          public void onSuccess(UserRecordResult result) {
            /* Respond to the success */
            long totalTime =
                result.getAttempts().stream().mapToLong(a -> a.getDelay() + a.getDuration()).sum();
    
            LOG.info("Data writing success. Total time taken to write data = {}", totalTime);
    
            completed.getAndIncrement();
    
          }
        };
    
        String payload = null;
        try {
          payload = mapper.writeValueAsString(requestPayload);
        } catch (JsonProcessingException e) {
          LOG.error("Error while converting requestPayload to json string: {}", e.getMessage());
        }
        if (payload == null) {
          return ResponsePayload.builder().status(-1)
              .message("Could not serialize requestPayload.").success(false).build();
        }
    
        ByteBuffer data = null;
        try {
          data = ByteBuffer.wrap(payload.getBytes("UTF-8"));
        } catch (UnsupportedEncodingException e) {
          LOG.error("Error while converting payload into ByteBuffer {}", e.getMessage());
        }
        if (data == null) {
          return ResponsePayload.builder().status(-1).message("Could not encode payload.")
              .success(false).build();
        }

        ListenableFuture<UserRecordResult> f =
            kinesisProducer.addUserRecord(streamName, TIMESTAMP_AS_PARTITION_KEY, data);

        // If the Future is complete by the time we call addCallback, the callback will be invoked
        // immediately.
        final ExecutorService callbackThreadPool = Executors.newCachedThreadPool();
        Futures.addCallback(f, myCallback, callbackThreadPool);

        // Block until the buffered record has been sent. Do not call destroy() here: the
        // KinesisProducer is a singleton bean, and destroying it would break later requests.
        kinesisProducer.flushSync();
        callbackThreadPool.shutdown();
    
        return ResponsePayload.builder().status(0).message("Successfully published data to Kinesis.")
            .success(true).build();
      }
    
    }
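
    The write method flushes the producer but does not destroy it, so the KPL's native child process should be released once, when the application shuts down. One way to do this (a sketch, not part of the original tutorial) is a small @PreDestroy hook:

    package com.example.app.producer.config;
    
    import org.springframework.stereotype.Component;
    import com.amazonaws.services.kinesis.producer.KinesisProducer;
    import jakarta.annotation.PreDestroy; // use javax.annotation.PreDestroy on Spring Boot 2.x
    
    @Component
    public class KinesisProducerShutdownHook {
    
      private final KinesisProducer kinesisProducer;
    
      public KinesisProducerShutdownHook(KinesisProducer kinesisProducer) {
        this.kinesisProducer = kinesisProducer;
      }
    
      @PreDestroy
      public void releaseProducer() {
        kinesisProducer.flushSync(); // send anything still buffered
        kinesisProducer.destroy();   // terminate the KPL native process
      }
    
    }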
  20. Create a WebController:
  21. Create a controller class named ProducerController that will handle HTTP requests and interact with the ProducerService:

    package com.example.app.producer.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    import com.example.app.producer.dto.RequestPayload;
    import com.example.app.producer.dto.ResponsePayload;
    import com.example.app.producer.service.ProducerService;
    
    @RestController
    @RequestMapping(path = "/producer")
    public class ProducerController {
    
      @Autowired
      private ProducerService producerService;
    
      @PostMapping(value = "/write")
      public ResponseEntity<ResponsePayload> write(@RequestBody RequestPayload requestPayload) {
    
        return ResponseEntity.ok(producerService.write(requestPayload));
      }
    }
    
  22. Build and run your Spring Boot application.
  23. Test your endpoints by using API testing tools such as Postman:
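
    For example, a POST request like the one below (the field values are illustrative) should return the success response built in ProducerServiceImpl:

    POST http://localhost:8080/producer/write
    Content-Type: application/json
    
    {
      "vehicleId": "vehicle-001",
      "driverId": "driver-001",
      "speed": 42.5,
      "coordinatesLatitude": "27.1751",
      "coordinatesLongitude": "78.0421"
    }

    Expected response:

    {
      "status": 0,
      "message": "Successfully published data to Kinesis.",
      "success": true
    }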