
Contract first development - the event driven way!

Introduction: 

Contract first application development is not limited to synchronous RESTful API calls. With the adoption of event driven architecture, more developers are demanding a way to set up contracts between asynchronous event publishers and consumers. Sharing the data format that each subscriber consumes and the data format that each publisher produces, the way the OpenAPI standard does for REST, would be very helpful.

 

But the asynchronous world is even more complex: you need to care not only about the data schema, but also about different protocols, serializing/deserializing mechanisms, and various support libraries. In fact, there are efforts such as AsyncAPI addressing this. But I am going to show you what can be done today with one of the most commonly used solutions for building an EDA, Kafka, and how to do contract first application development using Camel + Apicurio Registry.


The Solutions:

I will be using Red Hat’s version of Kafka, AMQ Streams, Apicurio as my service registry, and finally Apache Camel to build the contract first event driven application. To spice things up, I am going to use both Avro and Protobuf. (Normally you would choose one of them.)


Rather than sending plain text like JSON or XML, a binary representation is more compact and efficient, and it works well with Kafka, since Kafka itself uses a binary message format. In short, Avro and Protobuf are simply ways to serialize/deserialize the data. Both also provide a schema for serializing/deserializing, and we can use that schema as the contract for each topic that events are published to or consumed from.


Avro comes from the Hadoop world and is the serialization format most commonly used with Kafka.

Protobuf comes from Google and has two major versions; here I am using proto 3.
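To make the contract idea concrete, here is a minimal sketch of what the two schema files might look like for a hypothetical Transaction event carrying a user id and an amount. (The names mirror the generated classes used in the routes later in this post; the exact fields are my assumption, not a prescribed format.)

transaction.avsc (Avro):

{
  "namespace": "demo.camel",
  "type": "record",
  "name": "Transaction",
  "fields": [
    { "name": "userid", "type": "string" },
    { "name": "amt", "type": "double" }
  ]
}

transaction.proto (proto 3):

syntax = "proto3";
package demo.camel;
option java_outer_classname = "TransactionProtos";

message Transaction {
  string userid = 1;
  double amt = 2;
}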


Apicurio Registry is a datastore for standard event schemas and API designs. We are going to use it for our schema management: we will store all the schemas in it, and use its client serializer/deserializer to validate the event messages.


Camel is the perfect framework for connecting the dots and transforming messages into the desired state, as it provides built-in libraries to connect to Kafka and built-in data format transforms for Avro/Protobuf. (This is a great feature if you want to process the body, as it simplifies marshalling the data into POJOs.)



There are two different approaches to do this: 

  1. Manually upload the schema, and use the default Camel data format components to serialize/deserialize

  2. Use the Apicurio Registry libraries to upload the schema and to serialize/deserialize


Environment

Here is what you need to have in the environment. 

  1. Kafka Cluster (AMQ Streams)

  2. Apicurio Service Registry


Solution A - Camel built-in data format components: steps at a glance, using Camel Quarkus.

Step One: 

Go to https://code.quarkus.io/ to generate your bootstrap Quarkus application.
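Alternatively, if you prefer the command line to the web UI, the Quarkus Maven plugin can generate the same skeleton; a sketch (the group and artifact ids below are placeholders):

mvn io.quarkus:quarkus-maven-plugin:create \
  -DprojectGroupId=demo.camel \
  -DprojectArtifactId=camel-kafka-demo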


You will have a sample Camel project ready to go. Next, add all the dependencies the project needs, depending on the endpoints you want to connect to, into the pom.xml file of the project.


Add the dependencies you will need to convert between the two serialization mechanisms and to send the results to Kafka topics.

  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-quarkus-protobuf</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-quarkus-avro</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-quarkus-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-quarkus-stream</artifactId>
  </dependency>


Add the plugin that downloads the schemas from the registry. (Of course, you can also register a schema from local code, but I am assuming a separate person designs and handles schema management.)


<plugin>
  <groupId>io.apicurio</groupId>
  <artifactId>apicurio-registry-maven-plugin</artifactId>
  <version>${registry.version}</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>download</goal>
      </goals>
      <configuration>
        <registryUrl>http://localhost:8080/api</registryUrl>
        <ids>
          <param1>demo-avro</param1>
          <param1>demo-protobuf</param1>
        </ids>
        <outputDirectory>${project.basedir}/src/main/resources</outputDirectory>
      </configuration>
    </execution>
  </executions>
</plugin>


Note: in the configuration, enter the location of the registry, and make sure you tell the plugin which schemas to download and where to place them in the project.


Step Two


Create the schema. Log in to your Apicurio Service Registry and upload the schema.
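If you would rather script the upload than use the registry web console, a sketch using the registry's v1 REST API (assuming the same registryUrl as in the plugin configuration above, the demo-avro artifact id, and the transaction.avsc file from earlier; adjust for your own schema):

curl -X POST \
  -H "Content-Type: application/json" \
  -H "X-Registry-ArtifactType: AVRO" \
  -H "X-Registry-ArtifactId: demo-avro" \
  --data @src/main/resources/transaction.avsc \
  http://localhost:8080/api/artifacts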


Set up the topics that you want to use to publish the events.
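With AMQ Streams on OpenShift, topics can be declared as KafkaTopic custom resources; a minimal sketch (the cluster name my-cluster, the topic name, and the sizing are assumptions to adapt to your environment):

apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: webtrans-quarkus
  labels:
    strimzi.io/cluster: my-cluster
spec:
  partitions: 1
  replicas: 1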


Add the plugins that generate the POJOs we are going to use for marshalling/unmarshalling; the POJOs also make it easier to further process the data.


      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <extensions>true</extensions>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <protocArtifact>com.google.protobuf:protoc:${protobuf-version}:exe:${os.detected.classifier}</protocArtifact>
              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
              <protoSourceRoot>${project.basedir}/src/main/resources</protoSourceRoot>
              <clearOutputDirectory>false</clearOutputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

 

After adding the plugins, run the following command: 


`` mvn compile `` 



It will generate the corresponding Java sources based on the schemas downloaded from the registry.


Step Three 


Add your Camel route in MyRouteBuilder.java, depending on what you want to do with the object.


For example, the code here demonstrates how to use the built-in Protobuf component to transform the message into the Protobuf object before sending the data as a byte array to the Kafka topic.


.marshal().protobuf("demo.camel.TransactionProtos$Transaction")
.log("Sender: ${header.sender}")
.toD("kafka:webtrans-quarkus?brokers=localhost:9092&key=${header.sender}&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")

 

Or, this shows how to use the built-in Camel Avro component to transform the message before sending the data as a byte array to the Kafka topic.


AvroDataFormat format = new AvroDataFormat(Transaction.SCHEMA$);
....
.marshal(format)
.toD("kafka:transrec-quarkus?brokers=localhost:9092&groupId=producergroup&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")

               


And it’s done. 

Here is the working Camel Quarkus project:


I will discuss more in the next post. 


-----

Instead of manually uploading the schemas to the registry, we will be using the Apicurio Registry libraries; by adding them to the Kafka configuration, the schema is automatically saved in the registry.


Here is how it’s done.

Solution B - Apicurio Registry libraries: steps at a glance, using Camel Java Main.


Step One: 

Create a Camel project using the archetype.

mvn archetype:generate \
  -DarchetypeGroupId=org.apache.camel.archetypes \
  -DarchetypeArtifactId=camel-archetype-java \
  -DarchetypeVersion=3.5.0


Add the dependencies you will need to convert between the two serialization mechanisms and to send the results to Kafka topics.


  <dependency>
    <groupId>io.apicurio</groupId>
    <artifactId>apicurio-registry-utils-converter</artifactId>
    <version>${registry.version}</version>
  </dependency>
  <dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-common</artifactId>
    <version>2.32</version>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-stream</artifactId>
  </dependency>


Step Two:


Create the schema and place the schema files into your application resource folder:

${project.basedir}/src/main/resources


Set up the topics that you want to use to publish the events.


Add the plugins that generate the POJOs we are going to use for marshalling/unmarshalling; the POJOs also make it easier to further process the data.


      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>1.8.1</version>
        <executions>
          <execution>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <extensions>true</extensions>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
            </goals>
            <configuration>
              <protocArtifact>com.google.protobuf:protoc:${protobuf-version}:exe:${os.detected.classifier}</protocArtifact>
              <outputDirectory>${project.basedir}/src/main/java</outputDirectory>
              <protoSourceRoot>${project.basedir}/src/main/resources</protoSourceRoot>
              <clearOutputDirectory>false</clearOutputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>

 

After adding the plugins, run the following command: 


`` mvn compile `` 



Step Three:

Add your Camel route in MyRouteBuilder.java and configure the Kafka component with the Apicurio Registry libraries.


For example, the configuration here demonstrates how to serialize with Avro and deserialize with Protobuf using the Apicurio Registry libraries, which automatically register the schema in the registry. Note that you need to point to the registry by providing the “apicurio.registry.url” property.


serializerClass=io.apicurio.registry.utils.serde.AvroKafkaSerializer
deserializerClass=io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer
additionalProperties.apicurio.registry.url=http://localhost:8080/api
additionalProperties.apicurio.registry.global-id=io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy
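A sketch of how these options can be assembled into the Kafka endpoint URI; the registryconfigAvro variable here mirrors the route snippet further below, and the {{serializerClass}} placeholder is assumed to resolve to the serializer value above:

String registryconfigAvro =
      "&additionalProperties.apicurio.registry.url=http://localhost:8080/api"
    + "&additionalProperties.apicurio.registry.global-id="
    + "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy";

// later, in the route:
// .toD("kafka:transrec?serializerClass={{serializerClass}}" + registryconfigAvro)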




 

And in the Camel route, instantiate the two generated Avro/Protobuf classes, work normally with your Camel integration logic, and simply send the message to the Kafka endpoint that was configured in the previous step.


For example, this bean creates the Protobuf object, which is then sent to the Kafka topic:

.bean(demo.camel.TransactionProtos.Transaction.class, "parseFrom(${body})")
.toD("kafka:webtrans?key=${header.sender}")


And this method creates the Avro object, which is then sent to the Kafka topic:

.setBody(method(this,"setAvroValue( ${originalmsg.userid},${originalmsg.amt})"))
.toD("kafka:transrec?serializerClass={{serializerClass}}"+registryconfigAvro)


You can find a working GitHub example in the link. That’s all for this post.

In my next post, I will walk you through the details of the example. 



Summary 


Here is a video that explains everything with diagrams (NO CODE).




Apicurio Registry is a datastore that helps topic owners manage the schema of their data format (as well as REST API designs). When publishing or subscribing to a topic, developers have access to the already defined schema contract. And with Apache Camel, they can implement the actual integration code with simple configuration or the built-in data format components.

