Analisi delle Crypto Valute: Raccogliere i dati con Kafka, ClickHouse e Cryptofeed

scienzadeidati articoli - Raccogliere i dati con Kafka ClickHouse Cryptofeed

In questo post vediamo il primo passo per sviluppare un’infrastruttura di analisi per ricercare adeguatamente le dinamiche del mercato delle criptovalute, la raccolta e l’archiviazione dei dati.

La Microstruttura di mercato è un settore dell’Economia e della Finanza che studia le dinamiche dei mercati finanziari, concentrandosi sugli attori (market maker, speculatori e retail trader) e sulla struttura degli scambi (frammentazione, Dealer vs LOB e trasparenza del mercato).
I dati di cambio tendono a contenere enormi quantità di dati sul trading e sul orderbook (libro degli ordini). Per raccogliere questi dati in modo efficace, dovremo creare un sistema affidabile per gestire dati ad alta frequenza e quantità di dati su larga scala.
Una settimana di dati di trading per un asset può facilmente raggiungere centinaia di milioni di punti dati all’interno di un unico scambio. Sottolineo che non entriamo nei dettagli tecnici relativi a ClickHouse e Kafka. Forniamo solo una spiegazione introduttiva del motivo per cui questi strumenti sono stati selezionati. La priorità di questo articolo è mostrare come costruire dall’inizio alla fine un sistema di raccolta e archiviazione di dati di criptovalute.
Durante il tutorial sono forniti collegamenti per una comprensione più approfondita degli strumenti.

Dove archiviare enormi set di dati? Presentazione di ClickHouse

ClickHouse è un sistema di database basato su colonne creato per aggregare facilmente milioni di punti dati in pochi millisecondi. ClickHouse è un database OLAP (elaborazione analitica online), ciò significa che ClickHouse è specializzato in un’elevata produttività di insert, insert da centinaia di migliaia a milioni di righe alla volta, anziché singoli insert. Il vantaggio è che ClickHouse avrà una velocità impareggiabile per l’analisi, ma non potrà supportare singoli insert continui. È qui che entra in gioco Kafka.

Come produrre un’elevata produttività? Presentazione di Kafka

Apache Kafka è un sistema distribuito di messaggistica di pubblicazione-sottoscrizione progettato per un elevato throughput. Quando si raccolgono dati di trading ad alta frequenza, è fondamentale disporre di uno strumento che possa essere veloce, tollerante ai guasti e scalabile. Kafka soddisfa tutte queste esigenze ed è realizzato da una combinazione ben equilibrata di produttori, consumatori e intermediari.
Gli intermediari, i broker fungono da tramite tra i produttori (invio di dati) e i consumatori (ricezione di dati). I broker lo fanno archiviando i messaggi in argomenti, diversi tipi di messaggi, come transazioni, dati Iot, tracciamenti, ecc. L’argomento è ciò che determina il percorso dal produttore al consumatore.

Da dove ottenere i dati? Presentazione di CryptoFeed

Cryptofeed, creata da Bryant Moscon, è una libreria Python che utilizza asyncio e WebSocket per fornire feed di trade di criptovaluta da un’ampia varietà di exchange. Con Cryptofeed, è possibile aggiungere più flussi di dati da exchange di cryptovalute in un feed unificato. Cryptofeed fornisce molti esempi di come  può essere integrato con più backend, incluso Kafka.

Configurazione di Kafka e ClickHouse con Docker

Ora è il momento di realizzare il nostro sistema di raccolta e archiviazione dei dati, utilizzando ClickHouse per l’archiviazione di dati di massa e Kafka per un’efficace acquisizione dei dati in ClickHouse. Per una facile riproducibilità e distribuzione, tutto questo sarà inserito all’interno di containers di Docker,

Step 1: installazione di  Docker e Docker Compose

Utilizziamo Docker per la distribuzione di ClickHouse e Kafka, quindi ti consiglio di avere l’ambiente Docker già installato prima di continuare.

Step 2: Creazione del Docker Compose per Clickhouse-Kafka

Il primo passo è creare una directory in cui memorizzare i nostri file docker, il file dei requisiti e il nostro main.py. In questo tutorial è denominata  CryptoDB.
Dopo essere entrati nella directory, creiamo un file docker-compose.yml e modifichiamo il file di composizione come di seguito. Descriviamo ogni passaggio in dettaglio.

            version: '3.9'

services: 

  clickhouse-server:
    image: yandex/clickhouse-server
    ports: 
     - "8123:8123"
     - "8124:9000"
    volumes:
      - clickhouse-volume:/var/lib/clickhouse
  
  cryptofeed:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./:/app
    depends_on:
    - zookeeper
    - kafka

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     
volumes:
  clickhouse-volume:
        

ClickHouse-Server: il servizio che crea il nostro database ClickHouse ed espone le porte 8123 e 9000 per HTTP/TCP. Tutti i dati verranno archiviati su un volume denominato, clickhouse-volume.

Zookeeper: necessario per la gestione dei nodi Kafka

Kafka: Il servizio crea il nostro nodo Kafka. Le variabili ambientali sono essenziali per il corretto funzionamento di Kafka e cambieranno in base alla posizione in cui si sta ospitando Kafka. Questo tutorial fornisce un’ottima spiegazione sul significato di queste variabili ambientali. Nel complesso, la maggior parte delle variabili ambientali determina il modo in cui Kafka comunica con altri servizi. La porta principale utilizzata da Kafka è la porta 9092.

CryptoFeed: crea un ambiente Python ed esegue main.py per creare i nostri produttori/argomenti Kafka e avviare connessioni WebSocket.

Dopo aver preparato il nostro Docker-compose, possiamo effettuare l’ultimo step prima di avviare i contenitori: scrivere lo script di cryptofeed e configurare il dockerfile di CryptoFeed.

Preparazione dello script CryptoFeed

 

Step 1: scrittura dello script CryptoFeed

Prima di utilizzare i motori  di ClickHouse e Kafka, dobbiamo creare uno script utilizzando CryptoFeed in modo da definire come strutturare le nostre tabelle in ClickHouse. Raccogliamo i dati di trading di BTCUSD e i dati del book degli ordini limite di livello 2 da due exchanges di criptovalute. BitFinex cattura circa il 3,5% del volume totale in 24 ore in tutto il mondo e Binance ne cattura circa il 25,5%. Selezioniamo questi due exchange per catturare dinamiche interessanti grazie alla grande frammentazione dei mercati delle criptovalute. Enormi exchange come Binance possono avere un comportamento microstrutturale di mercato unico rispetto a exchange più piccoli, come BitFinex.

            
from cryptofeed import FeedHandler
from cryptofeed.backends.kafka import BookKafka, TradeKafka
from cryptofeed.defines import L2_BOOK, TRADES, L3_BOOK
from cryptofeed.exchanges import Coinbase, Binance, Bitfinex


def main():
    f = FeedHandler()
    HOSTNAME = 'kafka'
    cbs = {TRADES: TradeKafka(bootstrap=HOSTNAME), L2_BOOK: BookKafka(bootstrap=HOSTNAME), L3_BOOK: BookKafka(bootstrap=HOSTNAME)}

    # Add trade and lv 2 bitcoin data to Feed
    f.add_feed(Bitfinex(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-USD'], callbacks=cbs))
    f.add_feed(Binance(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-BUSD'], callbacks=cbs))
    
    # Example of how to extract level 3 order book data
    # f.add_feed(Coinbase(max_depth=25, channels=[TRADES, L2_BOOK, L3_BOOK], symbols=['BTC-USD'], callbacks=cbs))

    f.run()


if __name__ == '__main__':
    main()
        

Nota: il nome dell’host deve essere lo stesso del servizio Kafka specificato nel file docker-compose, altrimenti l’impostazione predefinita prevede l’utilizzo di ‘localhost’. Localhost non funziona nel nostro approccio multi-container.

Aggiungiamo un feed al nostro gestore di feed, che si collegherà alle API del nostro exchange tramite websocket e quindi invierà i dati  di trading tramite i produttori Kafka ai nostri broker Kafka che memorizzano i nostri argomenti. CryptoFeed crea il proprio produttore e argomento Kafka quando utilizza TradeKafka e BookKafka.

Ci sarà un argomento unico per ogni canale, exchange e simbolo, quindi in totale creiamo 4 argomenti corrispondenti a 2 exchange (Binance e BitFinex), 2 canali (TRADES e L2_BOOK) e un simbolo (BTC-USD).

La struttura degli argomenti sono:

<channel>-<EXCHANGE>-<SYMBOL> (es. trade-COINBASE-BTC-USD)

Abbiamo anche inserito un esempio di codice per aggiungere i dati del orderbook di livello tre da CoinBase, ma commentato. Sentiti libero di esplorare CryptoFeed , ma tieni presente che in questo articolo non descriviamo come raccogliere i dati dell’orderbook di livello 3.

Dato che utilizziamo i dati del book L2 e dei dati di trade, la struttura delle tabelle finali è simile a questa.

Trades: {feed, symbol, timestamp, receipt_timestamp, side, amount, price, order_type, id}

Book: {timestamp, receipt_timestamp, delta, bid[0–25], ask[0–25], _topic}

Sia il lato bid che quello ask dell’order book avranno una profondità massima di 25 livelli di prezzo.

Un’ulteriore colonna _topic, l’argomento Kafka, viene aggiunta alla tabella del orderbook perché i dati originali non contengono l’exchange e il simbolo. _topic verrà utilizzato come metodo di filtraggio per analisi successive.

 

Step 2: Creazione del dockerfile di CryptoFeed

Affinché lo script CryptoFeed possa essere eseguito sul nostro container, abbiamo bisogno di un ambiente Python con tutti i pacchetti necessari, possiamo creare l’ambiente che desideriamo. Di seguito descriviamo come crearlo.

            FROM python:3.7-slim

WORKDIR /app
COPY . .

# upgrade pip, install requirements and update permissions for main.py
RUN pip install --upgrade pip \
    && pip install -r ./requirements.txt \
    && chmod +x /app/main.py

# Create CryptoFeed Kafka Producers
CMD ["python", "/app/main.py"]
        

Il dockerfile utilizza solo un’immagine python slim standard, installa i requisiti ed esegue lo script main.py. Molto semplice.

I requisiti sono:

            cryptofeed==1.9.3
aiokafka==0.7.2
        

Configurazione di Clickhouse e Kafka

Il passaggio successivo consiste nel creare i sottoscrittori/consumatori per acquisire e quindi archiviare i dati nelle tabelle ClickHouse. Abbiamo usato questa utile guida per la configurazione di Kafka-ClickHouse e descriviamo nel dettaglio di seguito.

Affinché una tabella ClickHouse raccolga dati da un argomento Kafka è necessario completare tre passaggi:

  • Creare le tabelle  finali MergeTree in ClickHouse per archiviare i dati per l’analisi a lungo termine
  • Creare tabelle del motore Kafka per strutturare i dati degli argomenti nel formato ClickHouse
  • Generare una vista materializzata per spostare i dati dalle tabelle del motore Kafka alle tabelle MergeTree

Step 1: creazione di tabelle MergeTree ClickHouse

Accediamo al server ClickHouse tramite CLI o tramite uno strumento di gestione database (es: Dbeaver e HouseOps).
Innanzitutto creiamo un nuovo database per archiviare le tabelle degli exchange di criptovalute

            CREATE DATABASE CryptoCrypt 
        

Quindi creiamo la tabella MergeTree dell’Orderbook L2 e la tabella MergeTree dei Trades. È molto importante usare il tipo corretto per ciascuna colonna. Guarda qui se hai bisogno di esplorare quali tipi usare.

            CREATE TABLE CryptoCrypt.Trades (
        feed String,
        symbol String,
        trade_date DateTime64(3),
        receipt_timestamp DateTime64(3),
        side String,
        amount Float64,
        price Float64,
        order_type Nullable(String), 
        id UInt64
        ) Engine = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY (feed, trade_date);


CREATE TABLE CryptoCrypt.L2_orderbook (
        trade_date DateTime64(3),
        receipt_timestamp DateTime64(3),
        delta Int8,
        bid Map(String, String),
        ask Map(String, String)
        ) Engine = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY (trade_date);

ALTER TABLE CryptoCrypt.L2_orderbook
  ADD COLUMN _topic String
        

Una cosa importante da notare per la tabella L2_orderbook è la complessità per memorizzare i bid e gli ask. Il tipo di mappa deve essere in grado di memorizzare i dati {price:volume} a una profondità di 25 livelli. La mappa non supporta i tipi float, quindi per preservare quei dati la chiave del prezzo e il valore del volume devono essere archiviati come stringhe. I tipi possono essere facilmente riportati alla normalità nella fase di elaborazione dei dati. Se prevediamo di utilizzare i dati L3, viene aggiunto un ulteriore livello di complessità perché dovremmo annidare le colonne bid e ask per memorizzare le sotto-colonne di ID ordine, prezzo e coppie di volume.

Infine dobbiamo evidenziare che per aggiungere la colonna virtuale, _topic, è necessario aggiungere la colonna DOPO la creazione della tabella e prevedere tale colonna solo nella tabella MergeTree e non nella tabella Kafka.

Step 2: Creazione di tabelle nel motore Kafka

Creiamo le tabelle del motore Kafka con la stessa struttura delle tabelle MergeTree. Ogni tabella del motore Kafka sottoscrive due argomenti in base ai dati che sta raccogliendo (trades o orderbook) e un broker che memorizza l’argomento. A fini di test locali, il broker Kafka si trova in Kafka:9092. Poiché ClickHouse e Kafka condividono la stessa rete, possiamo utilizzare “Kafka” come host.

            CREATE TABLE CryptoCrypt.trades_queue (
        feed String,
        symbol String,
        trade_date DateTime64(3),
        receipt_timestamp Float64,
        side String,
        amount Float64,
        price Float64,
        order_type Nullable(String), 
        id UInt64
        ) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
       kafka_topic_list = 'trades-BINANCE-BTC-BUSD, trades-BITFINEX-BTC-USD',
       kafka_group_name = 'trades_group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';

CREATE TABLE CryptoCrypt.books_queue (
        trade_date Float64,
        receipt_timestamp Float64,
        delta Int8,
        bid Map(String, LowCardinality(Float64)),
        ask Map(String, LowCardinality(Float64))
        ) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
       kafka_topic_list = 'book-BINANCE-BTC-BUSD, book-BITFINEX-BTC-USD',
       kafka_group_name = 'trades_group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';
        

Per avere maggiori dettagli sul funzionamento delle impostazioni di un motore Kafka, si può consultare questa guida. Inoltre, da notare come è stata risolta la complessità bid/ask utilizzando un trucco per assimilare la struttura dei dati {price: volume}. Poiché l’argomento archivia {prezzo:volume} come tipi {stringa:float}, è necessario utilizzare LowCardinality come soluzione alternativa per archiviare temporaneamente i dati prima che vengano inseriti in batch nella tabella MergeTree. Generalmente LowCardinality non è in grado di gestire enormi set di dati, ma in questo caso possiamo utilizzarlo poiché viene utilizzato solo per archiviare temporaneamente i dati in una tabella Kafka.

Step 3: Trasferire i dati tra tabelle con viste materializzate

L’ultimo passaggio consiste nel creare la vista materializzata che fungerà da meccanismo di inserimento tra le tabelle MergeTree e le tabelle Kafka. Possiamo osservare che la funzione cast viene utilizzata per convertire le colonne bid e ask nei corretti tipi di dati e viene richiesta la colonna virtuale _topic.

Non è necessario che le colonne virtuali siano già nella tabella Kafka per inserirle nella tabella MergeTree.

            CREATE MATERIALIZED VIEW CryptoCrypt.trades_queue_mv TO CryptoCrypt.Trades AS
SELECT *
FROM CryptoCrypt.trades_queue;

CREATE MATERIALIZED VIEW CryptoCrypt.books_queue_mv TO CryptoCrypt.L2_orderbook AS
SELECT trade_date, receipt_timestamp, delta, cast(bid, 'Map(String, String)') as bid , cast(ask, 'Map(String, String)') as ask, _topic 
FROM CryptoCrypt.books_queue;
        

Tramite le viste materializzate, i dati che erano stati archiviati negli argomenti inizieranno a essere archiviati nelle tabelle MergeTree finali.

Risoluzione dei problemi e test

Ora che il sistema di raccolta e archiviazione dei dati è in esecuzione, è tempo di verificare che tutto funzioni. Partiamo dai metodi di risoluzione dei problemi più semplici per rilevare i problemi e aumentare la complessità se il problema persiste.

Semplici controlli con ClickHouse

Possiamo verificare che tutte le tabelle siano state create e verificare se sono presenti dati nelle nostre tabelle MergeTree finali. Possiamo usare ClickHouse CLI o una console SQL su Dbeaver.

            # Show all tables, should have 2 kafka, 2 MergeTree and 2 views
SHOW TABLES FROM CryptoCrypt

# Check count of MergeTree tables
SELECT COUNT() FROM CryptoCrypt.L2_orderbook
SELECT COUNT() FROM CryptoCrypt.Trades

# Check if data is correct format 
SELECT * FROM CryptoCrypt.L2_orderbook LIMIT 5
SELECT * FROM CryptoCrypt.Trades LIMIT 5

# If no data in MergeTree tables check Kafka tables
SELECT * FROM CryptoCrypt.books_queue LIMIT 5
SELECT * FROM CryptoCrypt.trades_queue LIMIT 5
        

Se non ci sono dati nelle tabelle Kafka, è probabile che sia dovuto a un problema del produttore Kafka oppure una delle colonne è stata digitata in modo errato. Se viene visualizzato un ‘cannot parse error‘ è probabilmente che sia presente una errata digitazione, in caso contrario è probabilmente dovuto all’utilizzo di un erratto elenco di broker Kafka durante la creazione delle tabelle Kafka.

E’ necessario assicurarsi sempre che l’elenco di broker Kafka = ascoltatori pubblicati da Kafka.


Controllo dei broker e degli argomenti di Kafka

Se i problemi di cui sopra non si verificavano per la mancanza di dati nelle tabelle, è tempo di verificare se sono presenti dati archiviati nei topic. Per verificare se i topic sono stati creati e se stanno memorizzando dei dati possiamo accedere alla bash del contenitore docker di Kafka ed eseguire alcuni comandi

            # From your terminal run: MAKE SURE TO SET KAFKA CONTAINER NAME
> docker exec -i -t -u root $(docker ps | grep NAME_OF_KAFKA_CONTAINER | cut -d' ' -f1) /bin/bash

# List all topics
bash> $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server kafka:9092

# Check if a topic is collecting messages 
bash> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic book-BITFINEX-BTC-USD --from-beginning

        

Se tutto funziona correttamente, nel terminale vediamo tutti e quattro i topic, insieme a migliaia di messaggi al secondo. Mentre se vediamo i nostri dati, è necessario tornare alla risoluzione dei problemi di ClickHouse e controllare i log, altrimenti è il momento di controllare i produttori Kafka e le connessioni WebSocket.

 

Controllo dei log di CryptoFeed

Se non abbiamo dati nei topic, è il momento di controllare i registri di CryptoFeed. Il gestore del feed produce i log all’avvio di main.py, che mostra se le connessioni WebSocket sono riuscite e se si sono verificati errori durante la produzione dei produttori Kafka. Questi registri sono archiviati nella stessa directory del file del docker-compose.

Nota: LEADER_NOT_AVAILABLE è un errore comune che si verifica a causa di un host broker improprio. Se vogliamo separare ClickHouse e Kafka in diverse macchine host, questo sarà un probabile errore. Questo link descrive come si verifica l’errore e come risolverlo.

Conclusione

Questo tutorial copre il primo e più essenziale passaggio nella creazione di una piattaforma di analisi delle criptovalute, il sistema di raccolta e archiviazione dei dati. Senza i dati è come costruire una casa con un martello e senza chiodi. Inoltre, con un sistema ClickHouse-Kafka (e molto spazio di archiviazione), possiamo facilmente archiviare i dati di trading delle criptovalute da dozzine di  diverse exchange. Il passaggio successivo consiste nel preelaborare i dati con Apache Spark per prepararci alla creazione di modelli di previsione (LSTM e XGBoost) e di inferenza causale, insieme ad alcune analisi esplorative, al fine di ottenere una visione approfondita della microstruttura del mercato degli exchange di criptovalute.

Web scraping automatizzato con Python e Celery

scienzadeidati articoli - Web scraping automatizzato Python Celery

Questa è la 2° parte del tutorial che descrive come creare uno strumento di web scraping con Python. In questo articolo vediamo come integrare Celery, un sistema di gestione delle attività, nel nostro progetto di scraping web.

La 1° parte, Creazione di uno scraper di feed RSS con Python, descrive come utilizzare Requests e Beautiful Soup.

La 3° parte di questa serie, Realizzazione di un’applicazione di web scraping con Python, Celery e Django, descrive come integrare uno strumento di web scraping nelle applicazioni web.

Requisiti

Nell’articolo precedente, abbiamo creato un semplice lettore di feed RSS che estrae informazioni da HackerNews utilizzando Requests e BeautifulSoup (vedi il codice su GitHub).

In questo articolo usiamo il codice come base per la creazione di un sistema di gestione delle attività e lo scraping pianificato.
Il successivo passaggio nella raccolta di dati da siti Web che cambiano frequentemente (ad esempio, un feed RSS che mostra un numero X di elementi alla volta), è quello di eseguire lo scraping su base regolare. Nell’esempio di scraping precedente, abbiamo utilizzato la riga di comando per eseguire il nostro codice; tuttavia, questa non è una soluzione scalabile. Per automatizzare l’esecuzione si può usare Celery per creare un sistema di pianificazione delle attività con esecuzione periodica.

Per raggiunge questo obiettivo dobbiamo utilizzare i seguenti strumenti:

Nota: tutte le dipendenze della libreria sono elencate nei file requirements.txt e Pipfile/ Pipfile.lock.

Come funziona Celery

Celery è un sistema di gestione delle attività, opera in combinazione con un broker di messaggi per svolgere un lavoro asincrono .

articoli - web scraping Celery

Quanto sopra illustra che il nostro produttore di attività (la nostra app di scraping web) passerà le informazioni sulle attività alla coda (Celery) per essere eseguite. Lo scheduler (Celery beat) li eseguirà come cron job, senza alcun lavoro aggiuntivo o interazione al di fuori dell’avvio dell’app Celery.

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione di scraping automatizzato:

  1. Installazione di Celery e RabbitMQ: Celery gestisce l’accodamento e l’esecuzione delle attività, mentre RabbitMQ gestirà i messaggi avanti e indietro
  2. Come avviare RabbitMQ e comprendere i log
  3. Creazione di un proof-of-concept “Hello World” con Celery per verificarne il funzionamento.
  4. Registrazione delle funzioni di scraping in tasks.py con Celery
  5. Sviluppare e gestire ulteriormente le attività di scraping
  6. Creazione ed esecuzione di una pianificazione per l’attività di scraping

Nota: le introduzioni a RabbitMQ e Celery sono piuttosto lunghe, se si ha già esperienza con questi strumenti si può saltare direttamente al punto 4 .

Inizializzazione

Iniziamo aprendo la directory del progetto, in questo caso è Web_Scraping_RSS_Celery_Django dell’articolo precedente. Se lo desideri, può essere clonato da GitHub tramite il link precedentemente menzionato.

Nota: stiamo usando Ubuntu, quindi i comandi potrebbero differire a seconda del sistema operativo.

Inoltre, per brevità, abbiamo omesso parti di codice replicato, usando ….
Infine, i requisiti del progetto possono essere installati utilizzando il comando pip come  nel seguente esempio.

				
					$ pip install celery
				
			


Perché Celery e RabbitMQ? Perché non un'altra tecnologia?

Utilizziamo Celery e RabbitMQ perché sono abbastanza semplici da configurare, testare e ridimensionare in un ambiente di produzione. Sebbene possiamo eseguire un’attività periodica utilizzando altre librerie, o semplicemente con i cron job generali, vogliamo costruire  qualcosa da riutilizzare nei prossimi progetti.

Una soluzione sarà più duratura e richiede meno manutenzione se usiamo una tecnologia che possiamo scalare nel prossimo progetto, imparando alcuni comandi e strumenti chiave man mano che aumentiamo gradualmente la complessità.

Configurazione di RabbitMQ

Far funzionare un server RabbitMQ è molto più semplice su Ubuntu rispetto a un ambiente Windows. Seguiremo la documentazione  ufficiale della guida all’installazione.

Ecco i comandi di installazione per Debian e Ubuntu:

				
					$ sudo apt-get update -y

$ sudo apt-get install curl gnupg -y

$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ sudo apt-get install apt-transport https

$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF

$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang

$ deb https://dl.bintray.com/rabbitmq/debian bionic main

$ EOF

$ sudo apt-get update -y

$ sudo apt-get install rabbitmq-server -y --fix-missing
				
			

Dopo aver installato RabbitMQ per la prima volta in un ambiente virtuale, si avvia automaticamente. Per verificare che il comando rabbitmq-server funzioni (quello da usare con Celery), è necessario disattivare il servizio.

Da notare che le autorizzazioni predefinite per l’installazione prevedono che Only root or rabbitmq should run rabbitmqctl shutdown, che può risultare strano. Per risolverlo, possiamo semplicemente eseguire il comando con sudo.

				
					$ sudo rabbitmqctl shutdown
				
			

E’ quindi possibile testare il server utilizzando il comando rabbitmq-server,

				
					$ sudo rabbitmq-server
				
			

Si ottiene il seguente output

articoli - web scraping Celery 1

NOTA: si può terminare il comando rabbitmq-server usando Control+C.

La configurazione di RabbitMQ su Windows richiede passaggi aggiuntivi. La documentazione ufficiale contiene una guida per l’installazione manuale.

Testare Celery con RabbitMQ

Prima di immergersi nella codifica di ogni progetto, è buona norma testare gli esempi di base in stile “Hello World” per i pacchetti e framework previsti dal progetto. Questo permette di acquisire una comprensione di base di cosa è possibile aspettarsi, insieme ad alcuni comandi del terminale da aggiungere alla cassetta degli attrezzi per quella specifica tecnologia.

In questo caso, Celery viene fornito con il proprio proof-of-concept “Hello World” sotto forma di un’attività (task) che effettua una semplice addizione aritmetica. Questo è disponibile in forma estesa sulla documentazione ufficiale di Celery. Di seguito viene descritto brevemente; tuttavia, se si desidera spiegazioni o approfondimenti, è consigliato leggere la documentazione ufficiale.

Ora che abbiamo installato e convalidato il broker RabbitMQ, possiamo iniziare a creare il file tasks.py. Questo contiene tutte le attività (task) da eseguire, sia che si tratti di una operazione aritmetica, di uno scraping web o di un salvataggio di utenti in un database. Vediamo ora le modifiche all’interno della directory del progetto.

				
					$ touch tasks.py
				
			

Per creare un task per effettuare un’addizione aritmetica, dobbiamo importare la libreria Celery e creare una funzione con il flag @app.task per consentire ai worker di Celery di ricevere l’attività nel sistema di code.

				
					# tasks.py

from celery import Celery

app = Celery('tasks') # defining the app name to be used in our flag

@app.task # registering the task to the app
def add(x, y):
    return x + y
				
			

Utilizzando il task add, possiamo iniziare a testarne l’esecuzione. È qui che le cose potrebbero iniziare a confondersi, poiché nel passaggio successivo sono necessari 3 terminali aperti contemporaneamente.

Iniziamo con una rapida spiegazione, quindi descriviamo il codice e visualizzare le schermate.

Spiegazione

Per completare il  test, eseguiamo il task Celery usando la riga di comando importando il tasks.py e  poi lanciarlo. Affinché i task siano ricevuti dalla coda, dobbiamo avere attivi i servizi Celery worker e RabbitMQ. Il server RabbitMQ funge da nostro broker di messaggi mentre il  worker Celery esegue le attività.

Indichiamo ogni passaggio con i numeri di terminale:

  1. Rabbit MQ
  2. Worker di Celery
  3. Esecuzione del task


Terminale #1

Iniziamo avviando il server RabbitMQ nel terminale n. 1.

				
					# RabbitMQ

$ sudo rabbitmq-server
				
			
articoli - web scraping Celery 2

Terminale #2

Successivamente, possiamo avviare il processo del worker Celery nel terminale n. 2. Descriviamo anche le impostazioni dettagliate per il  worker in modo da illustrare come apparirà l’output.

Nota: deve essere eseguito dalla directory del progetto.

				
					# Celery worker

$ celery -A tasks worker -l INFO
				
			

Analizziamo il precedente comando:

  • celery – La libreriache stiamo chiamando
  • -A tasks – Si esplicita che vogliamo l’app  tasks
  • worker – Avvio del processo worker
  • -l INFO – Garantisce di avere un dettagliato log degli eventi nella console

Per verificare se il worker è stato avviato correttamente, è necessario avere la riga concurrency: 4 (prefork) nel terminale.

Inoltre, notiamo che l’app [tasks] è stata importata, insieme alla registrazione dei task presenti in tasks.py. Il worker ha registrato una singola attività (1), tasks.add.

articoli - web scraping Celery 3

Terminale #3:

Successivamente, possiamo iniziare l’esecuzione del test nel terminale n. 3. Eseguiamo un ciclo per eseguire più task che vengono catturati dal  servizio worker. Lo realizziamo importando add da tasks.py, e quindi eseguendo un ciclo for.

Nota: dopo la riga add.delay(i, i), si deve usare Control+Invio per eseguire il comando.

				
					$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
...    add.delay(i, i) # delay calls the task
				
			

Ora possiamo a vedere un grande blocco di output nel terminale n. 3 (l’esecuzione dell’attività Celery).

Questo mostra che il worker sta ricevendo il risultato dell’attività dal terminale n. 2.

articoli - web scraping Celery 4

Terminale 2:

Se controlliamo il worker Celery nel terminale n. 2, il processo che esegue l’addizione aritmetica, vediamo che sta rilevando ciascuna delle esecuzioni dell task.

articoli - web scraping Celery 5

Abbiamo dimostrato con successo che Celery e RabbitMQ sono installati correttamente. Questo getta le basi per gli altri task che  vogliamo implementare (es. web scraping) dimostrando come interagiscono Celery, Celery worker e RabbitMQ.
Ora che abbiamo coperto l’installazione e le nozioni di base, ci addentreremo nella tasks.pycreazione delle nostre attività di scraping web.

Creazione del task.py con Celery per il web scraping

L’esempio descritto nel paragrafo precedente ha aiutato a descrivere il processo da usare per eseguire i task utilizzando Celery, dimostrando anche come i task sono registrati tramite i worker di Celery.

Basandoci sul questo esempio, iniziamo creando i task di scraping. A tale scopo, per semplicità possiamo copiare il codice  dello script scraping.py, descritto nella 1° parte di questo tutorial, nel file tasks.py.

Quindi possiamo rimuovere la funzione def add(x, y) dell’esempio e copiare l’importazione delle librerie (Requests e BeautifulSoup), e le funzioni di scraping.

Nota: usiamo le stesse funzioni, solamente che sono state copiate in tasks.py.

				
					# tasks.py

from celery import Celery
import requests # pulling data
from bs4 import BeautifulSoup # xml parsing
import json # exporting to files

app = Celery('tasks')

# save function
def save_function(article_list):
    with open('articles.txt', 'w' as outfile:
        json.dump(article_list, outfile)
        
# scraping function
def hackernews_rss():
    article_list = []
    try:
        # execute my request, parse the data using the XML 
        # parser in BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # select only the "items" I want from the data      
        articles = soup.findAll('item')
        # for each "item" I want, parse it into a list
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            # create an "article" object with the data
            # from each "item"
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            # append my "article_list" with each "article" object
            article_list.append(article)
        # after the loop, dump my saved objects into a .txt file
        return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
				
			

Le funzioni di web scraping sono ora in tasks.py, insieme alle loro dipendenze. Il passaggio successivo è registrare i task con la app Celery, semplicemente specificando il decorator @app.task prima di ogni funzione.

				
					# tasks.py

... # same as above

@app.task
def save_function(article_list):
    ...
    
@app.task
def hackernews_rss():
    ...
				
			

Miglioramenti alle funzioni di scraping

Sebbene le funzioni di scraping già implementate abbiano dimostrato di funzionare per estrarre i dati dal feed RSS di HackerNews, abbiamo ancora qualche margine di miglioramento. Di seguito abbiamo descritto le modifiche da fare all’interno del set di strumenti di scraping prima di automatizzare l’esecuzione dello script.

  1. Salvare l’output in file .json con data e ora.
  2. Aggiungere una data  e ora created_at per ogni articolo.
  3. Aggiungere una  stringa source, nel caso in cui desideriamo fare lo scraping di altri siti

Le precedenti modifiche sono semplici perchè abbiamo già effettuato la maggior parte del lavoro di implementazione durante il primo articolo. Sebbene non sia significativo, lo scambio in .json sarà un po’ più facile da leggere rispetto a .txt. Inoltre i due campi aggiuntivi contengono informazione che rendono l’applicazione più “scalabile” quando si aggiungono altri feed per lo scraping e quando si analizzano i dati in un secondo momento.

Iniziamo con aggiornare la funzione save_function per produrre un file .json e aggiungere un timestamp. In questo modo si migliora la qualità del lavoro quando dobbiamo riutilizzare dati di scraping precedenti.

				
					# tasks.py

from datetime import datetime # for time stamps

... 

def save_function(articles_list):
    # timestamp and filename
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'articles-{}.json'.format(timestamp)
    # creating our articles file with timestamp
    with open(filename, 'w').format(timestamp) as outfile:
        json.dump(article_list, outfile)
        
				
			

Stiamo usando la funzione datetime.now().strftime(...) per creare un timestamp da aggiungere al file usando .format(timestamp).

Passando alle due modifiche all’interno della funzione hackernews_rss(), aggiungiamo alcune informazioni sulla fonte (HackerNews RSS) e un timestamp created_at. Si tratta di due semplici modifiche che ci aiutano a differenziare le informazioni nel caso si voglia aggiungere ulteriori funzioni di scraping.

				
					# tasks.py

...

def hackernews_rss():
    ...
        for a in articles:
            ...
            article = {
                ...
                'created_at': str(datetime.now()),
                'source': 'HackerNews RSS'
                }
        ...
				
			
Le modifiche precedenti illustrano l’aggiunta dei campi created_at e source. Un rapido suggerimento: se ometti amo la transizione str() lo script non verrà eseguito a causa di un errore Object of type datetime is not JSON serializable.

Pianificazione dei task con Celery:

Ora sfruttiamo i poteri di pianificazione dei task di Celery utilizzando beat_schedule. Questo ci consente di registrare i task con l’agente di pianificazione in orari specifici.
Un’ottima guida per la schedulazione di task è sicuramente all’interno della documentazione ufficiale . Ci sono anche alcuni esempi di pianificazione aggiuntivi  nel file tasks.py del repository GitHub di questo tutorial.

Prevediamo di eseguire il task di scraping ogni minuto, in modo da dimostrere come il worker Celery interagisce con  i task pianificati. Ciò non comporterà dati diversi, poiché questo feed RSS non viene aggiornato ogni minuto con  nuovi dati.
L’obiettivo di questo  esempio è mostrare l’ouput dei file degli articoli e una semplice pianificazione dei task.

Creazione dello scheduler

				
					# tasks.py

... 

from celery.schedules import crontab # scheduler

# scheduled task execution
app.conf.beat_schedule = {
    # executes every 1 minute
    'scraping-task-one-min': {
        'task': 'tasks.hackernews_rss',
        'schedule': crontab()
    }
}
...
				
			

La configurazione di cui sopra registra le pianificazioni dei task all’interno della stessa app Celery. All’avvio, siamo in grado di chiamare lo scheduler di Celery per ricontrollare ciò che è stato messo in coda.

Esecuzione dei task

Ora che il codice è stato completato, è ora di accendere il server RabbitMQ e avviare i  worker Celery.
Per questo esempio, utilizziamo 2 schede del terminale:

  1. Server RabbitMQ
  2. Worker di Celery

 

Terminale #1

Per avviare il server RabbitMQ (il broker di messaggi), usiamo lo stesso comando descritto in precedenza.

Nota: si può anche prevedere di ricontrollare che il nodo creato all’avvio sia spento, poiché non si avvia con i log e genererà un errore se non viene terminato in precedenza.

				
					$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server
				
			

Si ottiene un output simile a l precedente esempio (come il seguente screenshot).

Una volta avviato il server RabbitMQ, possiamo iniziare con il terminale n. 2.

Terminale #2

Modifichiamo leggermente il comando, poiché ora include una notazione per -B la quale chiama il worker che esegue il beat scheduler.

				
					$ celery -A tasks worker -B -l INFO
				
			
L’output della console mostra l’avvio dell’applicazione e (in attesa della schedulazione prevista) stampa le informazioni relative all’esecuzione dei task. Nel seguente screenshot:
  • Si registra il [tasks]
  • Avvio dello scheduler beat
  • Il worker MainProcess riceve un’attività Received task: tasks.hackernews_rss
  • Avvio di ForkPoolWorker ed esecuzione del task, quindi restituisce un risultato
articoli - web scraping Celery 7

Suggerimento: possiamo interrompere l’esecuzione del task pianificato con Control + C, dato che il task viene eseguito a tempo indeterminato.

Dopo che il task è stato eseguito correttamente, la funzione save_function() ha generato il file .json.

Conclusione

In questo articolo abbiamo ampliato con successo un semplice strumento di web scraping in modo di prevedere una pianificazione dell’esecuzione. Ciò garantisce che non abbiamo più bisogno di eseguire manualmente lo script di scraping e che possiamo “impostarlo e dimenticarlo”. Tramite la schedulazione dello script, l’applicazione sarà in grado di analizzare i siti alla ricerca di dati che cambiano in base a una pianificazione prestabilita (ad esempio, ogni 15 minuti) e restituire ogni volta nuovi dati.

Quali sono i prossimi passi?

Nella parte 3, descriviamo un’applicazione Django con integrazione di Celery e web scraping. Questo è un ottimo esempio di un’applicazione web che effettua lo scraping e popola un sito web con informazioni dello scraping. Il prodotto finale sarà un aggregatore di notizie che estrae da più feed RSS.

Ulteriori letture

Questo articolo ha descritto molte informazioni sull’esecuzione dei task tramite numerosi esempi. Sebbene siamo stati in grado di effettuare alcune esecuzioni programmate, non siamo entrati nei dettagli di ciascuna delle tecnologie che abbiamo utilizzato. Ecco alcuni ottimi articoli (alcuni su Medium, altri no) che approfondiscono maggiormente la magia di Celery.

Creazione di uno scraper di feed RSS con Python

scienzadeidati articoli - Costruire uno scraper di feed RSS con Python

Questo articolo è la prima parte 1 di un tutorial relativo alla creazione di un’applicazione di web scraping con Python. In questo articolo ci concentriamo ad approfondire le librerie Requests e BeautifulSoup.

Nella 2° parte del tutorial vedremo come schedulare lo scraping tramite Celery.

Nella 3° parte descriviamo come realizzazione di un’applicazione web scraping con Django.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

Ho già utilizzato lo scraping web in diversi modi all’interno dei miei progetti, che si tratti di raccolta di dati per l’analisi, creazione di notifiche quando i siti vengono modificati o creazione di applicazioni web.

Questo articolo illustrerà un semplice  di feed RSS per HackerNews. Il feed RSS, disponibile a questo link, prevede aggiornamenti a intervalli regolari per i  nuovi post e attività sul sito.

Sebbene ciò possa essere ottenuto utilizzando un lettore RSS, questo è inteso come un  semplice esempio di utilizzo di Python, che può essere adattato facilmente ad altri siti Web.

In questo esempio usiamo:

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione:

  1. Creazione della directory  del progetto e del file scraping.py.
  2. Verificare che possiamo eseguire il ping del feed RSS che sarà oggetto dello scraping.
  3. Scraping del contenuto XML del sito.
  4. Analisi del contenuto utilizzando BS4.
  5. Output del contenuto in un file .txt

Inizializzazione

Il primo passo è la creazione della directory del progetto e accedere a quella directory tramite riga di comando. Una volta all’interno della directory di progetto, creiamo il file di progetto.

Nota: in questo caso stiamo usando Ubuntu, quindi questi comandi potrebbero differire a seconda del sistema operativo.

            $ mkdir web_scraping_example && cd web_scraping_example

$ touch scraping.py
        
Successivamente è necessario installare le librerie Python previste dai requisiti. Si può usare il comando pip, come nell’esempio seguente:
            $ pip install requests
$ pip install bs4
        

Importazione delle librerie

Ora che le basi del progetto sono state impostate, possiamo iniziare a scrivere il codice del web scraping.

All’interno del file scraping.py, si deve importare le librerie che abbiamo installato utilizzando pip.

            # scraping.py

import requests
from bs4 import BeautifulSoup
        

Quanto sopra ci consente di utilizzare le funzioni fornite dalle librerie Requests e BeautifulSoup.

Ora possiamo iniziare a testare la capacità di eseguire il ping del feed RSS di HackerNews e scrivere lo script di scraping.

Nota: di seguito non includeremo le righe di importazione. Tutto il codice che rimane uguale verrà annotato come ‘…’ sopra o sotto le nuove righe.

Testare la richiesta

Quando eseguiamo lo scraping web, iniziamo inviando una richiesta a un sito web. Per assicurarci di essere in grado di eseguire lo scraping, dobbiamo verificare che  possiamo stabilre una connessione al sito.

Iniziamo creando la nostra funzione di scraping di base. Questo sarà ciò a cui eseguiremo

            # scraping.py

# library imports
...

# scraping function
def hackernews_rss('https://news.ycombinator.com/rss'):
    try:
        r = requests.get()
        return print('The scraping job succeeded: ', r.status_code)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
        
        
print('Starting scraping')
hackernews_rss()
print('Finished scraping')
        

Nel codice di cui sopra,  si richiama la funzione requests.get(...) della libreria Requests per recuperare il sito Web da analizzare. Quindi si stampa a video lo stato della richiesta, contenuto nel parametro r.status_code, per verificare che il sito Web sia stato chiamato correttamente.
Inoltre, abbiamo inserito tutto dentro un try: except: in modo da rilevare eventuali errori che si potremmo verificare in seguito.

Una volta eseguito lo script, si ottiene un codice di stato pari a 200. Questo conferma che siamo in grado di eseguire il ping del sito e “ottenere” informazioni.

            $ python scraping.py

Starting scraping
The scraping job succeeded: 200
Finsihed scraping
        

Scraping del contenuto

Il nostro script ha restituito un codice di stato pari a 200, siamo pronti per iniziare ad estrarre il contenuto XML dal sito. A tal fine, dobbiamo utilizzare BS4 insieme a Requests.
            # scraping.py

...

def hackernews_rss():
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        
        return print(soup)
...
        

Questo codice assegne il contenuto XML acquisito da HackerNews alla variabile soup. Stiamo usando r.content per passare l’XML restituito a BeautifulSoup, che analizzeremo nel prossimo paragrafo.

Una cosa fondamentale da notare è che stiamo sfruttando il parametro features='xml', questo parametro varia a seconda dei dati da acquisire (ad esempio, in un scraper HTML si dichiarerà come ‘HTML’).

L’output di quanto sopra sarà un gran disordine di contenuti che non ha molto senso. Questo è solo per visualizzare quali informazioni stiamo estraendo con successo dal sito web.

Analisi dei dati

Abbiamo descritto come possiamo estrarre l’XML dal feed RSS di HackerNews. E’ quindi il momento di analizzare le informazioni.

Abbiamo scelto di usare il feed RSS perché è molto più semplice da analizzare rispetto alle informazioni html del sito Web, poiché non dobbiamo preoccuparci degli elementi HTML nidificati e di individuare le informazioni esatte.

Iniziamo osservando la struttura del feed:

            <item>
    <title>...</title>
    <link>...</link>
    <pubDate>...</pubDate>
    <comments>...</comments>
    <description>...</description>
</item>
        

Ciascuno dei tag disponibili sul feed RSS segue la struttura di cui sopra, contenente tutte le informazioni all’interno dei tag <item>...</item>.

Sfruttiamo la coerenza dei tag item per analizzare le informazioni.

            # scraping.py 

...

def hackernews_rss():
    article_list = []
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        articles = soup.findAll('item')        
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            article_list.append(article)
        return print(article_list)
...
        

In questo codice, effettuiamo un controllo di tutti gli item in articles = soup.findAll('item'). Questo consente di estrarre ciascuno dei tag <item>...</item> dall’XML che abbiamo acquisito.

Ciascuno degli articoli viene analizzato utilizzando il ciclo: for a in articles:, questo consente di analizzare le informazioni in variabili separate e aggiungerle a un dizionario vuoto che abbiamo creato prima del ciclo.
BS4 ha scomposto l’XML in una stringa, consentendoci di chiamare la funzione .find() per cercare i tag in ciascuno degli oggetti. Usando .text siamo in grado di estrarre gli elementi <tag>...</tag> e salvare solamente il testo.
I risultati sono inseriti in un elenco tramite il comando article_list.append(article) in modo da potervi accedere in seguito.

Infine inseriamo la stampa della lista degli articoli in modo da visualizzare un output durante l’esecuzione dello script di scraping.

Output in un file

Il feed RSS è stato ora inviato correttamente in una funzione print() per visualizzare l’elenco prodotto a termine del parsing. Ora possiamo inserire i dati in un file .txt, in modo che possano riessere utilizzati in future analisi e altre attività relative ai dati. Utilizziamo la libreria JSON per la visualizzazione dei dati più semplice per noi; tuttavia, descriviamo anche un esempio senza la libreria JSON.

Iniziamo creando un’altra funzione def save_function(): che elabora l’elenco prodotto dalla funzione hackernews_rss(). In questo modo sarà più facile apportare modifiche in futuro.

            # scraping.py
import json

...

def save_function(article_list):
    with open('articles.txt', 'w') as outfile:
        json.dump(article_list, outfile)

...
        

Quanto sopra utilizza la libreria JSON per scrivere l’output dello scraping nel file articles.txt. Questo file verrà sovrascritto durante l’esecuzione dello script.

Un altro metodo per scrivere nel file .txt prevede un ciclo for:

            # scraping.py

...

def save_function(article_list):
    with open('articles.txt', 'w') as f:
        for a in article_list:
            f.write(a+'\n')
        f.close()
        
...
        
Ora che abbiamo creato la funzione save_function(), possiamo adattare la funzione di scraping per salvare i dati.
            # scraping.py

...

def hackernews_rss():
    ...
    try:
        ...
        return save_function(article_list)

...
        

Modificando il comando return print(article_list) in return save_function(article_list) siamo in grado di inserire i dati in un file .txt.

L’esecuzione dello script produrrà ora un file .txt dei dati acquisiti dal feed RSS di HackerNews.

Conclusione

Abbiamo creato con successo uno script Python per il di scraping dei feed RSS utilizzando Requests e BeautifulSoup. Questo ci consente di analizzare le informazioni XML in un formato leggibile con cui lavorare in futuro.

Questo script è la base per costruire alcune funzionalità interessanti:

  • Scraping di informazioni più complesse utilizzando elementi HTML.
  • Utilizzo di Selenium per lo scraping di siti dove il rendering lato client rende inutile l’uso di Requests.
  • Creazione di un’applicazione Web che acquisirà i dati dello scraping e li visualizzerà (ad esempio, un aggregatore)
  • Estrazione di dati da siti Web a seconda di una pianificazione e schedulazione.


Sebbene questo articolo abbia  descritto le basi del web scraping, possiamo iniziare ad approfondire alcuni dettagli.

Possiamo inserire questo script all’interno di una sequenza di altri script, schedulati tramite  Celery, oppure possiamo aggregare le informazioni su un’applicazione web utilizzando Django.

Questo articolo fa parte di una serie in 3 parti in cui descriviamo semplici esempi di web scraping e aggregazione su base programmata.

Come manipolare date e ore in Python

scienzadeidati articoli - Python Datetime

Quando si usa Python, arriva sempre il momento di capire come “manipolare” i dati temporali e in particolare la data e l’ora.

In questo articolo descriviamo le principali funzioni relative a l modulo DateTime, già compreso nell’installazione base di Python. Se cerchi qualcosa di semplice ma pratico con esempi, continua a leggere!

In particolare vediamo tutti e 7 i seguenti argomenti più popolari:

  • Data e ora di oggi in diversi formati
  • Conversione da stringa a data
  • La differenza nel calcolo di DateTime
  • Datetime più/meno un’ora specifica
  • Confronto data/ora
  • Impostazioni fusi orari
  • Unix timestamp / calcolo del tempo dell’epoca

 

Data e ora di oggi in diversi formati

Vediamo la manipolazione più semplice. Di seguito è riportato il codice che stampa l’anno, il mese, la data, l’ora, i minuti, i secondi e i millisecondi correnti.

Queste sono informazioni utili, ma spesso ne abbiamo bisogno solo in parte. Possiamo stampare le diverse parti di seguito.
            In: from datetime import datetime    
    d = datetime.now() #today's datetime
    d

Out:datetime.datetime(2019, 12, 22, 13, 14, 18, 193898)
        
            In:  print(d.weekday()) #day of week - Monday is 0 and Sunday is 6
     print(d.year)
     print(d.month)
     print(d.day)
     print(d.hour)
     print(d.minute)
     print(d.second)

Out: 6
     2019
     12
     22
     13
     14
     18
        

Inoltre, potrebbero essere necessari formati specifici della data/ora. Possiamo utilizzare il seguente elenco per personalizzare diversi formati di data. Questo può essere considerato un modo per convertire la data in stringa.

L’elenco completo dei formati può essere consultato in questa pagina.

articolo python datetime - format date
            In:  print(d.strftime("%A %d/%m/%Y")) # date to string

Out: Sunday 22/12/2019
        

 

Conversione da stringa a data

Di seguito sono riportati esempi che mostrano due stringhe convertite in formato data:

            In:  date_string = '2016-02-01 12:00PM'
     print(datetime.strptime(date_string, '%Y-%m-%d %I:%M%p'))

Out: 2016-02-01 12:00:00

        
            In:  date_string = '02/01/2016'
     d2 = datetime.strptime(date_string, '%m/%d/%Y')
     print(d2)

Out: 2016-02-01 00:00:00
        

 

La differenza nel calcolo di DateTime

L’esempio seguente stampa la differenza in giorni (ad esempio, tra oggi e 1 febbraio 2016):

            In:  from datetime import timedelta     
     
     d = datetime.now()
     date_string = '2/01/2016'
     d2 = datetime.strptime(date_string, '%m/%d/%Y')
     
     print(d - d2)

Out: 1420 days, 13:18:27.386763
        
Possiamo anche stampare solo la differenza tra due date-ora in giorni, settimane o anni, ecc.
            In:  date_diff = (d - d2)/timedelta(days=1)
     print('date_diff = {} days'.format(date_diff))

Out: date_diff = 1420.5544836430902 days

        
            In:  date_diff = (d - d2)/timedelta(weeks=1)
     print('date_diff = {} weeks'.format(date_diff))

Out: date_diff = 202.93635480615575 weeks

        
            In:  date_diff = (d - d2)/timedelta(days=365)
     print('date_diff = {} years'.format(date_diff))

Out: date_diff = 3.8919300921728497 years
        

 

Datetime più/meno un’ora specifica

Facciamo un “viaggio nel tempo” con diversi intervalli di tempo di secondi, minuti, ore, giorni, settimane o anni:

            In:  print(d + timedelta(seconds=1)) # today + one second     
     print(d + timedelta(minutes=1)) # today + one minute     
     print(d + timedelta(hours=1)) # today + one hour  
     print(d + timedelta(days=1)) # today + one day
     print(d + timedelta(weeks=1)) # today + one week
     print(d + timedelta(days=1)*365) # today + one year

Out: 2019-12-22 13:18:28.386763
     2019-12-22 13:19:27.386763
     2019-12-22 14:18:27.386763
     2019-12-23 13:18:27.386763
     2019-12-29 13:18:27.386763
     2020-12-21 13:18:27.386763
        

 

Confronto data/ora

I confronti tra le date sono semplici e sono effettuati con i soliti simboli di confronto:

            In:  print(d < (d2 +(timedelta(days=365*6)))) # d is no more than 6 years (assume each year has 365 days) after d2?
     print(d > (d2 +(timedelta(weeks=52*6)))) # d is more than 6 years (assume each year has 52 weeks) after d2?
     print(d != d2) # d2 is not the same date as d?
     print(d == d2) # d2 is the same date as d?

Out: True
     False
     True
     False
        

 

Impostazioni fusi orari

Possiamo anche confrontare l’ora in diversi fusi orari, come Toronto e Shanghai:

            In:  import pytz
     timezone = pytz.timezone("America/Toronto")
     dtz = timezone.localize(d)
     print(dtz.tzinfo)
     print(dtz)

Out: America/Toronto
     2019-12-22 13:18:27.386763-05:00

        
            In:  shanghai_dt = dtz.astimezone(pytz.timezone("Asia/Shanghai"))
     print(shanghai_dt)

Out: 2019-12-23 02:18:27.386763+08:00

In:  (dtz - shanghai_dt)/timedelta(days=1) # same datetimes

Out: 0.0
        
            In:  (dtz - shanghai_dt)/timedelta(days=1) # same datetimes

Out: 0.0
        
Per visualizzare l’intero elenco dei diversi fusi orari si può utilizzare il codice sottostante:
            In:  for tz in pytz.all_timezones:
        print(tz)

Out: Africa/Abidjan
     Africa/Accra
     Africa/Addis_Ababa
(Stampate solo le prime 3 come esempio)
        

Unix timestamp / calcolo del tempo dell’epoca

I timestamp sono comunemente usati per i file nei sistemi operativi Unix. Spesso vengono visualizzati anche nei set di dati.

Innanzitutto, possiamo ottenere il timestamp Unix corrente:

            In:  from datetime import timezone

     dt_now = datetime.now(timezone.utc)
     print(dt_now)
     print(dt_now.tzinfo)
     print(dt_now.timestamp()) # the unix timestamp.

Out: 2019-12-22 18:21:28.681380+00:00
     UTC
     1577038888.68138
        

Inoltre, possiamo convertire il timestamp Unix nel formato DateTime:

            In:  utc_timestamp = 1377050861.206272
     unix_ts_dt = datetime.fromtimestamp(utc_timestamp, timezone.utc)

     print(unix_ts_dt)
     print(unix_ts_dt.astimezone(pytz.timezone("America/Toronto")))
     print(unix_ts_dt.astimezone(pytz.timezone("Asia/Shanghai")))

Out: 2013-08-21 02:07:41.206272+00:00
     2013-08-20 22:07:41.206272-04:00
     2013-08-21 10:07:41.206272+08:00