Channeling the Inner Complexity

or, lightweight threads and channels for Scala

Jakob Odersky

2018-11-15

Overview

Definitions

Premise


Concurrency - Threads


Single Thread

def mkmeme(imageUrl: String, text: String): Image = {
  val layer1: Image = fetchUrl(imageUrl) // network call
  val layer2: Image = textToImage(text) // slow
  superimpose(layer1, layer2) // need both results
}

Single Thread


Many Threads

def mkmeme(imageUrl: String, text: String): Image = {
  var layer1: Image = null
  var layer2: Image = null
  thread {
    layer1 = fetchUrl(imageUrl)
  }
  thread {
    layer2 = textToImage(text)
  }
  while(layer1 == null || layer2 == null) {
    // wait somehow
  }
  superimpose(layer1, layer2)
}

Many Threads


Multiple Threads, Queue-based

def mkmeme(imageUrl: String, text: String): Image = {
  val q1 = Queue[Image]
  val q2 = Queue[Image]
  thread {
    q1.put(fetchUrl(imageUrl))
  }
  thread {
    q2.put(textToImage(text))
  }
  superimpose(q1.take(), q2.take())
}

Multiple Threads, Queue-based


Concurrency - Callbacks

Callbacks

def mkmeme(imageUrl: String, text: String,
    callback: Image => Unit): Unit = {
  var layer1 = null
  var layer2 = null
  def combine() = callback(superimpose(layer1, layer2))
  fetchUrl(imageUrl, img => {
    layer1 = img
    if (layer2 != null) { //!\\ danger if parallelism > 1
      combine()
    }
  })
  textToImage(text, img => {
    layer2 = img
    if (layer1 != null) {
      combine()
    }
  })
}

callback


Callbacks



Concurrency - Futures

scala.concurrent.Future[A]


Future

def mkmeme(imageUrl: String, text: String): Future[Image] = {
  val layer1: Future[Image] = fetchUrl(imageUrl) 
  val layer2: Future[Image] = textToImage(text)
  for {
    l1 <- layer1
    l2 <- layer2
  } yield {
    superimpose(l1, l2)
  }
}

Promises

scala.concurren.Promise[A]


// ScalaJS, env: browser

def url: Future[String] = {
  val promise = Promise[String] // create promise
  input.onsubmit(_ => promise.success(input.value))
  promise.future
}

// single callback at the edge
url.map(fetch).onComplete{
  case Success(site) => webview.value = site
  case Failure(error) =>
    textbox.value = "oh no!"
    textbox.color = red
}

Execution Contexts

Who runs a future?

ExecutionContext

ThreadPool


Futures - Composition

def lookupUser(id: String): Future[Option[User]]
def authorize(user: User, capabilities: Set[Cap]):
  Future[Option[User]]

def authorizeduser(userId: String): Future[Option[User]] = {
  lookupUser(userId).flatMap{
    case None => Future.successful(None)
    case Some(user) => authorize(user, Set("see_meme"))
  }
}

Futures - Shortcomings

  1. composition can be messy3

  2. one-shot; it is not simple to model recurrent events


Solution to 1 - Scala Async


import scala.concurrent.ExecutionContext.Implicits.global
import scala.async.Async._

// looks like single-threaded code
def mkmeme(imageUrl: String, text: String): Future[Image] = 
  async {
    val layer1 = await(fetchUrl(imageUrl))
    val layer2 = await(textToImage(text))
    superimpose(layer1, layer2)
  }

Solution to 2 - Channels

Solution to 2 - Channels


escale

import scala.concurrent.ExecutionContext.Implicits.global
import escale.syntax._

val ch = chan[Int]() // create a channel

go {
  ch !< 1 // write to channel, "block" if no room
  println("wrote 1")
}
go {
  ch !< 2
  println("wrote 2")
}

go {
  val r: Int = !<(ch) // read from channel
  println(r)
  println(!<(ch))
}

escale

import escale.syntax._

go {
  val Ch1 = chan[Int]() // create a channel
  val Ch2 = chan[Int]()
  
  go { Ch1 !< 1 } // write to channel
  go { Ch2 !< 1 }

  // "await" one and only one value
  select(Ch1, Ch2) match {
    case (Ch1, value) => "ch1 was first"
    case (Ch2, value) => "ch2 was first"
  }
}

escale - Implementation

escale - Roadmap


Summary: what have we done?

All problems in computer science can be solved by another level of indirection.


Other Approaches

Actors

Reactive Streams


Guidelines

Keep programs simple, it will make it easier for others to understand.

  1. write synchronous logic
  2. use futures and promises with scala-async
  3. escale and other concurrency libraries
  4. consider callbacks

Thank You!


References

[1] C. A. R. Hoare, “Communicating sequential processes,” Communications of the ACM. 21 (8), pp. 666–667, 1978.


  1. https://en.wikipedia.org/wiki/Parallelism

  2. https://en.wikipedia.org/wiki/Concurrency_(computer_science)

  3. monad transformers may help