Wir verwenden den Luftstrom als Terminplaner. Ich möchte einen einfachen Bash-Operator in einer DAG aufrufen. Das Bash-Skript benötigt ein Kennwort als Argument für die weitere Verarbeitung.
Wie kann ich ein Passwort sicher im airflow (config/variables/connection) speichern und in der dag-Definitionsdatei darauf zugreifen?
Ich bin neu in Airflow und Python, daher wird ein Codeausschnitt geschätzt.
Sie können das Kennwort in einem Hook speichern - dieses wird verschlüsselt, solange Sie Ihren Fernet-Schlüssel eingerichtet haben.
So erstellen Sie eine Verbindung.
from airflow.models import Connection
def create_conn(username, password, Host=None):
new_conn = Connection(conn_id=f'{username}_connection',
login=username,
Host=host if Host else None)
new_conn.set_password(password)
Dieses Kennwort wird dann in der von Ihnen eingerichteten Datenbank verschlüsselt.
Zugriff auf dieses Passwort:
from airflow.hooks.base_hook import BaseHook
connection = BaseHook.get_connection("username_connection")
password = connection.password # This is a getter that returns the unencrypted password.
BEARBEITEN:
Es gibt einen einfacheren Weg, eine Verbindung über die Benutzeroberfläche herzustellen:
In diesem Fall würde ich einen PythonOperator verwenden, von dem aus Sie eine Hook
für Ihre Datenbankverbindung mit hook = PostgresHook(postgres_conn_id=postgres_conn_id)
erhalten können. Sie können dann get_connection
über diesen Hook aufrufen, um ein Verbindungsobjekt zu erhalten, über das Sie den Host, das Login und das Kennwort für Ihre Datenbankverbindung erhalten können.
Verwenden Sie schließlich subprocess.call(your_script.sh, connection_string)
, um die Verbindungsdetails als Parameter zu übergeben.
Diese Methode ist etwas verworren, aber Sie können die Verschlüsselung für Datenbankverbindungen in Airflow beibehalten. Sie sollten auch in der Lage sein, diese Strategie in eine separate Operator-Klasse zu ziehen, die das Basisverhalten von PythonOperator erbt und die Logik für das Abrufen des Hooks und das Aufrufen des Bash-Skripts hinzufügt.
Das habe ich benutzt.
def add_slack_token(ds, **kwargs):
""""Add a slack token"""
session = settings.Session()
new_conn = Connection(conn_id='slack_token')
new_conn.set_password(SLACK_LEGACY_TOKEN)
if not (session.query(Connection).filter(Connection.conn_id ==
new_conn.conn_id).first()):
session.add(new_conn)
session.commit()
else:
msg = '\n\tA connection with `conn_id`={conn_id} already exists\n'
msg = msg.format(conn_id=new_conn.conn_id)
print(msg)
dag = DAG(
'add_connections',
default_args=default_args,
schedule_interval="@once")
t2 = PythonOperator(
dag=dag,
task_id='add_slack_token',
python_callable=add_slack_token,
provide_context=True,
)
Sie können das Kennwort in den Luftstromvariablen speichern. https://airflow.incubator.Apache.org/ui.html#variable-view
from airflow.models import Variable
command = """
echo "{{ params.my_param }}"
"""
task = BashOperator(
task_id='templated',
bash_command=command,
params={'my_param': MyPass},
dag=dag)