9 changes: 9 additions & 0 deletions bigframes/pandas/__init__.py
@@ -362,6 +362,13 @@ def reset_session():
reset_session.__doc__ = global_session.close_session.__doc__


def job_history() -> pandas.DataFrame:
return global_session.with_default_session(bigframes.session.Session.job_history)


job_history.__doc__ = inspect.getdoc(bigframes.session.Session.job_history)


# SQL Compilation uses recursive algorithms on deep trees
# 10M tree depth should be sufficient to generate any sql that is under bigquery limit
# Note: This limit does not have the desired effect on Python 3.12 in
@@ -385,6 +392,7 @@ def reset_session():
deploy_remote_function,
deploy_udf,
get_default_session_id,
job_history,
get_dummies,
merge,
qcut,
@@ -419,6 +427,7 @@ def reset_session():
"deploy_remote_function",
"deploy_udf",
"get_default_session_id",
"job_history",
"get_dummies",
"merge",
"qcut",
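A minimal usage sketch of the new top-level API. The query and the selected columns are illustrative assumptions; the available columns come from the `JobMetadata` fields introduced in `bigframes/session/metrics.py` below:

```python
import bigframes.pandas as bpd

# Illustrative query; any BigFrames operation that runs a BigQuery job will do.
df = bpd.read_gbq("SELECT 1 AS x")
df.to_pandas()

# Jobs issued by BigFrames in this session, one row per job.
history = bpd.job_history()
print(history[["job_id", "job_type", "duration_seconds", "total_bytes_processed"]])
```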
4 changes: 4 additions & 0 deletions bigframes/session/__init__.py
@@ -366,6 +366,10 @@ def slot_millis_sum(self):
"""The sum of all slot time used by bigquery jobs in this session."""
return self._metrics.slot_millis

def job_history(self) -> pandas.DataFrame:
"""Returns a list of BigQuery jobs initiated by BigFrames in the current session."""
return pandas.DataFrame([job.__dict__ for job in self._metrics.jobs])

@property
def _allows_ambiguity(self) -> bool:
return self._allow_ambiguity
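`Session.job_history` turns each recorded `JobMetadata` into a row via `job.__dict__`. A standalone sketch of the same transformation, using a trimmed stand-in dataclass whose fields are a subset of the real ones:

```python
import dataclasses
from typing import Optional

import pandas


@dataclasses.dataclass
class JobMetadata:  # trimmed stand-in for bigframes.session.metrics.JobMetadata
    job_id: Optional[str] = None
    job_type: Optional[str] = None
    duration_seconds: Optional[float] = None


jobs = [
    JobMetadata(job_id="job_1", job_type="query", duration_seconds=1.2),
    JobMetadata(job_id="job_2", job_type="load", duration_seconds=3.4),
]

# For flat dataclasses, job.__dict__ and dataclasses.asdict(job) produce the same mapping.
frame = pandas.DataFrame([job.__dict__ for job in jobs])
print(frame)
```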
3 changes: 2 additions & 1 deletion bigframes/session/_io/bigquery/read_gbq_table.py
@@ -326,6 +326,7 @@ def check_if_index_columns_are_unique(
index_cols: List[str],
*,
publisher: bigframes.core.events.Publisher,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
) -> Tuple[str, ...]:
import bigframes.core.sql
import bigframes.session._io.bigquery
@@ -341,7 +342,7 @@ def check_if_index_columns_are_unique(
timeout=None,
location=None,
project=None,
metrics=None,
metrics=metrics,
query_with_job=False,
publisher=publisher,
)
4 changes: 4 additions & 0 deletions bigframes/session/loader.py
@@ -562,6 +562,9 @@ def _start_generic_job(self, job: formatting_helpers.GenericJob):
else:
job.result()

if self._metrics is not None and isinstance(job, google.cloud.bigquery.job.Job):
self._metrics.count_job_stats(query_job=job)

@overload
def read_gbq_table( # type: ignore[overload-overlap]
self,
@@ -878,6 +881,7 @@ def read_gbq_table(
table=table,
index_cols=index_cols,
publisher=self._publisher,
metrics=self._metrics,
)
if publish_execution:
self._publisher.publish(
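With `_start_generic_job` now forwarding completed jobs to `count_job_stats`, non-query jobs such as load jobs should also appear in `job_history()`. A hedged sketch; whether a given BigFrames call issues a load job or only query jobs depends on the upload path and session configuration:

```python
import pandas
import bigframes.pandas as bpd

# Uploading local data may trigger a load job, depending on size and session settings.
local = pandas.DataFrame({"x": [1, 2, 3]})
remote = bpd.read_pandas(local)
remote.to_pandas()

history = bpd.job_history()
# Query and load jobs are listed together; load rows also carry input_files/input_bytes/output_rows.
print(history[["job_type", "input_files", "input_bytes", "output_rows"]])
```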
170 changes: 146 additions & 24 deletions bigframes/session/metrics.py
@@ -15,27 +15,54 @@
from __future__ import annotations

import dataclasses
import datetime
import os
from typing import Optional, Tuple
from typing import Any, Mapping, Optional, Tuple

import google.cloud.bigquery as bigquery
import google.cloud.bigquery.job as bq_job
import google.cloud.bigquery.table as bq_table

LOGGING_NAME_ENV_VAR = "BIGFRAMES_PERFORMANCE_LOG_NAME"


@dataclasses.dataclass
class JobMetadata:
job_id: Optional[str] = None
query_id: Optional[str] = None
location: Optional[str] = None
project: Optional[str] = None
creation_time: Optional[datetime.datetime] = None
start_time: Optional[datetime.datetime] = None
end_time: Optional[datetime.datetime] = None
duration_seconds: Optional[float] = None
status: Optional[str] = None
total_bytes_processed: Optional[int] = None
total_slot_ms: Optional[int] = None
job_type: Optional[str] = None
error_result: Optional[Mapping[str, Any]] = None
cached: Optional[bool] = None
job_url: Optional[str] = None
query: Optional[str] = None
destination_table: Optional[str] = None
source_uris: Optional[list[str]] = None
input_files: Optional[int] = None
input_bytes: Optional[int] = None
output_rows: Optional[int] = None
source_format: Optional[str] = None


@dataclasses.dataclass
class ExecutionMetrics:
execution_count: int = 0
slot_millis: int = 0
bytes_processed: int = 0
execution_secs: float = 0
query_char_count: int = 0
jobs: list[JobMetadata] = dataclasses.field(default_factory=list)

def count_job_stats(
self,
query_job: Optional[bq_job.QueryJob] = None,
query_job: Optional[bigquery.job.Job] = None,
row_iterator: Optional[bq_table.RowIterator] = None,
):
if query_job is None:
@@ -57,42 +84,137 @@ def count_job_stats(
self.slot_millis += slot_millis
self.execution_secs += exec_seconds

self.jobs.append(
JobMetadata(
job_id=getattr(row_iterator, "job_id", None),
query_id=getattr(row_iterator, "query_id", None),
location=getattr(row_iterator, "location", None),
project=getattr(row_iterator, "project", None),
creation_time=created,
start_time=getattr(row_iterator, "started", None),
end_time=ended,
duration_seconds=exec_seconds,
status="DONE",
total_bytes_processed=bytes_processed,
total_slot_ms=slot_millis,
job_type="query",
cached=getattr(row_iterator, "cache_hit", None),
query=getattr(row_iterator, "query", None),
job_url=f"https://console.cloud.google.com/bigquery?project={getattr(row_iterator, 'project', '')}&j=bq:{getattr(row_iterator, 'location', '')}:{getattr(row_iterator, 'job_id', '')}&page=queryresults"
if getattr(row_iterator, "job_id", None)
else None,
)
)

elif query_job.configuration.dry_run:
query_char_count = len(query_job.query)
query_char_count = len(getattr(query_job, "query", ""))

# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
slot_millis = 0
exec_seconds = 0.0

elif (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
elif isinstance(query_job, bigquery.QueryJob):
if (stats := get_performance_stats(query_job)) is not None:
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0

self.jobs.append(
JobMetadata(
job_id=query_job.job_id,
location=query_job.location,
project=query_job.project,
creation_time=query_job.created,
start_time=query_job.started,
end_time=query_job.ended,
duration_seconds=exec_seconds,
status=query_job.state,
total_bytes_processed=bytes_processed,
total_slot_ms=slot_millis,
job_type=query_job.job_type,
error_result=query_job.error_result,
cached=query_job.cache_hit,
query=query_job.query,
destination_table=str(query_job.destination)
if query_job.destination
else None,
job_url=f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults",
)
)

else:
# Handle other job types (e.g. LoadJob)
self.execution_count += 1
self.query_char_count += query_char_count or 0
self.bytes_processed += bytes_processed or 0
self.slot_millis += slot_millis or 0
self.execution_secs += exec_seconds or 0
duration = (
(query_job.ended - query_job.created).total_seconds()
if query_job.ended and query_job.created
else None
)

job_metadata = JobMetadata(
job_id=query_job.job_id,
location=query_job.location,
project=query_job.project,
creation_time=query_job.created,
start_time=query_job.started,
end_time=query_job.ended,
duration_seconds=duration,
status=query_job.state,
job_type=query_job.job_type,
error_result=query_job.error_result,
job_url=f"https://console.cloud.google.com/bigquery?project={query_job.project}&j=bq:{query_job.location}:{query_job.job_id}&page=queryresults",
)

if isinstance(query_job, bigquery.LoadJob):
job_metadata.output_rows = getattr(query_job, "output_rows", None)
job_metadata.input_files = getattr(query_job, "input_files", None)
job_metadata.input_bytes = getattr(query_job, "input_bytes", None)
job_metadata.destination_table = (
str(query_job.destination) if query_job.destination else None
)
if query_job.source_uris:
job_metadata.source_uris = list(query_job.source_uris)
if query_job.configuration and hasattr(
query_job.configuration, "source_format"
):
job_metadata.source_format = query_job.configuration.source_format

self.jobs.append(job_metadata)

# For pytest runs only, log information about the query job
# to a file in order to create a performance report.
if (
isinstance(query_job, bigquery.QueryJob)
and not query_job.configuration.dry_run
):
stats = get_performance_stats(query_job)
if stats:
write_stats_to_disk(
query_char_count=stats[0],
bytes_processed=stats[1],
slot_millis=stats[2],
exec_seconds=stats[3],
)
elif row_iterator is not None:
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0) or 0
query_char_count = len(getattr(row_iterator, "query", "") or "")
slot_millis = getattr(row_iterator, "slot_millis", 0) or 0
created = getattr(row_iterator, "created", None)
ended = getattr(row_iterator, "ended", None)
exec_seconds = (
(ended - created).total_seconds() if created and ended else 0.0
)
write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)

else:
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
bytes_processed = 0
query_char_count = 0
slot_millis = 0
exec_seconds = 0

write_stats_to_disk(
query_char_count=query_char_count,
bytes_processed=bytes_processed,
slot_millis=slot_millis,
exec_seconds=exec_seconds,
)


def get_performance_stats(
query_job: bigquery.QueryJob,
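The console link stored in `JobMetadata.job_url` follows the same pattern in all three branches of `count_job_stats`. A small sketch of that pattern as a standalone helper; the function name is hypothetical and not part of the change:

```python
from typing import Optional


def job_console_url(
    project: Optional[str], location: Optional[str], job_id: Optional[str]
) -> Optional[str]:
    """Build the BigQuery console URL used for JobMetadata.job_url (hypothetical helper)."""
    if not job_id:
        return None
    return (
        "https://console.cloud.google.com/bigquery"
        f"?project={project}&j=bq:{location}:{job_id}&page=queryresults"
    )


print(job_console_url("my-project", "US", "bquxjob_1234"))
```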