Skip to content Skip to sidebar Skip to footer

SimpleHttpOperator in Airflow: Templated Data

I'm trying to render data correctly inside a SimpleHttpOperator in Airflow, with configuration that I send via dag_run: result = SimpleHttpOperator( task_id='schema_detecto

Solution 1:

I struggled a lot with the same error. So I created my own operator (called ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)

This operator receives a function in which we can collect the data passed to the DAG run (via dag_run.conf) and parse it (if required) before sending it to the API.

from plugins.operators.extended_http_operator import ExtendedHttpOperator

# POST the "kafka" section of dag_run.conf to the user_api connection.
# data_fn (plus op_kwargs) builds the request body at execution time.
testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,          # callable that returns the JSON payload
    op_kwargs={"api": "kafka"},    # extra kwargs merged into data_fn's context
    method="POST",
    log_response=True,
    # validate_response already returns a bool, so pass its result straight
    # through. (The original scraped line had fused keywords — a SyntaxError —
    # and the "True if ... is True else False" wrapper was redundant anyway.)
    response_check=lambda response: validate_response(response),
)

def passing_data(**context):
    """Build the JSON request body for the HTTP call.

    Reads which payload section to send from ``context["api"]`` (injected via
    ``op_kwargs``), looks that key up in ``dag_run.conf``, and returns it
    serialized as a JSON string.

    Raises KeyError if "api"/"dag_run" are missing from the context or the
    requested key is absent from dag_run.conf.
    """
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])

def validate_response(res):
    """Return True when the HTTP response has status code 200, else False.

    (The scraped original had fused keywords — ``returnTrueelse:`` — which is
    a SyntaxError; this restores the intended if/else.)
    """
    if res.status_code == 200:
        return True
    return False

Here is how you can add ExtendedHttpOperator to your airflow:

Put extended_http_operator.py file inside your_airflow_project/plugins/operators folder

# extended_http_operator.py
from typing import Dict, Optional

from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults


class ExtendedHttpOperator(SimpleHttpOperator):
    """SimpleHttpOperator variant whose request body comes from a callable.

    ``data_fn`` is invoked with the task context (merged with ``op_kwargs``)
    and its return value is sent as the request payload. This lets us compute
    the data in Python (e.g. from ``dag_run.conf``) instead of writing one
    long template line to formulate the JSON data.
    """

    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        # Fail fast at DAG-parse time rather than at execution time.
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Merge op_kwargs into the task context so data_fn sees both.
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)

        data_result = self.execute_callable(context)

        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")

    def execute_callable(self, context):
        # Unpack the merged context as keyword arguments for data_fn.
        return self.data_fn(**context)

Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.

Solution 2:

I couldn't find a direct solution. The only way I could do this — passing the information that I send over --conf to the operator — was by adding a new PythonOperator that collects the info, and then using XCom in my SimpleHttpOperator.

Code

def generate_data(**kwargs):
    """Extract the url/fileType fields from the trigger's dag_run.conf.

    The returned dict is pushed to XCom automatically (it is the callable's
    return value), so the downstream SimpleHttpOperator can template it back
    in via xcom_pull.

    Raises KeyError if "url" or "fileType" are absent from dag_run.conf.
    """
    confs = kwargs['dag_run'].conf
    logging.info(confs)
    return {'url': confs["url"], 'fileType': confs["fileType"]}


with DAG(
    dag_id="airflow_http_operator",
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=["http"],
) as dag:
    # Step 1: read dag_run.conf and push the relevant fields to XCom.
    generate_dict = PythonOperator(
        task_id="generate_dict",
        python_callable=generate_data,
        provide_context=True,
    )
    # Step 2: the templated `data` field pulls that dict back from XCom
    # and serializes it to JSON for the PUT request body.
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id="schema_detector",
        endpoint="api/schema/infer",
        method="PUT",
        headers={"Content-Type": "application/json"},
        data="{{ task_instance.xcom_pull(task_ids='generate_dict') |tojson}}",
        log_response=True,
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json(),
    )

Post a Comment for "Simplehttpoperator Airflow, Data Templated"