portaldacalheta.pt
  • Основен
  • Пъргав
  • Иновация
  • Тенденции
  • Back-End
Наука За Данни И Бази Данни

Урок за поточно предаване на Apache Spark: Идентифициране на популярни хештегове в Twitter



Днес данните нарастват и се натрупват по-бързо от преди. В момента около 90% от данните, генерирани в нашия свят, са генерирани през последните две години. Поради този ръст в скоростта, платформи голяма информация трябваше да приемат радикални решения, за да могат да поддържат толкова големи обеми данни.

в какво са кодирани дискорд ботове

Един от най-значимите източници на данни днес са социалните медии. Позволете ми да демонстрирам пример от реалния живот: управление, анализ и извличане на информация от данните в социалните медии в реално време, използвайки едно от еко решенията в голяма информация най-важните от тях - Apache Spark и Python.



Apache Spark Streaming може да се използва за извличане на информация от социалните медии, като например тенденции в Twitter



В тази статия ще ви покажа как да създадете просто приложение, което чете онлайн емисии на Twitter с помощта на Python, след което обработва туитове с помощта на Apache Spark Streaming за да идентифицирате хаштаговете и накрая да върнете най-важните тенденционни хаштагове и да рендирате тези данни в таблото в реално време.



Създаване на собствени идентификационни данни за API на Twitter

За да получавате туитове от Twitter, трябва да се регистрирате на TwitterApps Като кликнете върху „Създаване на ново приложение“ и след попълване на формуляра по-долу, кликнете върху „Създайте вашето приложение в Twitter“.

Снимка на екрана: Как да създадете вашето Twitter приложение



Второ, отидете на новосъздаденото приложение и отворете прозореца „Идентификатори и ключове за достъп“. След това кликнете върху „Генериране на моя идентификатор за достъп“.

Снимка на екрана: Инсталиране на идентификационни данни за приложение за Twitter, пароли и идентификатори за достъп



Новите ви идентификатори за вход ще се появят, както е показано по-долу.

Снимка на екрана: Инсталиране на идентификатори за достъп за приложението Twiiter



И сега сте готови за следващата стъпка.

Изградете HTTP клиент на Twitter

В тази стъпка ще ви покажа как да изградите прост клиент, който да извлича туитове от Twitter API с помощта на Python и след това да ги предава на екземпляра Искрено поточно предаване . Трябва да е лесно да се следват за всеки разработчик на python професионален.



Първо ще създадем файл, наречен twitter_app.py и след това ще добавим кода заедно, както се вижда по-долу.

5 основни принципа на дизайна

Импортирайте библиотеките, които ще използваме, както е показано по-долу:



import socket import sys import requests import requests_oauthlib import json

И добавете променливите, които ще се използват в OAuth за свързване с Twitter, както е показано по-долу:

# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

Сега, нека създадем нова функция, наречена get_tweets който ще извика URL адреса на API на Twitter и ще върне отговора за низ от туитове.

def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response

След това създавате функция, която взема отговора от изгледа по-горе и извлича текста на туитовете от JSON обекта на пълните туитове. След това изпратете всеки туит до инстанцията Искрено поточно предаване (ще бъде обсъдено по-късно) през TCP връзка.

def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + ' ') except: e = sys.exc_info()[0] print('Error: %s' % e)

Сега ще направим основната част. Това ще накара приложението да хоства връзките гнездо , с която по-късно ще се свърже Искра . Нека зададем IP тук localhost тъй като всичко ще работи на една и съща машина и на порт 9009. След това ще извикаме метода get_tweets, който направихме по-горе, за да получим туитовете от Twitter и да предадем отговора ви с връзката гнездо a send_tweets_to_spark да изпрати туитовете на Spark.

TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)

Инсталиране на нашето приложение за поточно предаване Apache Spark

Нека да изградим нашето приложение Искрено поточно предаване , който ще обработва входящите туитове в реално време, извлича от тях хаштагове и изчислява колко хаштага са споменати.

Илюстрация: * Spark streaming * позволява обработка на входящи туитове в реално време и извличане на хаштаг

Първо, трябва да създадем екземпляр Искрен контекст sc, след това създаваме Контекст на поточно предаване ssc от sc с интервал от две секунди, който ще извърши трансформацията във всички предавания, получени на всеки две секунди. Обърнете внимание, че задаваме нивото на регистрационния файл на ERROR за да можете да деактивирате повечето дневници, които пишете Искра .

Тук дефинираме контролна точка, за да можем да разрешим периодична проверка на RDD; това е задължително, за да се използва в нашето приложение, тъй като ще използваме трансформации за пожарогасене в състояние (ще бъдат обсъдени по-късно в същия раздел).

След това дефинираме основния ни DStream dataStream, който ще свърже сървъра гнездо които създадохме по-рано в пристанището 9009 и ще прочете туитовете от този порт. Всеки запис в DStream ще бъде чуруликане.

най-добрите езици за програмиране за роботика
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)

Сега ще определим нашата логика на трансформация. Първо, ще разделим всички туитове на думи и ще ги поставим в RDD думи. След това филтрираме само хаштаговете на всички думи и ги начертаваме до (hashtag, 1) и ги поставяме в RDD хаштагове.

След това трябва да изчислим колко пъти е споменат хаштагът. Можем да направим това с помощта на функцията reduceByKey Тази функция ще изчисли колко пъти е споменат хаштагът от всяка група, т.е. ще нулира акаунта във всяка група.

В нашия случай трябва да изчислим броя на всички групи, така че ще използваме друга функция, наречена updateStateByKey тъй като тази функция ви позволява да запазите състоянието на RDD, докато го актуализирате с нови данни. Тази форма се нарича Stateful Transformation.

Имайте предвид, че за да използвате updateStateByKey, трябва да конфигурирате контролна точка и какво е направено в предишната стъпка.

# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()

updateStateByKey приема функция като параметър, наречена функция update Това се изпълнява на всеки елемент в RDD и изпълнява желаната логика.

В нашия случай създадохме функция за актуализация, наречена aggregate_tags_count което ще сумира всички new_values (нови стойности) за всеки хаштаг и ще ги добави към total_sum (обща сума), което е сумата от всички групи и записва данните в RDD tags_totals.

def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)

След това правим RDD обработка tags_totals във всяка група, за да може да го преобразува във временна таблица, използвайки Spark SQL контекст и след това направете изявление, за да можете да вземете първите десет хаштагове с техните акаунти и да ги поставите в рамките на данните hashtag_counts_df

def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)

Последната стъпка в нашето приложение Spark е изпращането на рамката с данни hashtag_counts_df към приложението на таблото за управление. По този начин ще преобразуваме рамката с данни в две матрици, една за хаштаговете и друга за техните акаунти. След това ще преминем към приложението на таблото чрез REST API.

def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)

И накрая, ето пример за изхода на Искрено поточно предаване по време на изпълнение и отпечатване на hashtag_counts_df. Ще забележите, че изходът се отпечатва точно на всеки две секунди за всеки групов интервал.

Пример за изход за Twitter * Spark streaming *, отпечатан за всяка настройка на интервал от група

Създайте просто табло за управление в реално време за представяне на данни

Сега ще създадем просто приложение за табло, което ще бъде актуализирано в реално време от Spark. Ще го изградим с помощта на Python, Flask и Charts.js .

Първо ще създадем проект на Python със структурата, показана по-долу, ще изтеглим и добавим файла Chart.js в статичната директория.

Илюстрация: Създайте проект на Python за използване в анализа на хештег в Twitter

След това във файла app.py ще създадем функция, наречена update_data, която ще бъде извикана от Spark чрез URL http://localhost:5001/updateData за да можете да актуализирате глобални етикети и масиви от стойности.

По същия начин функцията refresh_graph_data той е създаден, за да бъде извикан от заявката AJAX за връщане на новите актуализирани етикети и масиви от стойности като JSON. Функцията get_chart_page ще напусне страницата chart.html при повикване.

from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)

Сега ще създадем проста графика във файла chart.html за да можете да показвате данните от хаштага и да ги актуализирате в реално време. Както е дефинирано по-долу, трябва да импортираме JavaScript библиотеките, Chart.js и jquery.min.js.

В тялото на маркера трябва да създадем платно и да му дадем идентификатор, за да можем да го препращаме, докато показваме графиката, когато използваме JavaScript в следващата стъпка.

Top Trending Twitter Hashtags

Top Trending Twitter Hashtags

Сега ще създадем графиката, използвайки JavaScript кода по-долу. Първо вземаме елемента на платното и след това създаваме нов обект на графика и му предаваме елемента на платното и дефинираме обекта на данни, както се вижда по-долу.

__________ е подреждането и външния вид на буквите в графичния дизайн.

Имайте предвид, че етикетите с данни се обединяват с етикети и променливи на стойност, които се връщат, докато напускате страницата, когато извиквате get_chart_page във файла app.py.

Последната част е функцията, която е конфигурирана да прави заявка за Ajax всяка секунда и да извиква URL адреса /refreshData, който ще изпълни refresh_graph_data в app.py и ще върне новите актуализирани данни и след това ще актуализира графиката, която новите данни оставят.

var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);

Стартирайте приложенията заедно

Ще стартираме трите приложения в реда по-долу: 1. Клиент на Twitter App. 2. Приложение Spark 3. Уеб приложение на таблото за управление.

След това можете да получите достъп до контролния панел в реално време, като потърсите URL адреса

Сега можете да видите как вашата графика се актуализира по-долу:

Анимация: Графика на популярни хаштагове в Twitter в реално време

Употреба в реалния живот на Apache Streaming

Научихме се да правим лесен анализ на данни в реално време с помощта на Spark Streaming и да го интегрираме директно с прост контролен панел, използвайки RESTful уеб услуга. От този пример можем да видим колко мощна е Spark, тъй като улавя масивен поток от данни, трансформира го и извлича ценна информация, която лесно може да се използва за вземане на решения за кратко време. Има много полезни случаи на употреба, които могат да бъдат внедрени и могат да обслужват различни индустрии, като новини или маркетинг.

Илюстрация: Хештеговете могат да се използват за извличане на информация и настроение за стойност, които могат да бъдат приложени към множество индустрии.

Пример за индустрия за новини

как да си направим google стъкло

Можем да проследим най-често споменаваните хаштагове, за да разберем за какви теми говорят хората в социалните медии. Също така можем да проследяваме конкретни хаштагове и техните туитове, за да разберем какво казват хората по конкретни теми или събития в света.

Пример за маркетинг

Можем да събираме предаването на туитове и, като правим анализ на мнението, да ги категоризираме и да определяме интересите на хората, за да им предоставяме оферти, свързани с техните интереси.

Също така има много случаи на употреба, които могат да бъдат приложени специално за анализ. голяма информация и те могат да обслужват много индустрии. За повече случаи на използване на Apache Spark като цяло предлагам да разгледате един от нашите предишни публикации .

Препоръчвам ви да прочетете повече за Искрено поточно предаване тук за да научите повече за неговите възможности и да направите по-усъвършенствана трансформация на данни за повече информация в реално време, когато я използвате.

Стратегия за дизайн - Ръководство за тактическо мислене в дизайна

Процес На Проектиране

Стратегия за дизайн - Ръководство за тактическо мислене в дизайна
Старши инженер отпред, екип на клиентския портал

Старши инженер отпред, екип на клиентския портал

Други

Популярни Публикации
ApeeScape разраства връзката си с Amazon Web Services, за да продължи да стимулира икономиката на талантите
ApeeScape разраства връзката си с Amazon Web Services, за да продължи да стимулира икономиката на талантите
Въведение в теорията и сложността на изчислимостта
Въведение в теорията и сложността на изчислимостта
Ръководство стъпка по стъпка за проектиране на персонализирани илюстрации без предишен опит
Ръководство стъпка по стъпка за проектиране на персонализирани илюстрации без предишен опит
Обяснено оптимизиране на ефективността на Magento
Обяснено оптимизиране на ефективността на Magento
Изчерпателно ръководство за дизайн на известия
Изчерпателно ръководство за дизайн на известия
 
Малки данни, големи възможности
Малки данни, големи възможности
Достъпност в мрежата: Защо стандартите W3C често се игнорират
Достъпност в мрежата: Защо стандартите W3C често се игнорират
Бъдещето на UX е нашето човечество
Бъдещето на UX е нашето човечество
Предвиждащ дизайн: Как да създадем магически потребителски опит
Предвиждащ дизайн: Как да създадем магически потребителски опит
Въведение в Python Microservices с Nameko
Въведение в Python Microservices с Nameko
Популярни Публикации
  • създайте блог с angularjs
  • обектен модел на страница в selenium webdriver
  • как да намеря разработчик на софтуер
  • как да направите видео в After Effects
  • какво правят главните финансови директори
  • как да направя настройка на производителността в sql
Категории
  • Пъргав
  • Иновация
  • Тенденции
  • Back-End
  • © 2022 | Всички Права Запазени

    portaldacalheta.pt