Machine Learning Workflows

SuKai August 7, 2021

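Prefect Core is a new kind of workflow management tool. It makes building data pipelines easy, and it lets you add retries, logging, dynamic mapping, caching, failure notifications, and other features with little effort.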

Why a workflow framework is needed

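When your code runs as expected, you may not need a workflow framework at all. Writing only the code that implements the business logic is what we call positive engineering. The value of a system like Prefect becomes apparent only when something goes wrong. Code that mixes the business goal with safeguards for success, failure, and stability is negative engineering. Seen this way, a workflow framework is really a risk management tool: like insurance, it is there when you need it and invisible when you don't.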
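
To make the distinction concrete, here is a minimal plain-Python sketch of negative engineering written by hand. It is not Prefect code; the names `with_retries` and `flaky_fetch` are hypothetical and exist only for illustration. The retry loop is the kind of safeguard a workflow framework is meant to take off your hands.

```python
import time
from functools import wraps


def with_retries(max_retries=3, delay_seconds=0.0):
    """Retry the wrapped function when it raises, up to max_retries extra attempts."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    attempt += 1
                    if attempt > max_retries:
                        raise  # retries exhausted: surface the last error
                    print(f"{func.__name__} failed ({exc!r}); retry {attempt}/{max_retries}")
                    time.sleep(delay_seconds)
        return wrapper
    return decorator


calls = {"count": 0}


@with_retries(max_retries=3)
def flaky_fetch():
    """Hypothetical business function that fails twice before succeeding."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary network error")
    return "data"


result = flaky_fetch()
print(result, "after", calls["count"], "calls")
```

Only the body of `flaky_fetch` is business logic; everything else is there purely to cope with failure.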

Why Prefect

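Prefect turns code into a robust, distributed pipeline. Developers can keep using their existing tools, languages, infrastructure, and scripts. Following the principle of supporting positive engineering, Prefect supports rich DAG structures without getting in the way of the business logic. A script can be converted with a small number of functional hooks and the functional API, or you can work directly with the deferred computation graph, or use any combination of the two.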
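
The idea of a deferred computation graph can be illustrated with a few lines of plain Python. This is only a conceptual sketch, not how Prefect is implemented; `DeferredTask` and `deferred` are hypothetical names. Calling a decorated function records a node in the graph, and nothing executes until the graph is run.

```python
class DeferredTask:
    """A function call that is recorded now and executed later."""

    def __init__(self, func, *upstream):
        self.func = func
        self.upstream = upstream  # other DeferredTask objects this one depends on

    def run(self, cache=None):
        """Run upstream tasks first, then this task; each task runs at most once."""
        cache = {} if cache is None else cache
        if self not in cache:
            inputs = [task.run(cache) for task in self.upstream]
            cache[self] = self.func(*inputs)
        return cache[self]


def deferred(func):
    """Calling the decorated function builds a graph node instead of executing it."""
    def build(*upstream):
        return DeferredTask(func, *upstream)
    return build


@deferred
def extract():
    return [1, 2, 3]


@deferred
def double(values):
    return [v * 2 for v in values]


@deferred
def total(values):
    return sum(values)


# Building the graph runs nothing yet.
graph = total(double(extract()))

# Execution happens only when the graph is run.
answer = graph.run()
print(answer)
```

Because the graph exists as data before anything runs, a framework can inspect it, schedule it, and attach retries or caching to individual nodes.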

Prefect Agent

The Prefect Agent runs and monitors Prefect flows. Prefect provides local, Docker, Kubernetes, and other agents. The Kubernetes Agent creates Kubernetes Jobs to execute Prefect flows.
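
As a rough picture of what such a Job looks like, the sketch below builds a `batch/v1` Job manifest as a Python dict. The container name, args, labels, namespace, and service account are taken from the pod description shown later in this article. The function name `build_flow_run_job`, the random identifier, and `restartPolicy: Never` are assumptions for illustration; the agent's real Job template may contain more fields.

```python
import json
import uuid


def build_flow_run_job(flow_id, flow_run_id, image, env=None, identifier=None):
    """Return a batch/v1 Job manifest (as a dict) that executes one flow run."""
    # Assumption: a short random suffix keeps Job names unique.
    identifier = identifier or uuid.uuid4().hex[:8]
    labels = {
        "prefect.io/identifier": identifier,
        "prefect.io/flow_id": flow_id,
        "prefect.io/flow_run_id": flow_run_id,
    }
    container_env = [{"name": k, "value": v} for k, v in (env or {}).items()]
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": f"prefect-job-{identifier}",
            "namespace": "ai",
            "labels": labels,
        },
        "spec": {
            "template": {
                "metadata": {"labels": labels},
                "spec": {
                    "serviceAccountName": "prefect-serviceaccount",
                    "restartPolicy": "Never",  # assumption; Jobs need Never or OnFailure
                    "containers": [
                        {
                            "name": "flow",
                            "image": image,
                            "args": ["prefect", "execute", "flow-run"],
                            "env": container_env,
                        }
                    ],
                },
            }
        },
    }


job = build_flow_run_job(
    flow_id="f73b8cf6-e300-42e7-9046-6823e300dc99",
    flow_run_id="5cc44aed-85b5-47fb-9504-0d434974fe53",
    image="prefecthq/prefect:0.15.9-python3.8-sukai20",
    env={"PREFECT__BACKEND": "server"},
    identifier="6b3a0a1e",
)
print(json.dumps(job, indent=2))
```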

Code example

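A simple pipeline with two tasks: fetch the data, then train the model. The flow is named train-wine-quality-model and belongs to the project wine-quality-project. The flow Storage is S3, run results are also saved to S3, and an IntervalSchedule triggers a run every 2 minutes.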

from prefect import task, Flow, Parameter, Client
from prefect.run_configs import KubernetesRun
from prefect.schedules import IntervalSchedule
from prefect.storage import S3
from prefect.engine.results.s3_result import S3Result

from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.linear_model import ElasticNet

from datetime import timedelta

import numpy as np
import pandas as pd

import mlflow
import requests

import os
os.environ["AWS_ACCESS_KEY_ID"] = "mlflow"
os.environ["AWS_SECRET_ACCESS_KEY"] = "sukai"
os.environ["MLFLOW_S3_ENDPOINT_URL"] = "http://s3.platform.sukai.com/"
os.environ["PREFECT__USER_CONFIG_PATH"] = "/home/jovyan/ai-demo/prefect/config.toml"
os.environ["PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS"] = '{"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}'

def eval_metrics(actual, pred):
    rmse = np.sqrt(mean_squared_error(actual, pred))
    mae = mean_absolute_error(actual, pred)
    r2 = r2_score(actual, pred)
    return rmse, mae, r2

@task
def fetch_data():
    csv_url = "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv"
    data = pd.read_csv(csv_url, sep=";")
    return data
 
@task
def train_model(data, mlflow_experiment_id, alpha=0.5, l1_ratio=0.5):
    mlflow.set_tracking_uri('http://mlflow.platform.sukai.com/')
 
    train, test = train_test_split(data)
 
    # The predicted column is "quality" which is a scalar from [3, 9]
    train_x = train.drop(["quality"], axis=1)
    test_x = test.drop(["quality"], axis=1)
    train_y = train[["quality"]]
    test_y = test[["quality"]]
    
    with mlflow.start_run(experiment_id=mlflow_experiment_id):
        lr = ElasticNet(alpha=alpha, l1_ratio=l1_ratio, random_state=42)
        lr.fit(train_x, train_y)
        predicted_qualities = lr.predict(test_x)
        (rmse, mae, r2) = eval_metrics(test_y, predicted_qualities)
 
        print("Elasticnet model (alpha=%f, l1_ratio=%f):" % (alpha, l1_ratio))
        print("  RMSE: %s" % rmse)
        print("  MAE: %s" % mae)
        print("  R2: %s" % r2)
 
        mlflow.log_param("alpha", alpha)
        mlflow.log_param("l1_ratio", l1_ratio)
        mlflow.log_metric("rmse", rmse)
        mlflow.log_metric("r2", r2)
        mlflow.log_metric("mae", mae)
 
        mlflow.sklearn.log_model(lr, "model")
    
domain = "platform.sukai.com"
prefect_url = f"http://apollo.{domain}/graphql"

prefect_project_name = "wine-quality-project"
docker_image = "prefecthq/prefect:0.15.9-python3.8-sukai20"


def create_prefect_flow():
    run_config = KubernetesRun(
        #labels=[""],
        service_account_name="prefect-serviceaccount",
        image=docker_image,
        env={
            "PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS": '{"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}',
            "AWS_ACCESS_KEY_ID": "mlflow",
            "AWS_SECRET_ACCESS_KEY": "sukai",
            "MLFLOW_S3_ENDPOINT_URL": "http://s3.platform.sukai.com/",
            "MLFLOW_TRACKING_URI": "http://mlflow.platform.sukai.com/"
        }
    )
    
    storage = S3(
        bucket="prefect-sukai",
        client_options=dict(endpoint_url=os.getenv("MLFLOW_S3_ENDPOINT_URL")),
    )
    
    result = S3Result(
        bucket="prefect-sukai",
        boto3_kwargs=dict(
            aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            endpoint_url=os.getenv("MLFLOW_S3_ENDPOINT_URL"),
        ),
    )
    
    prefect_client = Client()
    schedule = IntervalSchedule(interval=timedelta(minutes=2))

    with Flow("train-wine-quality-model", schedule, storage=storage, result=result, run_config=run_config) as flow:
        alpha = Parameter('alpha', default=0.3)
        l1_ratio = Parameter('l1_ratio', default=0.3)
        data = fetch_data()
        train_model(data=data, mlflow_experiment_id=4, alpha=alpha, l1_ratio=l1_ratio)
    training_flow_id = prefect_client.register(flow, project_name=prefect_project_name)    

create_prefect_flow()
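
To get a feel for what a 2-minute interval means in practice, the following sketch computes upcoming run times on a fixed grid. It uses only the standard library and assumes runs are anchored at a known start time; `next_run_times` is a hypothetical helper, and the real scheduler's handling of start dates may differ.

```python
from datetime import datetime, timedelta, timezone


def next_run_times(start, interval, now, count=3):
    """Return the next `count` run times strictly after `now`,
    on a fixed-interval grid anchored at `start`."""
    if interval <= timedelta(0):
        raise ValueError("interval must be positive")
    if now < start:
        first = start
    else:
        # Number of whole intervals already elapsed, plus one for the next slot.
        elapsed_intervals = (now - start) // interval + 1
        first = start + elapsed_intervals * interval
    return [first + i * interval for i in range(count)]


start = datetime(2021, 11, 18, 2, 15, tzinfo=timezone.utc)
now = datetime(2021, 11, 18, 2, 16, 30, tzinfo=timezone.utc)
upcoming = next_run_times(start, timedelta(minutes=2), now)
for run_time in upcoming:
    print(run_time.isoformat())
```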

Deploying Prefect

Enable PostgreSQL persistence, enable ingress for the Prefect UI and API, and create a tenant.

sukai@sukai:/mnt/d/04.github/ai/server/helm/prefect-server$ cat values.yaml
postgresql:
  persistence:
    enabled: true
    size: 80Gi
apollo:
  ingress:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: traefik
    hosts:
      - apollo.platform.sukai.com
ui:
  ingress:
    enabled: true
    annotations:
      kubernetes.io/ingress.class: traefik
    hosts:
      - prefect.platform.sukai.com
    path: /

jobs:
  createTenant:
    enabled: true
    tenant:
      name: default
      slug: default

sukai@sukai:/mnt/d/04.github/ai/server/helm/prefect-server$
sukai@sukai:/mnt/d/04.github/ai/server$ helm install -n ai prefect helm/prefect-server
WARNING: Kubernetes configuration file is group-readable. This is insecure. Location: /home/sukai/.kube/config
WARNING: Kubernetes configuration file is world-readable. This is insecure. Location: /home/sukai/.kube/config
W1117 10:55:29.177636     588 warnings.go:70] spec.template.spec.initContainers[0].env[8].name: duplicate name "PREFECT_SERVER__HASURA__HOST"
W1117 10:55:29.177657     588 warnings.go:70] spec.template.spec.containers[0].env[9].name: duplicate name "PREFECT_SERVER__HASURA__HOST"
NAME: prefect
LAST DEPLOYED: Wed Nov 17 10:55:28 2021
NAMESPACE: ai
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
#1 Run the following command to get the UI URL:

  UI_HOST=$( \
    kubectl get svc \
    --namespace ai \
    --template "{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}" \
    prefect-ui \
  ) \
  && echo "UI available at: http://$UI_HOST:8080"

  NOTE: It may take a few minutes for the LoadBalancer IP to be available.
        You can watch the status of by running: kubectl get --namespace ai svc -w prefect-ui

#2 Run the following command to get the Apollo GraphQL API URL:

  API_HOST=$( \
    kubectl get svc \
    --namespace ai \
    --template "{{ range (index .status.loadBalancer.ingress 0) }}{{.}}{{ end }}" \
    prefect-apollo \
  ) \
  && echo "API available at: http://$API_HOST:4200/graphql"

#3 The UI has been configured to point to 'http://localhost:4200/graphql' by default.
  - The API location you retrieved in #2 should match this url
  - The default can be changed in the helm deployment at 'ui.apolloApiUrl' if it is incorrect
  - The location can also be changed within the UI itself per user
  - The API must be accessible from the user's machine for the UI to work

Changing the Prefect Server GraphQL endpoint

[Two screenshots]

Creating a project

[Screenshot]

Running the code

Configuring Prefect on the client

backend = "server"
[server]
host = "http://apollo.platform.sukai.com"
port = "80"
endpoint = "${server.host}:${server.port}"
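
The `endpoint` value in the config above refers to other keys with `${...}` placeholders. The sketch below shows what that placeholder resolves to for these values. It is a simplified illustration with hypothetical helpers `lookup` and `interpolate`, not Prefect's own configuration loader.

```python
import re

# The same values as in config.toml, written as a nested dict.
config = {
    "backend": "server",
    "server": {
        "host": "http://apollo.platform.sukai.com",
        "port": "80",
        "endpoint": "${server.host}:${server.port}",
    },
}


def lookup(cfg, dotted_key):
    """Resolve a dotted key such as 'server.host' in a nested dict."""
    value = cfg
    for part in dotted_key.split("."):
        value = value[part]
    return value


def interpolate(cfg, text):
    """Replace every ${a.b} placeholder in text with the value found in cfg."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: str(lookup(cfg, m.group(1))), text)


endpoint = interpolate(config, config["server"]["endpoint"])
print(endpoint)
```
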
(base) jovyan@jupyter-0:~/ai-demo/prefect$ pip install "prefect[kubernetes]" -i https://pypi.tuna.tsinghua.edu.cn/simple
(base) jovyan@jupyter-0:~/ai-demo/prefect$ python wine-quality-pipeline.py 
[2021-11-18 03:05:51+0000] INFO - prefect.S3 | Uploading train-wine-quality-model/2021-11-18t03-05-51-561252-00-00 to prefect-sukai
Flow URL: http://localhost:8080/default/flow/1b3c08e5-3405-432d-8f68-d895769d7ea4
 └── ID: 5aac74d0-72c1-4d75-9197-bcc0da440618
 └── Project: wine-quality-project
 └── Labels: []

Viewing the flow in Prefect

[Seven screenshots of the Prefect UI]

Viewing the MLflow experiment tracking records

[Two screenshots of the MLflow tracking UI]

Inspecting the Pod in Kubernetes

sukai@sukai:/mnt/d/04.github/ai/server$ kubectl -n ai describe pod prefect-job-6b3a0a1e--1-86gcv
Name:         prefect-job-6b3a0a1e--1-86gcv
Namespace:    ai
Priority:     0
Node:         hello-precision-7920-rack/192.168.0.244
Start Time:   Thu, 18 Nov 2021 10:16:00 +0800
Labels:       controller-uid=a6a962f8-0bdc-4856-9524-fd5374151f30
              job-name=prefect-job-6b3a0a1e
              prefect.io/flow_id=f73b8cf6-e300-42e7-9046-6823e300dc99
              prefect.io/flow_run_id=5cc44aed-85b5-47fb-9504-0d434974fe53
              prefect.io/identifier=6b3a0a1e
Annotations:  cni.projectcalico.org/podIP:
              cni.projectcalico.org/podIPs:
Status:       Succeeded
IP:           10.244.135.61
IPs:
  IP:           10.244.135.61
Controlled By:  Job/prefect-job-6b3a0a1e
Containers:
  flow:
    Container ID:  docker://88295349196e5c4f1ea27f9e2e4af4d4b830a24719c0ad600e2019a3dbc45d7f
    Image:         prefecthq/prefect:0.15.9-python3.8-sukai20
    Image ID:      docker://sha256:a2261c36a690a10c544c3b059bf4e55625b4593da3b2806940cf93005f9d230e
    Port:          <none>
    Host Port:     <none>
    Args:
      prefect
      execute
      flow-run
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Thu, 18 Nov 2021 10:16:01 +0800
      Finished:     Thu, 18 Nov 2021 10:16:08 +0800
    Ready:          False
    Restart Count:  0
    Environment:
      PREFECT__LOGGING__LEVEL:                      INFO
      AWS_ACCESS_KEY_ID:                            mlflow
      MLFLOW_TRACKING_URI:                          http://mlflow.platform.sukai.com/
      AWS_SECRET_ACCESS_KEY:                        sukai
      MLFLOW_S3_ENDPOINT_URL:                       http://s3.platform.sukai.com/
      PREFECT__CONTEXT__SECRETS__AWS_CREDENTIALS:   {"ACCESS_KEY": "mlflow", "SECRET_ACCESS_KEY": "sukai"}
      PREFECT__BACKEND:                             server
      PREFECT__CLOUD__AGENT__LABELS:                []
      PREFECT__CLOUD__API:                          http://prefect-apollo.ai:4200/graphql
      PREFECT__CLOUD__AUTH_TOKEN:
      PREFECT__CLOUD__API_KEY:
      PREFECT__CLOUD__TENANT_ID:
      PREFECT__CLOUD__USE_LOCAL_SECRETS:            false
      PREFECT__CLOUD__SEND_FLOW_RUN_LOGS:           true
      PREFECT__CONTEXT__FLOW_RUN_ID:                5cc44aed-85b5-47fb-9504-0d434974fe53
      PREFECT__CONTEXT__FLOW_ID:                    f73b8cf6-e300-42e7-9046-6823e300dc99
      PREFECT__CONTEXT__IMAGE:                      prefecthq/prefect:0.15.9-python3.8-sukai20
      PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS:  prefect.engine.cloud.CloudFlowRunner
      PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS:  prefect.engine.cloud.CloudTaskRunner
      PREFECT__LOGGING__LOG_TO_CLOUD:               true
    Mounts:
      /var/run/secrets/kubernetes.io/serviceaccount from kube-api-access-pvgjb (ro)
Conditions:
  Type              Status
  Initialized       True
  Ready             False
  ContainersReady   False
  PodScheduled      True
Volumes:
  kube-api-access-pvgjb:
    Type:                    Projected (a volume that contains injected data from multiple sources)
    TokenExpirationSeconds:  3607
    ConfigMapName:           kube-root-ca.crt
    ConfigMapOptional:       <nil>
    DownwardAPI:             true
QoS Class:                   BestEffort
Node-Selectors:              <none>
Tolerations:                 node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                             node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
Events:
  Type    Reason     Age   From               Message
  ----    ------     ----  ----               -------
  Normal  Scheduled  26s   default-scheduler  Successfully assigned ai/prefect-job-6b3a0a1e--1-86gcv to hello-precision-7920-rack
  Normal  Pulled     25s   kubelet            Container image "prefecthq/prefect:0.15.9-python3.8-sukai20" already present on machine
  Normal  Created    25s   kubelet            Created container flow
  Normal  Started    25s   kubelet            Started container flow
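
Many variables in the Environment section above follow the `PREFECT__SECTION__KEY` naming pattern, where double underscores separate levels of Prefect's nested configuration. The sketch below shows that mapping in a simplified form. `env_to_nested` is a hypothetical helper that lowercases every key and keeps values as strings; the real loader also converts value types and may treat some sections, such as secrets, differently.

```python
def env_to_nested(environ, prefix="PREFECT__"):
    """Turn PREFECT__A__B=value style variables into a nested dict {a: {b: value}}."""
    nested = {}
    for name, value in environ.items():
        if not name.startswith(prefix):
            continue  # ignore variables that are not Prefect settings
        keys = name[len(prefix):].lower().split("__")
        node = nested
        for key in keys[:-1]:
            node = node.setdefault(key, {})
        node[keys[-1]] = value
    return nested


# A few of the variables from the pod description, plus one unrelated variable.
pod_env = {
    "PREFECT__LOGGING__LEVEL": "INFO",
    "PREFECT__BACKEND": "server",
    "PREFECT__CLOUD__API": "http://prefect-apollo.ai:4200/graphql",
    "AWS_ACCESS_KEY_ID": "mlflow",
}

settings = env_to_nested(pod_env)
print(settings)
```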


sukai@sukai:/mnt/d/04.github/ai/server$ kubectl -n ai logs prefect-job-6b3a0a1e--1-86gcv
[2021-11-18 02:16:02+0000] INFO - prefect.S3 | Downloading flow from s3://prefect-sukai/train-wine-quality-model/2021-11-18t02-15-04-448956-00-00
[2021-11-18 02:16:02+0000] INFO - prefect.S3 | Flow successfully downloaded. ETag: "b5bc762bf4a73a8298b6a61294e58f2e", LastModified: 2021-11-18T02:15:04+00:00, VersionId: None
[2021-11-18 02:16:03+0000] INFO - prefect.CloudFlowRunner | Beginning Flow run for 'train-wine-quality-model'
[2021-11-18 02:16:03+0000] INFO - prefect.CloudTaskRunner | Task 'fetch_data': Starting task run...
[2021-11-18 02:16:05+0000] INFO - prefect.CloudTaskRunner | Task 'fetch_data': Finished task run for task with final state: 'Success'
[2021-11-18 02:16:05+0000] INFO - prefect.CloudTaskRunner | Task 'train_model': Starting task run...
Elasticnet model (alpha=0.300000, l1_ratio=0.300000):
  RMSE: 0.7138934813010738
  MAE: 0.5774349731953587
  R2: 0.21323931512613703
[2021-11-18 02:16:07+0000] INFO - prefect.CloudTaskRunner | Task 'train_model': Finished task run for task with final state: 'Success'
[2021-11-18 02:16:07+0000] INFO - prefect.CloudFlowRunner | Flow run SUCCESS: all reference tasks succeeded
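
For readers who want to see what the three numbers in the log mean, here is a dependency-free sketch of the same metrics that `eval_metrics` computes with scikit-learn. The function name `eval_metrics_plain` is hypothetical, and the input values are made-up toy numbers, not taken from the wine dataset.

```python
import math


def eval_metrics_plain(actual, pred):
    """Compute RMSE, MAE, and R2 for two equal-length sequences of numbers."""
    n = len(actual)
    errors = [a - p for a, p in zip(actual, pred)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    rmse = math.sqrt(ss_res / n)                   # root mean squared error
    mae = sum(abs(e) for e in errors) / n          # mean absolute error
    mean_actual = sum(actual) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actual)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot                     # coefficient of determination
    return rmse, mae, r2


# Toy values for illustration only.
actual = [5, 6, 5, 7]
pred = [5.5, 5.5, 5.0, 6.0]
rmse, mae, r2 = eval_metrics_plain(actual, pred)
print(f"RMSE: {rmse:.4f}  MAE: {mae:.4f}  R2: {r2:.4f}")
```

An R2 near 0.21, as in the log above, means the model explains only about a fifth of the variance in the quality scores on the test split.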