
Lab: Domain 1 — Data Preparation Hands-On

These labs walk you through the real AWS services you’ll be tested on in the MLA-C01 exam. Each lab explains what you’re building, why it matters, and how it connects to exam questions — not just code.


Lab 1: SageMaker Feature Store

What Is This Lab About?

Imagine you work at an e-commerce company. Your ML team builds fraud detection models, recommendation engines, and churn predictors. Every team independently calculates features like “customer lifetime value” or “average order size” — but they each compute them slightly differently. Worse, the features used during training don’t always match what’s available during real-time inference.

SageMaker Feature Store solves this by providing a single source of truth for ML features that serves both training (batch) and inference (real-time).

What You’ll Build

┌─────────────────────────────────────────────────────────────────┐
│                     SageMaker Feature Store                     │
│                                                                 │
│  Your Data                                                      │
│  (customer CSV)                                                 │
│       │                                                         │
│       ▼                                                         │
│  ┌──────────────────┐       ┌────────────────────────────────┐  │
│  │ Feature          │       │  ONLINE STORE                  │  │
│  │ Group:           │       │  • Managed key-value store     │  │
│  │ "customers"      │──────▶│  • Latest value per customer   │  │
│  │                  │       │  • <10ms lookup                │  │
│  │ Fields:          │       │  • Used at inference time:     │  │
│  │ • customer_id    │       │    "Give me customer 1001's    │  │
│  │ • age            │       │     features RIGHT NOW"        │  │
│  │ • income         │       └────────────────────────────────┘  │
│  │ • credit_score   │                                           │
│  │ • total_purchases│       ┌────────────────────────────────┐  │
│  │ • EventTime      │       │  OFFLINE STORE                 │  │
│  │                  │──────▶│  • S3 Parquet files            │  │
│  └──────────────────┘       │  • Full history of all values  │  │
│                             │  • Query with Athena SQL       │  │
│                             │  • Used for training:          │  │
│                             │    "Give me all customer       │  │
│                             │     features AS THEY WERE      │  │
│                             │     on Jan 1st"                │  │
│                             └────────────────────────────────┘  │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Feature Store Architecture

What’s ACTUALLY Running Behind the Scenes?

Feature Store isn’t magic. When you create a Feature Group, AWS provisions real infrastructure:

Online Store: a fully managed, low-latency key-value store (often described as DynamoDB under the hood):

  • AWS provisions and manages the storage; no table appears in your own account
  • Each row is keyed by your record identifier (here, customer_id)
  • Only the latest version of each record is kept: a write with a newer EventTime replaces the stored row, while a write with an older EventTime does not overwrite it
  • Reads take single-digit milliseconds because every lookup is a direct key-value fetch by record identifier
  • When your inference code calls GetRecord for customer 1001, Feature Store fetches that one row by key; there is no scan or join
  • You pay Feature Store's own read/write request and storage charges rather than provisioning DynamoDB capacity yourself

Offline Store — it’s S3 + Glue Data Catalog + Athena:

  • Every record you ingest is ALSO appended to S3 as Parquet files (takes 5-15 minutes to sync)
  • The Parquet files are partitioned by year/month/day/hour for efficient querying
  • The table is automatically registered in AWS Glue Data Catalog (the metadata store)
  • feature_group.athena_query() returns a query object bound to that table; running it sends SQL to Amazon Athena, which scans the Parquet files in S3
  • Because it’s append-only (never overwrites), you have FULL HISTORY: every value every record ever had
  • This is how point-in-time queries work: filtering on EventTime <= <cutoff>, then keeping the newest row per record, returns only what you knew at that moment

Why do we need BOTH?

  • Training needs historical data (offline) — “what were the features when this fraud happened?”
  • Inference needs current data (online) — “what are this customer’s features RIGHT NOW?”
  • Without Feature Store, teams build separate systems that inevitably diverge: the features used in training don’t match inference, causing training-serving skew — the #1 silent killer of ML models in production

The sync mechanism:

Ingest API call
    │
    ├──▶ Online Store (DynamoDB) — immediate, latest value only
    │
    └──▶ Offline Store (S3 Parquet) — async, 5-15 min delay, full history
         │
         └──▶ Glue Data Catalog (metadata) — auto-registered
              │
              └──▶ Athena can query it with SQL

Why This Matters for the Exam

Feature Store is a frequently tested MLA-C01 topic. The key tested concepts:

  1. Online vs Offline — when to use which (real-time inference vs training)
  2. Point-in-time correctness — preventing label leakage in temporal data
  3. Feature reuse — sharing features across teams
  4. Both stores enabled — when features are needed for training AND real-time serving

Step 1: Setup Your Environment

You’ll need a SageMaker Studio notebook or any environment with the SageMaker SDK. The execution role needs Feature Store and S3 permissions, plus Glue and Athena access for the offline-store queries in Steps 6 and 7.

import sagemaker
import boto3
import pandas as pd
import numpy as np
import time
from sagemaker.feature_store.feature_group import FeatureGroup

role = sagemaker.get_execution_role()
session = sagemaker.Session()
region = session.boto_region_name
bucket = session.default_bucket()
prefix = "featurestore-lab"

Step 2: Create Your Dataset

In production, this data comes from your data warehouse or ETL pipeline. Here we simulate 5 customers with features a fraud detection model might use.

customer_data = pd.DataFrame({
    "customer_id": [1001, 1002, 1003, 1004, 1005],
    "age": [25, 34, 45, 28, 52],
    "income": [50000, 75000, 120000, 45000, 95000],
    "credit_score": [680, 720, 780, 650, 740],
    "total_purchases": [12, 45, 89, 5, 67],
})

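Critical requirement: every Feature Group needs an event time feature, named via event_time_feature_name (here, EventTime). It must be either a Fractional value (Unix epoch seconds, stored as a float64 column, as below) or an ISO-8601 String, and it records WHEN each record was valid. It’s how Feature Store enables point-in-time queries (“give me features as they were at time X”).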

current_time = int(round(time.time()))
customer_data["EventTime"] = pd.Series(
    [current_time] * len(customer_data), dtype="float64"
)

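Feature Store supports three scalar feature types:

Type         Python/pandas equivalent   Example
String       str (object/string dtype)  “customer_123”
Fractional   float64                    50000.00
Integral     int64                      25

There is no boolean or date type: convert booleans to 0/1 Integral values and dates to ISO-8601 Strings (or epoch-second Fractionals) before ingesting.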
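A minimal sketch of that conversion, run before load_feature_definitions. It assumes booleans should become 0/1 Integral values, naive datetimes are UTC and should become ISO-8601 Strings, and any other non-numeric column should become the pandas "string" dtype, which the SageMaker SDK maps to String. to_feature_store_dtypes is a hypothetical helper, not part of the SDK.

```python
import pandas as pd


def to_feature_store_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """Cast columns to dtypes that map onto String, Fractional, or Integral.

    Hypothetical helper: bool -> int64 (0/1), datetime -> ISO-8601 string
    (assumed UTC), other ints -> int64, other floats -> float64,
    everything else -> pandas "string" dtype.
    """
    out = df.copy()
    for col in out.columns:
        s = out[col]
        if pd.api.types.is_bool_dtype(s):
            out[col] = s.astype("int64")
        elif pd.api.types.is_datetime64_any_dtype(s):
            out[col] = s.dt.strftime("%Y-%m-%dT%H:%M:%SZ").astype("string")
        elif pd.api.types.is_integer_dtype(s):
            out[col] = s.astype("int64")
        elif pd.api.types.is_float_dtype(s):
            out[col] = s.astype("float64")
        else:
            out[col] = s.astype("string")
    return out


# Example with made-up columns that Feature Store can't take as-is
raw = pd.DataFrame({
    "customer_id": [1001, 1002],
    "is_premium": [True, False],
    "signup_date": pd.to_datetime(["2024-01-05", "2024-02-10"]),
    "score": [0.5, 0.7],
    "city": ["Austin", "Boston"],
})
converted = to_feature_store_dtypes(raw)
print(converted.dtypes)
```

Casting explicitly also avoids relying on how a given SDK version interprets generic object columns.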

Step 3: Create the Feature Group

A Feature Group is like a table in a database. It has a schema (feature definitions), a primary key (record identifier), and a timestamp (event time).

from time import gmtime, strftime

feature_group_name = "customer-features-" + strftime("%d-%H-%M-%S", gmtime())

feature_group = FeatureGroup(
    name=feature_group_name,
    sagemaker_session=session
)

# Auto-detect feature types from the dataframe columns
feature_group.load_feature_definitions(data_frame=customer_data)

Now create it with both online and offline stores enabled. This is the most common configuration because you typically need features for both training (offline) and real-time inference (online).

feature_group.create(
    s3_uri=f"s3://{bucket}/{prefix}",        # offline store location
    record_identifier_name="customer_id",     # primary key
    event_time_feature_name="EventTime",      # required timestamp
    role_arn=role,
    enable_online_store=True,                 # enables real-time lookups
)

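Passing s3_uri is what creates the offline store (Parquet files in S3, registered in Glue); passing s3_uri=False would skip it. enable_online_store=True additionally creates the real-time online store, so this one Feature Group serves both training and inference.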

Wait for it to be ready:

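def wait_for_feature_group(fg):
    status = fg.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print(f"Status: {status}...")
        time.sleep(5)
        status = fg.describe().get("FeatureGroupStatus")
    if status != "Created":
        # e.g. CreateFailed: describe() reports the cause in FailureReason
        raise RuntimeError(f"Feature Group {status}: {fg.describe().get('FailureReason')}")
    print(f"Feature Group ready: {status}")

wait_for_feature_group(feature_group)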

Step 4: Ingest Data

This pushes your dataframe into both stores simultaneously. The online store gets the latest value immediately. The offline store syncs within a few minutes.

feature_group.ingest(
    data_frame=customer_data,
    max_workers=3,     # 3 parallel ingestion threads
    wait=True          # block until complete
)
print("Ingestion complete")

Step 5: Query the Online Store (Real-Time)

This is what your inference endpoint calls. When a customer makes a transaction, your fraud model needs their features in milliseconds — not minutes.

Single record lookup — “give me customer 1001’s latest features”:

featurestore_runtime = boto3.client(
    "sagemaker-featurestore-runtime", region_name=region
)

record = featurestore_runtime.get_record(
    FeatureGroupName=feature_group_name,
    RecordIdentifierValueAsString="1001"
)

# Returns: [{"FeatureName": "age", "ValueAsString": "25"}, ...]
for feature in record["Record"]:
    print(f"  {feature['FeatureName']}: {feature['ValueAsString']}")

Batch lookup — “give me features for these 3 customers at once”:

batch = featurestore_runtime.batch_get_record(
    Identifiers=[{
        "FeatureGroupName": feature_group_name,
        "RecordIdentifiersValueAsString": ["1001", "1002", "1003"],
    }]
)

for record in batch["Records"]:
    customer = {f["FeatureName"]: f["ValueAsString"] for f in record["Record"]}
    print(f"  Customer {customer['customer_id']}: score={customer['credit_score']}")
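Every value comes back as ValueAsString, so inference code usually casts features before building a model input. A sketch, assuming you keep a name-to-type map alongside your feature definitions; record_to_features and the sample payload are illustrative, shaped like the Record list shown above.

```python
from typing import Callable, Dict, List


def record_to_features(
    record: List[Dict[str, str]],
    casts: Dict[str, Callable[[str], object]],
) -> Dict[str, object]:
    """Turn a GetRecord "Record" list into {feature_name: typed_value}.

    Features without an entry in `casts` stay as strings.
    """
    return {
        f["FeatureName"]: casts.get(f["FeatureName"], str)(f["ValueAsString"])
        for f in record
    }


# Illustrative payload shaped like get_record(...)["Record"]
sample_record = [
    {"FeatureName": "customer_id", "ValueAsString": "1001"},
    {"FeatureName": "age", "ValueAsString": "25"},
    {"FeatureName": "income", "ValueAsString": "50000"},
    {"FeatureName": "EventTime", "ValueAsString": "1704067200.0"},
]
casts = {"customer_id": int, "age": int, "income": int, "EventTime": float}
print(record_to_features(sample_record, casts))
```

If an identifier isn't in the online store, the response may have no "Record" key, so reading it with response.get("Record", []) is a safer default.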

Step 6: Query the Offline Store (Training Data)

This is what your training pipeline uses. The offline store keeps the full history in S3 as Parquet files, registered in the Glue Data Catalog, so you can query with Athena SQL.

query = feature_group.athena_query()
table_name = query.table_name  # auto-registered in Glue

query_string = f"""
SELECT customer_id, age, income, credit_score, total_purchases
FROM "{table_name}"
WHERE credit_score > 700
ORDER BY income DESC
"""

query.run(
    query_string=query_string,
    output_location=f"s3://{bucket}/{prefix}/query_results/"
)
query.wait()

df = query.as_dataframe()
print(df)

Note: Records reach the offline store asynchronously, typically within about 15 minutes of ingestion. If your query returns no rows, wait a few minutes and run it again.

Step 7: Point-in-Time Query (Prevent Label Leakage)

This is the most tested Feature Store concept on the exam. Here’s the problem it solves:

BAD (without point-in-time):
  You're training a model to predict January fraud.
  You accidentally use customer features from March.
  The model "cheats" — it sees future information.
  Works great in training, fails in production.

GOOD (with point-in-time):
  You query features AS THEY WERE on January 1st.
  No future information leaks in.
  Model learns from what was actually knowable at prediction time.
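
# Get each customer's features as they were at a specific moment
jan_1_timestamp = 1704067200  # 2024-01-01 00:00:00 UTC

# The offline store is append-only: keep only the newest row per customer
# with EventTime <= cutoff. write_time, api_invocation_time and is_deleted
# are columns Feature Store adds to the offline-store table.
query_string = f"""
SELECT customer_id, age, income, credit_score
FROM (
    SELECT *,
           ROW_NUMBER() OVER (
               PARTITION BY customer_id
               ORDER BY EventTime DESC, api_invocation_time DESC, write_time DESC
           ) AS row_num
    FROM "{table_name}"
    WHERE EventTime <= {jan_1_timestamp}
)
WHERE row_num = 1 AND NOT is_deleted
"""

query.run(query_string=query_string,
          output_location=f"s3://{bucket}/{prefix}/pit_results/")
query.wait()
historical_df = query.as_dataframe()
# This lab ingested with EventTime = now; use a cutoff after ingestion to see rows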
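The same rule applies when you attach features to labeled events for training: each row may only use the newest feature values at or before that event's timestamp. A sketch of this as-of join with pandas.merge_asof on made-up data; the event_time and is_fraud columns are hypothetical.

```python
import pandas as pd

# Feature history, as it might come back from the offline store (made-up values)
features = pd.DataFrame({
    "customer_id": [1001, 1001, 1002],
    "EventTime": [100.0, 300.0, 150.0],
    "credit_score": [680, 640, 720],
})

# Labeled events (e.g. transactions) with their own timestamps
labels = pd.DataFrame({
    "customer_id": [1001, 1001, 1002],
    "event_time": [200.0, 400.0, 120.0],
    "is_fraud": [0, 1, 0],
})

# merge_asof needs both frames sorted by their time keys.
# direction="backward" takes the last feature row with EventTime <= event_time,
# so no feature value from the future leaks into a training row.
training = pd.merge_asof(
    labels.sort_values("event_time"),
    features.sort_values("EventTime"),
    left_on="event_time",
    right_on="EventTime",
    by="customer_id",
    direction="backward",
)
print(training[["customer_id", "event_time", "credit_score", "is_fraud"]])
```

Customer 1002's only feature row is newer than its event, so that training row gets NaN rather than a value from the future.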

Step 8: Cleanup

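# Removes the feature group and its online store. The offline-store Parquet
# files in S3 and the Glue table are NOT deleted; clean those up separately.
feature_group.delete()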

Exam Scenarios for This Lab

Question pattern                                    Answer
“Reuse features across teams”                       Feature Store
“Consistent features between training and serving”  Feature Store (both stores enabled)
“Low-latency feature lookup at inference”           Online Store (get_record())
“Historical features for training”                  Offline Store (Athena query)
“Prevent label leakage in time-series training”     Point-in-time correctness
“Feature types supported”                           String, Fractional, Integral only

Lab 2: Building a Data Pipeline — S3 → Glue → Athena

What Is This Lab About?

Most ML projects start with raw data sitting in S3 — CSVs, JSONs, logs. Before you can train a model, you need to:

  1. Discover what’s in those files (schema, data types)
  2. Catalog it so tools can find and query it
  3. Transform it into ML-ready format (Parquet, cleaned, encoded)
  4. Query it efficiently without loading into memory

This pipeline is the foundation of every ML data preparation workflow on AWS.

What You’ll Build

┌────────────┐     ┌──────────────┐     ┌──────────────┐     ┌────────────┐
│            │     │              │     │              │     │            │
│  Raw CSV   │────▶│ Glue Crawler │────▶│ Glue Data    │────▶│  Athena    │
│  in S3     │     │              │     │ Catalog      │     │  SQL Query │
│            │     │ "What's the  │     │              │     │            │
│ sales.csv  │     │  schema?"    │     │ "table:sales │     │ "SELECT *  │
│ users.csv  │     │              │     │  columns:    │     │  WHERE     │
│            │     │ Discovers:   │     │  date, amount│     │  amt > 100"│
│            │     │ columns,     │     │  user_id..." │     │            │
│            │     │ types,       │     │              │     │ Scans only │
│            │     │ partitions   │     │ Hive-compat  │     │ relevant   │
│            │     │              │     │ metastore    │     │ data in S3 │
└────────────┘     └──────────────┘     └──────────────┘     └────────────┘

Optional next step:
┌──────────────┐     ┌────────────┐
│ Glue ETL Job │────▶│ Parquet in │
│              │     │ S3 (faster,│
│ CSV → Parquet│     │ cheaper    │
│ Clean nulls  │     │ queries)   │
│ Partition by │     │            │
│ date         │     └────────────┘
└──────────────┘

Why This Matters for the Exam

  • Glue Crawlers populate the Data Catalog that Athena uses
  • Parquet is columnar and compressed, so Athena reads only the columns a query touches; bytes scanned (and cost) often drop 10x or more vs CSV
  • Partitioning by date/category = only scan relevant data
  • Glue Job Bookmarks = process only new files (incremental ETL)

Step 1: Upload Raw Data to S3

import boto3

s3 = boto3.client("s3")
bucket = "my-ml-data-lake"

# Upload CSV files
s3.upload_file("data/sales_2024.csv", bucket, "raw/sales/year=2024/sales.csv")
s3.upload_file("data/sales_2025.csv", bucket, "raw/sales/year=2025/sales.csv")

Notice the Hive-style partitioning in the path: year=2024/. Glue Crawler automatically recognizes this and creates partition columns, so Athena queries like WHERE year = 2024 skip the 2025 data entirely.
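If your own pipeline writes these files, a tiny helper keeps the key=value layout consistent. A sketch; hive_partition_key is hypothetical, and the extra month level is only an example (the crawler would then add month as a second partition column).

```python
from typing import Dict, Union


def hive_partition_key(prefix: str, partitions: Dict[str, Union[int, str]], filename: str) -> str:
    """Build an S3 key like raw/sales/year=2024/month=01/sales.csv.

    Partition levels are emitted in the dict's insertion order.
    """
    parts = "/".join(f"{name}={value}" for name, value in partitions.items())
    return f"{prefix.rstrip('/')}/{parts}/{filename}"


print(hive_partition_key("raw/sales", {"year": 2024}, "sales.csv"))
print(hive_partition_key("raw/sales/", {"year": 2025, "month": "03"}, "sales.csv"))
```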

Step 2: Create and Run Glue Crawler

The crawler scans your S3 path, figures out the schema (column names, types), and registers it in the Data Catalog.

glue = boto3.client("glue")

# Create a database to hold our tables
glue.create_database(
    DatabaseInput={"Name": "ml_database", "Description": "ML training data"}
)

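# Create the crawler. Assumes an IAM role named GlueServiceRole already exists
# with Glue permissions and read access to the bucket.
account_id = boto3.client("sts").get_caller_identity()["Account"]
glue.create_crawler(
    Name="sales-data-crawler",
    Role=f"arn:aws:iam::{account_id}:role/GlueServiceRole",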
    DatabaseName="ml_database",
    Targets={"S3Targets": [{"Path": f"s3://{bucket}/raw/sales/"}]},
    SchemaChangePolicy={
        "UpdateBehavior": "UPDATE_IN_DATABASE",
        "DeleteBehavior": "LOG",
    },
)

# Run it
glue.start_crawler(Name="sales-data-crawler")
print("Crawler started — takes 1-3 minutes")

After the crawler completes, you’ll have a table sales in ml_database with automatically detected columns and year as a partition column.
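start_crawler returns immediately, so scripts typically poll until the crawler is idle again. A sketch using glue.get_crawler, assuming the crawler was just started (so a READY state means the run has finished); wait_for_crawler is a hypothetical helper, and the sleep parameter exists only so it can be tested without AWS.

```python
import time


def wait_for_crawler(glue_client, name, poll_seconds=15, sleep=time.sleep):
    """Poll until the crawler returns to READY; return the last crawl's status.

    Assumes the crawler was just started, so READY means the run ended.
    Possible statuses include SUCCEEDED, FAILED and CANCELLED.
    """
    while True:
        crawler = glue_client.get_crawler(Name=name)["Crawler"]
        if crawler["State"] == "READY":
            return crawler.get("LastCrawl", {}).get("Status", "UNKNOWN")
        sleep(poll_seconds)  # State is RUNNING or STOPPING


# Usage against the crawler above:
# status = wait_for_crawler(glue, "sales-data-crawler")
# if status != "SUCCEEDED":
#     raise RuntimeError(f"Crawler finished with status {status}")
```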

Step 3: Query with Athena

Now you can run SQL directly on the S3 data — no database server, no data loading, no infrastructure.

athena = boto3.client("athena")

response = athena.start_query_execution(
    QueryString="""
        SELECT year, COUNT(*) as total_orders, AVG(amount) as avg_order
        FROM ml_database.sales
        WHERE amount > 100
        GROUP BY year
        ORDER BY year
    """,
    QueryExecutionContext={"Database": "ml_database"},
    ResultConfiguration={"OutputLocation": f"s3://{bucket}/athena-results/"},
)

print(f"Query submitted: {response['QueryExecutionId']}")
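start_query_execution is asynchronous: it returns an ID, not results. A sketch of waiting for the query and reading its rows with get_query_execution and get_query_results, assuming a SELECT query (whose first result row holds the column names). fetch_athena_rows is a hypothetical helper; every value comes back as a string.

```python
import time


def fetch_athena_rows(athena_client, query_execution_id, poll_seconds=2, sleep=time.sleep):
    """Wait for an Athena query to finish and return its rows as dicts."""
    while True:
        status = athena_client.get_query_execution(
            QueryExecutionId=query_execution_id
        )["QueryExecution"]["Status"]
        if status["State"] in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        sleep(poll_seconds)  # still QUEUED or RUNNING
    if status["State"] != "SUCCEEDED":
        raise RuntimeError(f"Athena query {status['State']}: {status.get('StateChangeReason', '')}")

    # Results are paginated; follow NextToken until it disappears
    rows, token = [], None
    while True:
        kwargs = {"QueryExecutionId": query_execution_id}
        if token:
            kwargs["NextToken"] = token
        page = athena_client.get_query_results(**kwargs)
        for row in page["ResultSet"]["Rows"]:
            rows.append([col.get("VarCharValue") for col in row["Data"]])
        token = page.get("NextToken")
        if not token:
            break

    header, data = rows[0], rows[1:]  # first row of a SELECT holds column names
    return [dict(zip(header, r)) for r in data]


# Usage with the query above:
# rows = fetch_athena_rows(athena, response["QueryExecutionId"])
```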

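Cost optimization: Athena charges per TB of data scanned (list price $5.00 per TB in many Regions, including us-east-1). Illustrative numbers for one query over a dataset that is 1 TB as CSV:

Format                 Data scanned (approx.)   Cost     vs CSV
CSV                    1 TB                     $5.00    Baseline
Parquet                0.1 TB                   $0.50    10x cheaper
Parquet + partition    0.01 TB                  $0.05    100x cheaper

Actual savings depend on how many columns the query reads and how many partitions it can skip.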
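The arithmetic behind that table, as a sketch. It assumes the $5.00-per-TB list price, binary units (1 TB = 1024^4 bytes), and Athena's rule of rounding each query up to the next MB with a 10 MB minimum; athena_query_cost is a hypothetical helper.

```python
import math

PRICE_PER_TB = 5.00   # assumed list price in USD per TB scanned; check your Region
MB = 1024 ** 2        # assumption: binary units
TB = 1024 ** 4


def athena_query_cost(bytes_scanned: int, price_per_tb: float = PRICE_PER_TB) -> float:
    """Estimate one query's cost: bytes rounded up to the next MB, 10 MB minimum."""
    billed_mb = max(math.ceil(bytes_scanned / MB), 10)
    return billed_mb * MB / TB * price_per_tb


for label, scanned in [("CSV", TB), ("Parquet", TB // 10), ("Parquet + partition", TB // 100)]:
    print(f"{label:<20} ${athena_query_cost(scanned):.2f}")
```

The 10 MB minimum means many tiny queries over small files still cost something each, which is one more reason to compact small files.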

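Step 4: Convert to Parquet (Optional)

For production, convert CSV to Parquet to save money and speed up queries. The quickest route is an Athena CTAS query; a Glue ETL job does the same conversion when you need more transformation logic or scheduled runs.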

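# Athena CTAS (Create Table As Select): the simplest way to convert.
# Partition columns must come last in the SELECT list; SELECT * on the
# crawled table already puts the "year" partition column last.
athena.start_query_execution(
    QueryString=f"""
        CREATE TABLE ml_database.sales_parquet
        WITH (
            format = 'PARQUET',
            external_location = 's3://{bucket}/processed/sales/',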
            partitioned_by = ARRAY['year']
        )
        AS SELECT * FROM ml_database.sales
    """,
    QueryExecutionContext={"Database": "ml_database"},
    ResultConfiguration={"OutputLocation": f"s3://{bucket}/athena-results/"},
)

Glue Job Bookmarks — Incremental Processing

Without bookmarks, a daily Glue ETL job reprocesses ALL files every run. With bookmarks, it only processes files added since the last run.

Day 1: Files A, B, C processed → bookmark saves "processed up to C"
Day 2: Files D, E added → Glue only processes D, E (skips A, B, C)
Day 3: File F added → Glue only processes F

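Enable it with the Glue job parameter --job-bookmark-option set to job-bookmark-enable. The job script must also call job.init() and job.commit(), and pass a transformation_ctx to its sources, so Glue can record what it has already processed.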
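A toy model of the idea: persisted state that remembers how far the last run got, compared against each file's last-modified time. This is a hypothetical illustration of the Day 1/2/3 timeline above, not how Glue stores bookmarks internally.

```python
from typing import Dict, List, Tuple


def files_to_process(objects: Dict[str, float], bookmark: float) -> Tuple[List[str], float]:
    """Return files modified after the bookmark, plus the new bookmark value.

    objects maps an S3 key to its last-modified time (epoch seconds).
    """
    new_files = sorted(k for k, mtime in objects.items() if mtime > bookmark)
    new_bookmark = max([bookmark] + [objects[k] for k in new_files])
    return new_files, new_bookmark


# Replay the timeline: the bookmark is the only state carried between runs
bucket_state = {"A": 1.0, "B": 2.0, "C": 3.0}
bookmark = 0.0
day1, bookmark = files_to_process(bucket_state, bookmark)
bucket_state.update({"D": 4.0, "E": 5.0})
day2, bookmark = files_to_process(bucket_state, bookmark)
bucket_state["F"] = 6.0
day3, bookmark = files_to_process(bucket_state, bookmark)
print(day1, day2, day3)
```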


Lab 3: Real-Time Streaming with Kinesis

What Is This Lab About?

Your e-commerce website generates thousands of click events per second. You need to:

  1. Ingest events in real-time (Kinesis Data Streams)
  2. Deliver them to S3 for batch training (Data Firehose)
  3. Optionally process them in-flight for real-time ML (Amazon Managed Service for Apache Flink)

What You’ll Build

  User clicks on website
        │
        ▼
┌────────────────────┐     ┌─────────────────────┐
│ Kinesis Data       │────▶│ Consumer: Lambda    │──▶ SageMaker Endpoint
│ Streams            │     │ (real-time scoring) │    (fraud check per click)
│                    │     └─────────────────────┘
│ Shard 1: 1 MB/s    │
│ Shard 2: 1 MB/s    │     ┌─────────────────────┐
│                    │────▶│ Consumer: Firehose  │──▶ S3 (Parquet)
│ Ordering: per      │     │ (batch delivery)    │    (training data)
│ partition key      │     │ Buffers: size/time  │
│                    │     └─────────────────────┘
│ Retention: 24h-    │
│ 365d (replayable)  │
└────────────────────┘

Key Concepts to Understand

Shards are the unit of throughput. Each shard handles:

  • Write: 1 MB/s or 1,000 records/s
  • Read: 2 MB/s (shared) or 2 MB/s per consumer (enhanced fan-out)

If you need 10 MB/s of write throughput, you need at least 10 shards, and more if you also exceed 1,000 records/s per shard.
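The sizing rule for a provisioned-capacity stream, as a sketch: take the largest shard count demanded by bytes written, records written, and shared-throughput reads. shards_needed is hypothetical; consumers using enhanced fan-out get their own 2 MB/s per shard, so leave them out of shared_consumers.

```python
import math


def shards_needed(write_mb_per_s: float, records_per_s: float,
                  read_mb_per_s_per_consumer: float = 0.0,
                  shared_consumers: int = 0) -> int:
    """Minimum shard count under the per-shard limits described above."""
    by_bytes = math.ceil(write_mb_per_s / 1.0)    # 1 MB/s write per shard
    by_records = math.ceil(records_per_s / 1000)  # 1,000 records/s write per shard
    # Shared (non-fan-out) consumers split 2 MB/s read per shard between them
    by_reads = math.ceil(read_mb_per_s_per_consumer * shared_consumers / 2.0)
    return max(1, by_bytes, by_records, by_reads)


print(shards_needed(10, 5000))       # bytes dominate
print(shards_needed(2, 8000))        # many small records dominate
print(shards_needed(3, 1000, 3, 3))  # three shared readers dominate
```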

Partition keys determine which shard receives a record. Records with the same partition key always go to the same shard, preserving ordering per key.
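Kinesis picks the shard by MD5-hashing the partition key into a 128-bit integer and finding the shard whose hash-key range contains it. The sketch below assumes the hash space is split evenly across shards (as for a freshly created stream) and that keys are hashed as UTF-8; after resharding you would read the real ranges from list_shards instead. shard_for_key is hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Index of the shard that would receive this key, assuming even hash ranges."""
    hash_key = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    return hash_key * shard_count // HASH_SPACE


for user in ["user_0", "user_1", "user_2", "user_0"]:
    print(user, "-> shard", shard_for_key(user, 2))
```

The same key always lands on the same shard, which is where the per-key ordering guarantee comes from; a handful of very hot keys can overload one shard no matter how many shards exist.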

Step 1: Create a Stream

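import boto3

kinesis = boto3.client("kinesis")

kinesis.create_stream(
    StreamName="clickstream",
    ShardCount=2,  # 2 MB/s write, 4 MB/s shared read
)
# Block until the stream is ACTIVE (typically under a minute)
kinesis.get_waiter("stream_exists").wait(StreamName="clickstream")
print("Stream is ACTIVE")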

Step 2: Send Events

Each event represents a user action. We use user_id as the partition key so all events from the same user are ordered.

import json
import time

for i in range(100):
    event = {
        "user_id": f"user_{i % 10}",
        "action": "click",
        "product_id": f"prod_{i % 50}",
        "timestamp": int(time.time()),
        "page": "product_detail",
    }

    kinesis.put_record(
        StreamName="clickstream",
        Data=json.dumps(event),
        PartitionKey=event["user_id"],  # same user → same shard → ordered
    )
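One put_record call per event means one request per event. put_records accepts up to 500 records per call and reports failures per record, so a common pattern is to batch and retry only the failed entries. A sketch; send_batched is hypothetical, and it assumes events are small enough that 500 of them fit under the 5 MB request limit.

```python
import json
import time


def send_batched(client, stream_name, events, key_field="user_id",
                 batch_size=500, max_retries=3, sleep=time.sleep):
    """Send events with PutRecords, retrying entries that fail (e.g. throttling)."""
    entries = [
        {"Data": json.dumps(e).encode("utf-8"), "PartitionKey": str(e[key_field])}
        for e in events
    ]
    for start in range(0, len(entries), batch_size):
        pending = entries[start:start + batch_size]
        for attempt in range(max_retries + 1):
            resp = client.put_records(StreamName=stream_name, Records=pending)
            if resp["FailedRecordCount"] == 0:
                break
            # Results come back in request order; keep entries that have an ErrorCode
            pending = [entry for entry, result in zip(pending, resp["Records"])
                       if "ErrorCode" in result]
            sleep(0.1 * 2 ** attempt)  # simple exponential backoff
        else:
            raise RuntimeError(f"{len(pending)} records still failing after retries")


# Usage with the stream above:
# send_batched(kinesis, "clickstream", events)
```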

Step 3: Read Events (Consumer)

In production, you’d use Lambda or a KCL (Kinesis Client Library) app. Here’s a basic reader:

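# Get a shard iterator for the FIRST shard only; events whose partition key
# hashes to the other shard won't appear in this reader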
response = kinesis.describe_stream(StreamName="clickstream")
shard_id = response["StreamDescription"]["Shards"][0]["ShardId"]

iterator = kinesis.get_shard_iterator(
    StreamName="clickstream",
    ShardId=shard_id,
    ShardIteratorType="TRIM_HORIZON",  # start from oldest record
)["ShardIterator"]

# Read one batch (a single call can return zero records even when data exists)
response = kinesis.get_records(ShardIterator=iterator, Limit=10)
for record in response["Records"]:
    data = json.loads(record["Data"])
    print(f"  {data['user_id']} clicked {data['product_id']}")
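A single get_records call is not a consumer. A sketch of the polling loop that follows NextShardIterator and stops once the reader has caught up (no new records and MillisBehindLatest of 0); read_shard is hypothetical, and the one-second sleep keeps the loop under the 5 GetRecords calls per second each shard allows.

```python
import json
import time


def read_shard(client, stream_name, shard_id, max_batches=10,
               poll_seconds=1.0, sleep=time.sleep):
    """Read a shard from its oldest record, following NextShardIterator."""
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    events = []
    for _ in range(max_batches):
        resp = client.get_records(ShardIterator=iterator, Limit=100)
        events.extend(json.loads(r["Data"]) for r in resp["Records"])
        iterator = resp.get("NextShardIterator")
        if iterator is None:  # shard was closed (e.g. after resharding)
            break
        if not resp["Records"] and resp.get("MillisBehindLatest", 0) == 0:
            break             # caught up with the tip of the shard
        sleep(poll_seconds)
    return events


# Usage: read every shard, not just the first
# for shard in kinesis.list_shards(StreamName="clickstream")["Shards"]:
#     print(read_shard(kinesis, "clickstream", shard["ShardId"]))
```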

Kinesis vs Firehose — When to Use Which

Need custom processing logic?
  YES → Kinesis Data Streams (you build consumers)
  NO  → Data Firehose (just deliver to S3/Redshift)

Need real-time (<1 second)?
  YES → Kinesis Data Streams (~200ms latency)
  NO  → Data Firehose (near real-time: buffers records and delivers in batches)

Need to replay data?
  YES → Kinesis Data Streams (24h-365d retention)
  NO  → Data Firehose (delivers and forgets)

Want zero management?
  YES → Data Firehose (auto-scales, no shards)
  NO  → Kinesis Data Streams (you manage shards)

Lab 4: SageMaker Processing — Data Preprocessing at Scale

What Is This Lab About?

Before training, raw data needs cleaning: handle missing values, encode categories, scale numbers, split into train/test. SageMaker Processing runs your preprocessing script on managed infrastructure — you don’t provision servers.

How It’s Different from Training

Processing Job:                    Training Job:
  Input: raw data                    Input: PROCESSED data
  Output: cleaned data               Output: MODEL
  Script: preprocessing.py           Script: train.py
  Purpose: DATA PREPARATION          Purpose: MODEL TRAINING
  Container paths:                   Container paths:
    /opt/ml/processing/input/          /opt/ml/input/data/
    /opt/ml/processing/train/          /opt/ml/model/
    /opt/ml/processing/test/           /opt/ml/output/

Exam trap: SageMaker Processing ≠ SageMaker Training. Questions test this distinction.

What You’ll Build

S3 (raw CSV)
    │
    ▼
┌─────────────────────────────────────────────────────┐
│  SageMaker Processing Job                           │
│                                                     │
│  preprocessing.py:                                  │
│    1. Read CSV from /opt/ml/processing/input/       │
│    2. Split into train/test (80/20)                 │
│    3. Fit on train only:                            │
│       - median imputation (numerics)                │
│       - standardization (numerics)                  │
│       - one-hot encoding (categoricals)             │
│    4. Transform train and test                      │
│    5. Write to /opt/ml/processing/train/ and        │
│                /opt/ml/processing/test/             │
└──────────────┬──────────────────┬───────────────────┘
               │                  │
               ▼                  ▼
        S3 (train.csv)     S3 (test.csv)
               │
               ▼
        SageMaker Training Job (next step)

Step 1: Write the Preprocessing Script

This script runs INSIDE the SageMaker Processing container. It reads from and writes to specific paths.

# preprocessing.py — saved locally, uploaded to the Processing job
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer


def to_dense(X):
    # ColumnTransformer can return a sparse matrix when one-hot columns dominate
    return X.toarray() if hasattr(X, "toarray") else X


if __name__ == "__main__":
    # SageMaker Processing mounts input data here; the CSV needs a "target" column
    df = pd.read_csv("/opt/ml/processing/input/data.csv")

    y = df.pop("target")

    # Identify column types
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_cols = df.select_dtypes(include=["object"]).columns.tolist()

    # Split BEFORE fitting so medians, scaling statistics, and one-hot
    # categories are learned from training rows only (no test-set leakage)
    X_train_df, X_test_df, y_train, y_test = train_test_split(
        df, y, test_size=0.2, random_state=42
    )

    # Build transformation pipeline
    preprocessor = ColumnTransformer(transformers=[
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]), numeric_cols),
        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]), categorical_cols),
    ])

    X_train = to_dense(preprocessor.fit_transform(X_train_df))
    X_test = to_dense(preprocessor.transform(X_test_df))

    # Label in the first column, no header row: the CSV layout that
    # SageMaker's built-in XGBoost expects
    train = np.column_stack([y_train.to_numpy(), X_train])
    test = np.column_stack([y_test.to_numpy(), X_test])

    # SageMaker Processing uploads whatever is written under these paths
    pd.DataFrame(train).to_csv(
        "/opt/ml/processing/train/train.csv", header=False, index=False
    )
    pd.DataFrame(test).to_csv(
        "/opt/ml/processing/test/test.csv", header=False, index=False
    )
    print(f"Train: {train.shape}, Test: {test.shape}")
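Before paying for a Processing job, you can run the same transformers on a few in-memory rows to catch dtype and missing-value surprises. A sketch with made-up data; build_preprocessor is a hypothetical refactor of the ColumnTransformer in preprocessing.py.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler


def build_preprocessor(numeric_cols, categorical_cols):
    """Same transformers as preprocessing.py, wrapped in a function for local tests."""
    return ColumnTransformer(transformers=[
        ("num", Pipeline([
            ("imputer", SimpleImputer(strategy="median")),
            ("scaler", StandardScaler()),
        ]), numeric_cols),
        ("cat", Pipeline([
            ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
            ("onehot", OneHotEncoder(handle_unknown="ignore")),
        ]), categorical_cols),
    ])


# Made-up rows with one missing number and one missing category
sample = pd.DataFrame({
    "age": [25, np.nan, 45, 28],
    "income": [50000, 75000, 120000, 45000],
    "plan": ["basic", "pro", np.nan, "basic"],
})
pre = build_preprocessor(["age", "income"], ["plan"])
X = pre.fit_transform(sample)
X = X.toarray() if hasattr(X, "toarray") else X
# 2 scaled numeric columns + one-hot columns for "basic", "missing", "pro"
print(X.shape)
```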

Step 2: Run the Processing Job

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

processor = SKLearnProcessor(
    framework_version="1.2-1",
    instance_type="ml.m5.xlarge",
    instance_count=1,
    role=role,
    sagemaker_session=session,
)

processor.run(
    code="preprocessing.py",                # your local script
    inputs=[
        ProcessingInput(
            source="s3://my-bucket/raw/data.csv",           # S3 input
            destination="/opt/ml/processing/input",          # mounted here
        ),
    ],
    outputs=[
        ProcessingOutput(
            output_name="train",
            source="/opt/ml/processing/train",               # container path
            destination="s3://my-bucket/processed/train/",   # S3 output
        ),
        ProcessingOutput(
            output_name="test",
            source="/opt/ml/processing/test",
            destination="s3://my-bucket/processed/test/",
        ),
    ],
)
print("Processing complete — cleaned data in S3")

Processing Job vs Other Options

Scenario                             Use
Simple pandas/sklearn transforms     SageMaker Processing (SKLearnProcessor)
Spark-based distributed transforms   SageMaker Processing (PySparkProcessor) or EMR
Visual no-code prep                  Data Wrangler (inside SageMaker Studio)
Visual no-code prep for analysts     Glue DataBrew
Petabyte-scale custom Spark          EMR
Serverless ETL with Data Catalog     Glue ETL

Domain 1 Lab Summary

Lab   Service                You Learned
1     Feature Store          Create feature groups, ingest, query online/offline, point-in-time
2     S3 + Glue + Athena     Schema discovery, Data Catalog, SQL on S3, Parquet optimization
3     Kinesis                Real-time streaming, shards, partition keys, Kinesis vs Firehose
4     SageMaker Processing   Preprocessing at scale, container paths, Processing vs Training