Creare un’API di dati azionari utilizzando il Web Scraping e FastAPI

scienzadeidati articoli - Creare API di dati finanziari utilizzando Web Scraping e FastAPI

Per la stragrande maggioranza delle API REST Python che ho creato, ho usato Flask o Django. Questi due framework hanno costituito le fondamenta delle API REST di Python da ormai molti anni. Di recente, tuttavia, sono emersi alcuni nuovi strumenti. Uno di questi framework ha suscitato un notevole clamore: FastAPI. In questo articolo esploreremo  alcune funzionalità di questo strumento.


Il progetto

La prima cosa da fare è elaborare un nuovo progetto. Volevo trovare un progetto che fosse interessante ma semplice, un progetto che mi permettesse di sperimentare il flusso di lavoro di FastAPI evitando dettagli confusi. Dopo averci pensato un po’, ho deciso di creare un’API REST per la finanza.

Questo progetto sarà composto da due parti:

  • Un web scraper per ottenere dati finanziari e,
  • Un’API REST che offre l’accesso ai dati.

Pertanto, ho diviso questo articolo in due. Sentiti libero di saltare intorno alle parti che trovi più interessanti.


Il setup

Prima di iniziare, dovremmo occuparci della parte più divertente dell’avvio di un nuovo progetto: la creazione del nostro ambiente. Sì, che divertimento!

Useremo alcune librerie, quindi consiglio di creare un nuovo ambiente virtuale.
Ecco un elenco di dipendenze che utilizzeremo:

  • FastAPI (ovviamente è nel titolo!)
  • Uvicorn
  • Requests
  • BeautifulSoup4

Possiamo eseguire un comando per installarli tutti in una volta:

            pip install fastapi uvicorn requests beautifulsoup4
        

Per fortuna, questo è sufficiente per la configurazione. Immergiamoci subito nel codice!

Parte I: Il Web Scraper

Quando costruiamo un web scraper, dobbiamo prima di tutto rispondere a tre domande: dove , cosa e come.

Dove?

Esistono diverse risorse online dove è possibile trovare dati finanziari. Uno di questi è Yahoo Finance. Questo sito Web ha una pletora di dati finanziari organizzati per azienda e quindi per categoria. Useremo Yahoo Finance come obiettivo del nostro scraping.

Cosa?

Per ogni società quotata su Yahoo Finance, esiste una sezione riepilogativa. Questa sezione è ciò che utilizzeremo per ottenere i nostri dati.

Come?

Rispondere ai quesiti “dove” e “cosa” è piuttosto semplice, ma rispondere alla domanda “come” richiede un po’ di lavoro. Affronteremo questa domanda nelle seguenti sezioni.

Trovare il selettore CSS di un elemento.

Se apriamo Yahoo Finance e scegliamo un titolo, apparirà la pagina di riepilogo predefinita. Vogliamo raschiare il prezzo corrente del titolo e tutto nella tabella riepilogativa (tutto da “Chiusura precedente” a “Stima target 1A”).

Per lo scraping di questi dati, dobbiamo trovare il selettore CSS di ciascun elemento. Possiamo farlo tramite:

  • Aprire gli strumenti di sviluppo di Chrome premendo F12.
  • Premere Ctrl + Maiusc + C per abilitare il selettore di elementi.
  • Evidenziare e fare clic su un elemento. Questo evidenzierà l’HTML dell’elemento nel pannello Elementi.
  • Fare clic con il pulsante destro del mouse sull’HTML nel pannello Elementi e selezionare Copia > Copia selettore.

Per ciascuno dei nostri elementi desiderati, ripetiamo i passaggi 3 e 4.

Il file ‘scrape.json’

Una volta che abbiamo determinato i selettori CSS per ogni elemento, dobbiamo posizionarli in un posto utile. Per questo, creeremo un file chiamato scrape.json.

Alla fine, vogliamo archiviare i nostri dati in un dizionario Python. Per fare ciò, dobbiamo mappare le chiavi sui valori. In altre parole, dobbiamo sapere quali dati sono associati a quale chiave. scrape.json implementa questa mappatura.

La struttura di scrape.json è la seguente:

            {
    "elements": [
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        }
    ]
}
        

Ogni oggetto JSON in elements ha due attributi: from e to.

  • from è il selettore CSS di un elemento di cui vogliamo i dati.
  • to è la chiave con cui verranno archiviati i dati di questo elemento.

Di seguito il scrape.json compilato:

            {
    "elements": [
        {
            "from": "[class='Trsdu\\(0\\.3s\\) Fw\\(b\\) Fz\\(36px\\) Mb\\(-4px\\) D\\(ib\\)']",
            "to": "price"
        },
        {
            "from": "[data-test='PREV_CLOSE-value'] [class]",
            "to": "prev_close"
        },
        {
            "from": "[data-test='OPEN-value'] [class]",
            "to": "open"
        },
        {
            "from": "[data-test='BID-value'] [class]",
            "to": "bid"
        },
        {
            "from": "[data-test='ASK-value'] [class]",
            "to": "ask"
        },
        {
            "from": "[data-test='DAYS_RANGE-value']",
            "to": "days_range"
        },
        {
            "from": "[data-test='FIFTY_TWO_WK_RANGE-value']",
            "to": "52_week_range"
        },
        {
            "from": "[data-test='TD_VOLUME-value'] [class]",
            "to": "volume"
        },
        {
            "from": "[data-test='AVERAGE_VOLUME_3MONTH-value'] [class]",
            "to": "avg_volume"
        },
        {
            "from": "[data-test='MARKET_CAP-value'] [class]",
            "to": "market_cap"
        },
        {
            "from": "[data-test='BETA_5Y-value'] [class]",
            "to": "beta"
        },
        {
            "from": "[data-test='PE_RATIO-value'] [class]",
            "to": "pe"
        },
        {
            "from": "[data-test='EPS_RATIO-value'] [class]",
            "to": "eps"
        },
        {
            "from": "[data-test='EARNINGS_DATE-value']",
            "to": "earnings_date"
        },
        {
            "from": "[data-test='DIVIDEND_AND_YIELD-value']",
            "to": "dividend_and_yield"
        },
        {
            "from": "[data-test='EX_DIVIDEND_DATE-value'] span",
            "to": "ex_dividend_yield"
        },
        {
            "from": "[data-test='ONE_YEAR_TARGET_PRICE-value'] [class]",
            "to": "target_est"
        }
    ]
}
        

Codificare lo scraping.

Dopo aver compilato il file scrape.json con le nostre mappature, è il momento di codificare lo scraper.
Ci sono quattro passaggi che il nostro scraper deve compiere:

  • Creare un URL che punti alla pagina web di Yahoo Finance del titolo
  • Scaricare la pagina web
  • Analizzare la pagina web
  • Estrarre e mappare i dati desiderati (Sì, so cosa stai pensando, tecnicamente sono due passaggi. Non possiamo essere tutti perfetti.)

Diamo un’occhiata al codice e poi lo esaminiamo sezione per sezione.

            import requests
from bs4 import BeautifulSoup

class Scrape():

    def __init__(self, symbol, elements):
        url = "https://finance.yahoo.com/quote/" + symbol

        r = requests.get(url)
        if(r.url != url): # redirect occurred; likely symbol doesn't exist or cannot be found.
            raise requests.TooManyRedirects()

        r.raise_for_status()
        
        self.soup = BeautifulSoup(r.text, "html.parser")

        self.__summary = {}

        for el in elements["elements"]:
            tag = self.soup.select_one(el["from"])

            if tag != None:
                self.__summary[el["to"]] = tag.get_text()

    def summary(self):
        return self.__summary
        

Nella riga 7 (fase 1) si costruisce un URL.

Nella riga 9 (fase 2) sta scaricando la pagina web del titolo azionario.

Nelle righe 10-13 si controlla se si sono verificati errori o reindirizzamenti durante la fase 2. Se si è verificato un reindirizzamento, significa che il simbolo di borsa non esiste o non è corretto; trattiamo questo scenario come un errore.

Nella riga 15 (fase 3) si analizza la pagina web.

Nelle righe 17-23 (fase 4) è dove avviene la magia. Qui, le mappature in scrape.json sono usate per costruire un dizionario, __summary. Ogni elemento è selezionato con self.soup.select_one(el["from"]). Ai dati dell’elemento (testo) viene quindi assegnata una chiave con self.__summary[el["to"]] = tag.get_text().

Da buoni programmatori che siamo, dovremmo testare il nostro codice prima di continuare. Creiamo main.py e aggiungiamo il seguente codice di test:

            from Scrape import Scrape
import json

elements_to_scrape = {}

f = open("scrape.json")
data = f.read()
f.close()
elements_to_scrape = json.loads(data)

s = Scrape("AAPL", elements_to_scrape)
print(s.summary())
        

Con un po’ di fortuna, dovremmo vedere un dizionario Python popolato con la stampa dei dati sullo schermo. Questo è tutto per il web scraper! Ora possiamo cambiare marcia e iniziare a lavorare sulla nostra API REST.

 

Parte II: L’API REST

L’idea di base della nostra API REST sarà la seguente:

  • La nostra API REST avrà un endpoint, che prenderà un simbolo di borsa come input.
  • Il nostro scaper utilizzerà il simbolo per raccogliere i dati di riepilogo finanziario del titolo.
  • L’endpoint restituirà i dati di riepilogo in formato JSON.

Ora che abbiamo l’idea di base, iniziamo a programmare.

 

Configurazione FastAPI

FastAPI è facile da eseguire. Aggiungiamo il seguente codice a main.py:

            from fastapi import FastAPI

app = FastAPI()

@app.get("/root")
def root():
    return "Hello World!"
        

Per avviare FastAPI, possiamo emettere il seguente comando:

uvicorn main:app --reload

Analizziamo questo comando:

  • main: si riferisce al nostro file main.py.
  • app: si riferisce all’app FastAPI creata con main.py la linea app = FastAPI().
  • --reload: indica al server di riavviarsi quando ha rilevato modifiche al codice.

A parte: uvicorn è un’interfaccia ASGI (Asynchronous Server Gateway Interface) veloce che FastAPI utilizza per l’esecuzione. Non approfondiremo l’uvicorn, ma se sei interessato a saperne di più, puoi controllare il loro sito web .

Ora che abbiamo avviato il server, possiamo andare su http://127.0.0.1:8000/docs. Qui troveremo la documentazione dell’API interattiva automatica. Possiamo utilizzare questo strumento per verificare la corretta funzionalità dei nostri endpoint. Poiché al momento abbiamo solo l’endpoint root, non c’è molto da testare. Iniziamo con le modifiche.

 

Definizione di un endpoint

Il passaggio successivo consiste nell’aggiungere l’endpoint di riepilogo. Come accennato in precedenza, prenderà un simbolo azionario e restituirà il foglio di riepilogo finanziario di quel titolo.

Aggiungiamo questo codice in main.py

            @app.get("/v1/{symbol}/summary/")
def summary(symbol):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        
    except TooManyRedirects:
        raise HTTPException(status_code=404, 
              detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, 
              detail="An error has occurred while processing the request.")

    return summary_data
        

Qui, definiamo un endpoint in /v1/{symbol}/summary/. Si usa {symbol} per denotare un parametro di percorso. Questo parametro viene passato al nostro metodo endpoint e quindi nella nostra istanza dell’oggetto Scrape, s. Assegniamo quindi a summary_datail il risultato di s.summary(). Se non si verificano eccezioni, si restituisce summary_data.

NOTA: il tipo di contenuto predefinito di FastAPI è application/JSON. Poiché s.summary() restituisce un oggetto dizionario, FastAPI lo converte automaticamente in JSON. Quindi, non è necessario eseguire questa conversione manualmente.

 

FastAPI ci consente di definire un metodo di start-up, che ovviamente verrà eseguito all’avvio del nostro server. Quindi si trasferisce il nostro  codice di caricamento scrape.json, da testing, all’interno di questo metodo.

            @app.on_event("startup")
def startup():
    f = open("scrape.json")
    data = f.read()
    f.close()
    global elements_to_scrape
    elements_to_scrape = json.loads(data)
        

È buona norma inserire metodi più lenti e utilizzabili una tantum nella funzione di avvio. In questo modo, evitiamo di rallentare i nostri tempi di risposta eseguendo codice non necessario durante una richiesta.

A questo punto, dovremmo avere un API REST funzionante. Possiamo testarlo tornando allo strumento endpoint e digitando un titolo.

            {
  "price": "135.37",
  "prev_close": "135.13",
  "open": "134.35",
  "bid": "135.40 x 1000",
  "ask": "135.47 x 1300",
  "days_range": "133.69 - 135.51",
  "52_week_range": "53.15 - 145.09",
  "volume": "60,145,130",
  "avg_volume": "102,827,562",
  "market_cap": "2.273T",
  "beta": "1.27",
  "pe": "36.72",
  "eps": "3.69",
  "earnings_date": "Apr 28, 2021 - May 03, 2021",
  "dividend_and_yield": "0.82 (0.61%)",
  "ex_dividend_yield": "Feb 05, 2021",
  "target_est": "151.75"
}
        

Filtraggio

Tecnicamente, abbiamo realizzato tutto ciò che ci eravamo prefissati di fare. Tuttavia, c’è un punto di attenzione. Ogni volta che richiediamo un riepilogo delle scorte, viene restituito l’intero foglio di riepilogo. E se volessimo solo pochi punti dati selezionati? Sembra un terribile spreco richiedere un intero oggetto riassuntivo quando sono necessari solo pochi punti dati. Per risolvere questo problema, dobbiamo implementare un filtro. Fortunatamente, FastAPI offre una soluzione: le query.

Per aggiungere il filtro, è necessario modificare il metodo summary dell’endpoint per gestire le query. Ecco come farlo:

            @app.get("/v1/{symbol}/summary/")
def summary(symbol, q: Optional[List[str]] = Query(None)):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        if(q != None):
            # if all query parameters are keys in summary_data
            if all (k in summary_data for k in q): 
                # summary_data keeps requested key-value pairs
                summary_data = {key: summary_data[key] for key in q}
            # else, return whole summary_data

    except TooManyRedirects:
        raise HTTPException(status_code=404, detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, detail="An error has occurred while processing the request.")

    return summary_data
        

l parametro q: Optional[List[str]] = Query(None) dice a FastAPI che dovrebbe aspettarsi, facoltativamente, come query una List di String.

Le righe 8 e 9 controllano le stringhe di query rispetto a tutte le chiavi nel dizionario summary_dat. Se anche i parametri della query sono chiavi, manteniamo solo quelle coppie chiave-valore. In caso contrario, se uno dei parametri della query non è una chiave, viene restituito l’intero oggetto.

NOTA: per fare pratica, è possibile modificare la funzionalità del metodo precedente per restituire solo le coppie chiave-valore dei parametri di query valide. Puoi anche scegliere di sollevare un’eccezione se un parametro di query non corrisponde a nessuna delle chiavi.

Testiamo il nostro nuovo codice interrogando solo price e open.

            {
  "price": "135.37",
  "open": "134.35"
}
        

Conclusione

In questo articolo abbiamo creato un web scraper per recuperare i dati finanziari da Yahoo Finance durante questo articolo e abbiamo creato un API REST per servire questi dati. È dannatamente buono, e dovremmo essere orgogliosi di noi stessi.

Comunque per oggi è tutto. Come sempre, tutto il codice può essere trovato sul mio GitHub.

Grazie per aver letto. Spero che tu abbia imparato qualcosa e ti sia divertito!
Ci vediamo tutti la prossima volta.

 

Riferimenti

[1] Documentazione Beautiful Soup – https://www.crummy.com/software/BeautifulSoup/bs4/doc/

[2] FastAPI – https://fastapi.tiangolo.com/

[3] Requests: HTTP for HumansTM – https://2.python-requests.org/en/master/

[4] Uvicorn – https://www.uvicorn.org/

[5] Yahoo Finance – https://finance.yahoo.com/

Creazione di Microservizi con Python e FastAPI

scienzadeidati articoli - Creazione di Microservizi con Python e FastAPI

Come sviluppatore Python potresti aver sentito parlare del termine microservizi e desideri creare da solo un microservizio Python. I microservice sono un’ottima architettura per la creazione di applicazioni altamente scalabili. Prima di iniziare a creare l’applicazione usando i microservice, è necessario conoscere i vantaggi e gli svantaggi dell’uso dei microservizi. In questo articolo imparerai i vantaggi e gli svantaggi dell’utilizzo dei microservice. Imparerai anche come creare il tuo microservice e distribuirlo utilizzando Docker Compose.

In questo tutorial vedremo:

  • Quali sono i vantaggi e gli svantaggi dei microservizi.
  • Perché dovresti creare microservice con Python.
  • Come creare API REST utilizzando FastAPI e PostgreSQL.
  • Come creare microservice utilizzando FastAPI.
  • Come eseguire i microservice usando docker-compose.
  • Come gestire i microservice utilizzando Nginx.

Prima creeremo una semplice API REST usando FastAPI e poi utilizzeremo PostgreSQL come nostro database. Estenderemo quindi la stessa applicazione a un microservizio.

 

Introduzione ai microservice

Il microservice è un approccio per suddividere grandi applicazioni monolitiche in singole applicazioni specializzate in uno specifico servizio/funzionalità. Questo approccio è spesso noto come architettura orientata ai servizi o SOA.

Nell’architettura monolitica, ogni logica di business risiede nella stessa applicazione. I servizi dell’applicazione come la gestione degli utenti, l’autenticazione e altre funzionalità utilizzano lo stesso database.

In un’architettura di microservice, l’applicazione è suddivisa in diversi servizi separati che vengono eseguiti in processi separati. Esiste un database diverso per le diverse funzionalità dell’applicazione e i servizi comunicano tra loro utilizzando HTTP, AMQP o un protocollo binario come TCP, a seconda della natura di ciascun servizio. La comunicazione tra servizi può essere eseguita anche utilizzando le code di messaggi come RabbitMQ, Kafka o Redis.

 

Vantaggi dei microservice

L’architettura a microservizi offre molti vantaggi. Alcuni di questi vantaggi sono:

  • Un’applicazione debolmente accoppiata significa che i diversi servizi possono essere costruiti utilizzando le tecnologie più adatte alle singole funzioni e specificità. Quindi, il team di sviluppo non è vincolato alle scelte fatte durante l’avvio del progetto.
  • Poiché i servizi sono responsabili di funzionalità specifiche, la comprensione e il controllo dell’applicazione è più semplice e facile.
  • Anche il ridimensionamento dell’applicazione diventa più semplice perché se uno dei servizi richiede un utilizzo elevato della GPU, solo il server che contiene quel servizio deve avere una GPU elevata e gli altri possono essere eseguiti su un server normale.
 

Svantaggi dei microservice

L’architettura dei microservizi non è un proiettile d’argento che risolve tutti i tuoi problemi, ha anche i suoi svantaggi. Alcuni di questi inconvenienti sono:

  • Poiché diversi servizi utilizzano un diverso database, le transazioni che coinvolgono più di un servizio devono gestire la consistenza dei dati.
  • La suddivisione perfetta dei servizi è molto difficile da ottenere al primo tentativo e questo deve essere ripetuto prima di ottenere la migliore separazione possibile dei servizi.
  • Poiché i servizi comunicano tra loro attraverso l’uso dell’interazione di rete, ciò rende l’applicazione più lenta a causa della latenza della rete e del servizio.
 

Perché Microservice in Python?

Python è uno strumento perfetto per la creazione di microservizi perché questo linguaggio ha una semplice curva di apprendimento, tonnellate di librerie e pacchetti già implementati e una grande community di utilizzatori. Grazie all’introduzione della programmazione asincrona in Python, sono emersi framework web con prestazioni alla pari con GO e Node.js.

 

Introduzione a FastAPI

FastAPI è un moderno framework Web ad alte prestazioni, dotato di tantissime funzioni interessanti come la documentazione automatica basata su OpenAPI e la libreria di convalida e serializzazione integrata. In questo link puoi leggere l’elenco di tutte le fantastiche funzionalità presenti FastAPI.

 

Perché FastAPI

Alcuni dei motivi per cui penso che FastAPI sia un’ottima scelta per la creazione di microservizi in Python sono:

  • Documentazione automatica
    Supporto Async/Await
  • Convalida e serializzazione integrate
  • Tipo 100% annotato, quindi il completamento automatico funziona alla grande
 

Installazione FastAPI

Prima di installare FastAPI è necessario creare una nuova directory movie_service e creare un nuovo ambiente virtuale all’interno della directory appena creata usando virtualenv.

Se non l’hai già installato virtualenv:

            pip install virtualenv
        
Ora, si crea un nuovo ambiente virtuale.
            virtualenv env
        

Nei sistemi Mac/Linux si può attivare l’ambiente virtuale usando il comando:

            source ./env/bin/activate
        
Gli utenti Windows possono invece eseguire questo comando:
            .\env\Scripts\activate
        
Infine, siamo pronti per installare FastAPI eseguendo il seguente comando:
            pip install fastapi
        

Poiché FastAPI non viene fornito con un service integrato, è necessario installare uvicorn per eseguire il service. uvicorn è un server ASGI che ci consente di utilizzare le funzionalità async/await.

Per installare uvicorn si può usare il comando:

            pip install uvicorn
        

Creazione di un semplice REST API utilizzando FastAPI

Prima di iniziare a creare un microservice utilizzando FastAPI, impariamo le basi di FastAPI. Creiamo una nuova directory zcode>app e un nuovo file main.py all’interno della directory appena creata.

Aggiungiamo il seguente codice in main.py.

            #~/movie_service/app/main.py

from fastapi import FastAPI

app = FastAPI()


@app.get('/')
async def index():
    return {"Real": "Python"}
        

E’ necessario importare e creare un’istanza di FastAPI e quindi registrare l’endpoint radice / che restituisce un file JSON.

È possibile eseguire il server delle applicazioni utilizzando uvicorn app.main:app --reload. Qui app.main indica che si sta usando il file main.py all’interno della directory app e :app indica a FastAPI il nome della nostra istanza.

Possiamo accedere all’app da http://127.0.0.1:8000. Per accedere alla documentazione automatica bisogna andare su http://127.0.0.1:8000/docs. Possiamo giocare e interagire con l’API dal browser stesso.

Aggiungiamo alcune funzionalità CRUD alla nostra applicazione. Aggiorniamo il main.py in modo che assomigli a quanto segue:

            #~/movie_service/app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

app = FastAPI()

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


@app.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db
        

Come puoi vedere hai creato una nuova classe Movie che estende la classe BaseModel di Pydantic.

Il modello Movie contiene il nome, la foto, il genere e il cast. Pydantic è integrato con FastAPI che rende la creazione di modelli e la convalida delle richieste un gioco da ragazzi.

Se andiamo alla pagina della documentazione possiamo vedere che sono descritti i campi del nostro modello nella sezione di risposta di esempio. Questo è possibile perché abbiamo definito il response_model nella definizione del percorso nel decoratore @app.get.

Ora aggiungiamo l’endpoint per aggiungere un film al nostro elenco di film.

Aggiungiamo una nuova definizione di endpoint per gestire la richiesta POST.

            @app.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}
        

Ora apriamo il browser e testiamo la nuova API. Proviamo ad aggiungere un film con un campo non valido o senza i campi obbligatori e verifichiamo che la convalida venga gestita automaticamente da FastAPI.

Aggiungiamo un nuovo endpoint per aggiornare il film.

            @app.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        

Ecco l’indice id della nostra lista fake_movie_db.

Nota: ricorda di importare HTTPException da Fastapi

Ora possiamo anche aggiungere l’endpoint per eliminare il film.

            @app.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        
Prima di andare avanti, strutturiamo la nostra app in un modo migliore. Creiamo una nuova cartella api all’interno di app e creiamo un nuovo file movies.py all’interno della cartella creata di recente. Spostiamo tutti i codici relativi alle route da main.py a movies.py. Quindi, movies.py dovrebbe essere simile al seguente:
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import Movie

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

movies = APIRouter()

@movies.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db

@movies.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}

@movies.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")

@movies.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")
        

Qui abbiamo registrato un nuovo percorso API utilizzando APIRouter di FastAPI.

Inoltre, creiamo un nuovo file models.py all’interno di api in cui definiamo i nostri modelli Pydantic.

            #~/movie-service/api/models.py

from typing import List
from pydantic import BaseModel

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]

        
Ora dobbiamo indicare a FastAPI dove trovare i persorsi web. A tale scopo dobbiamo registrare all’interno di main.py il nuovo file con le “routes”.
            #~/movie-service/app/main.py

from fastapi import FastAPI

from app.api.movies import movies

app = FastAPI()

app.include_router(movies)
        
Infine, la struttura della directory della nostra applicazione è simile alla seguente:
            movie-service
├── app
│   ├── api
│   │   ├── models.py
│   │   ├── movies.py
│   |── main.py
└── env
        

Prima di proseguire è bene assicurarsi che l’applicazione funzioni correttamente.

 

Utilizzo del database PostgreSQL con FastAPI

In precedenza abbiamo usato lista di stringhe Python per simulare un elenco di film, ma ora siamo pronti per utilizzare un database reale a tale questo scopo. In particolare, in questa applicazione useremo PostgreSQL. Installaremo PostgreSQL, se non l’hai già fatto. Dopo aver installato PostgreSQL creeremo un nuovo database che chiameremo movie_db.

Utilizzeramo encode/database per connettersi al database utilizzando il supporto async e await.

Installa la libreria richiesta utilizzando:

            pip install 'databases[postgresql]'
        

questo comando installerà anche sqlalchemy e asyncpg, che sono necessari per lavorare con PostgreSQL.

Creiamo un nuovo file all’interno di api e lo chiamiamo db.py. Questo file conterrà il modello del database reale per il REST API.

            #~/movie-service/app/api/db.py

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = 'postgresql://movie_user:movie_password@localhost/movie_db'

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts', ARRAY(String))
)

database = Database(DATABASE_URL)
        

In particolare, DATABASE_URI è l’URL utilizzato per connettersi al database PostgreSQL. movie_user è il nome dell’utente del database, movie_password è la password dell’utente del database ed movie_db è il nome del database.

Proprio come in SQLAlchemy, abbiamo creato la tabella per il database dei film.

Aggiorniamo quindi main.py per connettersi al database. Il codice di main.py è il seguente:

            #~/movie-service/app/main.py

from fastapi import FastAPI
from app.api.movies import movies
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()
    
app.include_router(movies)
        
FastAPI fornisce alcuni gestori di eventi che è possibile utilizzare per connettersi al nostro database all’avvio dell’applicazione e disconnettersi alla chiusura. Aggiorniamo movies.py in modo che utilizzi un database invece di un falso elenco Python.
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import MovieIn, MovieOut
from app.api import db_manager

movies = APIRouter()

@movies.get('/', response_model=List[MovieOut])
async def index():
    return await db_manager.get_all_movies()

@movies.post('/', status_code=201)
async def add_movie(payload: MovieIn):
    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = payload.dict()
    fake_movie_db[id] = movie
    return None

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)
    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}')
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        
Aggiungiamo db_manager.py per manipolare il nostro database.
            #~/movie-service/app/api/db_manager.py

from app.api.models import MovieIn, MovieOut, MovieUpdate
from app.api.db import movies, database

async def add_movie(payload: MovieIn):
    query = movies.insert().values(**payload.dict())
    return await database.execute(query=query)

async def get_all_movies():
    query = movies.select()
    return await database.fetch_all(query=query)

async def get_movie(id):
    query = movies.select(movies.c.id==id)
    return await database.fetch_one(query=query)

async def delete_movie(id: int):
    query = movies.delete().where(movies.c.id==id)
    return await database.execute(query=query)

async def update_movie(id: int, payload: MovieIn):
    query = (
        movies
        .update()
        .where(movies.c.id == id)
        .values(**payload.dict())
    )
    return await database.execute(query=query)
        
Aggiorniamo il nostro models.py in modo che possiamo utilizzare il modello Pydantic con la tabella sqlalchemy.
            #~/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional


class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts: Optional[List[str]] = None
        

La classe MovieIn è il modello base da usare per aggiungere un film al database. Dobbiamo aggiungere id a questo modello per ottenerlo dal database, creando il modello ModeulOut. Il modello MovieUpdate consente di impostare i valori nel modello come facoltativi in modo che durante l’aggiornamento del filmato possa essere inviato solo il campo che deve essere aggiornato.

Possiamo ora collegarci alla pagina “docs” della nostra applicazione ed inizia a giocare con l’API.

 

 

Modelli di gestione dei dati nei microservice

La gestione dei dati è uno degli aspetti più critici durante la creazione di un microservizio. Poiché le diverse funzioni dell’applicazione sono gestite da servizi diversi, l’utilizzo di un database può essere complicato.

Di seguito sono riportati alcuni modelli che è possibile utilizzare per gestire il flusso di dati nell’applicazione.

 

Database per servizio

L’utilizzo di un database per ogni servizio è ottimo se vuoi che i tuoi microservizi siano il più possibile accoppiati debolmente. Avere un database diverso per ogni servizio ci consente di scalare servizi diversi in modo indipendente. Una transazione che coinvolge più database viene eseguita tramite API ben definite. Ciò ha il suo svantaggio in quanto non è semplice implementare transazioni complesse che coinvolgono più servizi . Inoltre, l’aggiunta del sovraccarico di rete rende questo approccio meno efficiente da usare.

 

Database condiviso

Se ci sono molte transazioni che coinvolgono più servizi è meglio usare un database condiviso. Ciò comporta i vantaggi di un’applicazione altamente coerente, ma elimina la maggior parte dei vantaggi offerti dall’architettura dei microservizi. Gli sviluppatori che lavorano su un servizio devono coordinarsi con le modifiche allo schema in altri servizi.

 

Composizione API

Nelle transazioni che coinvolgono più database, il compositore API funge da gateway API ed esegue chiamate API ad altri microservizi nell’ordine richiesto. Infine, i risultati di ogni microservizio vengono restituiti al servizio client dopo aver eseguito un join in memoria. Lo svantaggio di questo approccio è l’inefficienza dei join in memoria di un set di dati di grandi dimensioni.

 

 

Creazione di un microservice Python in Docker

Il problema della distribuzione del microservice può essere notevolmente ridotto utilizzando Docker. Docker aiuta a incapsulare ogni servizio e a scalarlo in modo indipendente.

 

Installazione di Docker e Docker Compose

Se non hai già installato docker nel tuo sistema, verifichiamo se docker è installato eseguendo il comando docker. Dopo aver completato l’installazione di Docker, installiamo Docker Compose . Docker Compose viene utilizzato per definire ed eseguire più container Docker. E’ utile anche per facilitare l’interazione tra i container.

 

Creazione del servizio Movies

Poiché gran parte del lavoro per la creazione di un servizio Movies è già stato svolto all’inizio con FastAPI, riutilizzeremo il codice che abbiamo già scritto. Creiamo una cartella nuova di zecca, che chiamerò python-microservices e spostiamoci il codice che abbiamo scritto in precedenza.

Quindi, la struttura delle cartelle sarebbe simile a questa:

            python-microservices/
└── movie-service/
    ├── app/
    └── env/
        

Prima di tutto, creiamo un file requirements.txt in cui conserveremo tutte le dipendenze che utilizzeremo in movie-service.

Creiamo un nuovo file requirements.txt all’interno di movie-service e aggiungiamo quanto segue:

            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
httpx==0.11.1
        

Abbiamo usato tutte le librerie menzionate nel file tranne httpx che utilizzerai mentre effettui una chiamata API da servizio ad un altro.

Creiamo un Dockerfile all’interno di movie-service come segue:

            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        

Per prima cosa definiamo quale versione di Python usare. Quindi impostiamo la cartella app come WORKDIR all’interno del contenitore Docker. Dopodiché viene installato gcc, richiesto dalle librerie che utilizziamo nell’applicazione.

Infine, installiamo tutte le dipendenze in requirements.txt e copiamo tutti i file all’interno di movie-service/app.

Aggiorniamo db.py e sostituiamo:

            DATABASE_URI = 'postgresql://movie_user:movie_password@localhost/movie_db'
        

con:

            DATABASE_URI = os.getenv('DATABASE_URI')
        

NOTA: non dimentichiamo di importare os nella parte superiore del file.

È necessario eseguire questa operazione in modo da poter fornire in seguito DATABASE_URI come variabile di ambiente.

Inoltre, aggiorniamo main.py e sostituiamo:

            app.include_router(movies)
        
con:
            app.include_router(movies, prefix='/api/v1/movies', tags=['movies'])
        

Abbiamo aggiunto prefix=/api/v1/movies in modo da gestire più facilmente le diverse versioni dell’API. Inoltre, i tag facilitano la ricerca delle API relative ai movies nei docs di FastAPI.

Inoltre, dobbiamo aggiornare i nostri modelli in modo che casts memorizzi l’ID del cast invece del nome effettivo. Quindi, aggiorniamo models.py come segue:

            #~/python-microservices/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts_id: List[int]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts_id: Optional[List[int]] = None
        
Allo stesso modo, dobbiamo aggiornare le tabelle del database, aggiorniamo db.py:
            #~/python-microservices/movie-service/app/api/db.py

import os

from sqlalchemy import (Column, DateTime, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = os.getenv('DATABASE_URL')

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts_id', ARRAY(Integer))
)

database = Database(DATABASE_URL)
        

Ora, aggiorniamo movies.py per verificare se il cast con l’ID specificato si presenta nel cast-service prima di aggiungere un nuovo film o aggiornare un film.

            #~/python-microservices/movie-service/app/api/movies.py

from typing import List
from fastapi import APIRouter, HTTPException

from app.api.models import MovieOut, MovieIn, MovieUpdate
from app.api import db_manager
from app.api.service import is_cast_present

movies = APIRouter()

@movies.post('/', response_model=MovieOut, status_code=201)
async def create_movie(payload: MovieIn):
    for cast_id in payload.casts_id:
        if not is_cast_present(cast_id):
            raise HTTPException(status_code=404, detail=f"Cast with id:{cast_id} not found")

    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.get('/', response_model=List[MovieOut])
async def get_movies():
    return await db_manager.get_all_movies()

@movies.get('/{id}/', response_model=MovieOut)
async def get_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return movie

@movies.put('/{id}/', response_model=MovieOut)
async def update_movie(id: int, payload: MovieUpdate):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)

    if 'casts_id' in update_data:
        for cast_id in payload.casts_id:
            if not is_cast_present(cast_id):
                raise HTTPException(status_code=404, detail=f"Cast with given id:{cast_id} not found")

    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}', response_model=None)
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        

Aggiungiamo un servizio per effettuare una chiamata API al casts-service:

            #~/python-microservices/movie-service/app/api/service.py

import os
import httpx

CAST_SERVICE_HOST_URL = 'http://localhost:8002/api/v1/casts/'
url = os.environ.get('CAST_SERVICE_HOST_URL') or CAST_SERVICE_HOST_URL

def is_cast_present(cast_id: int):
    r = httpx.get(f'{url}{cast_id}')
    return True if r.status_code == 200 else False
        

Nel precedente codice si effettua una chiamata API per ricavare il cast con uno specifico id e restituire true se il cast esiste altrimenti false.


Creazione del casts-service

Analogamente al movie-service, per creare un casts-service utilizzeremo FastAPI e un database PostgreSQL.

Creiamo una struttura di cartelle come la seguente:

            python-microservices/
.
├── cast_service/
│   ├── app/
│   │   ├── api/
│   │   │   ├── casts.py
│   │   │   ├── db_manager.py
│   │   │   ├── db.py
│   │   │   ├── models.py
│   │   ├── main.py
│   ├── Dockerfile
│   └── requirements.txt
├── movie_service/
...
        
Aggiungiamo quanto segue a requirements.txt:
            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
        
Dockerfile:
            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        
main.py:
            #~/python-microservices/cast-service/app/main.py

from fastapi import FastAPI
from app.api.casts import casts
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

app.include_router(casts, prefix='/api/v1/casts', tags=['casts'])
        

Abbiamo aggiunto il prefix /api/v1/casts in modo che la gestione dell’API diventi più semplice. Inoltre, l’aggiunta dei tags facilita la ricerca dei documenti relativi a casts nel docs di FastAPI.

casts.py

            #~/python-microservices/cast-service/app/api/casts.py

from fastapi import APIRouter, HTTPException
from typing import List

from app.api.models import CastOut, CastIn, CastUpdate
from app.api import db_manager

casts = APIRouter()

@casts.post('/', response_model=CastOut, status_code=201)
async def create_cast(payload: CastIn):
    cast_id = await db_manager.add_cast(payload)

    response = {
        'id': cast_id,
        **payload.dict()
    }

    return response

@casts.get('/{id}/', response_model=CastOut)
async def get_cast(id: int):
    cast = await db_manager.get_cast(id)
    if not cast:
        raise HTTPException(status_code=404, detail="Cast not found")
    return cast
        
db_manager.py
            #~/python-microservices/cast-service/app/api/db_manager.py

from app.api.models import CastIn, CastOut, CastUpdate
from app.api.db import casts, database


async def add_cast(payload: CastIn):
    query = casts.insert().values(**payload.dict())

    return await database.execute(query=query)

async def get_cast(id):
    query = casts.select(casts.c.id==id)
    return await database.fetch_one(query=query)
        
db.py
            #~/python-microservices/cast-service/app/api/db.py

import os

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URI = os.getenv('DATABASE_URI')

engine = create_engine(DATABASE_URI)
metadata = MetaData()

casts = Table(
    'casts',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('nationality', String(20)),
)

database = Database(DATABASE_URI)
        
models.py
            #~/python-microservices/cast-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class CastIn(BaseModel):
    name: str
    nationality: Optional[str] = None


class CastOut(CastIn):
    id: int


class CastUpdate(CastIn):
    name: Optional[str] = None
        

Esecuzione del microservizio utilizzando Docker Compose

Per eseguire i microservice, è necessario creare un  filedocker-compose.yml e aggiungiamo quanto segue:

            version: '3.7'

services:
  movie_service:
    build: ./movie-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./movie-service/:/app/
    ports:
      - 8001:8000
    environment:
      - DATABASE_URI=postgresql://movie_db_username:movie_db_password@movie_db/movie_db_dev
      - CAST_SERVICE_HOST_URL=http://cast_service:8000/api/v1/casts/

  movie_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_movie:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=movie_db_username
      - POSTGRES_PASSWORD=movie_db_password
      - POSTGRES_DB=movie_db_dev

  cast_service:
    build: ./cast-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./cast-service/:/app/
    ports:
      - 8002:8000
    environment:
      - DATABASE_URI=postgresql://cast_db_username:cast_db_password@cast_db/cast_db_dev

  cast_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_cast:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=cast_db_username
      - POSTGRES_PASSWORD=cast_db_password
      - POSTGRES_DB=cast_db_dev

volumes:
  postgres_data_movie:
  postgres_data_cast:
        

Abbiamo 4 diversi servizi, movie_service, un database per movie_service, cast_service e un database per cast service. Abbiamo esposto il movie_service alla porta 8001 e modo simile cast_service alla porta 8002.

Per il database, abbiamo utilizzato i volumi in modo che i dati non vengano distrutti quando il contenitore docker viene chiuso. 

Eseguiamo il docker-compose usando il comando:

            docker-compose up -d
        

Questo comando crea la dockar image, se non esiste già, e la esegue.

Andiamo su http://localhost:8002/docs per aggiungere un cast nel casts-service. Allo stesso modo, http://localhost:8001/docs  per aggiungere il film nel movie-service.

Utilizziamo di Nginx per accedere a entrambi i servizi utilizzando un singolo indirizzo host

Abbiamo distribuito i microservizi utilizzando Docker compose, ma c’è un piccolo problema. È necessario accedere a ciascuno dei microservizi utilizzando una porta diversa. Si può risolvere questo problema utilizzando il reverse-proxy di Nginx , utilizzando Nginx puoi indirizzare la richiesta aggiungendo un middleware che indirizza le nostre richieste a diversi servizi in base all’URL dell’API.

Aggiungamo un nuovo file nginx_config.conf all’interno python-microservices con i seguenti contenuti.

            server {
  listen 8080;

  location /api/v1/movies {
    proxy_pass http://movie_service:8000/api/v1/movies;
  }

  location /api/v1/casts {
    proxy_pass http://cast_service:8000/api/v1/casts;
  }

}
        

Stiamo eseguendo Nginx alla porta 8080 e instradando le richieste al movie-service se l’endpoint inizia con /api/v1/movies e in modo simile al casts-service se l’endpoint inizia con /api/v1/casts.

Ora dobbiamo aggiungere il servizio nginx nel nostro file docker-compose-yml. Aggiungiamo il seguente servizio dopo il servizio cast_db:

            ...
nginx:
    image: nginx:latest
    ports:
      - "8080:8080"
    volumes:
      - ./nginx_config.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - cast_service
      - movie_service
...
        
Ora, spegnamo i contenitori docker con il comando:
            docker-compose down
        

Ed eseguiamolo di nuovo con:

            docker-compose up -d
        

Ora possiamo accedere sia al movie-service che al casts-service tramite la porta 8080.

Collegandosi a http://localhost:8080/api/v1/movies/ otteniamo l’elenco dei film.

Ora, per accedere ai docs dei service è necessario modificare il  main.py del movie-service la seguente riga:

            app = FastAPI()
        

con

            app = FastAPI(openapi_url="/api/v1/movies/openapi.json", docs_url="/api/v1/movies/docs")
        
Allo stesso modo, per il casts-service si deve sostituirlo con
            app = FastAPI(openapi_url="/api/v1/casts/openapi.json", docs_url="/api/v1/casts/docs")
        

Abbiamo cambiato l’endpoint in cui vengono serviti i docs e da dove viene servito il openapi.json.

Ora possiamo accedere ai docs da http://localhost:8080/api/v1/movies/docs per il movie-service e da http://localhost:8080/api/v1/casts/docs per il casts-service.

 

Conclusione

L’architettura del microservice è ottima per suddividere una grande applicazione monolitica in logiche di business separate, ma anche questo comporta una complicazione. Python è ottimo per la creazione di microservizi grazie all’esperienza degli sviluppatori e a tonnellate di pacchetti e framework per rendere gli sviluppatori più produttivi.

Puoi trovare il codice completo presentato in questo tutorial su Github.

FastAPI con SQLAlchemy, PostgreSQL, Alembic e Docker – Parte 2 (versione asincrona)

scienzadeidati articoli - Database con FASTAPI [Parte2]

 

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con database relazionali in modo asincrono e utilizzare Alembic per le migrazioni.

Prima di iniziare con questo tutorial, leggi la parte 1 di questo tutorial.

Ecco il codice funzionante completo su github.

Iniziamo

Installa il pacchetto richiesto databases.

databases è un pacchetto leggero con supporto asyncio per molti database relazionali e utilizza le principali query di sqlalchemy.

Per lo scopo di questo tutorial userò pipenv , ma puoi usare pip o poetry o conda o qualsiasi altro gestore di pacchetti che preferisci.

            pipenv install databases
pipenv install databases[postgresql]
pipenv install asyncpg
        

useremo la stessa configurazione docker descritta nell’articolo precedente.

Dockerfile

            # Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        

docker-compose.yml

            version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

manterremo schema.py così com’è

            # schema.py

from pydantic import BaseModel


class User(BaseModel):
    first_name: str
    last_name: str
    age: int

    class Config:
        orm_mode = True
        

Analogamente manteniamo lo stesso alembic.ini.

Modifichiamo il file .env come segue:

DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres

ed lo utilizziamo nel file db.py dove inizializzeremo il nostro database.

            # db.py

import os
from databases import Database
from dotenv import load_dotenv
import sqlalchemy

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))

db = Database(os.environ["DATABASE_URL"])
metadata = sqlalchemy.MetaData()
        

A questo punto dobbiamo prevedere il file app.py, dove gestiremo l’inizializzazione dell’app con la connessione e la terminazione del database.

            # app.py

from db import db
from fastapi import FastAPI


app = FastAPI(title="Async FastAPI")


@app.on_event("startup")
async def startup():
    await db.connect()


@app.on_event("shutdown")
async def shutdown():
    await db.disconnect()
        
Di conseguenza dobbiamo modificare il file model.py.
            # model.py

from db import db

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)
        
miglioriamo il nostro model.py, creando una semplice classe di gestione del modello User
            # model.py

import sqlalchemy
from db import db, metadata, sqlalchemy


users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)


class User:
    @classmethod
    async def get(cls, id):
        query = users.select().where(users.c.id == id)
        user = await db.fetch_one(query)
        return user

    @classmethod
    async def create(cls, **user):
        query = users.insert().values(**user)
        user_id = await db.execute(query)
        return user_id
        

Questa classe fornirà un’implementazione più semplice per i metodi get e create.

Di conseguenza dobbiamo modificare il file main.py come segue.

            # main.py

import uvicorn
from models import User as ModelUser
from schema import User as SchemaUser
from app import app
from db import db


@app.post("/user/")
async def create_user(user: SchemaUser):
    user_id = await ModelUser.create(**user.dict())
    return {"user_id": user_id}


@app.get("/user/{id}", response_model=SchemaUser)
async def get_user(id: int):
    user = await ModelUser.get(id)
    return SchemaUser(**user).dict()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Da notare come ora stiamo usando async/await per gestire le chiamate verso il database.

È tempo quindi di modificare la configurazione del nostro Alembic.

Bisogna modificare
import models

target_metadata = models.Base.metadata

in
import models
from db import metadata

target_metadata = metadata

 

NOTA: è importante importare i modelli prima dei metadati.

 

Quindi procediamo a ricompilare il nostro Docker:

  • Costruzione: docker-compose build
  • Creazioni migrazioni: docker-compose run web alembic revision --autogenerate
  • Migrazione: docker-compose run web alembic upgrade head
  • Esecuzione: docker-compose up

Ora aprendo il browser e collegarsi a http://localhost:8000

articoli - pgadmin localhost 7
richiesta POST per creare un utente
articoli - pgadmin localhost 8
risposta alla richiesta precedente
articoli - pgadmin localhost 9
richiesta GET dello stesso utente con risposta

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic utilizzando la potenza di async .

Il codice completo per questo articolo è disponibile su github.

FastAPI con SQLAlchemy, PostgreSQL e Alembic e ovviamente Docker – Parte 1

scienzadeidati articoli - Database con FASTAPI

La guida completa (tutorial) all’utilizzo dei database relazionali con FastAPI

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con i database relazionali e utilizzare Alembic per le migrazioni. Un’implementazione che può essere utilizzata in produzione.

Installazione

Useremo pipenv per gestire sia i miei pacchetti che l’ambiente virtuale. Sentiti libero di gestire i tuoi pacchetti come preferisci.

Pacchetti Utilizzati
  • python ≥ 3.5
  • fastapi
  • pydantic
  • fastapi-sqlalchemy
  • alembic
  • psycopg2
  • uvicorn

Creiamo una nuova directory (puoi chiamarla come vuoi).

Ad esempio possiamo chiamarla fastapi_sqlalchemy_alembic

Apri il terminale e scrivi

cd fastapi_sqlalchemy_alembic

pipenv install --three fastapi fastapi-sqlalchemy pydantic alembic psycopg2 uvicorn

Utilizzerò docker compose per gestire il database, potresti ricevere alcuni errori relativi all’installazione di psycopg2 se stai utilizzando Mac-OS, ma dal momento che utilizzeremo Docker, questo non è molto importante.

Main.py

Iniziamo con un semplice file principale per Fastapi

            # main.py

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.post("/user/", response_model=User)
def create_user(user: User):
    return user

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
    
        

Configurazione di Docker

            # Dockerfile

# Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        
            # docker-compose.yml

version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

La configurazione precedente creerà un cluster con 3 contenitori:

  1. contenitore web — dove verrà eseguito il codice effettivo
  2. contenitore db
  3. contenitore pgadmin

Nella tua directory corrente dovresti vedere 5 file:

  1. file pip
  2. Pipfile.lock
  3. Dockerfile
  4. docker-compose.yml
  5. main.py

Quindi costruiamo il cluster docker, eseguendo il seguente comando nel terminale:

docker-compose build

 

Alembic

Si inizializza Alembic eseguendo il seguente cmd nel terminale della stessa directory:

alembic init alembic

In questo modo si crea una directory chiamata alembic e un file di configurazione alembic.ini

articoli - alembic

Il prossimo passo è aprire il file alembic.ini con il tuo editor e modificare la riga 38 da:

sqlalchemy.url = driver://user:pass@localhost/dbname

a:

sqlalchemy.url =

e quindi aggiungere l’url db postgres nel file alembic/env.py.

Dato che stiamo creando una configurazione che dovrebbe funzionare in produzione, non possiamo scrivere in chiaro il nome utente e password del database all’interno del file alembic.ini. Dobbiamo invece leggerlo dalle variabili d’ambiente tramite lo script alembic/env.py.

 
Installiamo python-dotenv

pipenv install python-dotenv

Dal momento che abbiamo aggiunto un nuovo pacchetto, ricostruiamo il docker per includerlo:

docker-compose build

 
Creiamo un .envfile

e aggiungiamo quanto segue:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Come abbiamo scoperto l’URL del database?

DATABASE_URL = postgresql+psycopg2://{utente}:{password}@{host}:{porta}

se controlliamo la configurazione docker-compose.yml per il database:

            ...
db:
    image: postgres:11
    ports:
        - "5432:5432"
    environment:
        - POSTGRES_USER=postgres
        - POSTGRES_PASSWORD=postgres
        - POSTGRES_DB=test_db

...

        

Vediamo come user=postgres, password=postgres e poiché siamo nel mondo docker, l’host del database non sarà localhost ma il nome del contenitore, nel nostro caso lo abbiamo chiamato db.

Quindi aggiungiamo questa riga al nostro .env:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Apriamo alembic\env.py , che dovrebbe apparire come segue:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
        

Dobbiamo quindi apportare le seguenti modifiche:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import poolfrom alembic import context
# ---------------- added code here -------------------------#
import os, sys
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
load_dotenv(os.path.join(BASE_DIR, ".env"))
sys.path.append(BASE_DIR)
#------------------------------------------------------------#
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# ---------------- added code here -------------------------#
# this will overwrite the ini-file sqlalchemy.url path
# with the path given in the config of the main code
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
#------------------------------------------------------------#
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# ---------------- added code here -------------------------#
import models
#------------------------------------------------------------#
# ---------------- changed code here -------------------------#
# here target_metadata was equal to None
target_metadata = models.Base.metadata
#------------------------------------------------------------#

        

Modelli

Ora creiamo i nostri modelli da migrare a PostgreSQL:

            # models.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String,)
    last_name = Column(String)
    age = Column(Integer)
        

Quindi è necessario generare la revisione per la nostra prima migrazione:

docker-compose run web alembic revision --autogenerate -m "First migration"

articoli - alembic update

Se il comando è stato eseguito correttamente dovresti vedere un nuovo file generato nella directory “versions”:

articoli - alembic2

Infine possiamo eseguire la migrazione:

docker-compose run web alembic upgrade head

articoli - alembic update2

Pgadmin

Per controllare le migrazioni create è sufficiente eseguire nel terminale il seguente comando:

docker-compose up

ed aspettare un po’, ci vuole un po’ di tempo per caricare. 

A caricamento concluso, si apre un browser all’indirizzo localhost:5050 e si può accedere con [email protected] e password=admin, come da impostazione definita nel nostro docker-compose.yml

            ...
pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
...
        
articoli - pgadmin localhost

Una volta entrati, è necessario creare una nuova connessione. Alla voce “General” si deve scegliere un nome per la connessione.

Alla voce “Connection” bisogna inserire i riferimenti e le credenziali per connettersi al database (la password è postgres)

articoli - pgadmin localhost 2

Navigamio fino a trovare la nostra tabella User.

Servers > {your-connection-name} > Databases > postgres > Schemas > public > Tables > users

articoli - pgadmin localhost 3
articoli - pgadmin localhost 4

Ora possiamo sicuramente dire che la nostra migrazione si è conclusa con successo.

Finore abbiamo implementato completamente l’ORM con le migrazioni Alembic. Il prossimo passo è collegarlo allo schema Pydantic.

 

Schema — Modello Pydantic

            # schema.py

from pydantic import BaseModel

class User(BaseModel):
    first_name: str
    last_name: str = None
    age: int
    class Config:
        orm_mode = True
        

Si noti che abbiamo una classe Config in cui impostiamo orm_mode=True ed è tutto ciò di cui abbiamo bisogno per i modelli Pydantic, senza i quali gli oggetti del modello Sqlalchemy non verranno serializzati su JSON.

Connettiamo tutto all’interno di main.py

            import uvicorn
from fastapi import FastAPI

#--------------- added code ------------------------#
import os
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db
from models import User as ModelUser
from schema import User as SchemaUser
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))
#---------------------------------------------------#

app = FastAPI()
#--------------- added code ------------------------#
app.add_middleware(DBSessionMiddleware, 
                    db_url=os.environ["DATABASE_URL"])
#---------------------------------------------------#
#--------------- modified code ---------------------#
@app.post("/user/", response_model=SchemaUser)
def create_user(user: SchemaUser):
    db_user = ModelUser(first_name=user.first_name, 
                        last_name=user.last_name, 
                        age=user.age
    )
    db.session.add(db_user)
    db.session.commit()
    return db_user
#---------------------------------------------------#

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Fantastico, eseguiamo di nuovo docker-compose up

Ora andiamo a \docs e invochiamo l’endpoint Create User

articoli - pgadmin localhost 5a

Possiamo quindi controllare su pgadmin se ha funzionato correttamente.

Colleghiamoci a localhost:5050

articoli - pgadmin localhost 6a

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic.

Il codice completo di questo articolo è disponibile su github.

Nella Parte 2 discuteremo come lavorare con i database in modo asincrono.