In the table below, find the max of each row. Let's assume there are 10 columns after the key column.
| Key | Col 1 | Col 2 | Col 3 | ... | Col n |
|---|---|---|---|---|---|
| A | 50 | 100 | 20 | ... | 80 |
| B | 25 | 92 | 99 | ... | 10 |
| ... | ... | ... | ... | ... | ... |
| Z | 121 | 55 | 300 | ... | 88 |
```python
import findspark
from pyspark.sql import SparkSession

findspark.init()
spark = SparkSession \
    .builder \
    .appName("Example") \
    .getOrCreate()
```
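findspark.init() locates the local Spark installation so that pyspark can be imported from a plain Python interpreter or notebook, and the SparkSession created here is the entry point to the DataFrame API used in the rest of this walkthrough.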
```python
def find_max(inx):
    # for column index inx, find the key whose row has the largest value in that column
    v_max = -float('inf')
    k_max = None

    for key in series_data:
        v = float(series_data[key][inx])
        if v > v_max:
            k_max = key
            v_max = v
    return {'n': k_max, 'y': str(inx)}
```

```python
# read the data frame from a csv file
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("filepath/filename")
df.printSchema()
df.show(10)
df.count()
```

```python
print(len(df.columns))
# column 0 is the key; the remaining columns are the data for that key
col_size = len(df.columns) - 1  # ignore the 0th column, that is the key
# select and filter the data frame by attributes, returns a list of Rows
# (attribute names here are placeholders)
list_data = df.select(df.attr_1, df.attr_2).filter(df.attr_3 == "value").collect()
# collect the full data frame to the driver as a list of Rows
data = df.collect()
```
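collect() returns the rows to the driver as a list of Row objects. A quick sanity check might look like the sketch below; it assumes, as the loop in the next block does, that the data columns are literally named "0" through "9" in the CSV header.

```python
# data is a plain Python list of pyspark.sql.Row objects held on the driver
first = data[0]
print(first[0])    # the key column, accessed by position
print(first["0"])  # a data column, accessed by name (assumes a column named "0")
```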
```python
series_data = dict()
# column 0 holds the keys; pick a few rows (key -> row index) and store them in a dictionary
region_dict = {"key_1": 10, "key_2": 15, "key_3": 53, "key_4": 30}

# create series data for each key
for key in region_dict:
    inx = region_dict[key]
    row = list()
    for x in range(0, col_size):
        row.append(data[inx][str(x)])  # assumes the data columns are named "0", "1", ...
    series_data[key] = row
```
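After this loop, series_data maps each chosen key to a plain Python list of its column values (the actual numbers depend on the CSV). Because it is created at module level rather than inside a function, it is exactly the global data that find_max reads when the map runs on the executors.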
```python
# get the spark context from the session
sc = spark.sparkContext
result = sc.parallelize(range(col_size)).map(find_max).collect()

for k in result:
    print(k)
```
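Each element of result is one of the dictionaries returned by find_max, so there is one entry per column: 'n' is the key whose row holds the largest value in that column and 'y' is the column index, e.g. something like {'n': 'key_3', 'y': '0'} (the actual keys depend on the data).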
Important Note:

- Global variables referenced by the mapped function are automatically shipped (broadcast) to the executors by Spark, which means all of that global data is copied to every worker.
- The function passed to map() must be defined before the action that triggers it, since Python is interpreted top to bottom. All of the global data it references (here, series_data) is available inside the function.
- The data cannot live inside a main() function: it would then be a local variable rather than a module-level global, so find_max would not see it and it would not be shipped to the executors. An explicit alternative, using a broadcast variable, is sketched below.
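This is a minimal sketch of that alternative, not part of the original code: share series_data explicitly through a Spark broadcast variable instead of relying on implicit capture of globals. It assumes spark, sc, series_data, and col_size exist as built above; find_max_bc is a hypothetical helper name.

```python
# Sketch: pass the lookup data to the workers explicitly via a broadcast variable.
bc_series = sc.broadcast(series_data)

def find_max_bc(inx):
    data_map = bc_series.value  # read-only copy available on every executor
    v_max = -float('inf')
    k_max = None
    for key in data_map:
        v = float(data_map[key][inx])
        if v > v_max:
            k_max = key
            v_max = v
    return {'n': k_max, 'y': str(inx)}

result = sc.parallelize(range(col_size)).map(find_max_bc).collect()
```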