Creare un’API di dati azionari utilizzando il Web Scraping e FastAPI

scienzadeidati articoli - Creare API di dati finanziari utilizzando Web Scraping e FastAPI

Per la stragrande maggioranza delle API REST Python che ho creato, ho usato Flask o Django. Questi due framework hanno costituito le fondamenta delle API REST di Python da ormai molti anni. Di recente, tuttavia, sono emersi alcuni nuovi strumenti. Uno di questi framework ha suscitato un notevole clamore: FastAPI. In questo articolo esploreremo  alcune funzionalità di questo strumento.


Il progetto

La prima cosa da fare è elaborare un nuovo progetto. Volevo trovare un progetto che fosse interessante ma semplice, un progetto che mi permettesse di sperimentare il flusso di lavoro di FastAPI evitando dettagli confusi. Dopo averci pensato un po’, ho deciso di creare un’API REST per la finanza.

Questo progetto sarà composto da due parti:

  • Un web scraper per ottenere dati finanziari e,
  • Un’API REST che offre l’accesso ai dati.

Pertanto, ho diviso questo articolo in due. Sentiti libero di saltare intorno alle parti che trovi più interessanti.


Il setup

Prima di iniziare, dovremmo occuparci della parte più divertente dell’avvio di un nuovo progetto: la creazione del nostro ambiente. Sì, che divertimento!

Useremo alcune librerie, quindi consiglio di creare un nuovo ambiente virtuale.
Ecco un elenco di dipendenze che utilizzeremo:

  • FastAPI (ovviamente è nel titolo!)
  • Uvicorn
  • Requests
  • BeautifulSoup4

Possiamo eseguire un comando per installarli tutti in una volta:

            pip install fastapi uvicorn requests beautifulsoup4
        

Per fortuna, questo è sufficiente per la configurazione. Immergiamoci subito nel codice!

Parte I: Il Web Scraper

Quando costruiamo un web scraper, dobbiamo prima di tutto rispondere a tre domande: dove , cosa e come.

Dove?

Esistono diverse risorse online dove è possibile trovare dati finanziari. Uno di questi è Yahoo Finance. Questo sito Web ha una pletora di dati finanziari organizzati per azienda e quindi per categoria. Useremo Yahoo Finance come obiettivo del nostro scraping.

Cosa?

Per ogni società quotata su Yahoo Finance, esiste una sezione riepilogativa. Questa sezione è ciò che utilizzeremo per ottenere i nostri dati.

Come?

Rispondere ai quesiti “dove” e “cosa” è piuttosto semplice, ma rispondere alla domanda “come” richiede un po’ di lavoro. Affronteremo questa domanda nelle seguenti sezioni.

Trovare il selettore CSS di un elemento.

Se apriamo Yahoo Finance e scegliamo un titolo, apparirà la pagina di riepilogo predefinita. Vogliamo raschiare il prezzo corrente del titolo e tutto nella tabella riepilogativa (tutto da “Chiusura precedente” a “Stima target 1A”).

Per lo scraping di questi dati, dobbiamo trovare il selettore CSS di ciascun elemento. Possiamo farlo tramite:

  • Aprire gli strumenti di sviluppo di Chrome premendo F12.
  • Premere Ctrl + Maiusc + C per abilitare il selettore di elementi.
  • Evidenziare e fare clic su un elemento. Questo evidenzierà l’HTML dell’elemento nel pannello Elementi.
  • Fare clic con il pulsante destro del mouse sull’HTML nel pannello Elementi e selezionare Copia > Copia selettore.

Per ciascuno dei nostri elementi desiderati, ripetiamo i passaggi 3 e 4.

Il file ‘scrape.json’

Una volta che abbiamo determinato i selettori CSS per ogni elemento, dobbiamo posizionarli in un posto utile. Per questo, creeremo un file chiamato scrape.json.

Alla fine, vogliamo archiviare i nostri dati in un dizionario Python. Per fare ciò, dobbiamo mappare le chiavi sui valori. In altre parole, dobbiamo sapere quali dati sono associati a quale chiave. scrape.json implementa questa mappatura.

La struttura di scrape.json è la seguente:

            {
    "elements": [
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        },
        {
            "from": "",
            "to": ""
        }
    ]
}
        

Ogni oggetto JSON in elements ha due attributi: from e to.

  • from è il selettore CSS di un elemento di cui vogliamo i dati.
  • to è la chiave con cui verranno archiviati i dati di questo elemento.

Di seguito il scrape.json compilato:

            {
    "elements": [
        {
            "from": "[class='Trsdu\\(0\\.3s\\) Fw\\(b\\) Fz\\(36px\\) Mb\\(-4px\\) D\\(ib\\)']",
            "to": "price"
        },
        {
            "from": "[data-test='PREV_CLOSE-value'] [class]",
            "to": "prev_close"
        },
        {
            "from": "[data-test='OPEN-value'] [class]",
            "to": "open"
        },
        {
            "from": "[data-test='BID-value'] [class]",
            "to": "bid"
        },
        {
            "from": "[data-test='ASK-value'] [class]",
            "to": "ask"
        },
        {
            "from": "[data-test='DAYS_RANGE-value']",
            "to": "days_range"
        },
        {
            "from": "[data-test='FIFTY_TWO_WK_RANGE-value']",
            "to": "52_week_range"
        },
        {
            "from": "[data-test='TD_VOLUME-value'] [class]",
            "to": "volume"
        },
        {
            "from": "[data-test='AVERAGE_VOLUME_3MONTH-value'] [class]",
            "to": "avg_volume"
        },
        {
            "from": "[data-test='MARKET_CAP-value'] [class]",
            "to": "market_cap"
        },
        {
            "from": "[data-test='BETA_5Y-value'] [class]",
            "to": "beta"
        },
        {
            "from": "[data-test='PE_RATIO-value'] [class]",
            "to": "pe"
        },
        {
            "from": "[data-test='EPS_RATIO-value'] [class]",
            "to": "eps"
        },
        {
            "from": "[data-test='EARNINGS_DATE-value']",
            "to": "earnings_date"
        },
        {
            "from": "[data-test='DIVIDEND_AND_YIELD-value']",
            "to": "dividend_and_yield"
        },
        {
            "from": "[data-test='EX_DIVIDEND_DATE-value'] span",
            "to": "ex_dividend_yield"
        },
        {
            "from": "[data-test='ONE_YEAR_TARGET_PRICE-value'] [class]",
            "to": "target_est"
        }
    ]
}
        

Codificare lo scraping.

Dopo aver compilato il file scrape.json con le nostre mappature, è il momento di codificare lo scraper.
Ci sono quattro passaggi che il nostro scraper deve compiere:

  • Creare un URL che punti alla pagina web di Yahoo Finance del titolo
  • Scaricare la pagina web
  • Analizzare la pagina web
  • Estrarre e mappare i dati desiderati (Sì, so cosa stai pensando, tecnicamente sono due passaggi. Non possiamo essere tutti perfetti.)

Diamo un’occhiata al codice e poi lo esaminiamo sezione per sezione.

            import requests
from bs4 import BeautifulSoup

class Scrape():

    def __init__(self, symbol, elements):
        url = "https://finance.yahoo.com/quote/" + symbol

        r = requests.get(url)
        if(r.url != url): # redirect occurred; likely symbol doesn't exist or cannot be found.
            raise requests.TooManyRedirects()

        r.raise_for_status()
        
        self.soup = BeautifulSoup(r.text, "html.parser")

        self.__summary = {}

        for el in elements["elements"]:
            tag = self.soup.select_one(el["from"])

            if tag != None:
                self.__summary[el["to"]] = tag.get_text()

    def summary(self):
        return self.__summary
        

Nella riga 7 (fase 1) si costruisce un URL.

Nella riga 9 (fase 2) sta scaricando la pagina web del titolo azionario.

Nelle righe 10-13 si controlla se si sono verificati errori o reindirizzamenti durante la fase 2. Se si è verificato un reindirizzamento, significa che il simbolo di borsa non esiste o non è corretto; trattiamo questo scenario come un errore.

Nella riga 15 (fase 3) si analizza la pagina web.

Nelle righe 17-23 (fase 4) è dove avviene la magia. Qui, le mappature in scrape.json sono usate per costruire un dizionario, __summary. Ogni elemento è selezionato con self.soup.select_one(el["from"]). Ai dati dell’elemento (testo) viene quindi assegnata una chiave con self.__summary[el["to"]] = tag.get_text().

Da buoni programmatori che siamo, dovremmo testare il nostro codice prima di continuare. Creiamo main.py e aggiungiamo il seguente codice di test:

            from Scrape import Scrape
import json

elements_to_scrape = {}

f = open("scrape.json")
data = f.read()
f.close()
elements_to_scrape = json.loads(data)

s = Scrape("AAPL", elements_to_scrape)
print(s.summary())
        

Con un po’ di fortuna, dovremmo vedere un dizionario Python popolato con la stampa dei dati sullo schermo. Questo è tutto per il web scraper! Ora possiamo cambiare marcia e iniziare a lavorare sulla nostra API REST.

 

Parte II: L’API REST

L’idea di base della nostra API REST sarà la seguente:

  • La nostra API REST avrà un endpoint, che prenderà un simbolo di borsa come input.
  • Il nostro scaper utilizzerà il simbolo per raccogliere i dati di riepilogo finanziario del titolo.
  • L’endpoint restituirà i dati di riepilogo in formato JSON.

Ora che abbiamo l’idea di base, iniziamo a programmare.

 

Configurazione FastAPI

FastAPI è facile da eseguire. Aggiungiamo il seguente codice a main.py:

            from fastapi import FastAPI

app = FastAPI()

@app.get("/root")
def root():
    return "Hello World!"
        

Per avviare FastAPI, possiamo emettere il seguente comando:

uvicorn main:app --reload

Analizziamo questo comando:

  • main: si riferisce al nostro file main.py.
  • app: si riferisce all’app FastAPI creata con main.py la linea app = FastAPI().
  • --reload: indica al server di riavviarsi quando ha rilevato modifiche al codice.

A parte: uvicorn è un’interfaccia ASGI (Asynchronous Server Gateway Interface) veloce che FastAPI utilizza per l’esecuzione. Non approfondiremo l’uvicorn, ma se sei interessato a saperne di più, puoi controllare il loro sito web .

Ora che abbiamo avviato il server, possiamo andare su http://127.0.0.1:8000/docs. Qui troveremo la documentazione dell’API interattiva automatica. Possiamo utilizzare questo strumento per verificare la corretta funzionalità dei nostri endpoint. Poiché al momento abbiamo solo l’endpoint root, non c’è molto da testare. Iniziamo con le modifiche.

 

Definizione di un endpoint

Il passaggio successivo consiste nell’aggiungere l’endpoint di riepilogo. Come accennato in precedenza, prenderà un simbolo azionario e restituirà il foglio di riepilogo finanziario di quel titolo.

Aggiungiamo questo codice in main.py

            @app.get("/v1/{symbol}/summary/")
def summary(symbol):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        
    except TooManyRedirects:
        raise HTTPException(status_code=404, 
              detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, 
              detail="An error has occurred while processing the request.")

    return summary_data
        

Qui, definiamo un endpoint in /v1/{symbol}/summary/. Si usa {symbol} per denotare un parametro di percorso. Questo parametro viene passato al nostro metodo endpoint e quindi nella nostra istanza dell’oggetto Scrape, s. Assegniamo quindi a summary_datail il risultato di s.summary(). Se non si verificano eccezioni, si restituisce summary_data.

NOTA: il tipo di contenuto predefinito di FastAPI è application/JSON. Poiché s.summary() restituisce un oggetto dizionario, FastAPI lo converte automaticamente in JSON. Quindi, non è necessario eseguire questa conversione manualmente.

 

FastAPI ci consente di definire un metodo di start-up, che ovviamente verrà eseguito all’avvio del nostro server. Quindi si trasferisce il nostro  codice di caricamento scrape.json, da testing, all’interno di questo metodo.

            @app.on_event("startup")
def startup():
    f = open("scrape.json")
    data = f.read()
    f.close()
    global elements_to_scrape
    elements_to_scrape = json.loads(data)
        

È buona norma inserire metodi più lenti e utilizzabili una tantum nella funzione di avvio. In questo modo, evitiamo di rallentare i nostri tempi di risposta eseguendo codice non necessario durante una richiesta.

A questo punto, dovremmo avere un API REST funzionante. Possiamo testarlo tornando allo strumento endpoint e digitando un titolo.

            {
  "price": "135.37",
  "prev_close": "135.13",
  "open": "134.35",
  "bid": "135.40 x 1000",
  "ask": "135.47 x 1300",
  "days_range": "133.69 - 135.51",
  "52_week_range": "53.15 - 145.09",
  "volume": "60,145,130",
  "avg_volume": "102,827,562",
  "market_cap": "2.273T",
  "beta": "1.27",
  "pe": "36.72",
  "eps": "3.69",
  "earnings_date": "Apr 28, 2021 - May 03, 2021",
  "dividend_and_yield": "0.82 (0.61%)",
  "ex_dividend_yield": "Feb 05, 2021",
  "target_est": "151.75"
}
        

Filtraggio

Tecnicamente, abbiamo realizzato tutto ciò che ci eravamo prefissati di fare. Tuttavia, c’è un punto di attenzione. Ogni volta che richiediamo un riepilogo delle scorte, viene restituito l’intero foglio di riepilogo. E se volessimo solo pochi punti dati selezionati? Sembra un terribile spreco richiedere un intero oggetto riassuntivo quando sono necessari solo pochi punti dati. Per risolvere questo problema, dobbiamo implementare un filtro. Fortunatamente, FastAPI offre una soluzione: le query.

Per aggiungere il filtro, è necessario modificare il metodo summary dell’endpoint per gestire le query. Ecco come farlo:

            @app.get("/v1/{symbol}/summary/")
def summary(symbol, q: Optional[List[str]] = Query(None)):
    summary_data = {}
    try:
        s = Scrape(symbol, elements_to_scrape)
        summary_data = s.summary()
        if(q != None):
            # if all query parameters are keys in summary_data
            if all (k in summary_data for k in q): 
                # summary_data keeps requested key-value pairs
                summary_data = {key: summary_data[key] for key in q}
            # else, return whole summary_data

    except TooManyRedirects:
        raise HTTPException(status_code=404, detail="{symbol} doesn't exist or cannot be found.")
    except HTTPError:
        raise HTTPException(status_code=500, detail="An error has occurred while processing the request.")

    return summary_data
        

l parametro q: Optional[List[str]] = Query(None) dice a FastAPI che dovrebbe aspettarsi, facoltativamente, come query una List di String.

Le righe 8 e 9 controllano le stringhe di query rispetto a tutte le chiavi nel dizionario summary_dat. Se anche i parametri della query sono chiavi, manteniamo solo quelle coppie chiave-valore. In caso contrario, se uno dei parametri della query non è una chiave, viene restituito l’intero oggetto.

NOTA: per fare pratica, è possibile modificare la funzionalità del metodo precedente per restituire solo le coppie chiave-valore dei parametri di query valide. Puoi anche scegliere di sollevare un’eccezione se un parametro di query non corrisponde a nessuna delle chiavi.

Testiamo il nostro nuovo codice interrogando solo price e open.

            {
  "price": "135.37",
  "open": "134.35"
}
        

Conclusione

In questo articolo abbiamo creato un web scraper per recuperare i dati finanziari da Yahoo Finance durante questo articolo e abbiamo creato un API REST per servire questi dati. È dannatamente buono, e dovremmo essere orgogliosi di noi stessi.

Comunque per oggi è tutto. Come sempre, tutto il codice può essere trovato sul mio GitHub.

Grazie per aver letto. Spero che tu abbia imparato qualcosa e ti sia divertito!
Ci vediamo tutti la prossima volta.

 

Riferimenti

[1] Documentazione Beautiful Soup – https://www.crummy.com/software/BeautifulSoup/bs4/doc/

[2] FastAPI – https://fastapi.tiangolo.com/

[3] Requests: HTTP for HumansTM – https://2.python-requests.org/en/master/

[4] Uvicorn – https://www.uvicorn.org/

[5] Yahoo Finance – https://finance.yahoo.com/

Realizzazione di un’applicazione di web scraping con Python, Celery e Django

scienzadeidati articoli - Realizzazione applicazione di web scraping con Python Celery e Django

In questa è la 3°parte del tutorial relativo alla creazione di uno strumento di web scraping con Python, descriviamo come integrare il web scraper schedulato all’interno di un’applicazione web Django.

La 1° parte, Creazione di uno scraper di feed RSS con Python, illustra come utilizzare Requests e Beautiful Soup.

La 2° parte, Web scraping automatizzato con Python e Celery, descrive come pianificare le attività di scraping web con Celery, una coda di task.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

In precedenza, abbiamo creato un semplice lettore di feed RSS che raccoglie informazioni da HackerNews utilizzando Requests e BeautifulSoup. Dopo aver creato lo script di scraping di base, abbiamo descritto come integrare Celery nell’applicazione per fungere da sistema di gestione delle attività. Usando Celery, siamo stati in grado di pianificare le attività di scraping in modo che siano effettuati periodicamente ad intervalli fissi: questo permette di eseguire lo script senza interazione umana.

Il prossimo passo è raggruppare le attività di scraping pianificate in un’applicazione Web utilizzando Django. In questo modo possiamo accedere a un database, visualizzare i dati su un sito Web e terminare la creazione di un’app di “scraping”. L’obiettivo di questo progetto è creare qualcosa di scalabile, simile a un aggregatore.

Questo articolo non è una guida dettagliata al web framework Django. E’ invece orientato verso un approccio “Hello World”, seguito dalla visualizzazione di contenuti acquisiti dall’app web.

Per raggiunge questo obiettivo dobbiamo utilizzare i seguenti strumenti:

  • Python 3.7+
  • Requests – per le richieste web
  • BeautifulSoup 4 – Strumento per il parsing  HTML
  • Un editor di testo (PyCharm o Visual Studio Code)
  • Celery – Coda asincrona di attività con messaggi distribuiti
  • RabbitMQ – Un broker di messaggi
  • lxml – Se si usa un ambiente virtuale
  • Django – Un framework  web con Python
  • Pipenv – Un pacchetto per gestire ambienti virtuali

Nota: tutte le dipendenze dell’applicazione sono elencate nel file Pipfile/ Pipfile.lock.

Obiettivi

Si vuole realizzare un’applicazione Web che utilizza un sistema di gestione dei task per raccogliere i dati e memorizzarli nel database.

articoli - applicazione scraping Celery Django
Quanto sopra mostra come un’ applicazione Django prevede l’invio di task sistema di code, che li esegue e salva gli eventi nel database. Mentre l’applicazione Django è in esecuzione, non è richiesto di eseguire manualment nessuna attività di web scraping. Di seguito l’elenco dei passaggi previsti per realizzare l’applicazione desiderata:
  1. Installare Django, il framework Python usato per creare la base dell’applicazione web
  2. Creare un progetto Django e avviare il server
  3. Generare l’app scraping per  acqusisire i dati
  4. Configurare celery.py e tasks.py per effettuare l’estrazione dei dati
  5. Integrare i dati con la view HomePage di Django
Nota: se sei a tuo agio con Django, vai al passaggio 4.

Inizializzazione

Per iniziare dobbiamo creare un ambiente virtuale per il progetto Django, e quindi creare lo starter. Questo codice è disponibile su GitHub di Scienzadeidati.com.

Il file Piplock specifca tutti i requisiti del progetto, in questo modo l’ambiente virtuale verrà avviato con tutti i pacchetti necessari.

				
					$ mkdir django_celery_web_scraping && cd django_celery_web_scraping
$ pipenv install requests bs4 lxml django celery
				
			

Inoltre, è necessario assicurarsi che RabbitMQ sia installato, come descritto nel precedente articolo.

Nota: in questo articolo stiamo usando Ubuntu, quindi i comandi potrebbero differire a seconda del sistema operativo. Inoltre, per brevità, abbiamo omesso il codice che non ha subito modifiche, usando ….

Creare un progetto Django e avviare il server

Il primo passo per la configurazione del progetto,  dobbiamo creare un’istanza di una shell pipenv, e quindi creere un progetto Django. Successivamente, dobbiamo iniziare la creazione dell’applicazione Django ed effettuare le generiche impostazioni.

				
					# django_web_scraping

$ pipenv shell
$ django-admin startproject django_web_scraping .
$ python manage.py createsuperuser
$ python manage.py makemigrations
$ python manage.py migrate
				
			

Tramite alcuni dei comandi precedenti, creiamo un’istanza della shell dell’ambiente virtuale per eseguire i comandi Django. Il comando startproject crea l’applicazione iniziale all’interno della directory che stiamo utilizzando . e quindi si eseguono gli altri comandi: createsuperuser, makemigrations, migrate.

E’ ora possibile avviare il server per mostrare che siamo operativi.

Nota: assicuriamoci che questi comandi siano eseguiti in una  shell <code>pipenv</code>.

				
					$ python manage.py runserver
				
			
Collegandosi all’indirizzo localhost:8000 possiamo vedere che il server è avviato e funzionante.
articoli - applicazione scraping Celery Django 1
Ora dobbiamo creare un URL in urls.py dove specificare la view della homepage.
				
					# urls.py 

from django.contrib import admin
from django.urls import path, include

from .views import HomePageView # new

urlpatterns = [
    path('', HomePageView.as_view(), name='home'), # homepage
    path('admin/', admin.site.urls),
]
				
			

Quanto sopra è una vista generica importata dal file <code>views.py</code> che  dobbiamo creare nella directory principale del progetto

				
					# django_web_scraping/views.py

from django.shortcuts import render
from django.views import generic

# Create your views here.

class HomePageView(generic.ListView):
    template_name = 'home.html'
    
				
			
Successivamente, dobbiamo creare la directory dei modelli, il modello HTML di base e il modello della home page
				
					$ mkdir templates && touch templates/base.html && touch templates/home.html
				
			
Per registrare i file modello, dobbiamo aggiungere la directory templates alle impostazioni di Django:
				
					# settings.py
TEMPLATES = [
    ...
    'DIRS': ['templates'], # new
    ...
]
    
				
			
Ora aggiungiamo un semplice codice HTML per il template base
				
					# base.html

{% load static %}

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
            <title>{% block title %}Django Web Scraping Example  
                   {% endblock title %}</title>
    </head>
    <body>
        <div class="container">
            {% block content %}
            {% endblock content %}
        </div>
    </body>
</html>
				
			
I contenuti di tutte le pagine dell’applicazione web sono inseriti nei contenitori contrassegnati con {% block %} del modello base.html.
				
					# home.html

{% extends 'base.html' %}

{% block content %}
Hello World
{% endblock content %}
				
			
Dopo aver definito i modelli, l’esempio “Hello World” è completato.

Genera l'app scraping per raccogliere i dati

In questao paragrafano descriviamo come creare l’applicazione di scraping e il modello di dati. Questo è integrato nel file settings.py e i suoi dati sono passati all’applicazione principale HomePageView.
				
					$ python manage.py startapp scraping
				
			
Registriamo l’applicazione all’interno delle impostazioni.
				
					# settings.py

INSTALLED_APPS [
    ...
    'scraping.apps.ScrapingConfig', # new
]
				
			
Quini dobbiamo creare il modello in cui salvara i dati, fortunatamente la struttura dati del feed RSS ha pochissimi campi.
				
					# models.py

from django.db import models

# Create your models here.
class News(models.Model):
    title = models.CharField(max_length=200)
    link = models.CharField(max_length=2083, default="", unique=True)
    published = models.DateTimeField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    source = models.CharField(max_length=30, default="", blank=True, null=True)
    
				
			

I campi del modello News hanno i seguenti significati:

  • title – Dati RSS strutturati
  • link – Il link dell’articolo
  • published – La data in cui l’articolo è stato pubblicato su HackerNews
  • created_at – La data di immissione dei dati, “now” per impostazione predefinita
  • updated_at – La data dell’ultimo aggiornamento dei dati
  • source – HackerNews (o qualsiasi altro sito che scegliamo di analizzare)

Dopo aver creato il modello, l’applicazione Django non viene caricata perché mancano le migrazioni (ovvero la creazione delle tabelle).

				
					$ python manage.py makemigrations
$ python manage.py migrate
				
			

Nota: non prevediamo nessun URL per questa app, poiché stiamo solo inviando i dati all’applicazione principale.

Configurazione del file celery.py

I passaggi precedenti in questo articolo hanno descritto le basi per costruire il progetto, vediamo ora  come integrare Celery e gli stessi tasks
Questa sezione si basa sul codice descritto negli articoli precedenti. Iniziamo con un file celery.py per l’applicazione Celery, quindi aggiungiamo i task dal codice base dell’articolo Web scraping automatizzato con Python e Celery

				
					$ touch django_web_scraping/celery.py
				
			

La configurazione di cui sopra deve essere posizionata all’interno della directory principale del progetto e fungerà da file di “impostazioni” per la coda dei task.

				
					# celery.py

import os
from celery import Celery
from celery.schedules import crontab # scheduler

# default django settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE','django_web_scraping.settings')
app = Celery('django_web_scraping')
app.conf.timezone = 'UTC'
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
				
			

Queste sono le impostazioni predefinite dalla documentazione di Celery, e prevedono che l’applicazione Celery utilizzi il modulo settings ed individuare automaticamente i task.

La seconda configurazione fondamentale prima di creare i task è specificare il file settings.py per il broker di messaggi (RabbitMQ) e Celery.

				
					# settings.py

# celery
CELERY_BROKER_URL = 'amqp://localhost:5672'
CELERY_RESULT_BACKEND = 'amqp://localhost:5672'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
				
			

Includere task.py

I task definiti in tasks.py sono simili a quelli descritti nel precedente articolo. Le principali modifiche sono:

  • La funzione di salvataggio
  • Come richiamiamo gli oggetti

Anziché salvare i dati dello scraping nei file .txt, prevediamo di memorizzarli come voci nel database predefinito (SQLite).

Iniziamo con la funzione di scraping, per descrivere come  i dati sono estratti. Il seguente blocco di codice mostra l’intero task condiviso, con importazioni specifiche per questo task.

				
					# scraping/tasks.py

# scraping
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
import lxml

# scraping function
@shared_task
def hackernews_rss():
    article_list = []
    try:
        print('Starting the scraping tool')
        # execute my request, parse the data using XML
        # parser in BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # select only the "items" I want from the data
        articles = soup.findAll('item')
    
        # for each "item" I want, parse it into a list
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published_wrong = a.find('pubDate').text
            published = datetime.strptime(published_wrong, '%a, %d %b %Y %H:%M:%S %z')
            # print(published, published_wrong) # checking correct date format
            # create an "article" object with the data
            # from each "item"
            article = {
                'title': title,
                'link': link,
                'published': published,
                'source': 'HackerNews RSS'
            }
            # append my "article_list" with each "article" object
            article_list.append(article)
            print('Finished scraping the articles')
    
            # after the loop, dump my saved objects into a .txt file
            return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception:')
        print(e)
				
			
I codice di cui sopra prevede di:
  • Inviare una richiesta al feed RSS di HackerNews, ottenere gli elementi elencati, e quindi restituire i dati XML.
  • Separare i dati XML in “elementi” utilizzando soup.findAll('item'), e quindi analizzare i dati utilizzando la libreria LXML.
  • Pulire i dati in formato JSON, prestando particolare attenzione al formato della data estratta da item per ogni articolo. Questo è importante per salvare gli articoli nel database.
  • Assicurarsi che le date siano in un formato accettato dal database.
  • Aggiungere l’articolo a un elenco di elementi.
  • Chiamare la save_function() con l’elenco di articoli come parametro.
Se l’attività di scraping ha esito negativo, gestiamo le informazioni da restituire tramite la funzione Exception. Successivamente, iniziamo a esaminare il metodo save_function() che è stato implementato nell’articolo precedente. Questo è stato adattato per utilizzare il modello News che è stato creato all’interno dell’applicazione di scraping.
				
					# scraping/tasks.py

@shared_task(serializer='json')
def save_function(article_list):
    print('starting')
    new_count = 0

    for article in article_list:
        try:
            News.objects.create(
                title = article['title'],
                link = article['link'],
                published = article['published'],
                source = article['source']
            )
            new_count += 1
        except Exception as e:
            print('failed at latest_article is none')
            print(e)
            break
    return print('finished')
				
			
La funzione save_function() utilizza il parametro article_list passato dalla funzione di scraping, e salva ogni oggetto article nel database. Nel repository Github abbiamo previsto una versione aggiornata della funzione save_function() che recupera il più recente articolo di HackerNews salvato nel database ed interrompe l’elaborazione.

Invio dei dati alla HomePageView

Dopo aver creato celery.py e tasks.py, siamo in grado di integrare i dati in HomePageView per mostrarli sull’applicazione web. Per iniziare, apriamo views.py presente nella root del progetto, quindi aggiungiamo il modello News al suo interno. Questo consente di chiamare gli oggetti tag article all’interno dei modelli Django.
				
					# django_web_scraping/views.py

from scraping.models import News # bring News into the views

class HomePageView(generic.ListView):
    template_name = 'home.html'
    context_object_name = 'articles' 
    # assign "News" object list to the object "articles"
    # pass news objects as queryset for listview
    def get_queryset(self):
        return News.objects.all()
				
			

Avvio e test dell'applicazione

Dopo avere aggiornato la HomePageView, il progetto è pronto per essere lanciato e testato. In modo analogo alla Parte 1 e alla Parte 2 di questa serie, dobbiamo usare più finestre di terminale.

Per avviare il progetto abbiamo bisogno di:

  • Avviare il servizio broker RabbitMQ.
  • Avviare il server Django.
  • Abilitare i task di Celery.

I passaggi precedenti richiedono più terminali, come descritto di seguito.

Terminale #1 – RabbitMQ

Innanzitutto, verifichiamo che non ci sia in esecuzione un’istanza di RabbitMQ.

Nota: utilizziamo sudo perché l’installazione predefinita non ha concesso le autorizzazioni appropriate.

				
					$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server start # start server
				
			

Terminale #2  – Django

Django è facile da avviare, iniziamo solo con il comando runserver. Usando Pipenv, eseguiamo il comando nella shell
				
					$ pipenv shell
$ python manage.py runserver
				
			
articoli - applicazione scraping Celery Django 3a

Terminale #3 – Celery

Ora che il progetto è in esecuzione,, possiamo abilitare i task di Celery

				
					$ celery -A django_web_scraping worker -B -l INFO
				
			
articoli - applicazione scraping Celery Django 4

Una volta che i servizi di cui sopra sono stati avviati, siamo in grado di controllare l’output dello scraping sulla homepage (raggiungibile all’indirizzo 127.0.0.1:8000).

Nella homepage sono visualizzati, in forma tabellare, i dati acquisiti dallo scraping e restituiti dai task di Celery che abbiamo creato. Se osserviamo l’output dei task, vediamo che stanno fallendo perché i dati non soddisfano il vincolo univoco (ad esempio, è un duplicato e non ci sono nuovi post).
Una modifica futura può essere prevedere l’esecuzione  di tasks.py a intervalli maggiori, perché il feed RSS probabilmente non avrà molti aggiornamenti ad intervalli di un minuto.

Conclusione

Abbiamo integrato con successo Django, Celery, RabbitMQ e le librerie di web scraping di Python per creare un lettore di feed RSS. Questo tutorial ha fornito una panoramica sull’aggregazione dei dati nella forma di applicazione web, simile a popolari siti (come Feedly).

 

Possibili sviluppi futuri

  • Aggregare altri siti Web o feed di notizie
  • Modificare save_function() in modo da evitare di salvare ogni singolo oggetto ad ogni scraping (meno chiamate al database!!).
  • Creare un proprio feed RSS, con i dati aggregati.

Creazione di Microservizi con Python e FastAPI

scienzadeidati articoli - Creazione di Microservizi con Python e FastAPI

Come sviluppatore Python potresti aver sentito parlare del termine microservizi e desideri creare da solo un microservizio Python. I microservice sono un’ottima architettura per la creazione di applicazioni altamente scalabili. Prima di iniziare a creare l’applicazione usando i microservice, è necessario conoscere i vantaggi e gli svantaggi dell’uso dei microservizi. In questo articolo imparerai i vantaggi e gli svantaggi dell’utilizzo dei microservice. Imparerai anche come creare il tuo microservice e distribuirlo utilizzando Docker Compose.

In questo tutorial vedremo:

  • Quali sono i vantaggi e gli svantaggi dei microservizi.
  • Perché dovresti creare microservice con Python.
  • Come creare API REST utilizzando FastAPI e PostgreSQL.
  • Come creare microservice utilizzando FastAPI.
  • Come eseguire i microservice usando docker-compose.
  • Come gestire i microservice utilizzando Nginx.

Prima creeremo una semplice API REST usando FastAPI e poi utilizzeremo PostgreSQL come nostro database. Estenderemo quindi la stessa applicazione a un microservizio.

 

Introduzione ai microservice

Il microservice è un approccio per suddividere grandi applicazioni monolitiche in singole applicazioni specializzate in uno specifico servizio/funzionalità. Questo approccio è spesso noto come architettura orientata ai servizi o SOA.

Nell’architettura monolitica, ogni logica di business risiede nella stessa applicazione. I servizi dell’applicazione come la gestione degli utenti, l’autenticazione e altre funzionalità utilizzano lo stesso database.

In un’architettura di microservice, l’applicazione è suddivisa in diversi servizi separati che vengono eseguiti in processi separati. Esiste un database diverso per le diverse funzionalità dell’applicazione e i servizi comunicano tra loro utilizzando HTTP, AMQP o un protocollo binario come TCP, a seconda della natura di ciascun servizio. La comunicazione tra servizi può essere eseguita anche utilizzando le code di messaggi come RabbitMQ, Kafka o Redis.

 

Vantaggi dei microservice

L’architettura a microservizi offre molti vantaggi. Alcuni di questi vantaggi sono:

  • Un’applicazione debolmente accoppiata significa che i diversi servizi possono essere costruiti utilizzando le tecnologie più adatte alle singole funzioni e specificità. Quindi, il team di sviluppo non è vincolato alle scelte fatte durante l’avvio del progetto.
  • Poiché i servizi sono responsabili di funzionalità specifiche, la comprensione e il controllo dell’applicazione è più semplice e facile.
  • Anche il ridimensionamento dell’applicazione diventa più semplice perché se uno dei servizi richiede un utilizzo elevato della GPU, solo il server che contiene quel servizio deve avere una GPU elevata e gli altri possono essere eseguiti su un server normale.
 

Svantaggi dei microservice

L’architettura dei microservizi non è un proiettile d’argento che risolve tutti i tuoi problemi, ha anche i suoi svantaggi. Alcuni di questi inconvenienti sono:

  • Poiché diversi servizi utilizzano un diverso database, le transazioni che coinvolgono più di un servizio devono gestire la consistenza dei dati.
  • La suddivisione perfetta dei servizi è molto difficile da ottenere al primo tentativo e questo deve essere ripetuto prima di ottenere la migliore separazione possibile dei servizi.
  • Poiché i servizi comunicano tra loro attraverso l’uso dell’interazione di rete, ciò rende l’applicazione più lenta a causa della latenza della rete e del servizio.
 

Perché Microservice in Python?

Python è uno strumento perfetto per la creazione di microservizi perché questo linguaggio ha una semplice curva di apprendimento, tonnellate di librerie e pacchetti già implementati e una grande community di utilizzatori. Grazie all’introduzione della programmazione asincrona in Python, sono emersi framework web con prestazioni alla pari con GO e Node.js.

 

Introduzione a FastAPI

FastAPI è un moderno framework Web ad alte prestazioni, dotato di tantissime funzioni interessanti come la documentazione automatica basata su OpenAPI e la libreria di convalida e serializzazione integrata. In questo link puoi leggere l’elenco di tutte le fantastiche funzionalità presenti FastAPI.

 

Perché FastAPI

Alcuni dei motivi per cui penso che FastAPI sia un’ottima scelta per la creazione di microservizi in Python sono:

  • Documentazione automatica
    Supporto Async/Await
  • Convalida e serializzazione integrate
  • Tipo 100% annotato, quindi il completamento automatico funziona alla grande
 

Installazione FastAPI

Prima di installare FastAPI è necessario creare una nuova directory movie_service e creare un nuovo ambiente virtuale all’interno della directory appena creata usando virtualenv.

Se non l’hai già installato virtualenv:

            pip install virtualenv
        
Ora, si crea un nuovo ambiente virtuale.
            virtualenv env
        

Nei sistemi Mac/Linux si può attivare l’ambiente virtuale usando il comando:

            source ./env/bin/activate
        
Gli utenti Windows possono invece eseguire questo comando:
            .\env\Scripts\activate
        
Infine, siamo pronti per installare FastAPI eseguendo il seguente comando:
            pip install fastapi
        

Poiché FastAPI non viene fornito con un service integrato, è necessario installare uvicorn per eseguire il service. uvicorn è un server ASGI che ci consente di utilizzare le funzionalità async/await.

Per installare uvicorn si può usare il comando:

            pip install uvicorn
        

Creazione di un semplice REST API utilizzando FastAPI

Prima di iniziare a creare un microservice utilizzando FastAPI, impariamo le basi di FastAPI. Creiamo una nuova directory zcode>app e un nuovo file main.py all’interno della directory appena creata.

Aggiungiamo il seguente codice in main.py.

            #~/movie_service/app/main.py

from fastapi import FastAPI

app = FastAPI()


@app.get('/')
async def index():
    return {"Real": "Python"}
        

E’ necessario importare e creare un’istanza di FastAPI e quindi registrare l’endpoint radice / che restituisce un file JSON.

È possibile eseguire il server delle applicazioni utilizzando uvicorn app.main:app --reload. Qui app.main indica che si sta usando il file main.py all’interno della directory app e :app indica a FastAPI il nome della nostra istanza.

Possiamo accedere all’app da http://127.0.0.1:8000. Per accedere alla documentazione automatica bisogna andare su http://127.0.0.1:8000/docs. Possiamo giocare e interagire con l’API dal browser stesso.

Aggiungiamo alcune funzionalità CRUD alla nostra applicazione. Aggiorniamo il main.py in modo che assomigli a quanto segue:

            #~/movie_service/app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

app = FastAPI()

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


@app.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db
        

Come puoi vedere hai creato una nuova classe Movie che estende la classe BaseModel di Pydantic.

Il modello Movie contiene il nome, la foto, il genere e il cast. Pydantic è integrato con FastAPI che rende la creazione di modelli e la convalida delle richieste un gioco da ragazzi.

Se andiamo alla pagina della documentazione possiamo vedere che sono descritti i campi del nostro modello nella sezione di risposta di esempio. Questo è possibile perché abbiamo definito il response_model nella definizione del percorso nel decoratore @app.get.

Ora aggiungiamo l’endpoint per aggiungere un film al nostro elenco di film.

Aggiungiamo una nuova definizione di endpoint per gestire la richiesta POST.

            @app.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}
        

Ora apriamo il browser e testiamo la nuova API. Proviamo ad aggiungere un film con un campo non valido o senza i campi obbligatori e verifichiamo che la convalida venga gestita automaticamente da FastAPI.

Aggiungiamo un nuovo endpoint per aggiornare il film.

            @app.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        

Ecco l’indice id della nostra lista fake_movie_db.

Nota: ricorda di importare HTTPException da Fastapi

Ora possiamo anche aggiungere l’endpoint per eliminare il film.

            @app.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        
Prima di andare avanti, strutturiamo la nostra app in un modo migliore. Creiamo una nuova cartella api all’interno di app e creiamo un nuovo file movies.py all’interno della cartella creata di recente. Spostiamo tutti i codici relativi alle route da main.py a movies.py. Quindi, movies.py dovrebbe essere simile al seguente:
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import Movie

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

movies = APIRouter()

@movies.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db

@movies.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}

@movies.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")

@movies.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")
        

Qui abbiamo registrato un nuovo percorso API utilizzando APIRouter di FastAPI.

Inoltre, creiamo un nuovo file models.py all’interno di api in cui definiamo i nostri modelli Pydantic.

            #~/movie-service/api/models.py

from typing import List
from pydantic import BaseModel

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]

        
Ora dobbiamo indicare a FastAPI dove trovare i persorsi web. A tale scopo dobbiamo registrare all’interno di main.py il nuovo file con le “routes”.
            #~/movie-service/app/main.py

from fastapi import FastAPI

from app.api.movies import movies

app = FastAPI()

app.include_router(movies)
        
Infine, la struttura della directory della nostra applicazione è simile alla seguente:
            movie-service
├── app
│   ├── api
│   │   ├── models.py
│   │   ├── movies.py
│   |── main.py
└── env
        

Prima di proseguire è bene assicurarsi che l’applicazione funzioni correttamente.

 

Utilizzo del database PostgreSQL con FastAPI

In precedenza abbiamo usato lista di stringhe Python per simulare un elenco di film, ma ora siamo pronti per utilizzare un database reale a tale questo scopo. In particolare, in questa applicazione useremo PostgreSQL. Installaremo PostgreSQL, se non l’hai già fatto. Dopo aver installato PostgreSQL creeremo un nuovo database che chiameremo movie_db.

Utilizzeramo encode/database per connettersi al database utilizzando il supporto async e await.

Installa la libreria richiesta utilizzando:

            pip install 'databases[postgresql]'
        

questo comando installerà anche sqlalchemy e asyncpg, che sono necessari per lavorare con PostgreSQL.

Creiamo un nuovo file all’interno di api e lo chiamiamo db.py. Questo file conterrà il modello del database reale per il REST API.

            #~/movie-service/app/api/db.py

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = 'postgresql://movie_user:movie_password@localhost/movie_db'

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts', ARRAY(String))
)

database = Database(DATABASE_URL)
        

In particolare, DATABASE_URI è l’URL utilizzato per connettersi al database PostgreSQL. movie_user è il nome dell’utente del database, movie_password è la password dell’utente del database ed movie_db è il nome del database.

Proprio come in SQLAlchemy, abbiamo creato la tabella per il database dei film.

Aggiorniamo quindi main.py per connettersi al database. Il codice di main.py è il seguente:

            #~/movie-service/app/main.py

from fastapi import FastAPI
from app.api.movies import movies
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()
    
app.include_router(movies)
        
FastAPI fornisce alcuni gestori di eventi che è possibile utilizzare per connettersi al nostro database all’avvio dell’applicazione e disconnettersi alla chiusura. Aggiorniamo movies.py in modo che utilizzi un database invece di un falso elenco Python.
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import MovieIn, MovieOut
from app.api import db_manager

movies = APIRouter()

@movies.get('/', response_model=List[MovieOut])
async def index():
    return await db_manager.get_all_movies()

@movies.post('/', status_code=201)
async def add_movie(payload: MovieIn):
    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = payload.dict()
    fake_movie_db[id] = movie
    return None

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)
    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}')
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        
Aggiungiamo db_manager.py per manipolare il nostro database.
            #~/movie-service/app/api/db_manager.py

from app.api.models import MovieIn, MovieOut, MovieUpdate
from app.api.db import movies, database

async def add_movie(payload: MovieIn):
    query = movies.insert().values(**payload.dict())
    return await database.execute(query=query)

async def get_all_movies():
    query = movies.select()
    return await database.fetch_all(query=query)

async def get_movie(id):
    query = movies.select(movies.c.id==id)
    return await database.fetch_one(query=query)

async def delete_movie(id: int):
    query = movies.delete().where(movies.c.id==id)
    return await database.execute(query=query)

async def update_movie(id: int, payload: MovieIn):
    query = (
        movies
        .update()
        .where(movies.c.id == id)
        .values(**payload.dict())
    )
    return await database.execute(query=query)
        
Aggiorniamo il nostro models.py in modo che possiamo utilizzare il modello Pydantic con la tabella sqlalchemy.
            #~/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional


class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts: Optional[List[str]] = None
        

La classe MovieIn è il modello base da usare per aggiungere un film al database. Dobbiamo aggiungere id a questo modello per ottenerlo dal database, creando il modello ModeulOut. Il modello MovieUpdate consente di impostare i valori nel modello come facoltativi in modo che durante l’aggiornamento del filmato possa essere inviato solo il campo che deve essere aggiornato.

Possiamo ora collegarci alla pagina “docs” della nostra applicazione ed inizia a giocare con l’API.

 

 

Modelli di gestione dei dati nei microservice

La gestione dei dati è uno degli aspetti più critici durante la creazione di un microservizio. Poiché le diverse funzioni dell’applicazione sono gestite da servizi diversi, l’utilizzo di un database può essere complicato.

Di seguito sono riportati alcuni modelli che è possibile utilizzare per gestire il flusso di dati nell’applicazione.

 

Database per servizio

L’utilizzo di un database per ogni servizio è ottimo se vuoi che i tuoi microservizi siano il più possibile accoppiati debolmente. Avere un database diverso per ogni servizio ci consente di scalare servizi diversi in modo indipendente. Una transazione che coinvolge più database viene eseguita tramite API ben definite. Ciò ha il suo svantaggio in quanto non è semplice implementare transazioni complesse che coinvolgono più servizi . Inoltre, l’aggiunta del sovraccarico di rete rende questo approccio meno efficiente da usare.

 

Database condiviso

Se ci sono molte transazioni che coinvolgono più servizi è meglio usare un database condiviso. Ciò comporta i vantaggi di un’applicazione altamente coerente, ma elimina la maggior parte dei vantaggi offerti dall’architettura dei microservizi. Gli sviluppatori che lavorano su un servizio devono coordinarsi con le modifiche allo schema in altri servizi.

 

Composizione API

Nelle transazioni che coinvolgono più database, il compositore API funge da gateway API ed esegue chiamate API ad altri microservizi nell’ordine richiesto. Infine, i risultati di ogni microservizio vengono restituiti al servizio client dopo aver eseguito un join in memoria. Lo svantaggio di questo approccio è l’inefficienza dei join in memoria di un set di dati di grandi dimensioni.

 

 

Creazione di un microservice Python in Docker

Il problema della distribuzione del microservice può essere notevolmente ridotto utilizzando Docker. Docker aiuta a incapsulare ogni servizio e a scalarlo in modo indipendente.

 

Installazione di Docker e Docker Compose

Se non hai già installato docker nel tuo sistema, verifichiamo se docker è installato eseguendo il comando docker. Dopo aver completato l’installazione di Docker, installiamo Docker Compose . Docker Compose viene utilizzato per definire ed eseguire più container Docker. E’ utile anche per facilitare l’interazione tra i container.

 

Creazione del servizio Movies

Poiché gran parte del lavoro per la creazione di un servizio Movies è già stato svolto all’inizio con FastAPI, riutilizzeremo il codice che abbiamo già scritto. Creiamo una cartella nuova di zecca, che chiamerò python-microservices e spostiamoci il codice che abbiamo scritto in precedenza.

Quindi, la struttura delle cartelle sarebbe simile a questa:

            python-microservices/
└── movie-service/
    ├── app/
    └── env/
        

Prima di tutto, creiamo un file requirements.txt in cui conserveremo tutte le dipendenze che utilizzeremo in movie-service.

Creiamo un nuovo file requirements.txt all’interno di movie-service e aggiungiamo quanto segue:

            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
httpx==0.11.1
        

Abbiamo usato tutte le librerie menzionate nel file tranne httpx che utilizzerai mentre effettui una chiamata API da servizio ad un altro.

Creiamo un Dockerfile all’interno di movie-service come segue:

            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        

Per prima cosa definiamo quale versione di Python usare. Quindi impostiamo la cartella app come WORKDIR all’interno del contenitore Docker. Dopodiché viene installato gcc, richiesto dalle librerie che utilizziamo nell’applicazione.

Infine, installiamo tutte le dipendenze in requirements.txt e copiamo tutti i file all’interno di movie-service/app.

Aggiorniamo db.py e sostituiamo:

            DATABASE_URI = 'postgresql://movie_user:movie_password@localhost/movie_db'
        

con:

            DATABASE_URI = os.getenv('DATABASE_URI')
        

NOTA: non dimentichiamo di importare os nella parte superiore del file.

È necessario eseguire questa operazione in modo da poter fornire in seguito DATABASE_URI come variabile di ambiente.

Inoltre, aggiorniamo main.py e sostituiamo:

            app.include_router(movies)
        
con:
            app.include_router(movies, prefix='/api/v1/movies', tags=['movies'])
        

Abbiamo aggiunto prefix=/api/v1/movies in modo da gestire più facilmente le diverse versioni dell’API. Inoltre, i tag facilitano la ricerca delle API relative ai movies nei docs di FastAPI.

Inoltre, dobbiamo aggiornare i nostri modelli in modo che casts memorizzi l’ID del cast invece del nome effettivo. Quindi, aggiorniamo models.py come segue:

            #~/python-microservices/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts_id: List[int]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts_id: Optional[List[int]] = None
        
Allo stesso modo, dobbiamo aggiornare le tabelle del database, aggiorniamo db.py:
            #~/python-microservices/movie-service/app/api/db.py

import os

from sqlalchemy import (Column, DateTime, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = os.getenv('DATABASE_URL')

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts_id', ARRAY(Integer))
)

database = Database(DATABASE_URL)
        

Ora, aggiorniamo movies.py per verificare se il cast con l’ID specificato si presenta nel cast-service prima di aggiungere un nuovo film o aggiornare un film.

            #~/python-microservices/movie-service/app/api/movies.py

from typing import List
from fastapi import APIRouter, HTTPException

from app.api.models import MovieOut, MovieIn, MovieUpdate
from app.api import db_manager
from app.api.service import is_cast_present

movies = APIRouter()

@movies.post('/', response_model=MovieOut, status_code=201)
async def create_movie(payload: MovieIn):
    for cast_id in payload.casts_id:
        if not is_cast_present(cast_id):
            raise HTTPException(status_code=404, detail=f"Cast with id:{cast_id} not found")

    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.get('/', response_model=List[MovieOut])
async def get_movies():
    return await db_manager.get_all_movies()

@movies.get('/{id}/', response_model=MovieOut)
async def get_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return movie

@movies.put('/{id}/', response_model=MovieOut)
async def update_movie(id: int, payload: MovieUpdate):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)

    if 'casts_id' in update_data:
        for cast_id in payload.casts_id:
            if not is_cast_present(cast_id):
                raise HTTPException(status_code=404, detail=f"Cast with given id:{cast_id} not found")

    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}', response_model=None)
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        

Aggiungiamo un servizio per effettuare una chiamata API al casts-service:

            #~/python-microservices/movie-service/app/api/service.py

import os
import httpx

CAST_SERVICE_HOST_URL = 'http://localhost:8002/api/v1/casts/'
url = os.environ.get('CAST_SERVICE_HOST_URL') or CAST_SERVICE_HOST_URL

def is_cast_present(cast_id: int):
    r = httpx.get(f'{url}{cast_id}')
    return True if r.status_code == 200 else False
        

Nel precedente codice si effettua una chiamata API per ricavare il cast con uno specifico id e restituire true se il cast esiste altrimenti false.


Creazione del casts-service

Analogamente al movie-service, per creare un casts-service utilizzeremo FastAPI e un database PostgreSQL.

Creiamo una struttura di cartelle come la seguente:

            python-microservices/
.
├── cast_service/
│   ├── app/
│   │   ├── api/
│   │   │   ├── casts.py
│   │   │   ├── db_manager.py
│   │   │   ├── db.py
│   │   │   ├── models.py
│   │   ├── main.py
│   ├── Dockerfile
│   └── requirements.txt
├── movie_service/
...
        
Aggiungiamo quanto segue a requirements.txt:
            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
        
Dockerfile:
            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        
main.py:
            #~/python-microservices/cast-service/app/main.py

from fastapi import FastAPI
from app.api.casts import casts
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

app.include_router(casts, prefix='/api/v1/casts', tags=['casts'])
        

Abbiamo aggiunto il prefix /api/v1/casts in modo che la gestione dell’API diventi più semplice. Inoltre, l’aggiunta dei tags facilita la ricerca dei documenti relativi a casts nel docs di FastAPI.

casts.py

            #~/python-microservices/cast-service/app/api/casts.py

from fastapi import APIRouter, HTTPException
from typing import List

from app.api.models import CastOut, CastIn, CastUpdate
from app.api import db_manager

casts = APIRouter()

@casts.post('/', response_model=CastOut, status_code=201)
async def create_cast(payload: CastIn):
    cast_id = await db_manager.add_cast(payload)

    response = {
        'id': cast_id,
        **payload.dict()
    }

    return response

@casts.get('/{id}/', response_model=CastOut)
async def get_cast(id: int):
    cast = await db_manager.get_cast(id)
    if not cast:
        raise HTTPException(status_code=404, detail="Cast not found")
    return cast
        
db_manager.py
            #~/python-microservices/cast-service/app/api/db_manager.py

from app.api.models import CastIn, CastOut, CastUpdate
from app.api.db import casts, database


async def add_cast(payload: CastIn):
    query = casts.insert().values(**payload.dict())

    return await database.execute(query=query)

async def get_cast(id):
    query = casts.select(casts.c.id==id)
    return await database.fetch_one(query=query)
        
db.py
            #~/python-microservices/cast-service/app/api/db.py

import os

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URI = os.getenv('DATABASE_URI')

engine = create_engine(DATABASE_URI)
metadata = MetaData()

casts = Table(
    'casts',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('nationality', String(20)),
)

database = Database(DATABASE_URI)
        
models.py
            #~/python-microservices/cast-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class CastIn(BaseModel):
    name: str
    nationality: Optional[str] = None


class CastOut(CastIn):
    id: int


class CastUpdate(CastIn):
    name: Optional[str] = None
        

Esecuzione del microservizio utilizzando Docker Compose

Per eseguire i microservice, è necessario creare un  filedocker-compose.yml e aggiungiamo quanto segue:

            version: '3.7'

services:
  movie_service:
    build: ./movie-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./movie-service/:/app/
    ports:
      - 8001:8000
    environment:
      - DATABASE_URI=postgresql://movie_db_username:movie_db_password@movie_db/movie_db_dev
      - CAST_SERVICE_HOST_URL=http://cast_service:8000/api/v1/casts/

  movie_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_movie:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=movie_db_username
      - POSTGRES_PASSWORD=movie_db_password
      - POSTGRES_DB=movie_db_dev

  cast_service:
    build: ./cast-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./cast-service/:/app/
    ports:
      - 8002:8000
    environment:
      - DATABASE_URI=postgresql://cast_db_username:cast_db_password@cast_db/cast_db_dev

  cast_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_cast:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=cast_db_username
      - POSTGRES_PASSWORD=cast_db_password
      - POSTGRES_DB=cast_db_dev

volumes:
  postgres_data_movie:
  postgres_data_cast:
        

Abbiamo 4 diversi servizi, movie_service, un database per movie_service, cast_service e un database per cast service. Abbiamo esposto il movie_service alla porta 8001 e modo simile cast_service alla porta 8002.

Per il database, abbiamo utilizzato i volumi in modo che i dati non vengano distrutti quando il contenitore docker viene chiuso. 

Eseguiamo il docker-compose usando il comando:

            docker-compose up -d
        

Questo comando crea la dockar image, se non esiste già, e la esegue.

Andiamo su http://localhost:8002/docs per aggiungere un cast nel casts-service. Allo stesso modo, http://localhost:8001/docs  per aggiungere il film nel movie-service.

Utilizziamo di Nginx per accedere a entrambi i servizi utilizzando un singolo indirizzo host

Abbiamo distribuito i microservizi utilizzando Docker compose, ma c’è un piccolo problema. È necessario accedere a ciascuno dei microservizi utilizzando una porta diversa. Si può risolvere questo problema utilizzando il reverse-proxy di Nginx , utilizzando Nginx puoi indirizzare la richiesta aggiungendo un middleware che indirizza le nostre richieste a diversi servizi in base all’URL dell’API.

Aggiungamo un nuovo file nginx_config.conf all’interno python-microservices con i seguenti contenuti.

            server {
  listen 8080;

  location /api/v1/movies {
    proxy_pass http://movie_service:8000/api/v1/movies;
  }

  location /api/v1/casts {
    proxy_pass http://cast_service:8000/api/v1/casts;
  }

}
        

Stiamo eseguendo Nginx alla porta 8080 e instradando le richieste al movie-service se l’endpoint inizia con /api/v1/movies e in modo simile al casts-service se l’endpoint inizia con /api/v1/casts.

Ora dobbiamo aggiungere il servizio nginx nel nostro file docker-compose-yml. Aggiungiamo il seguente servizio dopo il servizio cast_db:

            ...
nginx:
    image: nginx:latest
    ports:
      - "8080:8080"
    volumes:
      - ./nginx_config.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - cast_service
      - movie_service
...
        
Ora, spegnamo i contenitori docker con il comando:
            docker-compose down
        

Ed eseguiamolo di nuovo con:

            docker-compose up -d
        

Ora possiamo accedere sia al movie-service che al casts-service tramite la porta 8080.

Collegandosi a http://localhost:8080/api/v1/movies/ otteniamo l’elenco dei film.

Ora, per accedere ai docs dei service è necessario modificare il  main.py del movie-service la seguente riga:

            app = FastAPI()
        

con

            app = FastAPI(openapi_url="/api/v1/movies/openapi.json", docs_url="/api/v1/movies/docs")
        
Allo stesso modo, per il casts-service si deve sostituirlo con
            app = FastAPI(openapi_url="/api/v1/casts/openapi.json", docs_url="/api/v1/casts/docs")
        

Abbiamo cambiato l’endpoint in cui vengono serviti i docs e da dove viene servito il openapi.json.

Ora possiamo accedere ai docs da http://localhost:8080/api/v1/movies/docs per il movie-service e da http://localhost:8080/api/v1/casts/docs per il casts-service.

 

Conclusione

L’architettura del microservice è ottima per suddividere una grande applicazione monolitica in logiche di business separate, ma anche questo comporta una complicazione. Python è ottimo per la creazione di microservizi grazie all’esperienza degli sviluppatori e a tonnellate di pacchetti e framework per rendere gli sviluppatori più produttivi.

Puoi trovare il codice completo presentato in questo tutorial su Github.

FastAPI con SQLAlchemy, PostgreSQL, Alembic e Docker – Parte 2 (versione asincrona)

scienzadeidati articoli - Database con FASTAPI [Parte2]

 

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con database relazionali in modo asincrono e utilizzare Alembic per le migrazioni.

Prima di iniziare con questo tutorial, leggi la parte 1 di questo tutorial.

Ecco il codice funzionante completo su github.

Iniziamo

Installa il pacchetto richiesto databases.

databases è un pacchetto leggero con supporto asyncio per molti database relazionali e utilizza le principali query di sqlalchemy.

Per lo scopo di questo tutorial userò pipenv , ma puoi usare pip o poetry o conda o qualsiasi altro gestore di pacchetti che preferisci.

            pipenv install databases
pipenv install databases[postgresql]
pipenv install asyncpg
        

useremo la stessa configurazione docker descritta nell’articolo precedente.

Dockerfile

            # Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        

docker-compose.yml

            version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

manterremo schema.py così com’è

            # schema.py

from pydantic import BaseModel


class User(BaseModel):
    first_name: str
    last_name: str
    age: int

    class Config:
        orm_mode = True
        

Analogamente manteniamo lo stesso alembic.ini.

Modifichiamo il file .env come segue:

DATABASE_URL=postgresql://postgres:postgres@db:5432/postgres

ed lo utilizziamo nel file db.py dove inizializzeremo il nostro database.

            # db.py

import os
from databases import Database
from dotenv import load_dotenv
import sqlalchemy

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))

db = Database(os.environ["DATABASE_URL"])
metadata = sqlalchemy.MetaData()
        

A questo punto dobbiamo prevedere il file app.py, dove gestiremo l’inizializzazione dell’app con la connessione e la terminazione del database.

            # app.py

from db import db
from fastapi import FastAPI


app = FastAPI(title="Async FastAPI")


@app.on_event("startup")
async def startup():
    await db.connect()


@app.on_event("shutdown")
async def shutdown():
    await db.disconnect()
        
Di conseguenza dobbiamo modificare il file model.py.
            # model.py

from db import db

users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)
        
miglioriamo il nostro model.py, creando una semplice classe di gestione del modello User
            # model.py

import sqlalchemy
from db import db, metadata, sqlalchemy


users = sqlalchemy.Table(
    "users",
    metadata,
    sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
    sqlalchemy.Column("first_name", sqlalchemy.String),
    sqlalchemy.Column("last_name", sqlalchemy.String),
    sqlalchemy.Column("age", sqlalchemy.Integer),
)


class User:
    @classmethod
    async def get(cls, id):
        query = users.select().where(users.c.id == id)
        user = await db.fetch_one(query)
        return user

    @classmethod
    async def create(cls, **user):
        query = users.insert().values(**user)
        user_id = await db.execute(query)
        return user_id
        

Questa classe fornirà un’implementazione più semplice per i metodi get e create.

Di conseguenza dobbiamo modificare il file main.py come segue.

            # main.py

import uvicorn
from models import User as ModelUser
from schema import User as SchemaUser
from app import app
from db import db


@app.post("/user/")
async def create_user(user: SchemaUser):
    user_id = await ModelUser.create(**user.dict())
    return {"user_id": user_id}


@app.get("/user/{id}", response_model=SchemaUser)
async def get_user(id: int):
    user = await ModelUser.get(id)
    return SchemaUser(**user).dict()


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Da notare come ora stiamo usando async/await per gestire le chiamate verso il database.

È tempo quindi di modificare la configurazione del nostro Alembic.

Bisogna modificare
import models

target_metadata = models.Base.metadata

in
import models
from db import metadata

target_metadata = metadata

 

NOTA: è importante importare i modelli prima dei metadati.

 

Quindi procediamo a ricompilare il nostro Docker:

  • Costruzione: docker-compose build
  • Creazioni migrazioni: docker-compose run web alembic revision --autogenerate
  • Migrazione: docker-compose run web alembic upgrade head
  • Esecuzione: docker-compose up

Ora aprendo il browser e collegarsi a http://localhost:8000

articoli - pgadmin localhost 7
richiesta POST per creare un utente
articoli - pgadmin localhost 8
risposta alla richiesta precedente
articoli - pgadmin localhost 9
richiesta GET dello stesso utente con risposta

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic utilizzando la potenza di async .

Il codice completo per questo articolo è disponibile su github.

FastAPI con SQLAlchemy, PostgreSQL e Alembic e ovviamente Docker – Parte 1

scienzadeidati articoli - Database con FASTAPI

La guida completa (tutorial) all’utilizzo dei database relazionali con FastAPI

Introduzione

Lo scopo di questo articolo è creare una semplice guida su come utilizzare FastAPI con i database relazionali e utilizzare Alembic per le migrazioni. Un’implementazione che può essere utilizzata in produzione.

Installazione

Useremo pipenv per gestire sia i miei pacchetti che l’ambiente virtuale. Sentiti libero di gestire i tuoi pacchetti come preferisci.

Pacchetti Utilizzati
  • python ≥ 3.5
  • fastapi
  • pydantic
  • fastapi-sqlalchemy
  • alembic
  • psycopg2
  • uvicorn

Creiamo una nuova directory (puoi chiamarla come vuoi).

Ad esempio possiamo chiamarla fastapi_sqlalchemy_alembic

Apri il terminale e scrivi

cd fastapi_sqlalchemy_alembic

pipenv install --three fastapi fastapi-sqlalchemy pydantic alembic psycopg2 uvicorn

Utilizzerò docker compose per gestire il database, potresti ricevere alcuni errori relativi all’installazione di psycopg2 se stai utilizzando Mac-OS, ma dal momento che utilizzeremo Docker, questo non è molto importante.

Main.py

Iniziamo con un semplice file principale per Fastapi

            # main.py

import uvicorn
from fastapi import FastAPI

app = FastAPI()

@app.post("/user/", response_model=User)
def create_user(user: User):
    return user

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
    
        

Configurazione di Docker

            # Dockerfile

# Pull base image
FROM python:3.7

# Set environment varibles
ENV PYTHONDONTWRITEBYTECODE 1
ENV PYTHONUNBUFFERED 1

WORKDIR /code/

# Install dependencies
RUN pip install pipenv
COPY Pipfile Pipfile.lock /code/
RUN pipenv install --system --dev

COPY . /code/

EXPOSE 8000
        
            # docker-compose.yml

version: "3"

services:
  db:
    image: postgres:11
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=test_db
  web:
    build: .
    command: bash -c "alembic upgrade head && uvicorn main:app --host 0.0.0.0 --port 8000 --reload"
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - db

  pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
        

La configurazione precedente creerà un cluster con 3 contenitori:

  1. contenitore web — dove verrà eseguito il codice effettivo
  2. contenitore db
  3. contenitore pgadmin

Nella tua directory corrente dovresti vedere 5 file:

  1. file pip
  2. Pipfile.lock
  3. Dockerfile
  4. docker-compose.yml
  5. main.py

Quindi costruiamo il cluster docker, eseguendo il seguente comando nel terminale:

docker-compose build

 

Alembic

Si inizializza Alembic eseguendo il seguente cmd nel terminale della stessa directory:

alembic init alembic

In questo modo si crea una directory chiamata alembic e un file di configurazione alembic.ini

articoli - alembic

Il prossimo passo è aprire il file alembic.ini con il tuo editor e modificare la riga 38 da:

sqlalchemy.url = driver://user:pass@localhost/dbname

a:

sqlalchemy.url =

e quindi aggiungere l’url db postgres nel file alembic/env.py.

Dato che stiamo creando una configurazione che dovrebbe funzionare in produzione, non possiamo scrivere in chiaro il nome utente e password del database all’interno del file alembic.ini. Dobbiamo invece leggerlo dalle variabili d’ambiente tramite lo script alembic/env.py.

 
Installiamo python-dotenv

pipenv install python-dotenv

Dal momento che abbiamo aggiunto un nuovo pacchetto, ricostruiamo il docker per includerlo:

docker-compose build

 
Creiamo un .envfile

e aggiungiamo quanto segue:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Come abbiamo scoperto l’URL del database?

DATABASE_URL = postgresql+psycopg2://{utente}:{password}@{host}:{porta}

se controlliamo la configurazione docker-compose.yml per il database:

            ...
db:
    image: postgres:11
    ports:
        - "5432:5432"
    environment:
        - POSTGRES_USER=postgres
        - POSTGRES_PASSWORD=postgres
        - POSTGRES_DB=test_db

...

        

Vediamo come user=postgres, password=postgres e poiché siamo nel mondo docker, l’host del database non sarà localhost ma il nome del contenitore, nel nostro caso lo abbiamo chiamato db.

Quindi aggiungiamo questa riga al nostro .env:

DATABASE_URL = postgresql+psycopg2://postgres:postgres@db:5432

Apriamo alembic\env.py , che dovrebbe apparire come segue:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = None
        

Dobbiamo quindi apportare le seguenti modifiche:

            from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import poolfrom alembic import context
# ---------------- added code here -------------------------#
import os, sys
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
load_dotenv(os.path.join(BASE_DIR, ".env"))
sys.path.append(BASE_DIR)
#------------------------------------------------------------#
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# ---------------- added code here -------------------------#
# this will overwrite the ini-file sqlalchemy.url path
# with the path given in the config of the main code
config.set_main_option("sqlalchemy.url", os.environ["DATABASE_URL"])
#------------------------------------------------------------#
# Interpret the config file for Python logging.
# This line sets up loggers basically.
fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
# ---------------- added code here -------------------------#
import models
#------------------------------------------------------------#
# ---------------- changed code here -------------------------#
# here target_metadata was equal to None
target_metadata = models.Base.metadata
#------------------------------------------------------------#

        

Modelli

Ora creiamo i nostri modelli da migrare a PostgreSQL:

            # models.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    first_name = Column(String,)
    last_name = Column(String)
    age = Column(Integer)
        

Quindi è necessario generare la revisione per la nostra prima migrazione:

docker-compose run web alembic revision --autogenerate -m "First migration"

articoli - alembic update

Se il comando è stato eseguito correttamente dovresti vedere un nuovo file generato nella directory “versions”:

articoli - alembic2

Infine possiamo eseguire la migrazione:

docker-compose run web alembic upgrade head

articoli - alembic update2

Pgadmin

Per controllare le migrazioni create è sufficiente eseguire nel terminale il seguente comando:

docker-compose up

ed aspettare un po’, ci vuole un po’ di tempo per caricare. 

A caricamento concluso, si apre un browser all’indirizzo localhost:5050 e si può accedere con [email protected] e password=admin, come da impostazione definita nel nostro docker-compose.yml

            ...
pgadmin:
    container_name: pgadmin
    image: dpage/pgadmin4
    environment:
      - [email protected]
      - PGADMIN_DEFAULT_PASSWORD=admin
    ports:
      - "5050:80"
    depends_on:
      - db
...
        
articoli - pgadmin localhost

Una volta entrati, è necessario creare una nuova connessione. Alla voce “General” si deve scegliere un nome per la connessione.

Alla voce “Connection” bisogna inserire i riferimenti e le credenziali per connettersi al database (la password è postgres)

articoli - pgadmin localhost 2

Navigamio fino a trovare la nostra tabella User.

Servers > {your-connection-name} > Databases > postgres > Schemas > public > Tables > users

articoli - pgadmin localhost 3
articoli - pgadmin localhost 4

Ora possiamo sicuramente dire che la nostra migrazione si è conclusa con successo.

Finore abbiamo implementato completamente l’ORM con le migrazioni Alembic. Il prossimo passo è collegarlo allo schema Pydantic.

 

Schema — Modello Pydantic

            # schema.py

from pydantic import BaseModel

class User(BaseModel):
    first_name: str
    last_name: str = None
    age: int
    class Config:
        orm_mode = True
        

Si noti che abbiamo una classe Config in cui impostiamo orm_mode=True ed è tutto ciò di cui abbiamo bisogno per i modelli Pydantic, senza i quali gli oggetti del modello Sqlalchemy non verranno serializzati su JSON.

Connettiamo tutto all’interno di main.py

            import uvicorn
from fastapi import FastAPI

#--------------- added code ------------------------#
import os
from fastapi_sqlalchemy import DBSessionMiddleware
from fastapi_sqlalchemy import db
from models import User as ModelUser
from schema import User as SchemaUser
from dotenv import load_dotenv

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
load_dotenv(os.path.join(BASE_DIR, ".env"))
#---------------------------------------------------#

app = FastAPI()
#--------------- added code ------------------------#
app.add_middleware(DBSessionMiddleware, 
                    db_url=os.environ["DATABASE_URL"])
#---------------------------------------------------#
#--------------- modified code ---------------------#
@app.post("/user/", response_model=SchemaUser)
def create_user(user: SchemaUser):
    db_user = ModelUser(first_name=user.first_name, 
                        last_name=user.last_name, 
                        age=user.age
    )
    db.session.add(db_user)
    db.session.commit()
    return db_user
#---------------------------------------------------#

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
        

Fantastico, eseguiamo di nuovo docker-compose up

Ora andiamo a \docs e invochiamo l’endpoint Create User

articoli - pgadmin localhost 5a

Possiamo quindi controllare su pgadmin se ha funzionato correttamente.

Colleghiamoci a localhost:5050

articoli - pgadmin localhost 6a

Spero che questo tutorial sia stato abbastanza completo su come utilizzare FastAPI con PostgreSQL, SQLAlchemy e Alembic.

Il codice completo di questo articolo è disponibile su github.

Nella Parte 2 discuteremo come lavorare con i database in modo asincrono.

Grafici delle Serie Temporali con Dash, Flask, TimescaleDB e Docker – Parte 3

scienzadeidati articoli - Grafici delle Serie Temporali con Dash, Flask, TimescaleDB, and Docker - Parte 3

Questo è il terzo articolo della serie su TimescaleDB, Flask e Dash.

Il primo articolo si è concentrato su come configurare il database TimescaleDB all’interno di un’istanza Docker, insieme a PgAdmin per gestire il database.

La seconda parte si è concentrata sul linguaggio Python, creando un sito Web con Flask e quindi integrandolo il framework Web Dash all’interno di Flask.

Questa terza parte si concentra sull’utilizzo di Dash per creare un’applicazione Web reattiva a pagina singola per visualizzare i dati all’interno del database TimescaleDB tramite grafici Plotly accattivanti e interattivi .

Tutto il codice per questo tutorial può essere trovato qui su GitHub.

Parte 3 - Grafici interattivi con Dash per creare un'applicazione di data science

Nella seconda parte di questa serie abbiamo creato un’istanza Dash con il file dash_setup.py, che prevede il seguente codice:

            # /app/dash_setup.py

import dash
from flask.helpers import get_root_path


def register_dashapps(app):
    """
    Register Dash apps with the Flask app
    """

    # external Bootstrap CSS stylesheets
    external_stylesheets = [
        'https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css'
    ]
    
    # external Bootstrap JavaScript files
    external_scripts = [
        "https://code.jquery.com/jquery-3.5.1.slim.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/umd/popper.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js",
    ]

    # To ensure proper rendering and touch zooming for all devices, add the responsive viewport meta tag
    meta_viewport = [{
        "name": "viewport", 
        "content": "width=device-width, initial-scale=1, shrink-to-fit=no"
    }]

    dashapp = dash.Dash(
        __name__,
        # This is where the Flask app gets appointed as the server for the Dash app
        server = app,
        url_base_pathname = '/dash/',
        # Separate assets folder in "static_dash" (optional)
        assets_folder = get_root_path(__name__) + '/static_dash/', 
        meta_tags = meta_viewport, 
        external_scripts = external_scripts,
        external_stylesheets = external_stylesheets
    )
    dashapp.title = 'Dash Charts in Single-Page Application'

    # Some of these imports should be inside this function so that other Flask
    # stuff gets loaded first, since some of the below imports reference the other
    # Flask stuff, creating circular references 
    from app.dashapp.layout import get_layout
    from app.dashapp.callbacks import register_callbacks

    with app.app_context():

        # Assign the get_layout function without calling it yet
        dashapp.layout = get_layout

        # Register callbacks
        # Layout must be assigned above, before callbacks
        register_callbacks(dashapp)

    return None

        

In questo codice si utilizzano le funzioni get_layout e register_callback che sono importati da specifici moduli.

Iniziamo quindi a descrive i due moduli dashapp.layout e dashapp.callbacks

            # /app/dashapp/layout.py

import os

from flask import url_for
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
from psycopg2.extras import RealDictCursor

# Local imports
from app.database import get_conn


def get_navbar():
    """Get a Bootstrap 4 navigation bar for our single-page application's HTML layout"""

    return dbc.NavbarSimple(
        children=[
            dbc.NavItem(dbc.NavLink("Blog", href="https://mccarthysean.dev")),
            dbc.NavItem(dbc.NavLink("IJACK", href="https://myijack.com")),
            dbc.DropdownMenu(
                children=[
                    dbc.DropdownMenuItem("References", header=True),
                    dbc.DropdownMenuItem("Dash", href="https://dash.plotly.com/"),
                    dbc.DropdownMenuItem("Dash Bootstrap Components", href="https://dash-bootstrap-components.opensource.faculty.ai/"),
                    dbc.DropdownMenuItem("Testdriven", href="https://testdriven.io/"),
                ],
                nav=True,
                in_navbar=True,
                label="Links",
            ),
        ],
        brand="Home",
        brand_href="/",
        color="dark",
        dark=True,
    )


def get_sensor_types():
    """Get a list of different types of sensors"""
    sql = """
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            distinct 
            type as label, 
            type as value
        FROM sensors;
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # types is a list of dictionaries that looks like this, for example:
        # [{'label': 'a', 'value': 'a'}]
        types = cursor.fetchall()
    
    return types


def get_body():
    """Get the body of the layout for our Dash SPA"""

    types = get_sensor_types()

    # The layout starts with a Bootstrap row, containing a Bootstrap column
    return dbc.Row(
        [
            # 1st column and dropdown (NOT empty at first)
            dbc.Col(
                [
                    html.Label('Types of Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        options=types,
                        value=types[0]['value'],
                        id="types_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
            # 2nd column and dropdown (empty at first)
            dbc.Col(
                [
                    html.Label('Locations of Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        # options=None,
                        # value=None,
                        id="locations_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
            # 3rd column and dropdown (empty at first)
            dbc.Col(
                [
                    html.Label('Sensors', style={'margin-top': '1.5em'}),
                    dcc.Dropdown(
                        # options=None,
                        # value=None,
                        id="sensors_dropdown"
                    )
                ], xs=12, sm=6, md=4
            ),
        ]
    )


def get_layout():
    """Function to get Dash's "HTML" layout"""

    # A Bootstrap 4 container holds the rest of the layout
    return dbc.Container(
        [
            get_navbar(),
            get_body(), 
        ], 
    )

        

Lavoriamo dal basso verso l’alto, iniziando con get_layout(). Abbiamo aggiunto get_body() sotto la funzione navbar.

Ecco come appare ora il semplice sito Flask nel browser:

timescale-dash-flask-click-here-to-see-the-main-dash-page

Fare clic sul collegamento per visualizzare il sito Dash su /dash/:

NOTA: In questa fase il secondo e il terzo menu a discesa saranno vuoti. È stato popolato solo il primo menu a discesa. Per il secondo e il terzo menu a discesa, utilizzeremo i callback di Dash invece di popolarli nel layout iniziale. Rimanete sintonizzati…

timescale-dash-flask-navbar-and-body

Visualizza il sito in development mode

Ho saltato un passaggio importante: come avviare un sito Flask / Dash in modalità “sviluppo” in modo da poterlo visualizzare nel browser.

Aggiungiamo un file chiamato .flaskenv accanto al file .env nella cartella principale del progetto, con le seguenti tre righe:

            FLASK_APP=wsgi.py
FLASK_RUN_HOST=0.0.0.0
FLASK_RUN_PORT=5002
        

Flask cerca il file .flaskenv quando si avvia il sito. Le variabili d’ambiente specificano il file di partenza e le coordinate (indirizzo web o IP) dove sarà pubblicato il sito (ad esempio si inizializza una  FLASK_APP che è stata importata e creata da wsgi.py ed pubblicata sull’host 0.0.0.0 e porta 5002).

Quindi, una volta creato il file .flaskenv, si digita flask run e sul terminale e il sito sarà pronto. Si può visualizzare il sito collegandosi a http:// localhost:5002 da qualsiasi browser.

Tornando al codice

Tornando al file layout.py, la funzione get_body() restituisce un Bootstrap row e tre Boostrap, columns uno per ciascuno dei menu a discesa di Dash.

Per ora concentriamoci sulla prima colonna, vediamo che è presente un html.Label Dash HTML Component “, seguito da un dcc.Dropdown Dash Core Component“. Stiamo creato creando un Bootstrap HTML/CSS/JS solamente usando Python! Questa è la bellezza di Dash, ed è molto conveniente per i data scientist che cercano di produrre i propri modelli e dati.

La variabile types proviene dalla funzione get_sensor_types(), che interroga il nostro database TimescaleDB e restituisce i tipi di sensori specifici/univoci in un “elenco di dizionari”. Questo è reso possibile tramite cursor_factory=RealDictCursor (cioè restituisce le righe del database come comodi dizionari Python).

Molti Pythonistas amano interrogare i loro database usando SQLAlchemy , e anch’io lo uso molto spesso, anche se a volte ho voglia di solo sporcarmi le mani e scrivere un po’ di SQL, come ai vecchi tempi. La libreria psycopg2 ci consente di farlo molto facilmente. Entrambe sono ottime librerie, ben mantenute e testate sul campo.

Le altre due colonne e query nel nostro layout sono sostanzialmente le stesse della prima, quindi non le descriverò individualmente.

Callback in Dash

È ora di descrivere la logica dei callback, così possiamo far effettivamente funzionare il secondo e il terzo menu a discesa. A questo punto, nel codice del file layout.py il secondo e il terzo elenco a discesa dovrebbero essere vuoti.

Finalmente, quello che stavamo tutti aspettando: un po’ di divertente interattività in Dash! Di seguito il codice del file callbacks.pyQui è dove popoleremo il 2 ° e il 3 ° menu a discesa, in base al valore del primo menu a discesa:

            # /app/dashapp/callbacks.py

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State
from psycopg2.extras import RealDictCursor

# Local imports
from app.database import get_conn


def get_sensor_locations(type_):
    """Get a list of different locations of sensors"""
    sql = f"""
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            distinct 
            location as label, 
            location as value
        FROM sensors
        WHERE type = '{type_}';
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # locations is a list of dictionaries that looks like this, for example:
        # [{'label': 'floor', 'value': 'floor'}]
        locations = cursor.fetchall()
    
    return locations


def get_sensors(type_, location):
    """
    Get a list of sensor dictionaries from our TimescaleDB database, 
    along with lists of distinct sensor types and locations
    """
    sql = f"""
        --Get the labels and underlying values for the dropdown menu "children"
        SELECT 
            location || ' - ' || type as label,
            id as value
        FROM sensors
        WHERE 
            type = '{type_}'
            and location = '{location}';
    """
    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        # sensors is a list of dictionaries that looks like this, for example:
        # [{'label': 'floor - a', 'value': 1}]
        sensors = cursor.fetchall()

    return sensors


def register_callbacks(dash_app):
    """Register the callback functions for the Dash app, within the Flask app""" 

    @dash_app.callback(
        [
            Output("locations_dropdown", "options"),
            Output("locations_dropdown", "value")
        ],
        [
            Input("types_dropdown", "value")
        ]
    )
    def get_locations_from_types(types_dropdown_value):
        """Get the location options, based on the type of sensor chosen"""

        # First get the location options (i.e. a list of dictionaries)
        location_options = get_sensor_locations(types_dropdown_value)

        # Default to the first item in the list, 
        # and get the "value" from the dictionary
        location_value = location_options[0]["value"]

        return location_options, location_value


    @dash_app.callback(
        [
            Output("sensors_dropdown", "options"),
            Output("sensors_dropdown", "value")
        ],
        [
            Input("types_dropdown", "value"),
            Input("locations_dropdown", "value")
        ]
    )
    def get_sensors_from_locations_and_types(types_dropdown_value, locations_dropdown_value):
        """Get the sensors available, based on both the location and type of sensor chosen"""

        # First get the sensor options (i.e. a list of dictionaries)
        sensor_options = get_sensors(types_dropdown_value, locations_dropdown_value)

        # Default to the first item in the list, 
        # and get the "value" from the dictionary
        sensor_value = sensor_options[0]["value"]

        return sensor_options, sensor_value


    return None

        

Ricordiamoci che la funzione register_callbacks(dash_app) è usata all’interno di dash_setup.py.

Concentriamoci sulla prima funzione di callback. Tutti i callback di Dash hanno funzioni Input() che li attivano e funzioni Output(), che sono gli elementi HTML che sono modificati dai callback.

Il primo parametro nelle funzioni Input()Output() è sempre l’id= dell’elemento, all’interno del file layout.py. Il secondo parametro è sempre la proprietà che stiamo cercando di modificare. Quindi, nella funzione di callback stiamo utilizzando il “valore” del menu a discesa “types_dropdown” come input per la funzione, e stiamo modificando sia le “opzioni” che il “valore” selezionato del menu a discesa “locations_dropdown”.

La seconda callback è solo leggermente più complicata della prima. Utilizza i valori di entrambi i primi due menu a discesa (ovvero il menu a discesa dei tipi e il menu a discesa delle posizioni) per filtrare le opzioni del sensore e il valore selezionato.

Inoltre la funzione get_sensors(type_, location) prevede una delle nostre query al database, che acquisisce i sensori unici corrispondenti ai filtri sul tipo e sulla posizione dei sensori. L’altra query del database è quasi identica, ad eccezione che prevede un solo filtro in base al tipo di sensore.

Implementiamo ora un bel grafico delle serie temporali, in modo da visualizzare i dati nel tempo del sensore selezionato tramite i filtri.

Aggiungiamo la seguente funzione get_chart_row() al tuo file layout.py, nella parte inferiore. Questo ci fornisce una riga / colonna Bootstrap in cui posizionare un grafico Dash tramite una callback:

            ...

def get_chart_row(): # NEW
    """Create a row and column for our Plotly/Dash time series chart"""

    return dbc.Row(
        dbc.Col(
            id="time_series_chart_col"
        )
    )


def get_layout():
    """Function to get Dash's "HTML" layout"""

    # A Bootstrap 4 container holds the rest of the layout
    return dbc.Container(
        [
            get_navbar(),
            get_body(), 
            get_chart_row(), # NEW
        ], 
    )

        

Vediamo ora come creare il grafico tramite le callback.

Innanzitutto assicuriamoci di aggiungere i seguenti import nel file callbacks.py e installiamole nel nostro ambiente virtuale con il comando pip install pandas plotly:

            import pandas as pd
import plotly.graph_objs as go

        

A questo punto dobbiamo aggiungere la query per acquisire i dati delle serie temporali anche nel file callbacks.py. Da notare che questa volta inseriamo i dati estratti all’interno di un DataFrame Pandas. Pandas è una libreria essenziale per i data scientist che lavorano con Python.

In poche parole, un Pandas DataFrame è una tabella composta da un array NumPy per ogni colonna. Descrivere Pandas e Numpy non è lo scopo di questo articolo, ma è importante ricordare che queste librerie sono estremamente ottimizzati per il lavoro in ambito della data science.

            def get_sensor_time_series_data(sensor_id):
    """Get the time series data in a Pandas DataFrame, for the sensor chosen in the dropdown"""

    sql = f"""
        SELECT 
            --Get the 3-hour average instead of every single data point
            time_bucket('03:00:00'::interval, time) as time,
            sensor_id,
            avg(temperature) as temperature,
            avg(cpu) as cpu
        FROM sensor_data
        WHERE sensor_id = {sensor_id}
        GROUP BY 
            time_bucket('03:00:00'::interval, time), 
            sensor_id
        ORDER BY 
            time_bucket('03:00:00'::interval, time), 
            sensor_id;
    """

    conn = get_conn()
    with conn.cursor(cursor_factory=RealDictCursor) as cursor:
        cursor.execute(sql)
        rows = cursor.fetchall()
        columns = [str.lower(x[0]) for x in cursor.description]

    df = pd.DataFrame(rows, columns=columns)
    
    return df

        

Infine, dobbiamo aggiungere il grafico Dash / Plotly. 

A tale scopo dobbiamo aggiungere una callback alla funzione register_callbacks(dash_app)Da notare che la callback aggiorna la proprietà “children” della colonna Bootstrap con ID “time_series_chart_col”.

Il callback utilizza un Input() e due State() come parametri della funzione. La differenza tra Input() State() consiste che la funzione viene chiamata solo se ci sono modifiche a Input(). Se la State() cambia, la funzione non viene chiamata, ma abbiamo ancora ha disposizione i loro valori all’interno della funzione.

Successivamente, prendiamo i dati della serie temporale (in un DataFrame, come accennato in precedenza) e consideriamo la colonna time per l’asse x. Aggiungiamo due grafici a linee (grafici a dispersione collegati da linee), quindi preleviamo due colonne dal dataframe, “temperatura” e “cpu”. Aggiungi anche una variabile per il titolo dei grafici.

Quindi creiamo due oggetti grafici Plotly (ad esempio go.Scatter) per i due grafici e li passiamo in input alla funzione get_graph() (che descriviamo tra poco).

Infine, la funzione restituisce un semplice HTML div con due righe Bootstrap (una riga Bootstrap deve contenere sempre almeno una colonna).

              @dash_app.callback(
        Output("time_series_chart_col", "children"),
        [Input("sensors_dropdown", "value")],
        [
            State("types_dropdown", "value"),
            State("locations_dropdown", "value")
        ]
    )
    def get_time_series_chart(
        sensors_dropdown_value, 
        types_dropdown_value, 
        locations_dropdown_value
    ):
        """Get the sensors available, based on both the location and type of sensor chosen"""

        df = get_sensor_time_series_data(sensors_dropdown_value)
        x = df["time"]
        y1 = df["temperature"]
        y2 = df["cpu"]
        title = f"Location: {locations_dropdown_value} - Type: {types_dropdown_value}"

        trace1 = go.Scatter(
            x=x,
            y=y1,
            name="Temp"
        )
        trace2 = go.Scatter(
            x=x,
            y=y2,
            name="CPU"
        )

        # Create two graphs using the traces above
        graph1 = get_graph(trace1, f"Temperature for {title}")
        graph2 = get_graph(trace2, f"CPU for {title}")

        return html.Div(
            [
                dbc.Row(dbc.Col(graph1)),
                dbc.Row(dbc.Col(graph2)),
            ]
        )
        

A questo punto non ci rimane che aggiungere la funzione la get_graph(). I grafici Plotly / Dash hanno molte opzioni, ma non lasciarti scoraggiare; sono facile da capire.

Il primo parametro disabilita il logo Plotly (totalmente opzionale). Successivamente nell’oggetto grafico c’è il parametro figure, che contiene due sottoparametri principali, datalayout. Ecco il riferimento completo alle API , che è una risorsa essenziale se stai lavorando con i grafici Plotly.

In layout, aggiungiamo alcune opzioni per il xaxis in modo da inserire alcuni rapidi filtri dato che stiamo costruendo un grafico di serie temporali. Di solito scelgo i pulsanti rangeselector sopra il grafico o il rangeslider sotto il grafico, ma in questo esempio li usiamo entrambi per mostrare le potenzialità di ploty. Preferisco i pulsanti rangeselector sopra il grafico. Se ti piace il trading di azioni, saprai che questi filtri temporali sono abbastanza comuni e intuitivi.

 

            def get_graph(trace, title):
    """Get a Plotly Graph object for Dash"""
    
    return dcc.Graph(
        # Disable the ModeBar with the Plotly logo and other buttons
        config=dict(
            displayModeBar=False
        ),
        figure=go.Figure(
            data=[trace],
            layout=go.Layout(
                title=title,
                plot_bgcolor="white",
                xaxis=dict(
                    autorange=True,
                    # Time-filtering buttons above chart
                    rangeselector=dict(
                        buttons=list([
                            dict(count=1,
                                label="1d",
                                step="day",
                                stepmode="backward"),
                            dict(count=7,
                                label="7d",
                                step="day",
                                stepmode="backward"),
                            dict(count=1,
                                label="1m",
                                step="month",
                                stepmode="backward"),
                            dict(step="all")
                        ])
                    ),
                    type = "date",
                    # Alternative time filter slider
                    rangeslider = dict(
                        visible = True
                    )
                )
            )
        )
    )

        

Abbiamo concluso l’implementazione delle callback e delle funzioni grafiche.

Avviamo la nostra applicazione, i grafici avranno il seguente aspetto:

timescale-dash-flask-finished-product

Questo è tutto! Abbiamo approfondito molti aspetti in questo tutorial diviso in tre parti.

Nella prima parte , abbiamo creato un database TimescaleDB e lo abbiamo popolato con i dati simulati delle serie temporali di sensori IoT. Abbiamo anche installato un’app Web PGAdmin per l’amministrazione del nostro database ed entrambe le applicazioni sono state distribuite utilizzando Docker-Compose, uno strumento fantastico per riprodurre facilmente ambienti e implementazioni.

Nella seconda parte, abbiamo combinato un’app Web Flask con un’app Web Dash in modo da poter avere il meglio da entrambe le librerie: Flask può praticamente implementare qualsiasi applicazione Web e Dash è ottimo per la produzione di app a pagina singola di data science senza bisogno di utilizzare JavaScript o React.

In questa terza parte, abbiamo descritto le funzioni di Dash, in modo particolare i callback interattivi del menu a discesa e i grafici Plotly per dare un assaggio di ciò che è possibile fare con Dash.

Spero che questa serie ti sia piaciuta.

A presto!

Grafici delle Serie Temporali con Dash, Flask, TimescaleDB e Docker – Parte 2

scienzadeidati articoli - Grafici delle Serie Temporali con Dash, Flask, TimescaleDB, and Docker - Parte 2

Questo è il secondo di tre articoli su TimescaleDB, Flask e Dash. Il primo articolo si è concentrato su come installare e configurare il database TimescaleDB tramite l’esecuzione in Docker, inoltre abbiamo configurato PgAdmin per la gestione del database. Questo articolo si concentra sul linguaggio Python, sulla creazione di un sito Web con Flask e quindi sull’integrazione del framework Web Dash all’interno di Flask.

Flask è un framework web Python molto popolare, leggero ed estensibile, quindi è molto adatto per applicazioni di data science. Dash è un framework Web basato su Flask, che utilizza React JavaScript dietro le quinte per creare applicazioni reattive e interattive a pagina singola (SPA), utilizzando i grafici generati con la libreria Plotly . Direi che Dash sta per Python come Shiny sta per R , in quanto entrambi si concentrano sulla produzione di data science e modelli di machine learning, senza che un utente debba imparare molto HTML, CSS e JavaScript. In genere i data scientist non sono ingegneri del software e le complessità della creazione di un’applicazione web a pagina singola sono troppo complicate e non valgono il loro tempo.

Questo tutorial mostra come ottenere il meglio da alcuni mondi diversi:

  1. Un’applicazione Flask per un normale sito web
  2. Un’elegante applicazione Dash a pagina singola che utilizza il meglio di React JavaScript
  3. Un modo per produrre un’applicazione di data science

Cominciamo con una semplice applicazione web Flask e quindi vediamo come integrare Dash. La parte 3 di questa serie approfondirà la creazione di grafici interattivi con ​​Dash.

Tutto il codice per questo tutorial può essere trovato qui su GitHub.

Parte 2 - Integrazione dei Framework Web Flask e Dash

Prima di iniziare un nuovo progetto con Python, dobbiamo sempre creare un ambiente virtuale Python3 dedicato. Usiamo solo python3 -m venv venv per creare un ambiente virtuale chiamato “venv” nella cartella principale del progetto. In questi giorni preferisco usare Poetry rispetto a Pip , ma Poetry non è al centro di questo articolo. Ora attiviamo l’ambiente virtuale con source venv/bin/activate su Linux / Mac o venv\Scripts\activate.bat su Windows. Dopo aver attivato l’ambiente virtuale, installiamo Flask, Dash, Dash Bootstrap Components e la libreria PostgreSQL psycopg2, con pip install flask dash dash-bootstrap-components psycopg2-binary.

Per creare un’applicazione Flask, iniziamo dal punto di ingresso più esterno o dal punto di partenza dell’applicazione. Nella cartella di primo livello, creiamo un wsgi.py file come segue. Questa è la pratica migliore, usando factory pattern per inizializzare Flask.

            # wsgi.py

from app import create_app

app = create_app()

        

Nel file wsgi.py si importando la funzione chiamata create_app presente all’interno di un pacchetto/libreria chiamato app, quindi creiamo una cartella app per ospitare la nostra applicazione Flask. All’interno della cartella dell’app, come per tutti i pacchetti Python, creiamo un file __init__.py:

            # /app/__init__.py

import os
import logging

# Third-party imports
from flask import Flask, render_template

# Local imports
from app import database
from app.dash_setup import register_dashapps


def create_app():
    """Factory function that creates the Flask app"""

    app = Flask(__name__)
    app.config['SECRET_KEY'] = os.getenv('SECRET_KEY')
    logging.basicConfig(level=logging.DEBUG)

    @app.route('/')
    def home():
        """Our only non-Dash route, to demonstrate that Flask can be used normally"""
        return render_template('index.html')

    # Initialize extensions
    database.init_app(app) # PostgreSQL db with psycopg2

    # For the Dash app
    register_dashapps(app)

    return app


        

Il file contiene la funzione create_app() necessaria  al file precedente  wsgi.py. Per ora possiamo ignorare le altre importazioni locali: ci ritorneremo tra poco.

All’interno della fuzione create_app(), iniziamo con le basi, istanziando l’istanza Flask() passandole il __name__ del file e impostando la SECRET_KEY… Chiave segreta?

Apriamo il file .env e aggiungiamo una variabile di ambiente SECRET_KEY in fondo al file, insieme alle altre variabili di ambiente:

            # .env

# For the Postgres/TimescaleDB database. 
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
POSTGRES_HOST=timescale
POSTGRES_PORT=5432
POSTGRES_DB=postgres
PGDATA=/var/lib/postgresql/data

# For the PGAdmin web app
[email protected]
PGADMIN_DEFAULT_PASSWORD=password

# For Flask
SECRET_KEY=long-random-string-of-characters-numbers-etc-must-be-unique # NEW

        
Tornando al nostro file __init__.py, impostiamo la registrazione di base e quindi aggiungiamo la prima “route” di Flask (la homepage principale), solo per dimostrare che abbiamo una normale applicazione Flask funzionante. Creiamo una cartella /app/templates per i modelli HTML e aggiungiamo un file index.html con i seguenti contenuti per la nostra home page:
            <html>
    <body>
        <h1 style="text-align: center;">
            Click <a href="/dash/">here</a> to see the Dash single-page application (SPA)
        </h1>
    </body>
</html>
        

Successivamente, inizializziamo il nostro database con database.init_app(app). “Database” è un modulo locale che abbiamo importato all’inizio, quindi vediamo come implementarlo.

Infine, in fondo alla funzione create_app(), richiamiamo la funzione register_dashapps(app) dal modulo dash_setup.py. Qui è dove inizializzeremo l’applicazione web Dash che utilizza il motore React JavaScript sotto il cofano. 

Connessione al Database

Creiamo il file database.py, accanto al file __init__.py. Seguiamo le best practice consigliate qui dal team di Flask. Lo scopo di questo modulo è creare alcune funzioni per aprire e chiudere una connessione al database TimescaleDB e garantire che la connessione venga chiusa da Flask alla fine della richiesta HTTP. La funzione “init_app (app)” è quella che viene richiamata nel file __init__.py attraverso la funzione create_app(). Da notare che teardown_appcontext(close_db) assicura che la connessione venga chiusa durante lo “smontaggio”. In futuro, quando avremo bisogno di dati dal database, richiameremo semplicemente get_conn() per ottenere una connessione al database ed eseguire le query SQL.

Nel caso ve lo stiate chiedendo, g è fondamentalmente un oggetto globale in cui memorizzate la connessione al database. È complicato, quindi non me ne parlerò, sappi solo che questa è la pratica migliore e goditi la vita. 😉 Va bene, ecco un link per ulteriori letture…

Se stai cercando un ottimo corso su Flask, che includa un’immersione profonda nella meccanica di Flask, consiglio vivamente questo corso di Patrick Kennedy.

            # /app/database.py


import os
import psycopg2
from flask import g


def get_conn():
    """
    Connect to the application's configured database. The connection
    is unique for each request and will be reused if this is called
    again.
    """
    if 'conn' not in g:
        g.conn = psycopg2.connect(
            host=os.getenv('POSTGRES_HOST'),
            port=os.getenv("POSTGRES_PORT"), 
            dbname=os.getenv("POSTGRES_DB"), 
            user=os.getenv("POSTGRES_USER"), 
            password=os.getenv("POSTGRES_PASSWORD"), 
            connect_timeout=5
        )
    
    return g.conn


def close_db(e=None):
    """
    If this request connected to the database, close the
    connection.
    """
    conn = g.pop('conn', None)

    if conn is not None:
        conn.close()
    
    return None


def init_app(app):
    """
    Register database functions with the Flask app. This is called by
    the application factory.
    """
    app.teardown_appcontext(close_db)

        

Integrazione con Dash

Passiamo ora a descrivere il modulo il dash_setup, dove è definita la funzione register_dashapps. Creiamo un file chiamato dash_setup.pyall’interno della cartella “/app”, accanto a __init__.py:

            # /app/dash_setup.py

import dash
from flask.helpers import get_root_path


def register_dashapps(app):
    """
    Register Dash apps with the Flask app
    """

    # external CSS stylesheets
    external_stylesheets = [
        'https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css'
    ]
    
    # external JavaScript files
    external_scripts = [
        "https://code.jquery.com/jquery-3.5.1.slim.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/umd/popper.min.js",
        "https://cdn.jsdelivr.net/npm/[email protected]/dist/js/bootstrap.min.js",
    ]

    # To ensure proper rendering and touch zooming for all devices, add the responsive viewport meta tag
    meta_viewport = [{
        "name": "viewport", 
        "content": "width=device-width, initial-scale=1, shrink-to-fit=no"
    }]

    dashapp = dash.Dash(
        __name__,
        # This is where the Flask app gets appointed as the server for the Dash app
        server = app,
        url_base_pathname = '/dash/',
        # Separate assets folder in "static_dash" (optional)
        assets_folder = get_root_path(__name__) + '/static_dash/', 
        meta_tags = meta_viewport, 
        external_scripts = external_scripts,
        external_stylesheets = external_stylesheets
    )
    dashapp.title = 'Dash Charts in Single-Page Application'
    
    # Some of these imports should be inside this function so that other Flask
    # stuff gets loaded first, since some of the below imports reference the other
    # Flask stuff, creating circular references
    from app.dashapp.layout import get_layout
    from app.dashapp.callbacks import register_callbacks

    with app.app_context():

        # Assign the get_layout function without calling it yet
        dashapp.layout = get_layout

        # Register callbacks
        # Layout must be assigned above, before callbacks
        register_callbacks(dashapp)

    return None

        

La funzione  register_dashapps(app) viene passata all’istanza dell’applicazione Flask, che Dash assegnerà come “server”.

Per prima cosa, creeremo una sorta di template HTML passando alla classe dash.Dash()alcuni fogli di stile e script. Questi file .js e .css si trovano nella sezione “head” dei file HTML, quindi Dash li metterà lì per noi.

Useremo Bootstrap CSS per rendere eccezionale la nostra applicazione a pagina singola e funzionare alla grande sui telefoni cellulari. Bootstrap 4 richiede anche Popper e jQuery, quindi li includiamo secondo le linee guida per l’installazione di Bootstrap, disponibile qui.

Dopo aver inizializzato l’istanza Dash (dashapp), creeremo il suo layout HTML / CSS e i callback JavaScript (abbiamo React dietro le quinte).

Per evitare riferimenti circolari, importiamo il layout e i moduli di callback all’interno della funzione register_dashapps(app).

Per fare questo dobbiamo creare una cartella “dashapp” all’interno della cartella “app”, contenente tre nuovi file:

  1. __init__.py
  2. callbacks.py
  3. layout.py

Non preoccupiamoci per il file __init__.py: è lì, quindi Python (e tu) sa che la cartella fa parte del pacchetto del nostro progetto.

La parte 3 approfondisce il layout e i callback di Dash, quindi per ora impostiamo solo le basi.

Innanzitutto, layout.py conterrà per ora solo una barra di navigazione Bootstrap all’interno di un contenitore Bootstrap:

            # /app/dashapp/layout.py

import os

from flask import url_for
import dash_html_components as html
import dash_core_components as dcc
import dash_bootstrap_components as dbc
import psycopg2
from psycopg2.extras import RealDictCursor

# Local imports
from app.database import get_conn


def get_navbar():
    """Get a Bootstrap 4 navigation bar for our single-page application's HTML layout"""

    return dbc.NavbarSimple(
        children=[
            dbc.NavItem(dbc.NavLink("Blog", href="https://mccarthysean.dev")),
            dbc.NavItem(dbc.NavLink("IJACK", href="https://myijack.com")),
            dbc.DropdownMenu(
                children=[
                    dbc.DropdownMenuItem("References", header=True),
                    dbc.DropdownMenuItem("Dash", href="https://dash.plotly.com/"),
                    dbc.DropdownMenuItem("Dash Bootstrap Components", href="https://dash-bootstrap-components.opensource.faculty.ai/"),
                    dbc.DropdownMenuItem("Testdriven", href="https://testdriven.io/"),
                ],
                nav=True,
                in_navbar=True,
                label="Links",
            ),
        ],
        brand="Home",
        brand_href="/",
        color="dark",
        dark=True,
    )


def get_layout():
    """Function to get Dash's "HTML" layout"""

    # A Bootstrap 4 container holds the rest of the layout
    return dbc.Container(
        [
            # Just the navigation bar at the top for now...
            # Stay tuned for part 3!
            get_navbar(),
        ], 
    )

        

La funzione get_layout() viene richiamata dal nostro modulo dash_setup.py.

Non descriverò la funzione get_navbar() perché penso sia autoesplicativa, ma se vuoi approfondire in questo link trovi la documentazione.

I callback di Dash sono il fulcro della parte 3 di questa serie, quindi per ora implementiamo il file callbacks.py con quanto segue:

            # /app/dashapp/callbacks.py

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output, State


def register_callbacks(dash_app):
    """Register the callback functions for the Dash app, within the Flask app"""        

    return None

        

Siamo arrivati alla conclusione di questa seconda parte. Spero che finora ti sia piaciuta. Dai un’occhiata alla Parte 3 per un’analisi approfondita dei callback di Dash e dei grafici di Plotly.

A presto!

Grafici delle Serie Temporali con Dash, Flask, TimescaleDB e Docker – Parte 1

scienzadeidati articoli - Grafici delle Serie Temporali con Dash, Flask, TimescaleDB, and Docker - Parte 01

In questo tutorial in tre parti, ti mostrerò come creare un’applicazione reattiva a pagina singola interamente in Python, con grafici di serie temporali dinamici tramite Dash / Plotly, il framework Flask per siti Web e un database specializzato per le serie temporali chiamato TimescaleDB, che a sua volta è basato su PostgreSQL. Un bel boccone, ma questo è uno stack tecnologico piuttosto interessante che è facile da imparare e da programmare, poiché utilizza solo Python e SQL standard (senza JavaScript). Quindi è uno stack ideale per distribuire rapidamente applicazioni di data science.

La prima parte del tutorial si concentrerà sull’utilizzo di Docker per configurare il database TimescaleDB specializzato e PGAdmin per gestirlo. Creeremo alcuni dati IoT simulati e mostreremo alcune delle fantastiche funzionalità di TimescaleDB, che non troverai nel PostgreSQL standard.

La seconda parte si concentrerà sulla configurazione di un sito Web Python Flask che si integra con la straordinaria libreria Dash per la creazione di applicazioni a pagina singola (SPA) basate su JavaScript React. Ti mostrerò come integrare correttamente Dash in Flask, in modo da poter ottenere il meglio da entrambi i framework web.

La terza parte si concentrerà sull’utilizzo di Dash per creare grafici di serie temporali interattivi per il monitoraggio dei dati IoT o per mostrare la tua applicazione di data science.

Tutto il codice per questo tutorial può essere trovato qui su GitHub.

Uso Docker laddove possibile, per ambienti riproducibili e una distribuzione estremamente semplice utilizzando Docker Swarm, quindi se non hai familiarità con Docker, consulta la documentazione qui.

Parte 1 - TimescaleDB, PGAdmin e Docker

Per prima cosa, creiamo una rete Docker in modo che i nostri prossimi contenitori possano parlare tra loro:

            docker network create --attachable --driver bridge timescale_network
        
Successivamente, creiamo un database TimescaleDB locale utilizzando Docker-Compose. Questo avvierà rapidamente un database PostgreSQL locale con l’estensione TimescaleDB configurata automaticamente. Si usa il seguente file docker-compose.timescale.yml:
            # docker-compose.timescale.yml

version: '3.7'
services:
  timescale:
    image: timescale/timescaledb:1.7.4-pg12
    container_name: flaskapp_timescale
    volumes: 
      - type: volume
        # source: timescale-db # the volume name
        source: timescale_volume
        # target: the location in the container where the data are stored
        target: /var/lib/postgresql/data 
        read_only: false
      # Custom postgresql.conf file will be mounted (see command: as well)
      - type: bind
        source: ./postgresql_custom.conf
        target: /postgresql_custom.conf
        read_only: false
    env_file: .env
    environment: 
      POSTGRES_HOST: timescale
    command: ["-c", "config_file=/postgresql_custom.conf"]
    ports: 
      - 0.0.0.0:5432:5432
    networks:
      timescale_network:
    deploy:
      restart_policy:
        condition: on-failure

# Creates a named volume to persist our database data
volumes:
  timescale_volume:

# Joins our external network
networks:
  timescale_network:
    external: true
        

Nel precedente file Docker-Compose possiamo notare i seguenti aspetti:

  • Si utilizza il  timescale_network che abbiamo creato nel passaggio precedente.
  • Si utilizza un volume per rendere persistenti i dati del database, anche se il contenitore Docker viene rimosso o sostituito. Questo è molto comune per i database “Dockerizzati”.
  • Si utilizza la porta 5432 (questo sarà importante quando proveremo ad accedere al database).
  • Si utilizza un file di configurazione personalizzato e un file  .env per memorizzare le informazioni sensibili per la connessione al database, come la password del database. E’ quindi necessario creare questi due file.

Ecco il file di configurazione personalizzato, nel caso in cui desideri / devi modificare una di queste impostazioni in futuro. Il file è troppo lungo per essere inserito in un blocco di codice in questo articolo, quindi fai clic su questo collegamento , quindi copia e incolla il testo in un file chiamato postgresql_custom.conf e mettilo nella radice della cartella del progetto.

Successivamente, ecco un modello per il file .env, che puoi lasciare nella root della cartella del tuo progetto, insieme ai file Docker-Compose e a quelli di configurazione del database:

            # .env

# For the Postgres/TimescaleDB database. 
POSTGRES_USER=postgres
POSTGRES_PASSWORD=password
POSTGRES_HOST=timescale
POSTGRES_PORT=5432
POSTGRES_DB=postgres
PGDATA=/var/lib/postgresql/data
        

Ora che abbiamo aggiunto la configurazione personalizzata e i file .env, puoi avviare il database TimescaleDB con il seguente comando. Il -d esegue il comando in background (--detached).

            docker-compose -f docker-compose.timescale.yml up -d
        
Controlla i tuoi contenitori in esecuzione con docker container ls o la vecchia scuola docker ps. Se il contenitore si sta riavviando, controlla i log con docker logs <container id> e assicurati di aver configurato il file .env, il file di configurazione e la rete Docker da cui dipende. Infine, creiamo un ambiente PGAdmin amichevole per l’amministrazione del nostro database e l’esecuzione di SQL. Crea un file chiamato docker-compose.pgadmin.yml e aggiungi quanto segue:
            # docker-compose.pgadmin.yml

version: '3.7'
services:
  pgadmin:
    # Name of the container this service creates. Otherwise it's prefixed with the git repo name
    image: "dpage/pgadmin4:latest"
    container_name: flaskapp_pgadmin4
    restart: unless-stopped
    env_file: .env
    environment: 
      PGADMIN_LISTEN_PORT: 9000
    ports: 
      - 0.0.0.0:9000:9000
    volumes: 
      # So the database server settings get saved and stored even if the container is replaced or deleted
      - pgadmin:/var/lib/pgadmin
    networks:
      timescale_network:

volumes:
  pgadmin:

networks:
  timescale_network:
    external: true

        
Aggiungi le seguenti righe al tuo .env file per PGAdmin. Avrai bisogno di queste informazioni di accesso quando tenterai di accedere a PGAdmin nel browser web.
            # .env

# For the PGAdmin web app
[email protected]
PGADMIN_DEFAULT_PASSWORD=password
        

Avvia l’applicazione web PGAdmin (PostgreSQL Admin) con il seguente comando Docker:

            docker-compose -f docker-compose.pgadmin.yml up -d

        

Esegui di docker container lsnuovo per verificare se il contenitore PGAdmin è in esecuzione. Nota che abbiamo specificato una porta di 9000, quindi ora puoi accedere a PGAdmin su http://localhost:9000 o http://127.0.0.1:9000 .Accedi con il nome utente e la password che hai impostato nel tuo file .env.

Ora che hai effettuato l’accesso a PGAdmin, fai clic con il pulsante destro del mouse su “Server” e “Crea / Server…”. Chiamalo “TimescaleDB Locale” nella scheda “Generale” e digita quanto segue nella scheda “Connessione”:

  • Host : timescale (questo è il nome host del “Servizio” Docker definito nel primo file docker-compose.yml per il contenitore del database TimescaleDB)
  • Porta : 5432
  • Database di manutenzione : postgres
  • Nome utente : postgres
  • Password : password

Fai clic su “Salva” e dovresti essere connesso. Ora puoi fare doppio clic su “TimescaleDB Local” e puoi accedere alle tabelle del tuo database su “/ Databases / postgres / Schemas / public / Tables”. Molto intuitivo, eh? Nel menu “Strumenti”, fai clic su “Strumento di query” e sei pronto per iniziare a scrivere SQL.

Ora sei l’orgoglioso comandante di un database TimescaleDB, che è identico a un database PostgreSQL (“Il database open source più avanzato al mondo”, se credi nel loro marketing), tranne per il fatto che ora ha capacità speciali per gestire dati ad alta frequenza di serie temporali.

I dati delle serie temporali sono leggermente diversi dai normali dati relazionali per descrivere utenti e cose. I dati delle serie temporali possono arrivare in qualsiasi secondo o anche più volte al secondo, a seconda di ciò che si sta archiviando, quindi il database deve essere in grado di gestire molti inserimenti. Alcuni esempi sono i dati finanziari, come i prezzi di negoziazione del mercato azionario o dati di Internet of Things (IoT), solitamente per il monitoraggio di metriche ambientali come temperatura, pressione, umidità o qualsiasi altra cosa tu possa pensare. Di solito quando esegui query sui dati di serie temporali, sei interessato ai dati più recenti e di solito stai filtrando sulla colonna timestamp, quindi è assolutamente necessario indicizzarli. TimescaleDB è specializzato in questo genere di cose.

Creiamo uno speciale “Hypertable” di TimescaleDB e inseriamo alcuni dati con cui giocare. Ecco la documentazione . Ed ecco il tutorial da cui ottengo i dati SQL simulati.

In PGAdmin, se non ci sei già, nel menu “Strumenti”, fai clic su “Strumento di query” e digita il seguente SQL per creare due tabelle di dati IoT:

            CREATE TABLE sensors(
  id SERIAL PRIMARY KEY,
  type VARCHAR(50),
  location VARCHAR(50)
);

CREATE TABLE sensor_data (
  time TIMESTAMPTZ NOT NULL,
  sensor_id INTEGER,
  temperature DOUBLE PRECISION,
  cpu DOUBLE PRECISION,
  FOREIGN KEY (sensor_id) REFERENCES sensors (id)
);

CREATE EXTENSION IF NOT EXISTS timescaledb CASCADE


        

Ora vediamo una funzionalità speciale di TimeScaleDB che non puoi fare in un normale database PostgreSQL. Trasformeremo la tabella sensor_data  in una “Hypertable”. Dietro le quinte, TimescaleDB partizionerà i dati sulla dimensione temporale, semplificando il filtraggio, l’indicizzazione e l’eliminazione dei vecchi dati delle serie temporali.

Se sei arrivato a questo tutorial per sfruttare le caratteristiche uniche di TimescaleDB, di seguito vedrai dove avviene la magia.

Eseguiamo la seguente query in PGAdmin per creare l’hypertable, partizionato automaticamente sulla dimensione “time”:

            SELECT create_hypertable('sensor_data', 'time');

        

Ora che è stata creata la nostra tabella di serie temporali specializzata, creiamo un indice speciale sull’ID del sensore, poiché è molto probabile che filtreremo sia l’ID del sensore che l’ora.

            create index on sensor_data (sensor_id, time desc);
        

Aggiungiamo ora alcuni dati di esempio nella tabella “sensori”:

            INSERT INTO sensors (type, location) VALUES
    ('a','floor'),
    ('a', 'ceiling'),
    ('b','floor'),
    ('b', 'ceiling');

        

Ora la parte divertente: creiamo alcuni dati simulati per le serie temporali:

            INSERT INTO sensor_data 
  (time, sensor_id, cpu, temperature)
SELECT
  time,
  sensor_id,
  random() AS cpu,
  random()*100 AS temperature
FROM 
  generate_series(
    now() - interval '31 days', 
    now(), interval '5 minute'
  ) AS g1(time), 
  generate_series(1,4,1) AS g2(sensor_id);

        

Eseguiamo una semplice query di selezione per vedere alcuni dei nostri dati appena simulati:

            SELECT * 
FROM sensor_data
WHERE time > (now() - interval '1 day')
ORDER BY time;
        

Ecco un altro esempio di selezione dei dati aggregati (ovvero la media ad 1 ora, invece di vedere ogni singolo punto dati):

            SELECT 
  sensor_id,
  time_bucket('1 hour', time) AS period, 
  AVG(temperature) AS avg_temp, 
  AVG(cpu) AS avg_cpu 
FROM sensor_data 
GROUP BY 
  sensor_id, 
  time_bucket('1 hour', time)
ORDER BY 
  sensor_id, 
  time_bucket('1 hour', time);

        

Dal tutorial ufficiale di TimescaleDB, mostriamo altre due query. Innanzitutto, invece di una cronologia delle serie temporali, potremmo semplicemente desiderare i dati più recenti . Per questo, possiamo utilizzare la funzione “last()”:

            SELECT 
  time_bucket('30 minutes', time) AS period, 
  AVG(temperature) AS avg_temp, 
  last(temperature, time) AS last_temp --the latest value
FROM sensor_data 
GROUP BY period;

        

E, naturalmente, spesso vogliamo unire i dati delle serie temporali con i metadati (cioè i dati sui dati). In altre parole, otteniamo una posizione per ogni sensore, piuttosto che un ID sensore:

            SELECT 
  t2.location, --from the second metadata table
  time_bucket('30 minutes', time) AS period, 
  AVG(temperature) AS avg_temp, 
  last(temperature, time) AS last_temp, 
  AVG(cpu) AS avg_cpu 
FROM sensor_data t1 
INNER JOIN sensors t2 
  on t1.sensor_id = t2.id
GROUP BY 
  period, 
  t2.location;


        

TimescaleDB ha un’altra funzione molto utile chiamata “aggregazioni continue” per aggiornare continuamente ed efficientemente le visualizzazioni aggregate dei dati delle nostre serie temporali. Se desideriamo creare rapporti / grafici sui dati aggregati, il seguente codice per la creazione di viste fa al nostro caso:

            CREATE VIEW sensor_data_1_hour_view
WITH (timescaledb.continuous) AS --TimescaleDB continuous aggregate
SELECT 
  sensor_id,
  time_bucket('01:00:00'::interval, sensor_data.time) AS time,
  AVG(temperature) AS avg_temp, 
  AVG(cpu) AS avg_cpu
FROM sensor_data
GROUP BY 
  sensor_id,
  time_bucket('01:00:00'::interval, sensor_data.time)


        

Questo è tutto per la parte 1 di questo tutorial in tre parti su TimescaleDB, Dash e Flask. Ecco la parte 2 sull’integrazione di Dash e Flask. La parte 3 si concentrerà sulla creazione di grafici di serie temporali reattivi e interattivi in ​​Dash per la tua applicazione a pagina singola (SPA).

A presto!