wake-up-neo.net

Speichern und greifen Sie auf das Passwort mit Apache Airflow zu

Wir verwenden den Luftstrom als Terminplaner. Ich möchte einen einfachen Bash-Operator in einer DAG aufrufen. Das Bash-Skript benötigt ein Kennwort als Argument für die weitere Verarbeitung. 

Wie kann ich ein Passwort sicher im airflow (config/variables/connection) speichern und in der dag-Definitionsdatei darauf zugreifen? 

Ich bin neu in Airflow und Python, daher wird ein Codeausschnitt geschätzt.

9
Anup

Sie können das Kennwort in einem Hook speichern - dieses wird verschlüsselt, solange Sie Ihren Fernet-Schlüssel eingerichtet haben.

So erstellen Sie eine Verbindung.

from airflow.models import Connection
def create_conn(username, password, Host=None):
    new_conn = Connection(conn_id=f'{username}_connection',
                                  login=username,
                                  Host=host if Host else None)
    new_conn.set_password(password)

Dieses Kennwort wird dann in der von Ihnen eingerichteten Datenbank verschlüsselt.

Zugriff auf dieses Passwort:

from airflow.hooks.base_hook import BaseHook

 connection = BaseHook.get_connection("username_connection")
 password = connection.password # This is a getter that returns the unencrypted password.

BEARBEITEN:

Es gibt einen einfacheren Weg, eine Verbindung über die Benutzeroberfläche herzustellen:

 Main Menu Dann:  Create Connection

18
Daniel Lee

In diesem Fall würde ich einen PythonOperator verwenden, von dem aus Sie eine Hook für Ihre Datenbankverbindung mit hook = PostgresHook(postgres_conn_id=postgres_conn_id) erhalten können. Sie können dann get_connection über diesen Hook aufrufen, um ein Verbindungsobjekt zu erhalten, über das Sie den Host, das Login und das Kennwort für Ihre Datenbankverbindung erhalten können. 

Verwenden Sie schließlich subprocess.call(your_script.sh, connection_string), um die Verbindungsdetails als Parameter zu übergeben.

Diese Methode ist etwas verworren, aber Sie können die Verschlüsselung für Datenbankverbindungen in Airflow beibehalten. Sie sollten auch in der Lage sein, diese Strategie in eine separate Operator-Klasse zu ziehen, die das Basisverhalten von PythonOperator erbt und die Logik für das Abrufen des Hooks und das Aufrufen des Bash-Skripts hinzufügt.

1
Matthijs Brouns

Das habe ich benutzt.

    def add_slack_token(ds, **kwargs):
        """"Add a slack token"""
        session = settings.Session()

        new_conn = Connection(conn_id='slack_token')
        new_conn.set_password(SLACK_LEGACY_TOKEN)

        if not (session.query(Connection).filter(Connection.conn_id == 
         new_conn.conn_id).first()):
        session.add(new_conn)
        session.commit()
        else:
            msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
            msg = msg.format(conn_id=new_conn.conn_id)
            print(msg)

    dag = DAG(
        'add_connections',
        default_args=default_args,
        schedule_interval="@once")


    t2 = PythonOperator(
        dag=dag,
        task_id='add_slack_token',
        python_callable=add_slack_token,
        provide_context=True,
    )
0
Don Bar

Sie können das Kennwort in den Luftstromvariablen speichern. https://airflow.incubator.Apache.org/ui.html#variable-view

  1. Erstellen Sie eine Variable mit Schlüssel und Wert in der Benutzeroberfläche, z. B. mypass: XXX
  2. Import Variable from airflow.models import Variable
  3. MyPass = Variable.get ("mypass")
  4. Übergeben Sie MyPass an Ihr Bash-Skript:
command = """
          echo "{{ params.my_param }}"
          """



task = BashOperator(
        task_id='templated',
        bash_command=command,
        params={'my_param': MyPass},
        dag=dag)
0
Chengzhi