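By SuKai, August 7, 2021
Prefect Core is a new kind of workflow management tool. It makes data pipelines easy to build and lets you add retries, logging, dynamic mapping, caching, failure notifications, and more.
Why you need a workflow framework
When your code runs as expected, you may not need a workflow framework at all. We call writing code that only implements business logic "positive engineering": it directly moves the business forward. Code that tangles business goals with guarantees about success, failure, and stability is "negative engineering". The value of a system like Prefect only becomes apparent when something goes wrong. Seen this way, a workflow framework is really a risk-management tool: like insurance, it is there when you need it and invisible when you don't.
Why Prefect
Prefect turns your code into a robust, distributed pipeline while letting developers keep their existing tools, languages, infrastructure, and scripts. In keeping with the positive-engineering principle, it supports rich DAG structures without getting in the way of business logic. You can convert a script with a handful of hooks through the functional API, work with the deferred computation graph directly, or combine the two.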
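To make "negative engineering" concrete: without a framework, a script tends to accumulate boilerplate like the minimal stdlib sketch below, which retries a failing step and logs each attempt. `run_with_retries` and `flaky_fetch` are hypothetical names for illustration; in Prefect 1.x the same policy is declared on the task instead, for example `@task(max_retries=3, retry_delay=timedelta(seconds=10))`.

```python
import logging
import time

logger = logging.getLogger("etl")


def run_with_retries(fn, max_retries=3, retry_delay=0.0):
    """Call fn(); on an exception, log it and retry up to max_retries more times."""
    attempt = 0
    while True:
        try:
            return fn()
        except Exception as exc:
            attempt += 1
            if attempt > max_retries:
                logger.error("giving up after %d retries: %s", max_retries, exc)
                raise
            logger.warning("attempt %d failed (%s), retrying", attempt, exc)
            time.sleep(retry_delay)


# Hypothetical flaky step: fails twice, then succeeds.
calls = {"n": 0}


def flaky_fetch():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("temporary network error")
    return "data"


result = run_with_retries(flaky_fetch, max_retries=3)
print(result, calls["n"])
```

None of this code serves the business goal; it only guards against failure, which is exactly the part a workflow framework takes over.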
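The "deferred computation graph" idea can be shown with a toy model. This is a hypothetical illustration, not Prefect's implementation: inside a `with` block, calling a decorated function only records a node, and the recorded graph is executed later in dependency order. `ToyFlow`, `Node`, and `toy_task` are made-up names; the pattern mirrors how `with Flow(...) as flow:` in the code example builds `fetch_data -> train_model` without running either task. It needs Python 3.9+ for `graphlib`.

```python
from graphlib import TopologicalSorter


class Node:
    """Placeholder for a task result that has not been computed yet."""

    def __init__(self, fn, args):
        self.fn = fn
        self.args = args


class ToyFlow:
    """Collects Nodes created inside `with ToyFlow(...)`; runs them on demand."""

    active = None

    def __init__(self, name):
        self.name = name
        self.nodes = []

    def __enter__(self):
        ToyFlow.active = self
        return self

    def __exit__(self, *exc_info):
        ToyFlow.active = None
        return False

    def run(self):
        # Each node depends on the Node objects among its arguments.
        graph = {n: {a for a in n.args if isinstance(a, Node)} for n in self.nodes}
        results = {}
        for node in TopologicalSorter(graph).static_order():
            values = [results[a] if isinstance(a, Node) else a for a in node.args]
            results[node] = node.fn(*values)
        return results


def toy_task(fn):
    """Decorator: inside a ToyFlow, calling fn records a Node instead of running it."""

    def wrapper(*args):
        node = Node(fn, args)
        ToyFlow.active.nodes.append(node)
        return node

    return wrapper


@toy_task
def fetch_data():
    return [1.0, 2.0, 3.0]


@toy_task
def mean(data):
    return sum(data) / len(data)


with ToyFlow("toy") as flow:
    data = fetch_data()  # nothing runs yet: `data` is a Node
    avg = mean(data)

results = flow.run()
print(results[avg])  # 2.0
```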
Prefect Agent
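The Prefect Agent executes and monitors Prefect flow runs. Prefect provides Local, Docker, Kubernetes, and other agents; the Kubernetes Agent runs each flow run by creating a Kubernetes Job.
Code example
A simple pipeline with two tasks: fetch the data, then train a model. The flow is named train-wine-quality-model and belongs to the project wine-quality-project. The flow itself is stored in S3 (Storage), task results are also saved to S3, and an IntervalSchedule triggers a run every 2 minutes.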
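What "one Job per flow run" looks like can be sketched as a manifest builder. `build_flow_run_job` is a hypothetical helper: its field values are taken from the Pod shown by `kubectl describe` later in this post (job name `prefect-job-<identifier>`, the `prefect.io/*` labels, container `flow`, args `prefect execute flow-run`), but it is a simplified approximation and not the agent's actual job template. The `restartPolicy` value is an assumption.

```python
import json


def build_flow_run_job(flow_id, flow_run_id, identifier, image,
                       service_account, env):
    """Build a simplified Job manifest resembling what the Kubernetes Agent submits."""
    labels = {
        "prefect.io/identifier": identifier,
        "prefect.io/flow_id": flow_id,
        "prefect.io/flow_run_id": flow_run_id,
    }
    # run_config env first, then context variables identifying this flow run
    container_env = [{"name": k, "value": v} for k, v in env.items()]
    container_env += [
        {"name": "PREFECT__CONTEXT__FLOW_RUN_ID", "value": flow_run_id},
        {"name": "PREFECT__CONTEXT__FLOW_ID", "value": flow_id},
    ]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": f"prefect-job-{identifier}", "labels": labels},
        "spec": {
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "serviceAccountName": service_account,
                    "restartPolicy": "Never",  # assumed; Jobs require Never or OnFailure
                    "containers": [
                        {
                            "name": "flow",
                            "image": image,
                            # the container downloads the flow from storage and runs it
                            "args": ["prefect", "execute", "flow-run"],
                            "env": container_env,
                        }
                    ],
                },
            }
        },
    }


job = build_flow_run_job(
    flow_id="f73b8cf6-e300-42e7-9046-6823e300dc99",
    flow_run_id="5cc44aed-85b5-47fb-9504-0d434974fe53",
    identifier="6b3a0a1e",
    image="prefecthq/prefect:0.15.9-python3.8-sukai20",
    service_account="prefect-serviceaccount",
    env={"MLFLOW_TRACKING_URI": "http://mlflow.platform.sukai.com/"},
)
print(json.dumps(job["metadata"], indent=2))
```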
import os

# Prefect 1.x loads its configuration (including PREFECT__USER_CONFIG_PATH and
# PREFECT__CONTEXT__SECRETS__*) when the prefect package is first imported,
# so these variables are set before the prefect imports below.
# Demo credentials for the S3-compatible endpoint; do not hard-code real secrets.
os.environ["AWS_ACCESS_KEY_ID"] = "mlflow"
os.environ["AWS_SECRET_ACCESS_KEY"] = "sukai"
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://s3.platform.sukai.com/"
os.environ["PREFECT__USER_CONFIG_PATH"] = "/home/jovyan/ai-demo/prefect/config.toml"
os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"] = '{"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}'

from datetime import timedelta

import mlflow
import mlflow.sklearn  # explicit import so mlflow.sklearn.log_model is available
import numpy as np
import pandas as pd
from prefect import task, Flow, Parameter, Client
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import S3
from prefect.engine.results.s3_result import S3Result
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split


def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2


@task
def fetch_data():
    # Red-wine quality dataset from the UCI repository (semicolon-separated)
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    data = pd.read_csv(csv_url, sep=";")
    return data


@task
def train_model(data, mlflow_experiment_id, alpha=0.5, l1_ratio=0.5):
    mlflow.set_tracking_uri("http://mlflow.platform.sukai.com/")
    train, test = train_test_split(data)
    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]
    with mlflow.start_run(experiment_id=mlflow_experiment_id):
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)
        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print(" RMSE: %s" % rmse)
        print(" MAE: %s" % mae)
        print(" R2: %s" % r2)
        # Parameters, metrics and the fitted model are recorded in MLflow
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
        mlflow.sklearn.log_model(lr, "model")


domain = "platform.sukai.com"
prefect_url = f"http://apollo.{domain}/graphql"
prefect_project_name = "wine-quality-project"
docker_image = "prefecthq/prefect:0.15.9-python3.8-sukai20"


def create_prefect_flow():
    # Each flow run becomes a Kubernetes Job using this image and service account
    run_config = KubernetesRun(
        #labels=[""],
        service_account_name="prefect-serviceaccount",
        image=docker_image,
        env={
            "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS": '{"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}',
            "AWS_ACCESS_KEY_ID": "mlflow",
            "AWS_SECRET_ACCESS_KEY": "sukai",
            "MLFLOW_S3_ENDPOINT_URL": "http://s3.platform.sukai.com/",
            "MLFLOW_TRACKING_URI": "http://mlflow.platform.sukai.com/"
        }
    )
    # The flow is uploaded to this bucket at registration time
    storage = S3(
        bucket="prefect-sukai",
        client_options=dict(endpoint_url=os.getenv("MLFLOW_S3_ENDPOINT_URL")),
    )
    # Task results are persisted to the same bucket
    result = S3Result(
        bucket="prefect-sukai",
        boto3_kwargs=dict(
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            endpoint_url=os.getenv("MLFLOW_S3_ENDPOINT_URL"),
        ),
    )
    prefect_client = Client()
    schedule = IntervalSchedule(interval=timedelta(minutes=2))
    with Flow("train-wine-quality-model", schedule, storage=storage, result=result, run_config=run_config) as flow:
        alpha = Parameter('alpha', default=0.3)
        l1_ratio = Parameter('l1_ratio', default=0.3)
        data = fetch_data()
        train_model(data=data, mlflow_experiment_id=4, alpha=alpha, l1_ratio=l1_ratio)
    # Register once the flow is fully built (outside the `with` block)
    training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)
    return training_flow_id


if __name__ == "__main__":
    create_prefect_flow()
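`IntervalSchedule(interval=timedelta(minutes=2))` produces run times spaced two minutes apart. A minimal stdlib sketch of that arithmetic, assuming runs fall at `start + k * interval`; the start time is hypothetical and the "after" time is the registration timestamp from the output further down. `next_run_times` is an illustrative helper, not a Prefect API.

```python
from datetime import datetime, timedelta, timezone


def next_run_times(start, interval, after, n):
    """Next n times of the form start + k*interval (k >= 0) that are >= after."""
    k = 0
    if after > start:
        # ceiling division: timedelta // timedelta returns an int (floor)
        k = -(-(after - start) // interval)
    return [start + (k + i) * interval for i in range(n)]


start = datetime(2021, 11, 18, 3, 0, tzinfo=timezone.utc)  # hypothetical anchor
registered = datetime(2021, 11, 18, 3, 5, 51, tzinfo=timezone.utc)
runs = next_run_times(start, timedelta(minutes=2), registered, 3)
for t in runs:
    print(t.isoformat())
```

With these inputs the next runs land at 03:06, 03:08 and 03:10 UTC.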
Deploying Prefect
Enable PostgreSQL persistence, expose the Prefect UI and the API (Apollo) through ingresses, and create a tenant:
sukai@sukai:/mnt/d/04.github/ai/server/helm/prefect-server$ cat values.yaml
postgresql:
  persistence:
    enabled: true
    size: 80Gi
apollo:
  ingress:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: traefik
    hosts:
      - apollo.platform.sukai.com
ui:
  ingress:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: traefik
    hosts:
      - prefect.platform.sukai.com
    path: /
jobs:
  createTenant:
    enabled: true
    tenant:
      name: default
      slug: default
sukai@sukai:/mnt/d/04.github/ai/server/helm/prefect-server$
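values.yaml only lists overrides because Helm merges user values over the chart defaults: nested maps are merged key by key, while scalars and lists are replaced. A minimal stdlib sketch of that rule follows. The `defaults` fragment is hypothetical except for `ui.apolloApiUrl`, whose default of `http://localhost:4200/graphql` is stated in the chart NOTES printed by `helm install`. It shows why the UI still points at localhost after this install unless `ui.apolloApiUrl` is also overridden.

```python
import copy


def merge_values(defaults, overrides):
    """Merge overrides into defaults: dicts merge recursively, other values replace."""
    merged = copy.deepcopy(defaults)
    for key, value in overrides.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_values(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical fragment of the chart defaults (not the full prefect-server values)
defaults = {
    "ui": {
        "apolloApiUrl": "http://localhost:4200/graphql",
        "ingress": {"enabled": False, "hosts": [], "path": "/"},
    }
}
# The ui section of the values.yaml above
overrides = {
    "ui": {
        "ingress": {
            "enabled": True,
            "annotations": {"kubernetes.io/ingress.class": "traefik"},
            "hosts": ["prefect.platform.sukai.com"],
            "path": "/",
        }
    }
}
merged = merge_values(defaults, overrides)
print(merged["ui"]["apolloApiUrl"])  # http://localhost:4200/graphql
```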
sukai@sukai:/mnt/d/04.github/ai/server$ helm install -n ai prefect helm/prefect-server
WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /home/sukai/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /home/sukai/.kube/config
W1117 10:55:29.177636 588 warnings.go:70] spec.template.spec.initContainers[0].env[8].name: duplicate name "PREFECT_SERVER__HASURA__HOST"
W1117 10:55:29.177657 588 warnings.go:70] spec.template.spec.containers[0].env[9].name: duplicate name "PREFECT_SERVER__HASURA__HOST"
NAME: prefect
LAST DEPLOYED: Wed Nov 17 10:55:28 2021
NAMESPACE: ai
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
#1 Run the following command to get the UI URL:
UI_HOST=$( \
kubectl get svc \
--namespace ai \
--template "{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}" \
prefect-ui \
) \
&& echo "UI available at: http://$UI_HOST:8080"
NOTE: It may take a few minutes for the LoadBalancer IP to be available.
You can watch the status of by running: kubectl get --namespace ai svc -w prefect-ui
#2 Run the following command to get the Apollo GraphQL API URL:
API_HOST=$( \
kubectl get svc \
--namespace ai \
--template "{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}" \
prefect-apollo \
) \
&& echo "API available at: http://$API_HOST:4200/graphql"
#3 The UI has been configured to point to 'http://localhost:4200/graphql' by default.
- The API location you retrieved in #2 should match this url
- The default can be changed in the helm deployment at 'ui.apolloApiUrl' if it is incorrect
- The location can also be changed within the UI itself per user
- The API must be accessible from the user's machine for the UI to work
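Changing the Prefect Server GraphQL endpoint (in the UI, from the default http://localhost:4200/graphql to http://apollo.platform.sukai.com/graphql)
Creating the project (wine-quality-project)
Running the code
Client-side Prefect configuration (/home/jovyan/ai-demo/prefect/config.toml, the file PREFECT__USER_CONFIG_PATH points to):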
backend = "server"
[server]
host = "http://apollo.platform.sukai.com"
port = "80"
endpoint = "${server.host}:${server.port}"
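Prefect 1.x resolves `${section.key}` references inside config.toml, so `endpoint` becomes host plus port. A simplified stdlib sketch of that resolution (a hypothetical `interpolate` helper, not Prefect's implementation) shows the endpoint the client ends up using with this file:

```python
import re


def interpolate(config, value):
    """Replace ${dotted.key} references with values looked up in a nested dict."""

    def lookup(match):
        node = config
        for part in match.group(1).split("."):
            node = node[part]
        return str(node)

    return re.sub(r"\$\{([\w.]+)\}", lookup, value)


config = {
    "backend": "server",
    "server": {"host": "http://apollo.platform.sukai.com", "port": "80"},
}
endpoint = interpolate(config, "${server.host}:${server.port}")
print(endpoint)  # http://apollo.platform.sukai.com:80
```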
(base) jovyan@jupyter-0:~/ai-demo/prefect$ pip install "prefect[kubernetes]" -i https://pypi.tuna.tsinghua.edu.cn/simple
(base) jovyan@jupyter-0:~/ai-demo/prefect$ python wine-quality-pipeline.py
[2021-11-18 03:05:51+0000] INFO - prefect.S3 | Uploading train-wine-quality-model/2021-11-18t03-05-51-561252-00-00 to prefect-sukai
Flow URL: http://localhost:8080/default/flow/1b3c08e5-3405-432d-8f68-d895769d7ea4
└── ID: 5aac74d0-72c1-4d75-9197-bcc0da440618
└── Project: wine-quality-project
└── Labels: []
Viewing the flow in Prefect
Viewing the MLflow experiment tracking records
Viewing the Pod in Kubernetes
sukai@sukai:/mnt/d/04.github/ai/server$ kubectl -n ai describe pod prefect-job-6b3a0a1e--1-86gcv
Name: prefect-job-6b3a0a1e--1-86gcv
Namespace: ai
Priority: 0
Node: hello-precision-7920-rack/192.168.0.244
Start Time: Thu, 18 Nov 2021 10:16:00 +0800
Labels: controller-uid=a6a962f8-0bdc-4856-9524-fd5374151f30
job-name=prefect-job-6b3a0a1e
prefect.io/flow_id=f73b8cf6-e300-42e7-9046-6823e300dc99
prefect.io/flow_run_id=5cc44aed-85b5-47fb-9504-0d434974fe53
prefect.io/identifier=6b3a0a1e
Annotations: cni.projectcalico.org/podIP:
cni.projectcalico.org/podIPs:
Status: Succeeded
IP: 10.244.135.61
IPs:
IP: 10.244.135.61
Controlled By: Job/prefect-job-6b3a0a1e
Containers:
flow:
Container ID: docker://88295349196e5c4f1ea27f9e2e4af4d4b830a24719c0ad600e2019a3dbc45d7f
Image: prefecthq/prefect:0.15.9-python3.8-sukai20
Image ID: docker://sha256:a2261c36a690a10c544c3b059bf4e55625b4593da3b2806940cf93005f9d230e
Port: <none>
Host Port: <none>
Args:
prefect
execute
flow-run
State: Terminated
Reason: Completed
Exit Code: 0
Started: Thu, 18 Nov 2021 10:16:01 +0800
Finished: Thu, 18 Nov 2021 10:16:08 +0800
Ready: False
Restart Count: 0
Environment:
PREFECT__LOGGING__LEVEL: INFO
AWS_ACCESS_KEY_ID: mlflow
MLFLOW_TRACKING_URI: http://mlflow.platform.sukai.com/
AWS_SECRET_ACCESS_KEY: sukai
MLFLOW_S3_ENDPOINT_URL: http://s3.platform.sukai.com/
PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS: {"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}
PREFECT__BACKEND: server
PREFECT__CLOUD__AGENT__LABELS: []
PREFECT__CLOUD__API: http://prefect-apollo.ai:4200/graphql
PREFECT__CLOUD__AUTH_TOKEN:
PREFECT__CLOUD__API_KEY:
PREFECT__CLOUD__TENANT_ID:
PREFECT__CLOUD__USE_LOCAL_SECRETS: false
PREFECT__CLOUD__SEND_FLOW_RUN_LOGS: true
PREFECT__CONTEXT__FLOW_RUN_ID: 5cc44aed-85b5-47fb-9504-0d434974fe53
PREFECT__CONTEXT__FLOW_ID: f73b8cf6-e300-42e7-9046-6823e300dc99
PREFECT__CONTEXT__IMAGE: prefecthq/prefect:0.15.9-python3.8-sukai20
PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS: prefect.engine.cloud.CloudFlowRunner
PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS: prefect.engine.cloud.CloudTaskRunner
PREFECT__LOGGING__LOG_TO_CLOUD: true
Mounts:
/var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-pvgjb (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
kube-api-access-pvgjb:
Type: Projected (a volume that contains injected data from multiple sources)
TokenExpirationSeconds: 3607
ConfigMapName: kube-root-ca.crt
ConfigMapOptional: <nil>
DownwardAPI: true
QoS Class: BestEffort
Node-Selectors: <none>
Tolerations: node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled 26s default-scheduler Successfully assigned ai/prefect-job-6b3a0a1e--1-86gcv to hello-precision-7920-rack
Normal Pulled 25s kubelet Container image "prefecthq/prefect:0.15.9-python3.8-sukai20" already present on machine
Normal Created 25s kubelet Created container flow
Normal Started 25s kubelet Started container flow
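The Pod environment mixes the run_config `env` (AWS_*, MLFLOW_*) with PREFECT__* variables injected by the agent. As far as I understand Prefect 1.x, each PREFECT__SECTION__KEY variable is mapped onto the config by splitting on double underscores and lowercasing, except that secret names under `context.secrets` keep their case; that is why the code sets PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS. `env_var_to_config_key` below is a hypothetical, simplified re-creation of that rule for reading the environment above, not Prefect's code.

```python
def env_var_to_config_key(name, prefix="PREFECT"):
    """Simplified sketch: map PREFECT__SECTION__KEY to a dotted config key."""
    marker = prefix + "__"
    if not name.startswith(marker):
        return None  # not a Prefect config variable (e.g. AWS_ACCESS_KEY_ID)
    parts = name[len(marker):].split("__")
    if len(parts) > 2 and [p.upper() for p in parts[:2]] == ["CONTEXT", "SECRETS"]:
        # Secret names keep their case; only the section path is lowercased
        return ".".join([p.lower() for p in parts[:-1]] + [parts[-1]])
    return ".".join(p.lower() for p in parts)


for var in [
    "PREFECT__BACKEND",
    "PREFECT__CLOUD__API",
    "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS",
    "MLFLOW_TRACKING_URI",
]:
    print(var, "->", env_var_to_config_key(var))
```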
sukai@sukai:/mnt/d/04.github/ai/server$ kubectl -n ai logs prefect-job-6b3a0a1e--1-86gcv
[2021-11-18 02:16:02+0000] INFO - prefect.S3 | Downloading flow from s3://prefect-sukai/train-wine-quality-model/2021-11-18t02-15-04-448956-00-00
[2021-11-18 02:16:02+0000] INFO - prefect.S3 | Flow successfully downloaded. ETag: "b5bc762bf4a73a8298b6a61294e58f2e", LastModified: 2021-11-18T02:15:04+00:00, VersionId: None
[2021-11-18 02:16:03+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'train-wine-quality-model'
[2021-11-18 02:16:03+0000] INFO - prefect.CloudTaskRunner | Task 'fetch_data': Starting task run...
[2021-11-18 02:16:05+0000] INFO - prefect.CloudTaskRunner | Task 'fetch_data': Finished task run for task with final state: 'Success'
[2021-11-18 02:16:05+0000] INFO - prefect.CloudTaskRunner | Task 'train_model': Starting task run...
Elasticnet model (alpha=0.300000, l1_ratio=0.300000):
RMSE: 0.7138934813010738
MAE: 0.5774349731953587
R2: 0.21323931512613703
[2021-11-18 02:16:07+0000] INFO - prefect.CloudTaskRunner | Task 'train_model': Finished task run for task with final state: 'Success'
[2021-11-18 02:16:07+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
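For a scripted check of a run (for example in CI), the final state of each task can be pulled out of `kubectl logs` output. The regex below is written against the task-runner lines shown above and is an assumption; other log formats would need adjusting. `task_final_states` is a hypothetical helper.

```python
import re

TASK_STATE = re.compile(
    r"Task '([^']+)': Finished task run for task with final state: '(\w+)'"
)


def task_final_states(log_text):
    """Return {task_name: final_state} from Prefect 1.x task-runner log lines."""
    return dict(TASK_STATE.findall(log_text))


logs = """\
[2021-11-18 02:16:05+0000] INFO - prefect.CloudTaskRunner | Task 'fetch_data': Finished task run for task with final state: 'Success'
[2021-11-18 02:16:07+0000] INFO - prefect.CloudTaskRunner | Task 'train_model': Finished task run for task with final state: 'Success'
"""
states = task_final_states(logs)
failed = [name for name, state in states.items() if state != "Success"]
print(states, failed)
```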