Azure ML Pipelines and MLOps with GitHub Actions – Part 2

It is usually better to start at the beginning so you might want to head over to Part 1 if you missed it.

In Part 2 I will focus on managing the pipeline run with the “run context” and then registering the model in a way that ties the model back to the pipeline, artifacts, and code that published it. In Part 3 I will conclude with pipeline schedules and GitHub Actions.

All of the source code can be found on my GitHub so don’t be shy to give it a star 🙂

Leveraging Run Context

Azure ML allows a Python script to be executed in a container that runs on an AML compute cluster instead of a local machine. This could be a data transformation script, a training script, or an inferencing script. The example below shows how to do this for a simple training script (standalone, in the absence of a pipeline).

from azureml.core import Experiment, ScriptRunConfig

# ws, project_folder, cpu_cluster, and myenv (an Environment) are assumed
# to have been created earlier in the driver script/notebook.
experiment_name = 'train-on-amlcompute'
experiment = Experiment(workspace=ws, name=experiment_name)

src = ScriptRunConfig(source_directory=project_folder,
                      script='train.py',
                      compute_target=cpu_cluster,
                      environment=myenv)

run = experiment.submit(config=src)

These "runs" are executed via a submit command from an experiment. Being able to log information to the run from within the script itself (in the above example train.py) is key.

In this repo, iris_supervised_model.py leverages the run context to log metrics, tables, and properties. run = Run.get_context() is the magic line that connects a vanilla Python script to the context of the run, inside the experiment, inside the Azure ML workspace.
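
For reference, the top of such a script looks roughly like this (just a minimal sketch, nothing specific to this repo):

from azureml.core import Run

# Returns the current run when the script is submitted through an experiment
# or pipeline, and an "offline" run when the script is executed locally.
run = Run.get_context()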

Now, metrics can be logged:
run.log("accuracy",best_score)
and tables:
run.log_confusion_matrix('Confusion matrix '+name, confusion_matrix(Y_train, model.predict(X_train)))

See this sample notebook for all things logging.
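
To give a flavor, here are a few of the other logging methods on the run object (the values are made up purely for illustration):

run.log("accuracy", 0.97)                               # a single metric
run.log_list("accuracy_per_fold", [0.95, 0.96, 0.98])   # a list of values
run.log_row("data_shape", rows=150, columns=5)          # a row of named values
run.log_table("class_counts", {"class": ["setosa", "versicolor"], "count": [50, 50]})  # a small table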

TIP: When relying on the run context of Azure ML (such as environment variables being passed in from the driver script), performing the following check early in the script allows defaults to be set for anything that would have been passed in. This enables local testing, which is a time saver.

# Run.get_context() returns an "offline" run when the script is executed locally,
# so set local defaults for anything the pipeline would normally provide.
if run.id.startswith('OfflineRun'):
    os.environ['AZUREML_DATAREFERENCE_irisdata'] = './sample_data.csv'
    os.environ['AZUREML_DATAREFERENCE_model_output'] = './model_output'

Managing the pipeline execution

In a pipeline, each step executes as its own run, which is a child of the parent run representing the pipeline itself.

[Image: a parent pipeline run with its child step runs]

It is often best to log important metrics or properties at the pipeline level rather than at the step level (or at both). run.parent returns the parent run context. The code below sets two properties by passing in a dictionary, and then sets the same two values as tags.

run.parent.add_properties({'best_model':best_model[0],'accuracy':best_score})
run.parent.tag("best_model",best_model[0])
run.parent.tag("accuracy",best_score)

Properties are immutable while tags are not; however, tags are more prominent in the Azure ML Run UI, so they are easier to read.
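
A quick illustration of the difference (the values here are made up; the second call is expected to fail because the "accuracy" property was already set above and properties cannot be changed once written):

run.parent.tag("accuracy", 0.97)                 # fine - tags can be overwritten
run.parent.add_properties({"accuracy": 0.97})    # expected to raise - properties are immutable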

To review the added properties, click "Raw JSON" under "see all properties".
[Images: the "see all properties" link and the resulting Raw JSON view of the properties]

Now that the results of the training are published to the parent pipeline's tags (and properties), they can be used to control what happens in later steps. In register_model.py, the accuracy score determines whether this model will be registered or not.

Model Registration

The model artifact should be registered because registration enables "one click" deployment for real-time inferencing hosted on AKS or ACI. Even if the intention is to use the model for batch inferencing with Azure ML pipelines, registering it as shown below is a more organized way to keep the full context of how the model was built, versus just storing the pickle file off in a cloud storage location.

In context of this example pipeline, training has been completed in iris_supervised_model.py. The best model accuracy has been recorded in the tags of the pipeline run.

In the next step of the pipeline, register_model.py, retrieve the parent pipeline run context with parentrun = run.parent and review the tags that have been set.

The code block below shows getting the accuracy score from the tag dictionary of the current pipeline run, but also an alternative method that interrogates previous steps in the pipeline to retrieve the tags by using parentrun.get_children().

tagsdict = parentrun.get_tags()
if tagsdict.get("best_model") is not None:
    # Tags were set on the parent pipeline run by the training step
    model_type = tagsdict['best_model']
    model_accuracy = float(tagsdict['accuracy'])
    training_run_id = parentrun.id
else:
    # Alternatively, walk the child (step) runs and read the tags from the training step
    for step in parentrun.get_children():
        print("Outputs of step " + step.name)
        if step.name == training_step_name:
            tagsdict = step.get_tags()
            model_type = tagsdict['best_model']
            model_accuracy = float(tagsdict['accuracy'])
            training_run_id = step.id

The model can be registered directly to the workspace, but then the context of how the model was built is disconnected from the training pipeline. Instead, the model will be registered from the pipeline run object. To do this, the model artifact (the model.pkl file) needs to be uploaded to the parent run.

# to register a model to a run, the file has to be uploaded to that run first.
model_output = os.environ['AZUREML_DATAREFERENCE_model_output']
parentrun.upload_file('model.pkl',model_output+'/model.pkl')

Next, see if the model name is already registered. If so, record the accuracy score of the previous model to compare against the new model. If this is the first time the model has been trained, it won't exist in the registry, so set the accuracy to beat to 0.

try:
    # Look up the currently registered model (if any) and read its accuracy property
    model = Model(ws, model_name)
    acc_to_beat = float(model.properties["accuracy"])
except Exception:
    # No model registered yet (or no accuracy property) - any accuracy wins
    acc_to_beat = 0

Compare the new model's accuracy with the previous accuracy to beat, and if the new model is better, register it. Note: the model is being registered via parentrun.register_model and not Model.register. This is important because it nicely ties the registered model and artifact back to all of the context of how it was created.

if model_accuracy > acc_to_beat:
    print("model is better, registering")

    # Registering the model to the parent run (the pipeline). The entire pipeline encapsulates the training process.
    model = parentrun.register_model(
                       model_name=model_name,
                       model_path=model_path,
                       model_framework=Model.Framework.SCIKITLEARN, 
                       model_framework_version=sklearn.__version__,
                       sample_input_dataset=dataset,
                       resource_configuration=ResourceConfiguration(cpu=1, memory_in_gb=0.5),
                       description='basic iris classification',
                       tags={'quality': 'good', 'type': 'classification'})

Set additional properties for accuracy and model_type so that the next time training is run, the current accuracy will be compared against that model (just like above).

model.add_properties({"accuracy":model_accuracy,"model_type":model_type})
model.add_tags({"accuracy":model_accuracy,"model_type":model_type})
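
Because the model was registered with its framework, framework version, and a resource configuration, it also qualifies for no-code deployment. Roughly, that looks like the sketch below (the service name is just a placeholder of mine):

from azureml.core.model import Model

# No-code deployment to ACI - no scoring script or environment is needed because
# the framework and resource configuration were supplied at registration time.
service = Model.deploy(ws, 'iris-classifier-svc', [model])
service.wait_for_deployment(show_output=True)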

Access the run logs, outputs, and code snapshots from the registered model

In the model registry, when a model is registered from the run itself, the model's detail page hyperlinks to the run id.
[Image: registered model details linking to the run id]

This links back to the pipeline run.
[Image: the linked pipeline run]

Notice that when clicking on the iris_supervised_model.py step, there is access to the outputs/logs, metrics, and even the snapshots of the code used to generate the model artifact that is registered.
[Image: the step's outputs/logs, metrics, and code snapshot]

Conclusion

Registering the model from the pipeline run gives complete context of how the model was built and registered! It sets up real-time and batch inferencing deployment as next steps.

Up next, Part 3

Azure ML Pipelines and MLOps with GitHub Actions – Part 1

I have been working on customer projects with Azure ML pretty regularly over the last two years. Some common challenges:

  • Microsoft heavily promotes AKS deployment for real-time inference, yet most of the time customers are still looking for an effective way to do batch scoring.
  • When customers leverage Azure ML pipelines for batch processes, they struggle with the concept of pushing datasets and files between steps. This erodes the true power of splitting an ML process into steps.
  • MLOps is hard and overwhelming.

This is not a “start from the beginning” blog post; it assumes that you have familiarity with Azure ML. If you are not familiar, the sample notebooks are seriously EXCELLENT! However, they seem to get you 90% of the way there but miss out on implementation details that are key for success.

The scenario I am using below can be found on my GitHub. It is an Azure ML pipeline that trains several iris classification models, picks the best one, and logs it. In the next pipeline step, if that model is better than the previous training run, it registers the model. This training pipeline can be put on a schedule, or it can be triggered from a code check-in; in this case, from a GitHub Action.

In a later blog post, I will discuss the model registration process in more detail (some production tips there) and the GitHub Action, but I will start with properly passing datasets and files between steps.

Passing datasets and files between steps

Other than a few blogs I have found on the internet, instructions on how to properly pass files or datasets between steps are hard to find.

[Image: pipeline graph with irisdata feeding iris_supervised_model.py and model_output as its output]

In the image above, you can see that irisdata is passed into iris_supervised_model.py and model_output is the output. When you define the pipeline in the driver script, the input data is a DataReference object and any data passed between steps is a PipelineData object.

from azureml.core.datastore import Datastore
from azureml.data.data_reference import DataReference
ds = ws.get_default_datastore()
print("Default Blobstore's name: {}".format(ds.name))

dataset_ref = DataReference(
    datastore=ds,
    data_reference_name='irisdata',
    path_on_datastore="data/sample_data.csv")
print("DataReference object created")
from azureml.pipeline.core import Pipeline, PipelineData
model_output = PipelineData("model_output",datastore=ds)
print("PipelineData object created for models")

In the PythonScriptStep, utilize the input and output parameters.

from azureml.pipeline.steps import PythonScriptStep
trainingScript = PythonScriptStep(
    script_name="iris_supervised_model.py", 
    inputs=[dataset_ref],
    outputs=[model_output],
    compute_target=aml_compute, 
    source_directory="./azureml",
    runconfig=run_config
)

Simply pass model_output from the outputs as an input to the next step (register_model.py, which will be a focus of the next blog post), and so on, as sketched below.
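
For completeness, here is a rough sketch of what that next step and the pipeline assembly might look like (the variable names and experiment name are placeholders of mine, not necessarily what the repo uses):

from azureml.core import Experiment

registerStep = PythonScriptStep(
    script_name="register_model.py",
    inputs=[model_output],            # the training step's PipelineData output becomes this step's input
    compute_target=aml_compute,
    source_directory="./azureml",
    runconfig=run_config
)

pipeline = Pipeline(workspace=ws, steps=[trainingScript, registerStep])
pipeline_run = Experiment(ws, 'iris-training-pipeline').submit(pipeline)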

Using these references in the script

When you submit a pipeline job to run, a container is created and all of the files in the source_directory specified in the PythonScriptStep are copied into the container. The inputs and outputs effectively become mount points for blob storage in that container. In the iris_supervised_model.py script step, this mount point is accessible via an environment variable that looks like the one below.

os.environ['AZUREML_DATAREFERENCE_irisdata']

The same environment variable format is used for the output location (the PipelineData object), which appears to be a randomly created storage location given to you by Azure ML.

mounted_output_path = os.environ['AZUREML_DATAREFERENCE_model_output']

Looking at the mounted_output_path variable above gives a location like: mnt/batch/tasks/shared/LS_root/jobs/amlworkspacesjh/azureml/715a1dca-fafc-4899-ae78-ffffffffffff/mounts/workspaceblobstore/azureml/71ab64d9-bc4c-4b74-a5a5-ffffffffffff/model_output


You should be able to treat these environment variables as file locations, just like a local path. So for irisdata, which was a csv file in the data reference, you can read it like normal.

df = pd.read_csv(os.environ['AZUREML_DATAREFERENCE_irisdata'], names=column_headers)

For model_output, we pickle the model and save it to the mounted_output_path.

pkl_filename = "model.pkl"
mounted_output_path = os.environ['AZUREML_DATAREFERENCE_model_output']
with open(os.path.join(mounted_output_path, pkl_filename), 'wb') as file:
    pickle.dump(best_model[1], file)

Now, looking at register_model.py, we use the PipelineData object (model_output) as our input and reference the same environment variable as in iris_supervised_model.py.

model_output = os.environ['AZUREML_DATAREFERENCE_model_output']
print("model path", model_output)
print("files in model path", os.listdir(path=model_output))

In the file list, model.pkl is there right where it was created in the training script.
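
Just to illustrate that the mount really does behave like a local folder, the artifact could even be loaded back at this point (a sketch only; the register step does not actually need to do this, it just uploads the file):

import pickle

# Illustration only - confirm the pickled model round-trips from the mounted path
with open(os.path.join(model_output, 'model.pkl'), 'rb') as f:
    loaded_model = pickle.load(f)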

Conclusion

Passing data between pipeline steps is pretty easy, but the documentation on the magic “AZUREML_DATAREFERENCE_***” environment variables is lacking in most of the sample notebooks I have found. Just remember that these are mount points and can basically be interacted with just like local files.

Up next, Part 2

The “where have I been?” footnote

It has been 21 months since my last blog post. My role at Microsoft has led me to focus much more on cloud data services for only a couple of customers. I loved to blog about Power BI, but I just haven’t been in that space for a while, as my day-to-day responsibilities were handed over to the much more capable @notaboutthecell. I have been working a lot on real-time stream processing (with Databricks / Cosmos DB / Azure Functions) and ML engineering activities with Azure ML. Blog posts have been difficult, as so much of my work is implementation oriented and it is hard to recreate everything in a publicly sharable way.

Or maybe I have just been lazy 🙂

Anyway, I am sure that the blog posts in my future are probably going to be narrower in application and probably won’t be “marathon reads” that explain everything in detail, but hopefully they will be enough to connect the dots for the people who need it.