scala-dataflow
scala-dataflow copied to clipboard
Oz-style dataflow (single-assignment) variables and streams for Scala
h1. Scala Dataflow Concurrency DSL
h1. Description
Implements "Oz-style dataflow concurrency":http://www.mozart-oz.org/documentation/tutorial/node8.html#chapter.concurrency through dataflow (single assignment) variables and streams as well as lightweight (event-based) processes/threads.
Currently implemented on top of "Scala Actors":http://scala-lang.org, but could make use of the new "delimited continuations":http://blog.richdougherty.com/2009/02/delimited-continuations-in-scala_24.html support in upcoming Scala 2.8.
The best way to learn how to program with dataflow variables is to read the fantastic book Concepts, Techniques, and Models of Computer Programming. By Peter Van Roy and Seif Haridi.
TODO: Unification of dataflow variables.
h2. Dataflow Variable
Dataflow Variable defines three different operations:
- Define a Dataflow Variable
val x = new DataFlowVariable[Int]
- Wait for Dataflow Variable to be bound
x()
- Bind Dataflow Variable
xA Dataflow Variable can only be bound once. Subsequent attempts to bind the variable will throw an exception.
You can also shutdown a dataflow variable like this:
x.shutdownh2. Dataflow Streams
Dataflow streams work like a BoundedQueue in Java. You can add an item to the tail and try to grab one from the head. Threads will block until data is available.
- Define a Dataflow Stream
val producer = new DataFlowStream[Int]
- Wait for an element in the stream to be available
producer()
- Add an element to the stream
producerh2. Threads
You can easily create millions lightweight (event-driven) threads on a regular workstation.
thread { ... }You can also set the thread to a reference to be able to control its life-cycle:
val t = thread { ... } ... // time passes t ! 'exit // shut down the threadh1. Examples
Most of these examples are taken from the "Oz wikipedia page":http://en.wikipedia.org/wiki/Oz_(programming_language).
To run these examples:
- Compile:
cd src scalac -cp . DataFlow.scala
- Start REPL
scala -cp .Welcome to Scala version 2.7.3.final (Java HotSpot(TM) Client VM, Java 1.6.0_06). Type in expressions to have them evaluated. Type :help for more information. scala>
Paste the examples (below) into the Scala REPL. Note: Do not try to run the Oz version, it is only there for reference.
Have fun.
h2. Example 1
This example is from Oz wikipedia page: http://en.wikipedia.org/wiki/Oz_(programming_language). Sort of the "Hello World" of dataflow concurrency.
h3. Example in Oz
thread Z = X+Y % will wait until both X and Y are bound to a value. {Browse Z} % shows the value of Z. end thread X = 40 end thread Y = 2 endh3. Example in Scala
import DataFlow._ val x, y, z = new DataFlowVariable[Int] thread { zh2. Example 2
Using DataFlowVariable and recursion to calculate sum.
h3. Example in Oz
fun {Ints N Max} if N == Max then nil else {Delay 1000} N|{Ints N+1 Max} end end fun {Sum S Stream} case Stream of nil then S [] H|T then S|{Sum H+S T} end end local X Y in thread X = {Ints 0 1000} end thread Y = {Sum 0 X} end {Browse Y} endh3. Example in Scala
import DataFlow._ def ints(n: Int, max: Int): List[Int] = if (n == max) Nil else n :: ints(n + 1, max) def sum(s: Int, stream: List[Int]): List[Int] = stream match { case Nil => s :: Nil case h :: t => s :: sum(h + s, t) } val x = new DataFlowVariable[List[Int]] val y = new DataFlowVariable[List[Int]] thread { xh2. Example 3
Using DataFlowStream and high-order functions to calculate sum.
h3. Example in Oz
fun {Ints N Max} if N == Max then nil else {Delay 1000} N|{Ints N+1 Max} end end fun {Sum S Stream} case Stream of nil then S [] H|T then S|{Sum H+S T} end end local X Y in thread X = {Ints 0 1000} end thread Y = {Sum 0 X} end {Browse Y} endh3. Example in Scala
import DataFlow._ def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) stream x * x).foldLeft(0)(_ + _)) }h2. Example 4
Using DataFlowStream and recursive functions to calculate sum.
h3. Example in Oz
fun {Ints N Max} if N == Max then nil else {Delay 1000} N|{Ints N+1 Max} end end fun {Sum S Stream} case Stream of nil then S [] H|T then S|{Sum H+S T} end end local X Y in thread X = {Ints 0 1000} end thread Y = {Sum 0 X} end {Browse Y} endh3. Example in Scala
import DataFlow._ def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) { println("Generating int: " + n) streamh2. Example 5
Shows how to shutdown dataflow variables and bind threads to values to be able to interact with them (exit etc.).
h3. Example in Scala
// ======================================= import DataFlow._ // create four 'Int' data flow variables val x, y, z, v = new DataFlowVariable[Int] val main = thread { println("Thread 'main'") x y()) { z