import findspark
findspark.init('/Users/sofia/spark')
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer,StopWordsRemover,Word2Vec,StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import re
import shutil
# ---------------------------------------------------------------
# Load data into a Spark DataFrame
# ---------------------------------------------------------------
spark = SparkSession.builder.appName('SentimentClassifierCreationWithSparkML').getOrCreate()

# Unpack the archived Sentiment140 dataset before reading it.
shutil.unpack_archive('datasets/training.1600000.processed.noemoticon.csv.tar.gz', 'datasets')
print('Archive file unpacked successfully.')

df = spark.read.csv('datasets/training.1600000.processed.noemoticon.csv', inferSchema=True)
print('Total Number of records in df : ', df.count())

# Drop null and duplicate rows before any further processing.
df = df.dropna()
df = df.dropDuplicates()
print('Total Number of records in df after deleting duplicate and null records : ', df.count())
df.show(5)

# The CSV has no header: _c0 is the sentiment label, _c5 the tweet text.
df = df.withColumnRenamed('_c0', 'sentiment').withColumnRenamed('_c5', 'text')
df.select('text', 'sentiment').show(5)
def removePattern(inputText, pattern):
    """Return *inputText* with every match of the regex *pattern* removed.

    The original findall-then-sub loop re-interpreted each matched string
    as a regex pattern, which misbehaves (or raises) when the matched text
    contains metacharacters; a single direct substitution is both correct
    and does only one pass over the text.
    """
    return re.sub(pattern, '', inputText)


def cleanTweet(txt):
    """Strip Twitter-specific noise from a tweet.

    Removes retweet markers (``RT @user:``), user handles (``@user``) and
    URLs, then collapses every run of non-alphabetic characters into a
    single space, so only words (and separating spaces) remain.
    """
    # Remove Twitter return handles (RT @xxx:)
    txt = removePattern(txt, r'RT @[\w]*:')
    # Remove Twitter handles (@xxx)
    txt = removePattern(txt, r'@[\w]*')
    # Remove URL links (httpxxx)
    txt = removePattern(txt, r'https?://[A-Za-z0-9./]*')
    # Replace special characters, numbers and punctuation with a space
    txt = re.sub('[^A-Za-z]+', ' ', txt)
    return txt
# Wrap the tweet-cleaning function as a Spark SQL UDF that returns a string,
# and materialise the cleaned text as a new column.
udfCleanTweet = udf(cleanTweet, StringType())
df = df.withColumn('cleanTweetText', udfCleanTweet('text'))
df.show(5)

# Inspect the label space: which sentiment values occur, and how often.
df.select('sentiment').distinct().show()
df.groupby('sentiment').count().show()
def mapTarget(sentiment):
    """Map the raw sentiment label 4 (positive) to 1; leave any other value as-is."""
    if sentiment == 4:
        return 1
    return sentiment
# Lift the label mapper into a Spark UDF producing integers and derive the
# binary 'target' column from the raw sentiment.
mapTargetUdf = udf(mapTarget, IntegerType())
df = df.withColumn('target', mapTargetUdf('sentiment'))
df.show(5)
df.groupby('target').count().show()

# Keep only the columns the ML pipeline actually needs.
df = df.select('text', 'cleanTweetText', 'target')
df.show(5)
# ---------------------------------------------------------------
# Train/test split (80/20)
# ---------------------------------------------------------------
# NOTE(review): randomSplit is unseeded, so the split differs between runs;
# pass a seed if reproducible results are needed.
dfTrain, dfTest = df.randomSplit([0.8, 0.2])
# ---------------------------------------------------------------
# Feature transformations
# ---------------------------------------------------------------
# Tokenize the cleaned tweet text, drop stop words, embed the remaining
# tokens with Word2Vec, and index the numeric target into the 'label'
# column expected by Spark ML classifiers.
tokenizer = Tokenizer(inputCol='cleanTweetText', outputCol='tokenTweet')
stopRemover = StopWordsRemover(inputCol='tokenTweet', outputCol='filteredTokens')
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='filteredTokens', outputCol='features')
labelStringIdx = StringIndexer(inputCol='target', outputCol='label')
# ---------------------------------------------------------------
# Create the model and the pipeline
# ---------------------------------------------------------------
lr = LogisticRegression(maxIter=100)

# Full pipeline: tokenize -> remove stop words -> Word2Vec -> index label -> classify.
dfPrepPipe = Pipeline(stages=[tokenizer, stopRemover, word2Vec, labelStringIdx, lr])
# ---------------------------------------------------------------
# Training and evaluation
# ---------------------------------------------------------------
pipelineFit = dfPrepPipe.fit(dfTrain)
predictions = pipelineFit.transform(dfTest)
predictions.printSchema()
predictions.select(['cleanTweetText', 'features', 'label', 'rawPrediction', 'probability', 'prediction']).show(5)

# Area under the ROC curve (default metric of BinaryClassificationEvaluator).
evaluator = BinaryClassificationEvaluator()
roc_accuracy = evaluator.evaluate(predictions)
print('ROC-Accuracy of model at predicting sentiment is: {:.4f}'.format(roc_accuracy))

# Plain accuracy: fraction of test rows where the predicted label matches.
# dfTest has already been dropna'd, so a simple count() equals the old
# agg({'target': 'count'}) non-null count without the collect() round-trip.
numberOfTestRecord = dfTest.count()
accuracy = predictions.filter(predictions['label'] == predictions['prediction']).count() / numberOfTestRecord
print('Accuracy of model at predicting sentiment is: {:.4f}'.format(accuracy))
# ---------------------------------------------------------------
# Save the trained model
# ---------------------------------------------------------------
# Use overwrite() so a re-run does not fail when the model directory
# already exists (plain save() raises in that case).
pipelineFit.write().overwrite().save('W2VLogreg.model')