Creare un’API di dati azionari utilizzando il Web Scraping e FastAPI

scienzadeidati articoli - Creare API di dati finanziari utilizzando Web Scraping e FastAPI

Per la stragrande maggioranza delle API REST Python che ho creato, ho usato Flask o Django. Questi due framework hanno costituito le fondamenta delle API REST di Python da ormai molti anni. Di recente, tuttavia, sono emersi alcuni nuovi strumenti. Uno di questi framework ha suscitato un notevole clamore: FastAPI. In questo articolo esploreremo  alcune funzionalità di questo strumento.


Il progetto

La prima cosa da fare è elaborare un nuovo progetto. Volevo trovare un progetto che fosse interessante ma semplice, un progetto che mi permettesse di sperimentare il flusso di lavoro di FastAPI evitando dettagli confusi. Dopo averci pensato un po’, ho deciso di creare un’API REST per la finanza.

Questo progetto sarà composto da due parti:

  • Un web scraper per ottenere dati finanziari e,
  • Un’API REST che offre l’accesso ai dati.

Pertanto, ho diviso questo articolo in due. Sentiti libero di saltare intorno alle parti che trovi più interessanti.


Il setup

Prima di iniziare, dovremmo occuparci della parte più divertente dell’avvio di un nuovo progetto: la creazione del nostro ambiente. Sì, che divertimento!

Useremo alcune librerie, quindi consiglio di creare un nuovo ambiente virtuale.
Ecco un elenco di dipendenze che utilizzeremo:

  • FastAPI (ovviamente è nel titolo!)
  • Uvicorn
  • Requests
  • BeautifulSoup4

Possiamo eseguire un comando per installarli tutti in una volta:

            pip install fastapi uvicorn requests beautifulsoup4
        

Per fortuna, questo è sufficiente per la configurazione. Immergiamoci subito nel codice!

Parte I: Il Web Scraper

Quando costruiamo un web scraper, dobbiamo prima di tutto rispondere a tre domande: dove , cosa e come.

Dove?

Esistono diverse risorse online dove è possibile trovare dati finanziari. Uno di questi è Yahoo Finance. Questo sito Web ha una pletora di dati finanziari organizzati per azienda e quindi per categoria. Useremo Yahoo Finance come obiettivo del nostro scraping.

Cosa?

Per ogni società quotata su Yahoo Finance, esiste una sezione riepilogativa. Questa sezione è ciò che utilizzeremo per ottenere i nostri dati.

Come?

Rispondere ai quesiti “dove” e “cosa” è piuttosto semplice, ma rispondere alla domanda “come” richiede un po’ di lavoro. Affronteremo questa domanda nelle seguenti sezioni.

Trovare il selettore CSS di un elemento.

Se apriamo Yahoo Finance e scegliamo un titolo, apparirà la pagina di riepilogo predefinita. Vogliamo raschiare il prezzo corrente del titolo e tutto nella tabella riepilogativa (tutto da “Chiusura precedente” a “Stima target 1A”).

Per lo scraping di questi dati, dobbiamo trovare il selettore CSS di ciascun elemento. Possiamo farlo tramite:

  • Aprire gli strumenti di sviluppo di Chrome premendo F12.
  • Premere Ctrl + Maiusc + C per abilitare il selettore di elementi.
  • Evidenziare e fare clic su un elemento. Questo evidenzierà l’HTML dell’elemento nel pannello Elementi.
  • Fare clic con il pulsante destro del mouse sull’HTML nel pannello Elementi e selezionare Copia > Copia selettore.

Per ciascuno dei nostri elementi desiderati, ripetiamo i passaggi 3 e 4.

Il file ‘scrape.json’

Una volta che abbiamo determinato i selettori CSS per ogni elemento, dobbiamo posizionarli in un posto utile. Per questo, creeremo un file chiamato scrape.json.

Alla fine, vogliamo archiviare i nostri dati in un dizionario Python. Per fare ciò, dobbiamo mappare le chiavi sui valori. In altre parole, dobbiamo sapere quali dati sono associati a quale chiave. scrape.json implementa questa mappatura.

La struttura di scrape.json è la seguente:

            {
    "elements": [
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        }
    ]
}
        

Ogni oggetto JSON in elements ha due attributi: from e to.

  • from è il selettore CSS di un elemento di cui vogliamo i dati.
  • to è la chiave con cui verranno archiviati i dati di questo elemento.

Di seguito il scrape.json compilato:

            {
    "elements": [
        {
            "from": "[class='Trsdu\\(0\\.3s\\) Fw\\(b\\) Fz\\(36px\\) Mb\\(-4px\\) D\\(ib\\)']",
            "to": "price"
        },
        {
            "from": "[data-test='PREV_CLOSE-value'] [class]",
            "to": "prev_close"
        },
        {
            "from": "[data-test='OPEN-value'] [class]",
            "to": "open"
        },
        {
            "from": "[data-test='BID-value'] [class]",
            "to": "bid"
        },
        {
            "from": "[data-test='ASK-value'] [class]",
            "to": "ask"
        },
        {
            "from": "[data-test='DAYS_RANGE-value']",
            "to": "days_range"
        },
        {
            "from": "[data-test='FIFTY_TWO_WK_RANGE-value']",
            "to": "52_week_range"
        },
        {
            "from": "[data-test='TD_VOLUME-value'] [class]",
            "to": "volume"
        },
        {
            "from": "[data-test='AVERAGE_VOLUME_3MONTH-value'] [class]",
            "to": "avg_volume"
        },
        {
            "from": "[data-test='MARKET_CAP-value'] [class]",
            "to": "market_cap"
        },
        {
            "from": "[data-test='BETA_5Y-value'] [class]",
            "to": "beta"
        },
        {
            "from": "[data-test='PE_RATIO-value'] [class]",
            "to": "pe"
        },
        {
            "from": "[data-test='EPS_RATIO-value'] [class]",
            "to": "eps"
        },
        {
            "from": "[data-test='EARNINGS_DATE-value']",
            "to": "earnings_date"
        },
        {
            "from": "[data-test='DIVIDEND_AND_YIELD-value']",
            "to": "dividend_and_yield"
        },
        {
            "from": "[data-test='EX_DIVIDEND_DATE-value'] span",
            "to": "ex_dividend_yield"
        },
        {
            "from": "[data-test='ONE_YEAR_TARGET_PRICE-value'] [class]",
            "to": "target_est"
        }
    ]
}
        

Codificare lo scraping.

Dopo aver compilato il file scrape.json con le nostre mappature, è il momento di codificare lo scraper.
Ci sono quattro passaggi che il nostro scraper deve compiere:

  • Creare un URL che punti alla pagina web di Yahoo Finance del titolo
  • Scaricare la pagina web
  • Analizzare la pagina web
  • Estrarre e mappare i dati desiderati (Sì, so cosa stai pensando, tecnicamente sono due passaggi. Non possiamo essere tutti perfetti.)

Diamo un’occhiata al codice e poi lo esaminiamo sezione per sezione.

            import requests
from bs4 import BeautifulSoup

class Scrape():

    def __init__(self, symbol, elements):
        url = "https://finance.yahoo.com/quote/" + symbol

        r = requests.get(url)
        if(r.url != url): # redirect occurred; likely symbol doesn't exist or cannot be found.
            raise requests.TooManyRedirects()

        r.raise_for_status()
        
        self.soup = BeautifulSoup(r.text, "html.parser")

        self.__summary = {}

        for el in elements["elements"]:
            tag = self.soup.select_one(el["from"])

            if tag != None:
                self.__summary[el["to"]] = tag.get_text()

    def summary(self):
        return self.__summary
        

Nella riga 7 (fase 1) si costruisce un URL.

Nella riga 9 (fase 2) sta scaricando la pagina web del titolo azionario.

Nelle righe 10-13 si controlla se si sono verificati errori o reindirizzamenti durante la fase 2. Se si è verificato un reindirizzamento, significa che il simbolo di borsa non esiste o non è corretto; trattiamo questo scenario come un errore.

Nella riga 15 (fase 3) si analizza la pagina web.

Nelle righe 17-23 (fase 4) è dove avviene la magia. Qui, le mappature in scrape.json sono usate per costruire un dizionario, __summary. Ogni elemento è selezionato con self.soup.select_one(el["from"]). Ai dati dell’elemento (testo) viene quindi assegnata una chiave con self.__summary[el["to"]] = tag.get_text().

Da buoni programmatori che siamo, dovremmo testare il nostro codice prima di continuare. Creiamo main.py e aggiungiamo il seguente codice di test:

            from Scrape import Scrape
import json

elements_to_scrape = {}

f = open("scrape.json")
data = f.read()
f.close()
elements_to_scrape = json.loads(data)

s = Scrape("AAPL", elements_to_scrape)
print(s.summary())
        

Con un po’ di fortuna, dovremmo vedere un dizionario Python popolato con la stampa dei dati sullo schermo. Questo è tutto per il web scraper! Ora possiamo cambiare marcia e iniziare a lavorare sulla nostra API REST.

 

Parte II: L’API REST

L’idea di base della nostra API REST sarà la seguente:

  • La nostra API REST avrà un endpoint, che prenderà un simbolo di borsa come input.
  • Il nostro scaper utilizzerà il simbolo per raccogliere i dati di riepilogo finanziario del titolo.
  • L’endpoint restituirà i dati di riepilogo in formato JSON.

Ora che abbiamo l’idea di base, iniziamo a programmare.

 

Configurazione FastAPI

FastAPI è facile da eseguire. Aggiungiamo il seguente codice a main.py:

            from fastapi import FastAPI

app = FastAPI()

@app.get("/root")
def root():
    return "Hello World!"
        

Per avviare FastAPI, possiamo emettere il seguente comando:

uvicorn main:app --reload

Analizziamo questo comando:

  • main: si riferisce al nostro file main.py.
  • app: si riferisce all’app FastAPI creata con main.py la linea app = FastAPI().
  • --reload: indica al server di riavviarsi quando ha rilevato modifiche al codice.

A parte: uvicorn è un’interfaccia ASGI (Asynchronous Server Gateway Interface) veloce che FastAPI utilizza per l’esecuzione. Non approfondiremo l’uvicorn, ma se sei interessato a saperne di più, puoi controllare il loro sito web .

Ora che abbiamo avviato il server, possiamo andare su http://127.0.0.1:8000/docs. Qui troveremo la documentazione dell’API interattiva automatica. Possiamo utilizzare questo strumento per verificare la corretta funzionalità dei nostri endpoint. Poiché al momento abbiamo solo l’endpoint root, non c’è molto da testare. Iniziamo con le modifiche.

 

Definizione di un endpoint

Il passaggio successivo consiste nell’aggiungere l’endpoint di riepilogo. Come accennato in precedenza, prenderà un simbolo azionario e restituirà il foglio di riepilogo finanziario di quel titolo.

Aggiungiamo questo codice in main.py

            @app.get("/v1/{symbol}/summary/")
def summary(symbol):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        
    except TooManyRedirects:
        raise HTTPException(status_code=404, 
              detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, 
              detail="An error has occurred while processing the request.")

    return summary_data
        

Qui, definiamo un endpoint in /v1/{symbol}/summary/. Si usa {symbol} per denotare un parametro di percorso. Questo parametro viene passato al nostro metodo endpoint e quindi nella nostra istanza dell’oggetto Scrape, s. Assegniamo quindi a summary_datail il risultato di s.summary(). Se non si verificano eccezioni, si restituisce summary_data.

NOTA: il tipo di contenuto predefinito di FastAPI è application/JSON. Poiché s.summary() restituisce un oggetto dizionario, FastAPI lo converte automaticamente in JSON. Quindi, non è necessario eseguire questa conversione manualmente.

 

FastAPI ci consente di definire un metodo di start-up, che ovviamente verrà eseguito all’avvio del nostro server. Quindi si trasferisce il nostro  codice di caricamento scrape.json, da testing, all’interno di questo metodo.

            @app.on_event("startup")
def startup():
    f = open("scrape.json")
    data = f.read()
    f.close()
    global elements_to_scrape
    elements_to_scrape = json.loads(data)
        

È buona norma inserire metodi più lenti e utilizzabili una tantum nella funzione di avvio. In questo modo, evitiamo di rallentare i nostri tempi di risposta eseguendo codice non necessario durante una richiesta.

A questo punto, dovremmo avere un API REST funzionante. Possiamo testarlo tornando allo strumento endpoint e digitando un titolo.

            {
  "price": "135.37",
  "prev_close": "135.13",
  "open": "134.35",
  "bid": "135.40 x 1000",
  "ask": "135.47 x 1300",
  "days_range": "133.69 - 135.51",
  "52_week_range": "53.15 - 145.09",
  "volume": "60,145,130",
  "avg_volume": "102,827,562",
  "market_cap": "2.273T",
  "beta": "1.27",
  "pe": "36.72",
  "eps": "3.69",
  "earnings_date": "Apr 28, 2021 - May 03, 2021",
  "dividend_and_yield": "0.82 (0.61%)",
  "ex_dividend_yield": "Feb 05, 2021",
  "target_est": "151.75"
}
        

Filtraggio

Tecnicamente, abbiamo realizzato tutto ciò che ci eravamo prefissati di fare. Tuttavia, c’è un punto di attenzione. Ogni volta che richiediamo un riepilogo delle scorte, viene restituito l’intero foglio di riepilogo. E se volessimo solo pochi punti dati selezionati? Sembra un terribile spreco richiedere un intero oggetto riassuntivo quando sono necessari solo pochi punti dati. Per risolvere questo problema, dobbiamo implementare un filtro. Fortunatamente, FastAPI offre una soluzione: le query.

Per aggiungere il filtro, è necessario modificare il metodo summary dell’endpoint per gestire le query. Ecco come farlo:

            @app.get("/v1/{symbol}/summary/")
def summary(symbol, q: Optional[List[str]] = Query(None)):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        if(q != None):
            # if all query parameters are keys in summary_data
            if all (k in summary_data for k in q): 
                # summary_data keeps requested key-value pairs
                summary_data = {key: summary_data[key] for key in q}
            # else, return whole summary_data

    except TooManyRedirects:
        raise HTTPException(status_code=404, detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, detail="An error has occurred while processing the request.")

    return summary_data
        

l parametro q: Optional[List[str]] = Query(None) dice a FastAPI che dovrebbe aspettarsi, facoltativamente, come query una List di String.

Le righe 8 e 9 controllano le stringhe di query rispetto a tutte le chiavi nel dizionario summary_dat. Se anche i parametri della query sono chiavi, manteniamo solo quelle coppie chiave-valore. In caso contrario, se uno dei parametri della query non è una chiave, viene restituito l’intero oggetto.

NOTA: per fare pratica, è possibile modificare la funzionalità del metodo precedente per restituire solo le coppie chiave-valore dei parametri di query valide. Puoi anche scegliere di sollevare un’eccezione se un parametro di query non corrisponde a nessuna delle chiavi.

Testiamo il nostro nuovo codice interrogando solo price e open.

            {
  "price": "135.37",
  "open": "134.35"
}
        

Conclusione

In questo articolo abbiamo creato un web scraper per recuperare i dati finanziari da Yahoo Finance durante questo articolo e abbiamo creato un API REST per servire questi dati. È dannatamente buono, e dovremmo essere orgogliosi di noi stessi.

Comunque per oggi è tutto. Come sempre, tutto il codice può essere trovato sul mio GitHub.

Grazie per aver letto. Spero che tu abbia imparato qualcosa e ti sia divertito!
Ci vediamo tutti la prossima volta.

 

Riferimenti

[1] Documentazione Beautiful Soup – https://www.crummy.com/software/BeautifulSoup/bs4/doc/

[2] FastAPI – https://fastapi.tiangolo.com/

[3] Requests: HTTP for HumansTM – https://2.python-requests.org/en/master/

[4] Uvicorn – https://www.uvicorn.org/

[5] Yahoo Finance – https://finance.yahoo.com/

Creazione di uno scraper di feed RSS con Python

scienzadeidati articoli - Costruire uno scraper di feed RSS con Python

Questo articolo è la prima parte 1 di un tutorial relativo alla creazione di un’applicazione di web scraping con Python. In questo articolo ci concentriamo ad approfondire le librerie Requests e BeautifulSoup.

Nella 2° parte del tutorial vedremo come schedulare lo scraping tramite Celery.

Nella 3° parte descriviamo come realizzazione di un’applicazione web scraping con Django.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

Ho già utilizzato lo scraping web in diversi modi all’interno dei miei progetti, che si tratti di raccolta di dati per l’analisi, creazione di notifiche quando i siti vengono modificati o creazione di applicazioni web.

Questo articolo illustrerà un semplice  di feed RSS per HackerNews. Il feed RSS, disponibile a questo link, prevede aggiornamenti a intervalli regolari per i  nuovi post e attività sul sito.

Sebbene ciò possa essere ottenuto utilizzando un lettore RSS, questo è inteso come un  semplice esempio di utilizzo di Python, che può essere adattato facilmente ad altri siti Web.

In questo esempio usiamo:

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione:

  1. Creazione della directory  del progetto e del file scraping.py.
  2. Verificare che possiamo eseguire il ping del feed RSS che sarà oggetto dello scraping.
  3. Scraping del contenuto XML del sito.
  4. Analisi del contenuto utilizzando BS4.
  5. Output del contenuto in un file .txt

Inizializzazione

Il primo passo è la creazione della directory del progetto e accedere a quella directory tramite riga di comando. Una volta all’interno della directory di progetto, creiamo il file di progetto.

Nota: in questo caso stiamo usando Ubuntu, quindi questi comandi potrebbero differire a seconda del sistema operativo.

            $ mkdir web_scraping_example && cd web_scraping_example

$ touch scraping.py
        
Successivamente è necessario installare le librerie Python previste dai requisiti. Si può usare il comando pip, come nell’esempio seguente:
            $ pip install requests
$ pip install bs4
        

Importazione delle librerie

Ora che le basi del progetto sono state impostate, possiamo iniziare a scrivere il codice del web scraping.

All’interno del file scraping.py, si deve importare le librerie che abbiamo installato utilizzando pip.

            # scraping.py

import requests
from bs4 import BeautifulSoup
        

Quanto sopra ci consente di utilizzare le funzioni fornite dalle librerie Requests e BeautifulSoup.

Ora possiamo iniziare a testare la capacità di eseguire il ping del feed RSS di HackerNews e scrivere lo script di scraping.

Nota: di seguito non includeremo le righe di importazione. Tutto il codice che rimane uguale verrà annotato come ‘…’ sopra o sotto le nuove righe.

Testare la richiesta

Quando eseguiamo lo scraping web, iniziamo inviando una richiesta a un sito web. Per assicurarci di essere in grado di eseguire lo scraping, dobbiamo verificare che  possiamo stabilre una connessione al sito.

Iniziamo creando la nostra funzione di scraping di base. Questo sarà ciò a cui eseguiremo

            # scraping.py

# library imports
...

# scraping function
def hackernews_rss('https://news.ycombinator.com/rss'):
    try:
        r = requests.get()
        return print('The scraping job succeeded: ', r.status_code)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
        
        
print('Starting scraping')
hackernews_rss()
print('Finished scraping')
        

Nel codice di cui sopra,  si richiama la funzione requests.get(...) della libreria Requests per recuperare il sito Web da analizzare. Quindi si stampa a video lo stato della richiesta, contenuto nel parametro r.status_code, per verificare che il sito Web sia stato chiamato correttamente.
Inoltre, abbiamo inserito tutto dentro un try: except: in modo da rilevare eventuali errori che si potremmo verificare in seguito.

Una volta eseguito lo script, si ottiene un codice di stato pari a 200. Questo conferma che siamo in grado di eseguire il ping del sito e “ottenere” informazioni.

            $ python scraping.py

Starting scraping
The scraping job succeeded: 200
Finsihed scraping
        

Scraping del contenuto

Il nostro script ha restituito un codice di stato pari a 200, siamo pronti per iniziare ad estrarre il contenuto XML dal sito. A tal fine, dobbiamo utilizzare BS4 insieme a Requests.
            # scraping.py

...

def hackernews_rss():
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        
        return print(soup)
...
        

Questo codice assegne il contenuto XML acquisito da HackerNews alla variabile soup. Stiamo usando r.content per passare l’XML restituito a BeautifulSoup, che analizzeremo nel prossimo paragrafo.

Una cosa fondamentale da notare è che stiamo sfruttando il parametro features='xml', questo parametro varia a seconda dei dati da acquisire (ad esempio, in un scraper HTML si dichiarerà come ‘HTML’).

L’output di quanto sopra sarà un gran disordine di contenuti che non ha molto senso. Questo è solo per visualizzare quali informazioni stiamo estraendo con successo dal sito web.

Analisi dei dati

Abbiamo descritto come possiamo estrarre l’XML dal feed RSS di HackerNews. E’ quindi il momento di analizzare le informazioni.

Abbiamo scelto di usare il feed RSS perché è molto più semplice da analizzare rispetto alle informazioni html del sito Web, poiché non dobbiamo preoccuparci degli elementi HTML nidificati e di individuare le informazioni esatte.

Iniziamo osservando la struttura del feed:

            <item>
    <title>...</title>
    <link>...</link>
    <pubDate>...</pubDate>
    <comments>...</comments>
    <description>...</description>
</item>
        

Ciascuno dei tag disponibili sul feed RSS segue la struttura di cui sopra, contenente tutte le informazioni all’interno dei tag <item>...</item>.

Sfruttiamo la coerenza dei tag item per analizzare le informazioni.

            # scraping.py 

...

def hackernews_rss():
    article_list = []
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        articles = soup.findAll('item')        
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            article_list.append(article)
        return print(article_list)
...
        

In questo codice, effettuiamo un controllo di tutti gli item in articles = soup.findAll('item'). Questo consente di estrarre ciascuno dei tag <item>...</item> dall’XML che abbiamo acquisito.

Ciascuno degli articoli viene analizzato utilizzando il ciclo: for a in articles:, questo consente di analizzare le informazioni in variabili separate e aggiungerle a un dizionario vuoto che abbiamo creato prima del ciclo.
BS4 ha scomposto l’XML in una stringa, consentendoci di chiamare la funzione .find() per cercare i tag in ciascuno degli oggetti. Usando .text siamo in grado di estrarre gli elementi <tag>...</tag> e salvare solamente il testo.
I risultati sono inseriti in un elenco tramite il comando article_list.append(article) in modo da potervi accedere in seguito.

Infine inseriamo la stampa della lista degli articoli in modo da visualizzare un output durante l’esecuzione dello script di scraping.

Output in un file

Il feed RSS è stato ora inviato correttamente in una funzione print() per visualizzare l’elenco prodotto a termine del parsing. Ora possiamo inserire i dati in un file .txt, in modo che possano riessere utilizzati in future analisi e altre attività relative ai dati. Utilizziamo la libreria JSON per la visualizzazione dei dati più semplice per noi; tuttavia, descriviamo anche un esempio senza la libreria JSON.

Iniziamo creando un’altra funzione def save_function(): che elabora l’elenco prodotto dalla funzione hackernews_rss(). In questo modo sarà più facile apportare modifiche in futuro.

            # scraping.py
import json

...

def save_function(article_list):
    with open('articles.txt', 'w') as outfile:
        json.dump(article_list, outfile)

...
        

Quanto sopra utilizza la libreria JSON per scrivere l’output dello scraping nel file articles.txt. Questo file verrà sovrascritto durante l’esecuzione dello script.

Un altro metodo per scrivere nel file .txt prevede un ciclo for:

            # scraping.py

...

def save_function(article_list):
    with open('articles.txt', 'w') as f:
        for a in article_list:
            f.write(a+'\n')
        f.close()
        
...
        
Ora che abbiamo creato la funzione save_function(), possiamo adattare la funzione di scraping per salvare i dati.
            # scraping.py

...

def hackernews_rss():
    ...
    try:
        ...
        return save_function(article_list)

...
        

Modificando il comando return print(article_list) in return save_function(article_list) siamo in grado di inserire i dati in un file .txt.

L’esecuzione dello script produrrà ora un file .txt dei dati acquisiti dal feed RSS di HackerNews.

Conclusione

Abbiamo creato con successo uno script Python per il di scraping dei feed RSS utilizzando Requests e BeautifulSoup. Questo ci consente di analizzare le informazioni XML in un formato leggibile con cui lavorare in futuro.

Questo script è la base per costruire alcune funzionalità interessanti:

  • Scraping di informazioni più complesse utilizzando elementi HTML.
  • Utilizzo di Selenium per lo scraping di siti dove il rendering lato client rende inutile l’uso di Requests.
  • Creazione di un’applicazione Web che acquisirà i dati dello scraping e li visualizzerà (ad esempio, un aggregatore)
  • Estrazione di dati da siti Web a seconda di una pianificazione e schedulazione.


Sebbene questo articolo abbia  descritto le basi del web scraping, possiamo iniziare ad approfondire alcuni dettagli.

Possiamo inserire questo script all’interno di una sequenza di altri script, schedulati tramite  Celery, oppure possiamo aggregare le informazioni su un’applicazione web utilizzando Django.

Questo articolo fa parte di una serie in 3 parti in cui descriviamo semplici esempi di web scraping e aggregazione su base programmata.

Pianificare un web scraping con Apache Airflow

scienzadeidati articoli - Web Scarping con Airflow

Nel post precedente , abbiamo introdotto Apache Airflow e i suoi concetti di base, la  configurazione e l’utilizzo. In questo post, descriviamo come possiamo pianificare i nostri web scraper con l’aiuto di Apache Airflow.

A titolo di esempio, vedremo come effettuare lo scraping del sito https://allrecipes.com tramite perché lo scopo è utilizzare Airflow. Nel caso si desideri approfondire il web scraping si può consultare l’intera serie presente qui su scienzadeidati.com.

Quindi, lavoreremo su un flusso di lavoro composto dalle seguenti attività:

  • parse_recipes: analizza le singole ricette.
  • download_image: scarica l’immagine della ricetta.
  • store_data: memorizza  l’immagine e i dati analizzati in un database MySQL

Non descriviamo come pianificare i DAG e altre cose che abbiamo già trattato nella Parte 1.

articoli - airflow_web_scraping

Abbiamo impostato load_examples=False in airflow.cfg in modo da semplificare l’interfaccia ed avere solo i DAG che ci interessano. La struttura del codice di base è simile alla seguente:

            import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def parse_recipes():
    return 'parse_recipes'


def download_image():
    return 'download_image'


def store_data():
    return 'store_data'


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 12, 1, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

with DAG('parsing_recipes',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
    opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                       python_callable=parse_recipes)

    opr_download_image = PythonOperator(task_id='download_image',
                                        python_callable=download_image)
    opr_store_data = PythonOperator(task_id='store_data',
                                    python_callable=store_data)

opr_parse_recipes >> opr_download_image >> opr_store_data
        

Se registriamo questo DAG eseguendo airflow scheduler, otteniamo qualcosa di simile:

articoli - airflow_web_scraping

Da notare la potenza dei flussi di lavoro. E’ possibile visualizzare lo scheletro della nostra pipeline in modo da avere un’idea del nostro flusso dati senza entrare nei dettagli. Bello, vero?

Ora, implementiamo una per una le routine di ogni task. Prima di entrare nella logica di parse_recipies, è fondamentale descrivere come i task di Airflow possono comunicare tra loro.

 

Cos’è XCOM?

XCOM fornisce i metodi per far comunicare i task tra di loro. I dati inviati da un task vengono acquisiti da un’altro task. Se impostiamo provide_context=True, il valore restituito della funzione viene inserito in XCOM che di per sé non è altro che una tabella Db. Se controlliamo airflow.db troveremo una tabella con il nome xcom, al cui interno sono presenti le voci delle istanze di attività in esecuzione.

            
def parse_recipes(**kwargs):
    return 'RETURNS parse_recipes'

        

Il primo task non prevede ha tali modifiche oltre a fornire **kwargs che consentono di condividere coppie chiave/valore. E’ comunque necessario impostare provide_context=True per ogni operatore, in modo da renderlo compatibile con XCom. Ad esempio:

            opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                python_callable=parse_recipes, provide_context=True)
        

 

Il task download_image avrà le seguenti modifiche:

            
def download_image(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

    print('Printing Task 1 values in Download_image')
    print(v1)
    return 'download_image'
        

Con prima riga è ti=kwargs['t1'] otteniamo i dettagli delle istanze tramite la chiave di accesso ti.

Vediamo in dettaglio perchè è necessario prevedere l’uso di queste chiavi di accesso. Se analizziamo il parametro kwargs, otteniamo una stampa simile alla seguente, dove sono presenti chiavi come t1task_instance, ecc… che contengono il valore inviato da un task:

            {
  'dag': <DAG: parsing_recipes>,
  'ds': '2020-12-02',
  'next_ds': '2020-12-02',
  'prev_ds': '2020-12-02',
  'ds_nodash': '20201202',
  'ts': '2020-12-02T09:56:05.289457+00:00',
  'ts_nodash': '20201202T095605.289457+0000',
  'yesterday_ds': '2020-12-01',
  'yesterday_ds_nodash': '20201201',
  'tomorrow_ds': '2020-12-03',
  'tomorrow_ds_nodash': '20201203',
  'END_DATE': '2020-12-02',
  'end_date': '2020-12-02',
  'dag_run': <DagRunparsing_recipes@2020-12-0209:56:05.289457+00:00:manual__2020-12-02T09:56:05.289457+00:00, externallytriggered: True>,
  'run_id': 'manual__2020-12-02T09:56:05.289457+00:00',
  'execution_date': <Pendulum[2020-12-02T09: 56: 05.289457+00:00]>,
  'prev_execution_date': datetime.datetime(2020,12,2,9,56,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'next_execution_date': datetime.datetime(2020,12,2,9,58,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'latest_date': '2020-12-02',
  'macros': <module'airflow.macros'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
  'params': {},
  'tables': None,
  'task': <Task(PythonOperator): download_image>,
  'task_instance': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'ti': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'task_instance_key_str': 'parsing_recipes__download_image__20201202',
  'conf': <module'airflow.configuration'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/configuration.py'>,
  'test_mode': False,
  'var': {
    'value': None,
    'json': None
  },
  'inlets': [],
  'outlets': [],
  'templates_dict': None
}
        

Nella riga successiva si richiama il metodo xcom_pull per inserire il valore restituito di un determinato task. Nel nostro caso l’ID task è parse_recipes:

v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

Bene, dopo aver descritto il concetto di XCOM, torniamo al nostro codice originale.

Quindi il nostro processo prevede un primo task che legge gli URL dal file di testo (possiamo anche creare un altra task che sarà responsabile solo di recuperare i collegamenti url e archiviarli in un file o DB.  Lascio questa implementazione a voi lettori! ) e quindi la list delle voci viene passata al successivo task che ha il compito di scaricare le immagini e aggiungere le informazioni del file appena scaricato in  un dict e infine memorizzarlo nel database MySQL.

Il metodo download_image appare ora come segue:

            
def download_image(**kwargs):
    local_image_file = None
    idx = 0
    records = []
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='parse_recipes')

    for rec in parsed_records:
        idx += 1
        image_url = rec['image_url']
        r_url = rec['url']
        print('Downloading Pic# {}'.format(idx))
        local_image_file = dl_img(image_url, r_url)
        rec['local_image'] = local_image_file
        records.append(rec)

    return records
        

Non descriviamo il metodo dl_img dato che esula dallo scopo di questo post. Se vuoi puoi controllare il codice direttamente su Github. Una volta scaricato il file, aggiungiamo la chiave nel record originale.

Il metodo store_data appare ora come segue:

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    print('PRINTING DUMPED RECORDS in STORE DATA')
    print(parsed_records)
    return 'store_data'
        

Assicuriamoci di impostare provide_context=True per l’operatore opr_store_data altrimenti si ottiene il seguente errore:

Subtask store_data KeyError: 'ti'

Ora che i dati sono disponibili., sono inviati per essere memorizzati nel database MySQL.

Per utilizzare MySQL con Airflow, utilizzeremo gli hook forniti da Airflow.

Airflow Hooks ci consente di interagire con sistemi esterni: e-mail, S3, database e vari altri.

Prima di iniziare a programmare, dobbiamo configurare una connessione MySQL.

Sull’interfaccia utente Web di Airflow, dobbiamo andare su Amministratore > Connessioni. Qui abbiamo un elenco di connessioni esistenti se ci colleghiamo a http://0.0.0.0:8080/admin/connection/

articoli - airflow_web_scraping

Modifichiamo la connessione e impostiamo il nome e la password della tabella.

articoli - airflow_web_scraping

Per utilizzare la libreria MySQL dobbiamo importare MySqlHook 

from airflow.hooks.mysql_hook import MySqlHook

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    connection = MySqlHook(mysql_conn_id='mysql_default')
    for r in parsed_records:
        url = r['url']
        data = json.dumps(r)
        sql = 'INSERT INTO recipes(url,data) VALUES (%s,%s)'
        connection.run(sql, autocommit=True, parameters=(url, data))
    return True
        

L’SQL della tabella è riportato di seguito:

            
CREATE TABLE recipes (
	'id' int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
	'url' varchar(100) NOT NULL,
	'data' text NOT NULL,
	PRIMARY KEY (`id`)
);
        

Questo è tutto. Oh, aspetta… E’ possibile informare l’amministratore se il flusso di lavoro è stato eseguito correttamente? Airflow ci consente di utilizzare EmailOperator per tale scopo.

            
opr_email = EmailOperator(
        task_id='send_email',
        to='[email protected]',
        subject='Airflow Finished',
        html_content=""" <h3>DONE</h3> """,
        dag=dag
    )
        

Prima di usarlo dobbiamo apportare modifiche a airflow.cfg, nelle impostazioni relative alla posta. C’è una sezione [smtp].

            
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = False
smtp_ssl = True
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = [email protected]
smtp_password = your_password
smtp_port = 465
smtp_mail_from = [email protected]
        

Conclusione

In questo post, abbiamo visto come introdurre Airflow nella  nostra architettura di scraping esistente e come utilizzare MySQL con Airflow. Ci sono varie possibilità per migliorarlo o estenderlo. Provate e fatemi sapere.

Il codice è disponibile su Github

Quando eseguiamo i web scraping, c’è un’alta probabilità di essere bloccato e non abbiamo altra scelta che aspettare e pregare di essere sbloccati. Gli indirizzi IP proxy vengono utilizzati per evitare tali circostanze.