GCP Üzerinde Covid-19 Etki ve Duygu Analizi
Merhaba, son günlerde dünyayı etkisi altına alan Covid-19 salgını hepimizin hayatını etkilemiş durumda. Bu konuyla ilgili bir çok paylaşım yapıldı. Bizde bu paylaşımlara kayıtsız kalmayıp hem teknolojik olarak kendimizi geliştirmek, hemde pandemiyle alakalı bakış açımızı sizlerle paylaşmak istedik.
Projeyi genel hatlarıyla özetleyecek olursak, twitter üzerinden real time gelen veriler ile ve Google Cloud servisleriyle bir takım analizler çıkardık. Projenin tüm detaylarını sizlerle paylaşacağız. Öncelikle projenin detaylarına geçmeden önce şunu belirtmekte fayda var. Bu çalışmanın amacı yeni teknolojiler öğrenmek ve güncel bir konu üzerine kendi yorumlarımızı katmaktır. Fikirleriniz ve eleştirileriniz bizim için çok önemli.
Neden Google Cloud?
Öncelikle projeyi on-premise bir ortamda yapmak yerine cloud ortamında yapmaya karar verdik. Biraz araştırma yaptıktan sonra belirlediğimiz kriterlere en uygun hizmet sağlayıcısı olarak Google’u tercih ettik.
Mimari
Aşağıdaki kod bloğunda belirlediğimiz anahtar kelimelerle twitter üzerinden ihtiyacımız olan alanları real time olarak elde ediyoruz.
https://gist.github.com/730dfc7631058e95a9ff53e5ab38ac32
import tweepy
from tweepy import StreamListener
import datetime
import json
import time
from google.cloud import pubsub
from datetime import datetime
import settings
from google.cloud import language
from google.cloud.language import enums, types
from googletrans import Translator
import re
from http.client import IncompleteRead
from urllib3.exceptions import IncompleteRead as urllib3_incompleteRead
from http.client import IncompleteRead
import emoji
from urllib3.exceptions import ProtocolError
class TweetStreamListener(StreamListener):
client = pubsub.PublisherClient(credentials=cred)
topic_path = client.topic_path(settings.PROJECT_NAME, settings.PUBSUB_TOPIC_NAME)
count = 0
tweets = []
batch_size = 1
total_tweets =1000000
def write_to_pubsub(self, text,loc,time_tweet,emotional):
publish(self.client, self.topic_path, text, loc,time_tweet,emotional)
def on_status(self, status):
if not status.user.location:
return
tweet_data = json.dumps(status._json)
tweet_data=json.loads(tweet_data)
if "extended_tweet" in tweet_data:
if 'RT' not in tweet_data['extended_tweet']['full_text']:
text = tweet_data['extended_tweet']['full_text']
loc = tweet_data['user']['location']
text = text.replace("\n","")
time_tweet=datetime.now().strftime("%d/%m/%Y %H:%M:%S")
emotional=analyze_text_sentiment(text)
if len(text) >= self.batch_size:
self.write_to_pubsub(text,loc,time_tweet,emotional)
self.count += 1
if (self.count % 50) == 0:
print("count is: {}".format(self.count))
if self.count >= self.total_tweets:
return False
return True
def publish_twitter_stream():
auth = tweepy.OAuthHandler(settings.TWITTER_APP_KEY, settings.TWITTER_APP_SECRET)
auth.set_access_token(settings.TWITTER_KEY, settings.TWITTER_SECRET)
api = tweepy.API(auth)
stream_listener = TweetStreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener,tweet_mode='extended')
tags = ['evdekal','stayhome','stay home','covid',"corona","coronavirus","korona",
"covid-19","Covid19","Covid-19","Corona Virus","pandemic","pandemi","COVID-19"]
while True:
try:
stream.filter(track=tags,stall_warnings=True)
except (ProtocolError, AttributeError):
continue
if __name__ == '__main__':
publish_twitter_stream()
Natural Language Api: Natural Language API’nin güçlü, önceden eğitilmiş modelleri, geliştiricilerin yaklaşım analizi, varlık analizi, varlık yaklaşım analizi, içerik sınıflandırması ve söz dizimi analizi gibi doğal dil anlama özellikleriyle çalışmalarına olanak tanır. (kaynak: https://cloud.google.com/natural-language )
Bir çok farklı dilden atılmış tweetler’e sahip olduğumuzdan dolayı, verinin hangi dile ait olduğunu öğrenmek için translator api’nı kullanıyoruz. Bu sayede duygu analizi için çoklu dil desteğinden yararlanıyoruz. Daha sonra elde edilen verileri Natural Language Api’na gönderiyoruz. Dönen sonuca göre tweet’in duygu durumu belirliyoruz.
def analyze_text_sentiment(tweet):
cl = language.LanguageServiceClient(credentials=cred)
translator = Translator()
tweet = re.sub(r"http\S+", "", tweet)
tweet=" ".join(filter(lambda x:x[0]!='#' and x[0]!='@' , tweet.split()))
tweet=re.sub(r'[^\w]', ' ',str(tweet))
tweet=re.sub('@[^\s]+',' ',str(tweet))
print(tweet)
try:
res_tr=translator.translate(json.dumps(tweet))
lang=res_tr.src
time.sleep(1)
except Exception as e:
print(str(e))
pass
try:
document = types.Document(
content=text,
language=lang,
type=enums.Document.Type.PLAIN_TEXT)response = cl.analyze_sentiment(document=document)
except:
document = types.Document(
content=tweet,
language="en",
type=enums.Document.Type.PLAIN_TEXT)response = cl.analyze_sentiment(document=document)
sentiment = response.document_sentiment
if sentiment.score<(-0.5):
emotion="Negative"
elif -0.5<sentiment.score<0.5:
emotion="Nötr"
else:
emotion="Positive"
return emotion
Pub/Sub:
Pub / Sub, birbirinden bağımsız uygulamalar arasında mesaj göndermenizi ve almanızı sağlayan, gerçek zamanlı bir mesajlaşma servisidir. Kafka, RabbitMQ vb. servisler gibi düşünülebilir.
Topic oluşturma:
(CONSOLE BAR->PUB/SUB->TOPICS->CREATE TOPICS)
Yukarıdaki adıma geldikten sonra, bir isim vererek topic oluşturuyoruz.
Subscription oluşturma:
(CONSOLE BAR->PUB/SUB->Subscription->CREATE Subscription)
Gerekli parametreleri doldurduktan sonra Subscription’ı oluşturuyoruz.
https://cloud.google.com/pubsub/docs/subscriber?hl=vi-vn
Api üzerinden elde edilen veriyi pub/sub’a gönderiyoruz.
https://gist.github.com/5a60ac7168966ad00c7428c558b2a0a6
def publish(client, topic_path, text, loc,time_tweet,emotional):
message_data = {
"tweet":text,
"location": loc,
"time":time_tweet,
"emotional":emotional
}
message_data = json.dumps(message_data)
message_data = message_data.encode('utf-8')
response = client.publish(topic_path, message_data , origin='python-sample')
Dataflow:
Dataflow, otomatik ölçeklendirme ve toplu işlem aracılığıyla gecikme, işleme süresi ve maliyeti en aza indiren bir tümüyle yönetilen akış analizi hizmetidir. (kaynak https://cloud.google.com/dataflow)
Pub/Sub’a real time akan veriyi alabilmek için, Dataflow üzerinde bir akış kurguluyoruz.
(CONSOLE BAR->DATAFLOW->CREATE JOB FROM TEMPLATE)
Gerekli parametreleri doğru bir şekilde girdikten sonra akış çalışabilir hale gelmektedir. Burada dikkat edilmesi gereken konu, oluşturduğunuz template türü, subscription adı, tablo dizini ve location’ın doğru verilmiş olması.
Not: Temprory location kısmı için Console Bar -> Storage -> Create Bucket -> diyip bir isim vererek bir dosya yolu oluşturmanız gerekiyor.
Yukarıda kurgulamış olduğumuz akış Subscription’ı dinleyip, veriyi BigQuery’e göndermektedir.
BigQuery
BigQuery: ANSI SQL kullanarak, gigabayt ve petabaytlarca veriyi ek operasyon yükü olmadan yüksek hızla analiz etmenizi sağlar. Detaylı bilgi için: https://cloud.google.com/bigquery
BigQuery servisine girdikten sonra, ilk olarak dataset oluşturuyoruz. Ardından oluşturulan dataset’e tıklayarak Create Table
diyoruz.
Daha sonra ihtiyacımız olan tablo ismi, schema vb özellikleri girerek tabloyu oluşturuyoruz.
Data Studio:
Görselleştirme aracı olarak Google Cloud ekosisteminde bulunan Data Studio’u tercih ettik. Verinin uçtan uca yolculuğunda, elde ettiğimiz çıkarımları son kullanıcıyıla paylaşmak için Data Studio’u kullandık.
Data Stduio -> Oluştur ->Veri Kaynağı -> BigQuery
Data studio veri kaynağı için, mimarimizin bir önceki adımı olan bigquery’i seçtik. Data Studio’da raporlar real time olarak güncellenmektedir bu nedenle sayıların anlık olarak artığı gözlemlenebilmektedir.
Toplamda 100.000 e yakın tweet çekildi ve yaklaşık olarak 35.000 farklı noktadan veri alındı. Aldığımız konum verilerini ülke bazlı sınıflandırdık.
Aşağıdaki görselde ülke bazlı atılmış tweetlerin çoktan aza doğru sıralanmış halini görüyoruz. Amerika ve İngiltere gelen tweet sayısında başı çekmektedir.
2.Görselde duygu analizinin ülke bazlı yansımasını görmekteyiz. Özellikle vaka saysının ve ölüm oranlarının çok olduğu ülkelerde negatif tweet sayılarının fazla olması dikkat çekici. (bknz: USA)
Amerika ve İngiltere’ye bakıldığında halen olumsuz bir süreçte olduğunu düşünebiliriz. Çünkü negatif tweet sayısı nötr tweet sayısına oldukça yakın ve pozitif tweet sayısı daha düşük gözükmektedir. Tam tersi olarak düşündüğümüzde de pozitif tweet sayıları negatif tweet sayılarını geçtiği veya yaklaştığı durumlarda o ülkenin giderek normalleşeceğini öngörmekteyiz.
Bu raporda seçtiğiniz gün ve tüm ülkeler bazında toplam duygu durumlarını görebiliyoruz.
Ve son olarak atılan tweet yoğunluğunu dünya haritası üzerinde gözlemleyebiliyoruz. Dünya üzerinde ki vaka konumlarının, atılan tweet konumlarına yaklaşık olarak benzerliği dikkat çekici. Psikolojik olarak düşünüldüğünde de bu virüsün insanları olumsuz veya herhangi bir duygu içerisinde etkileyip bir şekilde twitter ortamında tweet atmaya yönelttiğini düşünüyoruz. Psikolojik etmenler, gerçekten tweet atmamızı sağlamış olabilirler mi?
Amerika ve Avrupa kıtasındaki yoğunluk, Amerika ve Avrupa kıtası şuan da bize Coronavirüs’ün yeni merkezi haline geldiğini gösteriyor.
Veri Yalan Söylemez :)
SEVGİYLE KALIN…
Kaynak:
https://www.storybench.org/how-to-collect-tweets-from-the-twitter-streaming-api-using-python/
https://github.com/AstroHyde/gcp-tweets-streaming-pipeline/tree/master/twitter-to-pubsub
https://towardsdatascience.com/lets-build-a-streaming-data-pipeline-e873d671fc57