Create and Deploy a Streaming Application with Amazon Managed Streaming for Apache Kafka (MSK)

This guide describes how to create a streaming application that uses Micronaut Streaming. The application consists of two Micronaut microservices that use Amazon MSK to communicate with each other in an asynchronous and decoupled way.

Amazon Managed Streaming for Apache Kafka (MSK) securely streams data with a fully managed, highly available Apache Kafka service.

Prerequisites

Follow the steps below to create the application from scratch. However, you can also download the completed example:

A note regarding your development environment

Consider using Visual Studio Code, which provides native support for developing applications with the Graal Cloud Native extension.

If you use IntelliJ IDEA, enable annotation processing.

Windows platform: The GCN guides are compatible with Gradle only. Maven support is coming soon.

1. Create the Microservices

The two microservices are:

  • Books returns a list of books. It uses a domain consisting of a book name and an International Standard Book Number (ISBN). It also publishes a message to the streaming service every time a book is accessed.
  • Analytics connects to the streaming service to update the analytics for every book (a counter). It also exposes an endpoint to retrieve the counter.

1.1. Create the Books Microservice

  1. Open the GCN Launcher in advanced mode.

  2. Create a new project using the following selections.
    • Project Type: Application (Default)
    • Project Name: books
    • Base Package: com.example.publisher
    • Clouds: AWS
    • Language: Java (Default)
    • Build Tool: Gradle (Groovy) or Maven
    • Test Framework: JUnit (Default)
    • Java Version: 17 (Default)
    • Micronaut Version: (Default)
    • Cloud Services: Streaming
    • Features: Awaitility Framework, GraalVM Native Image, Micronaut Serialization Jackson Core, and Reactor
    • Sample Code: No
  3. Click Generate Project, then click Download Zip. The GCN Launcher creates an application with the default package com.example.publisher in a directory named books. The application ZIP file is downloaded to your default downloads directory. Unzip it, open it in your code editor, and proceed to the next steps.

Alternatively, use the GCN CLI as follows (the first command is for Gradle, the second for Maven):

gcn create-app com.example.publisher.books \
    --clouds=aws \
    --services=streaming \
    --features=awaitility,graalvm,reactor,serialization-jackson \
    --build=gradle \
    --jdk=17 \
    --lang=java \
    --example-code=false
gcn create-app com.example.publisher.books \
    --clouds=aws \
    --services=streaming \
    --features=awaitility,graalvm,reactor,serialization-jackson \
    --build=maven \
    --jdk=17 \
    --lang=java \
    --example-code=false

1.1.1. Book Domain Class

The GCN Launcher created a Book domain class in a file named lib/src/main/java/com/example/publisher/Book.java, as follows:

package com.example.publisher;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

import java.util.Objects;

@Serdeable
public class Book {

    private final String isbn;
    private final String name;

    @Creator
    public Book(String isbn, String name) {
        this.isbn = isbn;
        this.name = name;
    }

    public String getIsbn() {
        return isbn;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "isbn='" + isbn + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Book other = (Book) o;
        return Objects.equals(isbn, other.isbn) &&
                Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isbn, name);
    }
}

1.1.2. BookService

To keep this guide simple there is no database persistence: the Books microservice keeps the list of books in memory. The GCN Launcher created a class named BookService in lib/src/main/java/com/example/publisher/BookService.java with the following contents:

package com.example.publisher;

import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

@Singleton
public class BookService {

    private final List<Book> bookStore = new ArrayList<>();

    @PostConstruct
    void init() {
        bookStore.add(new Book("1491950358", "Building Microservices"));
        bookStore.add(new Book("1680502395", "Release It!"));
        bookStore.add(new Book("0321601912", "Continuous Delivery"));
    }

    public List<Book> listAll() {
        return bookStore;
    }

    public Optional<Book> findByIsbn(String isbn) {
        return bookStore.stream()
                .filter(b -> b.getIsbn().equals(isbn))
                .findFirst();
    }
}

1.1.3. BookController

The GCN Launcher created a controller to access Book instances in a file named lib/src/main/java/com/example/publisher/BookController.java with the following contents:

package com.example.publisher;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;
import java.util.Optional;

@Controller("/books") // <1>
class BookController {

    private final BookService bookService;

    BookController(BookService bookService) { // <2>
        this.bookService = bookService;
    }

    @Get // <3>
    List<Book> listAll() {
        return bookService.listAll();
    }

    @Get("/{isbn}") // <4>
    Optional<Book> findBook(String isbn) {
        return bookService.findByIsbn(isbn);
    }
}

1 The @Controller annotation defines the class as a controller mapped to the root URI /books.

2 Use constructor injection to inject a bean of type BookService.

3 The @Get annotation maps the listAll method to an HTTP GET request on /books.

4 The @Get annotation maps the findBook method to an HTTP GET request on /books/{isbn}.

1.1.4. BookControllerTest

The GCN Launcher created a test for BookController to verify the interaction with the Analytics microservice in a file named aws/src/test/java/com/example/publisher/BookControllerTest.java with the following contents:

package com.example.publisher;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;

import jakarta.inject.Inject;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;

import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;

@MicronautTest
@TestInstance(PER_CLASS) // <1>
class BookControllerTest {

    private static final Collection<Book> received = new ConcurrentLinkedDeque<>();

    @Inject
    AnalyticsListener analyticsListener; // <2>

    @Inject
    @Client("/")
    HttpClient client; // <3>

    @Test
    void testMessageIsPublishedToKafkaWhenBookFound() {
        String isbn = "1491950358";

        Optional<Book> result = retrieveGet("/books/" + isbn); // <4>
        assertNotNull(result);
        assertTrue(result.isPresent());
        assertEquals(isbn, result.get().getIsbn());

        await().atMost(5, SECONDS).until(() -> !received.isEmpty()); // <5>

        assertEquals(1, received.size()); // <6>
        Book bookFromKafka = received.iterator().next();
        assertNotNull(bookFromKafka);
        assertEquals(isbn, bookFromKafka.getIsbn());
    }

    @Test
    void testMessageIsNotPublishedToKafkaWhenBookNotFound() throws Exception {
        assertThrows(HttpClientResponseException.class, () -> {
            retrieveGet("/books/INVALID");
        });

        Thread.sleep(5_000); // <7>
        assertEquals(0, received.size());
    }

    @AfterEach
    void cleanup() {
        received.clear();
    }

    @KafkaListener(offsetReset = EARLIEST)
    static class AnalyticsListener {

        @Topic("analytics")
        void updateAnalytics(Book book) {
            received.add(book);
        }
    }

    private Optional<Book> retrieveGet(String url) {
        return client.toBlocking().retrieve(HttpRequest.GET(url), Argument.of(Optional.class, Book.class));
    }
}

1 Use the PER_CLASS lifecycle so that JUnit creates a single instance of the test class for all tests. (Classes that implement TestPropertyProvider must use this annotation.)

2 Dependency injection for the AnalyticsListener class declared later in the file. This is a listener class that replicates the functionality of the class of the same name in the Analytics microservice.

3 Dependency injection for an HTTP client that the Micronaut framework will implement at compile time to make calls to BookController.

4 Use the HttpClient to retrieve the details of a Book, which will trigger sending a message.

5 Wait a few seconds for the message to arrive; it should happen very quickly, but the message will be sent on a separate thread.

6 Verify that the message was received and that it has the correct data.

7 Wait a few seconds to ensure no message is received.
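The waiting described in callout 5 can be pictured as a polling loop bounded by a deadline. The following is a minimal plain-Java sketch of that idea, not Awaitility's implementation: the class PollingSketch and its waitUntil method are hypothetical names introduced here for illustration. Unlike await().until(...), which fails the test with an exception on timeout, this sketch simply returns false.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class PollingSketch {

    // Polls the condition until it holds or the timeout elapses.
    // Returns true if the condition became true in time, false otherwise.
    public static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) {
        AtomicBoolean received = new AtomicBoolean(false);

        // Simulate a message arriving on another thread, as a Kafka listener would deliver it.
        Thread producer = new Thread(() -> received.set(true));
        producer.start();

        System.out.println(waitUntil(received::get, 5_000, 50));
    }
}
```

Polling with a deadline lets the positive test finish as soon as the message arrives, whereas the fixed Thread.sleep in the negative test always costs the full five seconds.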

1.1.5. AnalyticsClient

The GCN Launcher created a client interface to send messages to the streaming service in a file named lib/src/main/java/com/example/publisher/AnalyticsClient.java with the contents shown below. (Micronaut generates an implementation for the client interface at compilation time.)

package com.example.publisher;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import reactor.core.publisher.Mono;

@KafkaClient
public interface AnalyticsClient {

    @Topic("analytics") // <1>
    Mono<Book> updateAnalytics(Book book); // <2>
}

1 Set the name of the topic. This matches the name of the topic used by AnalyticsListener in the Analytics microservice.

Note: This must match the name of the topic that the microservices use on the Amazon MSK cluster.

2 Send the Book POJO. Micronaut will automatically convert it to JSON before sending it.

1.1.6. AnalyticsFilter

Sending a message to the streaming service is as simple as injecting AnalyticsClient and calling its updateAnalytics method. The goal is to send a message every time the details of a book are returned from the Books microservice or, in other words, every time there is a call to http://localhost:8080/books/{isbn}. To achieve this, the GCN Launcher created an HTTP server filter in a file named lib/src/main/java/com/example/publisher/AnalyticsFilter.java as follows:

package com.example.publisher;

import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;

@Filter("/books/?*") // <1>
class AnalyticsFilter implements HttpServerFilter { // <2>

    private final AnalyticsClient analyticsClient;

    AnalyticsFilter(AnalyticsClient analyticsClient) { // <3>
        this.analyticsClient = analyticsClient;
    }

    @Override
    public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request,
                                                      ServerFilterChain chain) { // <4>
        return Flux
                .from(chain.proceed(request)) // <5>
                .flatMap(response -> {
                    Book book = response.getBody(Book.class).orElse(null); // <6>
                    if (book == null) {
                        return Flux.just(response);
                    }
                    return Flux.from(analyticsClient.updateAnalytics(book)).map(b -> response); // <7>
                });
    }
}

1 Annotate the class with @Filter and define the Ant-style matcher pattern to intercept all calls to the desired URIs.

2 The class must implement HttpServerFilter.

3 Dependency injection for AnalyticsClient.

4 Implement the doFilter method.

5 Call the request; this will invoke the controller action.

6 Get the response from the controller and return the body as an instance of the Book class.

7 If the book is retrieved, use the client to send a message.
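The control flow of callouts 5 to 7 (proceed, inspect the body, publish only if it is a Book, and hand back the original response either way) can be sketched without Micronaut or Reactor. The example below is an illustration under assumptions: it uses CompletableFuture in place of Flux, and the Response and Book records, the published list, and the updateAnalytics stand-in are all hypothetical, not the framework's types.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class FilterFlowSketch {

    // Hypothetical stand-ins for the guide's Book class and an HTTP response.
    public record Book(String isbn, String name) {}

    public record Response(Object body) {
        // Returns the body only when it is an instance of the requested type.
        public <T> Optional<T> getBody(Class<T> type) {
            return type.isInstance(body) ? Optional.of(type.cast(body)) : Optional.empty();
        }
    }

    // Records what would have been sent to the "analytics" topic.
    public static final List<Book> published = new CopyOnWriteArrayList<>();

    // Stand-in for AnalyticsClient.updateAnalytics: completes once the message is "sent".
    public static CompletableFuture<Book> updateAnalytics(Book book) {
        return CompletableFuture.supplyAsync(() -> {
            published.add(book);
            return book;
        });
    }

    // Same shape as doFilter: wait for the downstream response, publish when the
    // body is a Book, and return the unchanged response in both branches.
    public static CompletableFuture<Response> filter(CompletableFuture<Response> proceed) {
        return proceed.thenCompose(response -> {
            Optional<Book> book = response.getBody(Book.class);
            if (book.isEmpty()) {
                return CompletableFuture.completedFuture(response);
            }
            return updateAnalytics(book.get()).thenApply(sent -> response);
        });
    }

    public static void main(String[] args) {
        Response bookResponse = new Response(new Book("1491950358", "Building Microservices"));
        Response otherResponse = new Response("no book here");

        filter(CompletableFuture.completedFuture(bookResponse)).join();
        filter(CompletableFuture.completedFuture(otherResponse)).join();

        System.out.println(published.size()); // only the Book body was published
    }
}
```

In this sketch, as in the filter, the response is released to the caller only after the send completes, so a slow publish delays the HTTP response.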

1.1.7. Test the Microservice

Use the following commands to test the Books microservice (the first is for Gradle; the remaining three are for Maven):

./gradlew :aws:test
./mvnw install -pl lib -am
./mvnw package -pl aws -DskipTests
./mvnw test

1.2. Create the Analytics Microservice

  1. Open the GCN Launcher in advanced mode.

  2. Create a new project using the following selections.
    • Project Type: Application (Default)
    • Project Name: analytics
    • Base Package: com.example.consumer
    • Clouds: AWS
    • Language: Java (Default)
    • Build Tool: Gradle (Groovy) or Maven
    • Test Framework: JUnit (Default)
    • Java Version: 17 (Default)
    • Micronaut Version: (Default)
    • Cloud Services: Streaming
    • Features: Awaitility Framework, GraalVM Native Image and Micronaut Serialization Jackson Core
    • Sample Code: No
  3. Click Generate Project, then click Download Zip. The GCN Launcher creates an application with the default package com.example.consumer in a directory named analytics. The application ZIP file is downloaded to your default downloads directory. Unzip it, open it in your code editor, and proceed to the next steps.

Alternatively, use the GCN CLI as follows (the first command is for Gradle, the second for Maven):

gcn create-app com.example.consumer.analytics \
    --clouds=aws \
    --services=streaming \
    --features=awaitility,graalvm,serialization-jackson \
    --build=gradle \
    --jdk=17 \
    --lang=java \
    --example-code=false
gcn create-app com.example.consumer.analytics \
    --clouds=aws \
    --services=streaming \
    --features=awaitility,graalvm,serialization-jackson \
    --build=maven \
    --jdk=17 \
    --lang=java \
    --example-code=false

1.2.1. Domain Classes

The GCN Launcher created a Book domain class in a file named lib/src/main/java/com/example/consumer/Book.java, as shown below. (This Book POJO is the same as the one in the Books microservice. In a real application this would be in a shared library.)

package com.example.consumer;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

import java.util.Objects;

@Serdeable
public class Book {

    private final String isbn;
    private final String name;

    @Creator
    public Book(String isbn, String name) {
        this.isbn = isbn;
        this.name = name;
    }

    public String getIsbn() {
        return isbn;
    }

    public String getName() {
        return name;
    }

    @Override
    public String toString() {
        return "Book{" +
                "isbn='" + isbn + '\'' +
                ", name='" + name + '\'' +
                '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Book other = (Book) o;
        return Objects.equals(isbn, other.isbn) &&
                Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(isbn, name);
    }
}

The GCN Launcher also created a BookAnalytics domain class in a file named lib/src/main/java/com/example/consumer/BookAnalytics.java, as follows:

package com.example.consumer;

import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;

@Serdeable
public class BookAnalytics {

    private final String bookIsbn;
    private final long count;

    @Creator
    public BookAnalytics(String bookIsbn, long count) {
        this.bookIsbn = bookIsbn;
        this.count = count;
    }

    public String getBookIsbn() {
        return bookIsbn;
    }

    public long getCount() {
        return count;
    }
}

1.2.2. AnalyticsService

To keep this guide simple there is no database persistence: the Analytics microservice keeps book analytics in memory. The GCN Launcher created a class named AnalyticsService in lib/src/main/java/com/example/consumer/AnalyticsService.java with the following contents:

package com.example.consumer;

import jakarta.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Singleton
public class AnalyticsService {

    private final Map<Book, Long> bookAnalytics = new ConcurrentHashMap<>(); // <1>

    public void updateBookAnalytics(Book book) { // <2>
        bookAnalytics.compute(book, (k, v) -> {
            return v == null ? 1L : v + 1;
        });
    }

    public List<BookAnalytics> listAnalytics() { // <3>
        return bookAnalytics
                .entrySet()
                .stream()
                .map(e -> new BookAnalytics(e.getKey().getIsbn(), e.getValue()))
                .collect(Collectors.toList());
    }
}

1 Keep the book analytics in memory.

2 Initialize and update the analytics for the book passed as parameter.

3 Return all the analytics.
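Callout 2 relies on ConcurrentHashMap applying each per-key update atomically, which matters if several threads update the same counter at once. The following is a minimal sketch under that assumption. It uses plain ISBN strings as keys instead of Book instances, and the class CounterSketch and its methods are hypothetical, not part of the generated project. It uses merge, an equivalent way to express "insert 1 or add 1".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CounterSketch {

    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    // Same effect as compute(key, (k, v) -> v == null ? 1L : v + 1).
    public void record(String isbn) {
        counts.merge(isbn, 1L, Long::sum);
    }

    public long countFor(String isbn) {
        return counts.getOrDefault(isbn, 0L);
    }

    // Runs `threads` workers that each record the same ISBN `updatesPerThread` times,
    // then returns the final count.
    public static long countAfterConcurrentUpdates(int threads, int updatesPerThread) {
        CounterSketch sketch = new CounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < updatesPerThread; i++) {
                    sketch.record("1491950358");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return sketch.countFor("1491950358");
    }

    public static void main(String[] args) {
        System.out.println(countAfterConcurrentUpdates(4, 1_000)); // prints 4000
    }
}
```

With a plain HashMap and a separate get followed by put, concurrent updates could be lost; the atomic per-key update is what keeps the total exact.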

1.2.3. AnalyticsServiceTest

The GCN Launcher created a test for the AnalyticsService class, in a file named aws/src/test/java/com/example/consumer/AnalyticsServiceTest.java, as follows:

package com.example.consumer;

import static org.junit.jupiter.api.Assertions.assertEquals;

import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;

import jakarta.inject.Inject;
import java.util.List;

@MicronautTest
class AnalyticsServiceTest {

    @Inject
    AnalyticsService analyticsService;

    @Test
    void testUpdateBookAnalyticsAndGetAnalytics() {
        Book b1 = new Book("1491950358", "Building Microservices");
        Book b2 = new Book("1680502395", "Release It!");

        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b1);
        analyticsService.updateBookAnalytics(b2);

        List<BookAnalytics> analytics = analyticsService.listAnalytics();
        assertEquals(2, analytics.size());

        assertEquals(3, findBookAnalytics(b1, analytics).getCount());
        assertEquals(1, findBookAnalytics(b2, analytics).getCount());
    }

    private BookAnalytics findBookAnalytics(Book b, List<BookAnalytics> analytics) {
        return analytics
                .stream()
                .filter(bookAnalytics -> bookAnalytics.getBookIsbn().equals(b.getIsbn()))
                .findFirst()
                .orElseThrow(() -> new RuntimeException("Book not found"));
    }
}

1.2.4. AnalyticsController

The GCN Launcher created a Controller to create an endpoint for the Analytics microservice in a file named lib/src/main/java/com/example/consumer/AnalyticsController.java, as follows:

package com.example.consumer;

import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;

import java.util.List;

@Controller("/analytics") // <1>
class AnalyticsController {

    private final AnalyticsService analyticsService;

    AnalyticsController(AnalyticsService analyticsService) {
        this.analyticsService = analyticsService;
    }

    @Get // <2>
    List<BookAnalytics> listAnalytics() {
        return analyticsService.listAnalytics();
    }
}

1 The @Controller annotation defines the class as a controller mapped to the root URI /analytics.

2 The @Get annotation maps the listAnalytics method to an HTTP GET request on /analytics.

The application doesn’t expose the method updateBookAnalytics created in AnalyticsService. This method will be invoked when reading messages from Kafka.

1.2.5. AnalyticsListener

The GCN Launcher created a class to act as a consumer of the messages sent to the streaming service by the Books microservice. The Micronaut framework implements logic to invoke the consumer at compile time. The AnalyticsListener class is in a file named lib/src/main/java/com/example/consumer/AnalyticsListener.java, as follows:

package com.example.consumer;

import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;

@Requires(notEnv = Environment.TEST) // <1>
@KafkaListener // <2>
class AnalyticsListener {

    private final AnalyticsService analyticsService; // <3>

    AnalyticsListener(AnalyticsService analyticsService) { // <3>
        this.analyticsService = analyticsService;
    }

    @Topic("analytics") // <4>
    void updateAnalytics(Book book) {
        analyticsService.updateBookAnalytics(book); // <5>
    }
}

1 Do not load this bean in the test environment: you can run tests without access to a streaming service.

2 Annotate the class with @KafkaListener to indicate that this bean consumes messages from Kafka.

3 Constructor injection for AnalyticsService.

4 Annotate the method with @Topic and specify the topic name. This matches the name of the topic used by AnalyticsClient in the Books microservice.

Note: This must match the name of the topic that the microservices use on the Amazon MSK cluster.

5 Call AnalyticsService to update the analytics for the book.

1.2.6. Test the Microservice

Use the following commands to test the Analytics microservice (the first is for Gradle; the remaining three are for Maven):

./gradlew :aws:test
./mvnw install -pl lib -am
./mvnw package -pl aws -DskipTests
./mvnw test

1.2.7. Change the Port of the Analytics Microservice

The Books and Analytics microservices are both run on the same AWS EC2 instance, so they must run on different ports. Change the port that Analytics runs on by editing its aws/src/main/resources/application.properties to include the following configuration:

micronaut.server.port=8081

2. Run the Microservices

2.1. Start the Books Microservice

To run the Books microservice, use the following commands (the first is for Gradle; the other two are for Maven). The application starts on port 8080.

./gradlew :aws:run
./mvnw install -pl lib -am
./mvnw -pl aws mn:run

2.2. Start the Analytics Microservice

To run the Analytics microservice, use the following commands (the first is for Gradle; the other two are for Maven). The application starts on port 8081.

./gradlew :aws:run
./mvnw install -pl lib -am
./mvnw -pl aws mn:run

2.3. Test the Microservices

Use curl to test the microservices, as follows.

  1. Retrieve the list of books:

     curl http://localhost:8080/books
    
     [{"isbn":"1491950358","name":"Building Microservices"},{"isbn":"1680502395","name":"Release It!"},{"isbn":"0321601912","name":"Continuous Delivery"}]
    
  2. Retrieve the details of a specified book:

     curl http://localhost:8080/books/1491950358
    
     {"isbn":"1491950358","name":"Building Microservices"}
    
  3. Retrieve the analytics:

     curl http://localhost:8081/analytics
    
     [{"bookIsbn":"1491950358","count":1}]
    

Update the curl command to the Books microservice to retrieve other books and repeat the invocations, then re-run the curl command to the Analytics microservice to see that the counts increase.

3. Set up AWS Resources

Start by creating an administrator account, then set up the networking, create the Kafka cluster, and configure the AWS EC2 instance.

Note: In the instructions that follow, Windows users should replace the export keyword with set.

3.1. Create an Administrator Account

Instead of using your AWS root account, use an administrator account. If you do not have one already, see Setting up Your Cloud Accounts.

3.2. Create a Virtual Private Cloud, an Internet Gateway, and a Route Table

  1. Create a virtual private cloud using the following command:

     aws ec2 create-vpc --cidr-block 10.0.0.0/16
    

    Copy the value of the Vpc.VpcId property and export it for later use, as follows:

     export VPC_ID=<replace_with_the_copied_value>
    
  2. Create an internet gateway using the following command:

     aws ec2 create-internet-gateway
    

    Copy the value of the InternetGateway.InternetGatewayId property and export it for later use, as follows:

     export IG_ID=<replace_with_the_copied_value>
    
  3. Attach the internet gateway to the VPC, enable DNS hostnames, and look up the VPC's route table using the following commands:

     aws ec2 attach-internet-gateway --internet-gateway-id $IG_ID --vpc-id $VPC_ID
     aws ec2 modify-vpc-attribute --enable-dns-hostnames --vpc-id $VPC_ID
     aws ec2 describe-route-tables --filters "Name=vpc-id,Values=$VPC_ID"
    

    Copy the value of the RouteTables[].RouteTableId property and export it for later use, as follows:

     export RT_ID=<replace_with_the_copied_value>
    
  4. Finally, add a route to the route table that sends internet-bound traffic through the internet gateway, using the following command:

     aws ec2 create-route \
         --route-table-id $RT_ID \
         --destination-cidr-block 0.0.0.0/0 \
         --gateway-id $IG_ID
    

3.3. Create Subnets

  1. Export the values of the AWS availability zones using the following commands:

     export AZ_0=$(aws ec2 describe-availability-zones \
         --filters "Name=state,Values=available" \
         --query "AvailabilityZones[0].ZoneName" \
         --output text)
     export AZ_1=$(aws ec2 describe-availability-zones \
         --filters "Name=state,Values=available" \
         --query "AvailabilityZones[1].ZoneName" \
         --output text)
     export AZ_2=$(aws ec2 describe-availability-zones \
         --filters "Name=state,Values=available" \
         --query "AvailabilityZones[2].ZoneName" \
         --output text)
    
  2. Create three subnets using the following commands:

     aws ec2 create-subnet \
         --vpc-id $VPC_ID \
         --cidr-block 10.0.0.0/24 \
         --availability-zone $AZ_0
    

    Copy the value of the Subnet.SubnetId property and export it for later use, as follows:

     export SN0_ID=<replace_with_the_copied_value>
    
     aws ec2 create-subnet \
         --vpc-id $VPC_ID \
         --cidr-block 10.0.1.0/24 \
         --availability-zone $AZ_1
    

    Copy the value of the Subnet.SubnetId property and export it for later use, as follows:

     export SN1_ID=<replace_with_the_copied_value>
    
     aws ec2 create-subnet \
         --vpc-id $VPC_ID \
         --cidr-block 10.0.2.0/24 \
         --availability-zone $AZ_2
    

    Copy the value of the Subnet.SubnetId property and export it for later use, as follows:

     export SN2_ID=<replace_with_the_copied_value>
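The three subnet CIDR blocks used above must each fall inside the VPC's 10.0.0.0/16 block and must not overlap one another. The following plain-Java sketch checks that arithmetic. CidrSketch is a hypothetical helper written for this illustration; it handles IPv4 only and performs no input validation.

```java
public class CidrSketch {

    // Converts a dotted-quad IPv4 address to its 32-bit value, held in a long.
    public static long toLong(String ip) {
        long value = 0;
        for (String part : ip.split("\\.")) {
            value = (value << 8) | Integer.parseInt(part);
        }
        return value;
    }

    // Returns {firstAddress, lastAddress} for a CIDR block such as "10.0.1.0/24".
    public static long[] range(String cidr) {
        String[] pieces = cidr.split("/");
        int prefix = Integer.parseInt(pieces[1]);
        long size = 1L << (32 - prefix);              // number of addresses in the block
        long first = toLong(pieces[0]) & ~(size - 1); // clear the host bits
        return new long[] {first, first + size - 1};
    }

    // True when every address of `inner` lies within `outer`.
    public static boolean contains(String outer, String inner) {
        long[] o = range(outer);
        long[] i = range(inner);
        return o[0] <= i[0] && i[1] <= o[1];
    }

    // True when the two blocks share at least one address.
    public static boolean overlaps(String a, String b) {
        long[] x = range(a);
        long[] y = range(b);
        return x[0] <= y[1] && y[0] <= x[1];
    }

    public static void main(String[] args) {
        String vpc = "10.0.0.0/16";
        String[] subnets = {"10.0.0.0/24", "10.0.1.0/24", "10.0.2.0/24"};

        for (String subnet : subnets) {
            System.out.println(subnet + " inside " + vpc + ": " + contains(vpc, subnet));
        }
        System.out.println("first two overlap: " + overlaps(subnets[0], subnets[1]));
    }
}
```

A /16 block holds 65,536 addresses and each /24 holds 256, so the three subnets use only a small part of the VPC's address space.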
    

3.4. Configure an MSK Kafka Cluster

  1. Create a new file named aws_msk_cluster_broker_info.json with the following contents. Replace the ClientSubnets values with the IDs of your subnets.

     {
       "InstanceType": "kafka.m5.xlarge",
       "BrokerAZDistribution": "DEFAULT",
       "ClientSubnets": [
         "<replace_with_the_subnet_id_0>",
         "<replace_with_the_subnet_id_1>",
         "<replace_with_the_subnet_id_2>"
       ]
     }
    
  2. Create a new file named aws_msk_cluster_auth_info.json with the following contents:

     {
       "Sasl": {
         "Iam": {
           "Enabled": true
         }
       }
     }
    
  3. Create a Kafka cluster using the following command:

     export CLUSTER_ARN=$(aws kafka create-cluster \
         --cluster-name "AWSKafkaGCNGuide" \
         --broker-node-group-info file://aws_msk_cluster_broker_info.json \
         --client-authentication  file://aws_msk_cluster_auth_info.json \
         --kafka-version "3.5.1" \
         --number-of-broker-nodes 3 | jq -r '.ClusterArn')
    

    Note: You may have to wait up to 40 minutes before the cluster is in the ACTIVE state.

  4. Get and copy the BootstrapBrokerStringSaslIam, which you are going to use later.

     export KAFKA_BOOTSTRAP_SERVERS=$(aws kafka get-bootstrap-brokers --cluster-arn $CLUSTER_ARN | jq -r '.BootstrapBrokerStringSaslIam')
    

    Note: This command does not return results while the cluster is in the CREATING state.

3.5. Configure Network

  1. Associate the route table with one subnet to make that subnet publicly accessible, using the following command:

     aws ec2 associate-route-table --subnet-id $SN0_ID --route-table-id $RT_ID
    
  2. Enable automatic public IP address assignment for instances launched in that subnet, using the following command:

     aws ec2 modify-subnet-attribute --subnet-id $SN0_ID --map-public-ip-on-launch
    
  3. Create a security group using the following command:

     aws ec2 create-security-group \
         --group-name gcn-guides-streaming-sg \
         --description "Security Group for the GCN Streaming guide" \
         --vpc-id $VPC_ID
    

    Copy the GroupId value and export it for later use, as follows:

     export SG_ID=<replace_with_the_copied_group_id_value>
    
  4. Create an ingress rule that allows SSH connections to the instance, using the following command:

     aws ec2 authorize-security-group-ingress \
         --group-id $SG_ID \
         --protocol tcp \
         --port 22 \
         --cidr 0.0.0.0/0
    
  5. To be able to issue requests to the Books and Analytics microservices, run the following commands:

     aws ec2 authorize-security-group-ingress \
         --group-id $SG_ID \
         --protocol tcp \
         --port 8080 \
         --cidr 0.0.0.0/0
     aws ec2 authorize-security-group-ingress \
         --group-id $SG_ID \
         --protocol tcp \
         --port 8081 \
         --cidr 0.0.0.0/0
    

3.6. Launch an EC2 Instance

  1. Create an SSH key pair and save the private key to a local file, using the following command:

     aws ec2 create-key-pair \
         --key-name AWS-Keypair \
         --query "KeyMaterial" \
         --output text > "AWS_Keypair.pem"
    
  2. Get the most current Amazon Machine Image (AMI) ID for your region:

    aws ec2 describe-images \
       --owners amazon \
       --filters "Name=name,Values=amzn*gp2" "Name=virtualization-type,Values=hvm" \
       --query "sort_by(Images, &CreationDate)[-1].ImageId" \
       --output text
    

    Note: Copy the resulting AMI ID for later use.

  3. Launch an EC2 instance, as follows:

     aws ec2 run-instances \
         --image-id <replace_with_the_copied_ami_image_id_value> \
         --count 1 \
         --instance-type t2.large \
         --key-name AWS-Keypair \
         --subnet-id $SN0_ID \
         --block-device-mappings '[{"DeviceName":"/dev/xvda","Ebs":{"VolumeSize":8}}]'
    
  4. Copy the value of the Instances.InstanceId property and export it for later use, as follows:

     export INST_ID=<replace_with_the_copied_value>
    
  5. To verify and get instance details, run the following command:

     aws ec2 describe-instances --instance-id $INST_ID
    
  6. Copy the value of the Reservations.Instances.PublicIpAddress property and export it for later use, as follows:

     export PIPA=<replace_with_the_copied_value>
    
  7. There is a default security group assigned to the EC2 instance. However, you must add the custom security group that you created earlier. Run the following command to retrieve the existing security groups:

     export CURRENT_SG_IDS=$(aws ec2 describe-instances \
         --instance-ids $INST_ID \
         --query "Reservations[*].Instances[*].SecurityGroups[*].GroupId" \
         --output text)
    
  8. Use the following commands to print the IDs of the existing security groups and of the group you created earlier. Combine the two into a single set of distinct IDs. Each entry must be enclosed in double quotes and separated by whitespace (for example, "sg-0f8eb8d4b8f66503f" "sg-0814ae2f87013d7f7"):

     echo $SG_ID
     echo $CURRENT_SG_IDS
    
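  9. Add the set of security groups to your EC2 instance, as follows. The variables are left unquoted so that the shell passes each ID as a separate argument when $CURRENT_SG_IDS holds more than one ID:

     aws ec2 modify-instance-attribute \
         --instance-id $INST_ID \
         --groups $SG_ID $CURRENT_SG_IDS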
    

Note: You will use this EC2 instance to install the Kafka client library and to deploy Books and Analytics microservices.
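
Steps 8 and 9 above ask for a combined, duplicate-free list of security group IDs. One way to build that list in the shell is sketched below. The helper name merge_sg_ids is hypothetical and not part of the AWS CLI; it only splits its arguments on whitespace and drops repeated IDs.

```shell
# Hypothetical helper (not part of the AWS CLI): merge whitespace-separated
# lists of security group IDs, dropping duplicates and keeping first-seen order.
merge_sg_ids() {
    printf '%s\n' "$@" |
        tr -s '[:space:]' '\n' |     # one ID per line
        awk 'NF && !seen[$0]++' |    # skip blank lines and repeated IDs
        paste -sd ' ' -              # join the IDs back into a single line
}

# Possible usage with the variables exported in the steps above
# (left commented out because it modifies your instance):
# aws ec2 modify-instance-attribute \
#     --instance-id "$INST_ID" \
#     --groups $(merge_sg_ids "$SG_ID" "$CURRENT_SG_IDS")
```

The command substitution is deliberately unquoted in the usage line so that each ID reaches the AWS CLI as its own argument.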

3.7. Configure Role and Policy #

Add the role and the policy for the EC2 instance to be able to interact with the Amazon MSK cluster.

  1. Create a file named msk-gcn-guide-policy.json with the following contents:

     {
       "Version": "2012-10-17",
       "Statement": [
         {
           "Effect": "Allow",
           "Action": [
             "kafka-cluster:Connect",
             "kafka-cluster:AlterCluster",
             "kafka-cluster:DescribeCluster"
           ],
           "Resource": [
             "arn:aws:kafka:<region>:<Account-ID>:cluster/AWSKafkaGCNGuide/*"
           ]
         },
         {
           "Effect": "Allow",
           "Action": [
             "kafka-cluster:*Topic*",
             "kafka-cluster:WriteData",
             "kafka-cluster:ReadData"
           ],
           "Resource": [
             "arn:aws:kafka:<region>:<Account-ID>:topic/AWSKafkaGCNGuide/*"
           ]
         },
         {
           "Effect": "Allow",
           "Action": [
             "kafka-cluster:AlterGroup",
             "kafka-cluster:DescribeGroup"
           ],
           "Resource": [
             "arn:aws:kafka:<region>:<Account-ID>:group/AWSKafkaGCNGuide/*"
           ]
         }
       ]
     }
    

    Replace <region> with the code of the Amazon Web Services Region where you created your cluster. Replace <Account-ID> with your account ID.

  2. Create a new file named msk-gcn-guide-role-trust-policy.json with the following contents:

     {
       "Version": "2012-10-17",
       "Statement": [
         {
           "Effect": "Allow",
           "Principal": {
             "Service": "ec2.amazonaws.com"
           },
           "Action": "sts:AssumeRole"
         }
       ]
     }
    
  3. Create a role named “msk-gcn-guide-role” with the trust policy, as follows:

     aws iam create-role \
         --role-name msk-gcn-guide-role \
         --assume-role-policy-document file://msk-gcn-guide-role-trust-policy.json
    
  4. Attach an inline policy to the role using the following command:

     aws iam put-role-policy \
         --role-name msk-gcn-guide-role \
         --policy-name msk-gcn-guide-policy \
         --policy-document file://msk-gcn-guide-policy.json
    
  5. Create an IAM instance profile and add the role to it. The instance profile is the container through which the IAM role is passed to the EC2 instance. Use the following commands:

     aws iam create-instance-profile \
         --instance-profile-name msk-gcn-guide-role-instance-profile
     aws iam add-role-to-instance-profile \
         --role-name msk-gcn-guide-role \
         --instance-profile-name msk-gcn-guide-role-instance-profile
    
  6. Attach the IAM role to your EC2 instance by associating the instance profile with it, as follows:

     aws ec2 associate-iam-instance-profile \
         --instance-id $INST_ID \
         --iam-instance-profile Name=msk-gcn-guide-role-instance-profile
    
  7. You can verify that the IAM role is now attached to the instance using the following command:

     aws ec2 describe-iam-instance-profile-associations
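
The placeholder substitution described in step 1 can also be scripted. The sketch below assumes that you keep the policy with its <region> and <Account-ID> placeholders in a separate template file; the helper name render_policy and the template file name are hypothetical.

```shell
# Hypothetical helper: read a policy template on stdin and replace the
# <region> and <Account-ID> placeholders used in step 1.
render_policy() {
    region=$1
    account_id=$2
    sed -e "s/<region>/${region}/g" -e "s/<Account-ID>/${account_id}/g"
}

# Possible usage (commented out because it calls AWS; the template file
# name is an assumption):
# render_policy "$(aws configure get region)" \
#     "$(aws sts get-caller-identity --query Account --output text)" \
#     < msk-gcn-guide-policy.template.json > msk-gcn-guide-policy.json
```

Note that aws configure get region returns your default CLI region, which may differ from the region where you created the cluster. Pass the region explicitly if so.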
    

3.8. Install and Configure a Kafka Client #

Install and configure a Kafka client library on the EC2 instance.

Note: The Apache Kafka version numbers used in this guide are examples only. Whenever you connect to your Amazon MSK cluster with the Apache Kafka client (for example, to create a topic or change its configuration), ensure that the client version matches your Amazon MSK cluster version. Using a client version that differs from the cluster version may lead to Apache Kafka data corruption, data loss, and downtime.

  1. Use SSH to connect to the client EC2 instance, using the following command:

     ssh -i /path/to/AWS_Keypair.pem ec2-user@$PIPA
    
  2. Once connected, install GraalVM for Java 17. See Download Oracle GraalVM.

  3. Export the MSK version, as follows:

     export MSK_VERSION=3.5.1
    
  4. Run the following command to download an Apache Kafka client:

     wget https://archive.apache.org/dist/kafka/$MSK_VERSION/kafka_2.12-$MSK_VERSION.tgz
    
  5. Extract the archive by running the following command in the directory where you downloaded the file in the previous step:

     tar -xzf kafka_2.12-$MSK_VERSION.tgz
    
  6. Go to the Kafka client directory, as follows:

     cd kafka_2.12-$MSK_VERSION/bin
    
  7. Enable the IAM SASL mechanism to be able to connect to MSK from the client EC2 instance by creating a file named client.properties with the following contents:

     security.protocol=SASL_SSL
     sasl.mechanism=AWS_MSK_IAM
     sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
     sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    
  8. Go to the home directory:

     cd ~
    
  9. Download the aws-msk-iam-auth library:

     wget https://github.com/aws/aws-msk-iam-auth/releases/download/v1.1.9/aws-msk-iam-auth-1.1.9-all.jar
    
  10. Add the aws-msk-iam-auth library to the Kafka client classpath:

    cp aws-msk-iam-auth-1.1.9-all.jar kafka_2.12-$MSK_VERSION/libs
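
As an alternative to creating client.properties by hand in step 7, the file can be written from the shell. This is a minimal sketch; the helper name write_client_properties is hypothetical, and the four settings are copied from step 7 unchanged.

```shell
# Hypothetical helper: write the IAM client settings from step 7 to the
# file path given as the first argument.
write_client_properties() {
    cat > "$1" <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=AWS_MSK_IAM
sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
EOF
}

# Possible usage from the home directory on the EC2 instance:
# write_client_properties "kafka_2.12-$MSK_VERSION/bin/client.properties"
```

The quoted 'EOF' delimiter keeps the shell from expanding anything inside the file body.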
    

3.9. Create a Kafka Topic #

  1. Export the bootstrap string which you saved earlier when you created the MSK cluster:

     export KAFKA_BOOTSTRAP_SERVERS=<replace_with_your_bootstrap_string>
    
  2. Go to the Kafka client bin/ directory:

     cd kafka_2.12-$MSK_VERSION/bin
    
  3. Create a Kafka topic named “analytics”:

     ./kafka-topics.sh --bootstrap-server $KAFKA_BOOTSTRAP_SERVERS \
         --command-config client.properties \
         --create \
         --topic analytics \
         --partitions 1 \
         --replication-factor 3
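
Because client.properties selects IAM authentication, the bootstrap string must point at the cluster's IAM listeners. As an assumption to verify against your own cluster, Amazon MSK documents port 9098 for IAM access from within AWS. The sketch below prints the distinct ports in a bootstrap string so you can check them before creating the topic; the helper name bootstrap_ports is hypothetical.

```shell
# Hypothetical helper: print the distinct ports found in a comma-separated
# bootstrap string, one per line.
bootstrap_ports() {
    printf '%s\n' "$1" |
        tr ',' '\n' |        # one host:port entry per line
        sed 's/.*://' |      # keep only the text after the last colon
        sort -u              # collapse repeated ports
}

# Possible usage:
# bootstrap_ports "$KAFKA_BOOTSTRAP_SERVERS"
```

If the output shows a port other than the one you expect for IAM authentication, you may have copied the bootstrap string for a different authentication method.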
    

4. Configure Microservices #

  1. Edit the file named aws/src/main/resources/application-ec2.properties for the Books microservice so that it matches the following contents. (The Micronaut framework applies this configuration file only for the ec2 environment.)

    kafka.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    kafka.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    kafka.sasl.mechanism=AWS_MSK_IAM
    kafka.security.protocol=SASL_SSL
    
  2. Edit the file named aws/src/main/resources/application-ec2.properties for the Analytics microservice so that it matches the following contents. (The Micronaut framework applies this configuration file only for the ec2 environment.)

    kafka.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
    kafka.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
    kafka.sasl.mechanism=AWS_MSK_IAM
    kafka.security.protocol=SASL_SSL
    
  3. After you deploy your applications to the AWS EC2 instance, make sure that the following environment variable is exported; Micronaut will set the kafka.bootstrap.servers configuration property from its value:

     export KAFKA_BOOTSTRAP_SERVERS=<use the bootstrap string which you saved earlier when MSK cluster was created and promoted to the ACTIVE state>
    
  4. Add the runtime-only aws-msk-iam-auth dependency to both the Books and the Analytics microservices so that they can authenticate with the Amazon MSK cluster. This is the same library that you added to the Kafka command-line client to be able to create topics.

    build.gradle

    runtimeOnly("software.amazon.msk:aws-msk-iam-auth:1.1.9")

    pom.xml

    <dependency>
         <groupId>software.amazon.msk</groupId>
         <artifactId>aws-msk-iam-auth</artifactId>
         <version>1.1.9</version>
         <scope>runtime</scope>
     </dependency>
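
Step 3 relies on Micronaut deriving the kafka.bootstrap.servers property from the KAFKA_BOOTSTRAP_SERVERS environment variable. The sketch below illustrates the naming relationship only: it is a simplification, not Micronaut's actual code, and Micronaut's real resolution considers more candidate names than this. The helper name env_to_property is hypothetical.

```shell
# Simplified illustration (an assumption, not Micronaut's implementation):
# lowercase the variable name and turn underscores into dots.
env_to_property() {
    printf '%s\n' "$1" | tr 'A-Z_' 'a-z.'
}

# Possible usage:
# env_to_property KAFKA_BOOTSTRAP_SERVERS
```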

5. Generate a Native Executable Using GraalVM #

GCN supports compiling a Java application ahead-of-time into a native executable using GraalVM Native Image. You can use the Gradle plugin or the Maven plugin for GraalVM Native Image building. A native executable starts faster and uses less memory than the same application running on a JVM.

If you use Gradle, run the following command for each microservice to generate a native executable:

./gradlew :aws:nativeCompile

The native executable is created in the aws/build/native/nativeCompile/ directory and can be run with the following command:

MICRONAUT_ENVIRONMENTS=ec2 aws/build/native/nativeCompile/aws

If you use Maven, run the following commands for each microservice to generate a native executable:

./mvnw install -pl lib -am
./mvnw package -pl aws -Dpackaging=native-image

The native executable is created in the aws/target/ directory and can be run with the following command:

MICRONAUT_ENVIRONMENTS=ec2 aws/target/aws

Start the native executables for the two microservices and run the same curl requests as before to check that everything works as expected.

The microservices behave the same as when you run them from JAR files, but they start faster and use less memory.
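
The location of the native executable depends on the build tool, as described above. The sketch below picks the path by checking which build wrapper is present in the project directory. The helper name native_binary_path is hypothetical; the two paths are the ones given in this section.

```shell
# Hypothetical helper: print where the native executable is expected,
# based on which build wrapper exists in the project directory.
native_binary_path() {
    project_dir=${1:-.}
    if [ -f "$project_dir/gradlew" ]; then
        echo "$project_dir/aws/build/native/nativeCompile/aws"
    elif [ -f "$project_dir/mvnw" ]; then
        echo "$project_dir/aws/target/aws"
    else
        echo "no gradlew or mvnw found in $project_dir" >&2
        return 1
    fi
}

# Possible usage from a microservice's project root:
# MICRONAUT_ENVIRONMENTS=ec2 "$(native_binary_path .)"
```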

6. Deploy the Books Microservice to AWS Cloud #

  1. Ensure that the private key you downloaded has the correct permissions, as follows:

     chmod 400 /path/to/AWS_Keypair.pem
    
  2. Create a JAR file containing all the microservice’s dependencies, as follows.

    If you use Gradle:

    ./gradlew :aws:shadowJar

    If you use Maven:

    ./mvnw install -pl lib -am
    ./mvnw package -pl aws -DskipTests

  3. Copy the JAR file to your EC2 instance, as follows.

    If you use Gradle:

    scp -i /path/to/AWS_Keypair.pem aws/build/libs/aws-1.0-SNAPSHOT-all.jar ec2-user@$PIPA:/home/ec2-user/books_application.jar

    If you use Maven:

    scp -i /path/to/AWS_Keypair.pem aws/target/aws-1.0-SNAPSHOT.jar ec2-user@$PIPA:/home/ec2-user/books_application.jar

  4. Connect to the EC2 instance:

     ssh -i /path/to/AWS_Keypair.pem ec2-user@$PIPA
    
  5. Make sure that the following environment variable is exported; Micronaut will set the kafka.bootstrap.servers configuration property from its value:

     export KAFKA_BOOTSTRAP_SERVERS=<use the bootstrap string which you saved earlier when MSK cluster was created and promoted to the ACTIVE state>
    
  6. Start the Books microservice, as follows:

     MICRONAUT_ENVIRONMENTS=ec2 java -jar books_application.jar
    
  7. Verify that the application is running by invoking the controller at http://[EC2_PUBLIC_IP]:8080/books using curl:

     curl -i http://$PIPA:8080/books
    
  8. Invoke the controller endpoint to trigger a message to be published to the Streaming service. You can test other ISBNs as well.

     curl -i http://$PIPA:8080/books/1491950358
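
The microservice may need a few seconds after startup before it answers requests. The sketch below retries a URL with curl until it responds or a retry limit is reached. The helper name wait_for_http and its default retry count and delay are assumptions chosen for illustration.

```shell
# Hypothetical helper: poll a URL until it answers with a successful HTTP
# status, or give up after a number of attempts.
# Arguments: URL, attempts (default 30), delay in seconds (default 2).
wait_for_http() {
    url=$1
    attempts=${2:-30}
    delay=${3:-2}
    i=1
    while [ "$i" -le "$attempts" ]; do
        # -s: silent, -f: treat HTTP errors as failures, -o: discard the body
        if curl -sf -o /dev/null "$url"; then
            echo "up after $i attempt(s)"
            return 0
        fi
        sleep "$delay"
        i=$((i + 1))
    done
    echo "no response from $url after $attempts attempts" >&2
    return 1
}

# Possible usage from your local machine:
# wait_for_http "http://$PIPA:8080/books" && curl -i "http://$PIPA:8080/books"
```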
    

7. Deploy the Analytics Microservice to AWS Cloud #

  1. Create a JAR file containing all the microservice’s dependencies, as follows.

    If you use Gradle:

    ./gradlew :aws:shadowJar

    If you use Maven:

    ./mvnw install -pl lib -am
    ./mvnw package -pl aws -DskipTests

  2. Copy the JAR file to your EC2 instance, as follows.

    If you use Gradle:

    scp -i /path/to/AWS_Keypair.pem aws/build/libs/aws-1.0-SNAPSHOT-all.jar ec2-user@$PIPA:/home/ec2-user/analytics_application.jar

    If you use Maven:

    scp -i /path/to/AWS_Keypair.pem aws/target/aws-1.0-SNAPSHOT.jar ec2-user@$PIPA:/home/ec2-user/analytics_application.jar

  3. Connect to the EC2 instance:

     ssh -i /path/to/AWS_Keypair.pem ec2-user@$PIPA
    
  4. Make sure that the following environment variable is exported; Micronaut will set the kafka.bootstrap.servers configuration property from its value:

     export KAFKA_BOOTSTRAP_SERVERS=<use the bootstrap string which you saved earlier when MSK cluster was created and promoted to the ACTIVE state>
    
  5. Start the Analytics microservice, as follows:

     MICRONAUT_ENVIRONMENTS=ec2 java -jar analytics_application.jar
    
  6. Verify that the application is running by invoking the controller at http://[EC2_PUBLIC_IP]:8081/analytics using curl:

     curl -i http://$PIPA:8081/analytics
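
Both deployments depend on KAFKA_BOOTSTRAP_SERVERS being exported in the shell that starts the microservice. The sketch below fails early with a clear message when the variable is missing, instead of letting the application start without a broker address. The helper name require_env is hypothetical.

```shell
# Hypothetical helper: succeed only if the named environment variable is
# set to a non-empty value.
require_env() {
    # Indirect lookup of the variable whose name is in $1 (POSIX-compatible).
    eval "value=\${$1:-}"
    if [ -z "$value" ]; then
        echo "error: $1 is not set" >&2
        return 1
    fi
}

# Possible usage on the EC2 instance:
# require_env KAFKA_BOOTSTRAP_SERVERS &&
#     MICRONAUT_ENVIRONMENTS=ec2 java -jar analytics_application.jar
```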
    

8. Deploy Native Executables to AWS Cloud #

A native executable runs only on the operating system and architecture it was built for, so an executable built on your local machine is generally not suitable for deployment to AWS Cloud. To create a deployable native executable, build it on the target platform. In this guide, you build the executables on the EC2 instance itself.

8.1. Configure Native Image #

The aws-msk-iam-auth authentication library relies on the Java reflection mechanism. GraalVM Native Image requires you to manually configure the library elements that are reflectively accessed at runtime. Create two configuration files for each microservice as follows:

  • Books:
    • aws/src/main/resources/META-INF/native-image/com.example.publisher/resource-config.json
    • aws/src/main/resources/META-INF/native-image/com.example.publisher/reflect-config.json
  • Analytics:
    • aws/src/main/resources/META-INF/native-image/com.example.consumer/resource-config.json
    • aws/src/main/resources/META-INF/native-image/com.example.consumer/reflect-config.json

The contents of the files are provided below.

resource-config.json:

{
  "resources":{
    "includes":[
      {
        "pattern":"\\QMETA-INF/micronaut/io.micronaut.context.ApplicationContextConfigurer\\E"
      },
      {
        "pattern":"\\QMETA-INF/micronaut/io.micronaut.inject.BeanConfiguration\\E"
      },
      {
        "pattern":"\\QMETA-INF/micronaut/io.micronaut.inject.BeanDefinitionReference\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/com.fasterxml.jackson.databind.Module\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/io.micronaut.context.env.PropertySourceLoader\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/io.micronaut.core.convert.TypeConverterRegistrar\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/io.micronaut.core.type.TypeInformationProvider\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/io.micronaut.http.HttpRequestFactory\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/io.micronaut.http.client.HttpClientFactory\\E"
      },
      {
        "pattern":"\\QMETA-INF/services/java.nio.file.spi.FileSystemProvider\\E"
      },
      {
        "pattern":"\\Qapplication.properties\\E"
      },
      {
        "pattern":"\\Qapplication-ec2.properties\\E"
      },
      {
        "pattern":"\\Qcom/amazonaws/internal/config/awssdk_config_default.json\\E"
      },
      {
        "pattern":"\\Qcom/amazonaws/sdk/versionInfo.properties\\E"
      },
      {
        "pattern":"\\Qkafka/kafka-version.properties\\E"
      },
      {
        "pattern":"\\Qlogback.xml\\E"
      },
      {
        "pattern":"\\Qmicronaut-version.properties\\E"
      },
      {
        "pattern":"\\Qorg/slf4j/impl/StaticLoggerBinder.class\\E"
      },
      {
        "pattern":"\\Qtest-resources.properties\\E"
      },
      {
        "pattern":"\\Qcom/amazonaws/partitions/endpoints.json\\E"
      },
      {
        "pattern":"org/joda/time/tz/data/.*"
      }
    ]},
  "bundles":[{
    "name":"sun.security.util.Resources",
    "classNames":["sun.security.util.Resources"]
  }]
}

reflect-config.json:

[
  {
    "name":"software.amazon.msk.auth.iam.IAMClientCallbackHandler",
    "methods":[{"name":"<init>","parameterTypes":[] }]
  },
  {
    "name":"org.apache.commons.logging.impl.Jdk14Logger",
    "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }]
  },
  {
    "name":"org.apache.commons.logging.impl.Log4JLogger"
  },
  {
    "name":"org.apache.commons.logging.impl.LogFactoryImpl",
    "methods":[{"name":"<init>","parameterTypes":[] }]
  },
  {
    "name":"com.amazonaws.internal.config.InternalConfigJsonHelper",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setDefaultSigner","parameterTypes":["com.amazonaws.internal.config.SignerConfigJsonHelper"] },
      {"name":"setHostRegexToRegionMappings","parameterTypes":["com.amazonaws.internal.config.HostRegexToRegionMappingJsonHelper[]"] },
      {"name":"setHttpClients","parameterTypes":["com.amazonaws.internal.config.JsonIndex[]"] },
      {"name":"setRegionSigners","parameterTypes":["com.amazonaws.internal.config.JsonIndex[]"] },
      {"name":"setServiceRegionSigners","parameterTypes":["com.amazonaws.internal.config.JsonIndex[]"] },
      {"name":"setServiceSigners","parameterTypes":["com.amazonaws.internal.config.JsonIndex[]"] },
      {"name":"setUserAgentTemplate","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.internal.config.SignerConfigJsonHelper",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setSignerType","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.internal.config.JsonIndex",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setConfig","parameterTypes":["com.amazonaws.internal.config.Builder"] },
      {"name":"setKey","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.internal.config.HttpClientConfigJsonHelper",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setRegionMetadataServiceName","parameterTypes":["java.lang.String"] },
      {"name":"setServiceName","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.internal.config.HostRegexToRegionMappingJsonHelper",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setHostNameRegex","parameterTypes":["java.lang.String"] },
      {"name":"setRegionName","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"software.amazon.msk.auth.iam.internals.AuthenticationResponse",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[{"name":"<init>","parameterTypes":["java.lang.String","java.lang.String"] }]
  },
  {
    "name":"software.amazon.msk.auth.iam.IAMLoginModule",
    "methods":[{"name":"<init>","parameterTypes":[] }]
  },
  {
    "name":"software.amazon.msk.auth.iam.internals.IAMSaslClient$ClassLoaderAwareIAMSaslClientFactory",
    "methods":[{"name":"<init>","parameterTypes":[] }]
  },
  {
    "name":"software.amazon.msk.auth.iam.internals.IAMSaslClient$IAMSaslClientFactory",
    "methods":[{"name":"<init>","parameterTypes":[] }]
  },
  {
    "name":"com.amazonaws.partitions.model.CredentialScope",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setRegion","parameterTypes":["java.lang.String"] },
      {"name":"setService","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.partitions.model.Endpoint",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":[] },
      {"name":"setCredentialScope","parameterTypes":["com.amazonaws.partitions.model.CredentialScope"] },
      {"name":"setHostName","parameterTypes":["java.lang.String"] },
      {"name":"setProtocols","parameterTypes":["java.util.Set"] },
      {"name":"setSignatureVersions","parameterTypes":["java.util.Set"] },
      {"name":"setSslCommonName","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.partitions.model.Partition",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":["java.lang.String","java.util.Map","java.util.Map"] },
      {"name":"setDefaults","parameterTypes":["com.amazonaws.partitions.model.Endpoint"] },
      {"name":"setDnsSuffix","parameterTypes":["java.lang.String"] },
      {"name":"setPartitionName","parameterTypes":["java.lang.String"] },
      {"name":"setRegionRegex","parameterTypes":["java.lang.String"] }
    ]
  },
  {
    "name":"com.amazonaws.partitions.model.Partitions",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[{"name":"<init>","parameterTypes":["java.lang.String","java.util.List"] }]
  },
  {
    "name":"com.amazonaws.partitions.model.Region",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[{"name":"<init>","parameterTypes":["java.lang.String"] }]
  },
  {
    "name":"com.amazonaws.partitions.model.Service",
    "allDeclaredFields":true,
    "queryAllDeclaredMethods":true,
    "queryAllDeclaredConstructors":true,
    "methods":[
      {"name":"<init>","parameterTypes":["java.util.Map"] },
      {"name":"setDefaults","parameterTypes":["com.amazonaws.partitions.model.Endpoint"] },
      {"name":"setPartitionEndpoint","parameterTypes":["java.lang.String"] },
      {"name":"setRegionalized","parameterTypes":["boolean"] }
    ]
  }
]
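
The four configuration file paths listed at the start of this section follow one pattern that differs only in the base package. The sketch below prints the two paths for a given package so that the directories and files can be created from the shell. The helper name native_config_paths is hypothetical.

```shell
# Hypothetical helper: print the two Native Image configuration file paths
# for the base package given as the first argument.
native_config_paths() {
    dir="aws/src/main/resources/META-INF/native-image/$1"
    printf '%s\n' "$dir/resource-config.json" "$dir/reflect-config.json"
}

# Possible usage from a microservice's project root:
# for f in $(native_config_paths com.example.publisher); do
#     mkdir -p "$(dirname "$f")" && touch "$f"
# done
```

Use com.example.publisher for Books and com.example.consumer for Analytics, then paste the JSON contents shown above into the created files.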

8.2. Run Native Executables on EC2 Instance #

  1. Download the example ZIP file and copy it to the EC2 instance:

    scp -i /path/to/AWS_Keypair.pem /path/to/downloaded/streaming_sample.zip ec2-user@$PIPA:/home/ec2-user/streaming_sample.zip
    
  2. Once copied, connect to the EC2 instance (if it has disconnected by now):

     ssh -i /path/to/AWS_Keypair.pem ec2-user@$PIPA
    
  3. Unpack the compressed file:

    sudo yum -y install unzip
    unzip streaming_sample.zip
    
  4. Install the packages that GraalVM needs to work properly on EC2:

    sudo yum -y install gcc
    sudo yum -y install 'zlib*'
    
  5. Apply the settings from sections 4 (Configure Microservices) and 8.1 (Configure Native Image) to the unpacked projects.

  6. Generate a native executable for each microservice.

    If you use Gradle, run the following command:

    ./gradlew :aws:nativeCompile

    The native executable is created in the aws/build/native/nativeCompile/ directory and can be run with the following command:

    MICRONAUT_ENVIRONMENTS=ec2 aws/build/native/nativeCompile/aws

    Or if you use Windows:

    cmd /C "set MICRONAUT_ENVIRONMENTS=ec2 && aws\build\native\nativeCompile\aws"

    If you use Maven, run the following commands:

    ./mvnw install -pl lib -am
    ./mvnw package -pl aws -Dpackaging=native-image

    The native executable is created in the aws/target/ directory and can be run with the following command:

    MICRONAUT_ENVIRONMENTS=ec2 aws/target/aws

    Or if you use Windows:

    cmd /C "set MICRONAUT_ENVIRONMENTS=ec2 && aws\target\aws"

    Start the native executables for the two microservices and run the same curl requests as before to check that everything works as expected.

The microservices behave the same as when you run them from JAR files, but they start faster and use less memory.

Summary #

In this guide, you created a streaming application with the Micronaut framework, Kafka, and Amazon Managed Streaming for Apache Kafka (MSK). The two microservices, acting as producer and consumer, communicated asynchronously. You then packaged the microservices as native executables with GraalVM Native Image for faster startup and a smaller memory footprint, and deployed them to AWS Cloud.