Apache Camel's Scala DSL explored


Some weeks ago a client approached us for assistance with their IT infrastructure. The client has many systems that interact with each other and are quite tightly coupled. With changes to their business coming up, the time had come to rethink the architecture. One goal of the project was to decouple the systems so that each could be changed independently of the others. Our solution ended up being a really powerful combination of Apache Camel and Scala.

The main flow is as follows: whenever a product is ordered, a CRM system registers this and the product will be provisioned for that specific customer. A separate website that is used by the customer shows the ordered product along with its properties (some of which can be changed by the customer using this website). The provisioning of the ordered product is handled by a different system.

In order to create flexibility in the kind of software that handles specific parts of the flow it was decided that the systems should be as loosely coupled as possible. This would enable the client to swap systems without affecting other systems. To facilitate this, we ended up using Apache Camel. Most of the communication between systems is done through HTTP and Camel makes consuming (and exposing) HTTP endpoints extremely straightforward.

The Solution

Our strategy was that once a product is ordered, this fact is published to the integration platform (Camel), and one or more other systems can react to it.
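To make the idea concrete, here is a minimal in-memory sketch of that publish-and-react pattern in plain Scala. It is not Camel code: `IntegrationPlatform`, `ProductOrdered` and the subscriber messages are names made up for this illustration.

```scala
import scala.collection.mutable

// Hypothetical event type; in the project the payload was a JSON document.
final case class ProductOrdered(customerId: String, product: String)

// A tiny in-memory stand-in for the integration platform.
final class IntegrationPlatform {
  private val subscribers = mutable.ListBuffer.empty[ProductOrdered => Unit]

  def subscribe(handler: ProductOrdered => Unit): Unit = subscribers += handler

  // The publisher does not know who is listening, or how many listeners exist.
  def publish(event: ProductOrdered): Unit = subscribers.foreach(_(event))
}

object PublishDemo {
  def run(): List[String] = {
    val platform = new IntegrationPlatform
    val log = mutable.ListBuffer.empty[String]
    platform.subscribe(e => log += s"provisioning ${e.product} for ${e.customerId}")
    platform.subscribe(e => log += s"website shows ${e.product}")
    platform.publish(ProductOrdered("c-1", "broadband"))
    log.toList
  }
}
```

Adding or removing a subscriber does not touch the publishing side, which is the kind of decoupling we were after.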

Most of the client's homegrown systems were written in Scala, and since Camel has a Scala DSL we went ahead and used that. Creating the routes to handle the JSON that the HTTP endpoints returned was very easy. Most of the routes were along the lines of:

"servlet:///service/" ==> {

Whenever a system is done with its operation it returns a JSON document. Depending on which system sent the message and the properties of the message we needed to invoke different routes. So what we wanted was a way to deserialize the JSON document into a Scala case class so we could easily check the properties of the message. Basically, we wanted to be able to do something like:
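A minimal sketch of that idea in plain Scala, without Camel. `MyCase` is the case class shown further down; the property values and endpoint names are hypothetical and only serve the illustration.

```scala
// Same shape as the case class used later in this post.
final case class MyCase(something: String)

object Dispatch {
  // Hypothetical property values and endpoint names, chosen only for illustration.
  def routeFor(message: MyCase): String = message match {
    case MyCase("provisioned") => "direct:notifyWebsite"
    case MyCase("failed")      => "direct:alertOperations"
    case _                     => "direct:ignore"
  }
}
```

Once the JSON has become a case class, deciding where a message goes is an ordinary pattern match on its fields.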


To do that we needed two things:

  1. a way to register the fact that we want to convert Strings (the JSON documents) into a case class
  2. a way to deserialize a JSON document into a case class in Scala

The first thing is pretty easy: Camel has something called type converters. Registering one basically tells Camel: “If you encounter an object of type X, and someone wants it as type Y, class Z will be able to do that”.

Class Z is called the type converter and you can register it like this:

camelContext.getTypeConverterRegistry.addTypeConverter(classOf[MyCase], classOf[String], new CaseClassTypeConverter)

To clarify: we're telling Camel to add CaseClassTypeConverter as a type converter that will be able to create a MyCase out of a String.
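The lookup idea behind such a registry can be sketched in a few lines of plain Scala. This is a simplified stand-in, not Camel's implementation: `ConverterRegistry`, `Greeting` and the method names are invented for this example. Note also that the sketch takes the source type first, whereas Camel's `addTypeConverter` call above takes the target type first.

```scala
import scala.collection.mutable

// Simplified stand-in for a converter registry; none of these names are Camel API.
final class ConverterRegistry {
  private val converters = mutable.Map.empty[(Class[_], Class[_]), Any => Any]

  // "If you encounter X and someone wants Y, this function (Z) can do it."
  def add[X, Y](from: Class[X], to: Class[Y])(convert: X => Y): Unit =
    converters((from, to)) = (value: Any) => convert(value.asInstanceOf[X])

  // Look up a converter by the runtime class of the value and the wanted type.
  def convert[Y](to: Class[Y], value: Any): Option[Y] =
    converters.get((value.getClass, to)).map(f => f(value).asInstanceOf[Y])
}

object RegistryDemo {
  final case class Greeting(text: String)

  def run(): Option[Greeting] = {
    val registry = new ConverterRegistry
    registry.add(classOf[String], classOf[Greeting])(s => Greeting(s.trim))
    registry.convert(classOf[Greeting], "  hello ")
  }
}
```

The sketch only matches on the exact runtime class of the value, so a value of a subtype would not find a converter registered for its parent type.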

The second piece of the puzzle is the actual deserialization of the JSON document into a MyCase. Since our client was already using it, we went ahead and used Lift's JSON library (lift-json). The CaseClassTypeConverter looks like this:

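import net.liftweb.json._
import org.apache.camel.Exchange
import org.apache.camel.support.TypeConverterSupport
class CaseClassTypeConverter extends TypeConverterSupport {
  implicit val formats: Formats = DefaultFormats
  def convertTo[T](clazz: Class[T], exchange: Exchange, value: Any): T =
    Extraction.extract(parse(value.toString), TypeInfo(clazz, None)).asInstanceOf[T]
}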
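To see what the converter does without Camel in the picture, the same deserialization can be tried with lift-json on its own. This is a small sketch that assumes lift-json is on the classpath; `ExtractDemo` and `fromJson` are names made up for the example.

```scala
import net.liftweb.json._

// Top-level case class, same shape as the one used in this post.
final case class MyCase(something: String)

object ExtractDemo {
  implicit val formats: Formats = DefaultFormats

  // parse builds a JValue tree; extract maps it onto the case class by field name.
  def fromJson(json: String): MyCase = parse(json).extract[MyCase]
}
```

Calling `ExtractDemo.fromJson("""{"something":"hello"}""")` should give back a `MyCase` whose `something` field is `hello`. The type converter does the same thing, except that the target class arrives at runtime as a `Class[T]` instead of as a type parameter.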

Having solved our two puzzle pieces, we can now use the following in our Camel route:

"servlet:///as" ==> {

The case class looks like this:

case class MyCase(something:String)
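Put together, a complete route could look roughly like the sketch below. It assumes the camel-scala module is on the classpath and that a converter to MyCase has been registered as described above. The `direct:` endpoints, the value `"hello"`, and the names `MyCaseRoutes` and `somethingIs` are hypothetical.

```scala
import org.apache.camel.Exchange
import org.apache.camel.scala.dsl.builder.RouteBuilder

// Repeated here so the sketch is self-contained.
final case class MyCase(something: String)

// Assumes a type converter to MyCase is registered on the CamelContext.
class MyCaseRoutes extends RouteBuilder {
  // Asking for the body as MyCase is what triggers the registered type converter.
  private def somethingIs(expected: String)(exchange: Exchange): Boolean =
    exchange.getIn.getBody(classOf[MyCase]).something == expected

  "servlet:///as" ==> {
    choice {
      // Hypothetical target endpoints.
      when (somethingIs("hello")) to ("direct:hello")
      otherwise to ("direct:other")
    }
  }
}
```

The predicate goes through the plain Java `getBody(Class)` call rather than a DSL shortcut, so the point where the conversion happens is explicit.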

But wait, there's more!

When you try the above, you may find that it does not work. The likely cause is that the incoming body is not a String but a Camel StreamCache. So we created another type converter that handles the conversion from a StreamCache, and registered it like this:

camelContext.getTypeConverterRegistry.addTypeConverter(classOf[MyCase], classOf[StreamCache], new StreamCacheCaseClassTypeConverter)

The type converter looks like this:

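import org.apache.camel.{Exchange, StreamCache}
class StreamCacheCaseClassTypeConverter extends CaseClassTypeConverter {
  override def convertTo[T](clazz: Class[T], exchange: Exchange, value: Any): T = {
    val os = new java.io.ByteArrayOutputStream()
    value.asInstanceOf[StreamCache].writeTo(os) // copy the cached stream into the buffer
    super.convertTo(clazz, exchange, os.toString("UTF-8"))
  }
}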

It basically unwraps the StreamCache into a String and sends that to the original type converter.
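The unwrapping step itself is ordinary JDK stream handling. The sketch below shows it with a plain `InputStream` standing in for the StreamCache, so it runs without Camel; `StreamToString` and `drain` are names invented for the example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets

object StreamToString {
  // Copy everything from the stream into a buffer, then decode the bytes as UTF-8.
  def drain(in: InputStream): String = {
    val os = new ByteArrayOutputStream()
    val chunk = new Array[Byte](4096)
    var read = in.read(chunk)
    while (read != -1) {
      os.write(chunk, 0, read)
      read = in.read(chunk)
    }
    os.toString("UTF-8")
  }
}
```

Naming the charset explicitly matters here: decoding with the platform default could garble non-ASCII characters in the JSON.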

Hopefully someone will be able to use these examples to their own advantage!