import findspark
findspark.init('/Users/sofia/spark')
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import format_number as fmt
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.ml.feature import Tokenizer,StopWordsRemover,Word2Vec
from pyspark.ml import PipelineModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import re
import numpy as np
import pandas as pd
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
import nltk
import folium
from os import path, getcwd
from PIL import Image
from wordcloud import WordCloud, STOPWORDS
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
Loading Twitter data, stored in MongoDB and collected via the Twitter Data API, into a Spark SQL DataFrame
# Create (or reuse) a SparkSession wired to a local MongoDB instance through the
# mongo-spark connector, then load the tweet collection into a DataFrame.
# NOTE(review): 'spark.mongodb.input.twitter' does not look like a documented
# connector option — verify it is intentional (input.uri/output.uri already
# name the collection).
spark = SparkSession \
.builder \
.appName('myApp') \
.config('spark.mongodb.input.uri', 'mongodb://127.0.0.1/twitterdb.twitter_search') \
.config('spark.mongodb.input.twitter', 'twitter_search') \
.config('spark.mongodb.output.uri', 'mongodb://127.0.0.1/twitterdb.twitter_search') \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.4.1')\
.getOrCreate()
# Read the collection 'twitterdb.twitter_search' into a Spark DataFrame.
df = spark.read.format('mongo').option('uri', 'mongodb://127.0.0.1/twitterdb.twitter_search').load()
Required functions to be used in Spark UDFs for data processing
def removePattern(inputText, pattern):
    """Remove every occurrence of the regex *pattern* from *inputText*.

    Parameters
    ----------
    inputText : str
        Text to clean.
    pattern : str
        Regular expression whose matches are deleted.

    Returns
    -------
    str
        *inputText* with all matches removed.
    """
    # A single re.sub removes all matches at once. The previous
    # findall-then-sub loop re-interpreted each matched substring as a
    # regex, which is both O(n*m) and wrong when a match contains
    # metacharacters (e.g. '.').
    return re.sub(pattern, '', inputText)
def cleanTweet(txt):
    """Normalise raw tweet text for sentiment analysis.

    Strips retweet markers, @-handles and URLs, then collapses every
    non-alphabetic run into a single space.

    Parameters
    ----------
    txt : str
        Raw tweet text.

    Returns
    -------
    str
        Cleaned, letters-and-spaces-only text.
    """
    # Remove twitter retweet handles ("RT @xxx:").
    txt = removePattern(txt, r'RT @[\w]*:')
    # Remove twitter handles ("@xxx").
    txt = removePattern(txt, r'@[\w]*')
    # Remove URL links (http/https).
    txt = removePattern(txt, r'https?://[A-Za-z0-9./]*')
    # Replace special characters, numbers and punctuation with spaces.
    txt = re.sub('[^A-Za-z]+', ' ', txt)
    return txt
def getCleanTweetText(filteredTweetText):
    """Re-assemble a list of tokens into one space-separated string."""
    separator = ' '
    return separator.join(filteredTweetText)
# Shared analyzer, created lazily on first use. Building a
# SentimentIntensityAnalyzer per call reloads the Vader lexicon for every
# tweet; lazy initialisation keeps the function cheaply picklable for Spark
# udf distribution (each executor initialises its own copy on first call).
_vaderAnalyzer = None

def getSentimentScore(tweetText):
    """Return Vader's compound polarity score for *tweetText*.

    Parameters
    ----------
    tweetText : str
        Cleaned tweet text.

    Returns
    -------
    float
        Compound score in [-1.0, 1.0].
    """
    global _vaderAnalyzer
    if _vaderAnalyzer is None:
        _vaderAnalyzer = SentimentIntensityAnalyzer()
    vs = _vaderAnalyzer.polarity_scores(tweetText)
    return float(vs['compound'])
def getSentiment(score):
    """Binarise a compound polarity score: 1 for positive (> 0), else 0."""
    return int(score > 0)
def getTweetArray(tweet):
    """Split a cleaned tweet string on single spaces into a list of tokens."""
    delimiter = ' '
    return tweet.split(delimiter)
Cleaning up and preparing the data for Vader sentiment analysis
# ---- Vader sentiment pipeline over the raw tweets ----
# 1. Clean the raw 'text' column with the cleanTweet udf.
udfCleanTweet = udf(cleanTweet, StringType())
dfCleanTweet=df.withColumn('cleanTweetText', udfCleanTweet('text'))
dfCleanTweet.select('text','cleanTweetText').show(5)
# 2. Tokenize the cleaned text into a 'words' array column.
tokenizer = Tokenizer(inputCol='cleanTweetText', outputCol='words')
dfCleanTweetTokenized = tokenizer.transform(dfCleanTweet)
dfCleanTweetTokenized.select('text','cleanTweetText','words').show(5)
# 3. Drop stop words from the token list.
remover = StopWordsRemover(inputCol='words', outputCol='filteredTweetText')
dfStopwordRemoved=remover.transform(dfCleanTweetTokenized)
dfStopwordRemoved.select('text','cleanTweetText','words','filteredTweetText').show(5)
# 4. Re-join the filtered tokens into a single string for the Vader analyzer.
udfCleanTweetText = udf(getCleanTweetText, StringType())
dfFilteredCleanedTweet = dfStopwordRemoved.withColumn('filteredCleanedTweetText', udfCleanTweetText('filteredTweetText'))
dfFilteredCleanedTweet.select('filteredCleanedTweetText').show(5)
# 5. Score each tweet with Vader's compound polarity (-1..1).
udfSentimentScore = udf(getSentimentScore, FloatType())
dfSentimentScore = dfFilteredCleanedTweet.withColumn('sentimentScore', udfSentimentScore('filteredCleanedTweetText'))
dfSentimentScore.select('filteredCleanedTweetText','sentimentScore').show(5)
# 6. Binarise the score into a 0/1 'sentiment' label.
udfSentiment = udf(getSentiment, IntegerType())
dfSentiment = dfSentimentScore.withColumn('sentiment', udfSentiment('sentimentScore'))
dfSentiment.select('filteredCleanedTweetText','sentimentScore','sentiment').show(5)
# 7. Class balance, also pulled to pandas for plotting below.
dfSentiment.groupBy('sentiment').count().show()
dfPlotVaderSentiment=dfSentiment.groupBy('sentiment').count().toPandas()
dfPlotVaderSentiment
Visualization For Twitter Sentiment Analyzed By Vader Sentiment Analysis
# Bar chart of negative vs positive tweet counts from the Vader labels.
# Tick labels assume groupBy produced sentiment values in order 0, 1.
sentimentsList=['Negative','Positive']
sns.set_style('darkgrid')
sns.set_palette('rainbow')
plt.figure(figsize=(10,8))
sns.barplot(x='sentiment', y='count',data=dfPlotVaderSentiment, alpha=0.7)
plt.title('Number of occurrences for each type of Sentiment',fontsize=18)
plt.xticks(np.arange(2),sentimentsList)
plt.setp(plt.gca().get_xticklabels(), fontsize=14)
plt.xlabel('Sentiment',fontsize=16)
plt.ylabel('Number of occurrences',fontsize=16)
plt.show()
# Donut chart: percentage share of positive vs negative tweets.
labels = ['Positive' if sentiment==1 else 'Negative' for sentiment in dfPlotVaderSentiment['sentiment'].tolist()]
sizes = dfPlotVaderSentiment['count'].tolist()
explode = (0.05, 0.05)            # pull both wedges slightly apart
colors = ['#66b3ff', '#99ff99']
# 'sizes' was previously computed but unused; pass it to plt.pie (same values).
plt.pie(sizes, labels=labels, colors=colors, shadow=True, autopct='%1.1f%%',
        explode=explode, pctdistance=0.85)
# Draw a white circle over the centre to turn the pie into a donut.
centre_circle = plt.Circle((0, 0), 0.70, fc='white')
fig = plt.gcf()
fig.gca().add_artist(centre_circle)
plt.axis('equal')  # keep the donut circular
plt.tight_layout()
plt.title('Percentage distribution of each type of sentiment in all Tweets',fontsize=16)
plt.legend(loc='upper right', labels=labels)
plt.show()
Visualization with WordCloud
Extracting words from Spark DataFrame to create wordcloud visualization
dfSentiment.select('filteredCleanedTweetText','sentimentScore','sentiment').show(5)
# Pull the cleaned tweet strings back to the driver and flatten them into a
# single list of words.
filteredCleanedTweetTextRddList = dfSentiment.select('filteredCleanedTweetText').collect()
filteredCleanedTweetTextList = [row.filteredCleanedTweetText for row in filteredCleanedTweetTextRddList]
allTweetWords = [word
                 for tweetText in filteredCleanedTweetTextList
                 for word in tweetText.split(' ')]
# Remove empty strings left over by consecutive spaces.
allTweetWords = list(filter(None, allTweetWords))
# BUG FIX: the original collapsed allTweetWords into a set before counting,
# which forced every word's frequency to 1 and made the FreqDist (and the
# frequency-based word cloud below) meaningless. Keep the full list so real
# occurrence counts are preserved.
frequencyDistribution = nltk.FreqDist(allTweetWords)
# Preview the 100 most frequent words.
sorted(frequencyDistribution,key=frequencyDistribution.__getitem__, reverse=True)[0:100]
# Word cloud shaped by the Twitter-bird mask image, sized by word frequency.
directory = getcwd()
mask = np.array(Image.open(path.join(directory, 'img/twitter.png')))
wordCloud = WordCloud(background_color='white', mask=mask).generate_from_frequencies(frequencyDistribution)
plt.figure(figsize=(15,10))
plt.imshow(wordCloud, interpolation='bilinear')
plt.axis('off')
plt.show()
def getWordCloud(wordList, color):
    """Render a word cloud built from *wordList* on a *color* background.

    Parameters
    ----------
    wordList : iterable of str
        Words to visualise; WordCloud derives frequencies from the joined text.
    color : str
        Background colour name passed to WordCloud.
    """
    stopWords = set(STOPWORDS)
    # join() accepts the iterable directly; the original wrapped it in a
    # redundant list comprehension.
    allWords = ' '.join(wordList)
    wordCloud = WordCloud(background_color=color,
                          stopwords=stopWords,
                          width=1600,
                          height=800,
                          random_state=21,   # deterministic layout
                          max_words=50,
                          max_font_size=200).generate(allWords)
    plt.figure(figsize=(15, 10))
    plt.axis('off')
    plt.imshow(wordCloud, interpolation='bilinear')

getWordCloud(allTweetWords, 'black')
Word2Vec - Exploring the data a little; analyzing how Word2Vec similar words behave for Tweets
# Split the cleaned tweet strings back into token arrays for Word2Vec.
udfTweetArray = udf(getTweetArray, ArrayType(StringType()))
dfWord2Vec = dfSentiment.withColumn('filteredCleanedTweetArrayForW2V', udfTweetArray('filteredCleanedTweetText'))
dfWord2Vec.select('filteredCleanedTweetText','filteredCleanedTweetArrayForW2V').show(5)
# Train a 100-dimensional Word2Vec model; words seen fewer than 5 times are ignored.
word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='filteredCleanedTweetArrayForW2V', outputCol='wrdVector')
modelW2V = word2Vec.fit(dfWord2Vec)
# Per-tweet averaged word vectors.
wrdVec = modelW2V.transform(dfWord2Vec)
wrdVec.select('wrdVector').show(5, truncate = True)
# Per-word embedding table.
dfWordVectors = modelW2V.getVectors()
dfWordVectors.show(5)
Let's explore the data a little
# Inspect the embedding: nearest neighbours (cosine similarity) of a few
# crypto-related terms.
topN=10
dfSynonyms = modelW2V.findSynonyms('btc', topN).toPandas()
dfSynonyms[['word']].head(topN)
dfSynonyms = modelW2V.findSynonyms('bitcoin', topN).toPandas()
dfSynonyms[['word']].head(topN)
# Same lookup, but keep the similarity score formatted to 5 decimal places.
dfSimilarity=modelW2V.findSynonyms('litecoin', topN).select('word', fmt('similarity', 5).alias('similarity')).toPandas()
dfSimilarity.head(topN)
dfSimilarity=modelW2V.findSynonyms('blockchain', topN).select('word', fmt('similarity', 5).alias('similarity')).toPandas()
dfSimilarity.head(topN)
# Use the Vader-derived sentiment as ground truth ('target') for evaluating a
# previously trained Word2Vec + logistic-regression pipeline loaded from disk.
dfSentiment = dfSentiment.withColumnRenamed('sentiment','target')
dfSentimnetAnalysisW2VLogreg=dfSentiment.select('text','cleanTweetText','target')
dfSentimnetAnalysisW2VLogreg.show(5)
w2vLogreg = PipelineModel.load('W2VLogreg.model')
predictions = w2vLogreg.transform(dfSentimnetAnalysisW2VLogreg)
predictions.printSchema()
# NOTE(review): BinaryClassificationEvaluator defaults to labelCol='label' and
# rawPredictionCol='rawPrediction' — presumably the loaded pipeline emits both
# (the 'label' column is also used further below); verify against the saved
# model's stages.
evaluator = BinaryClassificationEvaluator()
roc_accuracy=evaluator.evaluate(predictions)
print('ROC-Accuracy of logistic regression model with word2vec word embedding at predicting vader sentiment is: {:.4f}'.format(roc_accuracy))
Performance comparison of my own classifiers and Vader Sentiment Analysis
# Draw three random samples (10%, 20%, 30% fractions, no replacement,
# fixed seed 101 for reproducibility) and run the classifier on each.
sample1 = dfSentimnetAnalysisW2VLogreg.sample(False, 0.1, 101)
sample2 = dfSentimnetAnalysisW2VLogreg.sample(False, 0.2, 101)
sample3 = dfSentimnetAnalysisW2VLogreg.sample(False, 0.3, 101)
predictions1 = w2vLogreg.transform(sample1)
predictions2 = w2vLogreg.transform(sample2)
predictions3 = w2vLogreg.transform(sample3)
# Row count per sample (count of non-null 'target' values).
numberOfRecInSample1 = sample1.agg({'target':'count'}).collect()[0]['count(target)']
numberOfRecInSample2 = sample2.agg({'target':'count'}).collect()[0]['count(target)']
numberOfRecInSample3 = sample3.agg({'target':'count'}).collect()[0]['count(target)']
numOfRecInSamplesList=[numberOfRecInSample1,numberOfRecInSample2,numberOfRecInSample3]
print(numOfRecInSamplesList)
# Percentage of rows where the pipeline's 'prediction' agrees with its 'label'
# column (presumably the Vader-derived label carried through the pipeline —
# TODO confirm against the saved model's stages).
matchedPredictionsPercent1 = ((predictions1.filter(predictions1['label'] == predictions1['prediction']).count())/numberOfRecInSample1)*100
matchedPredictionsPercent2 = ((predictions2.filter(predictions2['label'] == predictions2['prediction']).count())/numberOfRecInSample2)*100
matchedPredictionsPercent3 = ((predictions3.filter(predictions3['label'] == predictions3['prediction']).count())/numberOfRecInSample3)*100
predMatchWithVSList = [matchedPredictionsPercent1,matchedPredictionsPercent2,matchedPredictionsPercent3]
print('predMatchWithVSList : ',predMatchWithVSList)
# Bar chart: agreement percentage per sample, x-ticks labelled with the
# number of records in each sample.
sns.set_palette('coolwarm')
sns.set_style('darkgrid')
xPos = np.arange(len(numOfRecInSamplesList))
plt.figure(figsize=(10,8))
plt.bar(xPos, predMatchWithVSList, align='center', alpha=0.5)
plt.title('Agreement percent my classifier using Logistic Regression with Word2Vec Word Embedding & Vader Sentiment',fontsize=18)
plt.xticks(xPos,numOfRecInSamplesList)
plt.setp(plt.gca().get_xticklabels(), fontsize=14)
plt.xlabel('Number Of Records In Samples',fontsize=16)
plt.ylabel('Matched Predictions Percentage (%)',fontsize=16)
plt.show()
Determining the distribution of tweets across Countries (geographies)
# Peek at the first five non-null 'place' values to inspect their structure.
dfSentiment.where(dfSentiment.place.isNotNull()).select('place').toPandas()['place'].tolist()[:5]
def getCountry(place):
    """Extract the country from a tweet's 'place' record.

    Returns the string 'None' when place is missing/empty.
    NOTE(review): assumes index 6 of the place record is the country
    name — confirm against the collection schema.
    """
    return place[6] if place else 'None'
# Tweet counts per country, most-tweeted first.
udfCountry = udf(getCountry, StringType())
dfCountry = dfSentiment.withColumn('country', udfCountry('place'))
dfCountryWiseTweetCount = dfCountry.groupBy('country')\
.agg(F.count('text').alias('numberOfTweets'))\
.orderBy('numberOfTweets', ascending=False)
# Cached: this DataFrame is reused for the percentage computation below.
dfCountryWiseTweetCount.cache()
# Drop the 'None' bucket produced by tweets without place data.
dfCountryWiseTweetCount=dfCountryWiseTweetCount.filter(~dfCountryWiseTweetCount['country'].isin(['None']))
dfCountryWiseTweetCount.show(5)
dfPlotCountryWiseTweetCount = dfCountryWiseTweetCount.toPandas()
# Horizontal bar chart of the 10 countries with the most tweets; the top
# country (first row, already sorted descending) is highlighted in dark green.
numberOfPlottedCountries = 10
countries = dfPlotCountryWiseTweetCount['country'][:numberOfPlottedCountries].tolist()
numberOfTweets = dfPlotCountryWiseTweetCount['numberOfTweets'][:numberOfPlottedCountries].tolist()
yPos = np.arange(len(countries))
colors = np.repeat('skyblue', numberOfPlottedCountries - 1).tolist()
colors = ['darkgreen'] + colors
plt.figure(figsize=(10,8))
plt.barh(yPos, numberOfTweets, align='center', color=colors, alpha=.7)
plt.title('Bitcoin Related Tweets Distribution in Country',fontsize=18)
plt.yticks(yPos, countries)
plt.setp(plt.gca().get_yticklabels(), fontsize=14)
plt.xlabel('Number of Tweets',fontsize=16)
plt.ylabel('Countries',fontsize=16)
plt.ylim(-1, len(yPos))
plt.show()
Geo-distribution of tweets across the globe about bitcoin
# Convert per-country counts into percentages of all located tweets.
sumOfTweets = dfCountryWiseTweetCount.agg({'numberOfTweets':'sum'}).collect()[0]
totalNumberOfTweets = sumOfTweets['sum(numberOfTweets)']
dfCountryWiseTweetPercent=dfCountryWiseTweetCount.withColumn('percentage', (dfCountryWiseTweetCount['numberOfTweets']/totalNumberOfTweets)*100)
dfCountryWiseTweetPercent.select('country','percentage').show(10)
def getFormatedCountryName(country):
    """Map Twitter's country names onto those used by world-countries.json.

    Countries without a known alias are returned unchanged.
    """
    aliases = {
        'United States': 'United States of America',
        'Republic of the Philippines': 'Philippines',
        'Việt Nam': 'Viet Nam',
        'Islamic Republic of Iran': 'Iran',
    }
    return aliases.get(country, country)
# Add a 'countryName' column matching the GeoJSON naming, then pull the
# (name, percentage) pairs to pandas for the folium map.
udfFormatedCountryName = udf(getFormatedCountryName, StringType())
dfCountryWiseTweetPercent = dfCountryWiseTweetPercent.withColumn('countryName',
udfFormatedCountryName('country'))
dfCountryWiseTweetPercent.show(10)
dfPlotCountryWiseTweetPercent = dfCountryWiseTweetPercent.select('countryName','percentage').toPandas()
dfPlotCountryWiseTweetPercent.head(5)
# Suppress FutureWarnings emitted by the folium/pandas versions in use.
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)
'''
Country coordinates for plotting
'''
# GeoJSON with world country polygons; matched on feature.properties.name.
countryGeo = path.join('world-countries.json')
foliumMap = folium.Map(location=[0,0], zoom_start=2)
# NOTE(review): Map.choropleth is deprecated in newer folium releases in
# favour of folium.Choropleth(...).add_to(map) — consider migrating.
foliumMap.choropleth(geo_data=countryGeo,
name='choropleth',
data=dfPlotCountryWiseTweetPercent,
columns=['countryName', 'percentage'],
key_on='feature.properties.name',
fill_color='YlGn',
fill_opacity=0.7,
line_opacity=0.5,
legend_name='Bitcoin related Tweet Rate(%)',
reset=True
)
folium.LayerControl().add_to(foliumMap)
# Last expression: renders the map inline in a notebook.
foliumMap