Днес данните нарастват и се натрупват по-бързо от преди. В момента около 90% от данните, генерирани в нашия свят, са генерирани през последните две години. Поради този ръст в скоростта, платформи голяма информация трябваше да приемат радикални решения, за да могат да поддържат толкова големи обеми данни.
в какво са кодирани дискорд ботове
Един от най-значимите източници на данни днес са социалните медии. Позволете ми да демонстрирам пример от реалния живот: управление, анализ и извличане на информация от данните в социалните медии в реално време, използвайки едно от еко решенията в голяма информация най-важните от тях - Apache Spark и Python.
В тази статия ще ви покажа как да създадете просто приложение, което чете онлайн емисии на Twitter с помощта на Python, след което обработва туитове с помощта на Apache Spark Streaming за да идентифицирате хаштаговете и накрая да върнете най-важните тенденционни хаштагове и да рендирате тези данни в таблото в реално време.
За да получавате туитове от Twitter, трябва да се регистрирате на TwitterApps Като кликнете върху „Създаване на ново приложение“ и след попълване на формуляра по-долу, кликнете върху „Създайте вашето приложение в Twitter“.
Второ, отидете на новосъздаденото приложение и отворете прозореца „Идентификатори и ключове за достъп“. След това кликнете върху „Генериране на моя идентификатор за достъп“.
Новите ви идентификатори за вход ще се появят, както е показано по-долу.
И сега сте готови за следващата стъпка.
В тази стъпка ще ви покажа как да изградите прост клиент, който да извлича туитове от Twitter API с помощта на Python и след това да ги предава на екземпляра Искрено поточно предаване . Трябва да е лесно да се следват за всеки разработчик на python професионален.
Първо ще създадем файл, наречен twitter_app.py
и след това ще добавим кода заедно, както се вижда по-долу.
5 основни принципа на дизайна
Импортирайте библиотеките, които ще използваме, както е показано по-долу:
import socket import sys import requests import requests_oauthlib import json
И добавете променливите, които ще се използват в OAuth за свързване с Twitter, както е показано по-долу:
# Reemplaza los valores de abajo con los tuyos ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Сега, нека създадем нова функция, наречена get_tweets
който ще извика URL адреса на API на Twitter и ще върне отговора за низ от туитове.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
След това създавате функция, която взема отговора от изгледа по-горе и извлича текста на туитовете от JSON обекта на пълните туитове. След това изпратете всеки туит до инстанцията Искрено поточно предаване (ще бъде обсъдено по-късно) през TCP връзка.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Сега ще направим основната част. Това ще накара приложението да хоства връзките гнездо , с която по-късно ще се свърже Искра . Нека зададем IP тук localhost
тъй като всичко ще работи на една и съща машина и на порт 9009
. След това ще извикаме метода get_tweets
, който направихме по-горе, за да получим туитовете от Twitter и да предадем отговора ви с връзката гнездо a send_tweets_to_spark
да изпрати туитовете на Spark.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Нека да изградим нашето приложение Искрено поточно предаване , който ще обработва входящите туитове в реално време, извлича от тях хаштагове и изчислява колко хаштага са споменати.
Първо, трябва да създадем екземпляр Искрен контекст sc
, след това създаваме Контекст на поточно предаване ssc
от sc
с интервал от две секунди, който ще извърши трансформацията във всички предавания, получени на всеки две секунди. Обърнете внимание, че задаваме нивото на регистрационния файл на ERROR
за да можете да деактивирате повечето дневници, които пишете Искра .
Тук дефинираме контролна точка, за да можем да разрешим периодична проверка на RDD; това е задължително, за да се използва в нашето приложение, тъй като ще използваме трансформации за пожарогасене в състояние (ще бъдат обсъдени по-късно в същия раздел).
След това дефинираме основния ни DStream dataStream, който ще свърже сървъра гнездо които създадохме по-рано в пристанището 9009
и ще прочете туитовете от този порт. Всеки запис в DStream ще бъде чуруликане.
най-добрите езици за програмиране за роботика
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # crea una configuración spark conf = SparkConf() conf.setAppName('TwitterStreamApp') # crea un contexto spark con la configuración anterior sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # crea el Contexto Streaming desde el contexto spark visto arriba con intervalo de 2 segundos ssc = StreamingContext(sc, 2) # establece un punto de control para permitir la recuperación de RDD ssc.checkpoint('checkpoint_TwitterApp') # lee data del puerto 9009 dataStream = ssc.socketTextStream('localhost',9009)
Сега ще определим нашата логика на трансформация. Първо, ще разделим всички туитове на думи и ще ги поставим в RDD думи. След това филтрираме само хаштаговете на всички думи и ги начертаваме до (hashtag, 1)
и ги поставяме в RDD хаштагове.
След това трябва да изчислим колко пъти е споменат хаштагът. Можем да направим това с помощта на функцията reduceByKey
Тази функция ще изчисли колко пъти е споменат хаштагът от всяка група, т.е. ще нулира акаунта във всяка група.
В нашия случай трябва да изчислим броя на всички групи, така че ще използваме друга функция, наречена updateStateByKey
тъй като тази функция ви позволява да запазите състоянието на RDD, докато го актуализирате с нови данни. Тази форма се нарича Stateful Transformation
.
Имайте предвид, че за да използвате updateStateByKey
, трябва да конфигурирате контролна точка и какво е направено в предишната стъпка.
# divide cada Tweet en palabras words = dataStream.flatMap(lambda line: line.split(' ')) # filtra las palabras para obtener solo hashtags, luego mapea cada hashtag para que sea un par de (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # agrega la cuenta de cada hashtag a su última cuenta tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # procesa cada RDD generado en cada intervalo tags_totals.foreachRDD(process_rdd) # comienza la computación de streaming ssc.start() # espera que la transmisión termine ssc.awaitTermination()
updateStateByKey
приема функция като параметър, наречена функция update
Това се изпълнява на всеки елемент в RDD и изпълнява желаната логика.
В нашия случай създадохме функция за актуализация, наречена aggregate_tags_count
което ще сумира всички new_values
(нови стойности) за всеки хаштаг и ще ги добави към total_sum
(обща сума), което е сумата от всички групи и записва данните в RDD tags_totals
.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
След това правим RDD обработка tags_totals
във всяка група, за да може да го преобразува във временна таблица, използвайки Spark SQL контекст и след това направете изявление, за да можете да вземете първите десет хаштагове с техните акаунти и да ги поставите в рамките на данните hashtag_counts_df
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # obtén el contexto spark sql singleton desde el contexto actual sql_context = get_sql_context_instance(rdd.context) # convierte el RDD a Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # crea un DF desde el Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Registra el marco de data como tabla hashtags_df.registerTempTable('hashtags') # obtén los 10 mejores hashtags de la tabla utilizando SQL e imprímelos hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # llama a este método para preparar los 10 mejores hashtags DF y envíalos send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
Последната стъпка в нашето приложение Spark е изпращането на рамката с данни hashtag_counts_df
към приложението на таблото за управление. По този начин ще преобразуваме рамката с данни в две матрици, една за хаштаговете и друга за техните акаунти. След това ще преминем към приложението на таблото чрез REST API.
def send_df_to_dashboard(df): # extrae los hashtags del marco de data y conviértelos en una matriz top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extrae las cuentas del marco de data y conviértelos en una matriz tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # inicia y envía la data a través de la API REST url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
И накрая, ето пример за изхода на Искрено поточно предаване по време на изпълнение и отпечатване на hashtag_counts_df
. Ще забележите, че изходът се отпечатва точно на всеки две секунди за всеки групов интервал.
Сега ще създадем просто приложение за табло, което ще бъде актуализирано в реално време от Spark. Ще го изградим с помощта на Python, Flask и Charts.js .
Първо ще създадем проект на Python със структурата, показана по-долу, ще изтеглим и добавим файла Chart.js в статичната директория.
След това във файла app.py
ще създадем функция, наречена update_data
, която ще бъде извикана от Spark чрез URL http://localhost:5001/updateData
за да можете да актуализирате глобални етикети и масиви от стойности.
По същия начин функцията refresh_graph_data
той е създаден, за да бъде извикан от заявката AJAX за връщане на новите актуализирани етикети и масиви от стойности като JSON. Функцията get_chart_page
ще напусне страницата chart.html
при повикване.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
Сега ще създадем проста графика във файла chart.html
за да можете да показвате данните от хаштага и да ги актуализирате в реално време. Както е дефинирано по-долу, трябва да импортираме JavaScript библиотеките, Chart.js
и jquery.min.js
.
В тялото на маркера трябва да създадем платно и да му дадем идентификатор, за да можем да го препращаме, докато показваме графиката, когато използваме JavaScript в следващата стъпка.
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
Сега ще създадем графиката, използвайки JavaScript кода по-долу. Първо вземаме елемента на платното и след това създаваме нов обект на графика и му предаваме елемента на платното и дефинираме обекта на данни, както се вижда по-долу.
__________ е подреждането и външния вид на буквите в графичния дизайн.
Имайте предвид, че етикетите с данни се обединяват с етикети и променливи на стойност, които се връщат, докато напускате страницата, когато извиквате get_chart_page
във файла app.py
.
Последната част е функцията, която е конфигурирана да прави заявка за Ajax всяка секунда и да извиква URL адреса /refreshData
, който ще изпълни refresh_graph_data
в app.py
и ще върне новите актуализирани данни и след това ще актуализира графиката, която новите данни оставят.
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
Ще стартираме трите приложения в реда по-долу: 1. Клиент на Twitter App. 2. Приложение Spark 3. Уеб приложение на таблото за управление.
След това можете да получите достъп до контролния панел в реално време, като потърсите URL адреса
Сега можете да видите как вашата графика се актуализира по-долу:
Научихме се да правим лесен анализ на данни в реално време с помощта на Spark Streaming и да го интегрираме директно с прост контролен панел, използвайки RESTful уеб услуга. От този пример можем да видим колко мощна е Spark, тъй като улавя масивен поток от данни, трансформира го и извлича ценна информация, която лесно може да се използва за вземане на решения за кратко време. Има много полезни случаи на употреба, които могат да бъдат внедрени и могат да обслужват различни индустрии, като новини или маркетинг.
Пример за индустрия за новини
как да си направим google стъкло
Можем да проследим най-често споменаваните хаштагове, за да разберем за какви теми говорят хората в социалните медии. Също така можем да проследяваме конкретни хаштагове и техните туитове, за да разберем какво казват хората по конкретни теми или събития в света.
Пример за маркетинг
Можем да събираме предаването на туитове и, като правим анализ на мнението, да ги категоризираме и да определяме интересите на хората, за да им предоставяме оферти, свързани с техните интереси.
Също така има много случаи на употреба, които могат да бъдат приложени специално за анализ. голяма информация и те могат да обслужват много индустрии. За повече случаи на използване на Apache Spark като цяло предлагам да разгледате един от нашите предишни публикации .
Препоръчвам ви да прочетете повече за Искрено поточно предаване тук за да научите повече за неговите възможности и да направите по-усъвършенствана трансформация на данни за повече информация в реално време, когато я използвате.