Introduction to dataflow programming
What is dataflow programming all about? In classical imperative programming a program is basically a set of
operations working on mutable state, which effectively hides the data paths. Dataflow programming is more like a
series of workers or robots on an assembly line, each of which acts only when its input material arrives. The
imperative style can introduce non-determinism when code runs concurrently (multithreading) without proper
synchronization. In dataflow programming, execution is driven by the actual data, or by data availability
to be precise. A program whose threads communicate only through dataflow variables, each bound by a single writer,
produces the same result regardless of how the threads are scheduled.
Let's introduce the dataflow variable, which is one of the main concepts of dataflow programming.
A dataflow variable has two possible states: bound (a value has been assigned) or unbound (no value has been
assigned yet). Whenever a thread tries to read an unbound dataflow variable, it blocks until some other
thread binds the variable. A dataflow variable can be bound only once; subsequent attempts to bind it
fail. Dataflow variables are also building blocks: with them one can build blocking queues and streams, and the
actor model can be implemented on top of such blocking queues.
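To make the idea of a stream built from dataflow variables more concrete, here is a minimal sketch. It is not the implementation presented later in this article: it assumes TaskCompletionSource<T> from the .NET Task Parallel Library as a stand-in for a single-assignment variable (SetResult binds it, and reading Task.Result blocks until it is bound). The names StreamCell and DataflowStreamDemo are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper: one cell of a dataflow stream. Next is a single-assignment
// slot (a stand-in for a dataflow variable) that the producer binds later.
public sealed class StreamCell<T>
{
    public readonly T Head;
    public readonly bool IsEnd;
    public readonly TaskCompletionSource<StreamCell<T>> Next =
        new TaskCompletionSource<StreamCell<T>>();

    public StreamCell(T head, bool isEnd = false) { Head = head; IsEnd = isEnd; }
}

public static class DataflowStreamDemo
{
    // Producer: binds one tail slot per item, then binds an end marker.
    public static void Produce(TaskCompletionSource<StreamCell<int>> slot, IEnumerable<int> items)
    {
        foreach (int item in items)
        {
            var cell = new StreamCell<int>(item);
            slot.SetResult(cell);   // bind; a second SetResult on the same slot would throw
            slot = cell.Next;       // move on to the still unbound tail
        }
        slot.SetResult(new StreamCell<int>(0, isEnd: true));
    }

    // Consumer: reading Task.Result blocks until the producer has bound the slot.
    public static int Sum(TaskCompletionSource<StreamCell<int>> slot)
    {
        int sum = 0;
        for (var cell = slot.Task.Result; !cell.IsEnd; cell = cell.Next.Task.Result)
            sum += cell.Head;
        return sum;
    }

    public static int Run()
    {
        var first = new TaskCompletionSource<StreamCell<int>>();
        Task<int> consumer = Task.Run(() => Sum(first));   // may start before any data exists
        Produce(first, new[] { 1, 2, 3, 4 });
        return consumer.Result;                            // 1 + 2 + 3 + 4
    }
}
```

The consumer never polls and never sees a half-built list: it simply blocks on the next unbound slot, which is the same behaviour a blocking queue would give.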
You can find more information on dataflow programming in this Wikipedia article. There is also a nice article in the Groovy GPars guide.
Overview of the article
This article presents basic implementations of a dataflow variable in both C# and F#. It also demonstrates
an example of dataflow programming in C# using futures. Dataflow programming works best in
programming languages that follow the principles of the declarative model. C# is an imperative language, so
programming in a dataflow style requires self-discipline from developers. Perhaps surprisingly, F#, although
considered a functional programming language and therefore following the declarative paradigm, also lets
developers program imperatively (via the mutable keyword). Adding dataflow variables to C# and
F# does not automatically make them dataflow programming languages, because the necessary syntactic
sugar and language support are still missing.
Clojure is one of the most popular modern languages that enable dataflow programming. Clojure supports dataflow
programming through promises. It is also possible to do dataflow programming in other popular languages such as
Groovy, Scala and Ruby using open-source libraries (GPars for Groovy, for example), but none of those languages
provides syntactic support for dataflow variables. As a genuine dataflow programming language I would single out Oz,
which treats all variables as dataflow variables: a reader trying to read an unbound (uninitialized) variable
blocks until the variable is bound (initialized). On the one hand this saves us from the famous
NullReferenceException, but on the other hand it can make a program hang.
First I will present the implementations in C# and F#, and then dig into the thread synchronization details.
Dataflow variables in C#
Let's start with a simple example of how to use a dataflow variable in C#.
var variable = new DataflowVariable<int>(); //create variable
variable.Bind(42); //bind variable (42 is just a sample value)
int value = 1000 + variable; //read variable
C# is not very extensible when it comes to operator overloading (as you will see later in the F# implementation),
which is why we use a Bind method here. Whether to use operators or plain properties and functions when
working with dataflow variables is a matter of taste, but to me operators look more natural. What I
love about C# is its implicit conversion operators: they let a DataflowVariable<int> be read as if it were a plain int.
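For readers who have not used implicit conversion operators before, here is a small sketch of the mechanism in isolation. The Wrapped<T> type is hypothetical and exists only for this illustration; it has none of the blocking behaviour of a dataflow variable.

```csharp
using System;

// Hypothetical wrapper type, used only to illustrate implicit conversion operators.
public sealed class Wrapped<T>
{
    private readonly T inner;
    public Wrapped(T inner) { this.inner = inner; }

    // Lets a Wrapped<T> be used wherever a T is expected, without an explicit cast.
    public static implicit operator T(Wrapped<T> w) { return w.inner; }
}

public static class ImplicitConversionDemo
{
    public static int Run()
    {
        var w = new Wrapped<int>(42);
        int sum = 1000 + w;   // the compiler inserts the Wrapped<int> -> int conversion
        return sum;           // 1042
    }
}
```

The same trick is what allows an expression such as 1000 + variable to compile when variable is a dataflow variable.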
Now the implementation of the dataflow variable itself:
using System;
using System.Threading;

public class DataflowVariable<T>
{
    private readonly object syncLock = new object();
    private volatile bool isInitialized = false;
    private volatile object value;

    private T Value
    {
        get
        {
            // Fast path: once the variable is bound, read it without taking the lock.
            if (!isInitialized)
            {
                lock (syncLock)
                {
                    // Re-check in a loop; Monitor.Wait releases the lock while waiting.
                    while (!isInitialized)
                        Monitor.Wait(syncLock);
                }
            }
            return (T)value;
        }
        set
        {
            lock (syncLock)
            {
                if (isInitialized)
                    throw new InvalidOperationException("Dataflow variable can be set only once.");

                // Store the value before setting the flag, then wake all blocked readers.
                this.value = value;
                isInitialized = true;
                Monitor.PulseAll(syncLock);
            }
        }
    }

    public void Bind(T newValue)
    {
        this.Value = newValue;
    }

    public static implicit operator T(DataflowVariable<T> myVar)
    {
        return myVar.Value;
    }
}
Dataflow variables in F#
Let's start with a simple example of how to use a dataflow variable in F#.
let myVar = new DataflowVariable<int>() // create variable
myVar <~ 42 // bind variable (42 is just a sample value)
let value = 1000 + !!myVar // read variable
Here we use operator (<~) to bind the dataflow variable and operator (!!) to read its value.
Now the code itself:
type public DataflowVariable<'T> () =
    class
        [<VolatileField>]
        let mutable value : option<'T> = None
        let syncLock = new System.Object()

        member private this.Value
            with get() : 'T =
                match value with
                | Some(initializedVal) -> initializedVal
                | None ->
                    lock syncLock (fun () ->
                        while (value.IsNone) do
                            ignore (System.Threading.Monitor.Wait(syncLock))
                        value.Value)
            and set(newVal : 'T) =
                lock syncLock (fun () ->
                    match value with
                    | Some(_) -> invalidOp "Dataflow variable can be set only once."
                    | None ->
                        value <- Some(newVal)
                        System.Threading.Monitor.PulseAll(syncLock))

        static member public (<~) (var:DataflowVariable<'T>, initValue:'T) =
            var.Value <- initValue
        static member public (!!) (var:DataflowVariable<'T>) : 'T =
            var.Value
    end
You may have noticed the [<VolatileField>] attribute. According to the rather sparse documentation, this attribute
effectively replaces the volatile keyword of C#, but I haven't performed thorough testing to verify it. What? F# has no
keyword for volatile fields? That is as it should be. Volatile fields belong to the domain of imperative
programming, and F#, being first of all a functional programming language (an implementation of the declarative
model), tries to avoid shared state (remember the mutable keyword?). F# does not support overloading of implicit
conversion operators, which is why we need some kind of dereferencing prefix operator (!!).
The F# implementation is more elegant, because we use the Option type here and thus do not have to deal with
an isInitialized field as in the C# implementation.
Implementation details and some thoughts on thread synchronization
For synchronization, both implementations use volatile fields in conjunction with a simple
Monitor.Wait/Monitor.PulseAll pattern. More information on Monitor.Wait and Monitor.Pulse can be found in this very
nice article by Joe Albahari.
The volatile fields are needed because readers first check the state outside the lock: volatile prevents
instruction reordering around that check and ensures CPU cache synchronization.
As an alternative to a volatile field, we could use the Thread.VolatileRead method for that check (we do not need
Thread.VolatileWrite as well, because the actual write is done within the lock statement, which effectively prevents
reordering and both flushes and invalidates the CPU cache, whereas Thread.VolatileWrite only flushes the CPU cache
without invalidating it). Basically, the static VolatileRead and VolatileWrite methods of the Thread class
read/write a variable while enforcing (technically, a superset of) the guarantees made by the volatile keyword.
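The Monitor.Wait/Monitor.PulseAll pattern mentioned above may be easier to see in isolation. The following is a minimal sketch, not part of the dataflow variable itself: OneShotGate and GateDemo are hypothetical names. Note one deliberate simplification: the flag here is only ever touched inside the lock, so it does not need to be volatile, whereas the dataflow variable also reads its flag outside the lock.

```csharp
using System;
using System.Threading;

// Hypothetical one-shot gate: a stripped-down version of the wait/pulse pattern.
public sealed class OneShotGate
{
    private readonly object syncLock = new object();
    private bool isOpen;   // only accessed under the lock

    public void WaitUntilOpen()
    {
        lock (syncLock)
        {
            // A loop, not an "if": the condition is re-checked every time Wait returns.
            while (!isOpen)
                Monitor.Wait(syncLock);   // releases the lock while waiting, reacquires it before returning
        }
    }

    public void Open()
    {
        lock (syncLock)
        {
            isOpen = true;
            Monitor.PulseAll(syncLock);   // wake every thread blocked in Wait
        }
    }
}

public static class GateDemo
{
    public static int Run()
    {
        var gate = new OneShotGate();
        int passed = 0;
        var threads = new Thread[3];
        for (int i = 0; i < threads.Length; i++)
        {
            threads[i] = new Thread(() =>
            {
                gate.WaitUntilOpen();
                Interlocked.Increment(ref passed);
            });
            threads[i].Start();
        }
        gate.Open();                          // threads that arrive later see isOpen == true and pass straight through
        foreach (var t in threads) t.Join();
        return passed;                        // all three threads got through
    }
}
```

PulseAll rather than Pulse matters here: several readers may be blocked on the same variable, and all of them must be released once it is bound.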
Dataflow programming example in C#
In C# I will demonstrate a simple example of dataflow programming with the Parallel Extensions library (futures and
continuations). Using Task.Factory.ContinueWhenAll one can achieve results similar to those of dataflow
variables, but dataflow variables give developers much more flexibility.
var input1 = new DataflowVariable<int>();
var input2 = new DataflowVariable<int>();
var output1 = new DataflowVariable<int>();
var output2 = new DataflowVariable<int>();

// Blocks until both inputs are bound, then binds output1.
Task<int> task1 = Task.Factory.StartNew<int>(() =>
{
    output1.Bind(input1 + input2);
    return output1 * 10;
});

// Blocks until input1 and output1 are bound, then binds output2.
Task<int> task2 = Task.Factory.StartNew<int>(() =>
{
    output2.Bind(input1 + output1);
    return input1;
});

input1.Bind(333);
input2.Bind(888);

// Reading output1 and output2 blocks until both tasks have bound them.
Console.WriteLine(10 + output1 + output2); // prints 2785
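For comparison, the same computation can be sketched with Task.Factory.ContinueWhenAll alone. This is only an illustration under some assumptions: TaskCompletionSource<int> stands in for the two inputs that receive their values later, and ContinueWhenAllDemo is a made-up name.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinueWhenAllDemo
{
    public static int Run()
    {
        // TaskCompletionSource stands in for the two inputs that get their values later.
        var input1 = new TaskCompletionSource<int>();
        var input2 = new TaskCompletionSource<int>();

        // output1 = input1 + input2, scheduled once both inputs have completed.
        Task<int> output1 = Task.Factory.ContinueWhenAll<int, int>(
            new Task<int>[] { input1.Task, input2.Task },
            done => done[0].Result + done[1].Result);

        // output2 = input1 + output1, scheduled once input1 and output1 have completed.
        Task<int> output2 = Task.Factory.ContinueWhenAll<int, int>(
            new Task<int>[] { input1.Task, output1 },
            done => done[0].Result + done[1].Result);

        input1.SetResult(333);
        input2.SetResult(888);

        // Result blocks until the corresponding continuation has run.
        return 10 + output1.Result + output2.Result;   // 10 + 1221 + 1554
    }
}
```

With continuations the dependencies of each step have to be listed up front in the task array. With dataflow variables a task simply reads whatever it needs, when it needs it, which is where the extra flexibility comes from.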
Conclusion
This article describes a basic implementation of dataflow variables in C# and F# and a basic example
of dataflow programming using continuations/futures. Please consider this article a starting point in a
journey into the world of dataflow programming.