riot: Distributed computing for the masses

gcr

Riot is a distributed computing system for Racket. With Riot, you can run parallel tasks on multiple machines across the network with minimal changes to your code.

You need a Racket that supports submodules. At the time of writing, only the nightly builds will work.

    1 In which we construct a networked mapreduce cluster from scratch in about thirty seconds

    2 In which we gain significant speedups through the copious application of spare machinery

    3 In which we present an overview and clarity is achieved

      3.1 The master program

      3.2 The tracker

      3.3 The workers

      3.4 In which we describe the peculiarities of for/work and do-work

      3.5 In which, with a heavy heart, we outline restrictions and limitations of for/work and do-work

      3.6 In which we describe the numerous kinds of workunits and how to create them

    4 In which we present a lower-level client API for communicating with the tracker

    5 In which we outline licensing and copyrights

1 In which we construct a networked mapreduce cluster from scratch in about thirty seconds

  1. To get started, first start the tracker server. In a terminal, run:

    $ racket -p gcr/riot/server

  2. Parallel code looks like this:
    ;; Save to simple-demo.rkt
    #lang racket
    (require (planet gcr/riot))
     
    (define (run)
      (for/work ([i (in-range 10)])
        (sleep 3) ; or some big task
        (* 10 i)))
     
    (module+ main
      (connect-to-riot-server! "localhost")
      (displayln (run))
      ;; Note that (run) must be in a separate function--see below
      (displayln "Complete"))

    You just wrote the “master” program that hands out work and processes the results. The (for/work ...) form acts just like for/list, but it runs in parallel: for/work packages its body into “workunits” that other worker processes will run, generating one workunit for each iteration. (Using for/work is not the only way to control riot, and it has restrictions and odd behavior, but it is the easiest.)

    Go ahead and start your program:

    $ racket simple-demo.rkt

    The loop runs 10 times, so your program will register 10 units of work with the tracker. It will then appear to freeze because there aren’t any workers to run the work yet. Once we start some workers, the tracker will assign workunits to them and return results to your program as the workers finish. Workers can even run on other machines; in that case, the tracker simply sends workunits across the network. There’s no difference between local and networked workers, so commandeer your entire computer lab if you like.

  3. Let’s start some worker processes. If you want your workers to run on other machines, copy simple-demo.rkt there.

    From the same directory that contains simple-demo.rkt, run

    $ racket -p gcr/riot/worker -- localhost

    where localhost is the hostname of the tracker server you ran earlier. Once you start a worker, it will immediately start to process workunits. Once all workunits are finished, the master program will un-freeze and the for/work form will return the results of each workunit to the caller.

    Add as many workers as you like. The more workers you run, the faster your program goes. If you kill a worker with Ctrl+C or subject it to some other horrible fate, the tracker server should notice and will reassign abandoned workunits to other workers.

    If one of the workers throws an exception, the tracker will forward the exception to for/work, which will in turn throw an exception with a message about which worker caused the problem. Don’t worry: the tracker remembers completed workunits after your program exits, so if you run your program again, it will pick up where it left off.

    If you change your program, be sure to copy the new version to all of the workers and restart them too. If you don’t, they might complain (throw exceptions) if you’re lucky, or silently return results generated from the older code if you’re unlucky.

2 In which we gain significant speedups through the copious application of spare machinery

Here’s a slightly more complicated example. Here, we find all compound dictionary words: words in /usr/share/dict/words that are made by concatenating two other dictionary words.

Here’s some simple code to do that:

#lang racket
;; dict.rkt
(require (planet gcr/riot))
 
(define dictionary
  ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
           word))
 
(define (word-combinations)
   (apply append ; This flattens the list
          (for/list ([first-word (in-set dictionary)])
            (for/list ([second-word (in-set dictionary)]
                       #:when (set-member? dictionary
                                           (string-append first-word
                                                          second-word)))
              (cons first-word second-word)))))
 
(module+ main
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20)) (newline))

There are definitely better ways to do this. We naively loop through the entire dictionary for each dictionary word and see if the concatenation is also part of the dictionary. There are two ways of making this faster: we can use a smarter algorithm and more complicated data structures (also known as “the right way”), or we can just throw more hardware at the problem (also known as “the fun way”). As written, this code is an ideal candidate for parallelization because:
  • We can split up the outer loop of this dictionary search into parts

  • Each iteration of the outer loop doesn’t depend on any other iteration; each is self-contained

  • There isn’t very much processing to do after we have the final word list

Running this on an Intel Xeon 1.86GHz CPU produced this output:

cpu time: 11233134 real time: 11231587 gc time: 103748
There are 17658 words.
(("for" . "going") ("tail" . "gating") ("minima" . "list's") ("wise" . "acres") ("mill" . "stone's") ("hare" . "brained") ("under" . "bids") ("Chi" . "lean") ("clod" . "hopper") ("reap" . "points") ("dis" . "missal's") ("scholars" . "hip's") ("over" . "load") ("kilo" . "watts") ("trash" . "cans") ("snaps" . "hot") ("lattice" . "work") ("mast" . "head") ("over" . "coming") ("whole" . "sales"))

The above code took 187.2 minutes.

To parallelize this code, we must make three changes:
  • Change the outer for/list form to a for/work form

  • Add a (connect-to-riot-server!) call in the main submodule

  • Start workers. For this example, I ran twenty total workers on four spare lab machines and started the tracker server on "alfred"

The new code looks like this:
#lang racket
;; dict.rkt
(require (planet gcr/riot))
 
(define dictionary
  ;; Set of words
  (for/set ([word (in-list (file->lines "/usr/share/dict/words"))]
            #:when (>= (string-length word) 3))
           word))
 
(define (word-combinations)
   (apply append ; This flattens the list
          (for/work ([first-word (in-set dictionary)])
            (for/list ([second-word (in-set dictionary)]
                       #:when (set-member? dictionary
                                           (string-append first-word
                                                          second-word)))
              (cons first-word second-word)))))
 
(module+ main
  (connect-to-riot-server! "alfred")
  (define words (time (word-combinations)))
  (printf "There are ~a words.\n" (length words))
  ;; Print a random subset of the results.
  (write (take (shuffle words) 20)) (newline))

This program generates this output:

$ ~/racket/bin/racket dict.rkt
cpu time: 51903 real time: 1121990 gc time: 1732
There are 17658 words.
(("nick" . "name's") ("head" . "lights") ("ran" . "sacks") ("disc" . "lose") ("build" . "ups") ("wind" . "breaks") ("hot" . "headed") ("god" . "parent") ("main" . "frame") ("fiddle" . "sticks") ("pro" . "verbs") ("Volta" . "ire") ("select" . "ions") ("trail" . "blazer") ("bat" . "ten's") ("sniff" . "led") ("over" . "joys") ("down" . "hill") ("panel" . "led") ("tempera" . "ting"))

This version took 18.7 minutes, about a factor-of-10 improvement. Our cluster completed about 100 workunits (outer loop iterations) per second. To make this more efficient, we would want to split the work into fewer, larger workunits to lower the tracker’s network overhead.
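
One way to do that (a sketch, reusing the dictionary binding from above; the batch size of 500 is arbitrary, and the chunks-of helper is ours, not riot’s) is to hand each workunit a slice of the outer loop instead of a single word:

;; Split lst into sublists of at most n elements.
(define (chunks-of lst n)
  (if (null? lst)
      '()
      (let-values ([(chunk rest) (split-at lst (min n (length lst)))])
        (cons chunk (chunks-of rest n)))))
 
(define (word-combinations/batched)
  (apply append ; One flattened list of pairs per chunk
         (for/work ([chunk (in-list (chunks-of (set->list dictionary) 500))])
           ;; Each workunit now scans 500 first-words, cutting tracker
           ;; round-trips by a factor of roughly 500.
           (for*/list ([first-word (in-list chunk)]
                       [second-word (in-set dictionary)]
                       #:when (set-member? dictionary
                                           (string-append first-word
                                                          second-word)))
             (cons first-word second-word)))))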

The tracker caches workunits in memory. Running the program a second time...

$ ~/racket/bin/racket dict.rkt
cpu time: 30133 real time: 63214 gc time: 772

...takes 63 seconds because the tracker remembers the ~100,000 completed workunits. The program now spends nearly all of its time in network traffic and in appending and shuffling the huge list of results.

3 In which we present an overview and clarity is achieved

 (require (planet gcr/riot:1:=0))

Riot clusters consist of three parts:
  • A master program

  • A tracker server

  • One or more worker processes

3.1 The master program

The master program sends workunits to the tracker and waits for the tracker to send results back. To do this, the master program uses for/work, which sends units of work to the tracker and returns the results. The program can also use do-work, do-work/call, and do-work/eval forms for lower-level control.

3.2 The tracker

 (require (planet gcr/riot:1:=0/server))
The tracker server’s only job is to accept workunits from master programs, assign them to workers, and return worker results back to the master program. It’s essentially nothing more than a database of workunits. You can query that database using the functions described in the low-level client API section.

To start your own tracker server, run:

$ racket -p gcr/riot/server

The server can also run on a different port, like this:

$ racket -p gcr/riot/server -- --port 12345

The double dash separates racket’s commandline arguments from the tracker server’s arguments.

3.3 The workers

 (require (planet gcr/riot:1:=0/worker))

Workers are processes that turn workunits into results. You can start a worker by running:

racket -p gcr/riot/worker -- server-host

or, more fancy:

racket -p gcr/riot/worker -- --port 1234 --name worker-name server-host

where --port and --name are optional.

Each worker has a “client name” that identifies it. This defaults to the machine’s hostname followed by a dash and a random string of letters that uniquely identifies each worker on that machine. To change the first part of the client name, use the --name switch.

In workunits created by the for/work and do-work forms, each worker will require the module that contains that form. In the case of do-work/call workunits, the worker will require the module named by the do-work/call form. To be sure that the worker can find the module, you must run the worker from a directory where the module lies at the same relative path the master used. For example, if you ran:

$ cd /tmp/demo; racket simple-demo.rkt

on the master and copied simple-demo.rkt to /home/worker/demo/simple-demo.rkt on the worker, you must run the worker like this:

$ cd /home/worker/demo; racket -p gcr/riot/worker -- tracker-server

This also means that each worker must have an identical copy of simple-demo.rkt. If the body of a worker’s for/work form does not exactly match the master’s for/work form, the worker will complain about a code mismatch. The for/work and do-work forms check for this, but do-work/call cannot check the version of the module it requires, so workers with differing versions of those modules will silently misbehave.

A worker shouldn’t generate its own workunits. This means you mustn’t use for/work or do-work forms at the top level of a module (or in any function that runs when the module is required), or else your workers will attempt to generate workunits of their own. See the for/work section for more details.
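For instance (a sketch of the right and wrong shapes of a master module):

#lang racket
(require (planet gcr/riot))
 
;; WRONG: at module level, this would run on every worker that
;; requires the module, so each worker would try to submit its own
;; workunits.
;; (define results (for/work ([i (in-range 10)]) (* i i)))
 
;; RIGHT: wrap the form in a function...
(define (run)
  (for/work ([i (in-range 10)])
    (* i i)))
 
;; ...and call it only from the main submodule, which runs when the
;; master is executed directly but not when a worker requires the
;; module.
(module+ main
  (connect-to-riot-server! "localhost")
  (displayln (run)))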

3.4 In which we describe the peculiarities of for/work and do-work

The two easiest ways of submitting workunits are do-work and for/work.
(do-work body ...)
Packages up the body ... expressions to be evaluated in a workunit. This form returns quickly (after one round-trip to the tracker), yielding a workunit ID: a string representing a promise to do the work. Pass this value to wait-until-done to retrieve the result of the do-work form.
(for/work (for-clause ...) body ...)
Acts just like for/list, but arranges for each body ... to run in parallel: each iteration creates a workunit using do-work, then for/work calls wait-until-done on each resulting workunit.

This is essentially equivalent to:
(let ([workunits (for/list (for-clause ...)
                    (do-work body ...))])
    (for/list ([p workunits]) (wait-until-done p)))

For for/work and do-work, any expressions can appear in the body ... form as long as they:
  • ...only refer to serializable? values. If a free variable is unserializable, the workunit cannot be packaged up for transmission across the network to workers (see the sketch after this list).

  • ...do not cause and do not depend on global side effects. Otherwise, each worker may have different state, causing unpredictable behavior.
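
For instance (a sketch; the commented-out variant shows the failure mode), a body may close over a string, which is serializable, but not over an output port, which is not:

#lang racket
(require (planet gcr/riot))
 
(define (run)
  (define greeting "hello") ; strings are serializable?
  ;; Fine: the only free variable, greeting, serializes cleanly.
  (define wu (do-work (string-append greeting ", world")))
 
  ;; Would fail: ports are not serializable?, so this body could not
  ;; be packaged for transmission to workers.
  ;; (define out (open-output-file "log.txt"))
  ;; (do-work (displayln greeting out))
 
  (wait-until-done wu))
 
(module+ main
  (connect-to-riot-server! "localhost")
  (displayln (run)))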

The do-work form works by wrapping all of the body ... expressions inside a serial-lambda with no arguments. This effectively makes each workunit its own closure. Free variables are serialized when the workunit is sent to the tracker, and the resulting value is serialized on the return trip.

Workunits created by do-work (and, by extension, for/work) can refer to free variables, like this:

#lang racket
(require (planet gcr/riot))
 
(define (run)
  (define master-random-number (round (* 100 (random))))
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work
       (format "Workunit #~a: Master chose ~a, but we choose ~a."
               x
               master-random-number
               (round (* 100 (random)))))))
  (for ([workunit (in-list running-workunits)])
    (displayln (wait-until-done workunit))))
 
(module+ main
 (connect-to-riot-server! "localhost")
 (run))

Workunit #0: Master chose 67.0, but we choose 27.0.
Workunit #1: Master chose 67.0, but we choose 51.0.
Workunit #2: Master chose 67.0, but we choose 49.0.
Workunit #3: Master chose 67.0, but we choose 64.0.
Workunit #4: Master chose 67.0, but we choose 62.0.
Workunit #5: Master chose 67.0, but we choose 41.0.
Workunit #6: Master chose 67.0, but we choose 5.0.
Workunit #7: Master chose 67.0, but we choose 54.0.
Workunit #8: Master chose 67.0, but we choose 100.0.
Workunit #9: Master chose 67.0, but we choose 33.0.

In this example, the free variable master-random-number’s value of 67.0 has been serialized along with the do-work body and sent to the workers.

To ensure that workers use the same version of the code that the master thinks they’re using, do-work signs the code with an md5 hash of the body ... expressions. If a worker’s hash of a do-work body does not match the master’s hash, the worker will complain. Always keep all worker code up to date.

3.5 In which, with a heavy heart, we outline restrictions and limitations of for/work and do-work

The for/work and do-work forms don’t transmit their code; they only transmit the free variables that the bodies refer to. This creates a number of constraints:
  • Every worker must be able to require the module that contains the form, so an identical copy of that module must exist at the same relative path on each worker (see The workers).

  • The body ... expressions must match the master’s exactly; do-work signs them with an md5 hash, and workers complain when the hashes disagree.

  • The forms must not run at a module’s top level, or in any code that runs when the module is required, or else workers will generate workunits of their own.

  • The bodies may refer only to serializable? values and must not cause or depend on global side effects, as described above.

3.6 In which we describe the numerous kinds of workunits and how to create them

There’s more to riot than for/work and do-work. If those functions seem too magical, you may be more comfortable with these instead.

(do-work/call module exported-fun arg ...)  any/c
  module : module-path?
  exported-fun : symbol?
  arg : serializable?
Accepts a module path and an exported symbol in that module. Workers will require module and run (exported-fun arg ...). Be sure that workers have the latest version of the module, or you may get incorrect results – this function does not sign the module’s code or perform any version checking.

Returns quickly (after one round-trip to the tracker), yielding a workunit ID: a string representing a promise to do the work. Pass this value to wait-until-done to retrieve the value of the called function.

Example:

#lang racket
;; This is do-work-call.rkt
(require (planet gcr/riot))
 
(provide double)
(define (double x)
  (* x 2))
 
(define (run)
  (define running-workunits
    (for/list ([x (in-range 10)])
      (do-work/call "do-work-call.rkt" 'double x)))
  (map wait-until-done running-workunits))
 
(module+ main
 (connect-to-riot-server! "localhost")
 (run))
This produces '(0 2 4 6 8 10 12 14 16 18)

(do-work/eval datum)  any/c
  datum : any/c
Creates a workunit that causes workers to (eval datum), with all the nightmarish implications that entails.

Returns quickly (after one round-trip to the tracker), yielding a workunit ID: a string representing a promise to do the work. Pass this value to wait-until-done to retrieve the evaluated value of datum.

This is the only function that does not require workers to share any code with the master.

Example:

#lang racket
(require (planet gcr/riot))
(connect-to-riot-server! "localhost")
(wait-until-done (do-work/eval '(+ 3 5)))
produces 8.

(wait-until-done workunit-id)  any/c
  workunit-id : any/c
Waits until the given workunit-id is finished, and returns the result. If a worker throws an exception while running the workunit, this function will throw an exception in the master program.
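
For example (a sketch, assuming a running tracker and at least one worker), a failing body surfaces on the master as an exception thrown by wait-until-done:

#lang racket
(require (planet gcr/riot))
 
(define (run)
  (do-work (error "boom"))) ; the worker will fail running this body
 
(module+ main
  (connect-to-riot-server! "localhost")
  (with-handlers ([exn:fail? (lambda (e)
                               (printf "worker failed: ~a\n"
                                       (exn-message e)))])
    (wait-until-done (run))))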

(call-when-done workunit-id thunk)  any/c
  workunit-id : any/c
  thunk : (-> boolean? any/c any/c any/c)
Returns instantly, but sets up thunk to be called in its own thread once the given workunit-id finishes.

Riot will call (thunk error? client-name result). If the workunit succeeds, error? is #f, client-name is the ID of the client that finished the workunit, and result is the result of the workunit. If the workunit fails, error? is #t and result is the message of the exception that caused the workunit to fail.
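
For example (a sketch, assuming a running tracker and at least one worker):

#lang racket
(require (planet gcr/riot))
 
(define (run)
  (do-work (* 6 7)))
 
(module+ main
  (connect-to-riot-server! "localhost")
  (call-when-done (run)
                  (lambda (error? client-name result)
                    (if error?
                        (printf "failed: ~a\n" result)
                        (printf "~a returned ~a\n" client-name result))))
  ;; call-when-done returns instantly; keep the master alive so the
  ;; callback thread has a chance to run.
  (sleep 30))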

(connect-to-riot-server! hostname    
  [port    
  client-name])  any/c
  hostname : string?
  port : exact-integer? = 2355
  client-name : string? = (gethostname)
Connects to the tracker server at hostname and returns nothing. This function also sets current-client to the resulting client? object.
(current-client)  client?
(current-client client-obj)  void?
  client-obj : client?
The parameter that represents the currently connected tracker. Used internally by the functions above.

4 In which we present a lower-level client API for communicating with the tracker

 (require (planet gcr/riot:1:=0/client))
This module contains lower-level bindings for communicating with the tracker. Both the master program and workers use it.

(client? maybe-client)  boolean?
  maybe-client : any/c
Returns #t if maybe-client is the result of connect-to-tracker.

(connect-to-tracker hostname    
  [port    
  client-name])  client?
  hostname : string?
  port : exact-integer? = 2355
  client-name : string? = (gethostname)
Connects to the tracker server at hostname, reports a client ID of client-name, and returns a client? upon successful connection.
(client-who-am-i client)  any/c
  client : client?
The server will append a nonce to your client ID. Call this function to return your full client ID as chosen by the server.
(client-workunit-info client workunit-id)
  (list/c symbol? any/c any/c any/c)
  client : client?
  workunit-id : any/c
Returns information about the given workunit-id:

(list status wu-client result last-change)

where status is one of 'waiting, 'running, 'done, or 'error; wu-client is the client ID of the worker processing the workunit (or #f); result is the value of the workunit if completed or the error message if broken; and last-change is the server’s (current-inexact-milliseconds) when the workunit’s status last changed.
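
For example (a sketch: we submit a workunit with the high-level API, then inspect it through the client? object that connect-to-riot-server! stores in current-client):

#lang racket
(require (planet gcr/riot)
         (planet gcr/riot/client))
 
(define (run)
  (do-work (+ 1 2)))
 
(module+ main
  (connect-to-riot-server! "localhost")
  (define wu (run))
  ;; Query the tracker's workunit database directly.
  (match-define (list status wu-client result last-change)
    (client-workunit-info (current-client) wu))
  (printf "status: ~a  worker: ~a  result: ~a\n" status wu-client result))
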
(client-call-with-workunit-info client    
  workunit-id    
  thunk)  any/c
  client : client?
  workunit-id : any/c
  thunk : (-> symbol? any/c any/c any/c any/c)
Like above, but returns immediately. Later, thunk will be called in its own thread with each of the arguments in the above list.

(client-wait-for-work client)  (list/c wu-key? any/c)
  client : client?
Signals that you’re waiting for work and blocks until the tracker assigns you a workunit. The result is (list workunit-key workunit-data).

After this function returns, you will own that workunit; it will not be assigned to other clients until you disconnect from the tracker or until you call client-complete-workunit!.
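
As a sketch of the protocol (not a replacement for gcr/riot/worker, which understands the actual workunit data format), a hand-rolled worker loop might look like this; process-data is a hypothetical helper standing in for the parsing that worker.rkt performs:

#lang racket
(require (planet gcr/riot/client))
 
;; Hypothetical stand-in: see worker.rkt for the real data format.
(define (process-data data)
  (error "unimplemented sketch"))
 
(module+ main
  (define c (connect-to-tracker "localhost"))
  (let loop ()
    ;; Blocks until the tracker assigns us a workunit; we own it until
    ;; we disconnect or call client-complete-workunit!.
    (match-define (list key data) (client-wait-for-work c))
    (with-handlers ([exn:fail? (lambda (e)
                                 (client-complete-workunit!
                                  c key #t (exn-message e)))])
      (client-complete-workunit! c key #f (process-data data)))
    (loop)))
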
(client-call-with-work client thunk)  any/c
  client : client?
  thunk : (-> wu-key? any/c any/c)
Like above, but returns instantly. Later, thunk will be called in its own thread when the tracker assigns us a workunit, with arguments workunit-key and data. Its return value is NOT used as the workunit’s result; you must call client-complete-workunit!.
(client-add-workunit client data)  any/c
  client : client?
  data : serializable?
Adds a workunit with the given data and blocks until the tracker returns the new workunit’s key.

Note that data must be in a special format; you should rarely need to call this function. Look at the source code for worker.rkt to see how workers parse this.
(client-call-with-new-workunit client    
  data    
  thunk)  any/c
  client : client?
  data : serializable?
  thunk : (-> any/c any/c)
Like above, but returns instantly, calling (thunk new-workunit-key) in its own thread once the tracker reports the new workunit’s key.
(client-wait-for-finished-workunit client 
  workunit) 
  (list/c wu-key? symbol? any/c any/c)
  client : client?
  workunit : any/c
Waits for the given workunit to complete, returning (list wu-key status client result) as in client-workunit-info.
(client-call-with-finished-workunit client    
  workunit    
  thunk)  any/c
  client : client?
  workunit : any/c
  thunk : (-> wu-key? symbol? any/c any/c any/c)
Arranges for thunk to be called in its own thread when workunit finishes, with arguments as in the above list.

(client-complete-workunit! client    
  workunit    
  error?    
  result)  any/c
  client : client?
  workunit : any/c
  error? : boolean?
  result : serializable?
Sends result to the tracker as the result of the workunit. If the workunit fails, error? should be #t and result should be a message indicating what went wrong.

5 In which we outline licensing and copyrights

The code in this (planet gcr/riot) package and this documentation are released under the zlib license, reproduced below.

Copyright (c) 2012 gcr

This software is provided 'as-is', without any express or implied
warranty. In no event will the authors be held liable for any damages
arising from the use of this software.

Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it
freely, subject to the following restrictions:

   1. The origin of this software must not be misrepresented; you must not
   claim that you wrote the original software. If you use this software
   in a product, an acknowledgment in the product documentation would be
   appreciated but is not required.

   2. Altered source versions must be plainly marked as such, and must not be
   misrepresented as being the original software.

   3. This notice may not be removed or altered from any source
   distribution.