Creating Camel routes on-the-fly using OSGi

Avisi werknemer

Avisi werknemer

Published: 5 June, 2014

A while ago I found the need to create routes on-the-fly with Apache Camel. This is something you wouldn't do very often; most of the time the endpoints you're about to connect inside a route are known during development. Say, for instance, you want to route an XML message from a Jetty endpoint to a JMS queue. You wouldn't need any dynamic tricks to build this; a static Camel route with a few property replacements (e.g. to keep the hostname and port number of the Jetty endpoint configurable) would suffice.

But what if you want to create an arbitrary number of Jetty endpoints (or in my case MLLP endpoints) at runtime? Then you need something more dynamic (again I consider this an exception rather than the rule).

Since I was already packaging my Camel routes as OSGi bundles for deployment in Apache Karaf (this project required more dynamic parts) it was a natural choice to combine the power of both OSGi and Camel.

Managed Service Factories

OSGi has the notion of a 'managed service factory'. Basically, a managed service factory allows one to create and update stuff on-the-fly based on configuration changes received from ConfigAdmin.

To put it simply: if you add or change a property - e.g. in a properties file or in properties embedded in a Karaf features file - the managed service factory is triggered with the property changes and can act accordingly by creating or updating a Camel route. This way we can create multiple instances of one route with different configurations.

Example

package nl.avisi.example.route;

import org.apache.camel.CamelContext;
import org.apache.camel.component.hl7.HL7MLLPCodec;
import org.apache.camel.core.osgi.OsgiDefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
import org.apache.camel.spi.Registry;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationException;
import org.osgi.service.cm.ManagedServiceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.Map;

/**
 * OSGi {@link ManagedServiceFactory} that creates, updates and removes
 * {@link MllpListenerRoute} Camel routes in response to configuration changes.
 *
 * <p>Each configuration instance (identified by its pid) maps to at most one route.
 * The factory owns a dedicated {@link CamelContext} that is started in the constructor
 * and stopped by {@link #close()}.
 *
 * <p>This class performs no synchronization of its own.
 */
public class MllpListenerRouteFactory implements ManagedServiceFactory, Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(MllpListenerRouteFactory.class);

    /** Configuration key holding the host the MLLP listener binds to. */
    private static final String HOST_PROPERTY = "host";

    /** Configuration key holding the port the MLLP listener binds to. */
    private static final String PORT_PROPERTY = "port";

    private CamelContext camelContext;

    /** Routes that were successfully added to the Camel context, keyed by configuration pid. */
    private final Map<String, MllpListenerRoute> routes = new HashMap<>();

    /**
     * Creates the factory and starts its Camel context.
     *
     * @param bundleContext bundle context used to create the OSGi-aware Camel context
     */
    public MllpListenerRouteFactory(final BundleContext bundleContext) {
        LOG.debug("Setting up Camel");
        setupCamelContext(bundleContext);
    }

    /**
     * Creates a route for a new configuration, or replaces the existing route when the
     * host or port of a known configuration changed.
     *
     * @param pid        identifier of the configuration instance
     * @param properties configuration properties; must contain non-empty {@code host} and {@code port}
     * @throws ConfigurationException if the properties are absent or a required property is missing or empty
     */
    @Override
    public void updated(final String pid, final Dictionary<String, ?> properties) throws ConfigurationException {
        LOG.debug("Updating...");
        if (properties == null) {
            throw new ConfigurationException(null, "No configuration properties supplied for " + pid);
        }
        final String host = requireProperty(properties, HOST_PROPERTY);
        final String port = requireProperty(properties, PORT_PROPERTY);

        final MllpListenerRoute route = routes.get(pid);
        if (route == null) {
            LOG.info("Building new route");
            addRoute(pid, host, port, new Hl7MllpListenerRoute());
            return;
        }
        if (host.equals(route.getHost()) && port.equals(route.getPort())) {
            return; // only update route if properties changed
        }
        LOG.info("Updating existing route");
        removeRoute(pid, route);
        addRoute(pid, host, port, new Hl7MllpListenerRoute());
    }

    /**
     * Stops and removes the route belonging to the given configuration, if there is one.
     *
     * @param pid identifier of the configuration instance that was deleted
     */
    @Override
    public void deleted(final String pid) {
        LOG.debug("Deleting...");
        removeRoute(pid, routes.get(pid));
    }

    @Override
    public String getName() {
        return "Factory for " + MllpListenerRoute.class.getSimpleName();
    }

    /**
     * Stops the Camel context and forgets all registered routes.
     * Failures while stopping are logged, not propagated.
     */
    @Override
    public void close() {
        LOG.info("Shutting down Camel");
        if (camelContext != null) {
            try {
                camelContext.stop();
            } catch (Exception e) {
                LOG.error("Failed to stop Camel", e);
            }
        }
        routes.clear();
    }

    /**
     * Reads a mandatory configuration value as a trimmed string.
     * Non-String values (e.g. a numeric port) are converted with {@code toString()}.
     */
    private static String requireProperty(final Dictionary<String, ?> properties, final String key)
            throws ConfigurationException {
        final Object value = properties.get(key);
        if (value == null) {
            throw new ConfigurationException(key, "Required property is missing");
        }
        final String text = value.toString().trim();
        if (text.isEmpty()) {
            throw new ConfigurationException(key, "Required property is empty");
        }
        return text;
    }

    /** Stops and removes the given route from Camel and unregisters it; a {@code null} route is ignored. */
    private void removeRoute(final String pid, final MllpListenerRoute route) {
        if (route == null) {
            // Nothing was ever registered for this pid (e.g. the earlier add failed).
            LOG.debug("No route registered for pid {}", pid);
            return;
        }
        final String routeId = route.getRouteId();
        try {
            camelContext.stopRoute(routeId);
            camelContext.removeRoute(routeId);
        } catch (Exception e) {
            LOG.error("Failed to stop and remove route " + routeId, e);
        }
        routes.remove(pid);
    }

    /** Configures the route and adds it to Camel; it is only registered under the pid when that succeeds. */
    private void addRoute(final String pid, final String host, final String port, final MllpListenerRoute route) {
        route.setHost(host);
        route.setPort(port);
        try {
            camelContext.addRoutes(route);
        } catch (Exception e) {
            // Do not record the route: leaving the pid unmapped lets a later update retry.
            LOG.error("Failed to add route", e);
            return;
        }
        routes.put(pid, route);
    }

    /** Creates and starts the Camel context; a start failure is logged, not propagated. */
    private void setupCamelContext(final BundleContext context) {
        camelContext = new OsgiDefaultCamelContext(context, createCamelRegistry());
        try {
            camelContext.start();
        } catch (Exception e) {
            LOG.error("Failed to start Camel", e);
        }
    }

    /** Builds a registry exposing a non-validating HL7 MLLP codec under the name {@code hl7Codec}. */
    private Registry createCamelRegistry() {
        SimpleRegistry registry = new SimpleRegistry();
        HL7MLLPCodec hl7MLLPCodec = new HL7MLLPCodec();
        hl7MLLPCodec.setValidate(false);
        registry.put("hl7Codec", hl7MLLPCodec);
        return registry;
    }
}

The managed service factory above is able to create or update the following route:

/**
 * Camel route that listens for HL7v2 messages over MLLP on a configurable
 * host and port and replies with an acknowledgement.
 */
public class Hl7MllpListenerRoute extends RouteBuilder implements MllpListenerRoute {

    private static final Logger LOG = LoggerFactory.getLogger(Hl7MllpListenerRoute.class);

    /** Host the listener endpoint is bound to. */
    private String host;

    /** Port the listener endpoint is bound to; also part of the route id. */
    private String port;

    /**
     * Defines the route: a synchronous Mina2 TCP consumer using the {@code hl7Codec}
     * registry entry, answering every message with an HL7 acknowledgement.
     */
    @Override
    public void configure() {
        final String routeId = getRouteId();
        LOG.info("Configuring route " + routeId);

        fromF("mina2:tcp://%s:%s?sync=true&codec=#hl7Codec", getHost(), getPort())
            .routeId(routeId)
            // integration logic removed for brevity
            .transform(ack());
    }

    /**
     * @return the route id, composed of this class's simple name, a dash and the port
     */
    @Override
    public String getRouteId() {
        final String prefix = Hl7MllpListenerRoute.class.getSimpleName();
        return prefix + "-" + getPort();
    }

    /** @return the configured host */
    @Override
    public String getHost() {
        return this.host;
    }

    /** @param host the host to listen on */
    @Override
    public void setHost(final String host) {
        this.host = host;
    }

    /** @return the configured port */
    @Override
    public String getPort() {
        return this.port;
    }

    /** @param port the port to listen on */
    @Override
    public void setPort(final String port) {
        this.port = port;
    }
}

We need something to bootstrap the managed service factory. Since we only have one factory, I've written an OSGi activator using plain OSGi, but I suggest also taking a look at one of the available higher-level APIs, like Declarative Services, Blueprint, etc., if you want to use it yourself.

/**
 * Bundle activator that registers a {@link MllpListenerRouteFactory} as a
 * {@link ManagedServiceFactory} under the factory pid {@code nl.avisi.example.route}
 * and shuts it down again when the bundle stops.
 */
public class Activator implements BundleActivator {

    private static final Logger LOG = LoggerFactory.getLogger(Activator.class);

    private static final String CONFIG_PID = "nl.avisi.example.route";

    private ServiceRegistration<?> routeFactoryRegistration;

    /**
     * The registered factory instance. Kept here so it can be closed directly on stop,
     * without looking it up through the service registry (which would require a
     * matching {@code ungetService} call).
     */
    private MllpListenerRouteFactory routeFactory;

    /**
     * Creates the route factory and registers it as a service.
     * If registration fails, the factory is closed again before the failure is propagated.
     *
     * @param context the bundle context of the starting bundle
     */
    @Override
    public void start(BundleContext context) {
        LOG.debug("Starting");
        final Dictionary<String, Object> properties = new Hashtable<String, Object>();
        properties.put(Constants.SERVICE_PID, CONFIG_PID);

        final MllpListenerRouteFactory factory = new MllpListenerRouteFactory(context);
        try {
            routeFactoryRegistration = context.registerService(
                new String[]{ ManagedServiceFactory.class.getName(), Closeable.class.getName() },
                factory,
                properties);
        } catch (RuntimeException e) {
            // The factory already started its Camel context; do not leave it running.
            factory.close();
            throw e;
        }
        routeFactory = factory;
    }

    /**
     * Unregisters the factory service and then closes the factory.
     * Unregistering first ensures no further configuration callbacks reach a factory
     * that is shutting down; the factory is closed even if unregistering fails.
     *
     * @param context the bundle context of the stopping bundle
     */
    @Override
    public void stop(BundleContext context) {
        LOG.debug("Stopping");
        final ServiceRegistration<?> registration = routeFactoryRegistration;
        final MllpListenerRouteFactory factory = routeFactory;
        routeFactoryRegistration = null;
        routeFactory = null;
        try {
            if (registration != null) {
                registration.unregister();
            }
        } finally {
            if (factory != null) {
                factory.close();
            }
        }
    }
}

Happy coding!

Did you enjoy reading?

Share this blog with your audience!