Lecture 23: Supplementary slides for Pig Latin
Friday, May 28, 2010

Outline
• Based entirely on "Pig Latin: A not-so-foreign language for data processing", by Olston, Reed, Srivastava, Kumar, and Tomkins, 2008
• Quiz section tomorrow: in CSE 403 (this is CSE, don't go to EE1)

Why?
• Map-reduce is a low-level programming environment
• Most applications need more complex queries
• Pig Latin accepts higher-level queries and translates them into sequences of map-reduce jobs

Pig Latin Overview
• Data model = loosely typed nested relations
• Query model = an SQL-like dataflow language
• Execution model:
  – Option 1: run locally on your machine
  – Option 2: compile into a sequence of map/reduce jobs and run them on a Hadoop cluster
• Main idea: use Option 1 to debug, Option 2 to execute

Example
• Input: a table of urls: (url, category, pagerank)
• For each category, compute the average pagerank of all sufficiently high pageranks
• Return the answers only for categories with sufficiently many such pages

First in SQL…
    SELECT category, AVG(pagerank)
    FROM urls
    WHERE pagerank > 0.2
    GROUP BY category
    HAVING COUNT(*) > 1000000;  -- 10^6

…then in Pig Latin
    good_urls = FILTER urls BY pagerank > 0.2;
    groups = GROUP good_urls BY category;
    big_groups = FILTER groups BY COUNT(good_urls) > 1000000;  -- 10^6
    output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);

Types in Pig Latin
• Atomic: string or number, e.g. 'Alice' or 55
• Tuple: ('Alice', 55, 'salesperson')
• Bag: {('Alice', 55, 'salesperson'), ('Betty', 44, 'manager'), …}
• Maps: we will try not to use these

Types in Pig Latin
• Bags can be nested!
  – {('a', {1,4,3}), ('c', { }), ('d', {2,2,5,3,2})}
• Tuple components can be referenced by number
  – $0, $1, $2, …

Loading data
• Input data = FILES!
  – Heard that before?
• The LOAD command parses an input file into a bag of records
• Both the parser (the "deserializer") and the output type can be supplied by the user

Loading data
    queries = LOAD 'query_log.txt' USING myLoad() AS (userId, queryString, timeStamp);

Loading data
• USING userfunction() is optional
  – The default deserializer expects a tab-delimited file
• AS type is optional
  – The default is a record with unnamed fields; refer to them as $0, $1, …
• The return value of LOAD is just a handle to a bag
  – The actual reading is done in pull mode, or parallelized

FOREACH
    expanded_queries = FOREACH queries GENERATE userId, expandQuery(queryString);
• expandQuery() is a UDF that produces likely expansions
• Note: it returns a bag, hence expanded_queries is a nested bag

FOREACH
    expanded_queries = FOREACH queries GENERATE userId, FLATTEN(expandQuery(queryString));
• Now we get a flat collection

FLATTEN
Note that it is NOT a first-class function! (That's one thing I don't like about Pig Latin.)
• First-class FLATTEN:
  – FLATTEN({{2,3},{5},{},{4,5,6}}) = {2,3,5,4,5,6}
  – Type: {{T}} → {T}
• Pig Latin FLATTEN:
  – FLATTEN({4,5,6}) = 4, 5, 6
  – Type: {T} → T, T, T, …, T ?????
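To make the contrast on the FLATTEN slide concrete, here is a small Python model (not Pig itself) of the two behaviours. Bags are modeled as Python lists and tuples as Python tuples; as a simplification, bag elements are plain values, as in the slide's {4,5,6}, whereas real Pig bags hold tuples whose fields all get spliced in. The names first_class_flatten, pig_flatten, and expand_query are hypothetical, and expand_query is only a stand-in for the expandQuery UDF.

```python
from itertools import chain

# Model: bags are Python lists, tuples are Python tuples.

def first_class_flatten(bag_of_bags):
    """{{T}} -> {T}: concatenate the inner bags into a single bag."""
    return list(chain.from_iterable(bag_of_bags))

def pig_flatten(rows, pos):
    """Pig-style FLATTEN of the bag stored at field `pos` of each row.

    Each row is replaced by one row per bag element, with the element
    spliced in at `pos`; a row whose bag is empty produces no output.
    """
    out = []
    for row in rows:
        for item in row[pos]:
            out.append(row[:pos] + (item,) + row[pos + 1:])
    return out

def expand_query(query):
    """Hypothetical stand-in for the expandQuery UDF; returns a bag."""
    return [query, query + " news"]

queries = [("u1", "lakers"), ("u2", "ipod")]

# FOREACH queries GENERATE userId, expandQuery(queryString);  -> nested bag
nested = [(user, expand_query(q)) for user, q in queries]

# FOREACH queries GENERATE userId, FLATTEN(expandQuery(queryString));  -> flat
flat = pig_flatten(nested, 1)
```

Here first_class_flatten([[2, 3], [5], [], [4, 5, 6]]) returns [2, 3, 5, 4, 5, 6], matching the first-class FLATTEN on the slide. pig_flatten instead works inside each row: flat holds ('u1', 'lakers'), ('u1', 'lakers news'), ('u2', 'ipod'), ('u2', 'ipod news'), which is why Pig's FLATTEN only makes sense inside a FOREACH … GENERATE.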
FILTER
Remove all queries from Web bots:
    real_queries = FILTER queries BY userId neq 'bot';
Better: use a complex UDF to detect Web bots:
    real_queries = FILTER queries BY NOT isBot(userId);

JOIN
    join_result = JOIN results BY queryString, revenue BY queryString;
• results: {(queryString, url, position)}
• revenue: {(queryString, adSlot, amount)}
• join_result: {(queryString, url, position, adSlot, amount)}

GROUP BY
    grouped_revenue = GROUP revenue BY queryString;
    query_revenues = FOREACH grouped_revenue GENERATE queryString, SUM(revenue.amount) AS totalRevenue;
• revenue: {(queryString, adSlot, amount)}
• grouped_revenue: {(queryString, {(adSlot, amount)})}
• query_revenues: {(queryString, totalRevenue)}

Simple Map-Reduce
    map_result = FOREACH input GENERATE FLATTEN(map(*));
    key_groups = GROUP map_result BY $0;
    output = FOREACH key_groups GENERATE reduce($1);
• input: {(field1, field2, field3, …)}
• map_result: {(a1, a2, a3, …)}
• key_groups: {(a1, {(a2, a3, …)})}

Co-Group
    grouped_data = COGROUP results BY queryString, revenue BY queryString;
• results: {(queryString, url, position)}
• revenue: {(queryString, adSlot, amount)}
• grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}
What is the output type in general?

Co-Group
Is this an inner join or an outer join?

Co-Group
    url_revenues = FOREACH grouped_data GENERATE FLATTEN(distributeRevenue(results, revenue));
• grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}
• distributeRevenue is a UDF that accepts the search results and revenue information for one query string at a time, and outputs a bag of urls and the revenue attributed to them.

Co-Group vs. Join
    grouped_data = COGROUP results BY queryString, revenue BY queryString;
    join_result = FOREACH grouped_data GENERATE FLATTEN(results), FLATTEN(revenue);
• grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}
• The result is the same as JOIN

Asking for Output: STORE
    STORE query_revenues INTO 'myoutput' USING myStore();
• Meaning: write query_revenues to the file 'myoutput'

Implementation
• Over Hadoop!
• Parse query:
  – Everything between LOAD and STORE → one logical plan
• Logical plan → sequence of Map/Reduce ops
• All statements between two (CO)GROUPs → one Map/Reduce op
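The Co-Group vs. Join and Implementation slides can be tied together in a small Python model (not Pig itself, and not Pig's actual compiler output): COGROUP is computed as one map/reduce step, where the map tags each record with its key and input, the shuffle collects values by key, and the reduce builds one bag per input. Flattening both bags then reproduces JOIN. The function names and toy data are hypothetical, and emitting the key first is an assumption made so the tuples match the join_result type on the JOIN slide.

```python
from collections import defaultdict

# Made-up toy inputs using the slide schemas.
results = [("lakers", "nba.com", 1), ("lakers", "espn.com", 2), ("kings", "nhl.com", 1)]
revenue = [("lakers", "top", 50), ("lakers", "side", 20), ("ipod", "top", 10)]

def map_phase(results, revenue):
    """Emit (key, (input_index, rest_of_record)) for every input record."""
    for q, url, pos in results:
        yield q, (0, (url, pos))
    for q, slot, amount in revenue:
        yield q, (1, (slot, amount))

def cogroup(results, revenue):
    """Model of COGROUP results BY queryString, revenue BY queryString.

    The shuffle collects all values per key; the reduce side splits them
    into one bag per input. A key seen in only one input still gets a
    group, with an empty bag for the other input.
    """
    shuffled = defaultdict(list)
    for key, tagged in map_phase(results, revenue):
        shuffled[key].append(tagged)
    grouped = []
    for key in sorted(shuffled):  # reducers see their keys in sorted order
        bags = ([], [])
        for source, record in shuffled[key]:
            bags[source].append(record)
        grouped.append((key, bags[0], bags[1]))
    return grouped

def join_via_cogroup(grouped):
    """FOREACH grouped_data GENERATE FLATTEN(results), FLATTEN(revenue),
    with the group key emitted first."""
    return [
        (q, url, pos, slot, amount)
        for q, res, rev in grouped
        for url, pos in res
        for slot, amount in rev
    ]

def direct_join(results, revenue):
    """Reference nested-loop JOIN results BY queryString, revenue BY queryString."""
    return [
        (q1, url, pos, slot, amount)
        for q1, url, pos in results
        for q2, slot, amount in revenue
        if q1 == q2
    ]

grouped = cogroup(results, revenue)
joined = join_via_cogroup(grouped)
```

In this model, grouped contains a group for "ipod" with an empty results bag and a group for "kings" with an empty revenue bag, since COGROUP keeps keys from either input. Flattening an empty bag produces no rows, so those groups vanish from joined, and the COGROUP + FLATTEN combination yields the same four tuples as direct_join.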