Realizzazione di un’applicazione di web scraping con Python, Celery e Django

scienzadeidati articoli - Realizzazione applicazione di web scraping con Python Celery e Django

In questa è la 3°parte del tutorial relativo alla creazione di uno strumento di web scraping con Python, descriviamo come integrare il web scraper schedulato all’interno di un’applicazione web Django.

La 1° parte, Creazione di uno scraper di feed RSS con Python, illustra come utilizzare Requests e Beautiful Soup.

La 2° parte, Web scraping automatizzato con Python e Celery, descrive come pianificare le attività di scraping web con Celery, una coda di task.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

In precedenza, abbiamo creato un semplice lettore di feed RSS che raccoglie informazioni da HackerNews utilizzando Requests e BeautifulSoup. Dopo aver creato lo script di scraping di base, abbiamo descritto come integrare Celery nell’applicazione per fungere da sistema di gestione delle attività. Usando Celery, siamo stati in grado di pianificare le attività di scraping in modo che siano effettuati periodicamente ad intervalli fissi: questo permette di eseguire lo script senza interazione umana.

Il prossimo passo è raggruppare le attività di scraping pianificate in un’applicazione Web utilizzando Django. In questo modo possiamo accedere a un database, visualizzare i dati su un sito Web e terminare la creazione di un’app di “scraping”. L’obiettivo di questo progetto è creare qualcosa di scalabile, simile a un aggregatore.

Questo articolo non è una guida dettagliata al web framework Django. E’ invece orientato verso un approccio “Hello World”, seguito dalla visualizzazione di contenuti acquisiti dall’app web.

Per raggiunge questo obiettivo dobbiamo utilizzare i seguenti strumenti:

  • Python 3.7+
  • Requests – per le richieste web
  • BeautifulSoup 4 – Strumento per il parsing  HTML
  • Un editor di testo (PyCharm o Visual Studio Code)
  • Celery – Coda asincrona di attività con messaggi distribuiti
  • RabbitMQ – Un broker di messaggi
  • lxml – Se si usa un ambiente virtuale
  • Django – Un framework  web con Python
  • Pipenv – Un pacchetto per gestire ambienti virtuali

Nota: tutte le dipendenze dell’applicazione sono elencate nel file Pipfile/ Pipfile.lock.

Obiettivi

Si vuole realizzare un’applicazione Web che utilizza un sistema di gestione dei task per raccogliere i dati e memorizzarli nel database.

articoli - applicazione scraping Celery Django
Quanto sopra mostra come un’ applicazione Django prevede l’invio di task sistema di code, che li esegue e salva gli eventi nel database. Mentre l’applicazione Django è in esecuzione, non è richiesto di eseguire manualment nessuna attività di web scraping. Di seguito l’elenco dei passaggi previsti per realizzare l’applicazione desiderata:
  1. Installare Django, il framework Python usato per creare la base dell’applicazione web
  2. Creare un progetto Django e avviare il server
  3. Generare l’app scraping per  acqusisire i dati
  4. Configurare celery.py e tasks.py per effettuare l’estrazione dei dati
  5. Integrare i dati con la view HomePage di Django
Nota: se sei a tuo agio con Django, vai al passaggio 4.

Inizializzazione

Per iniziare dobbiamo creare un ambiente virtuale per il progetto Django, e quindi creare lo starter. Questo codice è disponibile su GitHub di Scienzadeidati.com.

Il file Piplock specifca tutti i requisiti del progetto, in questo modo l’ambiente virtuale verrà avviato con tutti i pacchetti necessari.

				
					$ mkdir django_celery_web_scraping && cd django_celery_web_scraping
$ pipenv install requests bs4 lxml django celery
				
			

Inoltre, è necessario assicurarsi che RabbitMQ sia installato, come descritto nel precedente articolo.

Nota: in questo articolo stiamo usando Ubuntu, quindi i comandi potrebbero differire a seconda del sistema operativo. Inoltre, per brevità, abbiamo omesso il codice che non ha subito modifiche, usando ….

Creare un progetto Django e avviare il server

Il primo passo per la configurazione del progetto,  dobbiamo creare un’istanza di una shell pipenv, e quindi creere un progetto Django. Successivamente, dobbiamo iniziare la creazione dell’applicazione Django ed effettuare le generiche impostazioni.

				
					# django_web_scraping

$ pipenv shell
$ django-admin startproject django_web_scraping .
$ python manage.py createsuperuser
$ python manage.py makemigrations
$ python manage.py migrate
				
			

Tramite alcuni dei comandi precedenti, creiamo un’istanza della shell dell’ambiente virtuale per eseguire i comandi Django. Il comando startproject crea l’applicazione iniziale all’interno della directory che stiamo utilizzando . e quindi si eseguono gli altri comandi: createsuperuser, makemigrations, migrate.

E’ ora possibile avviare il server per mostrare che siamo operativi.

Nota: assicuriamoci che questi comandi siano eseguiti in una  shell <code>pipenv</code>.

				
					$ python manage.py runserver
				
			
Collegandosi all’indirizzo localhost:8000 possiamo vedere che il server è avviato e funzionante.
articoli - applicazione scraping Celery Django 1
Ora dobbiamo creare un URL in urls.py dove specificare la view della homepage.
				
					# urls.py 

from django.contrib import admin
from django.urls import path, include

from .views import HomePageView # new

urlpatterns = [
    path('', HomePageView.as_view(), name='home'), # homepage
    path('admin/', admin.site.urls),
]
				
			

Quanto sopra è una vista generica importata dal file <code>views.py</code> che  dobbiamo creare nella directory principale del progetto

				
					# django_web_scraping/views.py

from django.shortcuts import render
from django.views import generic

# Create your views here.

class HomePageView(generic.ListView):
    template_name = 'home.html'
    
				
			
Successivamente, dobbiamo creare la directory dei modelli, il modello HTML di base e il modello della home page
				
					$ mkdir templates && touch templates/base.html && touch templates/home.html
				
			
Per registrare i file modello, dobbiamo aggiungere la directory templates alle impostazioni di Django:
				
					# settings.py
TEMPLATES = [
    ...
    'DIRS': ['templates'], # new
    ...
]
    
				
			
Ora aggiungiamo un semplice codice HTML per il template base
				
					# base.html

{% load static %}

<!DOCTYPE html>
<html>
    <head>
        <meta charset="utf-8">
            <title>{% block title %}Django Web Scraping Example  
                   {% endblock title %}</title>
    </head>
    <body>
        <div class="container">
            {% block content %}
            {% endblock content %}
        </div>
    </body>
</html>
				
			
I contenuti di tutte le pagine dell’applicazione web sono inseriti nei contenitori contrassegnati con {% block %} del modello base.html.
				
					# home.html

{% extends 'base.html' %}

{% block content %}
Hello World
{% endblock content %}
				
			
Dopo aver definito i modelli, l’esempio “Hello World” è completato.

Genera l'app scraping per raccogliere i dati

In questao paragrafano descriviamo come creare l’applicazione di scraping e il modello di dati. Questo è integrato nel file settings.py e i suoi dati sono passati all’applicazione principale HomePageView.
				
					$ python manage.py startapp scraping
				
			
Registriamo l’applicazione all’interno delle impostazioni.
				
					# settings.py

INSTALLED_APPS [
    ...
    'scraping.apps.ScrapingConfig', # new
]
				
			
Quini dobbiamo creare il modello in cui salvara i dati, fortunatamente la struttura dati del feed RSS ha pochissimi campi.
				
					# models.py

from django.db import models

# Create your models here.
class News(models.Model):
    title = models.CharField(max_length=200)
    link = models.CharField(max_length=2083, default="", unique=True)
    published = models.DateTimeField()
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    source = models.CharField(max_length=30, default="", blank=True, null=True)
    
				
			

I campi del modello News hanno i seguenti significati:

  • title – Dati RSS strutturati
  • link – Il link dell’articolo
  • published – La data in cui l’articolo è stato pubblicato su HackerNews
  • created_at – La data di immissione dei dati, “now” per impostazione predefinita
  • updated_at – La data dell’ultimo aggiornamento dei dati
  • source – HackerNews (o qualsiasi altro sito che scegliamo di analizzare)

Dopo aver creato il modello, l’applicazione Django non viene caricata perché mancano le migrazioni (ovvero la creazione delle tabelle).

				
					$ python manage.py makemigrations
$ python manage.py migrate
				
			

Nota: non prevediamo nessun URL per questa app, poiché stiamo solo inviando i dati all’applicazione principale.

Configurazione del file celery.py

I passaggi precedenti in questo articolo hanno descritto le basi per costruire il progetto, vediamo ora  come integrare Celery e gli stessi tasks
Questa sezione si basa sul codice descritto negli articoli precedenti. Iniziamo con un file celery.py per l’applicazione Celery, quindi aggiungiamo i task dal codice base dell’articolo Web scraping automatizzato con Python e Celery

				
					$ touch django_web_scraping/celery.py
				
			

La configurazione di cui sopra deve essere posizionata all’interno della directory principale del progetto e fungerà da file di “impostazioni” per la coda dei task.

				
					# celery.py

import os
from celery import Celery
from celery.schedules import crontab # scheduler

# default django settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE','django_web_scraping.settings')
app = Celery('django_web_scraping')
app.conf.timezone = 'UTC'
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
				
			

Queste sono le impostazioni predefinite dalla documentazione di Celery, e prevedono che l’applicazione Celery utilizzi il modulo settings ed individuare automaticamente i task.

La seconda configurazione fondamentale prima di creare i task è specificare il file settings.py per il broker di messaggi (RabbitMQ) e Celery.

				
					# settings.py

# celery
CELERY_BROKER_URL = 'amqp://localhost:5672'
CELERY_RESULT_BACKEND = 'amqp://localhost:5672'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
				
			

Includere task.py

I task definiti in tasks.py sono simili a quelli descritti nel precedente articolo. Le principali modifiche sono:

  • La funzione di salvataggio
  • Come richiamiamo gli oggetti

Anziché salvare i dati dello scraping nei file .txt, prevediamo di memorizzarli come voci nel database predefinito (SQLite).

Iniziamo con la funzione di scraping, per descrivere come  i dati sono estratti. Il seguente blocco di codice mostra l’intero task condiviso, con importazioni specifiche per questo task.

				
					# scraping/tasks.py

# scraping
import requests
from bs4 import BeautifulSoup
import json
from datetime import datetime
import lxml

# scraping function
@shared_task
def hackernews_rss():
    article_list = []
    try:
        print('Starting the scraping tool')
        # execute my request, parse the data using XML
        # parser in BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # select only the "items" I want from the data
        articles = soup.findAll('item')
    
        # for each "item" I want, parse it into a list
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published_wrong = a.find('pubDate').text
            published = datetime.strptime(published_wrong, '%a, %d %b %Y %H:%M:%S %z')
            # print(published, published_wrong) # checking correct date format
            # create an "article" object with the data
            # from each "item"
            article = {
                'title': title,
                'link': link,
                'published': published,
                'source': 'HackerNews RSS'
            }
            # append my "article_list" with each "article" object
            article_list.append(article)
            print('Finished scraping the articles')
    
            # after the loop, dump my saved objects into a .txt file
            return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception:')
        print(e)
				
			
I codice di cui sopra prevede di:
  • Inviare una richiesta al feed RSS di HackerNews, ottenere gli elementi elencati, e quindi restituire i dati XML.
  • Separare i dati XML in “elementi” utilizzando soup.findAll('item'), e quindi analizzare i dati utilizzando la libreria LXML.
  • Pulire i dati in formato JSON, prestando particolare attenzione al formato della data estratta da item per ogni articolo. Questo è importante per salvare gli articoli nel database.
  • Assicurarsi che le date siano in un formato accettato dal database.
  • Aggiungere l’articolo a un elenco di elementi.
  • Chiamare la save_function() con l’elenco di articoli come parametro.
Se l’attività di scraping ha esito negativo, gestiamo le informazioni da restituire tramite la funzione Exception. Successivamente, iniziamo a esaminare il metodo save_function() che è stato implementato nell’articolo precedente. Questo è stato adattato per utilizzare il modello News che è stato creato all’interno dell’applicazione di scraping.
				
					# scraping/tasks.py

@shared_task(serializer='json')
def save_function(article_list):
    print('starting')
    new_count = 0

    for article in article_list:
        try:
            News.objects.create(
                title = article['title'],
                link = article['link'],
                published = article['published'],
                source = article['source']
            )
            new_count += 1
        except Exception as e:
            print('failed at latest_article is none')
            print(e)
            break
    return print('finished')
				
			
La funzione save_function() utilizza il parametro article_list passato dalla funzione di scraping, e salva ogni oggetto article nel database. Nel repository Github abbiamo previsto una versione aggiornata della funzione save_function() che recupera il più recente articolo di HackerNews salvato nel database ed interrompe l’elaborazione.

Invio dei dati alla HomePageView

Dopo aver creato celery.py e tasks.py, siamo in grado di integrare i dati in HomePageView per mostrarli sull’applicazione web. Per iniziare, apriamo views.py presente nella root del progetto, quindi aggiungiamo il modello News al suo interno. Questo consente di chiamare gli oggetti tag article all’interno dei modelli Django.
				
					# django_web_scraping/views.py

from scraping.models import News # bring News into the views

class HomePageView(generic.ListView):
    template_name = 'home.html'
    context_object_name = 'articles' 
    # assign "News" object list to the object "articles"
    # pass news objects as queryset for listview
    def get_queryset(self):
        return News.objects.all()
				
			

Avvio e test dell'applicazione

Dopo avere aggiornato la HomePageView, il progetto è pronto per essere lanciato e testato. In modo analogo alla Parte 1 e alla Parte 2 di questa serie, dobbiamo usare più finestre di terminale.

Per avviare il progetto abbiamo bisogno di:

  • Avviare il servizio broker RabbitMQ.
  • Avviare il server Django.
  • Abilitare i task di Celery.

I passaggi precedenti richiedono più terminali, come descritto di seguito.

Terminale #1 – RabbitMQ

Innanzitutto, verifichiamo che non ci sia in esecuzione un’istanza di RabbitMQ.

Nota: utilizziamo sudo perché l’installazione predefinita non ha concesso le autorizzazioni appropriate.

				
					$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server start # start server
				
			

Terminale #2  – Django

Django è facile da avviare, iniziamo solo con il comando runserver. Usando Pipenv, eseguiamo il comando nella shell
				
					$ pipenv shell
$ python manage.py runserver
				
			
articoli - applicazione scraping Celery Django 3a

Terminale #3 – Celery

Ora che il progetto è in esecuzione,, possiamo abilitare i task di Celery

				
					$ celery -A django_web_scraping worker -B -l INFO
				
			
articoli - applicazione scraping Celery Django 4

Una volta che i servizi di cui sopra sono stati avviati, siamo in grado di controllare l’output dello scraping sulla homepage (raggiungibile all’indirizzo 127.0.0.1:8000).

Nella homepage sono visualizzati, in forma tabellare, i dati acquisiti dallo scraping e restituiti dai task di Celery che abbiamo creato. Se osserviamo l’output dei task, vediamo che stanno fallendo perché i dati non soddisfano il vincolo univoco (ad esempio, è un duplicato e non ci sono nuovi post).
Una modifica futura può essere prevedere l’esecuzione  di tasks.py a intervalli maggiori, perché il feed RSS probabilmente non avrà molti aggiornamenti ad intervalli di un minuto.

Conclusione

Abbiamo integrato con successo Django, Celery, RabbitMQ e le librerie di web scraping di Python per creare un lettore di feed RSS. Questo tutorial ha fornito una panoramica sull’aggregazione dei dati nella forma di applicazione web, simile a popolari siti (come Feedly).

 

Possibili sviluppi futuri

  • Aggregare altri siti Web o feed di notizie
  • Modificare save_function() in modo da evitare di salvare ogni singolo oggetto ad ogni scraping (meno chiamate al database!!).
  • Creare un proprio feed RSS, con i dati aggregati.

Web scraping automatizzato con Python e Celery

scienzadeidati articoli - Web scraping automatizzato Python Celery

Questa è la 2° parte del tutorial che descrive come creare uno strumento di web scraping con Python. In questo articolo vediamo come integrare Celery, un sistema di gestione delle attività, nel nostro progetto di scraping web.

La 1° parte, Creazione di uno scraper di feed RSS con Python, descrive come utilizzare Requests e Beautiful Soup.

La 3° parte di questa serie, Realizzazione di un’applicazione di web scraping con Python, Celery e Django, descrive come integrare uno strumento di web scraping nelle applicazioni web.

Requisiti

Nell’articolo precedente, abbiamo creato un semplice lettore di feed RSS che estrae informazioni da HackerNews utilizzando Requests e BeautifulSoup (vedi il codice su GitHub).

In questo articolo usiamo il codice come base per la creazione di un sistema di gestione delle attività e lo scraping pianificato.
Il successivo passaggio nella raccolta di dati da siti Web che cambiano frequentemente (ad esempio, un feed RSS che mostra un numero X di elementi alla volta), è quello di eseguire lo scraping su base regolare. Nell’esempio di scraping precedente, abbiamo utilizzato la riga di comando per eseguire il nostro codice; tuttavia, questa non è una soluzione scalabile. Per automatizzare l’esecuzione si può usare Celery per creare un sistema di pianificazione delle attività con esecuzione periodica.

Per raggiunge questo obiettivo dobbiamo utilizzare i seguenti strumenti:

Nota: tutte le dipendenze della libreria sono elencate nei file requirements.txt e Pipfile/ Pipfile.lock.

Come funziona Celery

Celery è un sistema di gestione delle attività, opera in combinazione con un broker di messaggi per svolgere un lavoro asincrono .

articoli - web scraping Celery

Quanto sopra illustra che il nostro produttore di attività (la nostra app di scraping web) passerà le informazioni sulle attività alla coda (Celery) per essere eseguite. Lo scheduler (Celery beat) li eseguirà come cron job, senza alcun lavoro aggiuntivo o interazione al di fuori dell’avvio dell’app Celery.

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione di scraping automatizzato:

  1. Installazione di Celery e RabbitMQ: Celery gestisce l’accodamento e l’esecuzione delle attività, mentre RabbitMQ gestirà i messaggi avanti e indietro
  2. Come avviare RabbitMQ e comprendere i log
  3. Creazione di un proof-of-concept “Hello World” con Celery per verificarne il funzionamento.
  4. Registrazione delle funzioni di scraping in tasks.py con Celery
  5. Sviluppare e gestire ulteriormente le attività di scraping
  6. Creazione ed esecuzione di una pianificazione per l’attività di scraping

Nota: le introduzioni a RabbitMQ e Celery sono piuttosto lunghe, se si ha già esperienza con questi strumenti si può saltare direttamente al punto 4 .

Inizializzazione

Iniziamo aprendo la directory del progetto, in questo caso è Web_Scraping_RSS_Celery_Django dell’articolo precedente. Se lo desideri, può essere clonato da GitHub tramite il link precedentemente menzionato.

Nota: stiamo usando Ubuntu, quindi i comandi potrebbero differire a seconda del sistema operativo.

Inoltre, per brevità, abbiamo omesso parti di codice replicato, usando ….
Infine, i requisiti del progetto possono essere installati utilizzando il comando pip come  nel seguente esempio.

				
					$ pip install celery
				
			


Perché Celery e RabbitMQ? Perché non un'altra tecnologia?

Utilizziamo Celery e RabbitMQ perché sono abbastanza semplici da configurare, testare e ridimensionare in un ambiente di produzione. Sebbene possiamo eseguire un’attività periodica utilizzando altre librerie, o semplicemente con i cron job generali, vogliamo costruire  qualcosa da riutilizzare nei prossimi progetti.

Una soluzione sarà più duratura e richiede meno manutenzione se usiamo una tecnologia che possiamo scalare nel prossimo progetto, imparando alcuni comandi e strumenti chiave man mano che aumentiamo gradualmente la complessità.

Configurazione di RabbitMQ

Far funzionare un server RabbitMQ è molto più semplice su Ubuntu rispetto a un ambiente Windows. Seguiremo la documentazione  ufficiale della guida all’installazione.

Ecco i comandi di installazione per Debian e Ubuntu:

				
					$ sudo apt-get update -y

$ sudo apt-get install curl gnupg -y

$ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add -

$ sudo apt-get install apt-transport https

$ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF

$ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang

$ deb https://dl.bintray.com/rabbitmq/debian bionic main

$ EOF

$ sudo apt-get update -y

$ sudo apt-get install rabbitmq-server -y --fix-missing
				
			

Dopo aver installato RabbitMQ per la prima volta in un ambiente virtuale, si avvia automaticamente. Per verificare che il comando rabbitmq-server funzioni (quello da usare con Celery), è necessario disattivare il servizio.

Da notare che le autorizzazioni predefinite per l’installazione prevedono che Only root or rabbitmq should run rabbitmqctl shutdown, che può risultare strano. Per risolverlo, possiamo semplicemente eseguire il comando con sudo.

				
					$ sudo rabbitmqctl shutdown
				
			

E’ quindi possibile testare il server utilizzando il comando rabbitmq-server,

				
					$ sudo rabbitmq-server
				
			

Si ottiene il seguente output

articoli - web scraping Celery 1

NOTA: si può terminare il comando rabbitmq-server usando Control+C.

La configurazione di RabbitMQ su Windows richiede passaggi aggiuntivi. La documentazione ufficiale contiene una guida per l’installazione manuale.

Testare Celery con RabbitMQ

Prima di immergersi nella codifica di ogni progetto, è buona norma testare gli esempi di base in stile “Hello World” per i pacchetti e framework previsti dal progetto. Questo permette di acquisire una comprensione di base di cosa è possibile aspettarsi, insieme ad alcuni comandi del terminale da aggiungere alla cassetta degli attrezzi per quella specifica tecnologia.

In questo caso, Celery viene fornito con il proprio proof-of-concept “Hello World” sotto forma di un’attività (task) che effettua una semplice addizione aritmetica. Questo è disponibile in forma estesa sulla documentazione ufficiale di Celery. Di seguito viene descritto brevemente; tuttavia, se si desidera spiegazioni o approfondimenti, è consigliato leggere la documentazione ufficiale.

Ora che abbiamo installato e convalidato il broker RabbitMQ, possiamo iniziare a creare il file tasks.py. Questo contiene tutte le attività (task) da eseguire, sia che si tratti di una operazione aritmetica, di uno scraping web o di un salvataggio di utenti in un database. Vediamo ora le modifiche all’interno della directory del progetto.

				
					$ touch tasks.py
				
			

Per creare un task per effettuare un’addizione aritmetica, dobbiamo importare la libreria Celery e creare una funzione con il flag @app.task per consentire ai worker di Celery di ricevere l’attività nel sistema di code.

				
					# tasks.py

from celery import Celery

app = Celery('tasks') # defining the app name to be used in our flag

@app.task # registering the task to the app
def add(x, y):
    return x + y
				
			

Utilizzando il task add, possiamo iniziare a testarne l’esecuzione. È qui che le cose potrebbero iniziare a confondersi, poiché nel passaggio successivo sono necessari 3 terminali aperti contemporaneamente.

Iniziamo con una rapida spiegazione, quindi descriviamo il codice e visualizzare le schermate.

Spiegazione

Per completare il  test, eseguiamo il task Celery usando la riga di comando importando il tasks.py e  poi lanciarlo. Affinché i task siano ricevuti dalla coda, dobbiamo avere attivi i servizi Celery worker e RabbitMQ. Il server RabbitMQ funge da nostro broker di messaggi mentre il  worker Celery esegue le attività.

Indichiamo ogni passaggio con i numeri di terminale:

  1. Rabbit MQ
  2. Worker di Celery
  3. Esecuzione del task


Terminale #1

Iniziamo avviando il server RabbitMQ nel terminale n. 1.

				
					# RabbitMQ

$ sudo rabbitmq-server
				
			
articoli - web scraping Celery 2

Terminale #2

Successivamente, possiamo avviare il processo del worker Celery nel terminale n. 2. Descriviamo anche le impostazioni dettagliate per il  worker in modo da illustrare come apparirà l’output.

Nota: deve essere eseguito dalla directory del progetto.

				
					# Celery worker

$ celery -A tasks worker -l INFO
				
			

Analizziamo il precedente comando:

  • celery – La libreriache stiamo chiamando
  • -A tasks – Si esplicita che vogliamo l’app  tasks
  • worker – Avvio del processo worker
  • -l INFO – Garantisce di avere un dettagliato log degli eventi nella console

Per verificare se il worker è stato avviato correttamente, è necessario avere la riga concurrency: 4 (prefork) nel terminale.

Inoltre, notiamo che l’app [tasks] è stata importata, insieme alla registrazione dei task presenti in tasks.py. Il worker ha registrato una singola attività (1), tasks.add.

articoli - web scraping Celery 3

Terminale #3:

Successivamente, possiamo iniziare l’esecuzione del test nel terminale n. 3. Eseguiamo un ciclo per eseguire più task che vengono catturati dal  servizio worker. Lo realizziamo importando add da tasks.py, e quindi eseguendo un ciclo for.

Nota: dopo la riga add.delay(i, i), si deve usare Control+Invio per eseguire il comando.

				
					$ python
>>> from tasks import add # pulling in add from tasks.py
>>> for i in range(1000):
...    add.delay(i, i) # delay calls the task
				
			

Ora possiamo a vedere un grande blocco di output nel terminale n. 3 (l’esecuzione dell’attività Celery).

Questo mostra che il worker sta ricevendo il risultato dell’attività dal terminale n. 2.

articoli - web scraping Celery 4

Terminale 2:

Se controlliamo il worker Celery nel terminale n. 2, il processo che esegue l’addizione aritmetica, vediamo che sta rilevando ciascuna delle esecuzioni dell task.

articoli - web scraping Celery 5

Abbiamo dimostrato con successo che Celery e RabbitMQ sono installati correttamente. Questo getta le basi per gli altri task che  vogliamo implementare (es. web scraping) dimostrando come interagiscono Celery, Celery worker e RabbitMQ.
Ora che abbiamo coperto l’installazione e le nozioni di base, ci addentreremo nella tasks.pycreazione delle nostre attività di scraping web.

Creazione del task.py con Celery per il web scraping

L’esempio descritto nel paragrafo precedente ha aiutato a descrivere il processo da usare per eseguire i task utilizzando Celery, dimostrando anche come i task sono registrati tramite i worker di Celery.

Basandoci sul questo esempio, iniziamo creando i task di scraping. A tale scopo, per semplicità possiamo copiare il codice  dello script scraping.py, descritto nella 1° parte di questo tutorial, nel file tasks.py.

Quindi possiamo rimuovere la funzione def add(x, y) dell’esempio e copiare l’importazione delle librerie (Requests e BeautifulSoup), e le funzioni di scraping.

Nota: usiamo le stesse funzioni, solamente che sono state copiate in tasks.py.

				
					# tasks.py

from celery import Celery
import requests # pulling data
from bs4 import BeautifulSoup # xml parsing
import json # exporting to files

app = Celery('tasks')

# save function
def save_function(article_list):
    with open('articles.txt', 'w' as outfile:
        json.dump(article_list, outfile)
        
# scraping function
def hackernews_rss():
    article_list = []
    try:
        # execute my request, parse the data using the XML 
        # parser in BS4
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        # select only the "items" I want from the data      
        articles = soup.findAll('item')
        # for each "item" I want, parse it into a list
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            # create an "article" object with the data
            # from each "item"
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            # append my "article_list" with each "article" object
            article_list.append(article)
        # after the loop, dump my saved objects into a .txt file
        return save_function(article_list)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
				
			

Le funzioni di web scraping sono ora in tasks.py, insieme alle loro dipendenze. Il passaggio successivo è registrare i task con la app Celery, semplicemente specificando il decorator @app.task prima di ogni funzione.

				
					# tasks.py

... # same as above

@app.task
def save_function(article_list):
    ...
    
@app.task
def hackernews_rss():
    ...
				
			

Miglioramenti alle funzioni di scraping

Sebbene le funzioni di scraping già implementate abbiano dimostrato di funzionare per estrarre i dati dal feed RSS di HackerNews, abbiamo ancora qualche margine di miglioramento. Di seguito abbiamo descritto le modifiche da fare all’interno del set di strumenti di scraping prima di automatizzare l’esecuzione dello script.

  1. Salvare l’output in file .json con data e ora.
  2. Aggiungere una data  e ora created_at per ogni articolo.
  3. Aggiungere una  stringa source, nel caso in cui desideriamo fare lo scraping di altri siti

Le precedenti modifiche sono semplici perchè abbiamo già effettuato la maggior parte del lavoro di implementazione durante il primo articolo. Sebbene non sia significativo, lo scambio in .json sarà un po’ più facile da leggere rispetto a .txt. Inoltre i due campi aggiuntivi contengono informazione che rendono l’applicazione più “scalabile” quando si aggiungono altri feed per lo scraping e quando si analizzano i dati in un secondo momento.

Iniziamo con aggiornare la funzione save_function per produrre un file .json e aggiungere un timestamp. In questo modo si migliora la qualità del lavoro quando dobbiamo riutilizzare dati di scraping precedenti.

				
					# tasks.py

from datetime import datetime # for time stamps

... 

def save_function(articles_list):
    # timestamp and filename
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
    filename = 'articles-{}.json'.format(timestamp)
    # creating our articles file with timestamp
    with open(filename, 'w').format(timestamp) as outfile:
        json.dump(article_list, outfile)
        
				
			

Stiamo usando la funzione datetime.now().strftime(...) per creare un timestamp da aggiungere al file usando .format(timestamp).

Passando alle due modifiche all’interno della funzione hackernews_rss(), aggiungiamo alcune informazioni sulla fonte (HackerNews RSS) e un timestamp created_at. Si tratta di due semplici modifiche che ci aiutano a differenziare le informazioni nel caso si voglia aggiungere ulteriori funzioni di scraping.

				
					# tasks.py

...

def hackernews_rss():
    ...
        for a in articles:
            ...
            article = {
                ...
                'created_at': str(datetime.now()),
                'source': 'HackerNews RSS'
                }
        ...
				
			
Le modifiche precedenti illustrano l’aggiunta dei campi created_at e source. Un rapido suggerimento: se ometti amo la transizione str() lo script non verrà eseguito a causa di un errore Object of type datetime is not JSON serializable.

Pianificazione dei task con Celery:

Ora sfruttiamo i poteri di pianificazione dei task di Celery utilizzando beat_schedule. Questo ci consente di registrare i task con l’agente di pianificazione in orari specifici.
Un’ottima guida per la schedulazione di task è sicuramente all’interno della documentazione ufficiale . Ci sono anche alcuni esempi di pianificazione aggiuntivi  nel file tasks.py del repository GitHub di questo tutorial.

Prevediamo di eseguire il task di scraping ogni minuto, in modo da dimostrere come il worker Celery interagisce con  i task pianificati. Ciò non comporterà dati diversi, poiché questo feed RSS non viene aggiornato ogni minuto con  nuovi dati.
L’obiettivo di questo  esempio è mostrare l’ouput dei file degli articoli e una semplice pianificazione dei task.

Creazione dello scheduler

				
					# tasks.py

... 

from celery.schedules import crontab # scheduler

# scheduled task execution
app.conf.beat_schedule = {
    # executes every 1 minute
    'scraping-task-one-min': {
        'task': 'tasks.hackernews_rss',
        'schedule': crontab()
    }
}
...
				
			

La configurazione di cui sopra registra le pianificazioni dei task all’interno della stessa app Celery. All’avvio, siamo in grado di chiamare lo scheduler di Celery per ricontrollare ciò che è stato messo in coda.

Esecuzione dei task

Ora che il codice è stato completato, è ora di accendere il server RabbitMQ e avviare i  worker Celery.
Per questo esempio, utilizziamo 2 schede del terminale:

  1. Server RabbitMQ
  2. Worker di Celery

 

Terminale #1

Per avviare il server RabbitMQ (il broker di messaggi), usiamo lo stesso comando descritto in precedenza.

Nota: si può anche prevedere di ricontrollare che il nodo creato all’avvio sia spento, poiché non si avvia con i log e genererà un errore se non viene terminato in precedenza.

				
					$ sudo rabbitmqctl shutdown
$ sudo rabbitmq-server
				
			

Si ottiene un output simile a l precedente esempio (come il seguente screenshot).

Una volta avviato il server RabbitMQ, possiamo iniziare con il terminale n. 2.

Terminale #2

Modifichiamo leggermente il comando, poiché ora include una notazione per -B la quale chiama il worker che esegue il beat scheduler.

				
					$ celery -A tasks worker -B -l INFO
				
			
L’output della console mostra l’avvio dell’applicazione e (in attesa della schedulazione prevista) stampa le informazioni relative all’esecuzione dei task. Nel seguente screenshot:
  • Si registra il [tasks]
  • Avvio dello scheduler beat
  • Il worker MainProcess riceve un’attività Received task: tasks.hackernews_rss
  • Avvio di ForkPoolWorker ed esecuzione del task, quindi restituisce un risultato
articoli - web scraping Celery 7

Suggerimento: possiamo interrompere l’esecuzione del task pianificato con Control + C, dato che il task viene eseguito a tempo indeterminato.

Dopo che il task è stato eseguito correttamente, la funzione save_function() ha generato il file .json.

Conclusione

In questo articolo abbiamo ampliato con successo un semplice strumento di web scraping in modo di prevedere una pianificazione dell’esecuzione. Ciò garantisce che non abbiamo più bisogno di eseguire manualmente lo script di scraping e che possiamo “impostarlo e dimenticarlo”. Tramite la schedulazione dello script, l’applicazione sarà in grado di analizzare i siti alla ricerca di dati che cambiano in base a una pianificazione prestabilita (ad esempio, ogni 15 minuti) e restituire ogni volta nuovi dati.

Quali sono i prossimi passi?

Nella parte 3, descriviamo un’applicazione Django con integrazione di Celery e web scraping. Questo è un ottimo esempio di un’applicazione web che effettua lo scraping e popola un sito web con informazioni dello scraping. Il prodotto finale sarà un aggregatore di notizie che estrae da più feed RSS.

Ulteriori letture

Questo articolo ha descritto molte informazioni sull’esecuzione dei task tramite numerosi esempi. Sebbene siamo stati in grado di effettuare alcune esecuzioni programmate, non siamo entrati nei dettagli di ciascuna delle tecnologie che abbiamo utilizzato. Ecco alcuni ottimi articoli (alcuni su Medium, altri no) che approfondiscono maggiormente la magia di Celery.

Creazione di uno scraper di feed RSS con Python

scienzadeidati articoli - Costruire uno scraper di feed RSS con Python

Questo articolo è la prima parte 1 di un tutorial relativo alla creazione di un’applicazione di web scraping con Python. In questo articolo ci concentriamo ad approfondire le librerie Requests e BeautifulSoup.

Nella 2° parte del tutorial vedremo come schedulare lo scraping tramite Celery.

Nella 3° parte descriviamo come realizzazione di un’applicazione web scraping con Django.

Il codice di questo articolo è disponibile pubblicamente su GitHub.

Requisiti

Ho già utilizzato lo scraping web in diversi modi all’interno dei miei progetti, che si tratti di raccolta di dati per l’analisi, creazione di notifiche quando i siti vengono modificati o creazione di applicazioni web.

Questo articolo illustrerà un semplice  di feed RSS per HackerNews. Il feed RSS, disponibile a questo link, prevede aggiornamenti a intervalli regolari per i  nuovi post e attività sul sito.

Sebbene ciò possa essere ottenuto utilizzando un lettore RSS, questo è inteso come un  semplice esempio di utilizzo di Python, che può essere adattato facilmente ad altri siti Web.

In questo esempio usiamo:

Obiettivi

Di seguito uno schema dei passaggi da prevedere per creare la nostra applicazione:

  1. Creazione della directory  del progetto e del file scraping.py.
  2. Verificare che possiamo eseguire il ping del feed RSS che sarà oggetto dello scraping.
  3. Scraping del contenuto XML del sito.
  4. Analisi del contenuto utilizzando BS4.
  5. Output del contenuto in un file .txt

Inizializzazione

Il primo passo è la creazione della directory del progetto e accedere a quella directory tramite riga di comando. Una volta all’interno della directory di progetto, creiamo il file di progetto.

Nota: in questo caso stiamo usando Ubuntu, quindi questi comandi potrebbero differire a seconda del sistema operativo.

            $ mkdir web_scraping_example && cd web_scraping_example

$ touch scraping.py
        
Successivamente è necessario installare le librerie Python previste dai requisiti. Si può usare il comando pip, come nell’esempio seguente:
            $ pip install requests
$ pip install bs4
        

Importazione delle librerie

Ora che le basi del progetto sono state impostate, possiamo iniziare a scrivere il codice del web scraping.

All’interno del file scraping.py, si deve importare le librerie che abbiamo installato utilizzando pip.

            # scraping.py

import requests
from bs4 import BeautifulSoup
        

Quanto sopra ci consente di utilizzare le funzioni fornite dalle librerie Requests e BeautifulSoup.

Ora possiamo iniziare a testare la capacità di eseguire il ping del feed RSS di HackerNews e scrivere lo script di scraping.

Nota: di seguito non includeremo le righe di importazione. Tutto il codice che rimane uguale verrà annotato come ‘…’ sopra o sotto le nuove righe.

Testare la richiesta

Quando eseguiamo lo scraping web, iniziamo inviando una richiesta a un sito web. Per assicurarci di essere in grado di eseguire lo scraping, dobbiamo verificare che  possiamo stabilre una connessione al sito.

Iniziamo creando la nostra funzione di scraping di base. Questo sarà ciò a cui eseguiremo

            # scraping.py

# library imports
...

# scraping function
def hackernews_rss('https://news.ycombinator.com/rss'):
    try:
        r = requests.get()
        return print('The scraping job succeeded: ', r.status_code)
    except Exception as e:
        print('The scraping job failed. See exception: ')
        print(e)
        
        
print('Starting scraping')
hackernews_rss()
print('Finished scraping')
        

Nel codice di cui sopra,  si richiama la funzione requests.get(...) della libreria Requests per recuperare il sito Web da analizzare. Quindi si stampa a video lo stato della richiesta, contenuto nel parametro r.status_code, per verificare che il sito Web sia stato chiamato correttamente.
Inoltre, abbiamo inserito tutto dentro un try: except: in modo da rilevare eventuali errori che si potremmo verificare in seguito.

Una volta eseguito lo script, si ottiene un codice di stato pari a 200. Questo conferma che siamo in grado di eseguire il ping del sito e “ottenere” informazioni.

            $ python scraping.py

Starting scraping
The scraping job succeeded: 200
Finsihed scraping
        

Scraping del contenuto

Il nostro script ha restituito un codice di stato pari a 200, siamo pronti per iniziare ad estrarre il contenuto XML dal sito. A tal fine, dobbiamo utilizzare BS4 insieme a Requests.
            # scraping.py

...

def hackernews_rss():
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        
        return print(soup)
...
        

Questo codice assegne il contenuto XML acquisito da HackerNews alla variabile soup. Stiamo usando r.content per passare l’XML restituito a BeautifulSoup, che analizzeremo nel prossimo paragrafo.

Una cosa fondamentale da notare è che stiamo sfruttando il parametro features='xml', questo parametro varia a seconda dei dati da acquisire (ad esempio, in un scraper HTML si dichiarerà come ‘HTML’).

L’output di quanto sopra sarà un gran disordine di contenuti che non ha molto senso. Questo è solo per visualizzare quali informazioni stiamo estraendo con successo dal sito web.

Analisi dei dati

Abbiamo descritto come possiamo estrarre l’XML dal feed RSS di HackerNews. E’ quindi il momento di analizzare le informazioni.

Abbiamo scelto di usare il feed RSS perché è molto più semplice da analizzare rispetto alle informazioni html del sito Web, poiché non dobbiamo preoccuparci degli elementi HTML nidificati e di individuare le informazioni esatte.

Iniziamo osservando la struttura del feed:

            <item>
    <title>...</title>
    <link>...</link>
    <pubDate>...</pubDate>
    <comments>...</comments>
    <description>...</description>
</item>
        

Ciascuno dei tag disponibili sul feed RSS segue la struttura di cui sopra, contenente tutte le informazioni all’interno dei tag <item>...</item>.

Sfruttiamo la coerenza dei tag item per analizzare le informazioni.

            # scraping.py 

...

def hackernews_rss():
    article_list = []
    try:
        r = requests.get('https://news.ycombinator.com/rss')
        soup = BeautifulSoup(r.content, features='xml')
        articles = soup.findAll('item')        
        for a in articles:
            title = a.find('title').text
            link = a.find('link').text
            published = a.find('pubDate').text
            article = {
                'title': title,
                'link': link,
                'published': published
                }
            article_list.append(article)
        return print(article_list)
...
        

In questo codice, effettuiamo un controllo di tutti gli item in articles = soup.findAll('item'). Questo consente di estrarre ciascuno dei tag <item>...</item> dall’XML che abbiamo acquisito.

Ciascuno degli articoli viene analizzato utilizzando il ciclo: for a in articles:, questo consente di analizzare le informazioni in variabili separate e aggiungerle a un dizionario vuoto che abbiamo creato prima del ciclo.
BS4 ha scomposto l’XML in una stringa, consentendoci di chiamare la funzione .find() per cercare i tag in ciascuno degli oggetti. Usando .text siamo in grado di estrarre gli elementi <tag>...</tag> e salvare solamente il testo.
I risultati sono inseriti in un elenco tramite il comando article_list.append(article) in modo da potervi accedere in seguito.

Infine inseriamo la stampa della lista degli articoli in modo da visualizzare un output durante l’esecuzione dello script di scraping.

Output in un file

Il feed RSS è stato ora inviato correttamente in una funzione print() per visualizzare l’elenco prodotto a termine del parsing. Ora possiamo inserire i dati in un file .txt, in modo che possano riessere utilizzati in future analisi e altre attività relative ai dati. Utilizziamo la libreria JSON per la visualizzazione dei dati più semplice per noi; tuttavia, descriviamo anche un esempio senza la libreria JSON.

Iniziamo creando un’altra funzione def save_function(): che elabora l’elenco prodotto dalla funzione hackernews_rss(). In questo modo sarà più facile apportare modifiche in futuro.

            # scraping.py
import json

...

def save_function(article_list):
    with open('articles.txt', 'w') as outfile:
        json.dump(article_list, outfile)

...
        

Quanto sopra utilizza la libreria JSON per scrivere l’output dello scraping nel file articles.txt. Questo file verrà sovrascritto durante l’esecuzione dello script.

Un altro metodo per scrivere nel file .txt prevede un ciclo for:

            # scraping.py

...

def save_function(article_list):
    with open('articles.txt', 'w') as f:
        for a in article_list:
            f.write(a+'\n')
        f.close()
        
...
        
Ora che abbiamo creato la funzione save_function(), possiamo adattare la funzione di scraping per salvare i dati.
            # scraping.py

...

def hackernews_rss():
    ...
    try:
        ...
        return save_function(article_list)

...
        

Modificando il comando return print(article_list) in return save_function(article_list) siamo in grado di inserire i dati in un file .txt.

L’esecuzione dello script produrrà ora un file .txt dei dati acquisiti dal feed RSS di HackerNews.

Conclusione

Abbiamo creato con successo uno script Python per il di scraping dei feed RSS utilizzando Requests e BeautifulSoup. Questo ci consente di analizzare le informazioni XML in un formato leggibile con cui lavorare in futuro.

Questo script è la base per costruire alcune funzionalità interessanti:

  • Scraping di informazioni più complesse utilizzando elementi HTML.
  • Utilizzo di Selenium per lo scraping di siti dove il rendering lato client rende inutile l’uso di Requests.
  • Creazione di un’applicazione Web che acquisirà i dati dello scraping e li visualizzerà (ad esempio, un aggregatore)
  • Estrazione di dati da siti Web a seconda di una pianificazione e schedulazione.


Sebbene questo articolo abbia  descritto le basi del web scraping, possiamo iniziare ad approfondire alcuni dettagli.

Possiamo inserire questo script all’interno di una sequenza di altri script, schedulati tramite  Celery, oppure possiamo aggregare le informazioni su un’applicazione web utilizzando Django.

Questo articolo fa parte di una serie in 3 parti in cui descriviamo semplici esempi di web scraping e aggregazione su base programmata.

Creazione di Microservizi con Python e FastAPI

scienzadeidati articoli - Creazione di Microservizi con Python e FastAPI

Come sviluppatore Python potresti aver sentito parlare del termine microservizi e desideri creare da solo un microservizio Python. I microservice sono un’ottima architettura per la creazione di applicazioni altamente scalabili. Prima di iniziare a creare l’applicazione usando i microservice, è necessario conoscere i vantaggi e gli svantaggi dell’uso dei microservizi. In questo articolo imparerai i vantaggi e gli svantaggi dell’utilizzo dei microservice. Imparerai anche come creare il tuo microservice e distribuirlo utilizzando Docker Compose.

In questo tutorial vedremo:

  • Quali sono i vantaggi e gli svantaggi dei microservizi.
  • Perché dovresti creare microservice con Python.
  • Come creare API REST utilizzando FastAPI e PostgreSQL.
  • Come creare microservice utilizzando FastAPI.
  • Come eseguire i microservice usando docker-compose.
  • Come gestire i microservice utilizzando Nginx.

Prima creeremo una semplice API REST usando FastAPI e poi utilizzeremo PostgreSQL come nostro database. Estenderemo quindi la stessa applicazione a un microservizio.

 

Introduzione ai microservice

Il microservice è un approccio per suddividere grandi applicazioni monolitiche in singole applicazioni specializzate in uno specifico servizio/funzionalità. Questo approccio è spesso noto come architettura orientata ai servizi o SOA.

Nell’architettura monolitica, ogni logica di business risiede nella stessa applicazione. I servizi dell’applicazione come la gestione degli utenti, l’autenticazione e altre funzionalità utilizzano lo stesso database.

In un’architettura di microservice, l’applicazione è suddivisa in diversi servizi separati che vengono eseguiti in processi separati. Esiste un database diverso per le diverse funzionalità dell’applicazione e i servizi comunicano tra loro utilizzando HTTP, AMQP o un protocollo binario come TCP, a seconda della natura di ciascun servizio. La comunicazione tra servizi può essere eseguita anche utilizzando le code di messaggi come RabbitMQ, Kafka o Redis.

 

Vantaggi dei microservice

L’architettura a microservizi offre molti vantaggi. Alcuni di questi vantaggi sono:

  • Un’applicazione debolmente accoppiata significa che i diversi servizi possono essere costruiti utilizzando le tecnologie più adatte alle singole funzioni e specificità. Quindi, il team di sviluppo non è vincolato alle scelte fatte durante l’avvio del progetto.
  • Poiché i servizi sono responsabili di funzionalità specifiche, la comprensione e il controllo dell’applicazione è più semplice e facile.
  • Anche il ridimensionamento dell’applicazione diventa più semplice perché se uno dei servizi richiede un utilizzo elevato della GPU, solo il server che contiene quel servizio deve avere una GPU elevata e gli altri possono essere eseguiti su un server normale.
 

Svantaggi dei microservice

L’architettura dei microservizi non è un proiettile d’argento che risolve tutti i tuoi problemi, ha anche i suoi svantaggi. Alcuni di questi inconvenienti sono:

  • Poiché diversi servizi utilizzano un diverso database, le transazioni che coinvolgono più di un servizio devono gestire la consistenza dei dati.
  • La suddivisione perfetta dei servizi è molto difficile da ottenere al primo tentativo e questo deve essere ripetuto prima di ottenere la migliore separazione possibile dei servizi.
  • Poiché i servizi comunicano tra loro attraverso l’uso dell’interazione di rete, ciò rende l’applicazione più lenta a causa della latenza della rete e del servizio.
 

Perché Microservice in Python?

Python è uno strumento perfetto per la creazione di microservizi perché questo linguaggio ha una semplice curva di apprendimento, tonnellate di librerie e pacchetti già implementati e una grande community di utilizzatori. Grazie all’introduzione della programmazione asincrona in Python, sono emersi framework web con prestazioni alla pari con GO e Node.js.

 

Introduzione a FastAPI

FastAPI è un moderno framework Web ad alte prestazioni, dotato di tantissime funzioni interessanti come la documentazione automatica basata su OpenAPI e la libreria di convalida e serializzazione integrata. In questo link puoi leggere l’elenco di tutte le fantastiche funzionalità presenti FastAPI.

 

Perché FastAPI

Alcuni dei motivi per cui penso che FastAPI sia un’ottima scelta per la creazione di microservizi in Python sono:

  • Documentazione automatica
    Supporto Async/Await
  • Convalida e serializzazione integrate
  • Tipo 100% annotato, quindi il completamento automatico funziona alla grande
 

Installazione FastAPI

Prima di installare FastAPI è necessario creare una nuova directory movie_service e creare un nuovo ambiente virtuale all’interno della directory appena creata usando virtualenv.

Se non l’hai già installato virtualenv:

            pip install virtualenv
        
Ora, si crea un nuovo ambiente virtuale.
            virtualenv env
        

Nei sistemi Mac/Linux si può attivare l’ambiente virtuale usando il comando:

            source ./env/bin/activate
        
Gli utenti Windows possono invece eseguire questo comando:
            .\env\Scripts\activate
        
Infine, siamo pronti per installare FastAPI eseguendo il seguente comando:
            pip install fastapi
        

Poiché FastAPI non viene fornito con un service integrato, è necessario installare uvicorn per eseguire il service. uvicorn è un server ASGI che ci consente di utilizzare le funzionalità async/await.

Per installare uvicorn si può usare il comando:

            pip install uvicorn
        

Creazione di un semplice REST API utilizzando FastAPI

Prima di iniziare a creare un microservice utilizzando FastAPI, impariamo le basi di FastAPI. Creiamo una nuova directory zcode>app e un nuovo file main.py all’interno della directory appena creata.

Aggiungiamo il seguente codice in main.py.

            #~/movie_service/app/main.py

from fastapi import FastAPI

app = FastAPI()


@app.get('/')
async def index():
    return {"Real": "Python"}
        

E’ necessario importare e creare un’istanza di FastAPI e quindi registrare l’endpoint radice / che restituisce un file JSON.

È possibile eseguire il server delle applicazioni utilizzando uvicorn app.main:app --reload. Qui app.main indica che si sta usando il file main.py all’interno della directory app e :app indica a FastAPI il nome della nostra istanza.

Possiamo accedere all’app da http://127.0.0.1:8000. Per accedere alla documentazione automatica bisogna andare su http://127.0.0.1:8000/docs. Possiamo giocare e interagire con l’API dal browser stesso.

Aggiungiamo alcune funzionalità CRUD alla nostra applicazione. Aggiorniamo il main.py in modo che assomigli a quanto segue:

            #~/movie_service/app/main.py

from fastapi import FastAPI
from pydantic import BaseModel
from typing import List

app = FastAPI()

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


@app.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db
        

Come puoi vedere hai creato una nuova classe Movie che estende la classe BaseModel di Pydantic.

Il modello Movie contiene il nome, la foto, il genere e il cast. Pydantic è integrato con FastAPI che rende la creazione di modelli e la convalida delle richieste un gioco da ragazzi.

Se andiamo alla pagina della documentazione possiamo vedere che sono descritti i campi del nostro modello nella sezione di risposta di esempio. Questo è possibile perché abbiamo definito il response_model nella definizione del percorso nel decoratore @app.get.

Ora aggiungiamo l’endpoint per aggiungere un film al nostro elenco di film.

Aggiungiamo una nuova definizione di endpoint per gestire la richiesta POST.

            @app.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}
        

Ora apriamo il browser e testiamo la nuova API. Proviamo ad aggiungere un film con un campo non valido o senza i campi obbligatori e verifichiamo che la convalida venga gestita automaticamente da FastAPI.

Aggiungiamo un nuovo endpoint per aggiornare il film.

            @app.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        

Ecco l’indice id della nostra lista fake_movie_db.

Nota: ricorda di importare HTTPException da Fastapi

Ora possiamo anche aggiungere l’endpoint per eliminare il film.

            @app.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, 
                        detail="Movie with given id not found")
        
Prima di andare avanti, strutturiamo la nostra app in un modo migliore. Creiamo una nuova cartella api all’interno di app e creiamo un nuovo file movies.py all’interno della cartella creata di recente. Spostiamo tutti i codici relativi alle route da main.py a movies.py. Quindi, movies.py dovrebbe essere simile al seguente:
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import Movie

fake_movie_db = [
    {
        'name': 'Star Wars: Episode IX - The Rise of Skywalker',
        'plot': 'The surviving members of the resistance face the First Order once again.',
        'genres': ['Action', 'Adventure', 'Fantasy'],
        'casts': ['Daisy Ridley', 'Adam Driver']
    }
]

movies = APIRouter()

@movies.get('/', response_model=List[Movie])
async def index():
    return fake_movie_db

@movies.post('/', status_code=201)
async def add_movie(payload: Movie):
    movie = payload.dict()
    fake_movie_db.append(movie)
    return {'id': len(fake_movie_db) - 1}

@movies.put('/{id}')
async def update_movie(id: int, payload: Movie):
    movie = payload.dict()
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        fake_movie_db[id] = movie
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")

@movies.delete('/{id}')
async def delete_movie(id: int):
    movies_length = len(fake_movie_db)
    if 0 <= id <= movies_length:
        del fake_movie_db[id]
        return None
    raise HTTPException(status_code=404, detail="Movie with given id not found")
        

Qui abbiamo registrato un nuovo percorso API utilizzando APIRouter di FastAPI.

Inoltre, creiamo un nuovo file models.py all’interno di api in cui definiamo i nostri modelli Pydantic.

            #~/movie-service/api/models.py

from typing import List
from pydantic import BaseModel

class Movie(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]

        
Ora dobbiamo indicare a FastAPI dove trovare i persorsi web. A tale scopo dobbiamo registrare all’interno di main.py il nuovo file con le “routes”.
            #~/movie-service/app/main.py

from fastapi import FastAPI

from app.api.movies import movies

app = FastAPI()

app.include_router(movies)
        
Infine, la struttura della directory della nostra applicazione è simile alla seguente:
            movie-service
├── app
│   ├── api
│   │   ├── models.py
│   │   ├── movies.py
│   |── main.py
└── env
        

Prima di proseguire è bene assicurarsi che l’applicazione funzioni correttamente.

 

Utilizzo del database PostgreSQL con FastAPI

In precedenza abbiamo usato lista di stringhe Python per simulare un elenco di film, ma ora siamo pronti per utilizzare un database reale a tale questo scopo. In particolare, in questa applicazione useremo PostgreSQL. Installaremo PostgreSQL, se non l’hai già fatto. Dopo aver installato PostgreSQL creeremo un nuovo database che chiameremo movie_db.

Utilizzeramo encode/database per connettersi al database utilizzando il supporto async e await.

Installa la libreria richiesta utilizzando:

            pip install 'databases[postgresql]'
        

questo comando installerà anche sqlalchemy e asyncpg, che sono necessari per lavorare con PostgreSQL.

Creiamo un nuovo file all’interno di api e lo chiamiamo db.py. Questo file conterrà il modello del database reale per il REST API.

            #~/movie-service/app/api/db.py

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = 'postgresql://movie_user:movie_password@localhost/movie_db'

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts', ARRAY(String))
)

database = Database(DATABASE_URL)
        

In particolare, DATABASE_URI è l’URL utilizzato per connettersi al database PostgreSQL. movie_user è il nome dell’utente del database, movie_password è la password dell’utente del database ed movie_db è il nome del database.

Proprio come in SQLAlchemy, abbiamo creato la tabella per il database dei film.

Aggiorniamo quindi main.py per connettersi al database. Il codice di main.py è il seguente:

            #~/movie-service/app/main.py

from fastapi import FastAPI
from app.api.movies import movies
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()
    
app.include_router(movies)
        
FastAPI fornisce alcuni gestori di eventi che è possibile utilizzare per connettersi al nostro database all’avvio dell’applicazione e disconnettersi alla chiusura. Aggiorniamo movies.py in modo che utilizzi un database invece di un falso elenco Python.
            #~/movie-service/app/api/movies.py

from typing import List
from fastapi import Header, APIRouter

from app.api.models import MovieIn, MovieOut
from app.api import db_manager

movies = APIRouter()

@movies.get('/', response_model=List[MovieOut])
async def index():
    return await db_manager.get_all_movies()

@movies.post('/', status_code=201)
async def add_movie(payload: MovieIn):
    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = payload.dict()
    fake_movie_db[id] = movie
    return None

@movies.put('/{id}')
async def update_movie(id: int, payload: MovieIn):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)
    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}')
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        
Aggiungiamo db_manager.py per manipolare il nostro database.
            #~/movie-service/app/api/db_manager.py

from app.api.models import MovieIn, MovieOut, MovieUpdate
from app.api.db import movies, database

async def add_movie(payload: MovieIn):
    query = movies.insert().values(**payload.dict())
    return await database.execute(query=query)

async def get_all_movies():
    query = movies.select()
    return await database.fetch_all(query=query)

async def get_movie(id):
    query = movies.select(movies.c.id==id)
    return await database.fetch_one(query=query)

async def delete_movie(id: int):
    query = movies.delete().where(movies.c.id==id)
    return await database.execute(query=query)

async def update_movie(id: int, payload: MovieIn):
    query = (
        movies
        .update()
        .where(movies.c.id == id)
        .values(**payload.dict())
    )
    return await database.execute(query=query)
        
Aggiorniamo il nostro models.py in modo che possiamo utilizzare il modello Pydantic con la tabella sqlalchemy.
            #~/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional


class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts: List[str]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts: Optional[List[str]] = None
        

La classe MovieIn è il modello base da usare per aggiungere un film al database. Dobbiamo aggiungere id a questo modello per ottenerlo dal database, creando il modello ModeulOut. Il modello MovieUpdate consente di impostare i valori nel modello come facoltativi in modo che durante l’aggiornamento del filmato possa essere inviato solo il campo che deve essere aggiornato.

Possiamo ora collegarci alla pagina “docs” della nostra applicazione ed inizia a giocare con l’API.

 

 

Modelli di gestione dei dati nei microservice

La gestione dei dati è uno degli aspetti più critici durante la creazione di un microservizio. Poiché le diverse funzioni dell’applicazione sono gestite da servizi diversi, l’utilizzo di un database può essere complicato.

Di seguito sono riportati alcuni modelli che è possibile utilizzare per gestire il flusso di dati nell’applicazione.

 

Database per servizio

L’utilizzo di un database per ogni servizio è ottimo se vuoi che i tuoi microservizi siano il più possibile accoppiati debolmente. Avere un database diverso per ogni servizio ci consente di scalare servizi diversi in modo indipendente. Una transazione che coinvolge più database viene eseguita tramite API ben definite. Ciò ha il suo svantaggio in quanto non è semplice implementare transazioni complesse che coinvolgono più servizi . Inoltre, l’aggiunta del sovraccarico di rete rende questo approccio meno efficiente da usare.

 

Database condiviso

Se ci sono molte transazioni che coinvolgono più servizi è meglio usare un database condiviso. Ciò comporta i vantaggi di un’applicazione altamente coerente, ma elimina la maggior parte dei vantaggi offerti dall’architettura dei microservizi. Gli sviluppatori che lavorano su un servizio devono coordinarsi con le modifiche allo schema in altri servizi.

 

Composizione API

Nelle transazioni che coinvolgono più database, il compositore API funge da gateway API ed esegue chiamate API ad altri microservizi nell’ordine richiesto. Infine, i risultati di ogni microservizio vengono restituiti al servizio client dopo aver eseguito un join in memoria. Lo svantaggio di questo approccio è l’inefficienza dei join in memoria di un set di dati di grandi dimensioni.

 

 

Creazione di un microservice Python in Docker

Il problema della distribuzione del microservice può essere notevolmente ridotto utilizzando Docker. Docker aiuta a incapsulare ogni servizio e a scalarlo in modo indipendente.

 

Installazione di Docker e Docker Compose

Se non hai già installato docker nel tuo sistema, verifichiamo se docker è installato eseguendo il comando docker. Dopo aver completato l’installazione di Docker, installiamo Docker Compose . Docker Compose viene utilizzato per definire ed eseguire più container Docker. E’ utile anche per facilitare l’interazione tra i container.

 

Creazione del servizio Movies

Poiché gran parte del lavoro per la creazione di un servizio Movies è già stato svolto all’inizio con FastAPI, riutilizzeremo il codice che abbiamo già scritto. Creiamo una cartella nuova di zecca, che chiamerò python-microservices e spostiamoci il codice che abbiamo scritto in precedenza.

Quindi, la struttura delle cartelle sarebbe simile a questa:

            python-microservices/
└── movie-service/
    ├── app/
    └── env/
        

Prima di tutto, creiamo un file requirements.txt in cui conserveremo tutte le dipendenze che utilizzeremo in movie-service.

Creiamo un nuovo file requirements.txt all’interno di movie-service e aggiungiamo quanto segue:

            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
httpx==0.11.1
        

Abbiamo usato tutte le librerie menzionate nel file tranne httpx che utilizzerai mentre effettui una chiamata API da servizio ad un altro.

Creiamo un Dockerfile all’interno di movie-service come segue:

            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        

Per prima cosa definiamo quale versione di Python usare. Quindi impostiamo la cartella app come WORKDIR all’interno del contenitore Docker. Dopodiché viene installato gcc, richiesto dalle librerie che utilizziamo nell’applicazione.

Infine, installiamo tutte le dipendenze in requirements.txt e copiamo tutti i file all’interno di movie-service/app.

Aggiorniamo db.py e sostituiamo:

            DATABASE_URI = 'postgresql://movie_user:movie_password@localhost/movie_db'
        

con:

            DATABASE_URI = os.getenv('DATABASE_URI')
        

NOTA: non dimentichiamo di importare os nella parte superiore del file.

È necessario eseguire questa operazione in modo da poter fornire in seguito DATABASE_URI come variabile di ambiente.

Inoltre, aggiorniamo main.py e sostituiamo:

            app.include_router(movies)
        
con:
            app.include_router(movies, prefix='/api/v1/movies', tags=['movies'])
        

Abbiamo aggiunto prefix=/api/v1/movies in modo da gestire più facilmente le diverse versioni dell’API. Inoltre, i tag facilitano la ricerca delle API relative ai movies nei docs di FastAPI.

Inoltre, dobbiamo aggiornare i nostri modelli in modo che casts memorizzi l’ID del cast invece del nome effettivo. Quindi, aggiorniamo models.py come segue:

            #~/python-microservices/movie-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class MovieIn(BaseModel):
    name: str
    plot: str
    genres: List[str]
    casts_id: List[int]


class MovieOut(MovieIn):
    id: int


class MovieUpdate(MovieIn):
    name: Optional[str] = None
    plot: Optional[str] = None
    genres: Optional[List[str]] = None
    casts_id: Optional[List[int]] = None
        
Allo stesso modo, dobbiamo aggiornare le tabelle del database, aggiorniamo db.py:
            #~/python-microservices/movie-service/app/api/db.py

import os

from sqlalchemy import (Column, DateTime, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URL = os.getenv('DATABASE_URL')

engine = create_engine(DATABASE_URL)
metadata = MetaData()

movies = Table(
    'movies',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('plot', String(250)),
    Column('genres', ARRAY(String)),
    Column('casts_id', ARRAY(Integer))
)

database = Database(DATABASE_URL)
        

Ora, aggiorniamo movies.py per verificare se il cast con l’ID specificato si presenta nel cast-service prima di aggiungere un nuovo film o aggiornare un film.

            #~/python-microservices/movie-service/app/api/movies.py

from typing import List
from fastapi import APIRouter, HTTPException

from app.api.models import MovieOut, MovieIn, MovieUpdate
from app.api import db_manager
from app.api.service import is_cast_present

movies = APIRouter()

@movies.post('/', response_model=MovieOut, status_code=201)
async def create_movie(payload: MovieIn):
    for cast_id in payload.casts_id:
        if not is_cast_present(cast_id):
            raise HTTPException(status_code=404, detail=f"Cast with id:{cast_id} not found")

    movie_id = await db_manager.add_movie(payload)
    response = {
        'id': movie_id,
        **payload.dict()
    }

    return response

@movies.get('/', response_model=List[MovieOut])
async def get_movies():
    return await db_manager.get_all_movies()

@movies.get('/{id}/', response_model=MovieOut)
async def get_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return movie

@movies.put('/{id}/', response_model=MovieOut)
async def update_movie(id: int, payload: MovieUpdate):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")

    update_data = payload.dict(exclude_unset=True)

    if 'casts_id' in update_data:
        for cast_id in payload.casts_id:
            if not is_cast_present(cast_id):
                raise HTTPException(status_code=404, detail=f"Cast with given id:{cast_id} not found")

    movie_in_db = MovieIn(**movie)

    updated_movie = movie_in_db.copy(update=update_data)

    return await db_manager.update_movie(id, updated_movie)

@movies.delete('/{id}', response_model=None)
async def delete_movie(id: int):
    movie = await db_manager.get_movie(id)
    if not movie:
        raise HTTPException(status_code=404, detail="Movie not found")
    return await db_manager.delete_movie(id)
        

Aggiungiamo un servizio per effettuare una chiamata API al casts-service:

            #~/python-microservices/movie-service/app/api/service.py

import os
import httpx

CAST_SERVICE_HOST_URL = 'http://localhost:8002/api/v1/casts/'
url = os.environ.get('CAST_SERVICE_HOST_URL') or CAST_SERVICE_HOST_URL

def is_cast_present(cast_id: int):
    r = httpx.get(f'{url}{cast_id}')
    return True if r.status_code == 200 else False
        

Nel precedente codice si effettua una chiamata API per ricavare il cast con uno specifico id e restituire true se il cast esiste altrimenti false.


Creazione del casts-service

Analogamente al movie-service, per creare un casts-service utilizzeremo FastAPI e un database PostgreSQL.

Creiamo una struttura di cartelle come la seguente:

            python-microservices/
.
├── cast_service/
│   ├── app/
│   │   ├── api/
│   │   │   ├── casts.py
│   │   │   ├── db_manager.py
│   │   │   ├── db.py
│   │   │   ├── models.py
│   │   ├── main.py
│   ├── Dockerfile
│   └── requirements.txt
├── movie_service/
...
        
Aggiungiamo quanto segue a requirements.txt:
            asyncpg==0.20.1
databases[postgresql]==0.2.6
fastapi==0.48.0
SQLAlchemy==1.3.13
uvicorn==0.11.2
        
Dockerfile:
            FROM python:3.8-slim

WORKDIR /app

COPY ./requirements.txt /app/requirements.txt

RUN apt-get update \
    && apt-get install gcc -y \
    && apt-get clean

RUN pip install -r /app/requirements.txt \
    && rm -rf /root/.cache/pip

COPY . /app/
        
main.py:
            #~/python-microservices/cast-service/app/main.py

from fastapi import FastAPI
from app.api.casts import casts
from app.api.db import metadata, database, engine

metadata.create_all(engine)

app = FastAPI()

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

app.include_router(casts, prefix='/api/v1/casts', tags=['casts'])
        

Abbiamo aggiunto il prefix /api/v1/casts in modo che la gestione dell’API diventi più semplice. Inoltre, l’aggiunta dei tags facilita la ricerca dei documenti relativi a casts nel docs di FastAPI.

casts.py

            #~/python-microservices/cast-service/app/api/casts.py

from fastapi import APIRouter, HTTPException
from typing import List

from app.api.models import CastOut, CastIn, CastUpdate
from app.api import db_manager

casts = APIRouter()

@casts.post('/', response_model=CastOut, status_code=201)
async def create_cast(payload: CastIn):
    cast_id = await db_manager.add_cast(payload)

    response = {
        'id': cast_id,
        **payload.dict()
    }

    return response

@casts.get('/{id}/', response_model=CastOut)
async def get_cast(id: int):
    cast = await db_manager.get_cast(id)
    if not cast:
        raise HTTPException(status_code=404, detail="Cast not found")
    return cast
        
db_manager.py
            #~/python-microservices/cast-service/app/api/db_manager.py

from app.api.models import CastIn, CastOut, CastUpdate
from app.api.db import casts, database


async def add_cast(payload: CastIn):
    query = casts.insert().values(**payload.dict())

    return await database.execute(query=query)

async def get_cast(id):
    query = casts.select(casts.c.id==id)
    return await database.fetch_one(query=query)
        
db.py
            #~/python-microservices/cast-service/app/api/db.py

import os

from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, ARRAY)

from databases import Database

DATABASE_URI = os.getenv('DATABASE_URI')

engine = create_engine(DATABASE_URI)
metadata = MetaData()

casts = Table(
    'casts',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('nationality', String(20)),
)

database = Database(DATABASE_URI)
        
models.py
            #~/python-microservices/cast-service/app/api/models.py

from pydantic import BaseModel
from typing import List, Optional

class CastIn(BaseModel):
    name: str
    nationality: Optional[str] = None


class CastOut(CastIn):
    id: int


class CastUpdate(CastIn):
    name: Optional[str] = None
        

Esecuzione del microservizio utilizzando Docker Compose

Per eseguire i microservice, è necessario creare un  filedocker-compose.yml e aggiungiamo quanto segue:

            version: '3.7'

services:
  movie_service:
    build: ./movie-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./movie-service/:/app/
    ports:
      - 8001:8000
    environment:
      - DATABASE_URI=postgresql://movie_db_username:movie_db_password@movie_db/movie_db_dev
      - CAST_SERVICE_HOST_URL=http://cast_service:8000/api/v1/casts/

  movie_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_movie:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=movie_db_username
      - POSTGRES_PASSWORD=movie_db_password
      - POSTGRES_DB=movie_db_dev

  cast_service:
    build: ./cast-service
    command: uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
    volumes:
      - ./cast-service/:/app/
    ports:
      - 8002:8000
    environment:
      - DATABASE_URI=postgresql://cast_db_username:cast_db_password@cast_db/cast_db_dev

  cast_db:
    image: postgres:12.1-alpine
    volumes:
      - postgres_data_cast:/var/lib/postgresql/data/
    environment:
      - POSTGRES_USER=cast_db_username
      - POSTGRES_PASSWORD=cast_db_password
      - POSTGRES_DB=cast_db_dev

volumes:
  postgres_data_movie:
  postgres_data_cast:
        

Abbiamo 4 diversi servizi, movie_service, un database per movie_service, cast_service e un database per cast service. Abbiamo esposto il movie_service alla porta 8001 e modo simile cast_service alla porta 8002.

Per il database, abbiamo utilizzato i volumi in modo che i dati non vengano distrutti quando il contenitore docker viene chiuso. 

Eseguiamo il docker-compose usando il comando:

            docker-compose up -d
        

Questo comando crea la dockar image, se non esiste già, e la esegue.

Andiamo su http://localhost:8002/docs per aggiungere un cast nel casts-service. Allo stesso modo, http://localhost:8001/docs  per aggiungere il film nel movie-service.

Utilizziamo di Nginx per accedere a entrambi i servizi utilizzando un singolo indirizzo host

Abbiamo distribuito i microservizi utilizzando Docker compose, ma c’è un piccolo problema. È necessario accedere a ciascuno dei microservizi utilizzando una porta diversa. Si può risolvere questo problema utilizzando il reverse-proxy di Nginx , utilizzando Nginx puoi indirizzare la richiesta aggiungendo un middleware che indirizza le nostre richieste a diversi servizi in base all’URL dell’API.

Aggiungamo un nuovo file nginx_config.conf all’interno python-microservices con i seguenti contenuti.

            server {
  listen 8080;

  location /api/v1/movies {
    proxy_pass http://movie_service:8000/api/v1/movies;
  }

  location /api/v1/casts {
    proxy_pass http://cast_service:8000/api/v1/casts;
  }

}
        

Stiamo eseguendo Nginx alla porta 8080 e instradando le richieste al movie-service se l’endpoint inizia con /api/v1/movies e in modo simile al casts-service se l’endpoint inizia con /api/v1/casts.

Ora dobbiamo aggiungere il servizio nginx nel nostro file docker-compose-yml. Aggiungiamo il seguente servizio dopo il servizio cast_db:

            ...
nginx:
    image: nginx:latest
    ports:
      - "8080:8080"
    volumes:
      - ./nginx_config.conf:/etc/nginx/conf.d/default.conf
    depends_on:
      - cast_service
      - movie_service
...
        
Ora, spegnamo i contenitori docker con il comando:
            docker-compose down
        

Ed eseguiamolo di nuovo con:

            docker-compose up -d
        

Ora possiamo accedere sia al movie-service che al casts-service tramite la porta 8080.

Collegandosi a http://localhost:8080/api/v1/movies/ otteniamo l’elenco dei film.

Ora, per accedere ai docs dei service è necessario modificare il  main.py del movie-service la seguente riga:

            app = FastAPI()
        

con

            app = FastAPI(openapi_url="/api/v1/movies/openapi.json", docs_url="/api/v1/movies/docs")
        
Allo stesso modo, per il casts-service si deve sostituirlo con
            app = FastAPI(openapi_url="/api/v1/casts/openapi.json", docs_url="/api/v1/casts/docs")
        

Abbiamo cambiato l’endpoint in cui vengono serviti i docs e da dove viene servito il openapi.json.

Ora possiamo accedere ai docs da http://localhost:8080/api/v1/movies/docs per il movie-service e da http://localhost:8080/api/v1/casts/docs per il casts-service.

 

Conclusione

L’architettura del microservice è ottima per suddividere una grande applicazione monolitica in logiche di business separate, ma anche questo comporta una complicazione. Python è ottimo per la creazione di microservizi grazie all’esperienza degli sviluppatori e a tonnellate di pacchetti e framework per rendere gli sviluppatori più produttivi.

Puoi trovare il codice completo presentato in questo tutorial su Github.