
PySpark example to process a CSV file

Updated: Jun 30, 2020

PySpark is a combination of two parts:

  1. Spark

  2. Python

Spark is an open-source distributed processing framework that does in-memory processing, which makes it up to 10 times faster than the earlier Hadoop MapReduce framework. Spark is written in the Scala programming language, so to support development in Python, the project provides an API (PySpark) that makes it easy for Python developers to write Spark code in Python.


Let's look at the setup in which we will process CSV files with PySpark code, and list the kinds of challenges that can come up during development. We will also highlight some data issues, which can help you if you are trying this yourself.




Technology stack for this development:

  1. Apache Hadoop 2.7.0

  2. Apache Spark 2.3

  3. Apache Hive 1.1.1

  4. Python 2.7

  5. Hive Database=test

  6. Hive Table name=hitab


from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName('AnyDataFlow').enableHiveSupport().getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

df1 = spark.read.option("header", "false") \
    .option("quote", "\"") \
    .option("escape", "\\") \
    .option("multiLine", True) \
    .csv("/mnt/data/abc.csv")
 
df1.printSchema()

df1.show(10, False)

df1.createOrReplaceTempView("tmp1")

sqlContext.sql("insert into test.hitab select * from tmp1").show()

sqlContext.sql("select * from test.hitab").show(10,False)
   

In this code we create the Spark session first and then sc & sqlContext. If you are trying things out on the PySpark console, skip those three lines, since the console provides them for you.

In spark.read.option I kept header "false" since my data does not come with a header row in the file. If your file does include a header, set that option to "true".
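Outside Spark, you can see the same concern with Python's built-in csv module: if you don't account for a header row, it gets parsed as an ordinary data row. This is only a small sketch with hypothetical data, not part of the Spark job above.

```python
import csv
import io

# A small CSV sample; the first line is a header row (hypothetical data).
raw = "id,name\n1,alice\n2,bob\n"

rows = list(csv.reader(io.StringIO(raw)))
# Without any header handling, the header line is parsed as an ordinary row.
assert rows[0] == ["id", "name"]

# Skipping the first row (which is what header="true" tells spark.read to do)
# leaves only the data rows.
header, *data = rows
assert data == [["1", "alice"], ["2", "bob"]]
```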

I used the quote and escape characters, together with multiLine, to handle records that span multiple lines. This happens with complex data types like map and array values, and with embedded JSON strings as well.
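To see why quoting matters, here is a small sketch using Python's built-in csv module (sample data is made up): a field containing an embedded newline and a double quote is wrapped in quotes on write, so one logical record spans two physical lines, and only a CSV-aware reader reassembles it correctly. This is the same class of problem that Spark's quote/escape/multiLine options address.

```python
import csv
import io

# A value containing an embedded newline and a double quote. Standard CSV
# wraps such a field in quotes, so one logical record spans two physical lines.
buf = io.StringIO()
csv.writer(buf).writerow(["1", 'line one\nline "two"'])
raw = buf.getvalue()
assert raw.count("\n") == 2  # one embedded newline + the row terminator

# A CSV-aware reader reassembles the record; a naive line split would not.
rows = list(csv.reader(io.StringIO(raw)))
assert rows == [["1", 'line one\nline "two"']]
```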


Save the above code in a test.py file and then run it on the Hadoop/Spark cluster like this:


spark-submit --master yarn --deploy-mode cluster test.py
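If you want more control over resources, spark-submit accepts executor sizing flags as well. The values below are purely illustrative; tune them for your own cluster.

```shell
# Illustrative only: resource values depend on your cluster sizing.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 4 \
  --executor-memory 2g \
  --executor-cores 2 \
  test.py
```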


Points to keep in mind while developing a PySpark app, especially if you are new to Python:

  1. Indentation errors are very common, so pay attention to spaces and new lines. If you get an IndentationError, it usually means you have an extra space or line somewhere.

  2. The True and False keywords passed to .show() are capitalized in Python: first letter upper case, the rest lower case.
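The capitalization rule in point 2 is easy to check in plain Python, no Spark needed:

```python
# Boolean literals in Python are capitalized: True and False.
assert str(True) == "True" and str(False) == "False"

# Lower-case "true" is not a keyword; referring to it raises NameError.
try:
    eval("true")
    saw_name_error = False
except NameError:
    saw_name_error = True
assert saw_name_error
```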

Let me point out some data issues that commonly come up with CSV files:


  1. A new line character inside a string value.

  2. A complex data type like map(k,v) in a value; a single record may arrive as multiple physical rows. To handle this we used multiLine=True.

  3. Multi-line JSON in a value; again, a single record may arrive as multiple physical rows.
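Issue 3 can be sketched with Python's built-in csv and json modules (the record below is hypothetical): a pretty-printed JSON document stored in a CSV column spans several lines, but as long as the field is quoted, a CSV-aware parser still sees one record. Spark's multiLine=True does the same job at scale.

```python
import csv
import io
import json

# A record whose second column holds a pretty-printed (multi-line) JSON
# document; quoting keeps it a single logical CSV field (hypothetical data).
payload = json.dumps({"k": "v", "n": 1}, indent=2)
buf = io.StringIO()
csv.writer(buf).writerow(["1", payload])

rows = list(csv.reader(io.StringIO(buf.getvalue())))
assert len(rows) == 1                             # still one record
assert json.loads(rows[0][1]) == {"k": "v", "n": 1}  # JSON survives intact
```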

In upcoming posts we are going to cover:

  1. How to write a function in Python and use it in PySpark code.

  2. The same example in Scala.

  3. Cassandra data export using Spark.


Subscribe for more interesting posts. Also let us know if we can help with anything. Thanks!

 
 
 
