Introduction:
Contract-first application development is not limited to synchronous RESTful API calls. With the adoption of event-driven architecture (EDA), more developers are asking for a way to set up contracts between asynchronous event publishers and consumers. Sharing which data format each subscriber consumes and which data format the event publisher produces, in the way the OpenAPI standard does for REST, is going to be very helpful.
But the asynchronous world is even more complex: you not only need to care about the data schema, you also have to deal with different protocols, serialization/deserialization mechanisms, and various supporting libraries. There is ongoing discussion about this around AsyncAPI. What I am going to show you is what can be done today with ONE of the most commonly used solutions for building EDA, Kafka: how to do contract-first application development using Camel + Apicurio Registry.
The Solutions:
I will be using Red Hat’s version of Kafka, AMQ Streams, Apicurio as my service registry, and finally Apache Camel to build the contract-first event-driven application. To spice things up, I am going to use both Avro and Protobuf. (Normally you would choose one of them.)
Compared with sending plain text like JSON or XML, a binary representation is more compact and efficient. This works well with Kafka, since Kafka handles message keys and values as byte arrays. In short, Avro and Protobuf are just ways to serialize/deserialize the data. Both rely on a schema to do so, and we can use that schema as the contract for each topic that publishes/consumes events.
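To make the size difference concrete, here is a minimal sketch in plain Java. It is not Avro or Protobuf: the binary layout below is a hand-rolled illustration, and the class name and the field names (userid, amt) are assumptions made for this example. The point is only that a text format repeats the field names in every message, while a schema-based binary format can leave them out because both sides already agree on the layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustration only: a hand-rolled binary layout, NOT the Avro or Protobuf wire format.
class EncodingSizeDemo {

    // Text encoding: the field names travel with every single message.
    public static byte[] asJson(String userId, double amount) {
        String json = "{\"userid\":\"" + userId + "\",\"amt\":" + amount + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Binary encoding: only the values are written, in an order both sides agreed on.
    public static byte[] asBinary(String userId, double amount) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeUTF(userId);    // 2-byte length prefix followed by the UTF-8 bytes
            out.writeDouble(amount); // fixed 8 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println("JSON bytes:   " + asJson("alice", 42.5).length);
        System.out.println("Binary bytes: " + asBinary("alice", 42.5).length);
    }
}
```

The consumer can only make sense of the binary bytes if it knows the same layout, which is exactly why the schema becomes the contract.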
Avro comes from the Hadoop project and is the serialization format most commonly used with Kafka.
Protobuf is from Google and has two versions, proto2 and proto3; here I am using proto3.
Apicurio Registry is a datastore for standard event schemas and API designs. We are going to use it for schema management: it stores all the schemas, and its client serializer/deserializer validates the event messages.
Camel is the perfect framework for connecting the dots and transforming messages to the desired state, as it provides a built-in component to connect to Kafka and built-in data formats for Avro/Protobuf. (This is a great feature if you want to process the body: it simplifies the work by marshalling the data into a POJO.)
There are two different approaches to do this:
Manually upload the schema, and use the default Camel data format components to serialize/deserialize
Use the Apicurio Registry libraries to upload the schema and serialize/deserialize
Environment
Here is what you need to have in the environment.
Kafka Cluster (AMQ Streams)
Apicurio Service Registry
Solution A - Camel built-in data format components: Steps at a glance, using Camel Quarkus.
Step One:
Go to https://code.quarkus.io/ to generate your bootstrap Quarkus application.
You will have a sample Camel project ready to go. Depending on the endpoints you want to connect to, add the required dependencies to the pom.xml file of the project.
Add the dependencies you will need to convert between the two serialization mechanisms and to send the results to the Kafka topics.
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quarkus-protobuf</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quarkus-avro</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quarkus-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quarkus-stream</artifactId>
</dependency>
Add the plugin to download the schemas from the registry. (Of course, you can register a schema from local code too, but I am assuming a separate person designs and handles schema management.)
<plugin>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-maven-plugin</artifactId>
<version>${registry.version}</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>download</goal>
</goals>
<configuration>
<registryUrl>http://localhost:8080/api</registryUrl>
<ids>
<param1>demo-avro</param1>
<param1>demo-protobuf</param1>
</ids>
<outputDirectory>${project.basedir}/src/main/resources</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
Note: In the configuration, enter the location of the registry. Also make sure you tell the plugin which schemas to download and where to place them in the project.
Step Two
Create the schema: log in to your Apicurio Service Registry and upload the schema.
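If you prefer to upload from code rather than through the web console, the sketch below shows one way it could look with the JDK's built-in HTTP client (Java 11+). Several things here are assumptions and not taken from a real project: the Avro schema itself (a Transaction record with userid and amt fields), the class name, and the use of the registry's REST endpoint at /artifacts with the X-Registry-ArtifactId and X-Registry-ArtifactType headers. Check the REST API documentation of your registry version before relying on it.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

class SchemaUpload {

    // Hypothetical Avro schema, used only for illustration.
    static final String AVRO_SCHEMA = String.join("\n",
        "{",
        "  \"type\": \"record\",",
        "  \"name\": \"Transaction\",",
        "  \"namespace\": \"demo.camel\",",
        "  \"fields\": [",
        "    {\"name\": \"userid\", \"type\": \"string\"},",
        "    {\"name\": \"amt\", \"type\": \"double\"}",
        "  ]",
        "}");

    // Builds the upload request. Endpoint path and header names are assumptions
    // about the registry's REST API; verify them for your registry version.
    public static HttpRequest buildRequest(String registryUrl, String artifactId) {
        return HttpRequest.newBuilder(URI.create(registryUrl + "/artifacts"))
            .header("Content-Type", "application/json")
            .header("X-Registry-ArtifactType", "AVRO")
            .header("X-Registry-ArtifactId", artifactId)
            .POST(HttpRequest.BodyPublishers.ofString(AVRO_SCHEMA))
            .build();
    }

    // Sends the request; needs a running registry at the given URL.
    public static int upload(HttpRequest request) throws Exception {
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = buildRequest("http://localhost:8080/api", "demo-avro");
        // Only prints what would be sent; call upload(request) against a live registry.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

The artifact id passed in (demo-avro) is the same id the download plugin asks for, so the two sides stay in sync.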
Set up the topics that you want to use to publish the events.
Add the plugins to generate the POJOs that we are going to use for marshalling/unmarshalling. Having the POJOs also makes it easier to process the data further.
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf-version}:exe:${os.detected.classifier}</protocArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<protoSourceRoot>${project.basedir}/src/main/resources</protoSourceRoot>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
After adding the plugins, run the following command:
`` mvn compile ``
It will generate the corresponding Java source based on the schemas that were downloaded from the registry.
Step Three
Add your Camel route in MyRouteBuilder.java, depending on what you want to do with the object.
For example, the code here demonstrates how to use the built-in Protobuf data format to marshal the body into the Protobuf object before sending the data as a byte array to the Kafka topic.
.marshal().protobuf("demo.camel.TransactionProtos$Transaction")
.log("Sender: ${header.sender}")
.toD("kafka:webtrans-quarkus?brokers=localhost:9092&key=${header.sender}&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
Or this shows how to use the built-in Camel Avro data format to marshal the body before sending the data as a byte array to the Kafka topic:
AvroDataFormat format = new AvroDataFormat(Transaction.SCHEMA$);
....
.marshal(format)
.toD("kafka:transrec-quarkus?brokers=localhost:9092&groupId=producergroup&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
And it’s done.
Here is the working Camel Quarkus project:
I will discuss more in the next post.
-----
Instead of manually uploading the schemas to the registry, we can use the Apicurio Registry libraries: once they are added to the Kafka configuration, the schema is automatically saved in the registry.
Here is how it’s done.
Solution B - Apicurio Registry libraries: Steps at a glance, using Camel Java Main.
Step One:
Create a Camel Project using the archetypes.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.camel.archetypes \
-DarchetypeArtifactId=camel-archetype-java \
-DarchetypeVersion=3.5.0
Add the dependencies you will need to convert between the two serialization mechanisms and to send the results to the Kafka topics.
<dependency>
<groupId>io.apicurio</groupId>
<artifactId>apicurio-registry-utils-converter</artifactId>
<version>${registry.version}</version>
</dependency>
<dependency>
<groupId>org.glassfish.jersey.core</groupId>
<artifactId>jersey-common</artifactId>
<version>2.32</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-stream</artifactId>
</dependency>
Step Two:
Create the schemas and place the schema files in your application resource folder:
${project.basedir}/src/main/resources
Set up the topics that you want to use to publish the events.
Add the plugins to generate the POJOs that we are going to use for marshalling/unmarshalling. Having the POJOs also makes it easier to process the data further.
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.1</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<extensions>true</extensions>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf-version}:exe:${os.detected.classifier}</protocArtifact>
<outputDirectory>${project.basedir}/src/main/java</outputDirectory>
<protoSourceRoot>${project.basedir}/src/main/resources</protoSourceRoot>
<clearOutputDirectory>false</clearOutputDirectory>
</configuration>
</execution>
</executions>
</plugin>
After adding the plugins, run the following command:
`` mvn compile ``
Step Three:
Add your Camel route in MyRouteBuilder.java and configure the Kafka component with the Apicurio Registry libraries.
For example, the configuration here demonstrates how to serialize Avro and deserialize Protobuf with the Apicurio Registry libraries, and how to automatically register the schema in the registry. Note that you need to point to the registry URL by providing “apicurio.registry.url”.
serializerClass=io.apicurio.registry.utils.serde.AvroKafkaSerializer
deserializerClass=io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer
additionalProperties.apicurio.registry.url=http://localhost:8080/api
additionalProperties.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
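The get-or-create idea behind the last property can be sketched in a few lines of plain Java. This is an in-memory stand-in written for illustration, not the Apicurio implementation: the class name, method name, and artifact ids are all hypothetical. It only shows the behaviour you can expect from such a strategy, where the first producer to use an artifact id registers the schema and everyone afterwards gets the existing id back.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a schema registry, for illustration only.
class InMemoryRegistry {

    private final Map<String, Long> idsByArtifact = new HashMap<>();
    private final Map<Long, String> schemasById = new HashMap<>();
    private long nextGlobalId = 1;

    // Returns the id already registered for the artifact, or registers the schema first.
    public long getOrCreate(String artifactId, String schema) {
        return idsByArtifact.computeIfAbsent(artifactId, key -> {
            long id = nextGlobalId++;
            schemasById.put(id, schema);
            return id;
        });
    }

    // Consumers resolve the id carried with a message back to the schema.
    public String schemaFor(long globalId) {
        return schemasById.get(globalId);
    }

    public static void main(String[] args) {
        InMemoryRegistry registry = new InMemoryRegistry();
        long first = registry.getOrCreate("transrec-value", "{\"type\":\"record\"}");
        long second = registry.getOrCreate("transrec-value", "{\"type\":\"record\"}");
        System.out.println("Same id on second call: " + (first == second));
    }
}
```

With a real registry the lookup happens over HTTP inside the serializer, so the route code never has to deal with ids directly.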
And in the Camel route, instantiate the two generated Avro/Protobuf classes, work with them as you normally would in your Camel integration logic, and simply send them to the Kafka endpoint that was configured in the previous step.
For example, this bean call creates the Protobuf object, which is then sent to the Kafka topic:
.bean(demo.camel.TransactionProtos.Transaction.class, "parseFrom(${body})")
.toD("kafka:webtrans?key=${header.sender}")
And this method call creates the Avro object, which is then sent to the Kafka topic:
.setBody(method(this,"setAvroValue( ${originalmsg.userid},${originalmsg.amt})"))
.toD("kafka:transrec?serializerClass={{serializerClass}}"+registryconfigAvro)
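The registryconfigAvro variable in the last line is just a string of extra endpoint options appended to the Kafka URI. The snippet does not show how it is built, so here is one possible sketch; the helper class and method names are hypothetical, while the option keys are the same ones listed in the configuration above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

class RegistryEndpointConfig {

    // Builds the "&key=value" suffix that is appended to the Kafka endpoint URI.
    public static String registryOptions(String registryUrl, String idStrategyClass) {
        Map<String, String> options = new LinkedHashMap<>(); // keeps insertion order
        options.put("additionalProperties.apicurio.registry.url", registryUrl);
        options.put("additionalProperties.apicurio.registry.global-id", idStrategyClass);
        return options.entrySet().stream()
            .map(entry -> "&" + entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        String registryconfigAvro = registryOptions(
            "http://localhost:8080/api",
            "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy");
        // {{serializerClass}} is left for Camel's property placeholder resolution.
        System.out.println("kafka:transrec?serializerClass={{serializerClass}}" + registryconfigAvro);
    }
}
```

Keeping the options in one helper means the Avro and Protobuf endpoints can share the registry URL and differ only in their serializer or deserializer class.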
You can find a working GitHub example in the link. That’s all for this post.
In my next post, I will walk you through the details of the example.
Summary
Here is a video that explains everything with diagrams (NO CODE).
Apicurio Registry is a datastore that helps topic owners manage the schemas of their data formats (and API designs for REST endpoints). When publishing/subscribing to a topic, developers have access to the already-defined schema contract, and they can use Apache Camel to implement the actual integration code with simple configuration or the built-in data format components.