Pianificare un web scraping con Apache Airflow

scienzadeidati articoli - Web Scarping con Airflow

Nel post precedente , abbiamo introdotto Apache Airflow e i suoi concetti di base, la  configurazione e l’utilizzo. In questo post, descriviamo come possiamo pianificare i nostri web scraper con l’aiuto di Apache Airflow.

A titolo di esempio, vedremo come effettuare lo scraping del sito https://allrecipes.com tramite perché lo scopo è utilizzare Airflow. Nel caso si desideri approfondire il web scraping si può consultare l’intera serie presente qui su scienzadeidati.com.

Quindi, lavoreremo su un flusso di lavoro composto dalle seguenti attività:

  • parse_recipes: analizza le singole ricette.
  • download_image: scarica l’immagine della ricetta.
  • store_data: memorizza  l’immagine e i dati analizzati in un database MySQL

Non descriviamo come pianificare i DAG e altre cose che abbiamo già trattato nella Parte 1.

articoli - airflow_web_scraping

Abbiamo impostato load_examples=False in airflow.cfg in modo da semplificare l’interfaccia ed avere solo i DAG che ci interessano. La struttura del codice di base è simile alla seguente:

            import datetime as dt

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def parse_recipes():
    return 'parse_recipes'


def download_image():
    return 'download_image'


def store_data():
    return 'store_data'


default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 12, 1, 10, 00, 00),
    'concurrency': 1,
    'retries': 0
}

with DAG('parsing_recipes',
         catchup=False,
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         # schedule_interval=None,
         ) as dag:
    opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                       python_callable=parse_recipes)

    opr_download_image = PythonOperator(task_id='download_image',
                                        python_callable=download_image)
    opr_store_data = PythonOperator(task_id='store_data',
                                    python_callable=store_data)

opr_parse_recipes >> opr_download_image >> opr_store_data
        

Se registriamo questo DAG eseguendo airflow scheduler, otteniamo qualcosa di simile:

articoli - airflow_web_scraping

Da notare la potenza dei flussi di lavoro. E’ possibile visualizzare lo scheletro della nostra pipeline in modo da avere un’idea del nostro flusso dati senza entrare nei dettagli. Bello, vero?

Ora, implementiamo una per una le routine di ogni task. Prima di entrare nella logica di parse_recipies, è fondamentale descrivere come i task di Airflow possono comunicare tra loro.

 

Cos’è XCOM?

XCOM fornisce i metodi per far comunicare i task tra di loro. I dati inviati da un task vengono acquisiti da un’altro task. Se impostiamo provide_context=True, il valore restituito della funzione viene inserito in XCOM che di per sé non è altro che una tabella Db. Se controlliamo airflow.db troveremo una tabella con il nome xcom, al cui interno sono presenti le voci delle istanze di attività in esecuzione.

            
def parse_recipes(**kwargs):
    return 'RETURNS parse_recipes'

        

Il primo task non prevede ha tali modifiche oltre a fornire **kwargs che consentono di condividere coppie chiave/valore. E’ comunque necessario impostare provide_context=True per ogni operatore, in modo da renderlo compatibile con XCom. Ad esempio:

            opr_parse_recipes = PythonOperator(task_id='parse_recipes',
                                python_callable=parse_recipes, provide_context=True)
        

 

Il task download_image avrà le seguenti modifiche:

            
def download_image(**kwargs):
    ti = kwargs['ti']
    v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

    print('Printing Task 1 values in Download_image')
    print(v1)
    return 'download_image'
        

Con prima riga è ti=kwargs['t1'] otteniamo i dettagli delle istanze tramite la chiave di accesso ti.

Vediamo in dettaglio perchè è necessario prevedere l’uso di queste chiavi di accesso. Se analizziamo il parametro kwargs, otteniamo una stampa simile alla seguente, dove sono presenti chiavi come t1task_instance, ecc… che contengono il valore inviato da un task:

            {
  'dag': <DAG: parsing_recipes>,
  'ds': '2020-12-02',
  'next_ds': '2020-12-02',
  'prev_ds': '2020-12-02',
  'ds_nodash': '20201202',
  'ts': '2020-12-02T09:56:05.289457+00:00',
  'ts_nodash': '20201202T095605.289457+0000',
  'yesterday_ds': '2020-12-01',
  'yesterday_ds_nodash': '20201201',
  'tomorrow_ds': '2020-12-03',
  'tomorrow_ds_nodash': '20201203',
  'END_DATE': '2020-12-02',
  'end_date': '2020-12-02',
  'dag_run': <DagRunparsing_recipes@2020-12-0209:56:05.289457+00:00:manual__2020-12-02T09:56:05.289457+00:00, externallytriggered: True>,
  'run_id': 'manual__2020-12-02T09:56:05.289457+00:00',
  'execution_date': <Pendulum[2020-12-02T09: 56: 05.289457+00:00]>,
  'prev_execution_date': datetime.datetime(2020,12,2,9,56,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'next_execution_date': datetime.datetime(2020,12,2,9,58,tzinfo=<TimezoneInfo[UTC,GMT,+00:00:00,STD]>),
  'latest_date': '2020-12-02',
  'macros': <module'airflow.macros'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/macros/__init__.py'>,
  'params': {},
  'tables': None,
  'task': <Task(PythonOperator): download_image>,
  'task_instance': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'ti': <TaskInstance: parsing_recipes.download_image2020-12-02T09: 56: 05.289457+00:00[running]>,
  'task_instance_key_str': 'parsing_recipes__download_image__20201202',
  'conf': <module'airflow.configuration'from'/anaconda3/anaconda/lib/python3.6/site-packages/airflow/configuration.py'>,
  'test_mode': False,
  'var': {
    'value': None,
    'json': None
  },
  'inlets': [],
  'outlets': [],
  'templates_dict': None
}
        

Nella riga successiva si richiama il metodo xcom_pull per inserire il valore restituito di un determinato task. Nel nostro caso l’ID task è parse_recipes:

v1 = ti.xcom_pull(key=None, task_ids='parse_recipes')

Bene, dopo aver descritto il concetto di XCOM, torniamo al nostro codice originale.

Quindi il nostro processo prevede un primo task che legge gli URL dal file di testo (possiamo anche creare un altra task che sarà responsabile solo di recuperare i collegamenti url e archiviarli in un file o DB.  Lascio questa implementazione a voi lettori! ) e quindi la list delle voci viene passata al successivo task che ha il compito di scaricare le immagini e aggiungere le informazioni del file appena scaricato in  un dict e infine memorizzarlo nel database MySQL.

Il metodo download_image appare ora come segue:

            
def download_image(**kwargs):
    local_image_file = None
    idx = 0
    records = []
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='parse_recipes')

    for rec in parsed_records:
        idx += 1
        image_url = rec['image_url']
        r_url = rec['url']
        print('Downloading Pic# {}'.format(idx))
        local_image_file = dl_img(image_url, r_url)
        rec['local_image'] = local_image_file
        records.append(rec)

    return records
        

Non descriviamo il metodo dl_img dato che esula dallo scopo di questo post. Se vuoi puoi controllare il codice direttamente su Github. Una volta scaricato il file, aggiungiamo la chiave nel record originale.

Il metodo store_data appare ora come segue:

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    print('PRINTING DUMPED RECORDS in STORE DATA')
    print(parsed_records)
    return 'store_data'
        

Assicuriamoci di impostare provide_context=True per l’operatore opr_store_data altrimenti si ottiene il seguente errore:

Subtask store_data KeyError: 'ti'

Ora che i dati sono disponibili., sono inviati per essere memorizzati nel database MySQL.

Per utilizzare MySQL con Airflow, utilizzeremo gli hook forniti da Airflow.

Airflow Hooks ci consente di interagire con sistemi esterni: e-mail, S3, database e vari altri.

Prima di iniziare a programmare, dobbiamo configurare una connessione MySQL.

Sull’interfaccia utente Web di Airflow, dobbiamo andare su Amministratore > Connessioni. Qui abbiamo un elenco di connessioni esistenti se ci colleghiamo a http://0.0.0.0:8080/admin/connection/

articoli - airflow_web_scraping

Modifichiamo la connessione e impostiamo il nome e la password della tabella.

articoli - airflow_web_scraping

Per utilizzare la libreria MySQL dobbiamo importare MySqlHook 

from airflow.hooks.mysql_hook import MySqlHook

            
def store_data(**kwargs):
    ti = kwargs['ti']

    parsed_records = ti.xcom_pull(key=None, task_ids='download_image')
    connection = MySqlHook(mysql_conn_id='mysql_default')
    for r in parsed_records:
        url = r['url']
        data = json.dumps(r)
        sql = 'INSERT INTO recipes(url,data) VALUES (%s,%s)'
        connection.run(sql, autocommit=True, parameters=(url, data))
    return True
        

L’SQL della tabella è riportato di seguito:

            
CREATE TABLE recipes (
	'id' int(11) UNSIGNED NOT NULL AUTO_INCREMENT,
	'url' varchar(100) NOT NULL,
	'data' text NOT NULL,
	PRIMARY KEY (`id`)
);
        

Questo è tutto. Oh, aspetta… E’ possibile informare l’amministratore se il flusso di lavoro è stato eseguito correttamente? Airflow ci consente di utilizzare EmailOperator per tale scopo.

            
opr_email = EmailOperator(
        task_id='send_email',
        to='[email protected]',
        subject='Airflow Finished',
        html_content=""" <h3>DONE</h3> """,
        dag=dag
    )
        

Prima di usarlo dobbiamo apportare modifiche a airflow.cfg, nelle impostazioni relative alla posta. C’è una sezione [smtp].

            
[smtp]
# If you want airflow to send emails on retries, failure, and you want to use
# the airflow.utils.email.send_email_smtp function, you have to configure an
# smtp server here
smtp_host = smtp.gmail.com
smtp_starttls = False
smtp_ssl = True
# Uncomment and set the user/pass settings if you want to use SMTP AUTH
smtp_user = [email protected]
smtp_password = your_password
smtp_port = 465
smtp_mail_from = [email protected]
        

Conclusione

In questo post, abbiamo visto come introdurre Airflow nella  nostra architettura di scraping esistente e come utilizzare MySQL con Airflow. Ci sono varie possibilità per migliorarlo o estenderlo. Provate e fatemi sapere.

Il codice è disponibile su Github

Quando eseguiamo i web scraping, c’è un’alta probabilità di essere bloccato e non abbiamo altra scelta che aspettare e pregare di essere sbloccati. Gli indirizzi IP proxy vengono utilizzati per evitare tali circostanze.

Introduzione ad Apache Airflow

scienzadeidati articoli - Introduzione ad AirflowI

In questo post introduciamo Apache Airflow, un sistema di gestione del flusso di lavoro sviluppato da Airbnb.

Apache Airflow permette ovviamente di creare pipeline ETL per automatizzare le cose,  ma le sue funzionalità permettono di implementare molti altri scenari dove è necessario eseguire attività in un determinato ordine, almeno una volta o periodicamente. Ad esempio:

  • Monitoraggio dei Cron
  • Trasferire dati da un luogo all’altro.
  • Automatizzare le operazioni DevOps.
  • Recuperare periodicamente i dati dai siti Web ed  aggiornare un database.
  • Elaborazione dei dati per sistemi complessi.
  • Pipeline di  machine learning.

Le possibilità sono infinite.

Prima di vedere l’implementazione di Airflow nei nostri sistemi, descriviamo cosa effettivamente è Airflow e delle sue terminologie.

 

Che cos’è il Airflow?

Dal sito web:

Airflow è una piattaforma per creare, pianificare e monitorare i flussi di lavoro in modo programmatico.

Utilizza Airflow per creare flussi di lavoro come grafici aciclici diretti (DAG) di attività. Lo scheduler di Airflow esegue le tue attività su una serie di worker, seguendo le dipendenze specificate. Le ricche utilità della riga di comando semplificano l’esecuzione di interventi complessi sui DAG. La ricca interfaccia utente semplifica la visualizzazione delle pipeline in esecuzione in produzione, il monitoraggio dei progressi e la risoluzione dei problemi quando necessario.

Fondamentalmente, aiuta ad automatizzare gli script per eseguire attività. Airflow è basato su Python ma puoi eseguire un programma indipendentemente dal linguaggio. Ad esempio, la prima fase del flusso di lavoro deve eseguire un programma basato su C++ per eseguire l’analisi delle immagini e quindi un programma basato su Python per trasferire tali informazioni su S3. Le possibilità sono infinite.

 

Cos’è Dag?

Da Wikipedia

In matematica e informatica, un grafo aciclico diretto (DAG /ˈdæɡ/, è un grafo diretto finito senza cicli diretti. Cioè, consiste di un numero finito di vertici e spigoli, con ogni spigolo diretto da un vertice all’altro, in modo tale che non c’è modo di iniziare da nessun vertice v e seguire una sequenza di spigoli coerentemente diretta che alla fine torna nuovamente a v . Equivalentemente, un DAG è un grafo diretto che ha un ordinamento topologico, una sequenza di vertici tale che ogni arco è diretto da prima a dopo nella sequenza.

Proviamo a spiegarlo in parole semplici: puoi essere solo figlio di tuo padre ma non viceversa. OK, è un esempio zoppo o strano, ma non sono riuscito a trovare un esempio migliore per spiegare un ciclo diretto .

aricoli - airflow grafo

In Airflow tutti i flussi di lavoro sono DAG. Un Dag è composto da operatori. Un operatore definisce un’attività individuale che deve essere eseguita. Sono disponibili diversi tipi di operatori (come indicato sul sito Web Airflow):

  • BashOperator – esegue un comando bash
  • PythonOperator – richiama un’arbitraria funzione Python
  • EmailOperator – invia un’e-mail
  • SimpleHttpOperator – invia una richiesta HTTP
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, ecc. – esegue un comando SQL
  • Sensor – attende un certo tempo, file, record di un database, chiave S3, ecc…

Possiamo anche trovare un operatore personalizzato secondo specifiche esigenze.

 

Installazione e configurazione

Ariflow è basato su Python. Il modo migliore per installarlo è tramite lo strumento pip.

pip install apache-airflow

Per verificare se è stato installato, eseguiamo il comando airflow version e otteniamo qualcosa del tipo:

            [2020-12-02 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
v1.10.0
You will need to install mysqlclient as well to incorporate MySQL in your workflows. It is optional though.
        

Inoltre possiamo anche installare mysqlclient per incorporare MySQL nei flussi di lavoro. È facoltativo e possiamo scegliere un diverso database, come postgresql, SQlite, ecc…

pip install mysqlclient

Prima di iniziare, è necessario creare una cartella ed impostala come AIRFLOW_HOME. Nel nostro caso è “airflow_home”. Una volta creata, eseguiamo il comando export per impostare la home nelle variabili d’ambiente.

export AIRFLOW_HOME='pwd' airflow_home

E’ necessario assicurarsi di essere nella directory padre di “airflow_home” prima di eseguire il comando export. Inoltre all’interno di “airflow_home” è necessario creare un’altra cartella per conservare i DAG, chiamata dags.

Se si imposta load_examples=False, gli esempi predefiniti non verranno caricati sull’interfaccia Web.

Ora dobbiamo eseguire il seguente comando all’interno della cartella “airflow_home”.

airflow db init

Questo comando crea i file airflow.cfg e unitests.cfg

In particolare airflow.db è un file SQLite per memorizzare tutta la configurazione relativa ai flussi di lavoro di esecuzione. airflow.cfg è quello di memorizzare tutte le impostazioni iniziali per gestire i wrokflow in esecuzione.

In questo file, è presente il parametro sql_alchemy_conn con il valore ../airflow_home/airflow.db.

Possiamo usare MySQL, ma per ora, seguiamo le impostazioni di base.

 

Airflow WebServer

Fin qui tutto bene, ora senza perdere tempo avviamo il web server.

airflow webserver

Quando si avvia il webserver si ha una stampa simile a quanto segue:

            2020-12-02 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor
/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: 
DeprecationWarning: The celeryd_concurrency option in [celery] 
has been renamed to worker_concurrency - the old setting has been used,
but please update your config. default=conf.get('celery', 'worker_concurrency')),
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
v1.10.0
        
            [2020-12-03 14:21:42,340] {__init__.py:57} INFO - Using executor SequentialExecutor
____________       _____________
____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
_/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/


/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71:
ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
.format(x=modname), ExtDeprecationWarning

[2020-12-03 14:21:43,119] [48995] {models.py:167} INFO - 
Filling up the DagBag from /Development/airflow_home/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
        

Ora, visitiamo la pagina 0.0.0.0:8080 e otteniamo la seguente schermata:

articoli - airflow webserver

Qui possiamo vedere una serie di voci. Queste sono gli esempi forniti con l’installazione Airflow. Si possono disattivarli modificando il file airflow.cfg e impostando il parametro load_examples su FALSE.

Le DAG Runs indicano quante volte è stato eseguito un determinato DAG. Recent Tasks indica quale attività tra molte attività all’interno di un DAG è attualmente in esecuzione e qual è il suo stato. Lo schedule è simile a quello che si usa quando  si pianifica un Cron, quindi al momento non ha bisogno di approfondimenti. Lo Schedule è responsabile dell’ora in cui un determinato DAG dovrebbe essere attivato.

articoli - airflow grafo view

Qui sopra veidamo lo screenshot di un DAG creato ed eseguito in precedenza. Le caselle rettangolari rappresentano un’attività (o Task). In alto a destra si possono vedere caselle di diverso colore, denominate: success, running, failed ecc. Questi sono gli stati che un DAG può assumere.

Nell’immagine sopra è possibile che tutte le caselle abbiano un bordo verde, tuttavia, se non siamo sicuri è sufficiente muovere il mouse sopra uno stato success e si evidenziano i task in quello stato, come nell’immagine seguente:

articoli - airflow grafo hover

Da notare che il colore di sfondo/riempimento di queste caselle è verde e canna. In alto a sinistra del pannello grigio, puoi vedere perché sono ci sono tali colori, questi colori di sfondo rappresentano i diversi tipi di operatori utilizzati in questo DAG. In questo caso, stiamo usando BashOperator e PythonOperator.

 

Esempio di base

Vediamo ora un esempio di base per descrivere il funzionamento di Airflow. Nella cartella dags, che è stata precedentemente creata  in airflow_home, creeremo il nostro primo DAG di esempio. In pratica si deve creare un file denominato, ad esempio,my_simple_dag.py.

La prima cosa da fare dopo le importazioni è scrivere le routine che serviranno come task per gli operatori. Useremo una miscela di BashOperator e PythonOperator.

            import datetime as dt

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def greet():
    print('Writing in file')
    with open('path/to/file/greet.txt', 'a+', encoding='utf8') as f:
        now = dt.datetime.now()
        t = now.strftime("%Y-%m-%d %H:%M")
        f.write(str(t) + '\n')
    return 'Greeted'
def respond():
    return 'Greet Responded Again'
        
Queste sono due semplici routine che non fanno altro che restituire un testo. Vedremo successivamente perché stiamo scrivendo qualcosa in un file di testo. La prossima cosa da fare è definire default_args e creare un’istanza DAG.
            default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2020, 12, 03, 11, 00, 00),
    'concurrency': 1,
    'retries': 0
}
        

Nella variabile default_args si definisco una serie di parametri.

start_date indica quando questo DAG dovrebbe iniziare a eseguire il flusso di lavoro. Questo start_date potrebbe appartenere al passato. Nel mio caso, è il 3 dicembre alle 11:00 UTC. Questa data è già passata per me perché ora sono già le 11:15 UTC. Puoi sempre modificare questo parametro all’interno del file airflow.cfg e impostare il tuo fuso orario locale. Per ora, possiamo anche lasciare UTC. Nel caso in cui tu sia ancora curioso di sapere che ora viene utilizzata da Airflow, controlla in alto a destra nell’interfaccia utente Web di Airflow, dovresti vedere qualcosa di simile a quello indicato di seguito. Puoi usarlo come riferimento per pianificare le tue attività.

articoli - airflow_date

Il parametro retries tenta di eseguire il DAG X un numero di volte in caso di mancata esecuzione. Il parametro concurrency aiuta a determinare il numero di processi da utilizzare per l’esecuzione di più DAG. Per esempio, il DAG deve eseguire 4 istanze del passato, chiamati anche come Backfill , con un intervallo di 10 minuti (vedremo questo argomento complesso a breve) ed è stata impostata concurrency pari a 2, quindi 2 DAG sono lanciati nello stesso momento ed sono eseguite i task previsto in esse. Se hai già implementato multiprocessing in Python, dovresti facilmente capire il funzionamento.

            with DAG('my_simple_dag',
         default_args=default_args,
         schedule_interval='*/10 * * * *',
         ) as dag:
    opr_hello = BashOperator(task_id='say_Hi',
                             bash_command='echo "Hi!!"')

    opr_greet = PythonOperator(task_id='greet',
                               python_callable=greet)
    opr_sleep = BashOperator(task_id='sleep_me',
                             bash_command='sleep 5')

    opr_respond = PythonOperator(task_id='respond',
                                 python_callable=respond)
                                 
opr_hello >> opr_greet >> opr_sleep >> opr_respond
        

Ora usando Context Manager stiamo definendo un DAG con le sue proprietà, il primo parametro è l’ID del dag, nel nostro caso è my_simple_dag, il secondo parametro lo abbiamo già descritto, il 3° parametro è qualcosa che deve essere discusso insieme a start_date, che abbiamo menzionato in default_args.

All’interno di quel Context Manager, stiamo assegnando operatori associati all’ID del task. Nel nostro caso questi operatori etichettati come: opr_hello opr_greet opr_sleep e opr_respond. Questi nomi vengono quindi visualizzati nelle caselle rettangolari discusse in precedenza.

Prima di andare oltre, è meglio discutere i DAG Runs e lo Scheduler e quale ruolo svolgono nell’intero flusso di lavoro.

 

Che cos’è l’utilità di pianificazione di Airflow?

Airflow Scheduler è un processo di monitoraggio che viene eseguito continuamente e attiva l’esecuzione delle attività in base a schedule_interval e execution_date.

 

Cos’è il DagRun?

Un DagRun è l’istanza di un DAG che verrà eseguito di volta in volta. Quando viene eseguito, verranno eseguite tutte le attività al suo interno.

articoli - airflow_DAGruns

Supponiamo che start_date sia il 3 dicembre 2020 alle 12:00:00 UTC e che il DAG sia stato avviato alle 12:30:00 UTC con il schedule_interval di */10 * * * * (ogni 10 minuti).

Utilizzando gli stessi parametri default_args discusse sopra, di seguito sono riportate le istanze del DAG che verranno eseguite immediatamente, una alla volta dato che nostro caso concurrency è pari 1:

articoli - airflow DAG times

Perché accade questo? Siamo stati noi a deciderlo tramite i parametri di configurazione. Airflow ti offre la possibilità di eseguire i DAG passati. Il processo di esecuzione dei DAG passati si chiama Backfill. Il processo di backfilling consente effettivamente a Airflow di anticipare lo stato di tutti i DAG sin dal suo inizio. La funzione è stata fornita per scenari in cui si esegue un DAG che interroga alcuni DB o API come Google Analytics per recuperare dati precedenti e renderli parte del flusso di lavoro. Anche se non ci sono dati passati, Airflow lo eseguirà comunque per mantenere intatto lo stato dell’intero flusso di lavoro.

Una volta eseguiti i DAG precedenti, il successivo (quello che si intende eseguire) verrà eseguito alle 12:40:00 UTC. DA sottolineare che qualunque sia lo scheduler impostato, il DAG viene eseguito dopo che quell’intervallo di tempo. Nel nostro caso, se deve essere eseguito ogni 10 minuti, verrà eseguito dopo che sono trascorsi 10 minuti.

 

Esempio di Scheduler

Proviamo. Abilitiamo l’esecuzione di my_simple_dag e avviamo lo scheduler.

articoli - airflow my_simple_dag

Non appena si esegue il DAG si visualizza una schermata simile alla seguente:

articoli - airflow my_simple_dag start
Alcune delle attività sono in coda. Se facciamo clic sull’ID del DAG, my_simple_dag, vediamo una schermata come la seguente:
articoli - airflow my_simple_dag run

Da notare il timestamp nella colonna Run Id . Vediamo lo schema: Il primo è stato eseguito alle 10:00, poi alle 10:10, alle 10:20. Quindi si interrompe, vorrei chiarire ancora una volta che il DAG viene eseguito una volta trascorso il tempo di durata di 10 minuti. Lo scheduler è iniziato alle 10:30. quindi ha effettuato il backfilling di 3 runs ad intervalli di 10 minuti.

articoli - airflow my_simple_dag finish

Il DAG che è stato eseguito per le 10:30:00 UTC è stato effettivamente eseguito alle 10:40:00 UTC. L’ultimo record del DAGRun sarà sempre uno meno rispetto all’ora corrente. Nel nostro caso, l’ora della macchina era 10:40:00 UTC

articoli - airflow DAG tree

Se passiamo con il mouse su uno dei cerchi si può vedere il timestamp davanti a Run: questo indica l’ora in cui è stato eseguito. Si può notare che questi cerchi verdi hanno un intervallo di tempo di 10 minuti. La Tree View è un po’ complicata ma offre un quadro completo dell’intero flusso di lavoro. Nel nostro caso, è stato eseguito 4 volte e tutte le attività sono state eseguite correttamente, il colore verde scuro.

È possibile evitare il Backfilling in due modi: è possibile impostare start_date con una data futura o impostare catchup = False nell’istanza di DAG. Ad esempio:

            with DAG('my_simple_dag', 
                  catchup=False,
                  default_args=default_args,
                  schedule_interval='*/10 * * * *',
                  # schedule_interval=None,
                  ) as dag:
        
Quindi impostando catchup=False non importa se start_date appartiene o meno al passato. Verrà eseguito dall’ora corrente e continuerà. Impostando end_date è possibile interrompere l’esecuzione di un DAG.
            opr_hello >> opr_greet >> opr_sleep >> opr_respond
        

La riga di codice precedente indica la relazione tra gli operatori, quindi costruisce l’intero flusso di lavoro. In questo caso, L’operatore bit a bit sta indicando la relazione tra gli operatori. Quindi  opr_hello viene eseguito per primo e poi tutto il resto. Il flusso viene eseguito da sinistra a destra. In forma grafica appare come segue:

articoli - airflow_DAG treeview

Se modifichiamo la direzione dell’ultimo operatore, come segue:

            opr_hello >> opr_greet >> opr_sleep << opr_respond
        

Iil flusso sarà simile al seguente:

articoli - airflow_DAG treeview 2

Il taks respond verrà eseguito in parallelo e sleep verrà eseguito in entrambi i casi.

Conclusione

In questo post, abbiamo discusso di come è possibile introdurre un sistema di flusso di lavoro completo per pianificare e automatizzare i flussi di lavoro. Nella parte 2, presenteremo un caso reale per mostrare come è possibile utilizzare Airflow. Volevo includerlo in questo post, ma è già diventato abbastanza lungo ed era necessario spiegare il concetto DAGRun perché mi ci è voluto un po’ di tempo per capirlo.

Come sempre il codice di questo post è disponibile su Github .