Oz-style dataflow (single-assignment) variables and streams for Scala

h1. Scala Dataflow Concurrency DSL

h1. Description

Implements "Oz-style dataflow concurrency":http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency through dataflow (single-assignment) variables, dataflow streams, and lightweight (event-based) processes/threads.

Currently implemented on top of "Scala Actors":http://scala-lang.org, but could make use of the new "delimited continuations":http://blog.richdougherty.com/2009/02/delimited-continuations-in-scala_24.html support in upcoming Scala 2.8.

The best way to learn how to program with dataflow variables is to read the fantastic book _Concepts, Techniques, and Models of Computer Programming_ by Peter Van Roy and Seif Haridi.

TODO: Unification of dataflow variables.

h2. Dataflow Variable

A dataflow variable supports three operations:

  1. Define a dataflow variable:
    val x = new DataFlowVariable[Int]
  2. Wait for the dataflow variable to be bound:
    x()
  3. Bind the dataflow variable:
    x

A Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception.
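To make the single-assignment semantics concrete, here is a minimal sketch in plain Scala. It is not the library's implementation: `SketchDataflowVar` and its `bind` method are hypothetical names, and the class is built on the standard `scala.concurrent.Promise`, which can also be completed only once and rejects a second completion with an `IllegalStateException`.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for DataFlowVariable, built on scala.concurrent.Promise.
// Not the library's implementation; `bind` is a name chosen for this sketch.
final class SketchDataflowVar[T] {
  private val promise = Promise[T]()

  // Bind: completes the promise. A second bind throws IllegalStateException.
  def bind(value: T): Unit = promise.success(value)

  // Wait: blocks the calling thread until the variable has been bound.
  def apply(): T = Await.result(promise.future, Duration.Inf)
}

val v = new SketchDataflowVar[String]

// The reader thread blocks in v() until the main thread binds v.
var seen = ""
val reader = new Thread(() => { seen = v() })
reader.start()
v.bind("hello")
reader.join() // join() also makes the reader's write to `seen` visible here
println(seen) // hello

// Single assignment: a second bind is rejected.
val secondBindRejected =
  try { v.bind("again"); false }
  catch { case _: IllegalStateException => true }
```

Reusing `Promise` keeps the sketch short: the "bind once, read many times, block until bound" contract is exactly what a promise/future pair provides.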

You can also shut down a dataflow variable:

  x.shutdown

h2. Dataflow Streams

Dataflow streams work like a BlockingQueue in Java: you add an element at the tail and take one from the head. A thread that reads from an empty stream blocks until an element is available.

  1. Define a dataflow stream:
    val producer = new DataFlowStream[Int]
  2. Wait for an element in the stream to be available:
    producer()
  3. Add an element to the stream:
    producer
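The stream operations above behave like a FIFO queue whose reads block. A minimal sketch of that behaviour, assuming an unbounded `java.util.concurrent.LinkedBlockingQueue` as the backing store (`SketchDataflowStream` and `put` are hypothetical names, not the library's API):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-in for DataFlowStream, backed by an unbounded
// LinkedBlockingQueue. Not the library's implementation.
final class SketchDataflowStream[T] {
  private val queue = new LinkedBlockingQueue[T]()

  // Add an element at the tail of the stream.
  def put(value: T): Unit = queue.put(value)

  // Take the element at the head; blocks until one is available.
  def apply(): T = queue.take()
}

val producer = new SketchDataflowStream[Int]

// The consumer starts first and blocks on producer() until elements arrive.
var consumed: List[Int] = Nil
val consumer = new Thread(() => {
  val a = producer()
  val b = producer()
  consumed = List(a, b)
})
consumer.start()
producer.put(1)
producer.put(2)
consumer.join() // join() makes the consumer's write to `consumed` visible
println(consumed) // List(1, 2)
```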

h2. Threads

You can easily create millions of lightweight (event-driven) threads on a regular workstation.

thread { ... }

You can also keep a reference to the thread in order to control its life cycle:

val t = thread { ... }

... // time passes

t ! 'exit // shut down the thread
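The reason such threads are cheap is that they are tasks multiplexed onto a small pool of OS threads rather than one OS thread each. The sketch below illustrates that idea with a swapped-in technique, standard Scala `Future`s on the global execution context instead of Scala Actors; `spawn` is a hypothetical helper, not this library's `thread`, and it offers no equivalent of sending `'exit`.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Swapped-in technique: Futures on a shared pool, not Scala Actors.
implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical helper: schedules `body` as a task on the pool.
def spawn(body: => Unit): Future[Unit] = Future(body)

// 100,000 tasks share a handful of OS threads.
val counter = new AtomicInteger(0)
val tasks = (1 to 100000).map(_ => spawn { counter.incrementAndGet() })

Await.result(Future.sequence(tasks), 30.seconds)
println(counter.get) // 100000
```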

h1. Examples

Most of these examples are taken from the "Oz wikipedia page":http://en.wikipedia.org/wiki/Oz_(programming_language).

To run these examples:

  1. Compile:
    cd src
    scalac -cp . DataFlow.scala
  2. Start the REPL:
    scala -cp .
    Welcome to Scala version 2.7.3.final (Java HotSpot(TM) Client VM, Java 1.6.0_06).
    Type in expressions to have them evaluated.
    Type :help for more information.

    scala>
  3. Paste the Scala examples (below) into the REPL. Note: do not try to run the Oz versions; they are only there for reference.
  4. Have fun.

h2. Example 1

This example is from the "Oz Wikipedia page":http://en.wikipedia.org/wiki/Oz_(programming_language). It is more or less the "Hello World" of dataflow concurrency.

h3. Example in Oz

  thread 
    Z = X+Y     % will wait until both X and Y are bound to a value.
    {Browse Z}  % shows the value of Z.
  end
  thread X = 40 end
  thread Y = 2 end

h3. Example in Scala

  import DataFlow._
  val x, y, z = new DataFlowVariable[Int]
  thread {
    z 
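For comparison, here is a self-contained sketch of the same Oz program that runs without the DataFlow library. It assumes standard `scala.concurrent.Promise`s as stand-ins for the three dataflow variables and `Future`s as stand-ins for `thread`; this is not the library's API.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Promises stand in for dataflow variables (an assumption of this sketch).
val x, y, z = Promise[Int]()

// Like `thread Z = X+Y ... end`: waits until both x and y are bound.
Future {
  val sum = Await.result(x.future, Duration.Inf) + Await.result(y.future, Duration.Inf)
  println("z = " + sum)
  z.success(sum)
}
Future { x.success(40) }
Future { y.success(2) }

val result = Await.result(z.future, Duration.Inf) // 42
```

The order in which the three futures start does not matter: the summing task simply blocks until both inputs are bound.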

h2. Example 2

Using DataFlowVariable and recursion to calculate a sum.

h3. Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

h3. Example in Scala

  import DataFlow._

  def ints(n: Int, max: Int): List[Int] =
    if (n == max) Nil
    else n :: ints(n + 1, max)

  def sum(s: Int, stream: List[Int]): List[Int] = stream match {
    case Nil => s :: Nil
    case h :: t => s :: sum(h + s, t)
  }

  val x = new DataFlowVariable[List[Int]]
  val y = new DataFlowVariable[List[Int]]

  thread { x 
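A self-contained sketch of the same pipeline, assuming `scala.concurrent.Promise`s in place of the two `DataFlowVariable`s and `Future`s in place of `thread` (not the library's API). `ints` and `sum` are repeated so the block runs on its own.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def ints(n: Int, max: Int): List[Int] =
  if (n == max) Nil
  else n :: ints(n + 1, max)

// Emits every running total, ending with the full sum.
def sum(s: Int, stream: List[Int]): List[Int] = stream match {
  case Nil    => s :: Nil
  case h :: t => s :: sum(h + s, t)
}

// Promises stand in for the DataFlowVariables (an assumption).
val x = Promise[List[Int]]()
val y = Promise[List[Int]]()

Future { x.success(ints(0, 1000)) }
// Like `thread Y = {Sum 0 X} end`: blocks until x is bound, then binds y.
Future { y.success(sum(0, Await.result(x.future, Duration.Inf))) }

val sums = Await.result(y.future, Duration.Inf)
println(sums.take(4)) // List(0, 0, 1, 3)
println(sums.last)    // 499500
```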

h2. Example 3

Using DataFlowStream and higher-order functions to calculate a sum.

h3. Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

h3. Example in Scala

  import DataFlow._

  def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { 
    println("Generating int: " + n)
    stream  x * x).foldLeft(0)(_ + _)) 
  }
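A self-contained sketch of the same idea, assuming a `java.util.concurrent.LinkedBlockingQueue` in place of `DataFlowStream` and `None` as a hypothetical end-of-stream marker; neither comes from the DataFlow library. The consumer reads the stream lazily and folds it with higher-order functions instead of explicit recursion.

```scala
import java.util.concurrent.LinkedBlockingQueue

// LinkedBlockingQueue stands in for DataFlowStream; None marks end of stream.
val stream = new LinkedBlockingQueue[Option[Int]]()

// Hypothetical producer: pushes n until max, then the end marker.
def produceInts(n: Int, max: Int, out: LinkedBlockingQueue[Option[Int]]): Unit = {
  (n until max).foreach { i =>
    println("Generating int: " + i)
    out.put(Some(i))
  }
  out.put(None)
}

new Thread(() => produceInts(0, 10, stream)).start()

// Consumer: each take() blocks until the producer has added an element.
val total = Iterator
  .continually(stream.take())
  .takeWhile(_.isDefined)
  .map(_.get)
  .foldLeft(0)(_ + _)

println("Sum: " + total) // Sum: 45
```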

h2. Example 4

Using DataFlowStream and recursive functions to calculate a sum.

h3. Example in Oz

  fun {Ints N Max}
    if N == Max then nil
    else 
      {Delay 1000}
      N|{Ints N+1 Max}
    end
  end

  fun {Sum S Stream}
    case Stream of nil then S
    [] H|T then S|{Sum H+S T} end
  end

  local X Y in
    thread X = {Ints 0 1000} end
    thread Y = {Sum 0 X} end
    {Browse Y}
  end

h3. Example in Scala


  import DataFlow._

  def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { 
    println("Generating int: " + n)
    stream 
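A self-contained sketch with recursive producer and consumer, again assuming a `LinkedBlockingQueue` in place of `DataFlowStream` and `None` as a hypothetical end-of-stream marker (not the library's API). Unlike the Oz `Sum`, which emits every partial sum, this consumer returns only the final total.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec

// LinkedBlockingQueue stands in for DataFlowStream; None marks end of stream.
val stream = new LinkedBlockingQueue[Option[Int]]()

// Recursive producer, shaped like the Oz Ints function.
@tailrec
def ints(n: Int, max: Int, out: LinkedBlockingQueue[Option[Int]]): Unit =
  if (n == max) out.put(None)
  else {
    out.put(Some(n))
    ints(n + 1, max, out)
  }

// Recursive consumer: blocks on take() for each element.
@tailrec
def sum(acc: Int, in: LinkedBlockingQueue[Option[Int]]): Int = in.take() match {
  case None    => acc
  case Some(h) => sum(acc + h, in)
}

new Thread(() => ints(0, 1000, stream)).start()
val total = sum(0, stream)
println("Sum: " + total) // Sum: 499500
```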

h2. Example 5

Shows how to shut down dataflow variables and how to keep references to threads so that you can interact with them (send them 'exit, etc.).

h3. Example in Scala

  // =======================================
  import DataFlow._

  // create four 'Int' data flow variables
  val x, y, z, v = new DataFlowVariable[Int]

  val main = thread {
    println("Thread 'main'")
   
    x  y()) { 
      z