MIT 6.824 lab1

This is the first of the four labs. In this lab we build a MapReduce system. A detailed description is available on MIT's course website.

The system has two parts: workers and a coordinator (called the “master” in the MapReduce paper). Each worker makes RPC calls to the coordinator to register itself, request a task, and report the task's status. The difficult part is that workers may crash, in which case the coordinator has to re-issue their task to a different worker. Dealing with concurrency is also tricky.

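type Coordinator struct {
	mu      sync.Mutex
	cond    *sync.Cond      // must wait on mu, i.e. sync.NewCond(&c.mu)
	nReduce int             // number of reduce tasks
	done    bool            // true once the Reduce phase has finished
	phase   Phase           // current phase: Map or Reduce
	files   []string        // input files passed to the coordinator
	tasks   []*Task         // tasks that have not been issued yet
	workers []*workerStatus // all known workers
}

type workerStatus struct {
	workerId int
	task     *Task              // task this worker is running, or nil
	cancel   context.CancelFunc // cancels the timeout context for task
}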

Here is how I define the coordinator. tasks holds the tasks that have not been issued yet, and workers holds the known workers. Once a task is issued, it is removed from tasks and linked to a worker (the worker's task field points to the task it is running). If tasks is empty and every worker's task field is nil, the current phase is done. At that point, if the current phase is “Map”, we switch the phase to “Reduce” and add the reduce tasks to tasks; if the current phase is “Reduce”, the whole MapReduce job is done.
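The phase-transition rule can be sketched as a helper that runs with the lock held after each report. The Task fields, the MapPhase/ReducePhase constants, NewCoordinator and maybeAdvance are hypothetical names, since the post does not show them; the Coordinator and workerStatus fields follow the definition above (minus cancel, which this sketch does not need).

```go
package main

import (
	"fmt"
	"sync"
)

type Phase int

const (
	MapPhase Phase = iota
	ReducePhase
)

// Task is a hypothetical shape; the post does not show its fields.
type Task struct {
	Phase Phase
	Id    int
	File  string // input file for a map task; empty for reduce tasks
}

type workerStatus struct {
	workerId int
	task     *Task
}

type Coordinator struct {
	mu      sync.Mutex
	cond    *sync.Cond
	nReduce int
	done    bool
	phase   Phase
	files   []string
	tasks   []*Task
	workers []*workerStatus
}

func NewCoordinator(files []string, nReduce int) *Coordinator {
	c := &Coordinator{nReduce: nReduce, phase: MapPhase, files: files}
	c.cond = sync.NewCond(&c.mu)
	for i, f := range files {
		c.tasks = append(c.tasks, &Task{Phase: MapPhase, Id: i, File: f})
	}
	return c
}

// maybeAdvance must be called with c.mu held, e.g. after a worker reports.
// A phase is finished when nothing is queued and no worker holds a task.
func (c *Coordinator) maybeAdvance() {
	if c.done || len(c.tasks) > 0 {
		return
	}
	for _, w := range c.workers {
		if w.task != nil {
			return // some task of this phase is still running
		}
	}
	if c.phase == MapPhase {
		c.phase = ReducePhase
		for i := 0; i < c.nReduce; i++ {
			c.tasks = append(c.tasks, &Task{Phase: ReducePhase, Id: i})
		}
	} else {
		c.done = true
	}
	c.cond.Broadcast() // wake any handlers blocked in c.cond.Wait()
}

func main() {
	c := NewCoordinator([]string{"a.txt", "b.txt"}, 3)
	w := &workerStatus{workerId: 0}
	c.workers = append(c.workers, w)

	c.mu.Lock()
	defer c.mu.Unlock()
	for !c.done {
		// Issue the next task to the single worker, then "report" it finished.
		w.task, c.tasks = c.tasks[0], c.tasks[1:]
		fmt.Println("ran task", w.task.Id, "in phase", w.task.Phase)
		w.task = nil
		c.maybeAdvance()
	}
}
```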

But here is a problem: what should we tell a worker when there is no task to issue right now but the whole job is not done yet (e.g. all Map tasks have been issued but the Reduce tasks have not been generated yet)? My solution is to use a condition variable.

// Runs with c.mu held; Wait releases c.mu while sleeping and re-locks it before returning.
for !c.done && len(c.tasks) == 0 {
	c.cond.Wait()
}

When there is no task to issue and the job is not done, the handler waits. Whenever we update tasks or done, we call Broadcast so that the waiting handlers wake up and re-check the condition.

c.cond.Broadcast()

In this way we solved this problem gracefully.

Dealing with crashes

Some workers may crash, and a crashed worker will never report its status. In this lab, we assume that a worker that has not reported within 10 seconds has died. The solution is to create a goroutine whenever we issue a task. This goroutine waits up to 10 seconds. If the worker has not reported by then, the goroutine removes the link between the task and the worker, adds the task back to tasks, and calls Broadcast.

I found the context package convenient for this job. When we issue a task, we create a context that times out after 10 seconds. Then we start a goroutine that waits for this context to be done and checks whether it was canceled. If the context was canceled, the worker reported normally, so the goroutine does nothing. Otherwise the deadline has passed, and the goroutine does the recovery work.

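ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
c.workers[WorkerId].cancel = cancel
go func(ctx context.Context) {
	<-ctx.Done()
	if ctx.Err() == context.Canceled {
		return // the worker reported in time and its cancel() was called
	}
	// ctx.Err() is context.DeadlineExceeded: no report within 10 seconds.
	c.mu.Lock()
	defer c.mu.Unlock()
	// do recovery work: if the worker still holds this task, unlink it,
	// append the task back to c.tasks and call c.cond.Broadcast()
}(ctx)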

Each time a worker reports, we call its cancel function.

c.workers[WorkerId].cancel()