
Commit 6c0459e

Merge pull request #79 from godatadriven/kg-deferrable-operators
Add deferrable operator examples
2 parents cd31429 + 8e40a9b commit 6c0459e

20 files changed, +268 -4 lines changed

.whirl.env (+1)

@@ -1,3 +1,4 @@
 AIRFLOW_VERSION=2.3.2
 PYTHON_VERSION=3.9
 AIRFLOW__API__AUTH_BACKEND=airflow.api.auth.backend.basic_auth
+MINIMAL_AIRFLOW_VERSION=2.2.5

docker/airflow-python/entrypoint.sh (+3 -4)

@@ -17,9 +17,9 @@ if [[ ${AIRFLOW_COMMAND} == "scheduler" || ${AIRFLOW_COMMAND} == "singlemachine"
   echo "y" | airflow db reset
   airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org
 else
-  if [[ ${AIRFLOW_COMMAND} == "webserver" ]]; then
+  if [[ ${AIRFLOW_COMMAND} == "webserver" || ${AIRFLOW_COMMAND} == "triggerer" ]]; then
     echo "wait a bit more to let the scheduler do the database reset."
-    sleep 15
+    sleep 30
   fi
 fi

@@ -41,8 +41,7 @@ done

 if [[ ${AIRFLOW_COMMAND} == "singlemachine" ]]; then
   nohup /entrypoint scheduler -D &
-  # echo "wait a while for the scheduler to be started"
-  # sleep 15
+  nohup /entrypoint triggerer -D &
   /entrypoint webserver -p 5000
 else
   /entrypoint "${@}"

@@ -0,0 +1,14 @@
+# postgres env vars
+POSTGRES_HOST=postgresdb
+POSTGRES_PORT=5432
+POSTGRES_PASSWORD=pAssw0rd
+POSTGRES_USER=airflow
+POSTGRES_DB=airflow
+
+# Airflow variables
+AIRFLOW__CORE__EXPOSE_CONFIG=True
+AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
+AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
+AIRFLOW__CORE__LOAD_EXAMPLES=False
+AIRFLOW__CORE__FERNET_KEY=W5gmA+dp84hkZEzpxPw4LTmhbXA1uVxKZsgIfay8wno=
+AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@${POSTGRES_HOST}:${POSTGRES_PORT}/${POSTGRES_DB}

@@ -0,0 +1,75 @@
+version: '3'
+
+services:
+  webserver:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["webserver", "-p", "5000"]
+    ports:
+      - '5000:5000'  # HTTP (Airflow Web UI)
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - mockserver
+      - postgresdb
+
+  scheduler:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["scheduler"]
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - mockserver
+      - postgresdb
+
+  triggerer:
+    image: docker-whirl-airflow:py-${PYTHON_VERSION}-local
+    command: ["triggerer"]
+    env_file:
+      - .whirl.env
+    environment:
+      - WHIRL_SETUP_FOLDER
+      - AIRFLOW__API__AUTH_BACKEND
+    volumes:
+      - ${DAG_FOLDER}:/opt/airflow/dags
+      - ${ENVIRONMENT_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/env.d/
+      - ${DAG_FOLDER}/whirl.setup.d:${WHIRL_SETUP_FOLDER}/dag.d/
+    depends_on:
+      - mockserver
+      - postgresdb
+
+  mockserver:
+    image: jamesdbloom/mockserver:mockserver-5.13.2
+    ports:
+      - 1080:1080
+      - 1081:1081
+    environment:
+      - LOG_LEVEL=ERROR
+      - SERVER_PORT=1080,1081
+
+  postgresdb:
+    image: postgres:13
+    ports:
+      - 5432:5432
+    environment:
+      - POSTGRES_HOST=postgresdb
+      - POSTGRES_PORT
+      - POSTGRES_PASSWORD
+      - POSTGRES_USER
+      - POSTGRES_DB

@@ -0,0 +1 @@
+.gitkeep

@@ -0,0 +1,2 @@
+whirl.setup.d
+.whirl.env

@@ -0,0 +1,3 @@
+WHIRL_ENVIRONMENT=airflow-with-mockserver
+AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
+MINIMAL_AIRFLOW_VERSION=2.3.0

examples/airflow-deferrable-operator-custom/custom/__init__.py

Whitespace-only changes.

examples/airflow-deferrable-operator-custom/custom/operators/__init__.py

Whitespace-only changes.

@@ -0,0 +1,15 @@
+from datetime import timedelta
+
+from airflow.sensors.base import BaseSensorOperator
+
+from custom.triggers.api_check_trigger import ApiCheckTrigger
+
+
+class WaitForStartedStatusSensor(BaseSensorOperator):
+    def execute(self, context):
+        self.defer(trigger=ApiCheckTrigger(url="http://mockserver:1080/testapi", expected_status="Started"), method_name="execute_complete", timeout=timedelta(minutes=2))
+
+    def execute_complete(self, context, event=None):
+        # We have no more work to do here. Mark as complete.
+        print(event)
+        return
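
Aside, not part of the diff: self.defer() works by raising a TaskDeferred exception, which the task runner catches; it carries the trigger instance, the name of the method to resume in (execute_complete here), and the timeout. A minimal sketch of that handshake, assuming the custom package above is importable:

# Hypothetical check, not part of this commit.
from airflow.exceptions import TaskDeferred

from custom.operators.api_check_operator import WaitForStartedStatusSensor

sensor = WaitForStartedStatusSensor(task_id="wait")
try:
    sensor.execute(context={})
except TaskDeferred as deferral:
    print(deferral.trigger)      # the ApiCheckTrigger built in execute()
    print(deferral.method_name)  # "execute_complete"
    print(deferral.timeout)      # timedelta(minutes=2)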

examples/airflow-deferrable-operator-custom/custom/triggers/__init__.py

Whitespace-only changes.

@@ -0,0 +1,33 @@
+import aiohttp
+import asyncio
+from typing import Any, Dict, Tuple
+
+from airflow.triggers.base import BaseTrigger, TriggerEvent
+
+class ApiCheckTrigger(BaseTrigger):
+    """
+    A trigger that polls a REST API until a given status is returned.
+    """
+
+    def __init__(self, url: str, expected_status: str):
+        super().__init__()
+        self.url = url
+        self.expected_status = expected_status
+
+    def serialize(self) -> Tuple[str, Dict[str, Any]]:
+        return ("custom.triggers.api_check_trigger.ApiCheckTrigger", {"url": self.url, "expected_status": self.expected_status})
+
+    async def run(self):
+        """
+        Poll the API once per second until the expected status is found.
+        """
+        current_status = "Init"
+        while self.expected_status != current_status:
+            await asyncio.sleep(1)
+            async with aiohttp.ClientSession() as session:
+                async with session.get(self.url) as response:
+                    api_call_result = await response.json()
+                    current_status = api_call_result["status"]
+
+        # Send our single event and then we're done
+        yield TriggerEvent(api_call_result)
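
Aside, not part of the diff: the triggerer process recreates a deferred trigger from the classpath and kwargs returned by serialize(), then iterates its async run() generator until it yields a TriggerEvent. A rough sketch of that round trip, assuming the mockserver from this example is reachable on localhost:1080 (the ports are published in the compose file above):

# Illustrative only, not part of this commit.
import asyncio
from importlib import import_module

from custom.triggers.api_check_trigger import ApiCheckTrigger


async def main():
    # Serialize the trigger the way the scheduler stores it ...
    classpath, kwargs = ApiCheckTrigger(
        url="http://localhost:1080/testapi", expected_status="Started"
    ).serialize()

    # ... and rebuild it the way the triggerer does.
    module_path, class_name = classpath.rsplit(".", 1)
    trigger = getattr(import_module(module_path), class_name)(**kwargs)

    # run() is an async generator; take the single event it yields.
    async for event in trigger.run():
        print("received:", event.payload)
        break


asyncio.run(main())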

@@ -0,0 +1,24 @@
+"""
+Example DAG demonstrating a custom deferrable sensor, ``WaitForStartedStatusSensor``, which
+defers and doesn't occupy a worker slot while it waits for the mocked API to report "Started".
+"""
+
+from datetime import timedelta, datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+
+from custom.operators.api_check_operator import WaitForStartedStatusSensor
+
+with DAG(
+    dag_id="example_custom_sensor_async",
+    schedule_interval=None,
+    start_date=datetime.now() - timedelta(minutes=20),
+    catchup=False
+) as dag:
+
+    wait = WaitForStartedStatusSensor(task_id="wait")
+
+    finish = EmptyOperator(task_id="finish")
+    wait >> finish

examples/airflow-deferrable-operator-custom/whirl.setup.d/.gitkeep

Whitespace-only changes.

@@ -0,0 +1,56 @@
+#!/usr/bin/env bash
+
+echo "============================="
+echo "== Configure API Responses =="
+echo "============================="
+
+# Create an expectation for our mockserver to respond to a specific REST API call with a "Pending" status for the first 30 invocations.
+# For docs on creating expectations see: http://www.mock-server.com/mock_server/creating_expectations.html
+curl -v -X PUT "http://mockserver:1080/mockserver/expectation" -d '{
+  "id": "pending_expectation",
+  "httpRequest": {
+    "path": "/testapi"
+  },
+  "httpResponse": {
+    "statusCode": 200,
+    "headers": {
+      "content-type": [
+        "application/json"
+      ]
+    },
+    "body": {
+      "type": "JSON",
+      "json": "{\"status\": \"Pending\"}"
+    }
+  },
+  "times": {
+    "remainingTimes": 30,
+    "unlimited": false
+  },
+  "timeToLive": {
+    "unlimited": true
+  },
+  "priority": 10
+}'
+
+curl -v -X PUT "http://mockserver:1080/mockserver/expectation" -d '{
+  "id": "started_expectation",
+  "httpRequest": {
+    "path": "/testapi"
+  },
+  "httpResponse": {
+    "statusCode": 200,
+    "headers": {
+      "content-type": [
+        "application/json"
+      ]
+    },
+    "body": {
+      "type": "JSON",
+      "json": "{\"status\": \"Started\"}"
+    }
+  },
+  "priority": 0
+}'
+
+pip install aiohttp
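
The two expectations above overlap on /testapi: the priority-10 one answers "Pending" and expires after 30 matches, after which the priority-0 one answers "Started" indefinitely. A quick way to watch that switch happen, as a sketch that assumes the mockserver port is published on localhost (this helper is not part of the commit):

# poll_testapi.py -- illustrative only.
import json
import time
from urllib.request import urlopen

for call in range(1, 41):
    with urlopen("http://localhost:1080/testapi") as response:
        status = json.load(response)["status"]
    print(f"call {call:2d}: {status}")
    if status == "Started":
        # The "Pending" expectation has been exhausted after 30 matches.
        break
    time.sleep(1)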

@@ -0,0 +1,2 @@
+whirl.setup.d
+.whirl.env

@@ -0,0 +1,3 @@
+WHIRL_ENVIRONMENT=just-airflow
+AIRFLOW__LOGGING__LOGGING_LEVEL=DEBUG
+MINIMAL_AIRFLOW_VERSION=2.3.0

@@ -0,0 +1,22 @@
+"""
+Example DAG demonstrating ``TimeDeltaSensorAsync``, a drop-in replacement for ``TimeDeltaSensor``
+that defers and doesn't occupy a worker slot while it waits.
+"""
+
+from datetime import timedelta, datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from airflow.sensors.time_delta import TimeDeltaSensorAsync
+
+with DAG(
+    dag_id="example_time_delta_sensor_async",
+    schedule_interval=None,
+    start_date=datetime.now() - timedelta(minutes=20),
+    catchup=False
+) as dag:
+
+    wait = TimeDeltaSensorAsync(task_id="wait", delta=timedelta(seconds=120))
+
+    finish = EmptyOperator(task_id="finish")
+    wait >> finish

examples/airflow-deferrable-operator/whirl.setup.d/.gitkeep

Whitespace-only changes.

whirl (+14)

@@ -227,9 +227,23 @@ check_dagrun_result() {
   fi
 }

+function version_lt() {
+  local VER1="${1}"
+  local VER2="${2}"
+  test "$(echo "${VER1} ${VER2}" | tr " " "\n" | sort -rV | head -n 1)" != "${VER1}";
+}
+
 start() {
   echo "Starting airflow local run for environment ${WHIRL_ENVIRONMENT}"

+  if [ "${CI_MODE}" == true ]; then
+    if version_lt "${AIRFLOW_VERSION}" "${MINIMAL_AIRFLOW_VERSION}"; then
+      echo "${AIRFLOW_VERSION} is less than minimal version ${MINIMAL_AIRFLOW_VERSION}";
+      echo "Skipping running in CI MODE....";
+      exit 0;
+    fi
+  fi
+
   # Possible cleanup before starting
   run_compose_setup_scripts
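
The version_lt helper relies on GNU sort -V: it sorts the two versions in descending version order and reports "less than" whenever VER1 does not come out on top. For clarity, a rough Python equivalent (a sketch, not part of the commit; unlike sort -V it only handles plain dotted numeric versions):

# Illustrative equivalent of the bash version_lt helper above.
def version_lt(ver1: str, ver2: str) -> bool:
    """Return True when ver1 is strictly lower than ver2 (dotted numeric versions only)."""
    def as_tuple(version: str):
        return tuple(int(part) for part in version.split("."))
    return as_tuple(ver1) < as_tuple(ver2)


assert version_lt("2.2.5", "2.3.0")        # below the minimum: the example is skipped in CI
assert not version_lt("2.3.2", "2.2.5")    # current AIRFLOW_VERSION meets MINIMAL_AIRFLOW_VERSION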