wake-up-neo.net

Wie kann verhindert werden, dass der Luftstrom beim Nachfüllen von Dagläufen erfolgt?

Angenommen, Sie haben eine Airflow-DAG, die für das Auffüllen nicht sinnvoll ist. Das bedeutet, dass das Ausführen der nachfolgenden Abläufe nach dem einmaligen Ausführen völlig sinnlos ist.

Wenn Sie beispielsweise Daten aus einer Quelle laden, die nur stündlich in Ihre Datenbank aktualisiert wird, importieren Sie beim Abfüllen, das schnell hintereinander erfolgt, immer wieder dieselben Daten.

Dies ist besonders ärgerlich, wenn Sie eine neue stündliche Aufgabe instanziieren und für jede verpasste Stunde N ausgeführt werden, wobei redundante Arbeit erledigt wird, bevor sie in dem von Ihnen angegebenen Intervall ausgeführt wird.

Die einzige Lösung, die mir einfällt, ist etwas, von dem sie ausdrücklich in FAQ der Dokumente abraten

Wir empfehlen, dynamische Werte als start_date zu verwenden, insbesondere datetime.now(), da dies sehr verwirrend sein kann.

Gibt es eine Möglichkeit, das Abfüllen für eine DAG zu deaktivieren, oder sollte ich das oben tun?

34
m0meni

Aktualisieren Sie auf Airflow Version 1.8 und verwenden Sie catchup_by_default = False in der airflow.cfg oder wenden Sie catchup = False auf jeden Ihrer Dags an.

https://github.com/Apache/incubator-airflow/blob/master/UPDATING.md#catchup_by_default

29
sage88

Dies scheint ein ungelöstes Luftstromproblem zu sein. Ich weiß, ich hätte genau das gleiche Feature. Hier ist, soweit ich gekommen bin; es kann für andere nützlich sein.

Die Funktionen der Benutzeroberfläche (zumindest in 1.7.1.3) können bei diesem Problem hilfreich sein. Wenn Sie in die Strukturansicht gehen und auf eine bestimmte Aufgabe (quadratische Felder) klicken, erscheint eine Dialogschaltfläche mit der Schaltfläche "Erfolg markieren". Wenn Sie auf "Vorher" klicken und dann auf "Erfolg markieren" klicken, werden alle Instanzen dieser Aufgabe in der DAG als erfolgreich gekennzeichnet und sie werden nicht ausgeführt. Die DAG der obersten Ebene (Kreise oben) kann auf ähnliche Weise auch als erfolgreich gekennzeichnet werden, es scheint jedoch nicht möglich zu sein, mehrere DAG-Instanzen zu kennzeichnen.

Ich habe es noch nicht tief genug untersucht, aber es ist möglich, den Unterbefehl 'trigger_dag' zu verwenden, um Zustände von DAGs zu markieren. siehe hier: https://github.com/Apache/incubator-airflow/pull/644/commits/4d30d4d79f1a18b071b585500474248e5f46d67d

Eine CLI-Funktion zur Kennzeichnung von DAGs befindet sich in den Werken: http://mail-archives.Apache.org/mod_mbox/airflow-commits/201606.mbox/%[email protected]%3Ehttps://github.com/Apache/incubator-airflow/pull/1590

UPDATE (28.09.2016): Ein neuer Operator 'LatestOnlyOperator' wurde hinzugefügt ( https://github.com/Apache/incubator-airflow/pull/1752 ), der nur die neueste Version ausführt Version der nachgelagerten Aufgaben. Klingt sehr nützlich und hoffentlich wird es bald in die Veröffentlichungen kommen

UPDATE 2: Ab Airflow 1.8 wurde die LatestOnlyOperator freigegeben.

14
Ziggy Eunicien

Wenn Sie catchup = False in Ihrer dag-Deklaration setzen, wird diese genaue Funktionalität bereitgestellt.

Ich habe nicht den "Ruf" zu kommentieren, aber ich wollte sagen, dass catchup = False (von mir) genau für diesen Zweck entwickelt wurde. Darüber hinaus kann ich überprüfen, dass es in 1.10.1 funktioniert, wenn es explizit in der Instantiierung gesetzt wird. Ich sehe jedoch nicht, dass es funktioniert, wenn es in den Standardarmen platziert wird. Ich war zwar seit 18 Monaten nicht bei Airflow, also wird es ein wenig dauern, bis ich einen Blick darauf werfen kann, warum die Standard-Argumente nicht für Aufholjagd funktionieren.

dag = DAG('example_dag',
        max_active_runs=3,
        catchup=False,
        schedule_interval=timedelta(minutes=5),
        default_args=default_args)
0
Ben Tallman