diff --git a/.github/workflows/crossplane-ci.yml b/.github/workflows/crossplane-ci.yml index 1a2db25..10af4f7 100644 --- a/.github/workflows/crossplane-ci.yml +++ b/.github/workflows/crossplane-ci.yml @@ -12,7 +12,7 @@ jobs: strategy: matrix: - python-version: ["2.7", "3.6", "3.7", "3.8", "3.9", "3.10", pypy-3.6, pypy-3.7, pypy-3.8, pypy-3.9] + python-version: ["3.6", "3.7", "3.8", "3.9", "3.10", pypy-3.6, pypy-3.7, pypy-3.8, pypy-3.9] steps: - uses: actions/checkout@v3 diff --git a/AUTHORS.rst b/AUTHORS.rst index 37e8961..11946a8 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -14,4 +14,4 @@ Contributors * Ivan Poluyanov `@poluyanov `_ * Raymond Lau `@Raymond26 `_ * Luca Comellini `@lucacome `_ -* Ron Vider `@RonVider `_ \ No newline at end of file +* Ron Vider `@RonVider `_ diff --git a/CODE_OF_CONDUCT.md b/CODE_OF_CONDUCT.md index 2d9ce8c..bc3c7d3 100644 --- a/CODE_OF_CONDUCT.md +++ b/CODE_OF_CONDUCT.md @@ -72,4 +72,4 @@ members of the project's leadership. This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4, available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html -[homepage]: https://www.contributor-covenant.org \ No newline at end of file +[homepage]: https://www.contributor-covenant.org diff --git a/NOTICE b/NOTICE index dd859f0..ba54737 100644 --- a/NOTICE +++ b/NOTICE @@ -15,4 +15,3 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. - diff --git a/crossplane/__main__.py b/crossplane/__main__.py index 0ac64f6..456ad17 100644 --- a/crossplane/__main__.py +++ b/crossplane/__main__.py @@ -3,7 +3,8 @@ import io import os import sys -from argparse import ArgumentParser, RawDescriptionHelpFormatter +import typing as t +from argparse import ArgumentParser, Action, RawDescriptionHelpFormatter, Namespace from traceback import format_exception from . import __version__ @@ -12,34 +13,37 @@ from .builder import build as build_string, build_files, _enquote, DELIMITERS from .formatter import format as format_file from .compat import json, input +from .typedefs import DictStatement -def _prompt_yes(): +def _prompt_yes() -> bool: try: return input('overwrite? (y/n [n]) ').lower().startswith('y') except (KeyboardInterrupt, EOFError): sys.exit(1) -def _dump_payload(obj, fp, indent): - kwargs = {'indent': indent} +def _dump_payload(obj: t.Any, fp: t.TextIO, indent: t.Optional[int]) -> None: + kwargs: t.Dict[str, t.Any] = {'indent': indent} if indent is None: kwargs['separators'] = ',', ':' fp.write(json.dumps(obj, **kwargs) + u'\n') -def parse(filename, out, indent=None, catch=None, tb_onerror=None, ignore='', - single=False, comments=False, strict=False, combine=False): +def parse(filename: str, out: str, indent: t.Optional[int] = None, + catch: t.Optional[bool] = None, tb_onerror: t.Optional[bool] = None, + ignore: str = '', single: bool = False, comments: bool = False, + strict: bool = False, combine: bool = False) -> None: - ignore = ignore.split(',') if ignore else [] + ignored: t.List[str] = ignore.split(',') if ignore else [] - def callback(e): + def callback(e: Exception) -> str: exc = sys.exc_info() + (10,) - return ''.join(format_exception(*exc)).rstrip() + return ''.join(format_exception(*exc)).rstrip() # type: ignore[call-overload, arg-type, unused-ignore] - kwargs = { + kwargs: t.Dict[str, t.Any] = { 'catch_errors': catch, - 'ignore': ignore, + 'ignore': ignored, 'combine': combine, 'single': single, 'comments': comments, @@ -57,8 +61,9 @@ def callback(e): o.close() -def build(filename, dirname=None, force=False, indent=4, tabs=False, - header=True, stdout=False, verbose=False): +def build(filename: str, dirname: t.Optional[str] = None, force: bool = False, + indent: int = 4, tabs: bool = False, header: bool = True, + stdout: bool = False, verbose: bool = False) -> None: if dirname is None: dirname = os.getcwd() @@ -108,8 +113,8 @@ def build(filename, dirname=None, force=False, indent=4, tabs=False, print('wrote to ' + path) -def lex(filename, out, indent=None, line_numbers=False): - payload = list(lex_file(filename)) +def lex(filename: str, out: str, indent: t.Optional[int] = None, line_numbers: bool = False) -> None: + payload: t.List[t.Any] = list(lex_file(filename)) if line_numbers: payload = [(token, lineno) for token, lineno, quoted in payload] else: @@ -121,7 +126,7 @@ def lex(filename, out, indent=None, line_numbers=False): o.close() -def minify(filename, out): +def minify(filename: str, out: str) -> None: payload = parse_file( filename, single=True, @@ -132,7 +137,7 @@ def minify(filename, out): strict=False ) o = sys.stdout if out is None else io.open(out, 'w', encoding='utf-8') - def write_block(block): + def write_block(block: t.List[DictStatement]) -> None: for stmt in block: o.write(_enquote(stmt['directive'])) if stmt['directive'] == 'if': @@ -152,7 +157,7 @@ def write_block(block): o.close() -def format(filename, out, indent=4, tabs=False): +def format(filename: str, out: str, indent: int = 4, tabs: bool = False) -> None: output = format_file(filename, indent=indent, tabs=tabs) o = sys.stdout if out is None else io.open(out, 'w', encoding='utf-8') try: @@ -162,7 +167,7 @@ def format(filename, out, indent=4, tabs=False): class _SubparserHelpFormatter(RawDescriptionHelpFormatter): - def _format_action(self, action): + def _format_action(self, action: Action) -> str: line = super(RawDescriptionHelpFormatter, self)._format_action(action) if action.nargs == 'A...': @@ -175,7 +180,7 @@ def _format_action(self, action): return line -def parse_args(args=None): +def parse_args(args: t.Optional[t.List[str]] = None) -> Namespace: parser = ArgumentParser( formatter_class=_SubparserHelpFormatter, description='various operations for nginx config files', @@ -184,7 +189,7 @@ def parse_args(args=None): parser.add_argument('-V', '--version', action='version', version='%(prog)s ' + __version__) subparsers = parser.add_subparsers(title='commands') - def create_subparser(function, help): + def create_subparser(function: t.Callable[..., None], help: str) -> ArgumentParser: name = function.__name__ prog = 'crossplane ' + name p = subparsers.add_parser(name, prog=prog, help=help, description=help) @@ -231,11 +236,11 @@ def create_subparser(function, help): g.add_argument('-i', '--indent', type=int, metavar='NUM', help='number of spaces to indent output', default=4) g.add_argument('-t', '--tabs', action='store_true', help='indent with tabs instead of spaces') - def help(command): - if command not in parser._actions[-1].choices: + def help(command: str) -> None: + if command not in t.cast(t.Dict[str, t.Any], parser._actions[-1].choices): parser.error('unknown command %r' % command) else: - parser._actions[-1].choices[command].print_help() + t.cast(t.Dict[str, t.Any], parser._actions[-1].choices)[command].print_help() p = create_subparser(help, 'show help for commands') p.add_argument('command', help='command to show help for') @@ -249,7 +254,7 @@ def help(command): return parsed -def main(): +def main() -> None: kwargs = parse_args().__dict__ func = kwargs.pop('_subcommand') func(**kwargs) diff --git a/crossplane/analyzer.py b/crossplane/analyzer.py index e9d0e8e..3dfcda3 100644 --- a/crossplane/analyzer.py +++ b/crossplane/analyzer.py @@ -1,9 +1,11 @@ # -*- coding: utf-8 -*- +import typing as t from .errors import ( NgxParserDirectiveUnknownError, NgxParserDirectiveContextError, NgxParserDirectiveArgumentsError ) +from .typedefs import StatusType, DictResponse, DictFile, DictStatement # bit masks for different directive argument styles NGX_CONF_NOARGS = 0x00000001 # 0 args @@ -67,7 +69,7 @@ Since some directives can have different behaviors in different contexts, we use lists of bit masks, each describing a valid way to use the directive. -Definitions for directives that're available in the open source version of +Definitions for directives that're available in the open source version of nginx were taken directively from the source code. In fact, the variable names for the bit masks defined above were taken from the nginx source code. @@ -2111,7 +2113,7 @@ } -def enter_block_ctx(stmt, ctx): +def enter_block_ctx(stmt: DictStatement, ctx: t.Tuple[str, ...]) -> t.Tuple[str, ...]: # don't nest because NGX_HTTP_LOC_CONF just means "location block in http" if ctx and ctx[0] == 'http' and stmt['directive'] == 'location': return ('http', 'location') @@ -2120,8 +2122,8 @@ def enter_block_ctx(stmt, ctx): return ctx + (stmt['directive'],) -def analyze(fname, stmt, term, ctx=(), strict=False, check_ctx=True, - check_args=True): +def analyze(fname: str, stmt: DictStatement, term: str, ctx: t.Tuple[str, ...] = (), strict: bool = False, check_ctx: bool = True, + check_args: bool = True) -> None: directive = stmt['directive'] line = stmt['line'] @@ -2151,7 +2153,7 @@ def analyze(fname, stmt, term, ctx=(), strict=False, check_ctx=True, if not check_args: return - valid_flag = lambda x: x.lower() in ('on', 'off') + valid_flag: t.Callable[[str], bool] = lambda x: x.lower() in ('on', 'off') # do this in reverse because we only throw errors at the end if no masks # are valid, and typically the first bit mask is what the parser expects @@ -2181,7 +2183,7 @@ def analyze(fname, stmt, term, ctx=(), strict=False, check_ctx=True, raise NgxParserDirectiveArgumentsError(reason % directive, fname, line) -def register_external_directives(directives): - for directive, bitmasks in directives.iteritems(): +def register_external_directives(directives: t.Dict[str, t.List[int]]) -> None: + for directive, bitmasks in directives.items(): if bitmasks: DIRECTIVES[directive] = bitmasks diff --git a/crossplane/builder.py b/crossplane/builder.py index 049d224..3a3dc0f 100644 --- a/crossplane/builder.py +++ b/crossplane/builder.py @@ -1,16 +1,24 @@ # -*- coding: utf-8 -*- +import typing as t import codecs import os import re from .compat import PY2 +from .typedefs import StatusType, DictResponse, DictFile, DictStatement +if t.TYPE_CHECKING: + MatchBytes = re.Match[bytes] +else: + MatchBytes = re.Match + +ExtBuilderType = t.Callable[[DictStatement, str, int, bool], str] DELIMITERS = ('{', '}', ';') -EXTERNAL_BUILDERS = {} +EXTERNAL_BUILDERS: t.Dict[str, ExtBuilderType] = {} ESCAPE_SEQUENCES_RE = re.compile(r'(\\x[0-9a-f]{2}|\\[0-7]{1,3})') -def _escape(string): +def _escape(string: str) -> t.Generator[str, None, None]: prev, char = '', '' for char in string: if prev == '\\' or prev + char == '${': @@ -26,7 +34,7 @@ def _escape(string): yield char -def _needs_quotes(string): +def _needs_quotes(string: str) -> bool: if string == '': return True @@ -50,12 +58,11 @@ def _needs_quotes(string): return char in ('\\', '$') or expanding - -def _replace_escape_sequences(match): - return match.group(1).decode('string-escape') +def _replace_escape_sequences(match: MatchBytes) -> str: + return t.cast(str, match.group(1).decode('string-escape')) -def _enquote(arg): +def _enquote(arg: str) -> str: if not _needs_quotes(arg): return arg @@ -71,7 +78,7 @@ def _enquote(arg): return arg -def build(payload, indent=4, tabs=False, header=False): +def build(payload: t.List[DictStatement], indent: int = 4, tabs: bool = False, header: bool = False) -> str: padding = '\t' if tabs else ' ' * indent head = '' @@ -81,7 +88,7 @@ def build(payload, indent=4, tabs=False, header=False): head += '# https://github.com/nginxinc/crossplane/issues\n' head += '\n' - def _build_block(output, block, depth, last_line): + def _build_block(output: str, block: t.List[DictStatement], depth: int, last_line: int) -> str: margin = padding * depth for stmt in block: @@ -123,7 +130,7 @@ def _build_block(output, block, depth, last_line): return head + body -def build_files(payload, dirname=None, indent=4, tabs=False, header=False): +def build_files(payload: DictResponse, dirname: t.Optional[str] = None, indent: int = 4, tabs: bool = False, header: bool = False) -> None: """ Uses a full nginx config payload (output of crossplane.parse) to build config files, then writes those files to disk. @@ -149,6 +156,6 @@ def build_files(payload, dirname=None, indent=4, tabs=False, header=False): fp.write(output) -def register_external_builder(builder, directives): +def register_external_builder(builder: ExtBuilderType, directives: t.Iterable[str]) -> None: for directive in directives: EXTERNAL_BUILDERS[directive] = builder diff --git a/crossplane/compat.py b/crossplane/compat.py index 2829ffe..39652e8 100644 --- a/crossplane/compat.py +++ b/crossplane/compat.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- +import typing as t import functools import sys try: import simplejson as json except ImportError: - import json + import json # type: ignore[no-redef] PY2 = (sys.version_info[0] == 2) PY3 = (sys.version_info[0] == 3) @@ -18,13 +19,13 @@ basestring = str -def fix_pep_479(generator): +def fix_pep_479(generator: t.Any) -> t.Any: """ Python 3.7 breaks crossplane's lexer because of PEP 479 Read more here: https://www.python.org/dev/peps/pep-0479/ """ @functools.wraps(generator) - def _wrapped_generator(*args, **kwargs): + def _wrapped_generator(*args: t.Any, **kwargs: t.Any) -> t.Generator[t.Any, None, None]: try: for x in generator(*args, **kwargs): yield x @@ -32,3 +33,5 @@ def _wrapped_generator(*args, **kwargs): return return _wrapped_generator + +__all__ = ['PY2', 'PY3', 'input', 'basestring', 'fix_pep_479', 'json'] diff --git a/crossplane/errors.py b/crossplane/errors.py index bcb7904..898fd12 100644 --- a/crossplane/errors.py +++ b/crossplane/errors.py @@ -1,14 +1,15 @@ # -*- coding: utf-8 -*- +import typing as t class NgxParserBaseException(Exception): - def __init__(self, strerror, filename, lineno): + def __init__(self, strerror: str, filename: t.Optional[str], lineno: t.Optional[int]) -> None: self.args = (strerror, filename, lineno) self.filename = filename self.lineno = lineno self.strerror = strerror - def __str__(self): + def __str__(self) -> str: if self.lineno is not None: return '%s in %s:%s' % self.args else: diff --git a/crossplane/ext/abstract.py b/crossplane/ext/abstract.py index 41dd377..189e93e 100644 --- a/crossplane/ext/abstract.py +++ b/crossplane/ext/abstract.py @@ -1,24 +1,26 @@ # -*- coding: utf-8 -*- +import typing as t from crossplane.analyzer import register_external_directives from crossplane.lexer import register_external_lexer from crossplane.parser import register_external_parser from crossplane.builder import register_external_builder +from crossplane.typedefs import StatusType, DictResponse, DictFile, DictStatement class CrossplaneExtension(object): - directives = {} + directives: t.Dict[str, t.Any] = {} - def register_extension(self): - register_external_directives(directive=self.directives) + def register_extension(self) -> None: + register_external_directives(directives=self.directives) register_external_lexer(directives=self.directives, lexer=self.lex) register_external_parser(directives=self.directives, parser=self.parse) register_external_builder(directives=self.directives, builder=self.build) - def lex(self, token_iterator, directive): + def lex(self, token_iterator: t.Iterator[t.Tuple[str, int]], directive: str) -> t.Iterable[t.Tuple[str, int, bool]]: raise NotImplementedError - def parse(self, stmt, parsing, tokens, ctx=(), consume=False): + def parse(self, stmt: DictStatement, parsing: None, tokens: t.List[str], ctx: t.Tuple[str, ...] = (), consume: bool=False) -> None: raise NotImplementedError - def build(self, stmt, padding, state, indent=4, tabs=False): + def build(self, stmt: DictStatement, padding: str, indent: int=4, tabs: bool=False) -> str: raise NotImplementedError diff --git a/crossplane/ext/lua.py b/crossplane/ext/lua.py index 0bdb367..8bf8c16 100644 --- a/crossplane/ext/lua.py +++ b/crossplane/ext/lua.py @@ -1,20 +1,24 @@ # -*- coding: utf-8 -*- +import typing as t from crossplane.lexer import register_external_lexer from crossplane.builder import register_external_builder from crossplane.compat import fix_pep_479 from crossplane.errors import NgxParserBaseException from crossplane.ext.abstract import CrossplaneExtension +from crossplane.typedefs import StatusType, DictResponse, DictFile, DictStatement +T = t.TypeVar('T') -class EmplaceIter: - def __init__(self, it): + +class EmplaceIter(t.Generic[T]): + def __init__(self, it: t.Iterator[T]) -> None: self.it = it - self.ret = [] + self.ret: t.List[T] = [] - def __iter__(self): + def __iter__(self) -> t.Iterator[T]: return self - def __next__(self): + def __next__(self) -> T: if len(self.ret) > 0: v = self.ret.pop() return v @@ -22,7 +26,7 @@ def __next__(self): next = __next__ - def put_back(self, v): + def put_back(self, v: T) -> None: self.ret.append(v) @@ -50,12 +54,12 @@ class LuaBlockPlugin(CrossplaneExtension): 'ssl_session_store_by_lua_block': [], } - def register_extension(self): + def register_extension(self) -> None: register_external_lexer(directives=self.directives, lexer=self.lex) register_external_builder(directives=self.directives, builder=self.build) @fix_pep_479 - def lex(self, char_iterator, directive): + def lex(self, char_iterator: t.Iterator[t.Tuple[str, int]], directive: str) -> t.Generator[t.Tuple[str, int, bool], None, None]: if directive == "set_by_lua_block": # https://github.com/openresty/lua-nginx-module#set_by_lua_block # The sole *_by_lua_block directive that has an arg @@ -123,10 +127,10 @@ def lex(self, char_iterator, directive): raise StopIteration token += char - def parse(self, stmt, parsing, tokens, ctx=(), consume=False): + def parse(self, stmt: DictStatement, parsing: None, tokens: t.List[str], ctx: t.Tuple[str, ...] = (), consume: bool=False) -> None: pass - def build(self, stmt, padding, indent=4, tabs=False): + def build(self, stmt: DictStatement, padding: str, indent: int=4, tabs: bool=False) -> str: built = stmt['directive'] if built == 'set_by_lua_block': block = stmt['args'][1] diff --git a/crossplane/formatter.py b/crossplane/formatter.py index 80c3b21..a82d5b4 100644 --- a/crossplane/formatter.py +++ b/crossplane/formatter.py @@ -4,7 +4,7 @@ from .parser import parse -def format(filename, indent=4, tabs=False): +def format(filename: str, indent: int = 4, tabs: bool = False) -> str: payload = parse( filename, comments=True, diff --git a/crossplane/lexer.py b/crossplane/lexer.py index 2db9c6e..6b6fbbd 100644 --- a/crossplane/lexer.py +++ b/crossplane/lexer.py @@ -1,15 +1,18 @@ # -*- coding: utf-8 -*- import itertools import io +import typing as t from .compat import fix_pep_479 from .errors import NgxParserSyntaxError +from .typedefs import StatusType, DictResponse, DictFile, DictStatement -EXTERNAL_LEXERS = {} +ExtLexerType = t.Callable[[t.Iterator[t.Tuple[str, int]], str], t.Iterable[t.Tuple[str, int, bool]]] +EXTERNAL_LEXERS: t.Dict[str, ExtLexerType] = {} @fix_pep_479 -def _iterescape(iterable): +def _iterescape(iterable: t.Iterable[str]) -> t.Generator[str, None, None]: chars = iter(iterable) for char in chars: if char == '\\': @@ -17,7 +20,7 @@ def _iterescape(iterable): yield char -def _iterlinecount(iterable): +def _iterlinecount(iterable: t.Iterable[str]) -> t.Generator[t.Tuple[str, int], None, None]: line = 1 chars = iter(iterable) for char in chars: @@ -27,7 +30,7 @@ def _iterlinecount(iterable): @fix_pep_479 -def _lex_file_object(file_obj): +def _lex_file_object(file_obj: t.TextIO) -> t.Generator[t.Tuple[str, int, bool], None, None]: """ Generates token tuples from an nginx config file object @@ -37,9 +40,11 @@ def _lex_file_object(file_obj): token_line = 0 # the line the token starts on next_token_is_directive = True - it = itertools.chain.from_iterable(file_obj) - it = _iterescape(it) # treat escaped characters differently - it = _iterlinecount(it) # count the number of newline characters + it0: t.Iterator[str] + it0 = itertools.chain.from_iterable(file_obj) + it0 = _iterescape(it0) # treat escaped characters differently + it: t.Iterator[t.Tuple[str, int]] + it = _iterlinecount(it0) # count the number of newline characters for char, line in it: # handle whitespace @@ -120,7 +125,7 @@ def _lex_file_object(file_obj): token += char -def _balance_braces(tokens, filename=None): +def _balance_braces(tokens: t.Iterable[t.Tuple[str, int, bool]], filename: t.Optional[str] = None) -> t.Generator[t.Tuple[str, int, bool], None, None]: """Raises syntax errors if braces aren't balanced""" depth = 0 @@ -143,7 +148,7 @@ def _balance_braces(tokens, filename=None): raise NgxParserSyntaxError(reason, filename, line) -def lex(filename): +def lex(filename: str) -> t.Generator[t.Tuple[str, int, bool], None, None]: """Generates tokens from an nginx config file""" with io.open(filename, mode='r', encoding='utf-8', errors='replace') as f: it = _lex_file_object(f) @@ -152,6 +157,6 @@ def lex(filename): yield (token, line, quoted) -def register_external_lexer(directives, lexer): +def register_external_lexer(directives: t.Iterable[str], lexer: ExtLexerType) -> None: for directive in directives: EXTERNAL_LEXERS[directive] = lexer diff --git a/crossplane/parser.py b/crossplane/parser.py index 09268c8..be264b4 100644 --- a/crossplane/parser.py +++ b/crossplane/parser.py @@ -1,17 +1,21 @@ # -*- coding: utf-8 -*- +import typing as t import glob import os from .lexer import lex from .analyzer import analyze, enter_block_ctx -from .errors import NgxParserDirectiveError +from .errors import NgxParserBaseException, NgxParserDirectiveError +from .typedefs import StatusType, DictResponse, DictError, DictFile, DictFileError, DictStatement # map of external / third-party directives to a parse function -EXTERNAL_PARSERS = {} +ErrorCallbackType = t.Callable[[Exception], t.Any] +ExtParserType = t.Callable[[DictStatement, None, t.List[str], t.Tuple[str, ...], bool], None] +EXTERNAL_PARSERS: t.Dict[str, ExtParserType] = {} # TODO: raise special errors for invalid "if" args -def _prepare_if_args(stmt): +def _prepare_if_args(stmt: DictStatement) -> None: """Removes parentheses from an "if" directive's arguments""" args = stmt['args'] if args and args[0].startswith('(') and args[-1].endswith(')'): @@ -22,9 +26,9 @@ def _prepare_if_args(stmt): args[:] = args[start:end] -def parse(filename, onerror=None, catch_errors=True, ignore=(), single=False, - comments=False, strict=False, combine=False, check_ctx=True, - check_args=True): +def parse(filename: str, onerror: t.Optional[ErrorCallbackType] = None, catch_errors: bool=True, ignore:t.Container[str]=(), single:bool=False, + comments:bool=False, strict:bool=False, combine:bool=False, check_ctx:bool=True, + check_args:bool=True) -> DictResponse: """ Parses an nginx config file and returns a nested dict payload @@ -42,24 +46,24 @@ def parse(filename, onerror=None, catch_errors=True, ignore=(), single=False, """ config_dir = os.path.dirname(filename) - payload = { + payload: DictResponse = { 'status': 'ok', 'errors': [], 'config': [], } # start with the main nginx config file/context - includes = [(filename, ())] # stores (filename, config context) tuples + includes: t.List[t.Tuple[str, t.Tuple[str, ...]]] = [(filename, ())] # stores (filename, config context) tuples included = {filename: 0} # stores {filename: array index} map - def _handle_error(parsing, e): + def _handle_error(parsing: DictFile, e: Exception) -> None: """Adds representaions of an error to the payload""" file = parsing['file'] error = str(e) line = getattr(e, 'lineno', None) - parsing_error = {'error': error, 'line': line} - payload_error = {'file': file, 'error': error, 'line': line} + parsing_error: DictFileError = {'error': error, 'line': line} + payload_error: DictError = {'file': file, 'error': error, 'line': line} if onerror is not None: payload_error['callback'] = onerror(e) @@ -69,10 +73,10 @@ def _handle_error(parsing, e): payload['status'] = 'failed' payload['errors'].append(payload_error) - def _parse(parsing, tokens, ctx=(), consume=False): + def _parse(parsing: DictFile, tokens: t.Iterator[t.Tuple[str, int, bool]], ctx: t.Tuple[str, ...]=(), consume: bool = False) -> t.List[DictStatement]: """Recursively parses nginx config contexts""" fname = parsing['file'] - parsed = [] + parsed: t.List[DictStatement] = [] # parse recursively by pulling from a flat stream of tokens for token, lineno, quoted in tokens: @@ -91,6 +95,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): # the first token should always(?) be an nginx directive directive = token + stmt: DictStatement if combine: stmt = { @@ -180,7 +185,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): fnames = [pattern] except Exception as e: fnames = [] - e.lineno = stmt['line'] + t.cast(NgxParserBaseException, e).lineno = stmt['line'] if catch_errors: _handle_error(parsing, e) else: @@ -192,7 +197,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): if fname not in included: included[fname] = len(includes) includes.append((fname, ctx)) - index = included[fname] + index: int = included[fname] stmt['includes'].append(index) # if this statement terminated with '{' then it is a block @@ -204,7 +209,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): # add all comments found inside args after stmt is added for comment in comments_in_args: - comment_stmt = { + comment_stmt: DictStatement = { 'directive': '#', 'line': stmt['line'], 'args': [], @@ -217,7 +222,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): # the includes list grows as "include" directives are found in _parse for fname, ctx in includes: tokens = lex(fname) - parsing = { + parsing: DictFile = { 'file': fname, 'status': 'ok', 'errors': [], @@ -236,7 +241,7 @@ def _parse(parsing, tokens, ctx=(), consume=False): return payload -def _combine_parsed_configs(old_payload): +def _combine_parsed_configs(old_payload: DictResponse) -> DictResponse: """ Combines config files into one by using include directives. @@ -245,7 +250,7 @@ def _combine_parsed_configs(old_payload): """ old_configs = old_payload['config'] - def _perform_includes(block): + def _perform_includes(block: t.Iterable[DictStatement]) -> t.Generator[DictStatement, None, None]: for stmt in block: if 'block' in stmt: stmt['block'] = list(_perform_includes(stmt['block'])) @@ -257,7 +262,7 @@ def _perform_includes(block): else: yield stmt # do not yield include stmt itself - combined_config = { + combined_config: DictFile = { 'file': old_configs[0]['file'], 'status': 'ok', 'errors': [], @@ -272,7 +277,7 @@ def _perform_includes(block): first_config = old_configs[0]['parsed'] combined_config['parsed'] += _perform_includes(first_config) - combined_payload = { + combined_payload: DictResponse = { 'status': old_payload.get('status', 'ok'), 'errors': old_payload.get('errors', []), 'config': [combined_config] @@ -280,7 +285,7 @@ def _perform_includes(block): return combined_payload -def register_external_parser(parser, directives): +def register_external_parser(parser: ExtParserType, directives: t.Iterable[str]) -> None: """ :param parser: parser function :param directives: list of directive strings diff --git a/crossplane/py.typed b/crossplane/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/crossplane/typedefs.py b/crossplane/typedefs.py new file mode 100644 index 0000000..7a178e1 --- /dev/null +++ b/crossplane/typedefs.py @@ -0,0 +1,46 @@ +# -*- coding: utf-8 -*- +import typing as t + +# This construction only required unless crossplane supports python<3.10 (for TypeAlias) or at least python<3.8 (for TypedDict and Literal) +if t.TYPE_CHECKING: + from typing_extensions import TypeAlias, TypedDict, Literal + StatusType: TypeAlias = Literal['ok', 'failed'] + + class DictStatementBase(TypedDict): + directive: str + line: int + args: t.List[str] + + class DictStatement(DictStatementBase, total=False): + includes: t.List[int] + block: t.List['DictStatement'] + comment: str + file: str + + class DictFileError(TypedDict): + error: str + line: t.Optional[int] + + class DictFile(TypedDict): + file: str + status: StatusType + errors: t.List[DictFileError] + parsed: t.List[DictStatement] + + class DictErrorBase(TypedDict): + error: str + file: str + line: t.Optional[int] + + class DictError(DictErrorBase, total=False): + callback: t.Any + + class DictResponse(TypedDict): + status: StatusType + errors: t.List[DictError] + config: t.List[DictFile] +else: + StatusType = str + DictResponse = DictError = DictFile = DictFileError = DictStatement = dict + +__all__ = ['StatusType', 'DictResponse', 'DictError', 'DictFile', 'DictFileError', 'DictStatement'] diff --git a/setup.py b/setup.py index 27010e6..10d8bbe 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,7 @@ here = os.path.abspath(os.path.dirname(__file__)) -def get_readme(): +def get_readme() -> str: path = os.path.join(here, 'README.md') with io.open(path, encoding='utf-8') as f: return '\n' + f.read() @@ -27,20 +27,20 @@ class UploadCommand(Command): """Support setup.py upload.""" description = 'Build and publish the package.' - user_options = [] + user_options: list[str] = [] @staticmethod - def status(s): + def status(s: str) -> None: """Prints things in bold.""" print('\033[1m{0}\033[0m'.format(s)) - def initialize_options(self): + def initialize_options(self) -> None: pass - def finalize_options(self): + def finalize_options(self) -> None: pass - def run(self): + def run(self) -> None: try: self.status('Removing previous builds…') shutil.rmtree(os.path.join(here, 'dist')) @@ -73,8 +73,6 @@ def run(self): 'Intended Audience :: Information Technology', 'License :: OSI Approved :: Apache Software License', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', diff --git a/tests/__init__.py b/tests/__init__.py index 6c08cfa..286e9c0 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- import os +import typing as t +import py.path from crossplane.compat import basestring from crossplane.parser import parse @@ -8,7 +10,7 @@ here = os.path.dirname(__file__) -def assert_equal_payloads(a, b, ignore_keys=()): +def assert_equal_payloads(a: t.Any, b: t.Any, ignore_keys: t.Iterable[str] = ()) -> None: assert type(a) == type(b) if isinstance(a, list): assert len(a) == len(b) @@ -25,7 +27,7 @@ def assert_equal_payloads(a, b, ignore_keys=()): assert a == b -def compare_parsed_and_built(conf_dirname, conf_basename, tmpdir, **kwargs): +def compare_parsed_and_built(conf_dirname: str, conf_basename: str, tmpdir: py.path.local, **kwargs: t.Any) -> None: original_dirname = os.path.join(here, 'configs', conf_dirname) original_path = os.path.join(original_dirname, conf_basename) original_payload = parse(original_path, **kwargs) diff --git a/tox.ini b/tox.ini index 83342e5..a956678 100644 --- a/tox.ini +++ b/tox.ini @@ -11,7 +11,7 @@ addopts = -vv --showlocals --disable-warnings -rf -p no:warnings testpaths = tests/ [tox] -envlist = py27, py36, py37, py38, py39, py310, pypy +envlist = py36, py37, py38, py39, py310, pypy skipsdist = true [testenv]