In [1]:
import findspark
findspark.init('/Users/sofia/spark')

from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, Word2Vec, StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql.functions import udf
from pyspark.sql.types import *

import re
import shutil

Load Data To Spark DataFrame

In [2]:
spark = SparkSession.builder.appName('SentimentClassifierCreationWithSparkML').getOrCreate()

# Unpack the archived data file
shutil.unpack_archive('datasets/training.1600000.processed.noemoticon.csv.tar.gz', 'datasets')
print('Archive file unpacked successfully.')

df = spark.read.csv('datasets/training.1600000.processed.noemoticon.csv', inferSchema=True)

print('Total Number of records in df : ', df.count())
df = df.dropna()
df = df.dropDuplicates()
print('Total Number of records in df after deleting duplicate and null records : ', df.count())
Archive file unpacked successfully.
Total Number of records in df :  1600000
Total Number of records in df after deleting duplicate and null records :  1600000
In [3]:
df.show(5)
+---+----------+--------------------+--------+---------------+--------------------+
|_c0|       _c1|                 _c2|     _c3|            _c4|                 _c5|
+---+----------+--------------------+--------+---------------+--------------------+
|  0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...|
|  0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|
|  0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....|
|  0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...|
|  0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|
+---+----------+--------------------+--------+---------------+--------------------+
only showing top 5 rows
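The six Sentiment140 columns are, in order, the sentiment label, tweet id, date, query flag, user and tweet text. As an alternative to renaming columns one at a time (as the next cell does), all six could be named in a single toDF call; this sketch is illustrative only and is not what the notebook runs:

# Name all six columns at once (names follow the Sentiment140 field descriptions)
dfNamed = df.toDF('sentiment', 'id', 'date', 'queryFlag', 'user', 'text')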

In [4]:
df = df.withColumnRenamed('_c0', 'sentiment').withColumnRenamed('_c5', 'text')
df.select('text', 'sentiment').show(5)
+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
|  I want Miley to...|        0|
|            Exams!!!|        0|
| I wanna go home....|        0|
| I will not watch...|        0|
|        No Followers|        0|
+--------------------+---------+
only showing top 5 rows

Clean and Prepare the Data

In [5]:
def removePattern(inputText, pattern):
    # Remove every occurrence of the regex pattern from the text
    return re.sub(pattern, '', inputText)

def cleanTweet(txt):
    # Remove Twitter return handles (RT @xxx:)
    txt = removePattern(txt, r'RT @[\w]*:')
    # Remove Twitter handles (@xxx)
    txt = removePattern(txt, r'@[\w]*')
    # Remove URL links (httpxxx)
    txt = removePattern(txt, r'https?://[A-Za-z0-9./]*')
    # Remove special characters, numbers and punctuation
    txt = re.sub('[^A-Za-z]+', ' ', txt)
    return txt
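A quick sanity check of cleanTweet on a made-up tweet (not from the dataset) shows the return handle, URL and punctuation being stripped:

# Expected result is roughly ' check this out wow ' - only letters and single spaces remain
print(cleanTweet('RT @someuser: check this out https://example.com/page #wow!!!'))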
In [6]:
udfCleanTweet = udf(cleanTweet, StringType())
df = df.withColumn('cleanTweetText', udfCleanTweet('text'))
df.show(5)
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+
|sentiment|       _c1|                 _c2|     _c3|            _c4|                text|      cleanTweetText|
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+
|        0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...| I want Miley to ...|
|        0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|              Exams |
|        0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....| I wanna go home ...|
|        0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...| I will not watch...|
|        0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|        No Followers|
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+
only showing top 5 rows

In [7]:
df.select('sentiment').distinct().show()
+---------+
|sentiment|
+---------+
|        4|
|        0|
+---------+

In [8]:
df.groupby('sentiment').count().show()
+---------+------+
|sentiment| count|
+---------+------+
|        4|800000|
|        0|800000|
+---------+------+

In [9]:
def mapTarget(sentiment):
    # Sentiment140 labels negatives as 0 and positives as 4; remap 4 -> 1 for a 0/1 target
    return 1 if sentiment == 4 else 0
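The same 4 -> 1 mapping can also be expressed with Spark's built-in when/otherwise column functions, which avoids shipping rows through a Python UDF; this is an alternative sketch, not the cell the notebook executes:

from pyspark.sql.functions import when, col

# Map the positive class 4 to 1 and keep 0 as 0, entirely inside the JVM
dfAlt = df.withColumn('target', when(col('sentiment') == 4, 1).otherwise(col('sentiment')))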
In [10]:
udfMapTarget = udf(mapTarget, IntegerType())
df = df.withColumn('target', udfMapTarget('sentiment'))
df.show(5)
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+------+
|sentiment|       _c1|                 _c2|     _c3|            _c4|                text|      cleanTweetText|target|
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+------+
|        0|2039777308|Thu Jun 04 22:46:...|NO_QUERY|        Abby_ox|  I want Miley to...| I want Miley to ...|     0|
|        0|1994140600|Mon Jun 01 11:28:...|NO_QUERY|     chillbabe7|            Exams!!!|              Exams |     0|
|        0|1676989809|Sat May 02 00:00:...|NO_QUERY|sarahgrieve2011| I wanna go home....| I wanna go home ...|     0|
|        0|1986022894|Sun May 31 18:12:...|NO_QUERY|       sambam87| I will not watch...| I will not watch...|     0|
|        0|1694344813|Mon May 04 02:11:...|NO_QUERY|     LeoVonRink|        No Followers|        No Followers|     0|
+---------+----------+--------------------+--------+---------------+--------------------+--------------------+------+
only showing top 5 rows

In [11]:
df.groupby('target').count().show()
+------+------+
|target| count|
+------+------+
|     1|800000|
|     0|800000|
+------+------+

In [12]:
df = df.select('text', 'cleanTweetText', 'target')
df.show(5)
+--------------------+--------------------+------+
|                text|      cleanTweetText|target|
+--------------------+--------------------+------+
|  I want Miley to...| I want Miley to ...|     0|
|            Exams!!!|              Exams |     0|
| I wanna go home....| I wanna go home ...|     0|
| I will not watch...| I will not watch...|     0|
|        No Followers|        No Followers|     0|
+--------------------+--------------------+------+
only showing top 5 rows

Train Test Split

In [13]:
dfTrain, dfTest = df.randomSplit([0.8, 0.2])
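randomSplit is not deterministic between runs unless a seed is supplied; a reproducible variant of the split (the seed value is arbitrary) would look like this:

# Reproducible 80/20 split
dfTrain, dfTest = df.randomSplit([0.8, 0.2], seed=42)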

Feature Transformations

In [14]:
tokenizer = Tokenizer(inputCol='cleanTweetText', outputCol='tokenTweet')
stopRemover = StopWordsRemover(inputCol='tokenTweet', outputCol='filteredTokens')
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='filteredTokens', outputCol='features')
labelStringIdx = StringIndexer(inputCol='target', outputCol='label')
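Tokenizer lower-cases and splits the cleaned text on whitespace, StopWordsRemover drops common English stop words, and Word2Vec then averages the learned 100-dimensional word vectors of the remaining tokens into a single features vector per tweet. The first two (stateless) stages can be previewed on a made-up sample; this is illustrative only:

# Run just the two transformer stages on a tiny example to inspect the intermediate columns
sample = spark.createDataFrame([('I want to go home now',)], ['cleanTweetText'])
stopRemover.transform(tokenizer.transform(sample)).show(truncate=False)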

Create the Model

In [15]:
lr = LogisticRegression(maxIter=100)
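LogisticRegression is left at its defaults apart from maxIter. If the model were to overfit, Spark's regParam and elasticNetParam could add regularization; the values below are purely illustrative, not tuned:

# Illustrative L2-regularized variant (elasticNetParam=0.0 selects pure L2)
lrRegularized = LogisticRegression(maxIter=100, regParam=0.01, elasticNetParam=0.0)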

Create the Pipeline

In [16]:
dfPrepPipe = Pipeline(stages=[tokenizer, stopRemover, word2Vec, labelStringIdx, lr])

Training and Evaluation

In [17]:
pipelineFit = dfPrepPipe.fit(dfTrain)
In [18]:
predictions = pipelineFit.transform(dfTest)
In [19]:
predictions.printSchema()
root
 |-- text: string (nullable = true)
 |-- cleanTweetText: string (nullable = true)
 |-- target: integer (nullable = true)
 |-- tokenTweet: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filteredTokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- label: double (nullable = false)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)

In [20]:
predictions.select(['cleanTweetText', 'features', 'label', 'rawPrediction', 'probability', 'prediction']).show(5)
+--------------------+--------------------+-----+--------------------+--------------------+----------+
|      cleanTweetText|            features|label|       rawPrediction|         probability|prediction|
+--------------------+--------------------+-----+--------------------+--------------------+----------+
| I want Miley to ...|[0.01353734713047...|  0.0|[0.59144533590323...|[0.64369670410457...|       0.0|
|        love you lt |[-0.0808262992650...|  1.0|[-2.8843092921010...|[0.05293468369937...|       1.0|
|      Hello Twitter |[0.08169510029256...|  1.0|[-2.8707641781935...|[0.05361786209311...|       1.0|
| I wanna go home ...|[-0.0382282886033...|  0.0|[2.43213292335975...|[0.91924500906395...|       0.0|
|        No Followers|[0.16656325850635...|  0.0|[-2.9296898778204...|[0.05070525034315...|       1.0|
+--------------------+--------------------+-----+--------------------+--------------------+----------+
only showing top 5 rows
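The probability column is a vector of class probabilities in label-index order. Assuming Spark 3.0 or later (for pyspark.ml.functions.vector_to_array), the probability assigned to label index 1 can be pulled out into a plain double column; a sketch:

from pyspark.ml.functions import vector_to_array

# Probability of label index 1 (whichever original target StringIndexer mapped to 1.0)
predictions.withColumn('probLabel1', vector_to_array('probability')[1]) \
           .select('cleanTweetText', 'probLabel1', 'prediction').show(5)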

In [21]:
evaluator = BinaryClassificationEvaluator()
# BinaryClassificationEvaluator's default metric is area under the ROC curve (AUC), not accuracy
rocAuc = evaluator.evaluate(predictions)
print('Area under ROC of the model at predicting sentiment is: {:.4f}'.format(rocAuc))
Area under ROC of the model at predicting sentiment is: 0.8276
In [22]:
# Simple accuracy: fraction of test rows where the predicted label matches the indexed label
numberOfTestRecords = dfTest.count()
accuracy = predictions.filter(predictions['label'] == predictions['prediction']).count() / numberOfTestRecords
print('Accuracy of model at predicting sentiment is: {:.4f}'.format(accuracy))
Accuracy of model at predicting sentiment is: 0.7479
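The same accuracy figure can be computed with Spark's MulticlassClassificationEvaluator instead of filtering and counting by hand; just a sketch:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# metricName='accuracy' compares the prediction column with the indexed label column
accEvaluator = MulticlassClassificationEvaluator(labelCol='label', predictionCol='prediction',
                                                 metricName='accuracy')
print('Accuracy via evaluator: {:.4f}'.format(accEvaluator.evaluate(predictions)))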

Save the trained model

In [23]:
pipelineFit.save('W2VLogreg.model')
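The saved pipeline can be loaded back later with PipelineModel.load and applied to new text. The tweet below is made up purely for illustration, and a dummy target column is included only so the StringIndexer stage finds its input column:

from pyspark.ml import PipelineModel

loadedModel = PipelineModel.load('W2VLogreg.model')

# Score a new, unlabeled tweet (the dummy target just satisfies the label indexer's input column)
newTweets = spark.createDataFrame([('I love this new phone', 0)], ['text', 'target']) \
                 .withColumn('cleanTweetText', udfCleanTweet('text'))
loadedModel.transform(newTweets).select('text', 'prediction').show()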