Lecture 26b: Supplementary slides for Pig Latin
Friday, Dec 3, 2010

Outline
Based entirely on "Pig Latin: A not-so-foreign language for data processing", by Olston, Reed, Srivastava, Kumar, and Tomkins, 2008
Quiz section tomorrow: in CSE 403 (this is CSE, don't go to EE1)

Why?
• Map-reduce is a low-level programming environment
• Most applications need more complex queries
• Pig-Latin accepts higher-level queries and translates them to sequences of map-reduce jobs

Pig-Latin Overview
• Data model = loosely typed nested relations
• Query model = a SQL-like, dataflow language
• Execution model:
  – Option 1: run locally on your machine
  – Option 2: compile into a sequence of map/reduce jobs, run on a cluster supporting Hadoop
• Main idea: use Option 1 to debug, Option 2 to execute

Example
• Input: a table of urls: (url, category, pagerank)
• Compute the average pagerank of all sufficiently high pageranks, for each category
• Return the answers only for categories with sufficiently many such pages

First in SQL…

    SELECT category, AVG(pagerank)
    FROM urls
    WHERE pagerank > 0.2
    GROUP BY category
    HAVING COUNT(*) > 1000000

…then in Pig-Latin

    good_urls = FILTER urls BY pagerank > 0.2;
    groups = GROUP good_urls BY category;
    big_groups = FILTER groups BY COUNT(good_urls) > 1000000;
    output = FOREACH big_groups GENERATE category, AVG(good_urls.pagerank);

Types in Pig-Latin
• Atomic: string or number, e.g. 'Alice' or 55
• Tuple: ('Alice', 55, 'salesperson')
• Bag: {('Alice', 55, 'salesperson'), ('Betty', 44, 'manager'), …}
• Maps: we will try not to use these

Types in Pig-Latin
Bags can be nested!
• {('a', {1,4,3}), ('c', { }), ('d', {2,2,5,3,2})}
Tuple components can be referenced by number
• $0, $1, $2, …

Loading data
• Input data = FILES!
  – Heard that before?
• The LOAD command parses an input file into a bag of records
• Both the parser (= "deserializer") and the output type are provided by the user

Loading data

    queries = LOAD 'query_log.txt'
              USING myLoad()
              AS (userId, queryString, timeStamp);

Loading data
• USING userfunction() -- is optional
  – Default deserializer expects a tab-delimited file
• AS type -- is optional
  – Default is a record with unnamed fields; refer to them as $0, $1, …
• The return value of LOAD is just a handle to a bag
  – The actual reading is done in pull mode, or parallelized

FOREACH

    expanded_queries = FOREACH queries
                       GENERATE userId, expandQuery(queryString);

expandQuery() is a UDF that produces likely expansions
Note: it returns a bag, hence expanded_queries is a nested bag

FOREACH

    expanded_queries = FOREACH queries
                       GENERATE userId, FLATTEN(expandQuery(queryString));

Now we get a flat collection

FLATTEN
Note that it is NOT a first-class function! (That's one thing I don't like about Pig-Latin.)
• First-class FLATTEN:
  – FLATTEN({{2,3},{5},{},{4,5,6}}) = {2,3,5,4,5,6}
  – Type: {{T}} → {T}
• Pig-Latin FLATTEN:
  – FLATTEN({4,5,6}) = 4, 5, 6
  – Type: {T} → T, T, T, …, T ?????
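The difference between the two FLATTEN variants above can be sketched in Python. This is only an emulation under stated assumptions, not Pig code: bags are modeled as Python lists, `expand_query` is a hypothetical stand-in for the expandQuery UDF, and the function names are made up for illustration.

```python
from itertools import chain


def flatten_first_class(bag_of_bags):
    """First-class flatten, type {{T}} -> {T}: concatenate the inner bags."""
    return list(chain.from_iterable(bag_of_bags))


def foreach_generate_flatten(rows, udf):
    """Emulates: FOREACH rows GENERATE key, FLATTEN(udf(value)).

    Each element of the bag returned by the UDF becomes its own output
    tuple, paired with the key of the input row it came from.
    """
    out = []
    for key, value in rows:
        for item in udf(value):
            out.append((key, item))
    return out


def expand_query(query_string):
    """Hypothetical stand-in for the expandQuery UDF: returns a bag."""
    return [query_string, query_string + " news"]


queries = [("alice", "lakers"), ("bob", "iPod")]

# Without FLATTEN: one tuple per input row, holding a nested bag.
nested = [(user, expand_query(q)) for user, q in queries]

# With FLATTEN: one tuple per element of each nested bag.
flat = foreach_generate_flatten(queries, expand_query)
```

In this sketch the Pig-style flatten only makes sense inside the row-producing loop, which is one way to read the slide's complaint that it is not a first-class function.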
FILTER
Remove all queries from Web bots:

    real_queries = FILTER queries BY userId neq 'bot';

Better: use a complex UDF to detect Web bots:

    real_queries = FILTER queries BY NOT isBot(userId);

JOIN
results: {(queryString, url, position)}
revenue: {(queryString, adSlot, amount)}

    join_result = JOIN results BY queryString,
                       revenue BY queryString;

join_result: {(queryString, url, position, adSlot, amount)}

GROUP BY
revenue: {(queryString, adSlot, amount)}

    grouped_revenue = GROUP revenue BY queryString;
    query_revenues = FOREACH grouped_revenue
                     GENERATE queryString, SUM(revenue.amount) AS totalRevenue;

grouped_revenue: {(queryString, {(adSlot, amount)})}
query_revenues: {(queryString, totalRevenue)}

Simple Map-Reduce
input: {(field1, field2, field3, …)}

    map_result = FOREACH input GENERATE FLATTEN(map(*));
    key_groups = GROUP map_result BY $0;
    output = FOREACH key_groups GENERATE reduce($1);

map_result: {(a1, a2, a3, …)}
key_groups: {(a1, {(a2, a3, …)})}

Co-Group
results: {(queryString, url, position)}
revenue: {(queryString, adSlot, amount)}

    grouped_data = COGROUP results BY queryString,
                           revenue BY queryString;

grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}
What is the output type in general?

Co-Group
Is this an inner join, or an outer join?

Co-Group
grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}

    url_revenues = FOREACH grouped_data
                   GENERATE FLATTEN(distributeRevenue(results, revenue));

distributeRevenue is a UDF that accepts the search results and revenue information for one query string at a time, and outputs a bag of urls and the revenue attributed to them.

Co-Group vs. Join
grouped_data: {(queryString, results:{(url, position)}, revenue:{(adSlot, amount)})}

    grouped_data = COGROUP results BY queryString,
                           revenue BY queryString;
    join_result = FOREACH grouped_data
                  GENERATE FLATTEN(results), FLATTEN(revenue);

Result is the same as JOIN

Asking for Output: STORE

    STORE query_revenues INTO 'myoutput' USING myStore();

Meaning: write query_revenues to the file 'myoutput'

Implementation
• Over Hadoop!
• Parse query:
  – Everything between LOAD and STORE → one logical plan
• Logical plan → sequence of Map/Reduce ops
• All statements between two (CO)GROUPs → one Map/Reduce op
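The Co-Group vs. Join slide says that COGROUP followed by FLATTEN of both bags gives the same result as JOIN. A minimal Python emulation of that claim follows. It is a sketch under assumptions, not Pig itself: bags are Python lists, the inner bags omit queryString as in the slide's schema, and the sample rows and the function names `cogroup` and `flatten_both` are invented for illustration.

```python
from collections import defaultdict


def cogroup(results, revenue):
    """Emulates: COGROUP results BY queryString, revenue BY queryString.

    Returns one tuple per distinct key: (queryString, results_bag, revenue_bag).
    A key present in only one input still appears, with an empty bag
    for the other input.
    """
    groups = defaultdict(lambda: ([], []))
    for query, url, position in results:
        groups[query][0].append((url, position))
    for query, ad_slot, amount in revenue:
        groups[query][1].append((ad_slot, amount))
    return [(query, res, rev) for query, (res, rev) in groups.items()]


def flatten_both(grouped):
    """Emulates: FOREACH grouped GENERATE FLATTEN(results), FLATTEN(revenue).

    Flattening two bags in one GENERATE yields their cross product,
    so a group with an empty bag on either side produces no output.
    """
    out = []
    for query, res_bag, rev_bag in grouped:
        for url, position in res_bag:
            for ad_slot, amount in rev_bag:
                out.append((query, url, position, ad_slot, amount))
    return out


# Invented sample data matching the slide's schemas.
results = [("lakers", "nba.com", 1), ("lakers", "espn.com", 2), ("kings", "nhl.com", 1)]
revenue = [("lakers", "top", 50), ("lakers", "side", 20), ("iPod", "top", 10)]

grouped_data = cogroup(results, revenue)
join_result = flatten_both(grouped_data)
```

In this emulation grouped_data keeps the unmatched keys 'kings' and 'iPod' with one empty bag each, while join_result drops them, so the flattened form behaves like an inner join.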