Run PySpark script from command line

In this PySpark Tutorial we will run pyspark script from command line.

Run PySpark script from command line

Run PySpark script from command line - Run Hello World Program from command line

In previous session we developed Hello World PySpark program and used pyspark interpreter to run the program. The pyspark interpreter is used to run program by typing it on console and it is executed on the Spark cluster.

The pyspark console is useful for development of application where programmers can write code and see the results immediately. If there is any code error he/she can update and test the code. This mode is very good for development. But for running on production spark-submit job is used which connects to the Spark cluster and runs the saved code file (.py).

After development and testing of the code developers saves into .py file and then it can be submitted to Spark cluster with spark-submit script.

What is spark-submit Script?

The spark-submit script is distributed with the Spark distribution and it is accessible from the bin directory of Spark. This tool is used to submit Java, Scala, PySpark, R and TensorFlow job on the distributed Spark cluster.

In our tutorial we have one node cluster so the program will be executed on the same machine.

To run the program with spark-submit script we have to manually create the SparkContext object in the program. If its not created manually then it will throw NameError: name 'sc' is not defined error.

Required code is:

from pyspark import SparkContext

sc = SparkContext("local", "Hello World App")

Create a new file (helloworld.py) in the bin directory of Spark and add following code:

from pyspark import SparkContext
from operator import add
from pyspark import SparkContext

sc = SparkContext("local", "Hello World App")

data = sc.parallelize(list("Hello World"))
counts = data.map(lambda x:
        (x, 1)).reduceByKey(add).sortBy(lambda x: x[1],
         ascending=False).collect()

for (word, count) in counts:
    print("{}: {}".format(word, count))

Save file and exit from the vi editor (if you are using vi). You can use any of the text editor tool in Ubuntu.

To run the program type following command on the terminal:

deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ ./spark-submit helloworld.py 

This will submit the job on the Spark standalone cluster and display following result:

deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ clear

deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ ./spark-submit helloworld.py 
2018-04-29 17:56:45 WARN  Utils:66 - Your hostname, deepak-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2018-04-29 17:56:45 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-04-29 17:56:45 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2018-04-29 17:56:46 INFO  SparkContext:54 - Running Spark version 2.3.0
2018-04-29 17:56:46 INFO  SparkContext:54 - Submitted application: Hello World App
2018-04-29 17:56:46 INFO  SecurityManager:54 - Changing view acls to: deepak
2018-04-29 17:56:46 INFO  SecurityManager:54 - Changing modify acls to: deepak
2018-04-29 17:56:46 INFO  SecurityManager:54 - Changing view acls groups to: 
2018-04-29 17:56:46 INFO  SecurityManager:54 - Changing modify acls groups to: 
2018-04-29 17:56:46 INFO  SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(deepak); groups with view permissions: Set(); users  with modify permissions: Set(deepak); groups with modify permissions: Set()
2018-04-29 17:56:46 INFO  Utils:54 - Successfully started service 'sparkDriver' on port 41270.
2018-04-29 17:56:46 INFO  SparkEnv:54 - Registering MapOutputTracker
2018-04-29 17:56:46 INFO  SparkEnv:54 - Registering BlockManagerMaster
2018-04-29 17:56:46 INFO  BlockManagerMasterEndpoint:54 - Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
2018-04-29 17:56:46 INFO  BlockManagerMasterEndpoint:54 - BlockManagerMasterEndpoint up
2018-04-29 17:56:46 INFO  DiskBlockManager:54 - Created local directory at /tmp/blockmgr-4b2dd654-bff5-415d-81e9-ecb4440fc019
2018-04-29 17:56:46 INFO  MemoryStore:54 - MemoryStore started with capacity 366.3 MB
2018-04-29 17:56:46 INFO  SparkEnv:54 - Registering OutputCommitCoordinator
2018-04-29 17:56:46 INFO  log:192 - Logging initialized @2144ms
2018-04-29 17:56:46 INFO  Server:346 - jetty-9.3.z-SNAPSHOT
2018-04-29 17:56:46 INFO  Server:414 - Started @2268ms
2018-04-29 17:56:46 INFO  AbstractConnector:278 - Started ServerConnector@65fc67fa{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-04-29 17:56:46 INFO  Utils:54 - Successfully started service 'SparkUI' on port 4040.
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@34818b7a{/jobs,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@40799694{/jobs/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@71365737{/jobs/job,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4b666305{/jobs/job/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@472781cb{/stages,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@4ebe9f7{/stages/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3503c726{/stages/stage,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6d28fe16{/stages/stage/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@46392e62{/stages/pool,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@24f8a1d9{/stages/pool/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@2acad461{/storage,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@b84760{/storage/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@ffd232b{/storage/rdd,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6eeaad64{/storage/rdd/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@12ff1504{/environment,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@503ed37c{/environment/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@36bc462b{/executors,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@d064c15{/executors/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@67bb2f84{/executors/threadDump,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@41b8a53c{/executors/threadDump/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@15e13e3e{/static,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@5ff0a0cc{/,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@11765f4d{/api,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@6e010294{/jobs/job/kill,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@3ca086de{/stages/stage/kill,null,AVAILABLE,@Spark}
2018-04-29 17:56:46 INFO  SparkUI:54 - Bound SparkUI to 0.0.0.0, and started at http://10.0.2.15:4040
2018-04-29 17:56:47 INFO  SparkContext:54 - Added file file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py at file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py with timestamp 1525004807219
2018-04-29 17:56:47 INFO  Utils:54 - Copying /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py to /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/userFiles-bb2ea523-cffd-406c-8a23-d37e238742ac/helloworld.py
2018-04-29 17:56:47 INFO  Executor:54 - Starting executor ID driver on host localhost
2018-04-29 17:56:47 INFO  Utils:54 - Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 41058.
2018-04-29 17:56:47 INFO  NettyBlockTransferService:54 - Server created on 10.0.2.15:41058
2018-04-29 17:56:47 INFO  BlockManager:54 - Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
2018-04-29 17:56:47 INFO  BlockManagerMaster:54 - Registering BlockManager BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO  BlockManagerMasterEndpoint:54 - Registering block manager 10.0.2.15:41058 with 366.3 MB RAM, BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO  BlockManagerMaster:54 - Registered BlockManager BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO  BlockManager:54 - Initialized BlockManager: BlockManagerId(driver, 10.0.2.15, 41058, None)
2018-04-29 17:56:47 INFO  ContextHandler:781 - Started o.s.j.s.ServletContextHandler@411003bc{/metrics/json,null,AVAILABLE,@Spark}
2018-04-29 17:56:48 INFO  SparkContext:54 - Starting job: collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Registering RDD 2 (reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9)
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Got job 0 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) with 1 output partitions
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Final stage: ResultStage 1 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10)
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Parents of final stage: List(ShuffleMapStage 0)
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Missing parents: List(ShuffleMapStage 0)
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Submitting ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9), which has no missing parents
2018-04-29 17:56:48 INFO  MemoryStore:54 - Block broadcast_0 stored as values in memory (estimated size 7.1 KB, free 366.3 MB)
2018-04-29 17:56:48 INFO  MemoryStore:54 - Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.8 KB, free 366.3 MB)
2018-04-29 17:56:48 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on 10.0.2.15:41058 (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:48 INFO  SparkContext:54 - Created broadcast 0 from broadcast at DAGScheduler.scala:1039
2018-04-29 17:56:48 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ShuffleMapStage 0 (PairwiseRDD[2] at reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9) (first 15 tasks are for partitions Vector(0))
2018-04-29 17:56:48 INFO  TaskSchedulerImpl:54 - Adding task set 0.0 with 1 tasks
2018-04-29 17:56:48 INFO  TaskSetManager:54 - Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 7916 bytes)
2018-04-29 17:56:48 INFO  Executor:54 - Running task 0.0 in stage 0.0 (TID 0)
2018-04-29 17:56:48 INFO  Executor:54 - Fetching file:/home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py with timestamp 1525004807219
2018-04-29 17:56:48 INFO  Utils:54 - /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py has been previously copied to /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/userFiles-bb2ea523-cffd-406c-8a23-d37e238742ac/helloworld.py
2018-04-29 17:56:49 INFO  PythonRunner:54 - Times: total = 366, boot = 355, init = 10, finish = 1
2018-04-29 17:56:49 INFO  Executor:54 - Finished task 0.0 in stage 0.0 (TID 0). 1478 bytes result sent to driver
2018-04-29 17:56:49 INFO  TaskSetManager:54 - Finished task 0.0 in stage 0.0 (TID 0) in 787 ms on localhost (executor driver) (1/1)
2018-04-29 17:56:49 INFO  TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool 
2018-04-29 17:56:49 INFO  DAGScheduler:54 - ShuffleMapStage 0 (reduceByKey at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:9) finished in 1.039 s
2018-04-29 17:56:49 INFO  DAGScheduler:54 - looking for newly runnable stages
2018-04-29 17:56:49 INFO  DAGScheduler:54 - running: Set()
2018-04-29 17:56:49 INFO  DAGScheduler:54 - waiting: Set(ResultStage 1)
2018-04-29 17:56:49 INFO  DAGScheduler:54 - failed: Set()
2018-04-29 17:56:49 INFO  DAGScheduler:54 - Submitting ResultStage 1 (PythonRDD[5] at collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10), which has no missing parents
2018-04-29 17:56:49 INFO  MemoryStore:54 - Block broadcast_1 stored as values in memory (estimated size 7.5 KB, free 366.3 MB)
2018-04-29 17:56:49 INFO  MemoryStore:54 - Block broadcast_1_piece0 stored as bytes in memory (estimated size 4.8 KB, free 366.3 MB)
2018-04-29 17:56:49 INFO  BlockManagerInfo:54 - Added broadcast_1_piece0 in memory on 10.0.2.15:41058 (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:49 INFO  SparkContext:54 - Created broadcast 1 from broadcast at DAGScheduler.scala:1039
2018-04-29 17:56:49 INFO  DAGScheduler:54 - Submitting 1 missing tasks from ResultStage 1 (PythonRDD[5] at collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) (first 15 tasks are for partitions Vector(0))
2018-04-29 17:56:49 INFO  TaskSchedulerImpl:54 - Adding task set 1.0 with 1 tasks
2018-04-29 17:56:49 INFO  TaskSetManager:54 - Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 7649 bytes)
2018-04-29 17:56:49 INFO  Executor:54 - Running task 0.0 in stage 1.0 (TID 1)
2018-04-29 17:56:49 INFO  ShuffleBlockFetcherIterator:54 - Getting 1 non-empty blocks out of 1 blocks
2018-04-29 17:56:49 INFO  ShuffleBlockFetcherIterator:54 - Started 0 remote fetches in 5 ms
2018-04-29 17:56:49 INFO  BlockManagerInfo:54 - Removed broadcast_0_piece0 on 10.0.2.15:41058 in memory (size: 4.8 KB, free: 366.3 MB)
2018-04-29 17:56:49 INFO  PythonRunner:54 - Times: total = 40, boot = -491, init = 530, finish = 1
2018-04-29 17:56:49 INFO  Executor:54 - Finished task 0.0 in stage 1.0 (TID 1). 1766 bytes result sent to driver
2018-04-29 17:56:49 INFO  TaskSetManager:54 - Finished task 0.0 in stage 1.0 (TID 1) in 139 ms on localhost (executor driver) (1/1)
2018-04-29 17:56:49 INFO  TaskSchedulerImpl:54 - Removed TaskSet 1.0, whose tasks have all completed, from pool 
2018-04-29 17:56:49 INFO  DAGScheduler:54 - ResultStage 1 (collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10) finished in 0.195 s
2018-04-29 17:56:49 INFO  DAGScheduler:54 - Job 0 finished: collect at /home/deepak/spark/spark-2.3.0-bin-hadoop2.7/bin/helloworld.py:10, took 1.451364 s
l: 3
o: 2
H: 1
e: 1
 : 1
W: 1
r: 1
d: 1
2018-04-29 17:56:49 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2018-04-29 17:56:49 INFO  AbstractConnector:318 - Stopped Spark@65fc67fa{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-04-29 17:56:49 INFO  SparkUI:54 - Stopped Spark web UI at http://10.0.2.15:4040
2018-04-29 17:56:49 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2018-04-29 17:56:49 INFO  MemoryStore:54 - MemoryStore cleared
2018-04-29 17:56:49 INFO  BlockManager:54 - BlockManager stopped
2018-04-29 17:56:49 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2018-04-29 17:56:49 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2018-04-29 17:56:49 INFO  SparkContext:54 - Successfully stopped SparkContext
2018-04-29 17:56:49 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-04-29 17:56:49 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046
2018-04-29 17:56:49 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-1e4f3443-b12e-427a-a0c5-dc46ed4dc113
2018-04-29 17:56:49 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-a4e5f638-8582-4976-a29f-2fcbc7f1a046/pyspark-006ca0f1-db48-4f95-9f15-11856131e4dc
deepak@deepak-VirtualBox:~/spark/spark-2.3.0-bin-hadoop2.7/bin$ 

Congrats you have successfully executed your program from command line. If your job is long running job you can see the details by opening the url http://localhost:4040 in browser. It will show the in the following format in browser:

In this tutorial you learned to Run PySpark script from command line.

Check more tutorials at:  PySpark Tutorials - Learning PySpark from beginning.