Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7214.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a "debug-logs" option to the TASK_DIAGNOSTICS / X-Task-Diagnostics feature - dumps task-specific logs into a profile artifact at a DEBUG level.
1 change: 1 addition & 0 deletions CHANGES/7215.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added a "logs" option to the TASK_DIAGNOSTICS / X-Task-Diagnostics feature - dumps task-specific logs into a profile artifact.
1 change: 1 addition & 0 deletions CHANGES/7219.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"profile options" designated using the X-Task-Diagnostics feature should be propagated to child tasks.
4 changes: 2 additions & 2 deletions docs/dev/guides/pull-request-walkthrough.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ Please be sure to follow the [Pulp policy on AI Usage](site:help/more/governance

1. Add `functional tests` or `unit tests` where appropriate and ensure tests
are passing on the CI.
2. Add a [`CHANGES entry`](site:pulpcore/docs/dev/guides/git/#markdown-header-changelog-update).
2. Add a [`CHANGES entry`](site:pulpcore/docs/dev/guides/git/#changelog-update).
3. Update relevent `documentation`. Please build the docs to test!
4. If the PR is a simple feature or a bugfix, [`rebase and squash`](site:pulpcore/docs/dev/guides/git/#markdown-header-rebasing-and-squashing) to a single commit.
4. If the PR is a simple feature or a bugfix, [`rebase and squash`](site:pulpcore/docs/dev/guides/git/#rebasing-and-squashing) to a single commit.
If the PR is a complex feature, make sure that all commits are cleanly separated and have meaningful commit messages.
5. Make sure you tag commits with `closes #IssueNumber` or `ref #IssueNumber` when working on a tracked issue.
6. If AI was used, make sure you are following the [Pulp policy on AI Usage](site:help/more/governance/ai_policy/)
Expand Down
5 changes: 4 additions & 1 deletion docs/dev/learn/tasks/diagnostics.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ The following diagnostics are supported currently:
- memray:
Dumps a profile which can be processed with `memray`, which shows which lines and functions were
responsible for the most allocations at the time of peak RSS of the process

- logs:
Dumps all logs specific to the task. Formatting not guaranteed to be identical.
- debug-logs:
Same as the `logs` option except it exports logs at a DEBUG level, whether or not the system is configured to print DEBUG logs otherwise.

## Memory Logging

Expand Down
6 changes: 5 additions & 1 deletion pulpcore/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "simple",
"filters": ["correlation_id"],
}
Expand Down Expand Up @@ -374,8 +375,11 @@
# lines and functions, at the time of peak RSS of the task process. This adds significant
# runtime overhead to the task process, 20-40%. Tweaking code might be warranted for
# some advanced settings.
# * "logs" - Dumps the logs specific to each task.
# * "debug-logs" - Same as "logs" but it exports logs at a DEBUG level, even if the system is
# otherwise not configured to print DEBUG logs.
# NOTE: "memray" and "pyinstrument" require additional packages to be installed on the system.
TASK_DIAGNOSTICS = [] # ["memory", "pyinstrument", "memray"]
TASK_DIAGNOSTICS = [] # ["memory", "pyinstrument", "memray", "logs", "debug-logs"]

ANALYTICS = True

Expand Down
2 changes: 0 additions & 2 deletions pulpcore/plugin/stages/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ async def run(self):
content = await self._in_q.get()
if content is None:
break
log.debug("%(name)s - next: %(content)s.", {"name": self, "content": content})
yield content

async def batches(self, minsize=500):
Expand Down Expand Up @@ -171,7 +170,6 @@ async def put(self, item):
if item is None:
raise ValueError(_("(None) not permitted."))
await self._out_q.put(item)
log.debug("{name} - put: {content}".format(name=self, content=item))

def __str__(self):
return "[{id}] {name}".format(id=id(self), name=self.__class__.__name__)
Expand Down
55 changes: 55 additions & 0 deletions pulpcore/tasking/_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ def _execute_task_and_profile(task, profile_options):
if "memray" in profile_options:
_execute_task = _memray_diagnostic_decorator(temp_dir, _execute_task)

is_debug = "debug-logs" in profile_options
if "logs" in profile_options or is_debug:
_execute_task = _logging_decorator(temp_dir, is_debug, _execute_task)

_execute_task(task)


Expand Down Expand Up @@ -218,6 +222,57 @@ def __memray_diagnostic_decorator(task):
return __memray_diagnostic_decorator


def _logging_decorator(temp_dir, is_debug, func):
def __logging_decorator(task):
log_file_path = os.path.join(temp_dir, "task_logs.log")

# Create a file handler that captures all logging levels
file_handler = logging.FileHandler(log_file_path, mode="w", encoding="utf-8")
file_handler.setLevel(logging.NOTSET) # Capture all levels

# Create a formatter for consistent log formatting
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
file_handler.setFormatter(formatter)

# Get the root logger to capture all logs
root_logger = logging.getLogger()
original_level = root_logger.level

try:
# Add the handler to the root logger
root_logger.addHandler(file_handler)

if is_debug:
# Temporarily lower the root logger level to allow all messages through
# The existing handlers maintain their own levels, so they won't be affected
root_logger.setLevel(logging.NOTSET)

# Execute the task
func(task)
finally:
# Always restore original level and remove the handler
if is_debug:
root_logger.setLevel(original_level)
root_logger.removeHandler(file_handler)
file_handler.close()

# Save the log file as a ProfileArtifact
artifact = Artifact.init_and_validate(log_file_path)
try:
# it's unlikely for a log file to be identical, but we retain the same check as the
# other decorators
artifact.save()
except IntegrityError:
artifact = Artifact.objects.get(sha256=artifact.sha256)

ProfileArtifact.objects.get_or_create(artifact=artifact, name="task_logs", task=task)
_logger.info("Created task logging diagnostic data.")

return __logging_decorator


def dispatch_scheduled_tasks():
# Warning, dispatch_scheduled_tasks is not race condition free!
now = timezone.now()
Expand Down
6 changes: 6 additions & 0 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ def _execute_task(task):
with with_task_context(task):
task.set_running()
domain = get_domain()

try:
# If this task is being spawned by another task, we should inherit the profile options
# from the current task.
ctx_token = x_task_diagnostics_var.set(task.profile_options)
log_task_start(task, domain)
task_function = get_task_function(task)
result = task_function()
Expand All @@ -89,6 +93,7 @@ def _execute_task(task):
else:
task.set_completed(result)
log_task_completed(task, domain)
x_task_diagnostics_var.reset(ctx_token)
send_task_notification(task)


Expand Down Expand Up @@ -352,6 +357,7 @@ async def adispatch(
def get_task_payload(
function_name, task_group, args, kwargs, resources, versions, immediate, deferred, app_lock
):
"""Create arguments for creation of a new task"""
payload = {
"state": TASK_STATES.WAITING,
"logging_cid": (get_guid()),
Expand Down