SimpleHttpOperator Airflow, Data Templated
Solution 1:
I struggled a lot with the same error, so I created my own operator (called ExtendedHttpOperator), which is a combination of PythonOperator and SimpleHttpOperator. This worked for me :)
This operator receives a function in which we can collect the data passed to the DAG run (via dag_run.conf) and parse it, if required, before sending it to the API.
import json

from plugins.operators.extended_http_operator import ExtendedHttpOperator


def passing_data(**context):
    api = context["api"]
    dag_run_conf = context["dag_run"].conf
    return json.dumps(dag_run_conf[api])


def validate_response(res):
    return res.status_code == 200


testing_extend = ExtendedHttpOperator(
    task_id="process_user_ids",
    http_conn_id="user_api",
    endpoint="/kafka",
    headers={"Content-Type": "application/json"},
    data_fn=passing_data,
    op_kwargs={"api": "kafka"},
    method="POST",
    log_response=True,
    response_check=lambda response: validate_response(response),
)
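For reference, passing_data reads the trigger conf under the "kafka" key, so the DAG must be triggered with a matching --conf payload. A hypothetical trigger command (Airflow 1.x CLI, matching the imports used here; the DAG id and payload values are made up):

airflow trigger_dag -c '{"kafka": {"user_ids": [1, 2, 3]}}' my_http_dag

On Airflow 2.x the equivalent command is airflow dags trigger --conf '...' my_http_dag.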
Here is how you can add the ExtendedHttpOperator to your Airflow project:
Put the extended_http_operator.py file inside the your_airflow_project/plugins/operators folder.
# extended_http_operator.py file
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowException
from airflow.hooks.http_hook import HttpHook
from typing import Optional, Dict


class ExtendedHttpOperator(SimpleHttpOperator):
    """
    Extends SimpleHttpOperator with a callable function to formulate the data.
    This data function is able to access the context to retrieve data such as
    the task instance. This allows us to write cleaner code rather than one
    long template line to formulate the JSON data.
    """

    @apply_defaults
    def __init__(
        self,
        data_fn,
        log_response: bool = False,
        op_kwargs: Optional[Dict] = None,
        *args,
        **kwargs
    ):
        super(ExtendedHttpOperator, self).__init__(*args, **kwargs)
        if not callable(data_fn):
            raise AirflowException("`data_fn` param must be callable")
        self.data_fn = data_fn
        self.context = None
        self.op_kwargs = op_kwargs or {}
        self.log_response = log_response

    def execute(self, context):
        # Merge op_kwargs into the context so data_fn can read them as kwargs.
        context.update(self.op_kwargs)
        self.context = context
        http = HttpHook(self.method, http_conn_id=self.http_conn_id)
        data_result = self.execute_callable(context)
        self.log.info("Calling HTTP method")
        self.log.info("Post Data: {}".format(data_result))
        response = http.run(
            self.endpoint, data_result, self.headers, self.extra_options
        )
        if self.log_response:
            self.log.info(response.text)
        if self.response_check:
            if not self.response_check(response):
                raise AirflowException("Invalid parameters")

    def execute_callable(self, context):
        return self.data_fn(**context)
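Note: the imports above are the Airflow 1.x paths. On Airflow 2.x the HTTP classes moved into the HTTP provider package; a sketch of the equivalent imports, assuming the apache-airflow-providers-http package is installed:

# Airflow 2.x equivalents (assumes apache-airflow-providers-http is installed)
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.http.hooks.http import HttpHook

The apply_defaults decorator is deprecated in Airflow 2 and can simply be dropped.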
Don't forget to create empty __init__.py files inside the plugins and plugins/operators folders.
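The resulting layout should look like this:

your_airflow_project/
└── plugins/
    ├── __init__.py
    └── operators/
        ├── __init__.py
        └── extended_http_operator.py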
Solution 2:
I couldn't find a direct solution. The only way I could pass the information sent via --conf to the operator was to add a PythonOperator that collects it, and then use XCom in my SimpleHttpOperator.
Code
import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator

# Minimal placeholder default_args; adjust to your needs.
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 1, 1),
}


def generate_data(**kwargs):
    confs = kwargs['dag_run'].conf
    logging.info(confs)
    return {'url': confs["url"], 'fileType': confs["fileType"]}


with DAG(
    dag_id='airflow_http_operator',
    default_args=default_args,
    catchup=False,
    schedule_interval="@once",
    tags=['http']
) as dag:
    generate_dict = PythonOperator(
        task_id='generate_dict',
        python_callable=generate_data,
        provide_context=True
    )
    result = SimpleHttpOperator(
        task_id="schema_detector",
        http_conn_id='schema_detector',
        endpoint='api/schema/infer',
        method='PUT',
        headers={"Content-Type": "application/json"},
        data="{{ task_instance.xcom_pull(task_ids='generate_dict') | tojson }}",
        log_response=True,
        response_check=lambda response: response.ok,
        response_filter=lambda response: response.json(),
    )

    # The HTTP task must run after generate_dict so the XCom value exists.
    generate_dict >> result
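To feed generate_data the url and fileType keys it reads, trigger the DAG with a matching conf (Airflow 1.x CLI; the values here are made up):

airflow trigger_dag -c '{"url": "https://example.com/data.csv", "fileType": "csv"}' airflow_http_operator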