In the table below, find the key that holds the maximum value in each data column. Let's assume there are 10 columns after the key column.
Key | Col 1 | Col 2 | Col 3 | ... | Col n |
---|---|---|---|---|---|
A | 50 | 100 | 20 | ... | 80 |
B | 25 | 92 | 99 | ... | 10 |
... | ... | ... | ... | ... | ... |
Z | 121 | 55 | 300 | ... | 88 |
```python
import findspark

# Make the local Spark installation importable before importing pyspark.
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Example") \
    .getOrCreate()
```
Define the function to pass to `map`. All global data is available to this function.
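```python
def find_max(inx):
    # Track the largest value seen so far in column `inx` and the key that owns it.
    v_max = -float('inf')
    k_max = None

    for key in series_data:
        v = float(series_data[key][inx])
        if v > v_max:
            k_max = key
            v_max = v
    # 'n' is the key with the maximum value, 'y' is the column index.
    return {'n': k_max, 'y': str(inx)}
```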
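Before involving Spark, the logic of the map function can be checked locally. The following is a minimal sketch, assuming a small hypothetical `series_data` built from the first three columns of the table above; Python's built-in `map` stands in for Spark's `map` here.

```python
# Hypothetical sample data: three keys with three data columns each,
# taken from the first columns of the table above.
series_data = {
    "A": [50, 100, 20],
    "B": [25, 92, 99],
    "Z": [121, 55, 300],
}


def find_max(inx):
    """Return the key holding the largest value in column `inx`."""
    v_max = -float("inf")
    k_max = None
    for key in series_data:
        v = float(series_data[key][inx])
        if v > v_max:
            k_max = key
            v_max = v
    return {"n": k_max, "y": str(inx)}


# The built-in map stands in for Spark's RDD.map in this local check.
local_result = list(map(find_max, range(3)))
for r in local_result:
    print(r)
```

With this sample data, key `Z` wins columns 0 and 2 and key `A` wins column 1.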
```python
# read the data frame from csv file
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(
        "filepath/filename")
df.printSchema()
df.show(10)
df.count()
```
```python
print(len(df.columns))
# The first column is the key; the remaining columns hold the data for that key.
col_size = len(df.columns) - 1  # ignore the 0th column, which is the key
# To select and filter by column before collecting (returns a list of Row objects),
# use a call of this shape. The column names are placeholders, not real columns:
# list_data = df.select("col_a", "col_b").filter(df["col_b"] == "value").collect()
data = df.collect()
```
```python
series_data = dict()
# Column 0 holds the keys. Pick some rows and map each key to its row index.
region_dict = {"key_1": 10, "key_2": 15, "key_3": 53, "key_4": 30}

# Create the series data for each key.
for key in region_dict:
    inx = region_dict[key]
    row = list()
    for x in range(0, col_size):
        # Index by position and skip the key column at position 0.
        row.append(data[inx][x + 1])
    series_data[key] = row
```
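The step above can be tried without a Spark session. This is a rough sketch, assuming plain tuples as a stand-in for the collected rows (a pyspark `Row` is a tuple subclass, so positional indexing behaves the same way); the sample values and the `region_dict` entries are hypothetical.

```python
# Stand-in for df.collect(): each tuple is (key, col 1, col 2, col 3).
data = [
    ("A", 50, 100, 20),
    ("B", 25, 92, 99),
    ("Z", 121, 55, 300),
]
col_size = len(data[0]) - 1  # ignore the key column at position 0

# Hypothetical mapping from key to row index in `data`.
region_dict = {"A": 0, "Z": 2}

# For each selected key, copy the data columns (positions 1..col_size).
series_data = {
    key: [data[inx][x + 1] for x in range(col_size)]
    for key, inx in region_dict.items()
}
print(series_data)
```

Only the rows listed in `region_dict` end up in `series_data`, so row `B` is left out in this example.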
```python
# Get the Spark context from the session.
sc = spark.sparkContext
# Apply find_max to every column index in parallel.
result = sc.parallelize(range(col_size)).map(find_max).collect()

for k in result:
    print(k)
```
Important Note:
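In a script like this one, Spark serializes the global variables that the map function references and ships them to the executors along with the function. That means all of that global data is copied.
The map function must be defined before the call that uses it, because Python executes a script from top to bottom. All global data is available in this function.
Do not define the data inside Python's main function: there it would be local to that function, so the map function could not see it and it would not be shipped to the executors.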
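The scoping point in the note can be seen in plain Python, without Spark. The sketch below is only an illustration; the names `lookup`, `main`, and `values` are hypothetical and not part of the example above.

```python
def lookup(inx):
    # `values` is resolved as a module-level (global) name when this runs.
    return values[inx]


def main():
    # This `values` is local to main(), so lookup() cannot see it.
    values = [10, 20, 30]
    try:
        return lookup(1)
    except NameError as exc:
        return f"NameError: {exc}"


outcome_local = main()

# Defined at module level, the same name is visible to lookup().
values = [10, 20, 30]
outcome_global = lookup(1)

print(outcome_local)
print(outcome_global)
```

The first call fails with a `NameError` because the data exists only inside `main`, while the second succeeds once the data is a global. The same rule applies to the function handed to `map`.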