Map Reduce Architecture
Adapted from lectures by Anand Rajaraman (Stanford Univ.) and Dan Weld (Univ. of Washington)

Outline: Single-node architecture; Commodity Clusters; Cluster Architecture; Stable storage; Distributed File System; Motivation for MapReduce (why); What is Map/Reduce; Map in LISP (Scheme); Reduce in LISP (Scheme); Warm up: Word Count; Word Count (2); Word Count (3); MapReduce; Word Count using MapReduce; Count, Illustrated; Model is Widely Applicable; MapReduce Programs in the Google Source Tree; Implementation Overview; Distributed Execution Overview; Data flow; Coordination; Failures; Execution; Parallel Execution; How many Map and Reduce jobs?; Combiners; Partition Function; Execution Summary; Exercise 1: Host size; Exercise 2: Distributed Grep; Grep; Exercise 3: Graph reversal; Reverse Web-Link Graph; Exercise 4: Frequent Pairs; Hadoop; Reading; Conclusions

Single-node architecture
- A single machine with its own CPU, memory, and disk.
- This is the traditional setting for machine learning, statistics, and "classical" data mining.

Commodity Clusters
- Web data sets can be very large: tens to hundreds of terabytes.
- They cannot be mined on a single server (why?).
- A standard architecture is emerging: a cluster of commodity Linux nodes with a gigabit Ethernet interconnect.
- The question is how to organize computations on this architecture while masking issues such as hardware failure.

Cluster Architecture
- Each rack contains 16-64 nodes, each with its own CPU, memory, and local disk.
- Nodes within a rack are connected by a rack switch, giving roughly 1 Gbps between any pair of nodes in the rack.
- A backbone switch connects the racks, with a 2-10 Gbps backbone between racks.

Stable storage
- First-order problem: if nodes can fail, how can we store data persistently?
- Answer: a distributed file system, which provides a global file namespace. Examples: Google GFS, Hadoop HDFS, Kosmix KFS.
- Typical usage pattern: huge files (100s of GB to TB); data is rarely updated in place; reads and appends are common.

Distributed File System
- Chunk servers: each file is split into contiguous chunks, typically 16-64 MB each. Each chunk is replicated (usually 2x or 3x), and replicas are kept in different racks where possible.
- Master node (a.k.a. the Name Node in HDFS): stores the metadata and might itself be replicated.
- Client library for file access: it talks to the master to find the chunk servers, then connects directly to the chunk servers to access the data.
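The split of responsibilities just described (metadata at the master, data at the chunk servers) can be captured in a small toy sketch. The Python below is illustrative only; Master, ChunkServer, and read_file are hypothetical names, not the GFS or HDFS client API.

    # Toy sketch of the master/chunk-server split described above.
    # Hypothetical names; real GFS/HDFS APIs differ. The master holds only
    # metadata; clients fetch chunk locations from it and then read the data
    # from the chunk servers directly.
    class Master:
        def __init__(self):
            self.file_chunks = {}      # file name -> ordered list of chunk ids
            self.chunk_locations = {}  # chunk id -> chunk-server addresses holding a replica

        def locate(self, filename):
            """Return (chunk id, replica locations) pairs for a file."""
            return [(c, self.chunk_locations[c]) for c in self.file_chunks[filename]]

    class ChunkServer:
        def __init__(self):
            self.chunks = {}           # chunk id -> bytes stored on local disk

        def read(self, chunk_id):
            return self.chunks[chunk_id]

    def read_file(master, servers, filename):
        # Client library: ask the master where the chunks are, then go to the
        # chunk servers for the actual data (the master never serves file data).
        data = b""
        for chunk_id, locations in master.locate(filename):
            data += servers[locations[0]].read(chunk_id)  # replica failover omitted
        return data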
Motivation for MapReduce (why)
- Large-scale data processing: we want to use 1000s of CPUs, but without the hassle of managing them ourselves.
- The MapReduce architecture provides automatic parallelization and distribution, fault tolerance, I/O scheduling, and monitoring and status updates.

What is Map/Reduce
- Map/Reduce is a programming model taken from LISP (and other functional languages).
- Many problems can be phrased this way.
- It is easy to distribute across nodes and has nice retry/failure semantics.

Map in LISP (Scheme)
- (map f list [list2 list3 ...]) applies f to every element; with a single list, f is a unary operator.

    (map square '(1 2 3 4))  =>  (1 4 9 16)

Reduce in LISP (Scheme)
- (reduce f id list) folds the binary operator f over the list, starting from the identity element id.

    (reduce + 0 '(1 4 9 16))
    = (+ 16 (+ 9 (+ 4 (+ 1 0))))
    = 30

- Map and reduce compose naturally, e.g. (reduce + 0 (map square (map - l1 l2))).

Warm up: Word Count
- We have a large file of words, one word per line.
- Count the number of times each distinct word appears in the file.
- Sample application: analyze web server logs to find popular URLs.

Word Count (2)
- Case 1: the entire file fits in memory.
- Case 2: the file is too large for memory, but all <word, count> pairs fit in memory.
- Case 3: the file is on disk, and there are too many distinct words to fit in memory:

    sort datafile | uniq -c

Word Count (3)
- To make it slightly harder, suppose we have a large corpus of documents and want to count the number of times each distinct word occurs in the corpus:

    words(docs/*) | sort | uniq -c

  where words takes a file and outputs the words in it, one per line.
- This pipeline captures the essence of MapReduce, and the great thing is that it is naturally parallelizable.

MapReduce
- Input: a set of key/value pairs.
- The user supplies two functions:

    map(k, v)            -> list(k1, v1)
    reduce(k1, list(v1)) -> v2

- (k1, v1) is an intermediate key/value pair; the output is the set of (k1, v2) pairs.

Word Count using MapReduce
(a runnable sketch of this job appears after the list of example uses below)

    map(key, value):
      // key: document name; value: text of document
      for each word w in value:
        emit(w, 1)

    reduce(key, values):
      // key: a word; values: an iterator over counts
      result = 0
      for each count v in values:
        result += v
      emit(key, result)

Count, Illustrated
- map(key=url, val=contents): for each word w in contents, emit (w, "1").
- reduce(key=word, values=uniq_counts): sum all the "1"s in the values list and emit (word, sum).
- Example: the documents "see bob run" and "see spot throw" map to (see, 1), (bob, 1), (run, 1) and (see, 1), (spot, 1), (throw, 1), which reduce to (bob, 1), (run, 1), (see, 2), (spot, 1), (throw, 1).

Model is Widely Applicable: MapReduce Programs in the Google Source Tree
- Example uses: distributed grep, distributed sort, web link-graph reversal, term-vector per host, web access log stats, inverted index construction, document clustering, machine learning, statistical machine translation, ...
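To make the word-count pseudocode above concrete, here is a minimal in-memory sketch in Python that simulates the map, group-by-key (shuffle), and reduce phases. It is an illustrative toy, not the Google C++ library or Hadoop; the names map_fn, reduce_fn, and map_reduce are made up for the example.

    # Minimal in-memory simulation of the word-count MapReduce job above.
    from collections import defaultdict

    def map_fn(doc_name, text):
        # key: document name; value: text of document
        for word in text.split():
            yield (word, 1)

    def reduce_fn(word, counts):
        # key: a word; values: an iterator over counts
        yield (word, sum(counts))

    def map_reduce(corpus):
        # Map phase: apply map_fn to every (document name, contents) pair.
        intermediate = defaultdict(list)
        for doc_name, text in corpus.items():
            for key, value in map_fn(doc_name, text):
                intermediate[key].append(value)   # shuffle: group values by key
        # Reduce phase: apply reduce_fn to each (key, list of values) group.
        output = {}
        for key, values in intermediate.items():
            for out_key, out_value in reduce_fn(key, values):
                output[out_key] = out_value
        return output

    if __name__ == "__main__":
        corpus = {"doc1": "see bob run", "doc2": "see spot throw"}
        print(map_reduce(corpus))
        # -> {'see': 2, 'bob': 1, 'run': 1, 'spot': 1, 'throw': 1}

In the real system the grouping step is done by partitioning, sorting, and shipping intermediate files between machines rather than by an in-memory dictionary.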
Implementation Overview
- Typical cluster: 100s to 1000s of 2-CPU x86 machines with 2-4 GB of memory each; limited bisection bandwidth; storage on local IDE disks.
- GFS, a distributed file system, manages the data (SOSP '03).
- A job scheduling system: jobs are made up of tasks, and the scheduler assigns tasks to machines.
- The implementation is a C++ library linked into user programs.

Distributed Execution Overview
- [Figure: the user program forks a master and a set of workers. The master assigns map tasks and reduce tasks to workers. Map workers read the input splits (Split 0, Split 1, Split 2) and write intermediate results to local disk; reduce workers remotely read and sort that intermediate data and write the final output files (Output File 0, Output File 1).]

Data flow
- Input and final output are stored on a distributed file system; the scheduler tries to schedule map tasks "close" to the physical storage location of the input data.
- Intermediate results are stored on the local file systems of the map and reduce workers.
- The output is often the input to another MapReduce task.

Coordination
- Master data structures: each task has a status (idle, in-progress, or completed), and idle tasks get scheduled as workers become available.
- When a map task completes, it sends the master the locations and sizes of its R intermediate files, one for each reducer, and the master pushes this information to the reducers.
- The master pings workers periodically to detect failures.

Failures
- Map worker failure: map tasks completed or in-progress at the failed worker are reset to idle, and reduce workers are notified when the task is rescheduled on another worker.
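The coordination and failure-handling rules above can be summarized in a small toy sketch of the master's bookkeeping. This is an illustrative Python sketch under the assumptions stated in the slides, not the actual Google implementation; MasterState and its methods are hypothetical names.

    # Toy sketch of the master bookkeeping described above (illustrative only).
    # Each task is idle, in-progress, or completed; map tasks on a failed worker
    # are reset to idle so they can be rescheduled, because their intermediate
    # output lived on that worker's local disk.
    IDLE, IN_PROGRESS, COMPLETED = "idle", "in-progress", "completed"

    class MasterState:
        def __init__(self, num_map_tasks):
            self.status = {t: IDLE for t in range(num_map_tasks)}
            self.assigned_worker = {}

        def schedule(self, worker):
            # Hand an idle task to a worker that just became available.
            for task, state in self.status.items():
                if state == IDLE:
                    self.status[task] = IN_PROGRESS
                    self.assigned_worker[task] = worker
                    return task
            return None

        def map_task_completed(self, task, intermediate_file_info):
            # intermediate_file_info: locations/sizes of the R intermediate
            # files, to be pushed to the reducers (omitted in this sketch).
            self.status[task] = COMPLETED

        def worker_failed(self, worker):
            # Completed and in-progress map tasks on the dead worker go back to idle.
            for task, w in self.assigned_worker.items():
                if w == worker:
                    self.status[task] = IDLE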