import re
import shutil

import findspark

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.feature import Tokenizer,StopWordsRemover,Word2Vec,StringIndexer
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Load Data To Spark DataFrame

spark = SparkSession.builder.appName('SentimentClassifierCreationWithSparkML').getOrCreate()

# Unpack the archived training data into the datasets directory.
shutil.unpack_archive('datasets/training.1600000.processed.noemoticon.csv.tar.gz', 'datasets')
print('Archive file unpacked successfully.')

# No header option is passed, so Spark assigns default column names (_c0, _c1, ...).
# inferSchema=True makes Spark make an extra pass over the data to pick column types.
df = spark.read.csv('datasets/training.1600000.processed.noemoticon.csv', inferSchema=True)

print('Total Number of records in df : ',df.count())
# Drop rows that contain any null, then drop exact duplicate rows.
df = df.dropna()
df = df.dropDuplicates()
print('Total Number of records in df after deleting duplicate and null records : ',df.count())
Archive file unpacked successfully.
Total Number of records in df :  1600000
Total Number of records in df after deleting duplicate and null records :  1600000
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
|  0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...|
|  0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|
|  0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....|
|  0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...|
|  0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|
only showing top 5 rows

# Give _c0 and _c5 meaningful names, then preview the two columns used downstream.
df = df.withColumnRenamed('_c0','sentiment').withColumnRenamed('_c5','text')
df.select('text','sentiment').show(5)
|                text|sentiment|
|  I want Miley to...|        0|
|            Exams!!!|        0|
| I wanna go home....|        0|
| I will not watch...|        0|
|        No Followers|        0|
only showing top 5 rows

# Clean and Prepare the Data

def removePattern(inputText, pattern):
    r = re.findall(pattern, inputText)
    for i in r:
        inputText = re.sub(i, '', inputText)        
    return inputText

def cleanTweet(txt):
    """Strip Twitter-specific noise from a tweet and keep only ASCII letters.

    Steps, applied in order:
      1. remove retweet prefixes such as ``RT @user:``;
      2. remove remaining ``@handle`` mentions;
      3. remove ``http://`` / ``https://`` links;
      4. replace every run of characters outside ``A-Za-z`` (digits,
         punctuation, whitespace, non-ASCII letters) with a single space.

    Args:
        txt: Raw tweet text, or ``None`` for a null value.

    Returns:
        The cleaned text, or ``None`` when ``txt`` is ``None``.
    """
    if txt is None:
        return None
    # Raw strings keep ``\w`` a regex class instead of an invalid string escape.
    # Each pattern is substituted directly, so only text it actually matches is removed.
    # Remove Twitter Return Handles (RT @xxx:)
    txt = re.sub(r'RT @[\w]*:', '', txt)
    # Remove Twitter Handles (@xxx)
    txt = re.sub(r'@[\w]*', '', txt)
    # Remove URL Links (httpxxx)
    txt = re.sub(r'https?://[A-Za-z0-9./]*', '', txt)
    # Remove Special Characters, Numbers and Punctuations
    txt = re.sub(r'[^A-Za-z]+', ' ', txt)
    return txt
# Wrap the pure-Python cleaner as a string-returning Spark UDF and apply it to
# the raw tweet column, adding the result as cleanTweetText.
udfCleanTweet = udf(cleanTweet, returnType=StringType())
df = df.withColumn('cleanTweetText', udfCleanTweet(df['text']))
|sentiment|       _c1|                 _c2|     _c3|            _c4|                text|      cleanTweetText|
|        0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...| I want Miley to ...|
|        0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|              Exams |
|        0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....| I wanna go home ...|
|        0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...| I will not watch...|
|        0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|        No Followers|
only showing top 5 rows

# List the distinct raw sentiment values present in the data.
df.select('sentiment').distinct().show()
|        4|
|        0|

|sentiment| count|
|        4|800000|
|        0|800000|

def mapTarget(sentiment):
    """Map the raw sentiment code 4 to 1; return every other value unchanged."""
    if sentiment != 4:
        return sentiment
    return 1
# Register mapTarget as an integer-returning Spark UDF and derive the binary
# target column from the raw sentiment column.
udfMapTarget = udf(mapTarget, returnType=IntegerType())
df = df.withColumn('target', udfMapTarget(df['sentiment']))
|sentiment|       _c1|                 _c2|     _c3|            _c4|                text|      cleanTweetText|target|
|        0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...| I want Miley to ...|     0|
|        0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|              Exams |     0|
|        0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....| I wanna go home ...|     0|
|        0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...| I will not watch...|     0|
|        0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|        No Followers|     0|
only showing top 5 rows

|target| count|
|     1|800000|
|     0|800000|

# Keep only the columns the model pipeline needs, then preview them.
df = df.select('text','cleanTweetText','target')
df.show(5)
|                text|      cleanTweetText|target|
|  I want Miley to...| I want Miley to ...|     0|
|            Exams!!!|              Exams |     0|
| I wanna go home....| I wanna go home ...|     0|
| I will not watch...| I will not watch...|     0|
|        No Followers|        No Followers|     0|
only showing top 5 rows

# Train Test Split

# A fixed seed makes the 80/20 split reproducible across runs; without one,
# randomSplit picks a fresh random seed on every call.
dfTrain,dfTest = df.randomSplit([0.8,0.2], seed=42)

# Feature Transformations

# Tokenize the cleaned tweet, drop stop words, and embed the remaining tokens
# as a 100-dimensional Word2Vec vector (words seen fewer than 5 times are ignored).
tokenizer = Tokenizer(inputCol='cleanTweetText', outputCol='tokenTweet')
stopRemover = StopWordsRemover(inputCol='tokenTweet',outputCol='filteredTokens')
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='filteredTokens', outputCol='features')
# StringIndexer's default order (frequencyDesc) gives index 0.0 to whichever value
# is most frequent in the data it is fit on (the random training split), so the
# target -> label mapping could flip between runs. Alphabetical ordering pins
# '0' -> 0.0 and '1' -> 1.0.
labelStringIdx = StringIndexer(inputCol = 'target', outputCol = 'label', stringOrderType = 'alphabetAsc')

# Create the Model

# Logistic regression over the Word2Vec features; featuresCol/labelCol are
# spelled out (they equal Spark's defaults) so the wiring to earlier stages is explicit.
lr = LogisticRegression(featuresCol='features', labelCol='label', maxIter=100)

# Create the Pipeline

# Stages run in list order: tokenize -> remove stop words -> embed -> index label -> classify.
pipelineStages = [tokenizer, stopRemover, word2Vec, labelStringIdx, lr]
dfPrepPipe = Pipeline(stages=pipelineStages)

# Training and Evaluation

# Fit every pipeline stage on the training split, then score the held-out split.
pipelineFit = dfPrepPipe.fit(dfTrain)
predictions = pipelineFit.transform(dfTest)
 |-- text: string (nullable = true)
 |-- cleanTweetText: string (nullable = true)
 |-- target: integer (nullable = true)
 |-- tokenTweet: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filteredTokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

# Preview the cleaned text alongside the features and model outputs.
predictions.select(['cleanTweetText','features','label','rawPrediction','probability','prediction']).show(5)
|      cleanTweetText|            features|label|       rawPrediction|         probability|prediction|
| I want Miley to ...|[0.01353734713047...|  0.0|[0.59144533590323...|[0.64369670410457...|       0.0|
|        love you lt |[-0.0808262992650...|  1.0|[-2.8843092921010...|[0.05293468369937...|       1.0|
|      Hello Twitter |[0.08169510029256...|  1.0|[-2.8707641781935...|[0.05361786209311...|       1.0|
| I wanna go home ...|[-0.0382282886033...|  0.0|[2.43213292335975...|[0.91924500906395...|       0.0|
|        No Followers|[0.16656325850635...|  0.0|[-2.9296898778204...|[0.05070525034315...|       1.0|
only showing top 5 rows

# With no arguments the evaluator reports the area under the ROC curve, computed
# from the rawPrediction and label columns (this is the value labelled "ROC-Accuracy").
evaluator = BinaryClassificationEvaluator()
roc_accuracy = evaluator.evaluate(predictions)
print('ROC-Accuracy of model at predicting sentiment is: {:.4f}'.format(roc_accuracy))
ROC-Accuracy of model at predicting sentiment is: 0.8276
# Count test rows and correct predictions in one aggregation over `predictions`,
# so numerator and denominator come from the same evaluation of the test data
# (previously the denominator came from a separate re-evaluation of dfTest).
accuracyStats = predictions.selectExpr(
    'count(*) AS total',
    'sum(CAST(label = prediction AS INT)) AS correct',
).first()
numberOfTestRecord = accuracyStats['total']
# sum() over zero rows is null, so guard both the numerator and the division.
accuracy = (accuracyStats['correct'] or 0) / numberOfTestRecord if numberOfTestRecord else 0.0
print('Accuracy of model at predicting sentiment is: {:.4f}'.format(accuracy))
Accuracy of model at predicting sentiment is: 0.7479

# Save the trained model

# Persist the fitted pipeline (every stage, including the classifier).
# NOTE(review): save() fails if the path already exists; use
# pipelineFit.write().overwrite().save(...) if re-runs should replace it.
pipelineFit.save('W2VLogreg.model')