Apache Spark

******************************************Spark APIs***********************************************

In order to process the data in Hadoop we need to convert the data into an RDD (Resilient Distributed Dataset), and then we can apply the necessary transformations and actions on the RDD. Transformations are used to process the data, and actions are used to return results from the data. When a transformation is applied on an RDD, another RDD is created; once an RDD is created we cannot modify it, but we can create another RDD from the previous RDD.
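
For example, a minimal sketch (assuming sc is an active SparkContext): a transformation such as map() only defines a new RDD, and nothing runs until an action such as count() is called.

orders = sc.textFile("/public/retail_db/orders")       # create an RDD from a file in HDFS
order_dates = orders.map(lambda i: i.split(",")[1])    # transformation: returns a new RDD, orders itself is unchanged
order_dates.count()                                    # action: triggers the computation and returns a value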

Action :
first() : this is used to get the first row of the RDD (Eg : orderItems.first())

take(n) : gets the first n rows from the dataset

collect() : it is used to convert the RDD into a Python collection; this is used when we have to apply an API which is not present in the Spark API but in the Python API.
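
For example (a small sketch continuing the orderItems example above):

order_items_list = orderItems.collect()   # brings the whole RDD to the driver as a Python list
sorted(order_items_list)[:5]              # plain Python APIs can now be applied on the list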

parallelize() : this is used to convert a collection into an RDD. When we read data from a local file and want to process it, we need to convert that data into an RDD, so we use parallelize() to do the conversion.
EXAMPLE: lrdd = sc.parallelize(l) (l is the collection)

open : this is used to read a file from the local directory, e.g. data = open("data/retail_db/product/part-00000").read().splitlines()
textFile() : reads a file from the HDFS directory and creates an RDD
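EXAMPLE: products = sc.textFile("/public/retail_db/products")
         products.first()   # returns the first product record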



******************************read data from different file formats******************************************

load , read

load : it is an API which can read JSON, ORC, Parquet and text files and create a DataFrame
sqlContext.load("/public/retail_db_json/order_items","json").show()


read is also an API which is used to read data from different file formats:
sqlContext.read.json("/public/retail_db_json/order_items").show()
here, instead of json we can use text, orc or parquet
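
for example (the Parquet path here is illustrative; only the JSON path above is from the original examples):

sqlContext.read.parquet("/public/retail_db_parquet/order_items").show()
sqlContext.read.text("/public/retail_db/orders").show()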


******************************processing the RDD******************************************

Map :
below is the script used to apply the map transformation on the RDD:
items = sc.textFile("/public/retail_db/order_items")
item_map = items.map(lambda i: (int(i.split(",")[1]), float(i.split(",")[4])))

flatMap : it is similar to the map function, but it takes one element and produces 0 or more elements

list = ["hello how are you","I am fine","How was work today"] // create the list
data = sc.parallelize(list) // create the rdd
result = data.flatMap(lambda i : i.split(" "))
for i in result.collect() : print(i)

filter :
filter applies a condition and returns the subset of the data which satisfies the condition:

orders = sc.textFile("/public/retail_db/orders")
order_filter = orders.filter(lambda i : (i.split(",")[3] in ["CLOSED","COMPLETE"] and i.split(",")[1][:7] == "2014-01"))


************************************join the RDD*******************************************

--inner join : join is used to join two RDDs on a common key. The RDDs being joined must be in the form of (key, value) pairs, and when we join two RDDs of (K, V1) and (K, V2) we get (K, (V1, V2))

orders = sc.textFile("/public/retail_db/orders")
items = sc.textFile("/public/retail_db/order_items")

order_map = orders.map(lambda i :(int(i.split(",")[0]),i.split(",")[1]))
item_map = items.map(lambda i :(int(i.split(",")[1]),float(i.split(",")[4])))

joined_result = order_map.join(item_map)
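
the joined records are of the form (order_id, (order_date, order_item_subtotal)); a quick check:

joined_result.first()   # one pair of the form (order_id, (order_date, order_item_subtotal))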


--leftOuterJoin / rightOuterJoin

orders = sc.textFile("/public/retail_db/orders")
items = sc.textFile("/public/retail_db/order_items")

order_map = orders.map(lambda i :(int(i.split(",")[0]),i.split(",")[3]))
item_map = items.map(lambda i :(int(i.split(",")[1]),float(i.split(",")[4])))

joined_result = order_map.leftOuterJoin(item_map)   # the parent RDD should be on the left side and the child on the right side

joined_filter = joined_result.filter(lambda i : i[1][1] is None)   # get the records which have an entry in the parent table but not in the child table

joined_result = item_map.rightOuterJoin(order_map)


*********************************************aggregation*********************************
There are two types of aggregation: 1. total aggregation and 2. by-key/group-by aggregation

1. total aggregation

takes the set of values, aggregates them and provides one single output

reduce(func):

items = sc.textFile("/public/retail_db/order_items")

items_filter = items.filter(lambda i: int(i.split(",")[1]) == 2)
items_map = items_filter.map(lambda i : float(i.split(",")[4]))

two ways to add the values
1. from operator import add
   items_map.reduce(add)

2. items_map.reduce(lambda x,y:x+y)

The reduce function can also be used to get the minimum value from a set of values:

items = sc.textFile("/public/retail_db/order_items")

items_filter = items.filter(lambda i: int(i.split(",")[1]) == 2)
items_filter.reduce(lambda x , y : x if (float(x.split(",")[4]) < float(y.split(",")[4])) else y)

total aggregation also includes count()
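
EXAMPLE: items = sc.textFile("/public/retail_db/order_items")
         items.count()   # total number of records in the RDD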

2. bykey/group by aggregation

countByKey : this will get the count for each unique key; the input it takes is in the form of (K, V) and the output will be (K, count)

orders = sc.textFile("/public/retail_db/orders")

order_map = orders.map(lambda i : (i.split(",")[3], 1))
result = order_map.countByKey()
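
countByKey() returns a Python dictionary on the driver (not an RDD), so it can be printed with plain Python, for example:

for status, cnt in result.items():
    print(str(status) + "\t" + str(cnt))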


--> the difference between groupByKey, reduceByKey and aggregateByKey

groupByKey does not use a combiner, but reduceByKey and aggregateByKey use a combiner, due to which reduceByKey and aggregateByKey are faster.
(combiner : when we try to add 1 to 1000, the task is divided among the number of threads/partitions assigned; each thread runs in parallel and performs the addition on its own portion, so there will be 4 different partial results, and the final calculation adds these four values)
for example, sum(1 to 100) -> sum(1-25), sum(26-50), sum(51-75), sum(76-100)   # four threads, each with a different range
sum(t1, t2, t3, t4)   # the outputs from the threads are added together
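
a minimal sketch of the same idea in PySpark (assuming sc is available); each of the 4 partitions adds its own range and the partial sums are then combined:

from operator import add
nums = sc.parallelize(range(1, 101), 4)   # 4 partitions, each holding a different range
nums.reduce(add)                          # the partial sum from each partition is added together -> 5050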

the difference between reduceByKey and aggregateByKey: if the operation done by each thread is the same operation used to combine the partial results into the final output (as in the sum example above), we can use reduceByKey; but if we are trying to do something like an average, where the per-thread operation is to add (keeping a sum and a count) and the final operation is to divide, then we should use aggregateByKey


1. groupByKey : it takes (K, V) and the output will be (K, iterable(V))

items = sc.textFile("/public/retail_db/order_items")

item_map = items.map(lambda i: (int(i.split(",")[1]), float(i.split(",")[4])))
result = item_map.groupByKey()
final = result.map(lambda i :( int(i[0]), round(sum(i[1]),2)))

we can also sort the group of items within a single key

items = sc.textFile("/public/retail_db/order_items")
items_map = items.map(lambda i : (int(i.split(",")[1]),i))
o/p: (2, u'2,2,1073,1,199.99,199.99')
(2, u'3,2,502,5,250.0,50.0')
(2, u'4,2,403,1,129.99,129.99')

group = items_map.groupByKey()
o/p: (2, <pyspark.resultiterable.ResultIterable object at 0x61d6c7a5e090>)
//<pyspark.resultiterable.ResultIterable object at 0x61d6c7a5e090> = [2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99])
(4, <pyspark.resultiterable.ResultIterable object at 0x61d6c7a5eb50>)
(8, <pyspark.resultiterable.ResultIterable object at 0x61d6c7a5e490>)


final = group.flatMap(lambda i : sorted(i[1],key=(lambda j:float(j.split(",")[4])),reverse = True))

o/p:
3,2,502,5,250.0,50.0
2,2,1073,1,199.99,199.99
4,2,403,1,129.99,129.99
6,4,365,5,299.95,59.99
8,4,1014,4,199.92,49.98
7,4,502,3,150.0,50.0
5,4,897,2,49.98,24.99

i[1] is the iterable of order item records (strings) for that key

2. reduceByKey

items = sc.textFile("/public/retail_db/order_items")
item_map = items.map(lambda i : (int(i.split(",")[1]),float(i.split(",")[4])))

result = item_map.reduceByKey(lambda x,y:x+y)   # get the sum of the set of values with the same key

final = item_map.reduceByKey(lambda x , y: x if (x < y) else y)   # get the min value from the set of values for a single key

items = sc.textFile("/public/retail_db/order_items")
items_map = items.map(lambda i : (int(i.split(",")[1]),i))
final = items_map.reduceByKey(lambda x, y: x if (float(x.split(",")[4]) < float(y.split(",")[4])) else y)   # get the full record with the min value in the set


3. aggregateByKey

items = sc.textFile("/public/retail_db/order_items")
item_map = items.map(lambda i : (int(i.split(",")[1]),float(i.split(",")[4])))
final = item_map.aggregateByKey((0.0,0),lambda x,y: (x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0], x[1]+y[1]))

the first argument of aggregateByKey is the initial (zero) value, which also defines the structure of the output value. aggregateByKey takes (K, V) and produces (K, U), where U can have a different structure from V; that structure is given by the first argument. In the second argument (the first lambda), x is the running tuple starting from (0.0, 0) and y is the float value being added to x[0]; this lambda runs within each partition. The third argument (the second lambda) merges the partial (sum, count) results coming from different partitions.
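
to get the actual average per key, one more transformation divides the sum by the count (a small sketch continuing the example above):

avg = final.map(lambda i: (i[0], round(i[1][0] / i[1][1], 2)))   # (order_id, average order_item_subtotal)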

4. sortByKey() : it takes (K, V) and provides sorted output of the form (K, V); the data is sorted by the key (K)

product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: (i.split(",")[4]) != "").map(lambda i: (float(i.split(",")[4]) , i))
# in the above query we use filter because before field 4 there is a description field which can contain a ",", so the split is done incorrectly and field 4 ends up empty; the filter discards the records which have an empty value in field 4
final = product_map.sortByKey()
result = final.map(lambda i:i[1])

sort by composite key (when there are two keys to sort on)

product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: (i.split(",")[4]) != "").map(lambda i: ((int(i.split(",")[1]),-float(i.split(",")[4])) , i))
final = product_map.sortByKey()
result = final.map(lambda i:i[1])


************************************************ranking**************************************************
get the top 5 records from the data sorted by product price

product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: (i.split(",")[4]) != "").map(lambda i: (float(i.split(",")[4]) , i))
final = product_map.sortByKey(False)
for i in final.map(lambda p:p[1]).take(5):print(i)

the above script can be shortened by using either top or takeOrdered

product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: (i.split(",")[4]) != "")
final=product_map.top(5,key=lambda i: float(i.split(",")[4]))

final=product_map.takeOrdered(5,key=lambda i: -float(i.split(",")[4]))


1. groupByKey : ranking the data

product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: (i.split(",")[4]) != "")
final = product_map.map(lambda i: (int(i.split(",")[1]),i))
result = final.groupByKey()
group = result.flatMap(lambda j : sorted(j[1], key=lambda i : float(i.split(",")[4]), reverse=True)[:3])   # top 3 records per category


2. get top 3 price in single category:
product = sc.textFile("/public/retail_db/products")
product_map = product.filter(lambda i: i.split(",")[4] != "").map(lambda j: (int(j.split(",")[1]),j))
result = product_map.groupByKey()
t = result.filter(lambda p: p[0] == 59).first()   # single category
result_sort = sorted(t[1], key=lambda i : float(i.split(",")[4]), reverse=True)   # sort the data based on the product_price
l_map = map(lambda p : float(p.split(",")[4]), result_sort)   # get just the product_price
topN = sorted(set(l_map), reverse=True)[:3]   # get the top three product_price values
import itertools as it
final = it.takewhile(lambda p: float(p.split(",")[4]) in topN, result_sort)

the above can be wrapped into a function as given below

def defined_function(lst, n):
    result_sort = sorted(lst[1], key=lambda i: float(i.split(",")[4]), reverse=True)
    l_map = map(lambda p: float(p.split(",")[4]), result_sort)
    topN = sorted(set(l_map), reverse=True)[:n]
    import itertools as it
    return it.takewhile(lambda p: float(p.split(",")[4]) in topN, result_sort)

sent = result.flatMap(lambda i : defined_function(i,3))


****************************************set operations (union / subtract)*************************************************

prepare the data
orders = sc.textFile("/public/retail_db/orders")
items = sc.textFile("/public/retail_db/order_items")

order2013 = orders.filter(lambda i: i.split(",")[1][:7] == "2013-12").map(lambda j : (int(j.split(",")[0]),j))
order2014 = orders.filter(lambda i: i.split(",")[1][:7] == "2014-01").map(lambda j : (int(j.split(",")[0]),j))

item_map = items.map(lambda j : (int(j.split(",")[1]),j))

result2013 = order2013.join(item_map).map(lambda j : j[1][1])
result2014 = order2014.join(item_map).map(lambda j : j[1][1])

1.union operation:


set2013 = result2013.map(lambda j : int(j.split(",")[2]))
set2014 = result2014.map(lambda j : int(j.split(",")[2]))
final_union = set2013.union(set2014)              # union keeps duplicate values
final_union = set2013.union(set2014).distinct()   # get the unique values

2. subtract (a-b / b-a), and also ((a-b) union (b-a))

set2013 = result2013.map(lambda j : int(j.split(",")[2]))
set2014 = result2014.map(lambda j : int(j.split(",")[2]))

final_2013 = set2013.subtract(set2014).distinct()
final_2014 = set2014.subtract(set2013).distinct()

union_1314 = final_2013.union(final_2014)

******************************save the data in hdfs****************************

items = sc.textFile("/public/retail_db/order_items")
items_map = items.map(lambda i : (int(i.split(",")[1]), float(i.split(",")[4])))

result = items_map.reduceByKey(lambda x,y:x+y)
saving = result.map(lambda j : (str(j[0]) + "\t" + str(j[1])))
saving.saveAsTextFile("/user/pramodpn/newdata")

# use compression
saving.saveAsTextFile("/user/pramodpn/newdata1", compressionCodecClass= "org.apache.hadoop.io.compress.SnappyCodec")

to save the file in ORC, JSON, Avro or Parquet format we need to make sure the data is converted to a DataFrame, and then save it to the given directory

items = sc.textFile("/public/retail_db/order_items")
items_map = items.map(lambda i : (int(i.split(",")[1]), float(i.split(",")[4])))

result = items_map.reduceByKey(lambda x,y:x+y).map(lambda j : (j[0],round(j[1],2)))
result_df = result.toDF(schema=["order_id","order_revenue"])

result_df.save("/user/pramodpn/new","json")
result_df.write.json("/user/pramodpn/new1")
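
the same DataFrame can also be written out in other formats through the write API (the target paths here are illustrative; writing ORC may require a Hive-enabled Spark build):

result_df.write.parquet("/user/pramodpn/new_parquet")
result_df.write.orc("/user/pramodpn/new_orc")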


