Riot is a distributed computing system for Racket. With Riot, you can run parallel tasks on multiple machines across the network with minimal changes to your code.
You need a Racket that supports submodules. At the time of writing, only the nightly builds will work.
- To get started, first start the tracker server. In a terminal, run:
$ racket -p gcr/riot/server
- Parallel code looks like this:
#lang racket
;; Save to simple-demo.rkt
(require (planet gcr/riot))

(define (run)
  (for/work ([i (in-range 10)])
    (sleep 3) ; or some big task
    (* 10 i)))

(module+ main
  (connect-to-riot-server! "localhost")
  (displayln (run))
  ;; Note that (run) must be in a separate function--see below
  (displayln "Complete"))
You just wrote the “master” program that hands out work and processes the results. The (for/work ...) form acts just like for/list, but it runs in parallel: for/work packages its body into “workunits” that will be run by other worker processes. for/work will generate one workunit for each iteration. (Using for/work is not the only way to control riot, and it has restrictions and odd behavior, but it is the easiest.) Go ahead and start your program:
$ racket simple-demo.rkt
The loop runs 10 times, so your program will register 10 units of work with the tracker. It will then appear to freeze because there aren’t any workers to run the work yet. Once we make some workers, the tracker will assign workunits to them and will return results back to your program once the workers finish. Workers can even run on other machines; in these cases, the tracker will simply send workunits across the network. There’s no difference between local and networked workers, so commandeer your entire computer lab if you like.
Let’s start some worker processes. If you want your workers to run on other machines, copy simple-demo.rkt there. From the same directory that contains simple-demo.rkt, run
$ racket -p gcr/riot/worker -- localhost
where localhost is the hostname of the tracker server you ran earlier. Once you start a worker, it will immediately start to process workunits. Once all workunits are finished, the master program will un-freeze and the for/work form will return the results of each workunit to the caller.
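Because for/work acts like for/list, a sequential version of run can serve as a reference while developing: it needs no tracker or workers, and it should produce the same list that the parallel version hands back. A minimal sketch (run-sequentially is a hypothetical name, and the three-second sleep is dropped):

```racket
#lang racket
;; Sequential reference version of run from simple-demo.rkt.
;; Same loop body, but for/list instead of for/work, so it runs
;; entirely in this process without a tracker or workers.
(define (run-sequentially)
  (for/list ([i (in-range 10)])
    (* 10 i)))

(module+ main
  ;; Prints (0 10 20 30 40 50 60 70 80 90)
  (displayln (run-sequentially)))
```

Keeping such a version around makes it easy to compare results before swapping for/list for for/work.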
Add as many workers as you like. The more workers you run, the faster your program goes. If you kill a worker with Ctrl+C or subject it to some other horrible fate, the tracker server should notice and reassign its abandoned workunits to other workers.
If one of the workers throws an exception, the tracker will forward the exception to for/work, which will in turn throw an exception with a message about which worker caused the problem. Don’t worry: the tracker remembers completed workunits after your program exits, so if you run your program again, it will pick up where it left off.
If you change your program, be sure to copy the new version to all of the workers and restart them all too. If you don’t, they might complain (throw exceptions) if you’re lucky, or they just might give results generated from the older code if you’re unlucky.
Here’s a slightly more complicated example. Here, we find all compound dictionary words: words in /usr/share/dict/words that are made by concatenating two other dictionary words.
Here’s some simple code to do that:
#lang racket
;; dict.rkt
(require (planet gcr/riot))

(define dictionary ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
    word))

(define (word-combinations)
  (apply append ; This flattens the list
         (for/list ([first-word (in-set dictionary)])
           (for/list ([second-word (in-set dictionary)]
                      #:when (set-member? dictionary
                                          (string-append first-word second-word)))
             (cons first-word second-word)))))

(module+ main
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20))
  (newline))
This program is a good fit for riot:
- We can split up the outer loop of this dictionary search into parts.
- Each iteration of the outer loop doesn’t depend on any other iteration; each is self-contained.
- There isn’t very much processing to do after we have the final word list.
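Run sequentially on a single machine, it takes a little over three hours (times are in milliseconds):
cpu time: 11233134 real time: 11231587 gc time: 103748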
There are 17658 words.
(("for" . "going") ("tail" . "gating") ("minima" . "list's") ("wise" . "acres") ("mill" . "stone's") ("hare" . "brained") ("under" . "bids") ("Chi" . "lean") ("clod" . "hopper") ("reap" . "points") ("dis" . "missal's") ("scholars" . "hip's") ("over" . "load") ("kilo" . "watts") ("trash" . "cans") ("snaps" . "hot") ("lattice" . "work") ("mast" . "head") ("over" . "coming") ("whole" . "sales"))
To parallelize it, replace the outer for/list with for/work and connect to the tracker (here running on a host named alfred):
#lang racket
;; dict.rkt
(require (planet gcr/riot))

(define dictionary ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
    word))

(define (word-combinations)
  (apply append ; This flattens the list
         (for/work ([first-word (in-set dictionary)])
           (for/list ([second-word (in-set dictionary)]
                      #:when (set-member? dictionary
                                          (string-append first-word second-word)))
             (cons first-word second-word)))))

(module+ main
  (connect-to-riot-server! "alfred")
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20))
  (newline))
$ ~/racket/bin/racket dict.rkt
cpu time: 51903 real time: 1121990 gc time: 1732
There are 17658 words.
(("nick" . "name's") ("head" . "lights") ("ran" . "sacks") ("disc" . "lose") ("build" . "ups") ("wind" . "breaks") ("hot" . "headed") ("god" . "parent") ("main" . "frame") ("fiddle" . "sticks") ("pro" . "verbs") ("Volta" . "ire") ("select" . "ions") ("trail" . "blazer") ("bat" . "ten's") ("sniff" . "led") ("over" . "joys") ("down" . "hill") ("panel" . "led") ("tempera" . "ting"))
This version took 18.7 minutes, about a tenfold improvement over the sequential version. Our cluster completed about 100 workunits (outer loop iterations) per second. To make this more efficient, we would want to split the work into fewer, larger workunits to lower the tracker’s network overhead.
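One way to get fewer, larger workunits is to loop over chunks of first words instead of single words. The sketch below is only an illustration under stated assumptions: chunk-size and word-combinations/chunked are hypothetical names, a tiny in-memory set stands in for /usr/share/dict/words, and for/list stands in for for/work so that it runs without a tracker.

```racket
#lang racket
;; Sketch: give each workunit a chunk of first words instead of one.
;; for/list stands in for for/work so that this runs without a tracker.
(require racket/sequence) ; for in-slice

;; Hypothetical tuning knob: larger chunks mean fewer, bigger workunits.
(define chunk-size 2)

;; Tiny stand-in for the set built from /usr/share/dict/words.
(define dictionary
  (set "over" "load" "overload" "mast" "head" "masthead" "tail" "gate" "tailgate"))

;; The original inner loop, unchanged: all compounds starting with first-word.
(define (compounds-starting-with first-word)
  (for/list ([second-word (in-set dictionary)]
             #:when (set-member? dictionary (string-append first-word second-word)))
    (cons first-word second-word)))

(define (word-combinations/chunked)
  (apply append
         ;; With riot, this outer loop is the one to turn into for/work:
         ;; one workunit per chunk rather than one per word.
         (for/list ([chunk (in-slice chunk-size (in-set dictionary))])
           (append-map compounds-starting-with chunk))))

(module+ main
  (displayln (word-combinations/chunked)))
```

Since dictionary stays at the top level, each workunit would only carry its small chunk list; the right chunk size for a real cluster is something to measure rather than assume.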
$ ~/racket/bin/racket dict.rkt
cpu time: 30133 real time: 63214 gc time: 772
A riot cluster has three parts:
- A master program
- A tracker server
- One or more worker processes
The master program sends workunits to the tracker and waits for the tracker to send results back. To do this, the master program uses for/work, which sends units of work to the tracker and returns the results. The program can also use do-work, do-work/call, and do-work/eval forms for lower-level control.
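To make the division of labor concrete, here is a toy, single-process model of the three roles. It is not riot's implementation and involves no network: the "tracker" is two Racket channels, the "workers" are threads, and toy-for/work is a hypothetical name. It only shows the flow described above: the master hands out workunits, workers pull and run them, and results come back to the master in loop order.

```racket
#lang racket
;; Toy, single-process model of riot's three roles. This is NOT riot's
;; implementation: the "tracker" is two channels and the "workers" are
;; Racket threads. toy-for/work is a hypothetical name.

(define (toy-for/work thunks #:workers [n-workers 3])
  (define todo (make-channel)) ; tracker -> workers: (cons index thunk)
  (define done (make-channel)) ; workers -> tracker: (cons index result)
  ;; Workers: loop forever, taking one workunit at a time.
  (define workers
    (for/list ([_ (in-range n-workers)])
      (thread
       (λ ()
         (let loop ()
           (match-define (cons i thunk) (channel-get todo))
           (channel-put done (cons i (thunk)))
           (loop))))))
  ;; Master: hand out every workunit from a separate thread ...
  (thread
   (λ ()
     (for ([t (in-list thunks)] [i (in-naturals)])
       (channel-put todo (cons i t)))))
  ;; ... while collecting results and putting them back in loop order.
  (define results (make-vector (length thunks) #f))
  (for ([_ (in-list thunks)])
    (match-define (cons i r) (channel-get done))
    (vector-set! results i r))
  (for-each kill-thread workers)
  (vector->list results))

(module+ main
  ;; Same body as simple-demo.rkt's run, one thunk per iteration.
  (displayln
   (toy-for/work (for/list ([i (in-range 10)])
                   (λ () (* 10 i))))))
```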
$ racket -p gcr/riot/server
$ racket -p gcr/riot/server -- --port 12345
$ racket -p gcr/riot/worker -- server-host
$ racket -p gcr/riot/worker -- --port 1234 --name worker-name server-host
$ cd /tmp/demo; racket simple-demo.rkt
$ cd /home/worker/demo; racket -p gcr/riot/worker -- tracker-server
(do-work body ...)
(for/work (for-clause ...) body ...)
The body expressions of a workunit must:
- only refer to serializable? values. If a free variable is unserializable, the workunit cannot be packaged up for transmission across the network to workers.
- not cause or depend on global side effects. Otherwise, each worker may have different state, causing unpredictable behavior.
The do-work form works by wrapping all of the body ... expressions inside a serial-lambda with no arguments. This effectively makes each workunit its own closure. Free variables are serialized when the workunit is sent to the tracker, and the resulting value is serialized on the return trip.
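The effect of wrapping a body in a serial-lambda can be tried out locally with racket/serialize alone. In this sketch, make-workunit is a hypothetical helper: it builds a zero-argument serializable closure (similar in spirit to what do-work builds), serializes it to plain data, and deserializes it back into a callable procedure. The free variables x and master-number travel with the closure.

```racket
#lang racket
;; Round-trip a closure through serialize/deserialize, as a workunit
;; would be on its way to a worker. make-workunit is hypothetical.
(require racket/serialize)

(define (make-workunit x master-number)
  ;; Zero-argument serializable closure over two free variables.
  (serial-lambda ()
    (format "Workunit #~a: master chose ~a" x master-number)))

(define wu (make-workunit 3 67))

;; Serialize to plain data (the part that would cross the network) ...
(define wire (serialize wu))
;; ... and rebuild a callable procedure on the other side.
(define rebuilt (deserialize wire))

(module+ main
  (displayln (rebuilt)))
```

Deserializing a serial-lambda needs access to the module that defined it, which is consistent with the requirement that workers have a copy of the master's code.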
#lang racket
(require (planet gcr/riot))

(define (run)
  (define master-random-number (round (* 100 (random))))
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work (format "Workunit #~a: Master chose ~a, but we choose ~a."
                       x
                       master-random-number
                       (round (* 100 (random)))))))
  (for ([workunit (in-list running-workunits)])
    (displayln (wait-until-done workunit))))

(module+ main
  (connect-to-riot-server! "localhost")
  (run))
Workunit #0: Master chose 67.0, but we choose 27.0.
Workunit #1: Master chose 67.0, but we choose 51.0.
Workunit #2: Master chose 67.0, but we choose 49.0.
Workunit #3: Master chose 67.0, but we choose 64.0.
Workunit #4: Master chose 67.0, but we choose 62.0.
Workunit #5: Master chose 67.0, but we choose 41.0.
Workunit #6: Master chose 67.0, but we choose 5.0.
Workunit #7: Master chose 67.0, but we choose 54.0.
Workunit #8: Master chose 67.0, but we choose 100.0.
Workunit #9: Master chose 67.0, but we choose 33.0.
To ensure that workers use the same version of the code that the master thinks they’re using, do-work signs the code with an md5 hash of the body ... expressions. If a worker’s hash of a do-work body does not match the master’s hash, the worker will complain. Always keep all worker code up to date.
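The idea behind the hash check can be sketched with file/md5. This is an illustration only, not riot's actual scheme: body-hash is a hypothetical helper that hashes the printed form of a quoted body, and the stale copy stands in for a worker running outdated code.

```racket
#lang racket
;; Illustration of signing workunit code with an MD5 hash. body-hash is
;; hypothetical; riot's exact hashing scheme may differ.
(require file/md5)

;; Hash the printed form of a quoted body (hex-encoded bytes).
(define (body-hash body-datum)
  (md5 (string->bytes/utf-8 (format "~s" body-datum))))

(define master-hash       (body-hash '((* 10 i))))
(define stale-worker-hash (body-hash '((* 100 i)))) ; outdated copy of the code
(define fresh-worker-hash (body-hash '((* 10 i))))  ; up-to-date copy

(module+ main
  (printf "master hash:          ~a\n" master-hash)
  (printf "stale worker matches? ~a\n" (equal? master-hash stale-worker-hash))
  (printf "fresh worker matches? ~a\n" (equal? master-hash fresh-worker-hash)))
```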
As mentioned earlier, all free variables that the body refers to must be serializable. This means using serializable-struct (from racket/serialize) instead of normal structs and taking care to ensure that all required libraries do the same.
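A quick way to see the difference is to ask serializable? about instances of both kinds of struct. In this sketch, word-pair and plain-pair are hypothetical example structs; only the serializable-struct instance survives a serialize/deserialize round trip.

```racket
#lang racket
;; serializable-struct instances can be free variables of a workunit;
;; plain struct instances cannot. Both struct names are hypothetical.
(require racket/serialize)

(serializable-struct word-pair (first second) #:transparent)
(struct plain-pair (first second))

(define p (word-pair "mast" "head"))

(module+ main
  (printf "word-pair serializable?  ~a\n" (serializable? p))
  (printf "plain-pair serializable? ~a\n" (serializable? (plain-pair "mast" "head")))
  ;; Round trip, as when the value travels to a worker and back.
  (displayln (deserialize (serialize p))))
```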
Be careful about referring to large variables. Top-level variables are OK, but if you dynamically generate a large variable in a function (say, our dictionary) outside of the workunit, the network overhead of transferring the entire dictionary will dwarf the computation time of the workunit.
In other words, this is perfectly fine because dictionary is defined at the top level:
;; Acceptable!
(define dictionary (list->set (file->lines "/usr/share/dict/words")))

(define (word-combinations)
  (for/work ([word (in-set dictionary)])
    ... word ...))
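Each worker will load the dictionary once when the file is required and use that copy for each workunit. By contrast, if the dictionary is built inside word-combinations, outside of the workunit body, the workunit closes over it as a local variable, and an entire copy of the dictionary is sent along with each and every workunit.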
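The cost difference can be measured locally by serializing two closures, one referring to a module-level list and one capturing a locally built copy. This sketch assumes that serial-lambda (which do-work uses) does not capture module-level variables; workunit/top-level, workunit/local, make-words, and serialized-size are hypothetical names, and the list of generated strings stands in for the real dictionary.

```racket
#lang racket
;; Compare the serialized size of a workunit-like closure that refers to a
;; module-level word list with one that captures a locally built copy.
(require racket/serialize)

(define (make-words n)
  (for/list ([i (in-range n)]) (format "word~a" i)))

;; Module-level: referenced through the module, not copied into the closure.
(define dictionary (make-words 10000))

(define (workunit/top-level word)
  (serial-lambda () (member word dictionary)))

(define (workunit/local word)
  (define local-dictionary (make-words 10000)) ; built inside the function
  (serial-lambda () (member word local-dictionary)))

;; Number of characters in the printed serialized form.
(define (serialized-size v)
  (string-length (format "~s" (serialize v))))

(module+ main
  (printf "top-level: ~a chars\n" (serialized-size (workunit/top-level "word42")))
  (printf "local:     ~a chars\n" (serialized-size (workunit/local "word42"))))
```

The local version carries all 10000 strings in every serialized copy, which is the per-workunit network overhead the paragraph above warns about.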
When workers attempt to execute a workunit created by a do-work or for/work form, they require the module and search for the code to be executed. Be sure that workers won’t execute either of these forms when the module is required, or else your workers will try to create workunits of their own!
(do-work/call module exported-fun arg ...) → any/c module : module-path? exported-fun : symbol? arg : serializable?
Returns immediately (after one round-trip to the tracker) with a workunit ID, a string representing a promise to do the work. Pass this value to wait-until-done to get the return value of the called function.
#lang racket
;; This is do-work-call.rkt
(require (planet gcr/riot))
(provide double)

(define (double x) (* x 2))

(define (run)
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work/call "do-work-call.rkt" 'double x)))
  (map wait-until-done running-workunits))

(module+ main
  (connect-to-riot-server! "localhost")
  (run))
Returns immediately (after one round-trip to the tracker) with a workunit ID, a string representing a promise to do the work. Pass this value to wait-until-done to get the evaluated value of datum.
This is the only function that does not require workers to share any code with the master.
#lang racket
(require (planet gcr/riot))

(connect-to-riot-server! "localhost")
(wait-until-done (do-work/eval '(+ 3 5)))
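Because the datum itself carries all of the code, a worker only needs an evaluator, not a copy of the master's module. The following local stand-in shows the idea under an assumption: the datum is evaluated in a fresh racket/base namespace (riot's actual evaluation environment may differ), and evaluate-like-a-worker is a hypothetical name.

```racket
#lang racket
;; Local stand-in for what a worker might do with a do-work/eval datum.
;; Assumption: the datum is evaluated in a fresh racket/base namespace;
;; riot's actual evaluation environment may differ.
(define (evaluate-like-a-worker datum)
  (eval datum (make-base-namespace)))

(module+ main
  (displayln (evaluate-like-a-worker '(+ 3 5)))                          ; 8
  (displayln (evaluate-like-a-worker '(for/sum ([i (in-range 4)]) i)))) ; 6
```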
Riot will call (thunk error? client-name result). If the workunit succeeds, error? is #f, client-name is the ID of the client that finished the workunit, and result is the result of the workunit. If the workunit fails, error? is #t and result is the message of the exception that caused the workunit to fail.
(connect-to-riot-server! hostname [port client-name]) → any/c hostname : string? port : exact-integer? = 2355 client-name : string? = (gethostname)
(connect-to-tracker hostname [port client-name]) → client? hostname : string? port : exact-integer? = 2355 client-name : string? = (gethostname)
(list status wu-client result last-change)
(client-call-with-new-workunit client data thunk) → any/c client : client? data : serializable? thunk : (-> any/c any/c)
(client-complete-workunit! client workunit error? result) → any/c client : client? workunit : any/c error? : boolean? result : serializable?
The code in this (planet gcr/riot) package and this documentation are under the zlib license, reproduced below.
Copyright (c) 2012 gcr
This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not
claim that you wrote the original software. If you use this software
in a product, an acknowledgment in the product documentation would be
appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be
misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.