Analisi delle Crypto Valute: Raccogliere i dati con Kafka, ClickHouse e Cryptofeed

scienzadeidati articoli - Raccogliere i dati con Kafka ClickHouse Cryptofeed

In questo post vediamo il primo passo per sviluppare un’infrastruttura di analisi per ricercare adeguatamente le dinamiche del mercato delle criptovalute, la raccolta e l’archiviazione dei dati.

La Microstruttura di mercato è un settore dell’Economia e della Finanza che studia le dinamiche dei mercati finanziari, concentrandosi sugli attori (market maker, speculatori e retail trader) e sulla struttura degli scambi (frammentazione, Dealer vs LOB e trasparenza del mercato).
I dati di cambio tendono a contenere enormi quantità di dati sul trading e sul orderbook (libro degli ordini). Per raccogliere questi dati in modo efficace, dovremo creare un sistema affidabile per gestire dati ad alta frequenza e quantità di dati su larga scala.
Una settimana di dati di trading per un asset può facilmente raggiungere centinaia di milioni di punti dati all’interno di un unico scambio. Sottolineo che non entriamo nei dettagli tecnici relativi a ClickHouse e Kafka. Forniamo solo una spiegazione introduttiva del motivo per cui questi strumenti sono stati selezionati. La priorità di questo articolo è mostrare come costruire dall’inizio alla fine un sistema di raccolta e archiviazione di dati di criptovalute.
Durante il tutorial sono forniti collegamenti per una comprensione più approfondita degli strumenti.

Dove archiviare enormi set di dati? Presentazione di ClickHouse

ClickHouse è un sistema di database basato su colonne creato per aggregare facilmente milioni di punti dati in pochi millisecondi. ClickHouse è un database OLAP (elaborazione analitica online), ciò significa che ClickHouse è specializzato in un’elevata produttività di insert, insert da centinaia di migliaia a milioni di righe alla volta, anziché singoli insert. Il vantaggio è che ClickHouse avrà una velocità impareggiabile per l’analisi, ma non potrà supportare singoli insert continui. È qui che entra in gioco Kafka.

Come produrre un’elevata produttività? Presentazione di Kafka

Apache Kafka è un sistema distribuito di messaggistica di pubblicazione-sottoscrizione progettato per un elevato throughput. Quando si raccolgono dati di trading ad alta frequenza, è fondamentale disporre di uno strumento che possa essere veloce, tollerante ai guasti e scalabile. Kafka soddisfa tutte queste esigenze ed è realizzato da una combinazione ben equilibrata di produttori, consumatori e intermediari.
Gli intermediari, i broker fungono da tramite tra i produttori (invio di dati) e i consumatori (ricezione di dati). I broker lo fanno archiviando i messaggi in argomenti, diversi tipi di messaggi, come transazioni, dati Iot, tracciamenti, ecc. L’argomento è ciò che determina il percorso dal produttore al consumatore.

Da dove ottenere i dati? Presentazione di CryptoFeed

Cryptofeed, creata da Bryant Moscon, è una libreria Python che utilizza asyncio e WebSocket per fornire feed di trade di criptovaluta da un’ampia varietà di exchange. Con Cryptofeed, è possibile aggiungere più flussi di dati da exchange di cryptovalute in un feed unificato. Cryptofeed fornisce molti esempi di come  può essere integrato con più backend, incluso Kafka.

Configurazione di Kafka e ClickHouse con Docker

Ora è il momento di realizzare il nostro sistema di raccolta e archiviazione dei dati, utilizzando ClickHouse per l’archiviazione di dati di massa e Kafka per un’efficace acquisizione dei dati in ClickHouse. Per una facile riproducibilità e distribuzione, tutto questo sarà inserito all’interno di containers di Docker,

Step 1: installazione di  Docker e Docker Compose

Utilizziamo Docker per la distribuzione di ClickHouse e Kafka, quindi ti consiglio di avere l’ambiente Docker già installato prima di continuare.

Step 2: Creazione del Docker Compose per Clickhouse-Kafka

Il primo passo è creare una directory in cui memorizzare i nostri file docker, il file dei requisiti e il nostro main.py. In questo tutorial è denominata  CryptoDB.
Dopo essere entrati nella directory, creiamo un file docker-compose.yml e modifichiamo il file di composizione come di seguito. Descriviamo ogni passaggio in dettaglio.

            version: '3.9'

services: 

  clickhouse-server:
    image: yandex/clickhouse-server
    ports: 
     - "8123:8123"
     - "8124:9000"
    volumes:
      - clickhouse-volume:/var/lib/clickhouse
  
  cryptofeed:
    build:
      context: .
      dockerfile: Dockerfile
    volumes:
      - ./:/app
    depends_on:
    - zookeeper
    - kafka

  zookeeper:
    image: wurstmeister/zookeeper:3.4.6
    expose:
    - "2181"

  kafka:
    image: wurstmeister/kafka:2.11-2.0.0
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     
volumes:
  clickhouse-volume:
        

ClickHouse-Server: il servizio che crea il nostro database ClickHouse ed espone le porte 8123 e 9000 per HTTP/TCP. Tutti i dati verranno archiviati su un volume denominato, clickhouse-volume.

Zookeeper: necessario per la gestione dei nodi Kafka

Kafka: Il servizio crea il nostro nodo Kafka. Le variabili ambientali sono essenziali per il corretto funzionamento di Kafka e cambieranno in base alla posizione in cui si sta ospitando Kafka. Questo tutorial fornisce un’ottima spiegazione sul significato di queste variabili ambientali. Nel complesso, la maggior parte delle variabili ambientali determina il modo in cui Kafka comunica con altri servizi. La porta principale utilizzata da Kafka è la porta 9092.

CryptoFeed: crea un ambiente Python ed esegue main.py per creare i nostri produttori/argomenti Kafka e avviare connessioni WebSocket.

Dopo aver preparato il nostro Docker-compose, possiamo effettuare l’ultimo step prima di avviare i contenitori: scrivere lo script di cryptofeed e configurare il dockerfile di CryptoFeed.

Preparazione dello script CryptoFeed

 

Step 1: scrittura dello script CryptoFeed

Prima di utilizzare i motori  di ClickHouse e Kafka, dobbiamo creare uno script utilizzando CryptoFeed in modo da definire come strutturare le nostre tabelle in ClickHouse. Raccogliamo i dati di trading di BTCUSD e i dati del book degli ordini limite di livello 2 da due exchanges di criptovalute. BitFinex cattura circa il 3,5% del volume totale in 24 ore in tutto il mondo e Binance ne cattura circa il 25,5%. Selezioniamo questi due exchange per catturare dinamiche interessanti grazie alla grande frammentazione dei mercati delle criptovalute. Enormi exchange come Binance possono avere un comportamento microstrutturale di mercato unico rispetto a exchange più piccoli, come BitFinex.

            
from cryptofeed import FeedHandler
from cryptofeed.backends.kafka import BookKafka, TradeKafka
from cryptofeed.defines import L2_BOOK, TRADES, L3_BOOK
from cryptofeed.exchanges import Coinbase, Binance, Bitfinex


def main():
    f = FeedHandler()
    HOSTNAME = 'kafka'
    cbs = {TRADES: TradeKafka(bootstrap=HOSTNAME), L2_BOOK: BookKafka(bootstrap=HOSTNAME), L3_BOOK: BookKafka(bootstrap=HOSTNAME)}

    # Add trade and lv 2 bitcoin data to Feed
    f.add_feed(Bitfinex(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-USD'], callbacks=cbs))
    f.add_feed(Binance(max_depth=25, channels=[TRADES, L2_BOOK], symbols=['BTC-BUSD'], callbacks=cbs))
    
    # Example of how to extract level 3 order book data
    # f.add_feed(Coinbase(max_depth=25, channels=[TRADES, L2_BOOK, L3_BOOK], symbols=['BTC-USD'], callbacks=cbs))

    f.run()


if __name__ == '__main__':
    main()
        

Nota: il nome dell’host deve essere lo stesso del servizio Kafka specificato nel file docker-compose, altrimenti l’impostazione predefinita prevede l’utilizzo di ‘localhost’. Localhost non funziona nel nostro approccio multi-container.

Aggiungiamo un feed al nostro gestore di feed, che si collegherà alle API del nostro exchange tramite websocket e quindi invierà i dati  di trading tramite i produttori Kafka ai nostri broker Kafka che memorizzano i nostri argomenti. CryptoFeed crea il proprio produttore e argomento Kafka quando utilizza TradeKafka e BookKafka.

Ci sarà un argomento unico per ogni canale, exchange e simbolo, quindi in totale creiamo 4 argomenti corrispondenti a 2 exchange (Binance e BitFinex), 2 canali (TRADES e L2_BOOK) e un simbolo (BTC-USD).

La struttura degli argomenti sono:

<channel>-<EXCHANGE>-<SYMBOL> (es. trade-COINBASE-BTC-USD)

Abbiamo anche inserito un esempio di codice per aggiungere i dati del orderbook di livello tre da CoinBase, ma commentato. Sentiti libero di esplorare CryptoFeed , ma tieni presente che in questo articolo non descriviamo come raccogliere i dati dell’orderbook di livello 3.

Dato che utilizziamo i dati del book L2 e dei dati di trade, la struttura delle tabelle finali è simile a questa.

Trades: {feed, symbol, timestamp, receipt_timestamp, side, amount, price, order_type, id}

Book: {timestamp, receipt_timestamp, delta, bid[0–25], ask[0–25], _topic}

Sia il lato bid che quello ask dell’order book avranno una profondità massima di 25 livelli di prezzo.

Un’ulteriore colonna _topic, l’argomento Kafka, viene aggiunta alla tabella del orderbook perché i dati originali non contengono l’exchange e il simbolo. _topic verrà utilizzato come metodo di filtraggio per analisi successive.

 

Step 2: Creazione del dockerfile di CryptoFeed

Affinché lo script CryptoFeed possa essere eseguito sul nostro container, abbiamo bisogno di un ambiente Python con tutti i pacchetti necessari, possiamo creare l’ambiente che desideriamo. Di seguito descriviamo come crearlo.

            FROM python:3.7-slim

WORKDIR /app
COPY . .

# upgrade pip, install requirements and update permissions for main.py
RUN pip install --upgrade pip \
    && pip install -r ./requirements.txt \
    && chmod +x /app/main.py

# Create CryptoFeed Kafka Producers
CMD ["python", "/app/main.py"]
        

Il dockerfile utilizza solo un’immagine python slim standard, installa i requisiti ed esegue lo script main.py. Molto semplice.

I requisiti sono:

            cryptofeed==1.9.3
aiokafka==0.7.2
        

Configurazione di Clickhouse e Kafka

Il passaggio successivo consiste nel creare i sottoscrittori/consumatori per acquisire e quindi archiviare i dati nelle tabelle ClickHouse. Abbiamo usato questa utile guida per la configurazione di Kafka-ClickHouse e descriviamo nel dettaglio di seguito.

Affinché una tabella ClickHouse raccolga dati da un argomento Kafka è necessario completare tre passaggi:

  • Creare le tabelle  finali MergeTree in ClickHouse per archiviare i dati per l’analisi a lungo termine
  • Creare tabelle del motore Kafka per strutturare i dati degli argomenti nel formato ClickHouse
  • Generare una vista materializzata per spostare i dati dalle tabelle del motore Kafka alle tabelle MergeTree

Step 1: creazione di tabelle MergeTree ClickHouse

Accediamo al server ClickHouse tramite CLI o tramite uno strumento di gestione database (es: Dbeaver e HouseOps).
Innanzitutto creiamo un nuovo database per archiviare le tabelle degli exchange di criptovalute

            CREATE DATABASE CryptoCrypt 
        

Quindi creiamo la tabella MergeTree dell’Orderbook L2 e la tabella MergeTree dei Trades. È molto importante usare il tipo corretto per ciascuna colonna. Guarda qui se hai bisogno di esplorare quali tipi usare.

            CREATE TABLE CryptoCrypt.Trades (
        feed String,
        symbol String,
        trade_date DateTime64(3),
        receipt_timestamp DateTime64(3),
        side String,
        amount Float64,
        price Float64,
        order_type Nullable(String), 
        id UInt64
        ) Engine = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY (feed, trade_date);


CREATE TABLE CryptoCrypt.L2_orderbook (
        trade_date DateTime64(3),
        receipt_timestamp DateTime64(3),
        delta Int8,
        bid Map(String, String),
        ask Map(String, String)
        ) Engine = MergeTree
PARTITION BY toYYYYMM(trade_date)
ORDER BY (trade_date);

ALTER TABLE CryptoCrypt.L2_orderbook
  ADD COLUMN _topic String
        

Una cosa importante da notare per la tabella L2_orderbook è la complessità per memorizzare i bid e gli ask. Il tipo di mappa deve essere in grado di memorizzare i dati {price:volume} a una profondità di 25 livelli. La mappa non supporta i tipi float, quindi per preservare quei dati la chiave del prezzo e il valore del volume devono essere archiviati come stringhe. I tipi possono essere facilmente riportati alla normalità nella fase di elaborazione dei dati. Se prevediamo di utilizzare i dati L3, viene aggiunto un ulteriore livello di complessità perché dovremmo annidare le colonne bid e ask per memorizzare le sotto-colonne di ID ordine, prezzo e coppie di volume.

Infine dobbiamo evidenziare che per aggiungere la colonna virtuale, _topic, è necessario aggiungere la colonna DOPO la creazione della tabella e prevedere tale colonna solo nella tabella MergeTree e non nella tabella Kafka.

Step 2: Creazione di tabelle nel motore Kafka

Creiamo le tabelle del motore Kafka con la stessa struttura delle tabelle MergeTree. Ogni tabella del motore Kafka sottoscrive due argomenti in base ai dati che sta raccogliendo (trades o orderbook) e un broker che memorizza l’argomento. A fini di test locali, il broker Kafka si trova in Kafka:9092. Poiché ClickHouse e Kafka condividono la stessa rete, possiamo utilizzare “Kafka” come host.

            CREATE TABLE CryptoCrypt.trades_queue (
        feed String,
        symbol String,
        trade_date DateTime64(3),
        receipt_timestamp Float64,
        side String,
        amount Float64,
        price Float64,
        order_type Nullable(String), 
        id UInt64
        ) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
       kafka_topic_list = 'trades-BINANCE-BTC-BUSD, trades-BITFINEX-BTC-USD',
       kafka_group_name = 'trades_group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';

CREATE TABLE CryptoCrypt.books_queue (
        trade_date Float64,
        receipt_timestamp Float64,
        delta Int8,
        bid Map(String, LowCardinality(Float64)),
        ask Map(String, LowCardinality(Float64))
        ) ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka:9092',
       kafka_topic_list = 'book-BINANCE-BTC-BUSD, book-BITFINEX-BTC-USD',
       kafka_group_name = 'trades_group1',
       kafka_format = 'JSONEachRow',
       kafka_row_delimiter = '\n';
        

Per avere maggiori dettagli sul funzionamento delle impostazioni di un motore Kafka, si può consultare questa guida. Inoltre, da notare come è stata risolta la complessità bid/ask utilizzando un trucco per assimilare la struttura dei dati {price: volume}. Poiché l’argomento archivia {prezzo:volume} come tipi {stringa:float}, è necessario utilizzare LowCardinality come soluzione alternativa per archiviare temporaneamente i dati prima che vengano inseriti in batch nella tabella MergeTree. Generalmente LowCardinality non è in grado di gestire enormi set di dati, ma in questo caso possiamo utilizzarlo poiché viene utilizzato solo per archiviare temporaneamente i dati in una tabella Kafka.

Step 3: Trasferire i dati tra tabelle con viste materializzate

L’ultimo passaggio consiste nel creare la vista materializzata che fungerà da meccanismo di inserimento tra le tabelle MergeTree e le tabelle Kafka. Possiamo osservare che la funzione cast viene utilizzata per convertire le colonne bid e ask nei corretti tipi di dati e viene richiesta la colonna virtuale _topic.

Non è necessario che le colonne virtuali siano già nella tabella Kafka per inserirle nella tabella MergeTree.

            CREATE MATERIALIZED VIEW CryptoCrypt.trades_queue_mv TO CryptoCrypt.Trades AS
SELECT *
FROM CryptoCrypt.trades_queue;

CREATE MATERIALIZED VIEW CryptoCrypt.books_queue_mv TO CryptoCrypt.L2_orderbook AS
SELECT trade_date, receipt_timestamp, delta, cast(bid, 'Map(String, String)') as bid , cast(ask, 'Map(String, String)') as ask, _topic 
FROM CryptoCrypt.books_queue;
        

Tramite le viste materializzate, i dati che erano stati archiviati negli argomenti inizieranno a essere archiviati nelle tabelle MergeTree finali.

Risoluzione dei problemi e test

Ora che il sistema di raccolta e archiviazione dei dati è in esecuzione, è tempo di verificare che tutto funzioni. Partiamo dai metodi di risoluzione dei problemi più semplici per rilevare i problemi e aumentare la complessità se il problema persiste.

Semplici controlli con ClickHouse

Possiamo verificare che tutte le tabelle siano state create e verificare se sono presenti dati nelle nostre tabelle MergeTree finali. Possiamo usare ClickHouse CLI o una console SQL su Dbeaver.

            # Show all tables, should have 2 kafka, 2 MergeTree and 2 views
SHOW TABLES FROM CryptoCrypt

# Check count of MergeTree tables
SELECT COUNT() FROM CryptoCrypt.L2_orderbook
SELECT COUNT() FROM CryptoCrypt.Trades

# Check if data is correct format 
SELECT * FROM CryptoCrypt.L2_orderbook LIMIT 5
SELECT * FROM CryptoCrypt.Trades LIMIT 5

# If no data in MergeTree tables check Kafka tables
SELECT * FROM CryptoCrypt.books_queue LIMIT 5
SELECT * FROM CryptoCrypt.trades_queue LIMIT 5
        

Se non ci sono dati nelle tabelle Kafka, è probabile che sia dovuto a un problema del produttore Kafka oppure una delle colonne è stata digitata in modo errato. Se viene visualizzato un ‘cannot parse error‘ è probabilmente che sia presente una errata digitazione, in caso contrario è probabilmente dovuto all’utilizzo di un erratto elenco di broker Kafka durante la creazione delle tabelle Kafka.

E’ necessario assicurarsi sempre che l’elenco di broker Kafka = ascoltatori pubblicati da Kafka.


Controllo dei broker e degli argomenti di Kafka

Se i problemi di cui sopra non si verificavano per la mancanza di dati nelle tabelle, è tempo di verificare se sono presenti dati archiviati nei topic. Per verificare se i topic sono stati creati e se stanno memorizzando dei dati possiamo accedere alla bash del contenitore docker di Kafka ed eseguire alcuni comandi

            # From your terminal run: MAKE SURE TO SET KAFKA CONTAINER NAME
> docker exec -i -t -u root $(docker ps | grep NAME_OF_KAFKA_CONTAINER | cut -d' ' -f1) /bin/bash

# List all topics
bash> $KAFKA_HOME/bin/kafka-topics.sh --list --bootstrap-server kafka:9092

# Check if a topic is collecting messages 
bash> $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic book-BITFINEX-BTC-USD --from-beginning

        

Se tutto funziona correttamente, nel terminale vediamo tutti e quattro i topic, insieme a migliaia di messaggi al secondo. Mentre se vediamo i nostri dati, è necessario tornare alla risoluzione dei problemi di ClickHouse e controllare i log, altrimenti è il momento di controllare i produttori Kafka e le connessioni WebSocket.

 

Controllo dei log di CryptoFeed

Se non abbiamo dati nei topic, è il momento di controllare i registri di CryptoFeed. Il gestore del feed produce i log all’avvio di main.py, che mostra se le connessioni WebSocket sono riuscite e se si sono verificati errori durante la produzione dei produttori Kafka. Questi registri sono archiviati nella stessa directory del file del docker-compose.

Nota: LEADER_NOT_AVAILABLE è un errore comune che si verifica a causa di un host broker improprio. Se vogliamo separare ClickHouse e Kafka in diverse macchine host, questo sarà un probabile errore. Questo link descrive come si verifica l’errore e come risolverlo.

Conclusione

Questo tutorial copre il primo e più essenziale passaggio nella creazione di una piattaforma di analisi delle criptovalute, il sistema di raccolta e archiviazione dei dati. Senza i dati è come costruire una casa con un martello e senza chiodi. Inoltre, con un sistema ClickHouse-Kafka (e molto spazio di archiviazione), possiamo facilmente archiviare i dati di trading delle criptovalute da dozzine di  diverse exchange. Il passaggio successivo consiste nel preelaborare i dati con Apache Spark per prepararci alla creazione di modelli di previsione (LSTM e XGBoost) e di inferenza causale, insieme ad alcune analisi esplorative, al fine di ottenere una visione approfondita della microstruttura del mercato degli exchange di criptovalute.

Creare un’API di dati azionari utilizzando il Web Scraping e FastAPI

scienzadeidati articoli - Creare API di dati finanziari utilizzando Web Scraping e FastAPI

Per la stragrande maggioranza delle API REST Python che ho creato, ho usato Flask o Django. Questi due framework hanno costituito le fondamenta delle API REST di Python da ormai molti anni. Di recente, tuttavia, sono emersi alcuni nuovi strumenti. Uno di questi framework ha suscitato un notevole clamore: FastAPI. In questo articolo esploreremo  alcune funzionalità di questo strumento.


Il progetto

La prima cosa da fare è elaborare un nuovo progetto. Volevo trovare un progetto che fosse interessante ma semplice, un progetto che mi permettesse di sperimentare il flusso di lavoro di FastAPI evitando dettagli confusi. Dopo averci pensato un po’, ho deciso di creare un’API REST per la finanza.

Questo progetto sarà composto da due parti:

  • Un web scraper per ottenere dati finanziari e,
  • Un’API REST che offre l’accesso ai dati.

Pertanto, ho diviso questo articolo in due. Sentiti libero di saltare intorno alle parti che trovi più interessanti.


Il setup

Prima di iniziare, dovremmo occuparci della parte più divertente dell’avvio di un nuovo progetto: la creazione del nostro ambiente. Sì, che divertimento!

Useremo alcune librerie, quindi consiglio di creare un nuovo ambiente virtuale.
Ecco un elenco di dipendenze che utilizzeremo:

  • FastAPI (ovviamente è nel titolo!)
  • Uvicorn
  • Requests
  • BeautifulSoup4

Possiamo eseguire un comando per installarli tutti in una volta:

            pip install fastapi uvicorn requests beautifulsoup4
        

Per fortuna, questo è sufficiente per la configurazione. Immergiamoci subito nel codice!

Parte I: Il Web Scraper

Quando costruiamo un web scraper, dobbiamo prima di tutto rispondere a tre domande: dove , cosa e come.

Dove?

Esistono diverse risorse online dove è possibile trovare dati finanziari. Uno di questi è Yahoo Finance. Questo sito Web ha una pletora di dati finanziari organizzati per azienda e quindi per categoria. Useremo Yahoo Finance come obiettivo del nostro scraping.

Cosa?

Per ogni società quotata su Yahoo Finance, esiste una sezione riepilogativa. Questa sezione è ciò che utilizzeremo per ottenere i nostri dati.

Come?

Rispondere ai quesiti “dove” e “cosa” è piuttosto semplice, ma rispondere alla domanda “come” richiede un po’ di lavoro. Affronteremo questa domanda nelle seguenti sezioni.

Trovare il selettore CSS di un elemento.

Se apriamo Yahoo Finance e scegliamo un titolo, apparirà la pagina di riepilogo predefinita. Vogliamo raschiare il prezzo corrente del titolo e tutto nella tabella riepilogativa (tutto da “Chiusura precedente” a “Stima target 1A”).

Per lo scraping di questi dati, dobbiamo trovare il selettore CSS di ciascun elemento. Possiamo farlo tramite:

  • Aprire gli strumenti di sviluppo di Chrome premendo F12.
  • Premere Ctrl + Maiusc + C per abilitare il selettore di elementi.
  • Evidenziare e fare clic su un elemento. Questo evidenzierà l’HTML dell’elemento nel pannello Elementi.
  • Fare clic con il pulsante destro del mouse sull’HTML nel pannello Elementi e selezionare Copia > Copia selettore.

Per ciascuno dei nostri elementi desiderati, ripetiamo i passaggi 3 e 4.

Il file ‘scrape.json’

Una volta che abbiamo determinato i selettori CSS per ogni elemento, dobbiamo posizionarli in un posto utile. Per questo, creeremo un file chiamato scrape.json.

Alla fine, vogliamo archiviare i nostri dati in un dizionario Python. Per fare ciò, dobbiamo mappare le chiavi sui valori. In altre parole, dobbiamo sapere quali dati sono associati a quale chiave. scrape.json implementa questa mappatura.

La struttura di scrape.json è la seguente:

            {
    "elements": [
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        }
    ]
}
        

Ogni oggetto JSON in elements ha due attributi: from e to.

  • from è il selettore CSS di un elemento di cui vogliamo i dati.
  • to è la chiave con cui verranno archiviati i dati di questo elemento.

Di seguito il scrape.json compilato:

            {
    "elements": [
        {
            "from": "[class='Trsdu\\(0\\.3s\\) Fw\\(b\\) Fz\\(36px\\) Mb\\(-4px\\) D\\(ib\\)']",
            "to": "price"
        },
        {
            "from": "[data-test='PREV_CLOSE-value'] [class]",
            "to": "prev_close"
        },
        {
            "from": "[data-test='OPEN-value'] [class]",
            "to": "open"
        },
        {
            "from": "[data-test='BID-value'] [class]",
            "to": "bid"
        },
        {
            "from": "[data-test='ASK-value'] [class]",
            "to": "ask"
        },
        {
            "from": "[data-test='DAYS_RANGE-value']",
            "to": "days_range"
        },
        {
            "from": "[data-test='FIFTY_TWO_WK_RANGE-value']",
            "to": "52_week_range"
        },
        {
            "from": "[data-test='TD_VOLUME-value'] [class]",
            "to": "volume"
        },
        {
            "from": "[data-test='AVERAGE_VOLUME_3MONTH-value'] [class]",
            "to": "avg_volume"
        },
        {
            "from": "[data-test='MARKET_CAP-value'] [class]",
            "to": "market_cap"
        },
        {
            "from": "[data-test='BETA_5Y-value'] [class]",
            "to": "beta"
        },
        {
            "from": "[data-test='PE_RATIO-value'] [class]",
            "to": "pe"
        },
        {
            "from": "[data-test='EPS_RATIO-value'] [class]",
            "to": "eps"
        },
        {
            "from": "[data-test='EARNINGS_DATE-value']",
            "to": "earnings_date"
        },
        {
            "from": "[data-test='DIVIDEND_AND_YIELD-value']",
            "to": "dividend_and_yield"
        },
        {
            "from": "[data-test='EX_DIVIDEND_DATE-value'] span",
            "to": "ex_dividend_yield"
        },
        {
            "from": "[data-test='ONE_YEAR_TARGET_PRICE-value'] [class]",
            "to": "target_est"
        }
    ]
}
        

Codificare lo scraping.

Dopo aver compilato il file scrape.json con le nostre mappature, è il momento di codificare lo scraper.
Ci sono quattro passaggi che il nostro scraper deve compiere:

  • Creare un URL che punti alla pagina web di Yahoo Finance del titolo
  • Scaricare la pagina web
  • Analizzare la pagina web
  • Estrarre e mappare i dati desiderati (Sì, so cosa stai pensando, tecnicamente sono due passaggi. Non possiamo essere tutti perfetti.)

Diamo un’occhiata al codice e poi lo esaminiamo sezione per sezione.

            import requests
from bs4 import BeautifulSoup

class Scrape():

    def __init__(self, symbol, elements):
        url = "https://finance.yahoo.com/quote/" + symbol

        r = requests.get(url)
        if(r.url != url): # redirect occurred; likely symbol doesn't exist or cannot be found.
            raise requests.TooManyRedirects()

        r.raise_for_status()
        
        self.soup = BeautifulSoup(r.text, "html.parser")

        self.__summary = {}

        for el in elements["elements"]:
            tag = self.soup.select_one(el["from"])

            if tag != None:
                self.__summary[el["to"]] = tag.get_text()

    def summary(self):
        return self.__summary
        

Nella riga 7 (fase 1) si costruisce un URL.

Nella riga 9 (fase 2) sta scaricando la pagina web del titolo azionario.

Nelle righe 10-13 si controlla se si sono verificati errori o reindirizzamenti durante la fase 2. Se si è verificato un reindirizzamento, significa che il simbolo di borsa non esiste o non è corretto; trattiamo questo scenario come un errore.

Nella riga 15 (fase 3) si analizza la pagina web.

Nelle righe 17-23 (fase 4) è dove avviene la magia. Qui, le mappature in scrape.json sono usate per costruire un dizionario, __summary. Ogni elemento è selezionato con self.soup.select_one(el["from"]). Ai dati dell’elemento (testo) viene quindi assegnata una chiave con self.__summary[el["to"]] = tag.get_text().

Da buoni programmatori che siamo, dovremmo testare il nostro codice prima di continuare. Creiamo main.py e aggiungiamo il seguente codice di test:

            from Scrape import Scrape
import json

elements_to_scrape = {}

f = open("scrape.json")
data = f.read()
f.close()
elements_to_scrape = json.loads(data)

s = Scrape("AAPL", elements_to_scrape)
print(s.summary())
        

Con un po’ di fortuna, dovremmo vedere un dizionario Python popolato con la stampa dei dati sullo schermo. Questo è tutto per il web scraper! Ora possiamo cambiare marcia e iniziare a lavorare sulla nostra API REST.

 

Parte II: L’API REST

L’idea di base della nostra API REST sarà la seguente:

  • La nostra API REST avrà un endpoint, che prenderà un simbolo di borsa come input.
  • Il nostro scaper utilizzerà il simbolo per raccogliere i dati di riepilogo finanziario del titolo.
  • L’endpoint restituirà i dati di riepilogo in formato JSON.

Ora che abbiamo l’idea di base, iniziamo a programmare.

 

Configurazione FastAPI

FastAPI è facile da eseguire. Aggiungiamo il seguente codice a main.py:

            from fastapi import FastAPI

app = FastAPI()

@app.get("/root")
def root():
    return "Hello World!"
        

Per avviare FastAPI, possiamo emettere il seguente comando:

uvicorn main:app --reload

Analizziamo questo comando:

  • main: si riferisce al nostro file main.py.
  • app: si riferisce all’app FastAPI creata con main.py la linea app = FastAPI().
  • --reload: indica al server di riavviarsi quando ha rilevato modifiche al codice.

A parte: uvicorn è un’interfaccia ASGI (Asynchronous Server Gateway Interface) veloce che FastAPI utilizza per l’esecuzione. Non approfondiremo l’uvicorn, ma se sei interessato a saperne di più, puoi controllare il loro sito web .

Ora che abbiamo avviato il server, possiamo andare su http://127.0.0.1:8000/docs. Qui troveremo la documentazione dell’API interattiva automatica. Possiamo utilizzare questo strumento per verificare la corretta funzionalità dei nostri endpoint. Poiché al momento abbiamo solo l’endpoint root, non c’è molto da testare. Iniziamo con le modifiche.

 

Definizione di un endpoint

Il passaggio successivo consiste nell’aggiungere l’endpoint di riepilogo. Come accennato in precedenza, prenderà un simbolo azionario e restituirà il foglio di riepilogo finanziario di quel titolo.

Aggiungiamo questo codice in main.py

            @app.get("/v1/{symbol}/summary/")
def summary(symbol):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        
    except TooManyRedirects:
        raise HTTPException(status_code=404, 
              detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, 
              detail="An error has occurred while processing the request.")

    return summary_data
        

Qui, definiamo un endpoint in /v1/{symbol}/summary/. Si usa {symbol} per denotare un parametro di percorso. Questo parametro viene passato al nostro metodo endpoint e quindi nella nostra istanza dell’oggetto Scrape, s. Assegniamo quindi a summary_datail il risultato di s.summary(). Se non si verificano eccezioni, si restituisce summary_data.

NOTA: il tipo di contenuto predefinito di FastAPI è application/JSON. Poiché s.summary() restituisce un oggetto dizionario, FastAPI lo converte automaticamente in JSON. Quindi, non è necessario eseguire questa conversione manualmente.

 

FastAPI ci consente di definire un metodo di start-up, che ovviamente verrà eseguito all’avvio del nostro server. Quindi si trasferisce il nostro  codice di caricamento scrape.json, da testing, all’interno di questo metodo.

            @app.on_event("startup")
def startup():
    f = open("scrape.json")
    data = f.read()
    f.close()
    global elements_to_scrape
    elements_to_scrape = json.loads(data)
        

È buona norma inserire metodi più lenti e utilizzabili una tantum nella funzione di avvio. In questo modo, evitiamo di rallentare i nostri tempi di risposta eseguendo codice non necessario durante una richiesta.

A questo punto, dovremmo avere un API REST funzionante. Possiamo testarlo tornando allo strumento endpoint e digitando un titolo.

            {
  "price": "135.37",
  "prev_close": "135.13",
  "open": "134.35",
  "bid": "135.40 x 1000",
  "ask": "135.47 x 1300",
  "days_range": "133.69 - 135.51",
  "52_week_range": "53.15 - 145.09",
  "volume": "60,145,130",
  "avg_volume": "102,827,562",
  "market_cap": "2.273T",
  "beta": "1.27",
  "pe": "36.72",
  "eps": "3.69",
  "earnings_date": "Apr 28, 2021 - May 03, 2021",
  "dividend_and_yield": "0.82 (0.61%)",
  "ex_dividend_yield": "Feb 05, 2021",
  "target_est": "151.75"
}
        

Filtraggio

Tecnicamente, abbiamo realizzato tutto ciò che ci eravamo prefissati di fare. Tuttavia, c’è un punto di attenzione. Ogni volta che richiediamo un riepilogo delle scorte, viene restituito l’intero foglio di riepilogo. E se volessimo solo pochi punti dati selezionati? Sembra un terribile spreco richiedere un intero oggetto riassuntivo quando sono necessari solo pochi punti dati. Per risolvere questo problema, dobbiamo implementare un filtro. Fortunatamente, FastAPI offre una soluzione: le query.

Per aggiungere il filtro, è necessario modificare il metodo summary dell’endpoint per gestire le query. Ecco come farlo:

            @app.get("/v1/{symbol}/summary/")
def summary(symbol, q: Optional[List[str]] = Query(None)):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        if(q != None):
            # if all query parameters are keys in summary_data
            if all (k in summary_data for k in q): 
                # summary_data keeps requested key-value pairs
                summary_data = {key: summary_data[key] for key in q}
            # else, return whole summary_data

    except TooManyRedirects:
        raise HTTPException(status_code=404, detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, detail="An error has occurred while processing the request.")

    return summary_data
        

l parametro q: Optional[List[str]] = Query(None) dice a FastAPI che dovrebbe aspettarsi, facoltativamente, come query una List di String.

Le righe 8 e 9 controllano le stringhe di query rispetto a tutte le chiavi nel dizionario summary_dat. Se anche i parametri della query sono chiavi, manteniamo solo quelle coppie chiave-valore. In caso contrario, se uno dei parametri della query non è una chiave, viene restituito l’intero oggetto.

NOTA: per fare pratica, è possibile modificare la funzionalità del metodo precedente per restituire solo le coppie chiave-valore dei parametri di query valide. Puoi anche scegliere di sollevare un’eccezione se un parametro di query non corrisponde a nessuna delle chiavi.

Testiamo il nostro nuovo codice interrogando solo price e open.

            {
  "price": "135.37",
  "open": "134.35"
}
        

Conclusione

In questo articolo abbiamo creato un web scraper per recuperare i dati finanziari da Yahoo Finance durante questo articolo e abbiamo creato un API REST per servire questi dati. È dannatamente buono, e dovremmo essere orgogliosi di noi stessi.

Comunque per oggi è tutto. Come sempre, tutto il codice può essere trovato sul mio GitHub.

Grazie per aver letto. Spero che tu abbia imparato qualcosa e ti sia divertito!
Ci vediamo tutti la prossima volta.

 

Riferimenti

[1] Documentazione Beautiful Soup – https://www.crummy.com/software/BeautifulSoup/bs4/doc/

[2] FastAPI – https://fastapi.tiangolo.com/

[3] Requests: HTTP for HumansTM – https://2.python-requests.org/en/master/

[4] Uvicorn – https://www.uvicorn.org/

[5] Yahoo Finance – https://finance.yahoo.com/

Web scraping automatizzato con Python e Celery

scienzadeidati articoli - Web scraping automatizzato Python Celery

Questa è la 2° parte del tutorial che descrive come creare uno strumento di web scraping con Python. In questo articolo vediamo come integrare Celery, un sistema di gestione delle attività, nel nostro progetto di scraping web.

La 1° parte, Creazione di uno scraper di feed RSS con Python, descrive come utilizzare Requests e Beautiful Soup.

La 3° parte di questa serie, Realizzazione di un’applicazione di web scraping con Python, Celery e Django, descrive come integrare uno strumento di web scraping nelle applicazioni web.

Requisiti

Nell’articolo precedente, abbiamo creato un semplice lettore di feed RSS che estrae informazioni da HackerNews utilizzando Requests e BeautifulSoup (vedi il codice su GitHub).

In questo articolo usiamo il codice come base per la creazione di un sistema di gestione delle attività e lo scraping pianificato.
Il successivo passaggio nella raccolta di dati da siti Web che cambiano frequentemente (ad esempio, un feed RSS che mostra un numero X di elementi alla volta), è quello di eseguire lo scraping su base regolare. Nell’esempio di scraping precedente, abbiamo utilizzato la riga di comando per eseguire il nostro codice; tuttavia, questa non è una soluzione scalabile. Per automatizzare l’esecuzione si può usare Celery per creare un sistema di pianificazione delle attività con esecuzione periodica.

Per raggiunge questo obiettivo dobbiamo utilizzare i seguenti strumenti:

Nota: tutte le dipendenze della libreria sono elencate nei file requirements.txt e Pipfile/ Pipfile.lock.

Come funziona Celery

Celery è un sistema di gestione delle attività, opera in combinazione con un broker di messaggi per svolgere un lavoro asincrono .

articoli - web scraping Celery

Quanto sopra illustra che il nostro produttore di attività (la nostra app di scraping web) passerà le informazioni sulle attività alla coda (Celery) per essere eseguite. Lo scheduler (Celery beat) li eseguirà come cron job, senza alcun lavoro aggiuntivo o interazione al di fuori dell’avvio dell’app Celery.

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione di scraping automatizzato:

  1. Installazione di Celery e RabbitMQ: Celery gestisce l’accodamento e l’esecuzione delle attività, mentre RabbitMQ gestirà i messaggi avanti e indietro
  2. Come avviare RabbitMQ e comprendere i log
  3. Creazione di un proof-of-concept “Hello World” con Celery per verificarne il funzionamento.
  4. Registrazione delle funzioni di scraping in tasks.py con Celery
  5. Sviluppare e gestire ulteriormente le attività di scraping
  6. Creazione ed esecuzione di una pianificazione per l’attività di scraping

Nota: le introduzioni a RabbitMQ e Celery sono piuttosto lunghe, se si ha già esperienza con questi strumenti si può saltare direttamente al punto 4 .

Inizializzazione

Iniziamo aprendo la directory del progetto, in questo caso è Web_Scraping_RSS_Celery_Django dell’articolo precedente. Se lo desideri, può essere clonato da GitHub tramite il link precedentemente menzionato.

Nota: stiamo usando Ubuntu, quindi i comandi potrebbero differire a seconda del sistema operativo.

Inoltre, per brevità, abbiamo omesso parti di codice replicato, usando ….
Infine, i requisiti del progetto possono essere installati utilizzando il comando pip come  nel seguente esempio.

				
					$ pip install celery
				
			


Perché Celery e RabbitMQ? Perché non un'altra tecnologia?

Utilizziamo Celery e RabbitMQ perché sono abbastanza semplici da configurare, testare e ridimensionare in un ambiente di produzione. Sebbene possiamo eseguire un’attività periodica utilizzando altre librerie, o semplicemente con i cron job generali, vogliamo costruire  qualcosa da riutilizzare nei prossimi progetti.

Una soluzione sarà più duratura e richiede meno manutenzione se usiamo una tecnologia che possiamo scalare nel prossimo progetto, imparando alcuni comandi e strumenti chiave man mano che aumentiamo gradualmente la complessità.

Configurazione di RabbitMQ

Far funzionare un server RabbitMQ è molto più semplice su Ubuntu rispetto a un ambiente Windows. Seguiremo la documentazione  ufficiale della guida all’installazione.

Ecco i comandi di installazione per Debian e Ubuntu:

				
					$ sudo apt-get update -y

$ sudo apt-get install curl gnupg -y

$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ sudo apt-get install apt-transport https

$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF

$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang

$ deb https://dl.bintray.com/rabbitmq/debian bionic main

$ EOF

$ sudo apt-get update -y

$ sudo apt-get install rabbitmq-server -y --fix-missing
				
			

Dopo aver installato RabbitMQ per la prima volta in un ambiente virtuale, si avvia automaticamente. Per verificare che il comando rabbitmq-server funzioni (quello da usare con Celery), è necessario disattivare il servizio.

Da notare che le autorizzazioni predefinite per l’installazione prevedono che Only root or rabbitmq should run rabbitmqctl shutdown, che può risultare strano. Per risolverlo, possiamo semplicemente eseguire il comando con sudo.

				
					$ sudo rabbitmqctl shutdown
				
			

E’ quindi possibile testare il server utilizzando il comando rabbitmq-server,

				
					$ sudo rabbitmq-server
				
			

Si ottiene il seguente output

articoli - web scraping Celery 1

NOTA: si può terminare il comando rabbitmq-server usando Control+C.

La configurazione di RabbitMQ su Windows richiede passaggi aggiuntivi. La documentazione ufficiale contiene una guida per l’installazione manuale.

Testare Celery con RabbitMQ

Prima di immergersi nella codifica di ogni progetto, è buona norma testare gli esempi di base in stile “Hello World” per i pacchetti e framework previsti dal progetto. Questo permette di acquisire una comprensione di base di cosa è possibile aspettarsi, insieme ad alcuni comandi del terminale da aggiungere alla cassetta degli attrezzi per quella specifica tecnologia.

In questo caso, Celery viene fornito con il proprio proof-of-concept “Hello World” sotto forma di un’attività (task) che effettua una semplice addizione aritmetica. Questo è disponibile in forma estesa sulla documentazione ufficiale di Celery. Di seguito viene descritto brevemente; tuttavia, se si desidera spiegazioni o approfondimenti, è consigliato leggere la documentazione ufficiale.

Ora che abbiamo installato e convalidato il broker RabbitMQ, possiamo iniziare a creare il file tasks.py. Questo contiene tutte le attività (task) da eseguire, sia che si tratti di una operazione aritmetica, di uno scraping web o di un salvataggio di utenti in un database. Vediamo ora le modifiche all’interno della directory del progetto.

				
					$ touch tasks.py
				
			

Per creare un task per effettuare un’addizione aritmetica, dobbiamo importare la libreria Celery e creare una funzione con il flag @app.task per consentire ai worker di Celery di ricevere l’attività nel sistema di code.

				
					# tasks.py

from celery import Celery

app = Celery('tasks') # defining the app name to be used in our flag

@app.task # registering the task to the app
def add(x, y):
    return x + y
				
			

Utilizzando il task add, possiamo iniziare a testarne l’esecuzione. È qui che le cose potrebbero iniziare a confondersi, poiché nel passaggio successivo sono necessari 3 terminali aperti contemporaneamente.

Iniziamo con una rapida spiegazione, quindi descriviamo il codice e visualizzare le schermate.

Spiegazione

Per completare il  test, eseguiamo il task Celery usando la riga di comando importando il tasks.py e  poi lanciarlo. Affinché i task siano ricevuti dalla coda, dobbiamo avere attivi i servizi Celery worker e RabbitMQ. Il server RabbitMQ funge da nostro broker di messaggi mentre il  worker Celery esegue le attività.

Indichiamo ogni passaggio con i numeri di terminale:

  1. Rabbit MQ
  2. Worker di Celery
  3. Esecuzione del task


Terminale #1

Iniziamo avviando il server RabbitMQ nel terminale n. 1.

				
					# RabbitMQ

$ sudo rabbitmq-server
				
			
articoli - web scraping Celery 2

Terminale #2

Successivamente, possiamo avviare il processo del worker Celery nel terminale n. 2. Descriviamo anche le impostazioni dettagliate per il  worker in modo da illustrare come apparirà l’output.

Nota: deve essere eseguito dalla directory del progetto.

				
					# Celery worker

$ celery -A tasks worker -l INFO
				
			

Analizziamo il precedente comando:

  • celery – La libreriache stiamo chiamando
  • -A tasks – Si esplicita che vogliamo l’app  tasks
  • worker – Avvio del processo worker
  • -l INFO – Garantisce di avere un dettagliato log degli eventi nella console

Per verificare se il worker è stato avviato correttamente, è necessario avere la riga concurrency: 4 (prefork) nel terminale.

Inoltre, notiamo che l’app [tasks] è stata importata, insieme alla registrazione dei task presenti in tasks.py. Il worker ha registrato una singola attività (1), tasks.add.

articoli - web scraping Celery 3

Terminale #3:

Successivamente, possiamo iniziare l’esecuzione del test nel terminale n. 3. Eseguiamo un ciclo per eseguire più task che vengono catturati dal  servizio worker. Lo realizziamo importando add da tasks.py, e quindi eseguendo un ciclo for.

Nota: dopo la riga add.delay(i, i), si deve usare Control+Invio per eseguire il comando.

				
					$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
...    add.delay(i, i) # delay calls the task
				
			

Ora possiamo a vedere un grande blocco di output nel terminale n. 3 (l’esecuzione dell’attività Celery).

Questo mostra che il worker sta ricevendo il risultato dell’attività dal terminale n. 2.

articoli - web scraping Celery 4

Terminale 2:

Se controlliamo il worker Celery nel terminale n. 2, il processo che esegue l’addizione aritmetica, vediamo che sta rilevando ciascuna delle esecuzioni dell task.

articoli - web scraping Celery 5

Abbiamo dimostrato con successo che Celery e RabbitMQ sono installati correttamente. Questo getta le basi per gli altri task che  vogliamo implementare (es. web scraping) dimostrando come interagiscono Celery, Celery worker e RabbitMQ.
Ora che abbiamo coperto l’installazione e le nozioni di base, ci addentreremo nella tasks.pycreazione delle nostre attività di scraping web.

Creazione del task.py con Celery per il web scraping

L’esempio descritto nel paragrafo precedente ha aiutato a descrivere il processo da usare per eseguire i task utilizzando Celery, dimostrando anche come i task sono registrati tramite i worker di Celery.

Basandoci sul questo esempio, iniziamo creando i task di scraping. A tale scopo, per semplicità possiamo copiare il codice  dello script scraping.py, descritto nella 1° parte di questo tutorial, nel file tasks.py.

Quindi possiamo rimuovere la funzione def add(x, y) dell’esempio e copiare l’importazione delle librerie (Requests e BeautifulSoup), e le funzioni di scraping.

Nota: usiamo le stesse funzioni, solamente che sono state copiate in tasks.py.

				
					# tasks.py

from celery import Celery
import requests # pulling data
from bs4 import BeautifulSoup # xml parsing
import json # exporting to files

app = Celery('tasks')

# save function
def save_function(article_list):
    with open('articles.txt', 'w' as outfile:
        json.dump(article_list, outfile)
        
# scraping function
def hackernews_rss():
    article_list = []
    try:
        # execute my request, parse the data using the XML 
        # parser in BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # select only the "items" I want from the data      
        articles = soup.findAll('item')
        # for each "item" I want, parse it into a list
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            # create an "article" object with the data
            # from each "item"
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            # append my "article_list" with each "article" object
            article_list.append(article)
        # after the loop, dump my saved objects into a .txt file
        return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
				
			

Le funzioni di web scraping sono ora in tasks.py, insieme alle loro dipendenze. Il passaggio successivo è registrare i task con la app Celery, semplicemente specificando il decorator @app.task prima di ogni funzione.

				
					# tasks.py

... # same as above

@app.task
def save_function(article_list):
    ...
    
@app.task
def hackernews_rss():
    ...
				
			

Miglioramenti alle funzioni di scraping

Sebbene le funzioni di scraping già implementate abbiano dimostrato di funzionare per estrarre i dati dal feed RSS di HackerNews, abbiamo ancora qualche margine di miglioramento. Di seguito abbiamo descritto le modifiche da fare all’interno del set di strumenti di scraping prima di automatizzare l’esecuzione dello script.

  1. Salvare l’output in file .json con data e ora.
  2. Aggiungere una data  e ora created_at per ogni articolo.
  3. Aggiungere una  stringa source, nel caso in cui desideriamo fare lo scraping di altri siti

Le precedenti modifiche sono semplici perchè abbiamo già effettuato la maggior parte del lavoro di implementazione durante il primo articolo. Sebbene non sia significativo, lo scambio in .json sarà un po’ più facile da leggere rispetto a .txt. Inoltre i due campi aggiuntivi contengono informazione che rendono l’applicazione più “scalabile” quando si aggiungono altri feed per lo scraping e quando si analizzano i dati in un secondo momento.

Iniziamo con aggiornare la funzione save_function per produrre un file .json e aggiungere un timestamp. In questo modo si migliora la qualità del lavoro quando dobbiamo riutilizzare dati di scraping precedenti.

				
					# tasks.py

from datetime import datetime # for time stamps

... 

def save_function(articles_list):
    # timestamp and filename
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'articles-{}.json'.format(timestamp)
    # creating our articles file with timestamp
    with open(filename, 'w').format(timestamp) as outfile:
        json.dump(article_list, outfile)
        
				
			

Stiamo usando la funzione datetime.now().strftime(...) per creare un timestamp da aggiungere al file usando .format(timestamp).

Passando alle due modifiche all’interno della funzione hackernews_rss(), aggiungiamo alcune informazioni sulla fonte (HackerNews RSS) e un timestamp created_at. Si tratta di due semplici modifiche che ci aiutano a differenziare le informazioni nel caso si voglia aggiungere ulteriori funzioni di scraping.

				
					# tasks.py

...

def hackernews_rss():
    ...
        for a in articles:
            ...
            article = {
                ...
                'created_at': str(datetime.now()),
                'source': 'HackerNews RSS'
                }
        ...
				
			
Le modifiche precedenti illustrano l’aggiunta dei campi created_at e source. Un rapido suggerimento: se ometti amo la transizione str() lo script non verrà eseguito a causa di un errore Object of type datetime is not JSON serializable.

Pianificazione dei task con Celery:

Ora sfruttiamo i poteri di pianificazione dei task di Celery utilizzando beat_schedule. Questo ci consente di registrare i task con l’agente di pianificazione in orari specifici.
Un’ottima guida per la schedulazione di task è sicuramente all’interno della documentazione ufficiale . Ci sono anche alcuni esempi di pianificazione aggiuntivi  nel file tasks.py del repository GitHub di questo tutorial.

Prevediamo di eseguire il task di scraping ogni minuto, in modo da dimostrere come il worker Celery interagisce con  i task pianificati. Ciò non comporterà dati diversi, poiché questo feed RSS non viene aggiornato ogni minuto con  nuovi dati.
L’obiettivo di questo  esempio è mostrare l’ouput dei file degli articoli e una semplice pianificazione dei task.

Creazione dello scheduler

				
					# tasks.py

... 

from celery.schedules import crontab # scheduler

# scheduled task execution
app.conf.beat_schedule = {
    # executes every 1 minute
    'scraping-task-one-min': {
        'task': 'tasks.hackernews_rss',
        'schedule': crontab()
    }
}
...
				
			

La configurazione di cui sopra registra le pianificazioni dei task all’interno della stessa app Celery. All’avvio, siamo in grado di chiamare lo scheduler di Celery per ricontrollare ciò che è stato messo in coda.

Esecuzione dei task

Ora che il codice è stato completato, è ora di accendere il server RabbitMQ e avviare i  worker Celery.
Per questo esempio, utilizziamo 2 schede del terminale:

  1. Server RabbitMQ
  2. Worker di Celery

 

Terminale #1

Per avviare il server RabbitMQ (il broker di messaggi), usiamo lo stesso comando descritto in precedenza.

Nota: si può anche prevedere di ricontrollare che il nodo creato all’avvio sia spento, poiché non si avvia con i log e genererà un errore se non viene terminato in precedenza.

				
					$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server
				
			

Si ottiene un output simile a l precedente esempio (come il seguente screenshot).

Una volta avviato il server RabbitMQ, possiamo iniziare con il terminale n. 2.

Terminale #2

Modifichiamo leggermente il comando, poiché ora include una notazione per -B la quale chiama il worker che esegue il beat scheduler.

				
					$ celery -A tasks worker -B -l INFO
				
			
L’output della console mostra l’avvio dell’applicazione e (in attesa della schedulazione prevista) stampa le informazioni relative all’esecuzione dei task. Nel seguente screenshot:
  • Si registra il [tasks]
  • Avvio dello scheduler beat
  • Il worker MainProcess riceve un’attività Received task: tasks.hackernews_rss
  • Avvio di ForkPoolWorker ed esecuzione del task, quindi restituisce un risultato
articoli - web scraping Celery 7

Suggerimento: possiamo interrompere l’esecuzione del task pianificato con Control + C, dato che il task viene eseguito a tempo indeterminato.

Dopo che il task è stato eseguito correttamente, la funzione save_function() ha generato il file .json.

Conclusione

In questo articolo abbiamo ampliato con successo un semplice strumento di web scraping in modo di prevedere una pianificazione dell’esecuzione. Ciò garantisce che non abbiamo più bisogno di eseguire manualmente lo script di scraping e che possiamo “impostarlo e dimenticarlo”. Tramite la schedulazione dello script, l’applicazione sarà in grado di analizzare i siti alla ricerca di dati che cambiano in base a una pianificazione prestabilita (ad esempio, ogni 15 minuti) e restituire ogni volta nuovi dati.

Quali sono i prossimi passi?

Nella parte 3, descriviamo un’applicazione Django con integrazione di Celery e web scraping. Questo è un ottimo esempio di un’applicazione web che effettua lo scraping e popola un sito web con informazioni dello scraping. Il prodotto finale sarà un aggregatore di notizie che estrae da più feed RSS.

Ulteriori letture

Questo articolo ha descritto molte informazioni sull’esecuzione dei task tramite numerosi esempi. Sebbene siamo stati in grado di effettuare alcune esecuzioni programmate, non siamo entrati nei dettagli di ciascuna delle tecnologie che abbiamo utilizzato. Ecco alcuni ottimi articoli (alcuni su Medium, altri no) che approfondiscono maggiormente la magia di Celery.

Creazione di uno scraper di feed RSS con Python

scienzadeidati articoli - Costruire uno scraper di feed RSS con Python

Questo articolo è la prima parte 1 di un tutorial relativo alla creazione di un’applicazione di web scraping con Python. In questo articolo ci concentriamo ad approfondire le librerie Requests e BeautifulSoup.

Nella 2° parte del tutorial vedremo come schedulare lo scraping tramite Celery.

Nella 3° parte descriviamo come realizzazione di un’applicazione web scraping con Django.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

Ho già utilizzato lo scraping web in diversi modi all’interno dei miei progetti, che si tratti di raccolta di dati per l’analisi, creazione di notifiche quando i siti vengono modificati o creazione di applicazioni web.

Questo articolo illustrerà un semplice  di feed RSS per HackerNews. Il feed RSS, disponibile a questo link, prevede aggiornamenti a intervalli regolari per i  nuovi post e attività sul sito.

Sebbene ciò possa essere ottenuto utilizzando un lettore RSS, questo è inteso come un  semplice esempio di utilizzo di Python, che può essere adattato facilmente ad altri siti Web.

In questo esempio usiamo:

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione:

  1. Creazione della directory  del progetto e del file scraping.py.
  2. Verificare che possiamo eseguire il ping del feed RSS che sarà oggetto dello scraping.
  3. Scraping del contenuto XML del sito.
  4. Analisi del contenuto utilizzando BS4.
  5. Output del contenuto in un file .txt

Inizializzazione

Il primo passo è la creazione della directory del progetto e accedere a quella directory tramite riga di comando. Una volta all’interno della directory di progetto, creiamo il file di progetto.

Nota: in questo caso stiamo usando Ubuntu, quindi questi comandi potrebbero differire a seconda del sistema operativo.

            $ mkdir web_scraping_example && cd web_scraping_example

$ touch scraping.py
        
Successivamente è necessario installare le librerie Python previste dai requisiti. Si può usare il comando pip, come nell’esempio seguente:
            $ pip install requests
$ pip install bs4
        

Importazione delle librerie

Ora che le basi del progetto sono state impostate, possiamo iniziare a scrivere il codice del web scraping.

All’interno del file scraping.py, si deve importare le librerie che abbiamo installato utilizzando pip.

            # scraping.py

import requests
from bs4 import BeautifulSoup
        

Quanto sopra ci consente di utilizzare le funzioni fornite dalle librerie Requests e BeautifulSoup.

Ora possiamo iniziare a testare la capacità di eseguire il ping del feed RSS di HackerNews e scrivere lo script di scraping.

Nota: di seguito non includeremo le righe di importazione. Tutto il codice che rimane uguale verrà annotato come ‘…’ sopra o sotto le nuove righe.

Testare la richiesta

Quando eseguiamo lo scraping web, iniziamo inviando una richiesta a un sito web. Per assicurarci di essere in grado di eseguire lo scraping, dobbiamo verificare che  possiamo stabilre una connessione al sito.

Iniziamo creando la nostra funzione di scraping di base. Questo sarà ciò a cui eseguiremo

            # scraping.py

# library imports
...

# scraping function
def hackernews_rss('https://news.ycombinator.com/rss'):
    try:
        r = requests.get()
        return print('The scraping job succeeded: ', r.status_code)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
        
        
print('Starting scraping')
hackernews_rss()
print('Finished scraping')
        

Nel codice di cui sopra,  si richiama la funzione requests.get(...) della libreria Requests per recuperare il sito Web da analizzare. Quindi si stampa a video lo stato della richiesta, contenuto nel parametro r.status_code, per verificare che il sito Web sia stato chiamato correttamente.
Inoltre, abbiamo inserito tutto dentro un try: except: in modo da rilevare eventuali errori che si potremmo verificare in seguito.

Una volta eseguito lo script, si ottiene un codice di stato pari a 200. Questo conferma che siamo in grado di eseguire il ping del sito e “ottenere” informazioni.

            $ python scraping.py

Starting scraping
The scraping job succeeded: 200
Finsihed scraping
        

Scraping del contenuto

Il nostro script ha restituito un codice di stato pari a 200, siamo pronti per iniziare ad estrarre il contenuto XML dal sito. A tal fine, dobbiamo utilizzare BS4 insieme a Requests.
            # scraping.py

...

def hackernews_rss():
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        
        return print(soup)
...
        

Questo codice assegne il contenuto XML acquisito da HackerNews alla variabile soup. Stiamo usando r.content per passare l’XML restituito a BeautifulSoup, che analizzeremo nel prossimo paragrafo.

Una cosa fondamentale da notare è che stiamo sfruttando il parametro features='xml', questo parametro varia a seconda dei dati da acquisire (ad esempio, in un scraper HTML si dichiarerà come ‘HTML’).

L’output di quanto sopra sarà un gran disordine di contenuti che non ha molto senso. Questo è solo per visualizzare quali informazioni stiamo estraendo con successo dal sito web.

Analisi dei dati

Abbiamo descritto come possiamo estrarre l’XML dal feed RSS di HackerNews. E’ quindi il momento di analizzare le informazioni.

Abbiamo scelto di usare il feed RSS perché è molto più semplice da analizzare rispetto alle informazioni html del sito Web, poiché non dobbiamo preoccuparci degli elementi HTML nidificati e di individuare le informazioni esatte.

Iniziamo osservando la struttura del feed:

            <item>
    <title>...</title>
    <link>...</link>
    <pubDate>...</pubDate>
    <comments>...</comments>
    <description>...</description>
</item>
        

Ciascuno dei tag disponibili sul feed RSS segue la struttura di cui sopra, contenente tutte le informazioni all’interno dei tag <item>...</item>.

Sfruttiamo la coerenza dei tag item per analizzare le informazioni.

            # scraping.py 

...

def hackernews_rss():
    article_list = []
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        articles = soup.findAll('item')        
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            article_list.append(article)
        return print(article_list)
...
        

In questo codice, effettuiamo un controllo di tutti gli item in articles = soup.findAll('item'). Questo consente di estrarre ciascuno dei tag <item>...</item> dall’XML che abbiamo acquisito.

Ciascuno degli articoli viene analizzato utilizzando il ciclo: for a in articles:, questo consente di analizzare le informazioni in variabili separate e aggiungerle a un dizionario vuoto che abbiamo creato prima del ciclo.
BS4 ha scomposto l’XML in una stringa, consentendoci di chiamare la funzione .find() per cercare i tag in ciascuno degli oggetti. Usando .text siamo in grado di estrarre gli elementi <tag>...</tag> e salvare solamente il testo.
I risultati sono inseriti in un elenco tramite il comando article_list.append(article) in modo da potervi accedere in seguito.

Infine inseriamo la stampa della lista degli articoli in modo da visualizzare un output durante l’esecuzione dello script di scraping.

Output in un file

Il feed RSS è stato ora inviato correttamente in una funzione print() per visualizzare l’elenco prodotto a termine del parsing. Ora possiamo inserire i dati in un file .txt, in modo che possano riessere utilizzati in future analisi e altre attività relative ai dati. Utilizziamo la libreria JSON per la visualizzazione dei dati più semplice per noi; tuttavia, descriviamo anche un esempio senza la libreria JSON.

Iniziamo creando un’altra funzione def save_function(): che elabora l’elenco prodotto dalla funzione hackernews_rss(). In questo modo sarà più facile apportare modifiche in futuro.

            # scraping.py
import json

...

def save_function(article_list):
    with open('articles.txt', 'w') as outfile:
        json.dump(article_list, outfile)

...
        

Quanto sopra utilizza la libreria JSON per scrivere l’output dello scraping nel file articles.txt. Questo file verrà sovrascritto durante l’esecuzione dello script.

Un altro metodo per scrivere nel file .txt prevede un ciclo for:

            # scraping.py

...

def save_function(article_list):
    with open('articles.txt', 'w') as f:
        for a in article_list:
            f.write(a+'\n')
        f.close()
        
...
        
Ora che abbiamo creato la funzione save_function(), possiamo adattare la funzione di scraping per salvare i dati.
            # scraping.py

...

def hackernews_rss():
    ...
    try:
        ...
        return save_function(article_list)

...
        

Modificando il comando return print(article_list) in return save_function(article_list) siamo in grado di inserire i dati in un file .txt.

L’esecuzione dello script produrrà ora un file .txt dei dati acquisiti dal feed RSS di HackerNews.

Conclusione

Abbiamo creato con successo uno script Python per il di scraping dei feed RSS utilizzando Requests e BeautifulSoup. Questo ci consente di analizzare le informazioni XML in un formato leggibile con cui lavorare in futuro.

Questo script è la base per costruire alcune funzionalità interessanti:

  • Scraping di informazioni più complesse utilizzando elementi HTML.
  • Utilizzo di Selenium per lo scraping di siti dove il rendering lato client rende inutile l’uso di Requests.
  • Creazione di un’applicazione Web che acquisirà i dati dello scraping e li visualizzerà (ad esempio, un aggregatore)
  • Estrazione di dati da siti Web a seconda di una pianificazione e schedulazione.


Sebbene questo articolo abbia  descritto le basi del web scraping, possiamo iniziare ad approfondire alcuni dettagli.

Possiamo inserire questo script all’interno di una sequenza di altri script, schedulati tramite  Celery, oppure possiamo aggregare le informazioni su un’applicazione web utilizzando Django.

Questo articolo fa parte di una serie in 3 parti in cui descriviamo semplici esempi di web scraping e aggregazione su base programmata.

Creazione di Microservizi con Python e FastAPI

scienzadeidati articoli - Creazione di Microservizi con Python e FastAPI

Come sviluppatore Python potresti aver sentito parlare del termine microservizi e desideri creare da solo un microservizio Python. I microservice sono un’ottima architettura per la creazione di applicazioni altamente scalabili. Prima di iniziare a creare l’applicazione usando i microservice, è necessario conoscere i vantaggi e gli svantaggi dell’uso dei microservizi. In questo articolo imparerai i vantaggi e gli svantaggi dell’utilizzo dei microservice. Imparerai anche come creare il tuo microservice e distribuirlo utilizzando Docker Compose.

In questo tutorial vedremo:

  • Quali sono i vantaggi e gli svantaggi dei microservizi.
  • Perché dovresti creare microservice con Python.
  • Come creare API REST utilizzando FastAPI e PostgreSQL.
  • Come creare microservice utilizzando FastAPI.
  • Come eseguire i microservice usando docker-compose.
  • Come gestire i microservice utilizzando Nginx.

Prima creeremo una semplice API REST usando FastAPI e poi utilizzeremo PostgreSQL come nostro database. Estenderemo quindi la stessa applicazione a un microservizio.

 

Introduzione ai microservice

Il microservice è un approccio per suddividere grandi applicazioni monolitiche in singole applicazioni specializzate in uno specifico servizio/funzionalità. Questo approccio è spesso noto come architettura orientata ai servizi o SOA.

Nell’architettura monolitica, ogni logica di business risiede nella stessa applicazione. I servizi dell’applicazione come la gestione degli utenti, l’autenticazione e altre funzionalità utilizzano lo stesso database.

In un’architettura di microservice, l’applicazione è suddivisa in diversi servizi separati che vengono eseguiti in processi separati. Esiste un database diverso per le diverse funzionalità dell’applicazione e i servizi comunicano tra loro utilizzando HTTP, AMQP o un protocollo binario come TCP, a seconda della natura di ciascun servizio. La comunicazione tra servizi può essere eseguita anche utilizzando le code di messaggi come RabbitMQ, Kafka o Redis.

 

Vantaggi dei microservice

L’architettura a microservizi offre molti vantaggi. Alcuni di questi vantaggi sono:

  • Un’applicazione debolmente accoppiata significa che i diversi servizi possono essere costruiti utilizzando le tecnologie più adatte alle singole funzioni e specificità. Quindi, il team di sviluppo non è vincolato alle scelte fatte durante l’avvio del progetto.
  • Poiché i servizi sono responsabili di funzionalità specifiche, la comprensione e il controllo dell’applicazione è più semplice e facile.
  • Anche il ridimensionamento dell’applicazione diventa più semplice perché se uno dei servizi richiede un utilizzo elevato della GPU, solo il server che contiene quel servizio deve avere una GPU elevata e gli altri possono essere eseguiti su un server normale.
 

Svantaggi dei microservice

L’architettura dei microservizi non è un proiettile d’argento che risolve tutti i tuoi problemi, ha anche i suoi svantaggi. Alcuni di questi inconvenienti sono:

  • Poiché diversi servizi utilizzano un diverso database, le transazioni che coinvolgono più di un servizio devono gestire la consistenza dei dati.
  • La suddivisione perfetta dei servizi è molto difficile da ottenere al primo tentativo e questo deve essere ripetuto prima di ottenere la migliore separazione possibile dei servizi.
  • Poiché i servizi comunicano tra loro attraverso l’uso dell’interazione di rete, ciò rende l’applicazione più lenta a causa della latenza della rete e del servizio.
 

Perché Microservice in Python?

Python è uno strumento perfetto per la creazione di microservizi perché questo linguaggio ha una semplice curva di apprendimento, tonnellate di librerie e pacchetti già implementati e una grande community di utilizzatori. Grazie all’introduzione della programmazione asincrona in Python, sono emersi framework web con prestazioni alla pari con GO e Node.js.

 

Introduzione a FastAPI

FastAPI è un moderno framework Web ad alte prestazioni, dotato di tantissime funzioni interessanti come la documentazione automatica basata su OpenAPI e la libreria di convalida e serializzazione integrata. In questo link puoi leggere l’elenco di tutte le fantastiche funzionalità presenti FastAPI.

 

Perché FastAPI

Alcuni dei motivi per cui penso che FastAPI sia un’ottima scelta per la creazione di microservizi in Python sono:

  • Documentazione automatica
    Supporto Async/Await
  • Convalida e serializzazione integrate
  • Tipo 100% annotato, quindi il completamento automatico funziona alla grande
 

Installazione FastAPI

Prima di installare FastAPI è necessario creare una nuova directory movie_service e creare un nuovo ambiente virtuale all’interno della directory appena creata usando virtualenv.

Se non l’hai già installato virtualenv:

            pip install virtualenv
        
Ora, si crea un nuovo ambiente virtuale.
            virtualenv env
        

Nei sistemi Mac/Linux si può attivare l’ambiente virtuale usando il comando:

            source ./env/bin/activate
        
Gli utenti Windows possono invece eseguire questo comando:
            .\env\Scripts\activate
        
Infine, siamo pronti per installare FastAPI eseguendo il seguente comando:
            pip install fastapi
        

Poiché FastAPI non viene fornito con un service integrato, è necessario installare uvicorn per eseguire il service. uvicorn è un server ASGI che ci consente di utilizzare le funzionalità async/await.

Per installare uvicorn si può usare il comando:

            pip install uvicorn
        

Creazione di un semplice REST API utilizzando FastAPI

Prima di iniziare a creare un microservice utilizzando FastAPI, impariamo le basi di FastAPI. Creiamo una nuova directory zcode>app e un nuovo file main.py all’interno della directory appena creata.

Aggiungiamo il seguente codice in main.py.

            #~/movie_service/app/main.py

from fastapi import FastAPI

app = FastAPI()


@app.get('/')
async def index():
    return {"Real": "Python"}
        

E’ necessario importare e creare un’istanza di FastAPI e quindi registrare l’endpoint radice / che restituisce un file JSON.

È possibile eseguire il server delle applicazioni utilizzando uvicorn app.main:app --reload. Qui app.main indica che si sta usando il file main.py all’interno della directory app e :app indica a FastAPI il nome della nostra istanza.

Possiamo accedere all’app da http://127.0.0.1:8000. Per accedere alla documentazione automatica bisogna andare su http://127.0.0.1:8000/docs. Possiamo giocare e interagire con l’API dal browser stesso.

Aggiungiamo alcune funzionalità CRUD alla nostra applicazione. Aggiorniamo il main.py in modo che assomigli a quanto segue:

            #~/movie_service/app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

app = FastAPI()

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


@app.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db
        

Come puoi vedere hai creato una nuova classe Movie che estende la classe BaseModel di Pydantic.

Il modello Movie contiene il nome, la foto, il genere e il cast. Pydantic è integrato con FastAPI che rende la creazione di modelli e la convalida delle richieste un gioco da ragazzi.

Se andiamo alla pagina della documentazione possiamo vedere che sono descritti i campi del nostro modello nella sezione di risposta di esempio. Questo è possibile perché abbiamo definito il response_model nella definizione del percorso nel decoratore @app.get.

Ora aggiungiamo l’endpoint per aggiungere un film al nostro elenco di film.

Aggiungiamo una nuova definizione di endpoint per gestire la richiesta POST.

            @app.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}
        

Ora apriamo il browser e testiamo la nuova API. Proviamo ad aggiungere un film con un campo non valido o senza i campi obbligatori e verifichiamo che la convalida venga gestita automaticamente da FastAPI.

Aggiungiamo un nuovo endpoint per aggiornare il film.

            @app.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        

Ecco l’indice id della nostra lista fake_movie_db.

Nota: ricorda di importare HTTPException da Fastapi

Ora possiamo anche aggiungere l’endpoint per eliminare il film.

            @app.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        
Prima di andare avanti, strutturiamo la nostra app in un modo migliore. Creiamo una nuova cartella api all’interno di app e creiamo un nuovo file movies.py all’interno della cartella creata di recente. Spostiamo tutti i codici relativi alle route da main.py a movies.py. Quindi, movies.py dovrebbe essere simile al seguente:
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import Movie

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

movies = APIRouter()

@movies.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db

@movies.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}

@movies.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")

@movies.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")
        

Qui abbiamo registrato un nuovo percorso API utilizzando APIRouter di FastAPI.

Inoltre, creiamo un nuovo file models.py all’interno di api in cui definiamo i nostri modelli Pydantic.

            #~/movie-service/api/models.py

from typing import List
from pydantic import BaseModel

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]

        
Ora dobbiamo indicare a FastAPI dove trovare i persorsi web. A tale scopo dobbiamo registrare all’interno di main.py il nuovo file con le “routes”.
            #~/movie-service/app/main.py

from fastapi import FastAPI

from app.api.movies import movies

app = FastAPI()

app.include_router(movies)
        
Infine, la struttura della directory della nostra applicazione è simile alla seguente:
            movie-service
├── app
│   ├── api
│   │   ├── models.py
│   │   ├── movies.py
│   |── main.py
└── env
        

Prima di proseguire è bene assicurarsi che l’applicazione funzioni correttamente.

 

Utilizzo del database PostgreSQL con FastAPI

In precedenza abbiamo usato lista di stringhe Python per simulare un elenco di film, ma ora siamo pronti per utilizzare un database reale a tale questo scopo. In particolare, in questa applicazione useremo PostgreSQL. Installaremo PostgreSQL, se non l’hai già fatto. Dopo aver installato PostgreSQL creeremo un nuovo database che chiameremo movie_db.

Utilizzeramo encode/database per connettersi al database utilizzando il supporto async e await.

Installa la libreria richiesta utilizzando:

            pip install 'databases[postgresql]'
        

questo comando installerà anche sqlalchemy e asyncpg, che sono necessari per lavorare con PostgreSQL.

Creiamo un nuovo file all’interno di api e lo chiamiamo db.py. Questo file conterrà il modello del database reale per il REST API.

            #~/movie-service/app/api/db.py

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = 'postgresql://movie_user:movie_password@localhost/movie_db'

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts', ARRAY(String))
)

database = Database(DATABASE_URL)
        

In particolare, DATABASE_URI è l’URL utilizzato per connettersi al database PostgreSQL. movie_user è il nome dell’utente del database, movie_password è la password dell’utente del database ed movie_db è il nome del database.

Proprio come in SQLAlchemy, abbiamo creato la tabella per il database dei film.

Aggiorniamo quindi main.py per connettersi al database. Il codice di main.py è il seguente:

            #~/movie-service/app/main.py

from fastapi import FastAPI
from app.api.movies import movies
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()
    
app.include_router(movies)
        
FastAPI fornisce alcuni gestori di eventi che è possibile utilizzare per connettersi al nostro database all’avvio dell’applicazione e disconnettersi alla chiusura. Aggiorniamo movies.py in modo che utilizzi un database invece di un falso elenco Python.
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import MovieIn, MovieOut
from app.api import db_manager

movies = APIRouter()

@movies.get('/', response_model=List[MovieOut])
async def index():
    return await db_manager.get_all_movies()

@movies.post('/', status_code=201)
async def add_movie(payload: MovieIn):
    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = payload.dict()
    fake_movie_db[id] = movie
    return None

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)
    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}')
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        
Aggiungiamo db_manager.py per manipolare il nostro database.
            #~/movie-service/app/api/db_manager.py

from app.api.models import MovieIn, MovieOut, MovieUpdate
from app.api.db import movies, database

async def add_movie(payload: MovieIn):
    query = movies.insert().values(**payload.dict())
    return await database.execute(query=query)

async def get_all_movies():
    query = movies.select()
    return await database.fetch_all(query=query)

async def get_movie(id):
    query = movies.select(movies.c.id==id)
    return await database.fetch_one(query=query)

async def delete_movie(id: int):
    query = movies.delete().where(movies.c.id==id)
    return await database.execute(query=query)

async def update_movie(id: int, payload: MovieIn):
    query = (
        movies
        .update()
        .where(movies.c.id == id)
        .values(**payload.dict())
    )
    return await database.execute(query=query)
        
Aggiorniamo il nostro models.py in modo che possiamo utilizzare il modello Pydantic con la tabella sqlalchemy.
            #~/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional


class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts: Optional[List[str]] = None
        

La classe MovieIn è il modello base da usare per aggiungere un film al database. Dobbiamo aggiungere id a questo modello per ottenerlo dal database, creando il modello ModeulOut. Il modello MovieUpdate consente di impostare i valori nel modello come facoltativi in modo che durante l’aggiornamento del filmato possa essere inviato solo il campo che deve essere aggiornato.

Possiamo ora collegarci alla pagina “docs” della nostra applicazione ed inizia a giocare con l’API.

 

 

Modelli di gestione dei dati nei microservice

La gestione dei dati è uno degli aspetti più critici durante la creazione di un microservizio. Poiché le diverse funzioni dell’applicazione sono gestite da servizi diversi, l’utilizzo di un database può essere complicato.

Di seguito sono riportati alcuni modelli che è possibile utilizzare per gestire il flusso di dati nell’applicazione.

 

Database per servizio

L’utilizzo di un database per ogni servizio è ottimo se vuoi che i tuoi microservizi siano il più possibile accoppiati debolmente. Avere un database diverso per ogni servizio ci consente di scalare servizi diversi in modo indipendente. Una transazione che coinvolge più database viene eseguita tramite API ben definite. Ciò ha il suo svantaggio in quanto non è semplice implementare transazioni complesse che coinvolgono più servizi . Inoltre, l’aggiunta del sovraccarico di rete rende questo approccio meno efficiente da usare.

 

Database condiviso

Se ci sono molte transazioni che coinvolgono più servizi è meglio usare un database condiviso. Ciò comporta i vantaggi di un’applicazione altamente coerente, ma elimina la maggior parte dei vantaggi offerti dall’architettura dei microservizi. Gli sviluppatori che lavorano su un servizio devono coordinarsi con le modifiche allo schema in altri servizi.

 

Composizione API

Nelle transazioni che coinvolgono più database, il compositore API funge da gateway API ed esegue chiamate API ad altri microservizi nell’ordine richiesto. Infine, i risultati di ogni microservizio vengono restituiti al servizio client dopo aver eseguito un join in memoria. Lo svantaggio di questo approccio è l’inefficienza dei join in memoria di un set di dati di grandi dimensioni.

 

 

Creazione di un microservice Python in Docker

Il problema della distribuzione del microservice può essere notevolmente ridotto utilizzando Docker. Docker aiuta a incapsulare ogni servizio e a scalarlo in modo indipendente.

 

Installazione di Docker e Docker Compose

Se non hai già installato docker nel tuo sistema, verifichiamo se docker è installato eseguendo il comando docker. Dopo aver completato l’installazione di Docker, installiamo Docker Compose . Docker Compose viene utilizzato per definire ed eseguire più container Docker. E’ utile anche per facilitare l’interazione tra i container.

 

Creazione del servizio Movies

Poiché gran parte del lavoro per la creazione di un servizio Movies è già stato svolto all’inizio con FastAPI, riutilizzeremo il codice che abbiamo già scritto. Creiamo una cartella nuova di zecca, che chiamerò python-microservices e spostiamoci il codice che abbiamo scritto in precedenza.

Quindi, la struttura delle cartelle sarebbe simile a questa:

            python-microservices/
└── movie-service/
    ├── app/
    └── env/
        

Prima di tutto, creiamo un file requirements.txt in cui conserveremo tutte le dipendenze che utilizzeremo in movie-service.

Creiamo un nuovo file requirements.txt all’interno di movie-service e aggiungiamo quanto segue:

            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
httpx==0.11.1
        

Abbiamo usato tutte le librerie menzionate nel file tranne httpx che utilizzerai mentre effettui una chiamata API da servizio ad un altro.

Creiamo un Dockerfile all’interno di movie-service come segue:

            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        

Per prima cosa definiamo quale versione di Python usare. Quindi impostiamo la cartella app come WORKDIR all’interno del contenitore Docker. Dopodiché viene installato gcc, richiesto dalle librerie che utilizziamo nell’applicazione.

Infine, installiamo tutte le dipendenze in requirements.txt e copiamo tutti i file all’interno di movie-service/app.

Aggiorniamo db.py e sostituiamo:

            DATABASE_URI = 'postgresql://movie_user:movie_password@localhost/movie_db'
        

con:

            DATABASE_URI = os.getenv('DATABASE_URI')
        

NOTA: non dimentichiamo di importare os nella parte superiore del file.

È necessario eseguire questa operazione in modo da poter fornire in seguito DATABASE_URI come variabile di ambiente.

Inoltre, aggiorniamo main.py e sostituiamo:

            app.include_router(movies)
        
con:
            app.include_router(movies, prefix='/api/v1/movies', tags=['movies'])
        

Abbiamo aggiunto prefix=/api/v1/movies in modo da gestire più facilmente le diverse versioni dell’API. Inoltre, i tag facilitano la ricerca delle API relative ai movies nei docs di FastAPI.

Inoltre, dobbiamo aggiornare i nostri modelli in modo che casts memorizzi l’ID del cast invece del nome effettivo. Quindi, aggiorniamo models.py come segue:

            #~/python-microservices/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts_id: List[int]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts_id: Optional[List[int]] = None
        
Allo stesso modo, dobbiamo aggiornare le tabelle del database, aggiorniamo db.py:
            #~/python-microservices/movie-service/app/api/db.py

import os

from sqlalchemy import (Column, DateTime, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = os.getenv('DATABASE_URL')

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts_id', ARRAY(Integer))
)

database = Database(DATABASE_URL)
        

Ora, aggiorniamo movies.py per verificare se il cast con l’ID specificato si presenta nel cast-service prima di aggiungere un nuovo film o aggiornare un film.

            #~/python-microservices/movie-service/app/api/movies.py

from typing import List
from fastapi import APIRouter, HTTPException

from app.api.models import MovieOut, MovieIn, MovieUpdate
from app.api import db_manager
from app.api.service import is_cast_present

movies = APIRouter()

@movies.post('/', response_model=MovieOut, status_code=201)
async def create_movie(payload: MovieIn):
    for cast_id in payload.casts_id:
        if not is_cast_present(cast_id):
            raise HTTPException(status_code=404, detail=f"Cast with id:{cast_id} not found")

    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.get('/', response_model=List[MovieOut])
async def get_movies():
    return await db_manager.get_all_movies()

@movies.get('/{id}/', response_model=MovieOut)
async def get_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return movie

@movies.put('/{id}/', response_model=MovieOut)
async def update_movie(id: int, payload: MovieUpdate):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)

    if 'casts_id' in update_data:
        for cast_id in payload.casts_id:
            if not is_cast_present(cast_id):
                raise HTTPException(status_code=404, detail=f"Cast with given id:{cast_id} not found")

    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}', response_model=None)
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        

Aggiungiamo un servizio per effettuare una chiamata API al casts-service:

            #~/python-microservices/movie-service/app/api/service.py

import os
import httpx

CAST_SERVICE_HOST_URL = 'http://localhost:8002/api/v1/casts/'
url = os.environ.get('CAST_SERVICE_HOST_URL') or CAST_SERVICE_HOST_URL

def is_cast_present(cast_id: int):
    r = httpx.get(f'{url}{cast_id}')
    return True if r.status_code == 200 else False
        

Nel precedente codice si effettua una chiamata API per ricavare il cast con uno specifico id e restituire true se il cast esiste altrimenti false.


Creazione del casts-service

Analogamente al movie-service, per creare un casts-service utilizzeremo FastAPI e un database PostgreSQL.

Creiamo una struttura di cartelle come la seguente:

            python-microservices/
.
├── cast_service/
│   ├── app/
│   │   ├── api/
│   │   │   ├── casts.py
│   │   │   ├── db_manager.py
│   │   │   ├── db.py
│   │   │   ├── models.py
│   │   ├── main.py
│   ├── Dockerfile
│   └── requirements.txt
├── movie_service/
...
        
Aggiungiamo quanto segue a requirements.txt:
            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
        
Dockerfile:
            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        
main.py:
            #~/python-microservices/cast-service/app/main.py

from fastapi import FastAPI
from app.api.casts import casts
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

app.include_router(casts, prefix='/api/v1/casts', tags=['casts'])
        

Abbiamo aggiunto il prefix /api/v1/casts in modo che la gestione dell’API diventi più semplice. Inoltre, l’aggiunta dei tags facilita la ricerca dei documenti relativi a casts nel docs di FastAPI.

casts.py

            #~/python-microservices/cast-service/app/api/casts.py

from fastapi import APIRouter, HTTPException
from typing import List

from app.api.models import CastOut, CastIn, CastUpdate
from app.api import db_manager

casts = APIRouter()

@casts.post('/', response_model=CastOut, status_code=201)
async def create_cast(payload: CastIn):
    cast_id = await db_manager.add_cast(payload)

    response = {
        'id': cast_id,
        **payload.dict()
    }

    return response

@casts.get('/{id}/', response_model=CastOut)
async def get_cast(id: int):
    cast = await db_manager.get_cast(id)
    if not cast:
        raise HTTPException(status_code=404, detail="Cast not found")
    return cast
        
db_manager.py
            #~/python-microservices/cast-service/app/api/db_manager.py

from app.api.models import CastIn, CastOut, CastUpdate
from app.api.db import casts, database


async def add_cast(payload: CastIn):
    query = casts.insert().values(**payload.dict())

    return await database.execute(query=query)

async def get_cast(id):
    query = casts.select(casts.c.id==id)
    return await database.fetch_one(query=query)
        
db.py
            #~/python-microservices/cast-service/app/api/db.py

import os

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URI = os.getenv('DATABASE_URI')

engine = create_engine(DATABASE_URI)
metadata = MetaData()

casts = Table(
    'casts',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('nationality', String(20)),
)

database = Database(DATABASE_URI)
        
models.py
            #~/python-microservices/cast-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class CastIn(BaseModel):
    name: str
    nationality: Optional[str] = None


class CastOut(CastIn):
    id: int


class CastUpdate(CastIn):
    name: Optional[str] = None
        

Esecuzione del microservizio utilizzando Docker Compose

Per eseguire i microservice, è necessario creare un  filedocker-compose.yml e aggiungiamo quanto segue:

            version: '3.7'

services:
  movie_service:
    build: ./movie-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./movie-service/:/app/
    ports:
      - 8001:8000
    environment:
      - DATABASE_URI=postgresql://movie_db_username:movie_db_password@movie_db/movie_db_dev
      - CAST_SERVICE_HOST_URL=http://cast_service:8000/api/v1/casts/

  movie_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_movie:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=movie_db_username
      - POSTGRES_PASSWORD=movie_db_password
      - POSTGRES_DB=movie_db_dev

  cast_service:
    build: ./cast-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./cast-service/:/app/
    ports:
      - 8002:8000
    environment:
      - DATABASE_URI=postgresql://cast_db_username:cast_db_password@cast_db/cast_db_dev

  cast_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_cast:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=cast_db_username
      - POSTGRES_PASSWORD=cast_db_password
      - POSTGRES_DB=cast_db_dev

volumes:
  postgres_data_movie:
  postgres_data_cast:
        

Abbiamo 4 diversi servizi, movie_service, un database per movie_service, cast_service e un database per cast service. Abbiamo esposto il movie_service alla porta 8001 e modo simile cast_service alla porta 8002.

Per il database, abbiamo utilizzato i volumi in modo che i dati non vengano distrutti quando il contenitore docker viene chiuso. 

Eseguiamo il docker-compose usando il comando:

            docker-compose up -d
        

Questo comando crea la dockar image, se non esiste già, e la esegue.

Andiamo su http://localhost:8002/docs per aggiungere un cast nel casts-service. Allo stesso modo, http://localhost:8001/docs  per aggiungere il film nel movie-service.

Utilizziamo di Nginx per accedere a entrambi i servizi utilizzando un singolo indirizzo host

Abbiamo distribuito i microservizi utilizzando Docker compose, ma c’è un piccolo problema. È necessario accedere a ciascuno dei microservizi utilizzando una porta diversa. Si può risolvere questo problema utilizzando il reverse-proxy di Nginx , utilizzando Nginx puoi indirizzare la richiesta aggiungendo un middleware che indirizza le nostre richieste a diversi servizi in base all’URL dell’API.

Aggiungamo un nuovo file nginx_config.conf all’interno python-microservices con i seguenti contenuti.

            server {
  listen 8080;

  location /api/v1/movies {
    proxy_pass http://movie_service:8000/api/v1/movies;
  }

  location /api/v1/casts {
    proxy_pass http://cast_service:8000/api/v1/casts;
  }

}
        

Stiamo eseguendo Nginx alla porta 8080 e instradando le richieste al movie-service se l’endpoint inizia con /api/v1/movies e in modo simile al casts-service se l’endpoint inizia con /api/v1/casts.

Ora dobbiamo aggiungere il servizio nginx nel nostro file docker-compose-yml. Aggiungiamo il seguente servizio dopo il servizio cast_db:

            ...
nginx:
    image: nginx:latest
    ports:
      - "8080:8080"
    volumes:
      - ./nginx_config.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - cast_service
      - movie_service
...
        
Ora, spegnamo i contenitori docker con il comando:
            docker-compose down
        

Ed eseguiamolo di nuovo con:

            docker-compose up -d
        

Ora possiamo accedere sia al movie-service che al casts-service tramite la porta 8080.

Collegandosi a http://localhost:8080/api/v1/movies/ otteniamo l’elenco dei film.

Ora, per accedere ai docs dei service è necessario modificare il  main.py del movie-service la seguente riga:

            app = FastAPI()
        

con

            app = FastAPI(openapi_url="/api/v1/movies/openapi.json", docs_url="/api/v1/movies/docs")
        
Allo stesso modo, per il casts-service si deve sostituirlo con
            app = FastAPI(openapi_url="/api/v1/casts/openapi.json", docs_url="/api/v1/casts/docs")
        

Abbiamo cambiato l’endpoint in cui vengono serviti i docs e da dove viene servito il openapi.json.

Ora possiamo accedere ai docs da http://localhost:8080/api/v1/movies/docs per il movie-service e da http://localhost:8080/api/v1/casts/docs per il casts-service.

 

Conclusione

L’architettura del microservice è ottima per suddividere una grande applicazione monolitica in logiche di business separate, ma anche questo comporta una complicazione. Python è ottimo per la creazione di microservizi grazie all’esperienza degli sviluppatori e a tonnellate di pacchetti e framework per rendere gli sviluppatori più produttivi.

Puoi trovare il codice completo presentato in questo tutorial su Github.

Pianificare un web scraping con Apache Airflow

scienzadeidati articoli - Web Scarping con Airflow

Nel post precedente , abbiamo introdotto Apache Airflow e i suoi concetti di base, la  configurazione e l’utilizzo. In questo post, descriviamo come possiamo pianificare i nostri web scraper con l’aiuto di Apache Airflow.

A titolo di esempio, vedremo come effettuare lo scraping del sito https://allrecipes.com tramite perché lo scopo è utilizzare Airflow. Nel caso si desideri approfondire il web scraping si può consultare l’intera serie presente qui su scienzadeidati.com.

Quindi, lavoreremo su un flusso di lavoro composto dalle seguenti attività:

  • parse_recipes: analizza le singole ricette.
  • download_image: scarica l’immagine della ricetta.
  • store_data: memorizza  l’immagine e i dati analizzati in un database MySQL

Non descriviamo come pianificare i DAG e altre cose che abbiamo già trattato nella Parte 1.

articoli - airflow_web_scraping

Abbiamo impostato load_examples=False in airflow.cfg in modo da semplificare l’interfaccia ed avere solo i DAG che ci interessano. La struttura del codice di base è simile alla seguente:

            import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def parse_recipes():
    return 'parse_recipes'


def download_image():
    return 'download_image'


def store_data():
    return 'store_data'


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 12, 1, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

with DAG('parsing_recipes',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
    opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                       python_callable=parse_recipes)

    opr_download_image = PythonOperator(task_id='download_image',
                                        python_callable=download_image)
    opr_store_data = PythonOperator(task_id='store_data',
                                    python_callable=store_data)

opr_parse_recipes >> opr_download_image >> opr_store_data
        

Se registriamo questo DAG eseguendo airflow scheduler, otteniamo qualcosa di simile:

articoli - airflow_web_scraping

Da notare la potenza dei flussi di lavoro. E’ possibile visualizzare lo scheletro della nostra pipeline in modo da avere un’idea del nostro flusso dati senza entrare nei dettagli. Bello, vero?

Ora, implementiamo una per una le routine di ogni task. Prima di entrare nella logica di parse_recipies, è fondamentale descrivere come i task di Airflow possono comunicare tra loro.

 

Cos’è XCOM?

XCOM fornisce i metodi per far comunicare i task tra di loro. I dati inviati da un task vengono acquisiti da un’altro task. Se impostiamo provide_context=True, il valore restituito della funzione viene inserito in XCOM che di per sé non è altro che una tabella Db. Se controlliamo airflow.db troveremo una tabella con il nome xcom, al cui interno sono presenti le voci delle istanze di attività in esecuzione.

            
def parse_recipes(**kwargs):
    return 'RETURNS parse_recipes'

        

Il primo task non prevede ha tali modifiche oltre a fornire **kwargs che consentono di condividere coppie chiave/valore. E’ comunque necessario impostare provide_context=True per ogni operatore, in modo da renderlo compatibile con XCom. Ad esempio:

            opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                python_callable=parse_recipes, provide_context=True)
        

 

Il task download_image avrà le seguenti modifiche:

            
def download_image(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

    print('Printing Task 1 values in Download_image')
    print(v1)
    return 'download_image'
        

Con prima riga è ti=kwargs['t1'] otteniamo i dettagli delle istanze tramite la chiave di accesso ti.

Vediamo in dettaglio perchè è necessario prevedere l’uso di queste chiavi di accesso. Se analizziamo il parametro kwargs, otteniamo una stampa simile alla seguente, dove sono presenti chiavi come t1task_instance, ecc… che contengono il valore inviato da un task:

            {
  'dag': <DAG: parsing_recipes>,
  'ds': '2020-12-02',
  'next_ds': '2020-12-02',
  'prev_ds': '2020-12-02',
  'ds_nodash': '20201202',
  'ts': '2020-12-02T09:56:05.289457+00:00',
  'ts_nodash': '20201202T095605.289457+0000',
  'yesterday_ds': '2020-12-01',
  'yesterday_ds_nodash': '20201201',
  'tomorrow_ds': '2020-12-03',
  'tomorrow_ds_nodash': '20201203',
  'END_DATE': '2020-12-02',
  'end_date': '2020-12-02',
  'dag_run': <DagRunparsing_recipes@2020-12-0209:56:05.289457+00:00:manual__2020-12-02T09:56:05.289457+00:00, externallytriggered: True>,
  'run_id': 'manual__2020-12-02T09:56:05.289457+00:00',
  'execution_date': <Pendulum[2020-12-02T09: 56: 05.289457+00:00]>,
  'prev_execution_date': datetime.datetime(2020,12,2,9,56,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'next_execution_date': datetime.datetime(2020,12,2,9,58,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'latest_date': '2020-12-02',
  'macros': <module'airflow.macros'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
  'params': {},
  'tables': None,
  'task': <Task(PythonOperator): download_image>,
  'task_instance': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'ti': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'task_instance_key_str': 'parsing_recipes__download_image__20201202',
  'conf': <module'airflow.configuration'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/configuration.py'>,
  'test_mode': False,
  'var': {
    'value': None,
    'json': None
  },
  'inlets': [],
  'outlets': [],
  'templates_dict': None
}
        

Nella riga successiva si richiama il metodo xcom_pull per inserire il valore restituito di un determinato task. Nel nostro caso l’ID task è parse_recipes:

v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

Bene, dopo aver descritto il concetto di XCOM, torniamo al nostro codice originale.

Quindi il nostro processo prevede un primo task che legge gli URL dal file di testo (possiamo anche creare un altra task che sarà responsabile solo di recuperare i collegamenti url e archiviarli in un file o DB.  Lascio questa implementazione a voi lettori! ) e quindi la list delle voci viene passata al successivo task che ha il compito di scaricare le immagini e aggiungere le informazioni del file appena scaricato in  un dict e infine memorizzarlo nel database MySQL.

Il metodo download_image appare ora come segue:

            
def download_image(**kwargs):
    local_image_file = None
    idx = 0
    records = []
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='parse_recipes')

    for rec in parsed_records:
        idx += 1
        image_url = rec['image_url']
        r_url = rec['url']
        print('Downloading Pic# {}'.format(idx))
        local_image_file = dl_img(image_url, r_url)
        rec['local_image'] = local_image_file
        records.append(rec)

    return records
        

Non descriviamo il metodo dl_img dato che esula dallo scopo di questo post. Se vuoi puoi controllare il codice direttamente su Github. Una volta scaricato il file, aggiungiamo la chiave nel record originale.

Il metodo store_data appare ora come segue:

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    print('PRINTING DUMPED RECORDS in STORE DATA')
    print(parsed_records)
    return 'store_data'
        

Assicuriamoci di impostare provide_context=True per l’operatore opr_store_data altrimenti si ottiene il seguente errore:

Subtask store_data KeyError: 'ti'

Ora che i dati sono disponibili., sono inviati per essere memorizzati nel database MySQL.

Per utilizzare MySQL con Airflow, utilizzeremo gli hook forniti da Airflow.

Airflow Hooks ci consente di interagire con sistemi esterni: e-mail, S3, database e vari altri.

Prima di iniziare a programmare, dobbiamo configurare una connessione MySQL.

Sull’interfaccia utente Web di Airflow, dobbiamo andare su Amministratore > Connessioni. Qui abbiamo un elenco di connessioni esistenti se ci colleghiamo a http://0.0.0.0:8080/admin/connection/

articoli - airflow_web_scraping

Modifichiamo la connessione e impostiamo il nome e la password della tabella.

articoli - airflow_web_scraping

Per utilizzare la libreria MySQL dobbiamo importare MySqlHook 

from airflow.hooks.mysql_hook import MySqlHook

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    connection = MySqlHook(mysql_conn_id='mysql_default')
    for r in parsed_records:
        url = r['url']
        data = json.dumps(r)
        sql = 'INSERT INTO recipes(url,data) VALUES (%s,%s)'
        connection.run(sql, autocommit=True, parameters=(url, data))
    return True
        

L’SQL della tabella è riportato di seguito:

            
CREATE TABLE recipes (
	'id' int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
	'url' varchar(100) NOT NULL,
	'data' text NOT NULL,
	PRIMARY KEY (`id`)
);
        

Questo è tutto. Oh, aspetta… E’ possibile informare l’amministratore se il flusso di lavoro è stato eseguito correttamente? Airflow ci consente di utilizzare EmailOperator per tale scopo.

            
opr_email = EmailOperator(
        task_id='send_email',
        to='[email protected]',
        subject='Airflow Finished',
        html_content=""" <h3>DONE</h3> """,
        dag=dag
    )
        

Prima di usarlo dobbiamo apportare modifiche a airflow.cfg, nelle impostazioni relative alla posta. C’è una sezione [smtp].

            
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = False
smtp_ssl = True
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = [email protected]
smtp_password = your_password
smtp_port = 465
smtp_mail_from = [email protected]
        

Conclusione

In questo post, abbiamo visto come introdurre Airflow nella  nostra architettura di scraping esistente e come utilizzare MySQL con Airflow. Ci sono varie possibilità per migliorarlo o estenderlo. Provate e fatemi sapere.

Il codice è disponibile su Github

Quando eseguiamo i web scraping, c’è un’alta probabilità di essere bloccato e non abbiamo altra scelta che aspettare e pregare di essere sbloccati. Gli indirizzi IP proxy vengono utilizzati per evitare tali circostanze.

Introduzione ad Apache Airflow

scienzadeidati articoli - Introduzione ad AirflowI

In questo post introduciamo Apache Airflow, un sistema di gestione del flusso di lavoro sviluppato da Airbnb.

Apache Airflow permette ovviamente di creare pipeline ETL per automatizzare le cose,  ma le sue funzionalità permettono di implementare molti altri scenari dove è necessario eseguire attività in un determinato ordine, almeno una volta o periodicamente. Ad esempio:

  • Monitoraggio dei Cron
  • Trasferire dati da un luogo all’altro.
  • Automatizzare le operazioni DevOps.
  • Recuperare periodicamente i dati dai siti Web ed  aggiornare un database.
  • Elaborazione dei dati per sistemi complessi.
  • Pipeline di  machine learning.

Le possibilità sono infinite.

Prima di vedere l’implementazione di Airflow nei nostri sistemi, descriviamo cosa effettivamente è Airflow e delle sue terminologie.

 

Che cos’è il Airflow?

Dal sito web:

Airflow è una piattaforma per creare, pianificare e monitorare i flussi di lavoro in modo programmatico.

Utilizza Airflow per creare flussi di lavoro come grafici aciclici diretti (DAG) di attività. Lo scheduler di Airflow esegue le tue attività su una serie di worker, seguendo le dipendenze specificate. Le ricche utilità della riga di comando semplificano l’esecuzione di interventi complessi sui DAG. La ricca interfaccia utente semplifica la visualizzazione delle pipeline in esecuzione in produzione, il monitoraggio dei progressi e la risoluzione dei problemi quando necessario.

Fondamentalmente, aiuta ad automatizzare gli script per eseguire attività. Airflow è basato su Python ma puoi eseguire un programma indipendentemente dal linguaggio. Ad esempio, la prima fase del flusso di lavoro deve eseguire un programma basato su C++ per eseguire l’analisi delle immagini e quindi un programma basato su Python per trasferire tali informazioni su S3. Le possibilità sono infinite.

 

Cos’è Dag?

Da Wikipedia

In matematica e informatica, un grafo aciclico diretto (DAG /ˈdæɡ/, è un grafo diretto finito senza cicli diretti. Cioè, consiste di un numero finito di vertici e spigoli, con ogni spigolo diretto da un vertice all’altro, in modo tale che non c’è modo di iniziare da nessun vertice v e seguire una sequenza di spigoli coerentemente diretta che alla fine torna nuovamente a v . Equivalentemente, un DAG è un grafo diretto che ha un ordinamento topologico, una sequenza di vertici tale che ogni arco è diretto da prima a dopo nella sequenza.

Proviamo a spiegarlo in parole semplici: puoi essere solo figlio di tuo padre ma non viceversa. OK, è un esempio zoppo o strano, ma non sono riuscito a trovare un esempio migliore per spiegare un ciclo diretto .

aricoli - airflow grafo

In Airflow tutti i flussi di lavoro sono DAG. Un Dag è composto da operatori. Un operatore definisce un’attività individuale che deve essere eseguita. Sono disponibili diversi tipi di operatori (come indicato sul sito Web Airflow):

  • BashOperator – esegue un comando bash
  • PythonOperator – richiama un’arbitraria funzione Python
  • EmailOperator – invia un’e-mail
  • SimpleHttpOperator – invia una richiesta HTTP
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, ecc. – esegue un comando SQL
  • Sensor – attende un certo tempo, file, record di un database, chiave S3, ecc…

Possiamo anche trovare un operatore personalizzato secondo specifiche esigenze.

 

Installazione e configurazione

Ariflow è basato su Python. Il modo migliore per installarlo è tramite lo strumento pip.

pip install apache-airflow

Per verificare se è stato installato, eseguiamo il comando airflow version e otteniamo qualcosa del tipo:

            [2020-12-02 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
v1.10.0
You will need to install mysqlclient as well to incorporate MySQL in your workflows. It is optional though.
        

Inoltre possiamo anche installare mysqlclient per incorporare MySQL nei flussi di lavoro. È facoltativo e possiamo scegliere un diverso database, come postgresql, SQlite, ecc…

pip install mysqlclient

Prima di iniziare, è necessario creare una cartella ed impostala come AIRFLOW_HOME. Nel nostro caso è “airflow_home”. Una volta creata, eseguiamo il comando export per impostare la home nelle variabili d’ambiente.

export AIRFLOW_HOME='pwd' airflow_home

E’ necessario assicurarsi di essere nella directory padre di “airflow_home” prima di eseguire il comando export. Inoltre all’interno di “airflow_home” è necessario creare un’altra cartella per conservare i DAG, chiamata dags.

Se si imposta load_examples=False, gli esempi predefiniti non verranno caricati sull’interfaccia Web.

Ora dobbiamo eseguire il seguente comando all’interno della cartella “airflow_home”.

airflow db init

Questo comando crea i file airflow.cfg e unitests.cfg

In particolare airflow.db è un file SQLite per memorizzare tutta la configurazione relativa ai flussi di lavoro di esecuzione. airflow.cfg è quello di memorizzare tutte le impostazioni iniziali per gestire i wrokflow in esecuzione.

In questo file, è presente il parametro sql_alchemy_conn con il valore ../airflow_home/airflow.db.

Possiamo usare MySQL, ma per ora, seguiamo le impostazioni di base.

 

Airflow WebServer

Fin qui tutto bene, ora senza perdere tempo avviamo il web server.

airflow webserver

Quando si avvia il webserver si ha una stampa simile a quanto segue:

            2020-12-02 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor
/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: 
DeprecationWarning: The celeryd_concurrency option in [celery] 
has been renamed to worker_concurrency - the old setting has been used,
but please update your config. default=conf.get('celery', 'worker_concurrency')),
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
v1.10.0
        
            [2020-12-03 14:21:42,340] {__init__.py:57} INFO - Using executor SequentialExecutor
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71:
ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
.format(x=modname), ExtDeprecationWarning

[2020-12-03 14:21:43,119] [48995] {models.py:167} INFO - 
Filling up the DagBag from /Development/airflow_home/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
        

Ora, visitiamo la pagina 0.0.0.0:8080 e otteniamo la seguente schermata:

articoli - airflow webserver

Qui possiamo vedere una serie di voci. Queste sono gli esempi forniti con l’installazione Airflow. Si possono disattivarli modificando il file airflow.cfg e impostando il parametro load_examples su FALSE.

Le DAG Runs indicano quante volte è stato eseguito un determinato DAG. Recent Tasks indica quale attività tra molte attività all’interno di un DAG è attualmente in esecuzione e qual è il suo stato. Lo schedule è simile a quello che si usa quando  si pianifica un Cron, quindi al momento non ha bisogno di approfondimenti. Lo Schedule è responsabile dell’ora in cui un determinato DAG dovrebbe essere attivato.

articoli - airflow grafo view

Qui sopra veidamo lo screenshot di un DAG creato ed eseguito in precedenza. Le caselle rettangolari rappresentano un’attività (o Task). In alto a destra si possono vedere caselle di diverso colore, denominate: success, running, failed ecc. Questi sono gli stati che un DAG può assumere.

Nell’immagine sopra è possibile che tutte le caselle abbiano un bordo verde, tuttavia, se non siamo sicuri è sufficiente muovere il mouse sopra uno stato success e si evidenziano i task in quello stato, come nell’immagine seguente:

articoli - airflow grafo hover

Da notare che il colore di sfondo/riempimento di queste caselle è verde e canna. In alto a sinistra del pannello grigio, puoi vedere perché sono ci sono tali colori, questi colori di sfondo rappresentano i diversi tipi di operatori utilizzati in questo DAG. In questo caso, stiamo usando BashOperator e PythonOperator.

 

Esempio di base

Vediamo ora un esempio di base per descrivere il funzionamento di Airflow. Nella cartella dags, che è stata precedentemente creata  in airflow_home, creeremo il nostro primo DAG di esempio. In pratica si deve creare un file denominato, ad esempio,my_simple_dag.py.

La prima cosa da fare dopo le importazioni è scrivere le routine che serviranno come task per gli operatori. Useremo una miscela di BashOperator e PythonOperator.

            import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def greet():
    print('Writing in file')
    with open('path/to/file/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'
def respond():
    return 'Greet Responded Again'
        
Queste sono due semplici routine che non fanno altro che restituire un testo. Vedremo successivamente perché stiamo scrivendo qualcosa in un file di testo. La prossima cosa da fare è definire default_args e creare un’istanza DAG.
            default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 12, 03, 11, 00, 00),
    'concurrency': 1,
    'retries': 0
}
        

Nella variabile default_args si definisco una serie di parametri.

start_date indica quando questo DAG dovrebbe iniziare a eseguire il flusso di lavoro. Questo start_date potrebbe appartenere al passato. Nel mio caso, è il 3 dicembre alle 11:00 UTC. Questa data è già passata per me perché ora sono già le 11:15 UTC. Puoi sempre modificare questo parametro all’interno del file airflow.cfg e impostare il tuo fuso orario locale. Per ora, possiamo anche lasciare UTC. Nel caso in cui tu sia ancora curioso di sapere che ora viene utilizzata da Airflow, controlla in alto a destra nell’interfaccia utente Web di Airflow, dovresti vedere qualcosa di simile a quello indicato di seguito. Puoi usarlo come riferimento per pianificare le tue attività.

articoli - airflow_date

Il parametro retries tenta di eseguire il DAG X un numero di volte in caso di mancata esecuzione. Il parametro concurrency aiuta a determinare il numero di processi da utilizzare per l’esecuzione di più DAG. Per esempio, il DAG deve eseguire 4 istanze del passato, chiamati anche come Backfill , con un intervallo di 10 minuti (vedremo questo argomento complesso a breve) ed è stata impostata concurrency pari a 2, quindi 2 DAG sono lanciati nello stesso momento ed sono eseguite i task previsto in esse. Se hai già implementato multiprocessing in Python, dovresti facilmente capire il funzionamento.

            with DAG('my_simple_dag',
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         ) as dag:
    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')

    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')

    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)
                                 
opr_hello >> opr_greet >> opr_sleep >> opr_respond
        

Ora usando Context Manager stiamo definendo un DAG con le sue proprietà, il primo parametro è l’ID del dag, nel nostro caso è my_simple_dag, il secondo parametro lo abbiamo già descritto, il 3° parametro è qualcosa che deve essere discusso insieme a start_date, che abbiamo menzionato in default_args.

All’interno di quel Context Manager, stiamo assegnando operatori associati all’ID del task. Nel nostro caso questi operatori etichettati come: opr_hello opr_greet opr_sleep e opr_respond. Questi nomi vengono quindi visualizzati nelle caselle rettangolari discusse in precedenza.

Prima di andare oltre, è meglio discutere i DAG Runs e lo Scheduler e quale ruolo svolgono nell’intero flusso di lavoro.

 

Che cos’è l’utilità di pianificazione di Airflow?

Airflow Scheduler è un processo di monitoraggio che viene eseguito continuamente e attiva l’esecuzione delle attività in base a schedule_interval e execution_date.

 

Cos’è il DagRun?

Un DagRun è l’istanza di un DAG che verrà eseguito di volta in volta. Quando viene eseguito, verranno eseguite tutte le attività al suo interno.

articoli - airflow_DAGruns

Supponiamo che start_date sia il 3 dicembre 2020 alle 12:00:00 UTC e che il DAG sia stato avviato alle 12:30:00 UTC con il schedule_interval di */10 * * * * (ogni 10 minuti).

Utilizzando gli stessi parametri default_args discusse sopra, di seguito sono riportate le istanze del DAG che verranno eseguite immediatamente, una alla volta dato che nostro caso concurrency è pari 1:

articoli - airflow DAG times

Perché accade questo? Siamo stati noi a deciderlo tramite i parametri di configurazione. Airflow ti offre la possibilità di eseguire i DAG passati. Il processo di esecuzione dei DAG passati si chiama Backfill. Il processo di backfilling consente effettivamente a Airflow di anticipare lo stato di tutti i DAG sin dal suo inizio. La funzione è stata fornita per scenari in cui si esegue un DAG che interroga alcuni DB o API come Google Analytics per recuperare dati precedenti e renderli parte del flusso di lavoro. Anche se non ci sono dati passati, Airflow lo eseguirà comunque per mantenere intatto lo stato dell’intero flusso di lavoro.

Una volta eseguiti i DAG precedenti, il successivo (quello che si intende eseguire) verrà eseguito alle 12:40:00 UTC. DA sottolineare che qualunque sia lo scheduler impostato, il DAG viene eseguito dopo che quell’intervallo di tempo. Nel nostro caso, se deve essere eseguito ogni 10 minuti, verrà eseguito dopo che sono trascorsi 10 minuti.

 

Esempio di Scheduler

Proviamo. Abilitiamo l’esecuzione di my_simple_dag e avviamo lo scheduler.

articoli - airflow my_simple_dag

Non appena si esegue il DAG si visualizza una schermata simile alla seguente:

articoli - airflow my_simple_dag start
Alcune delle attività sono in coda. Se facciamo clic sull’ID del DAG, my_simple_dag, vediamo una schermata come la seguente:
articoli - airflow my_simple_dag run

Da notare il timestamp nella colonna Run Id . Vediamo lo schema: Il primo è stato eseguito alle 10:00, poi alle 10:10, alle 10:20. Quindi si interrompe, vorrei chiarire ancora una volta che il DAG viene eseguito una volta trascorso il tempo di durata di 10 minuti. Lo scheduler è iniziato alle 10:30. quindi ha effettuato il backfilling di 3 runs ad intervalli di 10 minuti.

articoli - airflow my_simple_dag finish

Il DAG che è stato eseguito per le 10:30:00 UTC è stato effettivamente eseguito alle 10:40:00 UTC. L’ultimo record del DAGRun sarà sempre uno meno rispetto all’ora corrente. Nel nostro caso, l’ora della macchina era 10:40:00 UTC

articoli - airflow DAG tree

Se passiamo con il mouse su uno dei cerchi si può vedere il timestamp davanti a Run: questo indica l’ora in cui è stato eseguito. Si può notare che questi cerchi verdi hanno un intervallo di tempo di 10 minuti. La Tree View è un po’ complicata ma offre un quadro completo dell’intero flusso di lavoro. Nel nostro caso, è stato eseguito 4 volte e tutte le attività sono state eseguite correttamente, il colore verde scuro.

È possibile evitare il Backfilling in due modi: è possibile impostare start_date con una data futura o impostare catchup = False nell’istanza di DAG. Ad esempio:

            with DAG('my_simple_dag', 
                  catchup=False,
                  default_args=default_args,
                  schedule_interval='*/10 * * * *',
                  # schedule_interval=None,
                  ) as dag:
        
Quindi impostando catchup=False non importa se start_date appartiene o meno al passato. Verrà eseguito dall’ora corrente e continuerà. Impostando end_date è possibile interrompere l’esecuzione di un DAG.
            opr_hello >> opr_greet >> opr_sleep >> opr_respond
        

La riga di codice precedente indica la relazione tra gli operatori, quindi costruisce l’intero flusso di lavoro. In questo caso, L’operatore bit a bit sta indicando la relazione tra gli operatori. Quindi  opr_hello viene eseguito per primo e poi tutto il resto. Il flusso viene eseguito da sinistra a destra. In forma grafica appare come segue:

articoli - airflow_DAG treeview

Se modifichiamo la direzione dell’ultimo operatore, come segue:

            opr_hello >> opr_greet >> opr_sleep << opr_respond
        

Iil flusso sarà simile al seguente:

articoli - airflow_DAG treeview 2

Il taks respond verrà eseguito in parallelo e sleep verrà eseguito in entrambi i casi.

Conclusione

In questo post, abbiamo discusso di come è possibile introdurre un sistema di flusso di lavoro completo per pianificare e automatizzare i flussi di lavoro. Nella parte 2, presenteremo un caso reale per mostrare come è possibile utilizzare Airflow. Volevo includerlo in questo post, ma è già diventato abbastanza lungo ed era necessario spiegare il concetto DAGRun perché mi ci è voluto un po’ di tempo per capirlo.

Come sempre il codice di questo post è disponibile su Github .

FastAPI con SQLAlchemy, PostgreSQL, Alembic e Docker – Parte 2 (versione asincrona)

scienzadeidati articoli - Database con FASTAPI [Parte2]

 

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con database relazionali in modo asincrono e utilizzare Alembic per le migrazioni.

Prima di iniziare con questo tutorial, leggi la parte 1 di questo tutorial.

Ecco il codice funzionante completo su github.

Iniziamo

Installa il pacchetto richiesto databases.

databases è un pacchetto leggero con supporto asyncio per molti database relazionali e utilizza le principali query di sqlalchemy.

Per lo scopo di questo tutorial userò pipenv , ma puoi usare pip o poetry o conda o qualsiasi altro gestore di pacchetti che preferisci.

            pipenv install databases
pipenv install databases[postgresql]
pipenv install asyncpg
        

useremo la stessa configurazione docker descritta nell’articolo precedente.

Dockerfile

            # Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        

docker-compose.yml

            version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

manterremo schema.py così com’è

            # schema.py

from pydantic import BaseModel


class User(BaseModel):
    first_name: str
    last_name: str
    age: int

    class Config:
        orm_mode = True
        

Analogamente manteniamo lo stesso alembic.ini.

Modifichiamo il file .env come segue:

DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres

ed lo utilizziamo nel file db.py dove inizializzeremo il nostro database.

            # db.py

import os
from databases import Database
from dotenv import load_dotenv
import sqlalchemy

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))

db = Database(os.environ["DATABASE_URL"])
metadata = sqlalchemy.MetaData()
        

A questo punto dobbiamo prevedere il file app.py, dove gestiremo l’inizializzazione dell’app con la connessione e la terminazione del database.

            # app.py

from db import db
from fastapi import FastAPI


app = FastAPI(title="Async FastAPI")


@app.on_event("startup")
async def startup():
    await db.connect()


@app.on_event("shutdown")
async def shutdown():
    await db.disconnect()
        
Di conseguenza dobbiamo modificare il file model.py.
            # model.py

from db import db

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)
        
miglioriamo il nostro model.py, creando una semplice classe di gestione del modello User
            # model.py

import sqlalchemy
from db import db, metadata, sqlalchemy


users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)


class User:
    @classmethod
    async def get(cls, id):
        query = users.select().where(users.c.id == id)
        user = await db.fetch_one(query)
        return user

    @classmethod
    async def create(cls, **user):
        query = users.insert().values(**user)
        user_id = await db.execute(query)
        return user_id
        

Questa classe fornirà un’implementazione più semplice per i metodi get e create.

Di conseguenza dobbiamo modificare il file main.py come segue.

            # main.py

import uvicorn
from models import User as ModelUser
from schema import User as SchemaUser
from app import app
from db import db


@app.post("/user/")
async def create_user(user: SchemaUser):
    user_id = await ModelUser.create(**user.dict())
    return {"user_id": user_id}


@app.get("/user/{id}", response_model=SchemaUser)
async def get_user(id: int):
    user = await ModelUser.get(id)
    return SchemaUser(**user).dict()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Da notare come ora stiamo usando async/await per gestire le chiamate verso il database.

È tempo quindi di modificare la configurazione del nostro Alembic.

Bisogna modificare
import models

target_metadata = models.Base.metadata

in
import models
from db import metadata

target_metadata = metadata

 

NOTA: è importante importare i modelli prima dei metadati.

 

Quindi procediamo a ricompilare il nostro Docker:

  • Costruzione: docker-compose build
  • Creazioni migrazioni: docker-compose run web alembic revision --autogenerate
  • Migrazione: docker-compose run web alembic upgrade head
  • Esecuzione: docker-compose up

Ora aprendo il browser e collegarsi a http://localhost:8000

articoli - pgadmin localhost 7
richiesta POST per creare un utente
articoli - pgadmin localhost 8
risposta alla richiesta precedente
articoli - pgadmin localhost 9
richiesta GET dello stesso utente con risposta

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic utilizzando la potenza di async .

Il codice completo per questo articolo è disponibile su github.

FastAPI con SQLAlchemy, PostgreSQL e Alembic e ovviamente Docker – Parte 1

scienzadeidati articoli - Database con FASTAPI

La guida completa (tutorial) all’utilizzo dei database relazionali con FastAPI

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con i database relazionali e utilizzare Alembic per le migrazioni. Un’implementazione che può essere utilizzata in produzione.

Installazione

Useremo pipenv per gestire sia i miei pacchetti che l’ambiente virtuale. Sentiti libero di gestire i tuoi pacchetti come preferisci.

Pacchetti Utilizzati
  • python ≥ 3.5
  • fastapi
  • pydantic
  • fastapi-sqlalchemy
  • alembic
  • psycopg2
  • uvicorn

Creiamo una nuova directory (puoi chiamarla come vuoi).

Ad esempio possiamo chiamarla fastapi_sqlalchemy_alembic

Apri il terminale e scrivi

cd fastapi_sqlalchemy_alembic

pipenv install --three fastapi fastapi-sqlalchemy pydantic alembic psycopg2 uvicorn

Utilizzerò docker compose per gestire il database, potresti ricevere alcuni errori relativi all’installazione di psycopg2 se stai utilizzando Mac-OS, ma dal momento che utilizzeremo Docker, questo non è molto importante.

Main.py

Iniziamo con un semplice file principale per Fastapi

            # main.py

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.post("/user/", response_model=User)
def create_user(user: User):
    return user

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
    
        

Configurazione di Docker

            # Dockerfile

# Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        
            # docker-compose.yml

version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

La configurazione precedente creerà un cluster con 3 contenitori:

  1. contenitore web — dove verrà eseguito il codice effettivo
  2. contenitore db
  3. contenitore pgadmin

Nella tua directory corrente dovresti vedere 5 file:

  1. file pip
  2. Pipfile.lock
  3. Dockerfile
  4. docker-compose.yml
  5. main.py

Quindi costruiamo il cluster docker, eseguendo il seguente comando nel terminale:

docker-compose build

 

Alembic

Si inizializza Alembic eseguendo il seguente cmd nel terminale della stessa directory:

alembic init alembic

In questo modo si crea una directory chiamata alembic e un file di configurazione alembic.ini

articoli - alembic

Il prossimo passo è aprire il file alembic.ini con il tuo editor e modificare la riga 38 da:

sqlalchemy.url = driver://user:pass@localhost/dbname

a:

sqlalchemy.url =

e quindi aggiungere l’url db postgres nel file alembic/env.py.

Dato che stiamo creando una configurazione che dovrebbe funzionare in produzione, non possiamo scrivere in chiaro il nome utente e password del database all’interno del file alembic.ini. Dobbiamo invece leggerlo dalle variabili d’ambiente tramite lo script alembic/env.py.

 
Installiamo python-dotenv

pipenv install python-dotenv

Dal momento che abbiamo aggiunto un nuovo pacchetto, ricostruiamo il docker per includerlo:

docker-compose build

 
Creiamo un .envfile

e aggiungiamo quanto segue:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Come abbiamo scoperto l’URL del database?

DATABASE_URL = postgresql+psycopg2://{utente}:{password}@{host}:{porta}

se controlliamo la configurazione docker-compose.yml per il database:

            ...
db:
    image: postgres:11
    ports:
        - "5432:5432"
    environment:
        - POSTGRES_USER=postgres
        - POSTGRES_PASSWORD=postgres
        - POSTGRES_DB=test_db

...

        

Vediamo come user=postgres, password=postgres e poiché siamo nel mondo docker, l’host del database non sarà localhost ma il nome del contenitore, nel nostro caso lo abbiamo chiamato db.

Quindi aggiungiamo questa riga al nostro .env:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Apriamo alembic\env.py , che dovrebbe apparire come segue:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
        

Dobbiamo quindi apportare le seguenti modifiche:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import poolfrom alembic import context
# ---------------- added code here -------------------------#
import os, sys
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
load_dotenv(os.path.join(BASE_DIR, ".env"))
sys.path.append(BASE_DIR)
#------------------------------------------------------------#
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# ---------------- added code here -------------------------#
# this will overwrite the ini-file sqlalchemy.url path
# with the path given in the config of the main code
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
#------------------------------------------------------------#
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# ---------------- added code here -------------------------#
import models
#------------------------------------------------------------#
# ---------------- changed code here -------------------------#
# here target_metadata was equal to None
target_metadata = models.Base.metadata
#------------------------------------------------------------#

        

Modelli

Ora creiamo i nostri modelli da migrare a PostgreSQL:

            # models.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String,)
    last_name = Column(String)
    age = Column(Integer)
        

Quindi è necessario generare la revisione per la nostra prima migrazione:

docker-compose run web alembic revision --autogenerate -m "First migration"

articoli - alembic update

Se il comando è stato eseguito correttamente dovresti vedere un nuovo file generato nella directory “versions”:

articoli - alembic2

Infine possiamo eseguire la migrazione:

docker-compose run web alembic upgrade head

articoli - alembic update2

Pgadmin

Per controllare le migrazioni create è sufficiente eseguire nel terminale il seguente comando:

docker-compose up

ed aspettare un po’, ci vuole un po’ di tempo per caricare. 

A caricamento concluso, si apre un browser all’indirizzo localhost:5050 e si può accedere con [email protected] e password=admin, come da impostazione definita nel nostro docker-compose.yml

            ...
pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
...
        
articoli - pgadmin localhost

Una volta entrati, è necessario creare una nuova connessione. Alla voce “General” si deve scegliere un nome per la connessione.

Alla voce “Connection” bisogna inserire i riferimenti e le credenziali per connettersi al database (la password è postgres)

articoli - pgadmin localhost 2

Navigamio fino a trovare la nostra tabella User.

Servers > {your-connection-name} > Databases > postgres > Schemas > public > Tables > users

articoli - pgadmin localhost 3
articoli - pgadmin localhost 4

Ora possiamo sicuramente dire che la nostra migrazione si è conclusa con successo.

Finore abbiamo implementato completamente l’ORM con le migrazioni Alembic. Il prossimo passo è collegarlo allo schema Pydantic.

 

Schema — Modello Pydantic

            # schema.py

from pydantic import BaseModel

class User(BaseModel):
    first_name: str
    last_name: str = None
    age: int
    class Config:
        orm_mode = True
        

Si noti che abbiamo una classe Config in cui impostiamo orm_mode=True ed è tutto ciò di cui abbiamo bisogno per i modelli Pydantic, senza i quali gli oggetti del modello Sqlalchemy non verranno serializzati su JSON.

Connettiamo tutto all’interno di main.py

            import uvicorn
from fastapi import FastAPI

#--------------- added code ------------------------#
import os
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db
from models import User as ModelUser
from schema import User as SchemaUser
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))
#---------------------------------------------------#

app = FastAPI()
#--------------- added code ------------------------#
app.add_middleware(DBSessionMiddleware, 
                    db_url=os.environ["DATABASE_URL"])
#---------------------------------------------------#
#--------------- modified code ---------------------#
@app.post("/user/", response_model=SchemaUser)
def create_user(user: SchemaUser):
    db_user = ModelUser(first_name=user.first_name, 
                        last_name=user.last_name, 
                        age=user.age
    )
    db.session.add(db_user)
    db.session.commit()
    return db_user
#---------------------------------------------------#

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Fantastico, eseguiamo di nuovo docker-compose up

Ora andiamo a \docs e invochiamo l’endpoint Create User

articoli - pgadmin localhost 5a

Possiamo quindi controllare su pgadmin se ha funzionato correttamente.

Colleghiamoci a localhost:5050

articoli - pgadmin localhost 6a

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic.

Il codice completo di questo articolo è disponibile su github.

Nella Parte 2 discuteremo come lavorare con i database in modo asincrono.

Grafici delle Serie Temporali con Dash, Flask, TimescaleDB e Docker – Parte 3

scienzadeidati articoli - Grafici delle Serie Temporali con Dash, Flask, TimescaleDB, and Docker - Parte 3

Questo è il terzo articolo della serie su TimescaleDB, Flask e Dash.

Il primo articolo si è concentrato su come configurare il database TimescaleDB all’interno di un’istanza Docker, insieme a PgAdmin per gestire il database.

La seconda parte si è concentrata sul linguaggio Python, creando un sito Web con Flask e quindi integrandolo il framework Web Dash all’interno di Flask.

Questa terza parte si concentra sull’utilizzo di Dash per creare un’applicazione Web reattiva a pagina singola per visualizzare i dati all’interno del database TimescaleDB tramite grafici Plotly accattivanti e interattivi .

Tutto il codice per questo tutorial può essere trovato qui su GitHub.

Parte 3 - Grafici interattivi con Dash per creare un'applicazione di data science

Nella seconda parte di questa serie abbiamo creato un’istanza Dash con il file dash_setup.py, che prevede il seguente codice:

            # /app/dash_setup.py

import dash
from flask.helpers import get_root_path


def register_dashapps(app):
    """
    Register Dash apps with the Flask app
    """

    # external Bootstrap CSS stylesheets
    external_stylesheets = [
        'https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css'
    ]
    
    # external Bootstrap JavaScript files
    external_scripts = [
        "https://code.jquery.com/jquery-3.5.1.slim.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/umd/popper.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js",
    ]

    # To ensure proper rendering and touch zooming for all devices, add the responsive viewport meta tag
    meta_viewport = [{
        "name": "viewport", 
        "content": "width=device-width, initial-scale=1, shrink-to-fit=no"
    }]

    dashapp = dash.Dash(
        __name__,
        # This is where the Flask app gets appointed as the server for the Dash app
        server = app,
        url_base_pathname = '/dash/',
        # Separate assets folder in "static_dash" (optional)
        assets_folder = get_root_path(__name__) + '/static_dash/', 
        meta_tags = meta_viewport, 
        external_scripts = external_scripts,
        external_stylesheets = external_stylesheets
    )
    dashapp.title = 'Dash Charts in Single-Page Application'

    # Some of these imports should be inside this function so that other Flask
    # stuff gets loaded first, since some of the below imports reference the other
    # Flask stuff, creating circular references 
    from app.dashapp.layout import get_layout
    from app.dashapp.callbacks import register_callbacks

    with app.app_context():

        # Assign the get_layout function without calling it yet
        dashapp.layout = get_layout

        # Register callbacks
        # Layout must be assigned above, before callbacks
        register_callbacks(dashapp)

    return None

        

In questo codice si utilizzano le funzioni get_layout e register_callback che sono importati da specifici moduli.

Iniziamo quindi a descrive i due moduli dashapp.layout e dashapp.callbacks

            # /app/dashapp/layout.py

import os

from flask import url_for
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
from psycopg2.extras import RealDictCursor

# Local imports
from app.database import get_conn


def get_navbar():
    """Get a Bootstrap 4 navigation bar for our single-page application's HTML layout"""

    return dbc.NavbarSimple(
        children=[
            dbc.NavItem(dbc.NavLink("Blog", href="https://mccarthysean.dev")),
            dbc.NavItem(dbc.NavLink("IJACK", href="https://myijack.com")),
            dbc.DropdownMenu(
                children=[
                    dbc.DropdownMenuItem("References", header=True),
                    dbc.DropdownMenuItem("Dash", href="https://dash.plotly.com/"),
                    dbc.DropdownMenuItem("Dash Bootstrap Components", href="https://dash-bootstrap-components.opensource.faculty.ai/"),
                    dbc.DropdownMenuItem("Testdriven", href="https://testdriven.io/"),
                ],
                nav=True,
                in_navbar=True,
                label="Links",
            ),
        ],
        brand="Home",
        brand_href="/",
        color="dark",
        dark=True,
    )


def get_sensor_types():
    """Get a list of different types of sensors"""
    sql = """
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            distinct 
            type as label, 
            type as value
        FROM sensors;
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # types is a list of dictionaries that looks like this, for example:
        # [{'label': 'a', 'value': 'a'}]
        types = cursor.fetchall()
    
    return types


def get_body():
    """Get the body of the layout for our Dash SPA"""

    types = get_sensor_types()

    # The layout starts with a Bootstrap row, containing a Bootstrap column
    return dbc.Row(
        [
            # 1st column and dropdown (NOT empty at first)
            dbc.Col(
                [
                    html.Label('Types of Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        options=types,
                        value=types[0]['value'],
                        id="types_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
            # 2nd column and dropdown (empty at first)
            dbc.Col(
                [
                    html.Label('Locations of Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        # options=None,
                        # value=None,
                        id="locations_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
            # 3rd column and dropdown (empty at first)
            dbc.Col(
                [
                    html.Label('Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        # options=None,
                        # value=None,
                        id="sensors_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
        ]
    )


def get_layout():
    """Function to get Dash's "HTML" layout"""

    # A Bootstrap 4 container holds the rest of the layout
    return dbc.Container(
        [
            get_navbar(),
            get_body(), 
        ], 
    )

        

Lavoriamo dal basso verso l’alto, iniziando con get_layout(). Abbiamo aggiunto get_body() sotto la funzione navbar.

Ecco come appare ora il semplice sito Flask nel browser:

timescale-dash-flask-click-here-to-see-the-main-dash-page

Fare clic sul collegamento per visualizzare il sito Dash su /dash/:

NOTA: In questa fase il secondo e il terzo menu a discesa saranno vuoti. È stato popolato solo il primo menu a discesa. Per il secondo e il terzo menu a discesa, utilizzeremo i callback di Dash invece di popolarli nel layout iniziale. Rimanete sintonizzati…

timescale-dash-flask-navbar-and-body

Visualizza il sito in development mode

Ho saltato un passaggio importante: come avviare un sito Flask / Dash in modalità “sviluppo” in modo da poterlo visualizzare nel browser.

Aggiungiamo un file chiamato .flaskenv accanto al file .env nella cartella principale del progetto, con le seguenti tre righe:

            FLASK_APP=wsgi.py
FLASK_RUN_HOST=0.0.0.0
FLASK_RUN_PORT=5002
        

Flask cerca il file .flaskenv quando si avvia il sito. Le variabili d’ambiente specificano il file di partenza e le coordinate (indirizzo web o IP) dove sarà pubblicato il sito (ad esempio si inizializza una  FLASK_APP che è stata importata e creata da wsgi.py ed pubblicata sull’host 0.0.0.0 e porta 5002).

Quindi, una volta creato il file .flaskenv, si digita flask run e sul terminale e il sito sarà pronto. Si può visualizzare il sito collegandosi a http:// localhost:5002 da qualsiasi browser.

Tornando al codice

Tornando al file layout.py, la funzione get_body() restituisce un Bootstrap row e tre Boostrap, columns uno per ciascuno dei menu a discesa di Dash.

Per ora concentriamoci sulla prima colonna, vediamo che è presente un html.Label Dash HTML Component “, seguito da un dcc.Dropdown Dash Core Component“. Stiamo creato creando un Bootstrap HTML/CSS/JS solamente usando Python! Questa è la bellezza di Dash, ed è molto conveniente per i data scientist che cercano di produrre i propri modelli e dati.

La variabile types proviene dalla funzione get_sensor_types(), che interroga il nostro database TimescaleDB e restituisce i tipi di sensori specifici/univoci in un “elenco di dizionari”. Questo è reso possibile tramite cursor_factory=RealDictCursor (cioè restituisce le righe del database come comodi dizionari Python).

Molti Pythonistas amano interrogare i loro database usando SQLAlchemy , e anch’io lo uso molto spesso, anche se a volte ho voglia di solo sporcarmi le mani e scrivere un po’ di SQL, come ai vecchi tempi. La libreria psycopg2 ci consente di farlo molto facilmente. Entrambe sono ottime librerie, ben mantenute e testate sul campo.

Le altre due colonne e query nel nostro layout sono sostanzialmente le stesse della prima, quindi non le descriverò individualmente.

Callback in Dash

È ora di descrivere la logica dei callback, così possiamo far effettivamente funzionare il secondo e il terzo menu a discesa. A questo punto, nel codice del file layout.py il secondo e il terzo elenco a discesa dovrebbero essere vuoti.

Finalmente, quello che stavamo tutti aspettando: un po’ di divertente interattività in Dash! Di seguito il codice del file callbacks.pyQui è dove popoleremo il 2 ° e il 3 ° menu a discesa, in base al valore del primo menu a discesa:

            # /app/dashapp/callbacks.py

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
from psycopg2.extras import RealDictCursor

# Local imports
from app.database import get_conn


def get_sensor_locations(type_):
    """Get a list of different locations of sensors"""
    sql = f"""
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            distinct 
            location as label, 
            location as value
        FROM sensors
        WHERE type = '{type_}';
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # locations is a list of dictionaries that looks like this, for example:
        # [{'label': 'floor', 'value': 'floor'}]
        locations = cursor.fetchall()
    
    return locations


def get_sensors(type_, location):
    """
    Get a list of sensor dictionaries from our TimescaleDB database, 
    along with lists of distinct sensor types and locations
    """
    sql = f"""
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            location || ' - ' || type as label,
            id as value
        FROM sensors
        WHERE 
            type = '{type_}'
            and location = '{location}';
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # sensors is a list of dictionaries that looks like this, for example:
        # [{'label': 'floor - a', 'value': 1}]
        sensors = cursor.fetchall()

    return sensors


def register_callbacks(dash_app):
    """Register the callback functions for the Dash app, within the Flask app""" 

    @dash_app.callback(
        [
            Output("locations_dropdown", "options"),
            Output("locations_dropdown", "value")
        ],
        [
            Input("types_dropdown", "value")
        ]
    )
    def get_locations_from_types(types_dropdown_value):
        """Get the location options, based on the type of sensor chosen"""

        # First get the location options (i.e. a list of dictionaries)
        location_options = get_sensor_locations(types_dropdown_value)

        # Default to the first item in the list, 
        # and get the "value" from the dictionary
        location_value = location_options[0]["value"]

        return location_options, location_value


    @dash_app.callback(
        [
            Output("sensors_dropdown", "options"),
            Output("sensors_dropdown", "value")
        ],
        [
            Input("types_dropdown", "value"),
            Input("locations_dropdown", "value")
        ]
    )
    def get_sensors_from_locations_and_types(types_dropdown_value, locations_dropdown_value):
        """Get the sensors available, based on both the location and type of sensor chosen"""

        # First get the sensor options (i.e. a list of dictionaries)
        sensor_options = get_sensors(types_dropdown_value, locations_dropdown_value)

        # Default to the first item in the list, 
        # and get the "value" from the dictionary
        sensor_value = sensor_options[0]["value"]

        return sensor_options, sensor_value


    return None

        

Ricordiamoci che la funzione register_callbacks(dash_app) è usata all’interno di dash_setup.py.

Concentriamoci sulla prima funzione di callback. Tutti i callback di Dash hanno funzioni Input() che li attivano e funzioni Output(), che sono gli elementi HTML che sono modificati dai callback.

Il primo parametro nelle funzioni Input()Output() è sempre l’id= dell’elemento, all’interno del file layout.py. Il secondo parametro è sempre la proprietà che stiamo cercando di modificare. Quindi, nella funzione di callback stiamo utilizzando il “valore” del menu a discesa “types_dropdown” come input per la funzione, e stiamo modificando sia le “opzioni” che il “valore” selezionato del menu a discesa “locations_dropdown”.

La seconda callback è solo leggermente più complicata della prima. Utilizza i valori di entrambi i primi due menu a discesa (ovvero il menu a discesa dei tipi e il menu a discesa delle posizioni) per filtrare le opzioni del sensore e il valore selezionato.

Inoltre la funzione get_sensors(type_, location) prevede una delle nostre query al database, che acquisisce i sensori unici corrispondenti ai filtri sul tipo e sulla posizione dei sensori. L’altra query del database è quasi identica, ad eccezione che prevede un solo filtro in base al tipo di sensore.

Implementiamo ora un bel grafico delle serie temporali, in modo da visualizzare i dati nel tempo del sensore selezionato tramite i filtri.

Aggiungiamo la seguente funzione get_chart_row() al tuo file layout.py, nella parte inferiore. Questo ci fornisce una riga / colonna Bootstrap in cui posizionare un grafico Dash tramite una callback:

            ...

def get_chart_row(): # NEW
    """Create a row and column for our Plotly/Dash time series chart"""

    return dbc.Row(
        dbc.Col(
            id="time_series_chart_col"
        )
    )


def get_layout():
    """Function to get Dash's "HTML" layout"""

    # A Bootstrap 4 container holds the rest of the layout
    return dbc.Container(
        [
            get_navbar(),
            get_body(), 
            get_chart_row(), # NEW
        ], 
    )

        

Vediamo ora come creare il grafico tramite le callback.

Innanzitutto assicuriamoci di aggiungere i seguenti import nel file callbacks.py e installiamole nel nostro ambiente virtuale con il comando pip install pandas plotly:

            import pandas as pd
import plotly.graph_objs as go

        

A questo punto dobbiamo aggiungere la query per acquisire i dati delle serie temporali anche nel file callbacks.py. Da notare che questa volta inseriamo i dati estratti all’interno di un DataFrame Pandas. Pandas è una libreria essenziale per i data scientist che lavorano con Python.

In poche parole, un Pandas DataFrame è una tabella composta da un array NumPy per ogni colonna. Descrivere Pandas e Numpy non è lo scopo di questo articolo, ma è importante ricordare che queste librerie sono estremamente ottimizzati per il lavoro in ambito della data science.

            def get_sensor_time_series_data(sensor_id):
    """Get the time series data in a Pandas DataFrame, for the sensor chosen in the dropdown"""

    sql = f"""
        SELECT 
            --Get the 3-hour average instead of every single data point
            time_bucket('03:00:00'::interval, time) as time,
            sensor_id,
            avg(temperature) as temperature,
            avg(cpu) as cpu
        FROM sensor_data
        WHERE sensor_id = {sensor_id}
        GROUP BY 
            time_bucket('03:00:00'::interval, time), 
            sensor_id
        ORDER BY 
            time_bucket('03:00:00'::interval, time), 
            sensor_id;
    """

    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
        columns = [str.lower(x[0]) for x in cursor.description]

    df = pd.DataFrame(rows, columns=columns)
    
    return df

        

Infine, dobbiamo aggiungere il grafico Dash / Plotly. 

A tale scopo dobbiamo aggiungere una callback alla funzione register_callbacks(dash_app)Da notare che la callback aggiorna la proprietà “children” della colonna Bootstrap con ID “time_series_chart_col”.

Il callback utilizza un Input() e due State() come parametri della funzione. La differenza tra Input() State() consiste che la funzione viene chiamata solo se ci sono modifiche a Input(). Se la State() cambia, la funzione non viene chiamata, ma abbiamo ancora ha disposizione i loro valori all’interno della funzione.

Successivamente, prendiamo i dati della serie temporale (in un DataFrame, come accennato in precedenza) e consideriamo la colonna time per l’asse x. Aggiungiamo due grafici a linee (grafici a dispersione collegati da linee), quindi preleviamo due colonne dal dataframe, “temperatura” e “cpu”. Aggiungi anche una variabile per il titolo dei grafici.

Quindi creiamo due oggetti grafici Plotly (ad esempio go.Scatter) per i due grafici e li passiamo in input alla funzione get_graph() (che descriviamo tra poco).

Infine, la funzione restituisce un semplice HTML div con due righe Bootstrap (una riga Bootstrap deve contenere sempre almeno una colonna).

              @dash_app.callback(
        Output("time_series_chart_col", "children"),
        [Input("sensors_dropdown", "value")],
        [
            State("types_dropdown", "value"),
            State("locations_dropdown", "value")
        ]
    )
    def get_time_series_chart(
        sensors_dropdown_value, 
        types_dropdown_value, 
        locations_dropdown_value
    ):
        """Get the sensors available, based on both the location and type of sensor chosen"""

        df = get_sensor_time_series_data(sensors_dropdown_value)
        x = df["time"]
        y1 = df["temperature"]
        y2 = df["cpu"]
        title = f"Location: {locations_dropdown_value} - Type: {types_dropdown_value}"

        trace1 = go.Scatter(
            x=x,
            y=y1,
            name="Temp"
        )
        trace2 = go.Scatter(
            x=x,
            y=y2,
            name="CPU"
        )

        # Create two graphs using the traces above
        graph1 = get_graph(trace1, f"Temperature for {title}")
        graph2 = get_graph(trace2, f"CPU for {title}")

        return html.Div(
            [
                dbc.Row(dbc.Col(graph1)),
                dbc.Row(dbc.Col(graph2)),
            ]
        )
        

A questo punto non ci rimane che aggiungere la funzione la get_graph(). I grafici Plotly / Dash hanno molte opzioni, ma non lasciarti scoraggiare; sono facile da capire.

Il primo parametro disabilita il logo Plotly (totalmente opzionale). Successivamente nell’oggetto grafico c’è il parametro figure, che contiene due sottoparametri principali, datalayout. Ecco il riferimento completo alle API , che è una risorsa essenziale se stai lavorando con i grafici Plotly.

In layout, aggiungiamo alcune opzioni per il xaxis in modo da inserire alcuni rapidi filtri dato che stiamo costruendo un grafico di serie temporali. Di solito scelgo i pulsanti rangeselector sopra il grafico o il rangeslider sotto il grafico, ma in questo esempio li usiamo entrambi per mostrare le potenzialità di ploty. Preferisco i pulsanti rangeselector sopra il grafico. Se ti piace il trading di azioni, saprai che questi filtri temporali sono abbastanza comuni e intuitivi.

 

            def get_graph(trace, title):
    """Get a Plotly Graph object for Dash"""
    
    return dcc.Graph(
        # Disable the ModeBar with the Plotly logo and other buttons
        config=dict(
            displayModeBar=False
        ),
        figure=go.Figure(
            data=[trace],
            layout=go.Layout(
                title=title,
                plot_bgcolor="white",
                xaxis=dict(
                    autorange=True,
                    # Time-filtering buttons above chart
                    rangeselector=dict(
                        buttons=list([
                            dict(count=1,
                                label="1d",
                                step="day",
                                stepmode="backward"),
                            dict(count=7,
                                label="7d",
                                step="day",
                                stepmode="backward"),
                            dict(count=1,
                                label="1m",
                                step="month",
                                stepmode="backward"),
                            dict(step="all")
                        ])
                    ),
                    type = "date",
                    # Alternative time filter slider
                    rangeslider = dict(
                        visible = True
                    )
                )
            )
        )
    )

        

Abbiamo concluso l’implementazione delle callback e delle funzioni grafiche.

Avviamo la nostra applicazione, i grafici avranno il seguente aspetto:

timescale-dash-flask-finished-product

Questo è tutto! Abbiamo approfondito molti aspetti in questo tutorial diviso in tre parti.

Nella prima parte , abbiamo creato un database TimescaleDB e lo abbiamo popolato con i dati simulati delle serie temporali di sensori IoT. Abbiamo anche installato un’app Web PGAdmin per l’amministrazione del nostro database ed entrambe le applicazioni sono state distribuite utilizzando Docker-Compose, uno strumento fantastico per riprodurre facilmente ambienti e implementazioni.

Nella seconda parte, abbiamo combinato un’app Web Flask con un’app Web Dash in modo da poter avere il meglio da entrambe le librerie: Flask può praticamente implementare qualsiasi applicazione Web e Dash è ottimo per la produzione di app a pagina singola di data science senza bisogno di utilizzare JavaScript o React.

In questa terza parte, abbiamo descritto le funzioni di Dash, in modo particolare i callback interattivi del menu a discesa e i grafici Plotly per dare un assaggio di ciò che è possibile fare con Dash.

Spero che questa serie ti sia piaciuta.

A presto!