Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/cronpal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from cronpal.field_parser import FieldParser
from cronpal.models import CronExpression, CronField, FieldType
from cronpal.parser import create_parser
from cronpal.special_parser import SpecialStringParser
from cronpal.validators import validate_expression

__all__ = [
Expand All @@ -23,6 +24,7 @@
"CronField",
"FieldType",
"FieldParser",
"SpecialStringParser",
"validate_expression",
"CronPalError",
"InvalidCronExpression",
Expand Down
100 changes: 63 additions & 37 deletions src/cronpal/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from cronpal.field_parser import FieldParser
from cronpal.models import CronExpression
from cronpal.parser import create_parser
from cronpal.special_parser import SpecialStringParser
from cronpal.validators import validate_expression, validate_expression_format


Expand All @@ -31,53 +32,43 @@ def main(args=None):
# Validate the expression first
validate_expression(parsed_args.expression)

# Parse the expression into fields
fields = validate_expression_format(parsed_args.expression)
# Check if it's a special string
special_parser = SpecialStringParser()
if special_parser.is_special_string(parsed_args.expression):
# Parse as special string
cron_expr = special_parser.parse(parsed_args.expression)

# Create a CronExpression object
cron_expr = CronExpression(parsed_args.expression)
print(f"✓ Valid cron expression: {cron_expr}")

# Parse all fields if not a special expression
if len(fields) == 5:
if parsed_args.verbose:
print(f" Special string: {cron_expr.raw_expression}")
description = special_parser.get_description(cron_expr.raw_expression)
print(f" Description: {description}")

# For @reboot, we don't have fields to show
if cron_expr.raw_expression.lower() != "@reboot":
_print_verbose_fields(cron_expr)
else:
# Parse the expression into fields
fields = validate_expression_format(parsed_args.expression)

# Create a CronExpression object
cron_expr = CronExpression(parsed_args.expression)

# Parse all fields
field_parser = FieldParser()
cron_expr.minute = field_parser.parse_minute(fields[0])
cron_expr.hour = field_parser.parse_hour(fields[1])
cron_expr.day_of_month = field_parser.parse_day_of_month(fields[2])
cron_expr.month = field_parser.parse_month(fields[3])
cron_expr.day_of_week = field_parser.parse_day_of_week(fields[4])

print(f"✓ Valid cron expression: {cron_expr}")

if parsed_args.verbose:
print(f" Raw expression: {cron_expr.raw_expression}")
print(f" Validation: PASSED")

if cron_expr.minute:
print(f" Minute field: {cron_expr.minute.raw_value}")
if cron_expr.minute.parsed_values:
_print_field_values(" ", cron_expr.minute.parsed_values)

if cron_expr.hour:
print(f" Hour field: {cron_expr.hour.raw_value}")
if cron_expr.hour.parsed_values:
_print_field_values(" ", cron_expr.hour.parsed_values)

if cron_expr.day_of_month:
print(f" Day of month field: {cron_expr.day_of_month.raw_value}")
if cron_expr.day_of_month.parsed_values:
_print_field_values(" ", cron_expr.day_of_month.parsed_values)
print(f"✓ Valid cron expression: {cron_expr}")

if cron_expr.month:
print(f" Month field: {cron_expr.month.raw_value}")
if cron_expr.month.parsed_values:
_print_field_values(" ", cron_expr.month.parsed_values)
_print_month_names(" ", cron_expr.month.parsed_values)

if cron_expr.day_of_week:
print(f" Day of week field: {cron_expr.day_of_week.raw_value}")
if cron_expr.day_of_week.parsed_values:
_print_field_values(" ", cron_expr.day_of_week.parsed_values)
_print_day_names(" ", cron_expr.day_of_week.parsed_values)
if parsed_args.verbose:
print(f" Raw expression: {cron_expr.raw_expression}")
print(f" Validation: PASSED")
_print_verbose_fields(cron_expr)

return 0

Expand All @@ -102,6 +93,41 @@ def main(args=None):
return 0


def _print_verbose_fields(cron_expr: CronExpression):
"""
Print verbose field information.

Args:
cron_expr: The CronExpression to print fields for.
"""
if cron_expr.minute:
print(f" Minute field: {cron_expr.minute.raw_value}")
if cron_expr.minute.parsed_values:
_print_field_values(" ", cron_expr.minute.parsed_values)

if cron_expr.hour:
print(f" Hour field: {cron_expr.hour.raw_value}")
if cron_expr.hour.parsed_values:
_print_field_values(" ", cron_expr.hour.parsed_values)

if cron_expr.day_of_month:
print(f" Day of month field: {cron_expr.day_of_month.raw_value}")
if cron_expr.day_of_month.parsed_values:
_print_field_values(" ", cron_expr.day_of_month.parsed_values)

if cron_expr.month:
print(f" Month field: {cron_expr.month.raw_value}")
if cron_expr.month.parsed_values:
_print_field_values(" ", cron_expr.month.parsed_values)
_print_month_names(" ", cron_expr.month.parsed_values)

if cron_expr.day_of_week:
print(f" Day of week field: {cron_expr.day_of_week.raw_value}")
if cron_expr.day_of_week.parsed_values:
_print_field_values(" ", cron_expr.day_of_week.parsed_values)
_print_day_names(" ", cron_expr.day_of_week.parsed_values)


def _print_field_values(prefix: str, values: set):
"""
Print field values in a nice format.
Expand Down
115 changes: 115 additions & 0 deletions src/cronpal/special_parser.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Special string parser for cron expressions."""

from cronpal.constants import SPECIAL_STRINGS
from cronpal.exceptions import InvalidCronExpression
from cronpal.field_parser import FieldParser
from cronpal.models import CronExpression


class SpecialStringParser:
"""Parser for special cron strings like @yearly, @daily, etc."""

def __init__(self):
"""Initialize the special string parser."""
self.field_parser = FieldParser()

def is_special_string(self, expression: str) -> bool:
"""
Check if an expression is a special string.

Args:
expression: The expression to check.

Returns:
True if it's a special string, False otherwise.
"""
return expression.strip().lower() in [s.lower() for s in SPECIAL_STRINGS.keys()]

def parse(self, special_string: str) -> CronExpression:
"""
Parse a special string into a CronExpression.

Args:
special_string: The special string to parse (e.g., "@daily").

Returns:
A CronExpression object with parsed fields.

Raises:
InvalidCronExpression: If the string is not recognized.
"""
special_string = special_string.strip()

# Check if it's a valid special string
if special_string not in SPECIAL_STRINGS:
# Try case-insensitive match
special_lower = special_string.lower()
matching_key = None
for key in SPECIAL_STRINGS.keys():
if key.lower() == special_lower:
matching_key = key
break

if not matching_key:
available = ", ".join(sorted(SPECIAL_STRINGS.keys()))
raise InvalidCronExpression(
f"Unknown special string: '{special_string}'. "
f"Available: {available}"
)
special_string = matching_key

# Get the expanded expression
expanded = SPECIAL_STRINGS[special_string]

# Create the CronExpression with the original special string
cron_expr = CronExpression(special_string)

# Handle @reboot specially - it doesn't expand to standard fields
if special_string == "@reboot":
# @reboot is a special case that doesn't have time fields
return cron_expr

# Parse the expanded expression fields
fields = expanded.split()
if len(fields) != 5:
raise InvalidCronExpression(
f"Invalid expansion for {special_string}: {expanded}"
)

# Parse each field using the field parser
cron_expr.minute = self.field_parser.parse_minute(fields[0])
cron_expr.hour = self.field_parser.parse_hour(fields[1])
cron_expr.day_of_month = self.field_parser.parse_day_of_month(fields[2])
cron_expr.month = self.field_parser.parse_month(fields[3])
cron_expr.day_of_week = self.field_parser.parse_day_of_week(fields[4])

return cron_expr

def get_description(self, special_string: str) -> str:
"""
Get a human-readable description of a special string.

Args:
special_string: The special string to describe.

Returns:
A human-readable description.
"""
descriptions = {
"@yearly": "Run once a year at midnight on January 1st",
"@annually": "Run once a year at midnight on January 1st",
"@monthly": "Run once a month at midnight on the 1st",
"@weekly": "Run once a week at midnight on Sunday",
"@daily": "Run once a day at midnight",
"@midnight": "Run once a day at midnight",
"@hourly": "Run once an hour at the beginning of the hour",
"@reboot": "Run at system startup"
}

# Normalize the string
special_string = special_string.strip().lower()
for key, desc in descriptions.items():
if key.lower() == special_string:
return desc

return f"Unknown special string: {special_string}"
Loading
Loading