1 change: 0 additions & 1 deletion README.md

This file was deleted.

78 changes: 78 additions & 0 deletions readme.md
@@ -0,0 +1,78 @@
CS4513: Project 3 The MapReduce Library
=======================================

Note: this document includes a number of design questions that can help your implementation. We highly recommend that you answer each design question **before** attempting the corresponding implementation.
These questions will help you design and plan your implementation and guide you towards the resources you need.
Finally, if you are unsure how to start the project, we recommend you visit office hours for some guidance on these questions before attempting to implement this project.


Team members
-----------------

1. Tyler Jones (twjones@wpi.edu)
2. Tom Graham (Tgraham@wpi.edu)

Design Questions
------------------

(2 points) 1. If there are n input files and nReduce reduce tasks, how does the MapReduce Library uniquely name the intermediate files?

f0-0, ... f0-[nReduce-1]
f1-0, ... f1-[nReduce-1]
...
f[n-1]-0 ... f[n-1]-[nReduce-1]

MapReduce calls reduceName and mergeName in common.go to construct the unique file names.
reduceName combines the job name, map task number, and reduce task number: "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask).
mergeName combines the job name and the reduce task number: "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask).


(1 point) 2. Following the previous question, for reduce task r, what are the names of the files it will work on?

f0-r,
f1-r,
...
f[n-1]-r


(1 point) 3. If the submitted mapreduce job name is "test", what will be the final output file's name?

mrtmp.test


(2 points) 4. Based on `mapreduce/test_test.go`, when you run the `TestBasic()` function, how many master and worker processes will be started? And what are their respective addresses and naming schemes?

1 master, 2 workers.
Addresses should look similar to:

/var/tmp/824-UID/mrPID-master
/var/tmp/824-UID/mrPID-worker0
/var/tmp/824-UID/mrPID-worker1

UID and PID can be any integers.


(4 points) 5. In real-world deployments, when given a mapreduce job, we often start the master and workers on different machines (physical or virtual). Briefly describe the protocol that allows the master and workers to become aware of each other's existence and subsequently start working together on completing the mapreduce job. Your description should be grounded in the RPC communications.

You would still use RPC, but over TCP sockets rather than UNIX-domain sockets, since the processes run on different machines. Each worker dials the master's TCP address and calls the Register RPC to announce its own address; the master records that address and then dispatches map and reduce tasks by making DoTask RPCs to the worker's own RPC server. You would also need shared storage (e.g., NFS) so that every machine can read the input and intermediate files, and a way to start the processes on all the machines.


(2 points) 6. The current design and implementation use a number of RPC methods. Can you find all the RPCs and list their signatures? Briefly describe the criteria
a method needs to satisfy to be considered an RPC method. (Hint: see https://golang.org/pkg/net/rpc/)

To be considered an RPC method, a method must be syntactically structured like this:
`func (t *T) MethodName(argType T1, replyType *T2) error`
The method and its receiver type must be exported, the two argument types T1 and T2 must be exported (or builtin) types marshallable by encoding/gob, the second argument must be a pointer, and the return type must be error.


7. We provide a bash script called `main/test-wc-distributed.sh` that allows testing the distributed MapReduce implementation.

Both of the test scripts work and provide the expected results.


Errata
------

Describe any known errors, bugs, or deviations from the requirements.

---
50 changes: 28 additions & 22 deletions starter-project-mr/README.txt
@@ -10,60 +10,66 @@ Team members
-----------------

1. Tyler Jones (twjones@wpi.edu)
2. Lorenzo Lopez (lplopez@wpi.edu)
3. Tom Graham (Tgraham@wpi.edu)
2. Tom Graham (Tgraham@wpi.edu)

Design Questions
------------------

(2 points) 1. If there are n input files and nReduce reduce tasks, how does the MapReduce Library uniquely name the intermediate files?

f0-0, ... f0-[nReduce-1]
f1-0, ... f1-[nReduce-1]
...
f[n-1]-0 ... f[n-1]-[nReduce-1]
f0-0, ... f0-[nReduce-1]
f1-0, ... f1-[nReduce-1]
...
f[n-1]-0 ... f[n-1]-[nReduce-1]

MapReduce calls reduceName and mergeName in common.go to construct the unique file names.
reduceName combines the job name, map task number, and reduce task number: "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask).
mergeName combines the job name and the reduce task number: "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask).


(1 point) 2. Following the previous question, for reduce task r, what are the names of the files it will work on?

f0-r,
f1-r,
...
f[n-1]-r
f0-r,
f1-r,
...
f[n-1]-r


(1 point) 3. If the submitted mapreduce job name is "test", what will be the final output file's name?

mrtmp.test
mrtmp.test


(2 points) 4. Based on `mapreduce/test_test.go`, when you run the `TestBasic()` function, how many master and worker processes will be started? And what are their respective addresses and naming schemes?

1 master, 2 workers.
Addresses should look similar to:

1 master, 2 workers.
Addresses should look similar to:

/var/tmp/824-UID/mrPID-master
/var/tmp/824-UID/mrPID-worker0
/var/tmp/824-UID/mrPID-worker1
/var/tmp/824-UID/mrPID-master
/var/tmp/824-UID/mrPID-worker0
/var/tmp/824-UID/mrPID-worker1

UID and PID can be any integers.
UID and PID can be any integers.


(4 points) 5. In real-world deployments, when given a mapreduce job, we often start the master and workers on different machines (physical or virtual). Briefly describe the protocol that allows the master and workers to become aware of each other's existence and subsequently start working together on completing the mapreduce job. Your description should be grounded in the RPC communications.


(Most students probably will be able to answer this question correctly---as the process is summarized in the project description. The purpose of this question is to get students to read and understand the protocol.)

Just give submission full points for this question.
You would still use RPC, but over TCP sockets rather than UNIX-domain sockets, since the processes run on different machines. Each worker dials the master's TCP address and calls the Register RPC to announce its own address; the master records that address and then dispatches map and reduce tasks by making DoTask RPCs to the worker's own RPC server. You would also need shared storage (e.g., NFS) so that every machine can read the input and intermediate files, and a way to start the processes on all the machines.


(2 points) 6. The current design and implementation use a number of RPC methods. Can you find all the RPCs and list their signatures? Briefly describe the criteria
a method needs to satisfy to be considered an RPC method. (Hint: see https://golang.org/pkg/net/rpc/)

To be considered an RPC method, a method must be syntactically structured like this:
`func (t *T) MethodName(argType T1, replyType *T2) error`
The method and its receiver type must be exported, the two argument types T1 and T2 must be exported (or builtin) types marshallable by encoding/gob, the second argument must be a pointer, and the return type must be error.


7. We provide a bash script called `main/test-wc-distributed.sh` that allows testing the distributed MapReduce implementation.

Both of the test scripts work and provide the expected results.


Errata
------

27 changes: 25 additions & 2 deletions starter-project-mr/src/main/wc.go
Expand Up @@ -4,21 +4,44 @@ import (
"fmt"
"mr/mapreduce"
"os"
"strconv"
"strings"
"unicode"
)

// The mapping function is called once for each piece of the input.
// In this framework, the key is the name of the file that is being processed,
// and the value is the file's contents. The return value should be a slice of
// key/value pairs, each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Accumulates the intermediate key/value pairs.
var newArray []mapreduce.KeyValue

// Split the contents into words: FieldsFunc splits the string at every
// non-letter rune and returns a slice of the remaining words.
w := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})

// Can use blank identifier because word is all that's needed
for _, word := range w {
newArray = append(newArray, mapreduce.KeyValue{Key: word, Value: "1"})
}

return newArray
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string value (merged across all inputs). The return value
// should be a single output value for that key.
func reduceF(key string, values []string) string {
// Each value emitted by mapF is "1", so the count for this key is
// simply the number of values collected for it.
return strconv.Itoa(len(values))
}

// Can be run in 3 ways:
85 changes: 85 additions & 0 deletions starter-project-mr/src/mapreduce/common_map.go
@@ -1,7 +1,10 @@
package mapreduce

import (
"encoding/json"
"hash/fnv"
"io/ioutil"
"os"
)

// doMap does the job of a map worker: it reads one of the input files
@@ -14,6 +17,19 @@ func doMap(
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(file string, contents string) []KeyValue,
) {

// The program is the job, and the job can run multiple tasks/workers

// doMap : Task - Reads a file, call mapF and partitions the key/value pairs into nReduce intermediate files
// doReduce : Task - For each file, find the corresponding intermediate file, combine the '1's into an array for each word, call reduceF to sum the '1's for each word. Writes key/value pairs to merge file
// mapF : Returns an array of key/value pairs. Each value is '1' which means that there can be duplicate keys in the array.
// reduceF : Takes a list of '1's and combines them using the len operator

// Total map tasks: nMap (# of files)
// Total reduce tasks: nReduce (# of intermediate files)

// complete all of the map tasks first, then complete all of the reduce tasks

// TODO:
// You will need to write this function.
// You can find the filename for this map task's input to reduce task number
@@ -42,6 +58,75 @@ func doMap(
// Remember to close the file after you have written all the values!
// Use checkError to handle errors.

content, readErr := ioutil.ReadFile(inFile) // read the content of the file
checkError(readErr)

keyValues := mapF(inFile, string(content)) // collect the key values of the file

encoders := make([]*json.Encoder, nReduce)
files := make([]*os.File, nReduce)

for i := 0; i < nReduce; i++ { // creates nReduce files and encoders

var createErr error
fName := reduceName(jobName, mapTaskNumber, i) // creates the file name
files[i], createErr = os.Create(fName) // creates the intermediate file in the working directory
checkError(createErr)
defer files[i].Close()

encoders[i] = json.NewEncoder(files[i])
}

for _, kv := range keyValues { // for every key/value, assigns it a file/encoder
index := ihash(kv.Key) % uint32(nReduce)
encErr := encoders[index].Encode(&kv) // write the pair as one JSON value to its partition file
checkError(encErr)
}

// Tyler's Notes
//
// Application flow:
// master.go starts a master RPC server for workers to register with.
// Workers register via the Register RPC call, and each worker also starts
// its own RPC server so that the master can dispatch tasks to it.
// As tasks become available, master.go uses schedule() in schedule.go to
// assign tasks to workers (and to handle worker failures).
// Each input file is one map task. The master calls doMap at least once
// per map task: Sequential -> doMap() directly, Distributed -> DoTask()
// in worker.go to hand the task to a worker. Each doMap call:
// 1. Reads the contents of the input file.
// 2. Calls mapF with the file name and contents; mapF returns a slice of
//    key/value pairs where each value is "1" and keys may repeat.
// 3. Partitions the output into nReduce files: for each key, the ihash
//    function determines which partition file the key/value pair goes to.
// The ith map task generates files fi-0, fi-1, ..., fi-[nReduce-1], so the
// total number of intermediate files is (# of input files) * nReduce.
// The partition files are simply created in the job's working directory.
// The master then calls doReduce at least once per reduce task
// (Sequential -> doReduce() directly, Distributed -> DoTask() in worker.go);
// the jth doReduce call works through f0-j, f1-j, ..., f[n-1]-j.
// In short, doMap splits each file into nReduce partitions, and doReduce
// makes one pass over the matching partition of every file.
// After the reduce phase, the master calls mr.merge() in
// master_splitmerge.go to merge the nReduce result files, then shuts down
// each worker's RPC server and finally its own.
//
// Each intermediate file name encodes both the map task that produced it
// and the reduce task it is for, so a reduce task can find exactly its own
// partitions. JSON is used to serialize key/value pairs to disk.
}

func ihash(s string) uint32 {
49 changes: 49 additions & 0 deletions starter-project-mr/src/mapreduce/common_reduce.go
@@ -1,5 +1,10 @@
package mapreduce

import (
"encoding/json"
"os"
)

// doReduce does the job of a reduce worker: it reads the intermediate
// key/value pairs (produced by the map phase) for this task, sorts the
// intermediate key/value pairs by key, calls the user-defined reduce function
@@ -34,4 +39,48 @@ func doReduce(
//
// Use checkError to handle errors.

var keyValues []KeyValue

decoders := make([]*json.Decoder, nMap)
oFiles := make([]*os.File, nMap)

for i := 0; i < nMap; i++ { // for each map task's partition file
var readErr error
fName := reduceName(jobName, i, reduceTaskNumber) // construct the intermediate file name
oFiles[i], readErr = os.Open(fName) // find and open the intermediate file
checkError(readErr)

decoders[i] = json.NewDecoder(oFiles[i]) // creates a decoder for the file
for {
var keyVal KeyValue
decErr := decoders[i].Decode(&keyVal)
if decErr != nil { // io.EOF once the file is exhausted
break
}
keyValues = append(keyValues, keyVal)
}

// Close each file as soon as it has been consumed, rather than
// deferring all the closes to the end of the function.
oFiles[i].Close()
}

kvHash := make(map[string][]string) // Key: String, Value: String array

for i := 0; i < len(keyValues); i++ {
kvHash[keyValues[i].Key] = append(kvHash[keyValues[i].Key], keyValues[i].Value)
}

fNameMerge := mergeName(jobName, reduceTaskNumber) // create the name of the merge file
mf, createErr := os.Create(fNameMerge) // create the merge file
checkError(createErr)

enc := json.NewEncoder(mf) // create an encoder for the merge file
// Range over the grouped map (not keyValues, which holds one entry per
// occurrence of a key) so each key is reduced and written exactly once.
for key, vals := range kvHash {
enc.Encode(KeyValue{key, reduceF(key, vals)})
}

mf.Close()

}