-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnormal.py
30 lines (24 loc) · 1.08 KB
/
normal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from __future__ import print_function
import time
import sys
from operator import add
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
# Wall-clock timestamp taken before any Spark work; the script reports the
# elapsed time relative to this value when it finishes.
start_time = time.time()
print("Starting...", file=sys.stdout)
def getSqlContextInstance(sparkContext):
    """Return the module-wide SQLContext, creating it lazily on first use.

    The instance is cached in this module's globals under the key
    'sqlContextSingletonInstance'. Once it exists, every later call returns
    that same object regardless of which ``sparkContext`` is passed in.

    :param sparkContext: SparkContext used to build the SQLContext; only
        consulted when no cached instance exists yet.
    :returns: the cached SQLContext instance.
    """
    namespace = globals()
    cache_key = 'sqlContextSingletonInstance'
    # Fast path: an instance was already created by an earlier call.
    if cache_key in namespace:
        return namespace[cache_key]
    namespace[cache_key] = SQLContext(sparkContext)
    return namespace[cache_key]
# Entry point: expects exactly one argument, the path of the input CSV file.
if len(sys.argv) != 2:
    print("Usage: uberstats <file>", file=sys.stderr)
    # sys.exit instead of the bare `exit` helper, which is injected by the
    # `site` module and is not guaranteed to exist (e.g. `python -S`).
    sys.exit(-1)

sc = SparkContext(appName="UberStats")
try:
    # Fetch the shared SQLContext once rather than once per use.
    sql_context = getSqlContextInstance(sc)

    # Load the input through the spark-csv data source: the first row is
    # used as the header and column types are inferred.
    df = (sql_context.read
          .format('com.databricks.spark.csv')
          .options(header='true', inferschema='true')
          .load(sys.argv[1]))

    # NOTE(review): exports a single-partition copy of the input to a
    # hard-coded Windows path. With the writer's default save mode this
    # presumably fails when the path already exists -- confirm the export
    # is still wanted. Raw string because '\l' is not a valid escape.
    (df.coalesce(1)
       .write.format('com.databricks.spark.csv')
       .options(header='true')
       .save(r'c:\l'))

    # registerTempTable is the SQLContext-era API (superseded by
    # createOrReplaceTempView in Spark 2.0); kept for compatibility.
    df.registerTempTable("uber")

    # Total trips per dispatching base, largest first. GROUP BY already
    # yields one row per base, so a DISTINCT would be redundant.
    sql_context.sql("""select `dispatching_base_number`,
        sum(`trips`) as cnt from uber group by `dispatching_base_number`
        order by cnt desc""").show()
finally:
    # Stop the SparkContext even if loading, exporting or the query fails.
    sc.stop()

print("--- %s seconds ---" % (time.time() - start_time))