Szymon Jednac bio photo

Szymon Jednac

Email Twitter LinkedIn Github Stackoverflow

I’ve been working on a pet project of my own lately, which is an Akka Streams wrapper for ZeroMQ sockets. In order to test the library and the target application itself, I wanted to build some load tests, which could determine how well they performed under an arbitrary pressure.

There are several ways one could approach this, but if you want to stick to an existing framework and work on a Scala project, than Gatling is probably a good choice. The framework provides an elegant DSL and can be integrated with an sbt project using the official plugin. You can find a brief summary of the core features on the project’s home page and in this blog post by ThoughtWorks.

Gatling is protocol-agnostic at its core, which allows to build some custom testing scenarios on top of it. However, most examples are focused on HTTP or JMS at the moment, which makes it somehow difficult to begin with.

This article is a summary of my own process with a custom DSL implementation for ZeroMQ. I hope you’ll find it useful, if you’re implementing a protocol of your own.

Big picture

The initial test case I took was a simple publisher-subscriber fan-out, which would occur over relevant ZeroMQ sockets. My Gatling simulation would look something like this:

import com.mintbeans.rzmq.gatling.Predef._
import io.gatling.core.Predef._
import scala.concurrent.duration._

class MySimulation extends Simulation {
  val endpoint = "tcp://127.0.0.1:5555"
  val topic = "some-topic"

  val zmqConfig = ZMQ.endpoint(endpoint).receiveTimeout(100 millis)
  val subscribers = scenario("Consume Topic").exec(
    subscriber("simple-subscriber").topic(topic).messagesToRead(5)
  )

  setUp(
    subscribers.inject(atOnceUsers(10))
  ).protocols(zmqConfig).maxDuration(1 minutes)
}

As you’ve probably figured out, the idea is to create a group of simultaneous subscribers, that will listen to some fixed topic over TCP/IP and read an arbitrary number of messages.

The publisher is assumed to be working on the given address. It’s implementation will be skipped in this article, but you can assume, that it will provide a constant stream of ascending integers.

Preparing the DSL

I started by analyzing the JMS module, which is a fairly simple protocol implementation, comparing to HTTP. I’ve also read the Gatling Protocol Breakdown by James Gregory, which I highly recommend to begin with.

NOTE: The following implementation is based on Gatling 2.2.0-M3. Several things had changed between 2.1.x and 2.2.x in the context of custom protocol implementation (in a good way). I would recommend checking the relevant GitHub branch, if you intend to work with 2.1.x.

Predef & DSL trait

The convention is to put all protocol-specific DSL into a Predef object for easy importing in your Simulation classes. Let’s start by defining it:

object Predef extends ZmqDsl

trait ZmqDsl {
  ...
}

Protocol definition

The protocol needs a Gatling representation, which will provide shared attributes for establishing a connection:

case class ZmqProtocol(endpoint: String, receiveTimeout: FiniteDuration) extends io.gatling.core.config.Protocol

We need to express the ZmqProtocol creation process in the DSL. The convention is to create a set of relevant builder classes, that will form a hierarchy:

object ZmqProtocolBuilderBase {
  def endpoint(address: String) = ZmqProtocolBuilder(address)
}

case class ZmqProtocolBuilder(endpoint: String, receiveTimeout: FiniteDuration = 500 millis) {

  def receiveTimeout(duration: FiniteDuration) = copy(receiveTimeout = duration)

  def build = ZmqProtocol(
    endpoint = endpoint,
    receiveTimeout = receiveTimeout
  )
}

trait ZmqDsl {
  val ZMQ = ZmqProtocolBuilderBase
  ...

  //By convention, a protocol instance is built using an implicit conversion
  import scala.language.implicitConversions

  implicit def zmqProtocolBuilder2zmqProtocol(builder: ZmqProtocolBuilder): ZmqProtocol = builder.build
}

Our simulation code could look like this at the moment:

import Predef._

val zmqConfig1 = ZMQ.endpoint(endpoint)
val zmqConfig2 = ZMQ.endpoint(endpoint).receiveTimeout(500 millis)

//A builder instance should be converted to a Gatling protocol implicitly
setUp(...).protocols(zmqConfig1)

You can add intermediary builders of course, if protocol creation should happen in several steps, that take some mandatory attributes on the way.

Example: the “base” builder could produce an “endpoint step” builder, which would take some mandatory parameters and create a valid ZmqProtocolBuilder in the end. The final builder is assumed to take optional configuration only. That’s why we’re creating a copy of it once an attribute is altered (both builders are immutable and can produce a valid protocol).

Action definition

An Action is a top level abstraction for executing a concrete step along a scenario. It is implemented as an Akka actor, that receives Session messages for triggering the action.

In this example we’ll implement a subscriber action, that will:

  1. Connect to a ZMQ_PUB socket
  2. Consume some integers from the stream
  3. Terminate

ActionBuilder DSL

Instead of producing an Action directly, we need to create a hierarchy of DSL builders, that will be converted to an ActionBuilder instance when evaluated in the simulation class. This “action builder” will be used by Gatling to create concrete Action instances during scenario execution later.

Let’s define the DSL and the ActionBuilder for the subscriber process:

case class SubscriberProcessBuilderBase(processName: String) {
  def topic(topic: String) = SubscriberProcessBuilder(processName, topic)
}

case class SubscriberProcessBuilder(processName: String, topic: String, messagesToRead: Int = 1) {
  def messagesToRead(numberOfMessages: Int) = copy(messagesToRead = numberOfMessages)

  def build(): ActionBuilder = SubscriberActionBuilder(processName, topic, messagesToRead)
}

case class SubscriberActionBuilder(processName: String, topic: String, messagesToRead: Int) extends ActionBuilder {
  //NOTE: This is 2.2.0-M3 specific
  def build(system: ActorSystem, next: ActorRef, ctx: ScenarioContext) = {
    val protocol = ctx.protocols.protocol[ZmqProtocol]
    val statsEngine = ctx.statsEngine

    system.actorOf(SubscriberAction.props(processName, protocol, topic, messagesToRead, statsEngine, next), actorName("subscriber"))
  }
}

trait ZmqDsl {
  ...

  def subscriber(processName: String) = SubscriberProcessBuilderBase(processName)

  //By convention, an ActionBuilder instance is created using an implicit conversion from a parent builder
  import scala.language.implicitConversions

  implicit def subscriberProcessBuilder2ActionBuilder(builder: SubscriberProcessBuilder): ActionBuilder = builder.build()
}

We can use this in the simulation as follows:

import Predef._

val subscribers = scenario("Consume Topic").exec(
  //Our SubscriberProcessBuilder should be implicitly converted to an ActionBuilder instance
  subscriber("simple-subscriber").topic("some-topic").messagesToRead(5)
)

setUp(
  subscribers.inject(atOnceUsers(10))
).protocols(zmqConfig)

Action implementation

One missing piece is the SubscriberAction implementation, which is created by the SubscriberActionBuilder in the previous step. We’ll implement it next:

class SubscriberAction(processName: String, protocol: ZmqProtocol, topic: String, messagesToRead: Int, val statsEngine: StatsEngine, val next: ActorRef) extends Interruptable with Failable {

  val expectedTopicBytes = ByteString(topic)
  val mainCtx = new ZContext(1)

  override def postStop() = {
    mainCtx.close()
  }

  override def executeOrFail(session: Session): Validation[String] = {
    val ctx = ZContext.shadow(mainCtx)
    try {
      statsEngine.logRequest(session, processName)

      val requestStartDate = now()
      val socket = ctx.createSocket(ZMQ.SUB)
      socket.setReceiveTimeOut(protocol.receiveTimeout.toMillis.toInt)
      socket.subscribe(topic.getBytes(ZMQ.CHARSET))
      socket.connect(protocol.endpoint)
      val requestEndDate = now()

      val messageStream = {
        val serverStream = for (i <- Stream.range(0, messagesToRead)) yield readMessage(socket)
        serverStream.takeWhile(validContent _).map(extractPayload _)
      }

      val responseStartDate = now()
      val messages = messageStream.toList
      val responseEndDate = now()

      val timings = ResponseTimings(requestStartDate, requestEndDate, responseStartDate, responseEndDate)

      messages match {
        case list if list.size == messagesToRead =>
          statsEngine.logResponse(session, processName, timings, Status("OK"), None, None)
          Success("OK")
        case list =>
          statsEngine.logResponse(session, processName, timings, Status("KO"), None, None)
          Failure(s"Invalid list size ${list.size}")
      }
    } finally {
      ctx.close()
      session.terminate()
    }
  }

  @inline
  private def validContent(response: Option[Either[String, ByteString]]): Boolean = response match {
    case Some(Right(_)) => true
    case _ => false
  }

  @inline
  private def extractPayload(response: Option[Either[String, ByteString]]): ByteString = response match {
    case Some(Right(bytes)) => bytes
    case _ => throw new IllegalStateException("Extracting payload from an invalid response.")
  }

  @inline
  private def readMessage(socket: ZMQ.Socket) = {
    Option(ZMsg.recvMsg(socket)).map { zMsg =>
      zMsg.asScala.toList.map(zFrame => ByteString(zFrame.getData))
    }.map {
      case topic :: payload :: Nil if topic == expectedTopicBytes => Right(payload)
      case topic :: payload :: Nil => Left(s"Invalid topic: ${new String(topic.toArray, ZMQ.CHARSET)}")
      case _ => Left("Invalid message format")
    }
  }

  @inline
  private def now() = System.currentTimeMillis()
}

The most important part are the statsEngine calls, which are responsible for storing request and response stats. If everything was setup correctly, then you should be able to get a valid report after all tests were executed.

Gatling screenshot

Conclusions

I hope you enjoyed the article and didn’t get discouraged by implicit conversions and longish code examples.

You can find a complete version of the DSL in the reactive-zeromq project on GitHub. Please take it with a grain of salt - it’s a small experiment of mine, not a full blown communication library.