Concurrent Programming in fsharp using Hopac - Part 4
Welcome back to the fourth part of Concurrent Programming in fsharp blog post series. In part-2, we just learned that
Alt<'a> is a subclass of
Job<'a>. In this blog post, we are doing to dive deep into this abstraction and learn what it brings to the table.
Before diving into the definition of
Alt<'a>, let's figure out why we need it in the first place.
Assume that we have a function
delayedPrintn which prints a given message after
open Hopac // string -> int -> Job<unit> let delayedPrintn msg delayInMillis = timeOutMillis delayInMillis |> Job.map (fun _ -> printfn "%s" msg)
Executing this function in F# interactive,
#time "on" delayedPrintn "Hi" 3000 |> run #time "off"
will give us the following output
--> Timing now on Hi Real: 00:00:03.000, CPU: 00:00:00.002, GC gen0: 0, gen1: 0 val it : unit = () --> Timing now off
Nothing fancy and it worked as expected.
Let's make it little complicated by defining two more jobs to print
Hello after waiting for
1000 milliseconds respectively.
// Job<unit> let delayedHiPrinter = delayedPrintn "Hi" 2000 // Job<unit> let delayedHelloPrinter = delayedPrintn "Hello" 1000
Then define a function to run these two jobs in parallel using the infix operator function
open Hopac.Infixes let runThemParallel () = delayedHiPrinter <*> delayedHelloPrinter |> run |> ignore
If we run this function in F# interactive,
#time "on" runThemParallel () #time "off"
We can witness that the jobs were executed parallelly and print the output as expected.
--> Timing now on Hello Hi Real: 00:00:02.004, CPU: 00:00:00.006, GC gen0: 0, gen1: 0 val it : unit = () --> Timing now off
And here comes the new requirement!
Given we have two printers like the above, if one printer completes its job, stop the other from executing it.
That's interesting! Let's explore how can we solve this
The Alt Type
Altrepresents a first-class selective synchronous operation. The idea of alternatives is to allow one to introduce new selective synchronous operations to be used with non-determinic choice.
Obviously, when you have a concurrent server that responds to some protocol, you don't have to perform the protocol as a selective synchronous operation.
However, if you do encapsulate the protocol as a selective synchronous operation, you can then combine the operation with other selective synchronous operations. That is the essence of Hopac and CML. - Hopac Documentation
The critical point that we are interested in to solve our problem is
selective. In other words, among the two printers, we are concerned (selective) in the one which prints first.
The function that can help us here is
Alt.choosecreates an alternative that is available when any one of the given alternatives is
val choose: seq<#Alt<'x>> -> Alt<'x>
As we are dealing with only two
Alts, we are going to make use of
<|> operator function which is an optimised version of calling the
choose function with a sequence of two items.
<|>creates an alternative that is available when either of the given alternatives is available. xA1 <|> xA2 is an optimized version of choose [xA1; xA2].
val ( <|> ): Alt<'x> -> Alt<'x> -> Alt<'x>
The given alternatives are processed in a left-to-right order with short-cut evaluation. In other words, given an alternative of the form first <|> second, the first alternative is first instantiated and, if it is available, is committed to and the second alternative will not be instantiated at all.
Revisting delayedPrintn function
delayedPrintn function is returning
Job<unit> function now.
val delayedPrintn: string -> int -> Job<unit>
<|> operator function, we need to modify it to return
timeOutMillis function is already returning
val timeOutMillis: int -> Alt<unit>
Job.map function transforming it to
val map: ('x -> 'y) -> Job<'x> -> Job<'y>
let delayedPrintn msg delayInMillis = timeOutMillis delayInMillis // Alt<unit> |> Job.map (fun _ -> printfn "%s" msg) // Job<unit>
Alt<'a> is a subclass of Job<'a>
To achieve what we are doing with
Job.map, we can make use of the
val afterFun: ('x -> 'y) -> Alt<'x> -> Alt<'y>
- // string -> int -> Job<unit> + // string -> int -> Alt<unit> let delayedPrintn msg delayInMillis = timeOutMillis delayInMillis // Alt<unit> - |> Job.map (fun _ -> printfn "%s" msg) // Job<unit> + |> Alt.afterFun (fun _ -> printfn "%s" msg) // Alt<unit>
Then we can make use of the
<|> operator function to choose between the two printers.
// unit -> unit let chooseBetweenThem () = delayedHiPrinter <|> delayedHelloPrinter |> run
If we execute the
chooseBetweenThem function with the timer on in F# interactive,
#time "on" chooseBetweenThem () #time "off"
We can verify that it only prints
Hello after a seconds delay
--> Timing now on Hello Real: 00:00:01.002, CPU: 00:00:00.004, GC gen0: 0, gen1: 0 val it : unit = () --> Timing now off
Wait what is happening behind the scene? Was the
Yes, It is. But as soon as the
delayedHelloPrinter completes its execution, the
<|> function stops the execution of
delayedHiPrinter and hence we don't see
Hi in the output.
To verify this, we can modify the
delayedPrintn as below, which prints a log message when printer started its execution
// string -> int -> Alt<unit> let delayedPrintn msg delayInMillis = Alt.prepareFun <| fun _ -> printfn "starting [%s]" msg timeOutMillis delayInMillis |> Alt.afterFun (fun _ -> printfn "%s" msg)
Alt.prepareFun function that we used here creates an alternative that is computed at instantiation time with the given anonymous function
val prepareFun: (unit -> Alt<'x>) -> Alt<'x>
If we execute the function
chooseBetweenThem now, we'll get the following output
--> Timing now on starting [Hi] starting [Hello] Hello Real: 00:00:01.006, CPU: 00:00:00.005, GC gen0: 0, gen1: 0 val it : unit = () --> Timing now off
In the above section, we didn't care about the
delayedHiPrinter and ignored it completely. But in particular real-world use cases, we can't afford an execution to be stopped abruptly. In those cases, we need to let the
Alt<'a> know about this situation.
To implement this kind of scenarios, Hopac offers Negative Acknowledgement.
To implement this behaviour in our example, let's create an another function
delayedPrintnWithNack which wraps the
delayedPrintn with the negative acknowledgement support.
// string -> int -> Alt<unit> let delayedPrintnWithNack msg delayInMillis = // Alt<'a> -> Alt<unit> let onNack nack = // <1> nack |> Alt.afterFun (fun _ -> printfn "aborting [%s]" msg) Alt.withNackJob <| fun nack -> // <2> Job.start (onNack nack) // <3> |> Job.map (fun _ -> delayedPrintn msg delayInMillis) // <4>
There is a lot is happening in this short code snippet. So, Let's dissect it.
<1> We are defining an
onNack function to specify what to do in the event of a negative acknowledgement. For simplicity we are just printing an abort message.
<2> To make any
Alt<'a> negative acknowledgement aware, Hopac provides a function called
val withNackJob: (Promise<unit> -> Job<Alt<'x>>) -> Alt<'x>
> The `withNackJob` function creates an alternative that is computed at instantiation time with the given job constructed with a negative acknowledgement alternative. > `withNackJob` allows client-server protocols that do require the server to be notified when the client aborts the transaction to be encapsulated as selective operations. > The negative acknowledgement alternative will be available in case some other instantiated alternative involved in the choice is committed to instead. - **Hopac Documentation** > `Promise<'a>` is a sub class of `Alt<'a>`, which we'll see in detail in a later blog post <3> Using the `Job.start` function, we are immediately starting the `onNack` job in an another concurrent job > ```fsharp val start: Job<unit> -> Job<unit>
<4> After starting the
onNack job, we are calling the actual
delayedPrintn and return its result.
Let's verify this behaviour with a new set of function.
let delayedHiPrinterWithNack = delayedPrintnWithNack "Hi" 2000 let delayedHelloPrinterWithNack = delayedPrintnWithNack "Hello" 1000 let chooseBetweenThemWithNack () = delayedHiPrinterWithNack <|> delayedHelloPrinterWithNack |> run #time "on" chooseBetweenThemWithNack () #time "off"
--> Timing now on starting [Hi] starting [Hello] Hello aborting [Hi] Real: 00:00:01.000, CPU: 00:00:00.001, GC gen0: 0, gen1: 0 val it : unit = () --> Timing now off
From the log that we can assert that we gracefully handled the negative acknowledgement.
Here is my best effort to show what is happening in the
In this blog post, we explored how to implement selective synchronisation in Hopac using
Alt. It is fascinating to experience that we can write harder concurrent programs with less code.
Stay tuned for the upcoming blog posts. We are going to build some awesome stuff using
The source code of this blog post is available on GitHub
Reposted from my personal blog