164 changes: 164 additions & 0 deletions docs/source/user-guide/dataframe/execution-metrics.rst
@@ -0,0 +1,164 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. _execution_metrics:

Execution Metrics
=================

Overview
--------

When DataFusion executes a query it compiles the logical plan into a tree of
*physical plan operators* (e.g. ``FilterExec``, ``ProjectionExec``,
``AggregateExec``). Each operator can record runtime statistics while it
runs. These statistics are called **execution metrics**.

Typical metrics include:

- **output_rows** – number of rows produced by the operator
- **elapsed_compute** – total CPU time (nanoseconds) spent inside the operator
- **spill_count** – number of times the operator spilled data to disk
- **spilled_bytes** – total bytes written to disk during spills
- **spilled_rows** – total rows written to disk during spills

Metrics are collected *per-partition*: DataFusion may execute each operator
in parallel across several partitions. The convenience properties on
:py:class:`~datafusion.MetricsSet` (e.g. ``output_rows``, ``elapsed_compute``)
automatically sum the named metric across **all** partitions, giving a single
aggregate value for the operator as a whole. You can also access the raw
per-partition :py:class:`~datafusion.Metric` objects via
:py:meth:`~datafusion.MetricsSet.metrics`.

When Are Metrics Available?
---------------------------

Metrics are populated only **after** the DataFrame has been executed.
Execution is triggered by any of the terminal operations:

- :py:meth:`~datafusion.DataFrame.collect`
- :py:meth:`~datafusion.DataFrame.collect_partitioned`
- :py:meth:`~datafusion.DataFrame.execute_stream`
- :py:meth:`~datafusion.DataFrame.execute_stream_partitioned`

Calling :py:meth:`~datafusion.ExecutionPlan.collect_metrics` before execution
will return entries with empty (or ``None``) metric sets because the operators
have not run yet.

Reading the Physical Plan Tree
--------------------------------

:py:meth:`~datafusion.DataFrame.execution_plan` returns the root
:py:class:`~datafusion.ExecutionPlan` node of the physical plan tree. The tree
mirrors the operator pipeline: the root is typically a projection or
coalescing node; its children are filters, aggregates, scans, etc.

The ``operator_name`` string returned by
:py:meth:`~datafusion.ExecutionPlan.collect_metrics` is the *display* name of
the node, for example ``"FilterExec: column1@0 > 1"``. This is the same string
you would see when calling ``plan.display()``.
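The traversal order of ``collect_metrics`` (each node first, then its
children) can be sketched with a toy node class. This is a conceptual sketch
under the assumption stated in the docstring below; ``Node`` and ``walk`` are
hypothetical names, while the real ``ExecutionPlan`` exposes the same shape
through ``display()`` and ``children()``.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Node:
    """Toy stand-in for an ExecutionPlan node (illustrative only)."""

    name: str
    children: list[Node] = field(default_factory=list)


def walk(node: Node) -> list[str]:
    """Pre-order traversal: the node itself, then each child subtree."""
    names = [node.name]
    for child in node.children:
        names.extend(walk(child))
    return names


# A tree shaped like the pipeline described above: projection at the root,
# a filter beneath it, and a data-source scan at the leaf.
plan = Node(
    "ProjectionExec",
    [Node("FilterExec: column1@0 > 1", [Node("DataSourceExec: partitions=1")])],
)
print(walk(plan))
# ['ProjectionExec', 'FilterExec: column1@0 > 1', 'DataSourceExec: partitions=1']
```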

Available Metrics
-----------------

The following metrics are directly accessible as properties on
:py:class:`~datafusion.MetricsSet`:

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Property
     - Description
   * - ``output_rows``
     - Number of rows emitted by the operator (summed across partitions).
   * - ``elapsed_compute``
     - CPU time in nanoseconds spent inside the operator's execute loop
       (summed across partitions).
   * - ``spill_count``
     - Number of spill-to-disk events due to memory pressure (summed across
       partitions).
   * - ``spilled_bytes``
     - Total bytes written to disk during spills (summed across partitions).
   * - ``spilled_rows``
     - Total rows written to disk during spills (summed across partitions).

Any metric not listed above can be accessed via
:py:meth:`~datafusion.MetricsSet.sum_by_name`, or by iterating over the raw
:py:class:`~datafusion.Metric` objects returned by
:py:meth:`~datafusion.MetricsSet.metrics`.
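Conceptually, that fallback path amounts to grouping the raw per-partition
values by metric name and summing each group. The sketch below is a simplified
model under that assumption, not the library's implementation;
``totals_by_name`` is a hypothetical helper operating on plain
``(name, value)`` pairs.

```python
from collections import defaultdict


def totals_by_name(raw_metrics: list[tuple[str, int]]) -> dict[str, int]:
    """Group (name, value) pairs and sum the values per metric name."""
    totals: dict[str, int] = defaultdict(int)
    for name, value in raw_metrics:
        totals[name] += value
    return dict(totals)


# Hypothetical per-partition (name, value) pairs for one operator.
raw = [("output_rows", 10), ("output_rows", 7), ("elapsed_compute", 1200)]
print(totals_by_name(raw))  # {'output_rows': 17, 'elapsed_compute': 1200}
```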

Labels
------

A :py:class:`~datafusion.Metric` may carry *labels*: key/value pairs that
provide additional context. For example, some operators tag their output
metrics with an ``output_type`` label to distinguish between intermediate and
final output:

.. code-block:: python

   for metric in metrics_set.metrics():
       print(metric.name, metric.labels())
       # output_rows {'output_type': 'final'}

Labels are operator-specific; most metrics have no labels.

End-to-End Example
------------------

.. code-block:: python

   from datafusion import SessionContext

   ctx = SessionContext()
   ctx.sql("CREATE TABLE sales AS VALUES (1, 100), (2, 200), (3, 50)")

   df = ctx.sql("SELECT * FROM sales WHERE column1 > 1")

   # Execute the query — this populates the metrics
   results = df.collect()

   # Retrieve the physical plan with metrics
   plan = df.execution_plan()

   # Walk every operator and print its metrics
   for operator_name, ms in plan.collect_metrics():
       if ms.output_rows is not None:
           print(f"{operator_name}")
           print(f"  output_rows     = {ms.output_rows}")
           print(f"  elapsed_compute = {ms.elapsed_compute} ns")

   # Access raw per-partition metrics
   for operator_name, ms in plan.collect_metrics():
       for metric in ms.metrics():
           print(
               f"  partition={metric.partition} "
               f"{metric.name}={metric.value} "
               f"labels={metric.labels()}"
           )

API Reference
-------------

- :py:class:`datafusion.ExecutionPlan` — physical plan node
- :py:meth:`datafusion.ExecutionPlan.collect_metrics` — walk the tree and
return ``(operator_name, MetricsSet)`` pairs
- :py:meth:`datafusion.ExecutionPlan.metrics` — return the
:py:class:`~datafusion.MetricsSet` for a single node
- :py:class:`datafusion.MetricsSet` — aggregated metrics for one operator
- :py:class:`datafusion.Metric` — a single per-partition metric value
9 changes: 9 additions & 0 deletions docs/source/user-guide/dataframe/index.rst
@@ -365,7 +365,16 @@ DataFusion provides many built-in functions for data manipulation:
For a complete list of available functions, see the :py:mod:`datafusion.functions` module documentation.


Execution Metrics
-----------------

After executing a DataFrame (via ``collect()``, ``execute_stream()``, etc.),
DataFusion populates per-operator runtime statistics such as row counts and
compute time. See :doc:`execution-metrics` for a full explanation and
worked example.

.. toctree::
   :maxdepth: 1

   rendering
   execution-metrics
4 changes: 3 additions & 1 deletion python/datafusion/__init__.py
@@ -55,7 +55,7 @@
from .expr import Expr, WindowFrame
from .io import read_avro, read_csv, read_json, read_parquet
from .options import CsvReadOptions
from .plan import ExecutionPlan, LogicalPlan
from .plan import ExecutionPlan, LogicalPlan, Metric, MetricsSet
from .record_batch import RecordBatch, RecordBatchStream
from .user_defined import (
    Accumulator,
@@ -85,6 +85,8 @@
    "Expr",
    "InsertOp",
    "LogicalPlan",
    "Metric",
    "MetricsSet",
    "ParquetColumnOptions",
    "ParquetWriterOptions",
    "RecordBatch",
168 changes: 168 additions & 0 deletions python/datafusion/plan.py
@@ -19,6 +19,8 @@

from __future__ import annotations

import datetime

from typing import TYPE_CHECKING, Any

import datafusion._internal as df_internal
@@ -29,6 +31,8 @@
__all__ = [
    "ExecutionPlan",
    "LogicalPlan",
    "Metric",
    "MetricsSet",
]


@@ -151,3 +155,167 @@ def to_proto(self) -> bytes:
        Tables created in memory from record batches are currently not supported.
        """
        return self._raw_plan.to_proto()

    def metrics(self) -> MetricsSet | None:
        """Return metrics for this plan node after execution, or None if unavailable."""
        raw = self._raw_plan.metrics()
        if raw is None:
            return None
        return MetricsSet(raw)
    def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
        """Return runtime statistics for each step of the query execution.

        DataFusion executes a query as a pipeline of operators — for example a
        data source scan, followed by a filter, followed by a projection. After
        the DataFrame has been executed (via
        :py:meth:`~datafusion.DataFrame.collect`,
        :py:meth:`~datafusion.DataFrame.execute_stream`, etc.), each operator
        records statistics such as how many rows it produced and how much CPU
        time it consumed.

        Each entry in the returned list corresponds to one operator that
        recorded metrics. The first element of the tuple is the operator's
        description string — the same text shown by
        :py:meth:`display_indent` — which identifies both the operator type
        and its key parameters, for example ``"FilterExec: column1@0 > 1"``
        or ``"DataSourceExec: partitions=1"``.

        Returns:
            A list of ``(description, MetricsSet)`` tuples ordered from the
            outermost operator (top of the execution tree) down to the
            data-source leaves. Only operators that recorded at least one
            metric are included. Returns an empty list if called before the
            DataFrame has been executed.
        """
        result: list[tuple[str, MetricsSet]] = []

        def _walk(node: ExecutionPlan) -> None:
            ms = node.metrics()
            if ms is not None:
                result.append((node.display(), ms))
            for child in node.children():
                _walk(child)

        _walk(self)
        return result


class MetricsSet:
    """A set of metrics for a single execution plan operator.

    A physical plan operator runs independently across one or more partitions.
    :py:meth:`metrics` returns the raw per-partition :py:class:`Metric` objects.
    The convenience properties (:py:attr:`output_rows`, :py:attr:`elapsed_compute`,
    etc.) automatically sum the named metric across *all* partitions, giving a
    single aggregate value for the operator as a whole.
    """

    def __init__(self, raw: df_internal.MetricsSet) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    def metrics(self) -> list[Metric]:
        """Return all individual metrics in this set."""
        return [Metric(m) for m in self._raw.metrics()]

    @property
    def output_rows(self) -> int | None:
        """Sum of output_rows across all partitions."""
        return self._raw.output_rows()

    @property
    def elapsed_compute(self) -> int | None:
        """Total CPU time (in nanoseconds) spent inside this operator's execute loop.

        Summed across all partitions. Returns ``None`` if no ``elapsed_compute``
        metric was recorded.
        """
        return self._raw.elapsed_compute()

    @property
    def spill_count(self) -> int | None:
        """Number of times this operator spilled data to disk due to memory pressure.

        This is a count of spill events, not a byte count. Summed across all
        partitions. Returns ``None`` if no ``spill_count`` metric was recorded.
        """
        return self._raw.spill_count()

    @property
    def spilled_bytes(self) -> int | None:
        """Sum of spilled_bytes across all partitions."""
        return self._raw.spilled_bytes()

    @property
    def spilled_rows(self) -> int | None:
        """Sum of spilled_rows across all partitions."""
        return self._raw.spilled_rows()

    def sum_by_name(self, name: str) -> int | None:
        """Sum the named metric across all partitions.

        Useful for accessing any metric not exposed as a first-class property.
        Returns ``None`` if no metric with the given name was recorded.

        Args:
            name: The metric name, e.g. ``"output_rows"`` or ``"elapsed_compute"``.
        """
        return self._raw.sum_by_name(name)

    def __repr__(self) -> str:
        """Return a string representation of the metrics set."""
        return repr(self._raw)


class Metric:
    """A single execution metric with name, value, partition, and labels."""

    def __init__(self, raw: df_internal.Metric) -> None:
        """This constructor should not be called by the end user."""
        self._raw = raw

    @property
    def name(self) -> str:
        """The name of this metric (e.g. ``output_rows``)."""
        return self._raw.name

    @property
    def value(self) -> int | None:
        """The numeric value of this metric, or ``None`` when not representable.

        ``None`` is returned for metric types whose value has not yet been set
        (e.g. ``StartTimestamp`` / ``EndTimestamp`` before the operator runs)
        and for any metric variant whose value cannot be expressed as an integer.
        Timestamp metrics, when available, are returned as nanoseconds since the
        Unix epoch.
        """
        return self._raw.value

    @property
    def value_as_datetime(self) -> datetime.datetime | None:
        """The value as a UTC datetime for timestamp metrics, or ``None``."""
        return self._raw.value_as_datetime()

    @property
    def partition(self) -> int | None:
        """The 0-based partition index this metric applies to.

        Returns ``None`` for metrics that are not partition-specific (i.e. they
        apply globally across all partitions of the operator).
        """
        return self._raw.partition

    def labels(self) -> dict[str, str]:
        """Return the labels associated with this metric.

        Labels provide additional context for a metric. For example::

            >>> metric.labels()
            {'output_type': 'final'}
        """
        return self._raw.labels()

    def __repr__(self) -> str:
        """Return a string representation of the metric."""
        return repr(self._raw)