Tuesday, October 28, 2014

Getting request timeouts to work with spray-can

Request timeouts! This is such a basic feature of any RPC framework - some endpoints are just going to take a really long time compared to others, and you want to be able to wait for reallySlowRpc while still timing out quickly for shouldBeFastButSuperFlaky.

At AI2 we recently switched from dispatch to spray-can as our HTTP client. Dispatch was great in that it let you get started quickly, and it's backed by the excellent-if-generically-named async HTTP client, but it has several disadvantages:
  • backing library is Java, not Scala
  • no Actor support (implied by the above)
  • very limited feature set out of the box
  • documentation is example-based, and sparse
Basically, dispatch is excellent if you need to get something running quickly, but very difficult to use for advanced applications.

spray-can, on the other hand, has great integration with spray-json (which we use for our RPC requests and responses), works well with actors, is pure Scala, and has moderately well-documented code.

The problem is that the actual APIs are often tricky to use. Very, very tricky to use - if the API hasn't been built to support a feature, it can take quite a lot of digging to unearth the right configuration parameter to give you what you need.

Which brings me back to request timeouts.
If you look through the spray documentation, you find this page on connection timeouts. It says:
If no response to a request is received within the configured request-timeout period the connection actor closes the connection and dispatches an Http.Closed event message to the senders of all requests that are currently open.
...
In order to change the respective config setting for this connection only the application can send the following messages to the connection actor:
  • spray.io.ConnectionTimeouts.SetIdleTimeout
  • spray.http.SetRequestTimeout
Great! Except it doesn't take too much digging to discover that the actor this documentation refers to (HttpClientConnection) isn't exposed through the API at the top of that page.

Going to Stack Overflow, we find this question and answer; the second half of the answer gives us a good starting point. The problem with this example is that you have to specify all of the configuration parameters, and there's no way of using the configured values as defaults without manually looking them all up.

Fortunately, some digging through the spray code got me a solution which works - you can use spray.util.actorSystem to build default configurations, then update the data you need:

// Create the default settings from the spray configuration system. This uses
// what's in your config file. Note that the call below requires an implicit
// ActorRefFactory (for example, an ActorSystem) in scope.
val sprayActorSystem = spray.util.actorSystem
val defaultClientConnectionSettings = ClientConnectionSettings(sprayActorSystem)
val defaultHostConnectionSettings = HostConnectorSettings(sprayActorSystem)

// Override the request & idle timeouts.
// Timeouts are all of type scala.concurrent.duration.FiniteDuration.
val updatedClientConnectionSettings = defaultClientConnectionSettings.copy(
  requestTimeout = requestTimeout,
  idleTimeout = idleTimeout)
val updatedHostConnectionSettings = defaultHostConnectionSettings.copy(
  idleTimeout = idleTimeout,
  connectionSettings = updatedClientConnectionSettings)


Full code below:

import akka.actor.ActorSystem
import akka.io.IO
import akka.pattern.ask
import akka.util.Timeout
import spray.can.Http
import spray.can.client._
import spray.client.pipelining._
import spray.http._

import scala.concurrent.Future
import scala.concurrent.duration._

object SprayCanHelper {
  /** Build a sendReceive pipeline with the given timeouts.
    * @param host the host to connect to
    * @param port the port on the host to connect to
    * @param requestTimeout the amount of time to wait for a request to complete
    * before failing
    * @param idleTimeout how long a connection can remain idle before closing it
    * @return the pipeline with the given timeouts (Note that SendReceive is a
    * type alias for HttpRequest => Future[HttpResponse], defined in
    * spray.client.pipelining).
    */
  def createSendReceiveWithTimeout(
      host: String,
      port: Int,
      requestTimeout: FiniteDuration,
      idleTimeout: FiniteDuration
    )(implicit actorSystem: ActorSystem,
      creationTimeout: Timeout): Future[SendReceive] = {

    // Implicit execution context.
    import actorSystem.dispatcher

    // Create the default settings from the spray configuration system. This uses
    // what's in your config file.
    val sprayActorSystem = spray.util.actorSystem
    val defaultClientConnectionSettings =
      ClientConnectionSettings(sprayActorSystem)
    val defaultHostConnectionSettings = HostConnectorSettings(sprayActorSystem)

    // Override the request & idle timeouts.
    val updatedClientConnectionSettings = defaultClientConnectionSettings.copy(
      requestTimeout = requestTimeout,
      idleTimeout = idleTimeout)
    val updatedHostConnectionSettings = defaultHostConnectionSettings.copy(
      idleTimeout = idleTimeout,
      connectionSettings = updatedClientConnectionSettings)

    // Build the pipeline with the updated settings.
    val response = IO(Http).ask(
      Http.HostConnectorSetup(host,
        port = port,
        settings = Some(updatedHostConnectionSettings)))
    response map {
      case Http.HostConnectorInfo(connector, _) => sendReceive(connector)
    }
  }
}
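
For completeness, here is a rough sketch of how the helper above might be used to get the behavior from the start of this post: a patient pipeline for reallySlowRpc and an impatient one for shouldBeFastButSuperFlaky. The host, port, paths, and all timeout values are made up for illustration. It also assumes that spray hands back a separate connector for each distinct set of settings, and that the pipeline returned by sendReceive(connector) uses the implicit Timeout in scope (creationTimeout here) for its own ask, which is my reading of the spray-client code rather than something the documentation spells out. If that holds, creationTimeout needs to be at least as long as the longest request timeout.

```scala
import akka.actor.ActorSystem
import akka.util.Timeout
import spray.client.pipelining._
import spray.http.HttpResponse

import scala.concurrent.Future
import scala.concurrent.duration._

object TimeoutExample extends App {
  implicit val system: ActorSystem = ActorSystem("timeout-example")
  import system.dispatcher

  // Assumption: this implicit is also picked up by the pipeline's internal
  // ask, so it is kept longer than the longest request timeout used below.
  implicit val creationTimeout: Timeout = Timeout(6.minutes)

  // Hypothetical host and port, for illustration only.
  val host = "rpc.example.com"
  val port = 8080

  // A patient pipeline for the slow endpoint. The idle timeout is kept above
  // the request timeout so that a connection waiting on a slow response is
  // not closed as idle first.
  val slowPipeline: Future[SendReceive] =
    SprayCanHelper.createSendReceiveWithTimeout(
      host, port, requestTimeout = 5.minutes, idleTimeout = 6.minutes)

  // An impatient pipeline for the endpoint that should be fast.
  val fastPipeline: Future[SendReceive] =
    SprayCanHelper.createSendReceiveWithTimeout(
      host, port, requestTimeout = 2.seconds, idleTimeout = 30.seconds)

  // Each pipeline is just HttpRequest => Future[HttpResponse].
  val slowResponse: Future[HttpResponse] =
    slowPipeline.flatMap(send => send(Get("/reallySlowRpc")))
  val fastResponse: Future[HttpResponse] =
    fastPipeline.flatMap(send => send(Get("/shouldBeFastButSuperFlaky")))

  // Print whatever comes back (success or failure), then shut down.
  slowResponse.zip(fastResponse).onComplete { result =>
    println(result)
    system.shutdown()
  }
}
```

In a real application you would build each pipeline once and reuse it, rather than creating one per request.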

1 comment:

Ryan O'Rourke said...

Thank you, this was helpful.

By the way, HostConnectorSettings (and ClientConnectionSettings too) has a handy method "fromSubConfig" that will set all its fields based on a config that you supply. You can use this to set up selective custom overrides of the spray.can defaults in another config and generate a properly initialized HostConnectorSettings without having to make an instance, use the copy methods, etc.

Here's one way you could do it:

val mainConfig: Config = ConfigFactory.load() // or however you get your overall config that contains the spray.can defaults
val customConfig = ConfigFactory.parseString("myClient { custom = foo, host-connector.max-retries = 99 }") // really this could just be part of your main application.conf or reference.conf, I'm just breaking it out for illustration

val settings = HostConnectorSettings.fromSubConfig(customConfig.getConfig("myClient").withFallback(mainConfig.getConfig("spray.can")))

(This made sense for me because I already had a custom config block for my client to provide stuff like target host, etc.)