From 20e4a0f79ff16eed02df5d9a26a48d35a76abbb6 Mon Sep 17 00:00:00 2001 From: Tit Date: Mon, 20 May 2019 22:19:42 +0200 Subject: [PATCH 1/2] Resolving linter errors flake8 Multiple linter errors resolved, reported by flake8. --- pysubtools/__init__.py | 12 +- pysubtools/exporters/base.py | 159 ++-- pysubtools/exporters/subrip.py | 113 +-- pysubtools/parsers/__init__.py | 14 +- pysubtools/parsers/base.py | 417 ++++++----- pysubtools/parsers/encodings.py | 222 +++--- pysubtools/parsers/microdvd.py | 456 ++++++------ pysubtools/parsers/subrip.py | 608 +++++++-------- pysubtools/subtitle.py | 1218 ++++++++++++++++--------------- pysubtools/utils.py | 31 +- setup.py | 25 +- tests/pysubtools_test.py | 607 +++++++-------- 12 files changed, 2010 insertions(+), 1872 deletions(-) diff --git a/pysubtools/__init__.py b/pysubtools/__init__.py index 7102b96..a663e9d 100644 --- a/pysubtools/__init__.py +++ b/pysubtools/__init__.py @@ -3,16 +3,16 @@ from __future__ import print_function from __future__ import unicode_literals -from zipfile import ZipExtFile +# from zipfile import ZipExtFile from . import parsers from . 
import exporters from .subtitle import Subtitle, SubtitleUnit, SubtitleLine __all__ = [ - 'Subtitle', - 'SubtitleUnit', - 'SubtitleLine', - 'parsers', - 'exporters', + 'Subtitle', + 'SubtitleUnit', + 'SubtitleLine', + 'parsers', + 'exporters', ] diff --git a/pysubtools/exporters/base.py b/pysubtools/exporters/base.py index 753ac85..273e864 100644 --- a/pysubtools/exporters/base.py +++ b/pysubtools/exporters/base.py @@ -7,82 +7,89 @@ from ..subtitle import Subtitle -class NoExporterFound(Exception): - pass - -class Exporter(object): - """Base class for exporting Subtitle.""" - - @staticmethod - def from_format(format, **options): - """Returns an exporter for specified 'format'.""" - for exporter in Exporter.__subclasses__(): - if exporter.FORMAT == format: - return exporter(**options) - raise NoExporterFound("Could not find exporter with name '{}'.".format(format)) - def __init__(self, **options): - self._init(**options) - - def _init(self, **options): - """A conveniance init method (no need for overloading).""" +class NoExporterFound(Exception): pass - def _export_metadata(self, metadata): - """ - Returns subtitles metadata in format. In your implementation, - make sure you output str object. Parameter 'metadata' is in - dict object. - """ - raise NotImplementedError - - def _export_unit(self, unit): - """ - Returns whole subtitle unit in format. In your implementation, - make sure you output str object. 
- """ - raise NotImplementedError - - def _export_end(self, metadata): - """Returns the end part of subtitle.""" - raise NotImplementedError - - @property - def format(self): - return self.FORMAT - - def export(self, output, subtitle): - """Exports to 'output', it may be filename or a file object.""" - if not isinstance(subtitle, Subtitle): - raise TypeError("Can export only Subtitle objects.") - - try: - basestring - except NameError: - # Python3 compat - basestring = str - - if isinstance(output, basestring): - output = io.BufferedWriter(io.open(output, 'wb')) - - try: - if isinstance(output, file): - output = io.BufferedWriter(io.FileIO(output.fileno(), closefd = False, mode = output.mode)) - except NameError: - # Python3 does not need this - pass - - if not isinstance(output, io.BufferedIOBase): - raise TypeError("Output needs to be a filename, file or BufferedIOBase with write capability.") - - # Export subtitle metadata - output.write(self._export_metadata(subtitle.meta)) - - # Go through units and export one by one - for unit in subtitle: - output.write(self._export_unit(unit)) - - # The final piece - output.write(self._export_end(subtitle.meta)) - - # Done + +class Exporter(object): + """Base class for exporting Subtitle.""" + + @staticmethod + def from_format(format, **options): + """Returns an exporter for specified 'format'.""" + for exporter in Exporter.__subclasses__(): + if exporter.FORMAT == format: + return exporter(**options) + raise NoExporterFound( + "Could not find exporter with name '{}'.".format(format)) + + def __init__(self, **options): + self._init(**options) + + def _init(self, **options): + """A conveniance init method (no need for overloading).""" + pass + + def _export_metadata(self, metadata): + """ + Returns subtitles metadata in format. In your implementation, + make sure you output str object. Parameter 'metadata' is in + dict object. 
+ """ + raise NotImplementedError + + def _export_unit(self, unit): + """ + Returns whole subtitle unit in format. In your implementation, + make sure you output str object. + """ + raise NotImplementedError + + def _export_end(self, metadata): + """Returns the end part of subtitle.""" + raise NotImplementedError + + @property + def format(self): + return self.FORMAT + + def export(self, output, subtitle): + """Exports to 'output', it may be filename or a file object.""" + if not isinstance(subtitle, Subtitle): + raise TypeError("Can export only Subtitle objects.") + + try: + basestring + except NameError: + # Python3 compat + basestring = str + + if isinstance(output, basestring): + output = io.BufferedWriter(io.open(output, 'wb')) + + try: + if isinstance(output, file): + output = io.BufferedWriter( + io.FileIO(output.fileno(), closefd=False, + mode=output.mode)) + except NameError: + # Python3 does not need this + pass + + if not isinstance(output, io.BufferedIOBase): + raise TypeError( + """Output needs to be a filename, file or BufferedIOBase with + write capability.""") + + # Export subtitle metadata + output.write(self._export_metadata(subtitle.meta)) + + # Go through units and export one by one + for unit in subtitle: + output.write(self._export_unit(unit)) + + # The final piece + output.write(self._export_end(subtitle.meta)) + + # Done diff --git a/pysubtools/exporters/subrip.py b/pysubtools/exporters/subrip.py index ff367c4..641bc03 100644 --- a/pysubtools/exporters/subrip.py +++ b/pysubtools/exporters/subrip.py @@ -7,59 +7,62 @@ from ..subtitle import HumanTime + class SubRipExporter(Exporter): - """Exported for SubRip format.""" - FORMAT = 'SubRip' - - def _init(self, encoding = 'utf-8', line_ending = b'\r\n'): - self._encoding = encoding - self._line_ending = line_ending - - @staticmethod - def _convert_time(time): - output = [] - - if isinstance(time, (float, int)): - time = HumanTime.from_seconds(time) - elif not isinstance(time, HumanTime): - raise 
TypeError("Expecting time") - - output.append('{:02d}'.format(time.hours)) - output.append('{:02d}'.format(time.minutes)) - seconds = int(time.seconds) - miliseconds = int((time.seconds - seconds) * 1000) - output.append('{:02d},{:03d}'.format(seconds, miliseconds)) - - return ':'.join(output) - - def _export_metadata(self, metadata): - # No subtitle wide metadata, just reset counter - self._unit = 0 - return b'' - - def _export_unit(self, unit): - output = [] - - if self._unit: - # An empty line at the beginning - output.append(b'') - self._unit += 1 - - # Sequence - output.append(str(self._unit).encode(self._encoding)) - # Timing - # TODO 3D positions - output.append("{} --> {}".format(self._convert_time(unit.start), - self._convert_time(unit.end)).encode(self._encoding)) - # Text - output.append(self._line_ending.join([i.encode(self._encoding, 'ignore') for i in unit.lines])) - - # End of line - output.append(b'') - - # All done - return self._line_ending.join(output) - - def _export_end(self, metadata): - # No specific footer also - return b'' + """Exported for SubRip format.""" + FORMAT = 'SubRip' + + def _init(self, encoding='utf-8', line_ending=b'\r\n'): + self._encoding = encoding + self._line_ending = line_ending + + @staticmethod + def _convert_time(time): + output = [] + + if isinstance(time, (float, int)): + time = HumanTime.from_seconds(time) + elif not isinstance(time, HumanTime): + raise TypeError("Expecting time") + + output.append('{:02d}'.format(time.hours)) + output.append('{:02d}'.format(time.minutes)) + seconds = int(time.seconds) + miliseconds = int((time.seconds - seconds) * 1000) + output.append('{:02d},{:03d}'.format(seconds, miliseconds)) + + return ':'.join(output) + + def _export_metadata(self, metadata): + # No subtitle wide metadata, just reset counter + self._unit = 0 + return b'' + + def _export_unit(self, unit): + output = [] + + if self._unit: + # An empty line at the beginning + output.append(b'') + self._unit += 1 + + # Sequence 
+ output.append(str(self._unit).encode(self._encoding)) + # Timing + # TODO 3D positions + output.append("{} --> {}".format( + self._convert_time(unit.start), + self._convert_time(unit.end)).encode(self._encoding)) + # Text + output.append(self._line_ending.join( + [i.encode(self._encoding, 'ignore') for i in unit.lines])) + + # End of line + output.append(b'') + + # All done + return self._line_ending.join(output) + + def _export_end(self, metadata): + # No specific footer also + return b'' diff --git a/pysubtools/parsers/__init__.py b/pysubtools/parsers/__init__.py index 8bddb32..f05a37b 100644 --- a/pysubtools/parsers/__init__.py +++ b/pysubtools/parsers/__init__.py @@ -7,13 +7,13 @@ from . import encodings # To load all parser -from . import subrip -from . import microdvd +# from . import subrip +# from . import microdvd __all__ = [ - 'Parser', - 'EncodingError', - 'NoParserError', - 'ParseError', - 'encodings', + 'Parser', + 'EncodingError', + 'NoParserError', + 'ParseError', + 'encodings', ] diff --git a/pysubtools/parsers/base.py b/pysubtools/parsers/base.py index 013c81a..d962afe 100644 --- a/pysubtools/parsers/base.py +++ b/pysubtools/parsers/base.py @@ -4,215 +4,230 @@ from __future__ import unicode_literals import io -import functools +# import functools from . 
import encodings + class NoParserError(Exception): - pass + pass + class ParseError(Exception): - def __init__(self, line_number, column, line, description): - self.line_number = line_number - self.column = column - self.line = line - self.description = description - super(ParseError, self).__init__(self.description) - - def __str__(self): - return str(unicode(self)) - - def __unicode__(self): - return u"Parse error on line {} at column {} error occurred '{}'".format( - self.line_number, - self.column, - self.description - ) + def __init__(self, line_number, column, line, description): + self.line_number = line_number + self.column = column + self.line = line + self.description = description + super(ParseError, self).__init__(self.description) + + def __str__(self): + return str(unicode(self)) + + def __unicode__(self): + return + u"Parse error on line {} at column {} error occurred '{}'".format( + self.line_number, + self.column, + self.description + ) + class ParseWarning(ParseError): - def __unicode__(self): - return "Parse warning on line {} at column {} warning occurred '{}'".format( - self.line_number, - self.column, - self.description - ) + def __unicode__(self): + return + "Parse warning on line {} at column {} warning occurred '{}'".format( + self.line_number, + self.column, + self.description + ) + class Parser(object): - """Abstract class for all parsers. 
- """ - LEVELS = ( - 'warning', - 'error' - ) - _subtitle = None - _stop_level = 'error' - parsed = None - encoding = None - encoding_confidence = None - - def __init__(self, stop_level = 'error'): - self.warnings = [] - self.errors = [] - self._data = None - self._stop_level = stop_level - - # Part of the parser internals - self._read_lines = [] - self._current_line_num = -1 - self._current_line = None - - def _add_msg(self, level, line_number, column, line, description): - if self._stop_level and self.LEVELS.index(level) >= self.LEVELS.index(self._stop_level): - if level == 'warning': - raise ParseWarning(line_number, column, line, description) - elif level == 'error': - raise ParseError(line_number, column, line, description) - - try: - line = unicode(line) - description = unicode(description) - except NameError: - # Python3 compat - line = str(line) - description = str(description) - - msg = { - 'line_number': int(line_number), - 'col': int(column), - 'line': line, - 'description': description - } - - if level == 'warning': - self.warnings.append(msg) - elif level == 'error': - self.errors.append(msg) - - def add_warning(self, *args, **kwargs): - self._add_msg('warning', *args, **kwargs) - - def add_error(self, *args, **kwargs): - self._add_msg('error', *args, **kwargs) - - @staticmethod - def _normalize_data(data): - try: - if isinstance(data, file): - data = io.BufferedReader(io.FileIO(data.fileno(), closefd = False)) - except NameError: - # Not needed in Python3 - pass - - if isinstance(data, bytes): - data = io.BytesIO(data) - elif not isinstance(data, (io.BytesIO, io.BufferedReader)): - raise TypeError("Needs to be a file object or bytes.") - data.seek(0) - return data - - @classmethod - def can_parse(cls, data): - data = cls._normalize_data(data) - return cls._can_parse(data) - - @classmethod - def _can_parse(cls, data): - """Needs to be reimplemented to quickly check if file seems the proper format.""" - raise NotImplementedError - - def _parse(self): - 
""" - Parses the file, it returns a list of units in specified format. Needs to be - implemented by the parser. It can also be a generator (yield) + """Abstract class for all parsers. """ - raise NotImplementedError - - def _parse_metadata(self): - """Parses the subtitle metadata (if format has a header at all).""" - return {} - - def parse(self, data = None, encoding = None, language = None, **kwargs): - """Parses the file and returns the subtitle. Check warnings after the parse.""" - if data: - # We have new data, discard old and set up for new - self._data = self._normalize_data(data) - # Check encoding - self.encoding, self.encoding_confidence = encodings.detect(self._data, encoding = encoding, language = language) - self._data.seek(0) - # Wrap it - self._data = io.TextIOWrapper(self._data, self.encoding, newline = '', errors = 'replace') - - - # Create subtitle - from .. import Subtitle, SubtitleUnit - sub = Subtitle(**self._parse_metadata()) - for unit in self._parse(**kwargs): - try: - sub.append(SubtitleUnit(**unit['data'])) - except TypeError: - # We may have malformed units - self.add_error( - self._current_line_num + 1, - 1, - self._current_line, - 'Wrongly parsed unit, might be a result of a previous error.' 
- ) - return sub - - @staticmethod - def from_data(data, encoding = None, language = None, **kwargs): - """Returns a parser that can parse 'data' in raw string.""" - data = Parser._normalize_data(data) - encoding, encoding_confidence = encodings.detect(data, encoding, language) - data.seek(0) - - for parser in Parser.__subclasses__(): - if not parser.can_parse(data): - continue - parser = parser(**kwargs) - parser._data = io.TextIOWrapper(data, encoding, newline = '', errors = 'replace') - parser.encoding = encoding - parser.encoding_confidence = encoding_confidence - return parser - raise NoParserError("Could not find parser.") - - @staticmethod - def from_format(format, **kwargs): - """Returns a parser with 'name'.""" - for parser in Parser.__subclasses__(): - if parser.FORMAT == format: - return parser(**kwargs) - raise NoParserError("Could not find parser.") - - def __del__(self): - # Detach _data - if self._data: - try: - self._data.detach() - except ValueError: - # We may have an already closed underlying file object - pass - - # Iteration methods - def _next_line(self): - line = self._data.readline() - if not line: - return False - self._current_line_num += 1 - - self._current_line = line.rstrip() - self._read_lines.append(line) - - return True - - def _fetch_line(self, line): - if line > self._current_line_num: - raise ValueError("Cannot seek forward.") - - return self._read_lines[line].rstrip() - - def _rewind(self): - self._current_line_num = -1 - self._read_lines = [] - self._current_line = None - self._data.seek(0) + LEVELS = ( + 'warning', + 'error' + ) + _subtitle = None + _stop_level = 'error' + parsed = None + encoding = None + encoding_confidence = None + + def __init__(self, stop_level='error'): + self.warnings = [] + self.errors = [] + self._data = None + self._stop_level = stop_level + + # Part of the parser internals + self._read_lines = [] + self._current_line_num = -1 + self._current_line = None + + def _add_msg(self, level, line_number, 
column, line, description): + if (self._stop_level and self.LEVELS.index(level) >= + self.LEVELS.index(self._stop_level)): + if level == 'warning': + raise ParseWarning(line_number, column, line, description) + elif level == 'error': + raise ParseError(line_number, column, line, description) + + try: + line = unicode(line) + description = unicode(description) + except NameError: + # Python3 compat + line = str(line) + description = str(description) + + msg = { + 'line_number': int(line_number), + 'col': int(column), + 'line': line, + 'description': description + } + + if level == 'warning': + self.warnings.append(msg) + elif level == 'error': + self.errors.append(msg) + + def add_warning(self, *args, **kwargs): + self._add_msg('warning', *args, **kwargs) + + def add_error(self, *args, **kwargs): + self._add_msg('error', *args, **kwargs) + + @staticmethod + def _normalize_data(data): + try: + if isinstance(data, file): + data = io.BufferedReader( + io.FileIO(data.fileno(), closefd=False)) + except NameError: + # Not needed in Python3 + pass + + if isinstance(data, bytes): + data = io.BytesIO(data) + elif not isinstance(data, (io.BytesIO, io.BufferedReader)): + raise TypeError("Needs to be a file object or bytes.") + data.seek(0) + return data + + @classmethod + def can_parse(cls, data): + data = cls._normalize_data(data) + return cls._can_parse(data) + + @classmethod + def _can_parse(cls, data): + """Needs to be reimplemented to quickly check if file seems + the proper format.""" + raise NotImplementedError + + def _parse(self): + """ + Parses the file, it returns a list of units in specified format. + Needs to be implemented by the parser. It can also be a generator + (yield) + """ + raise NotImplementedError + + def _parse_metadata(self): + """Parses the subtitle metadata (if format has a header at all).""" + return {} + + def parse(self, data=None, encoding=None, language=None, **kwargs): + """Parses the file and returns the subtitle. 
Check warnings after + the parse.""" + if data: + # We have new data, discard old and set up for new + self._data = self._normalize_data(data) + # Check encoding + self.encoding, self.encoding_confidence = encodings.detect( + self._data, encoding=encoding, language=language) + self._data.seek(0) + # Wrap it + self._data = io.TextIOWrapper( + self._data, self.encoding, newline='', errors='replace') + + # Create subtitle + from .. import Subtitle, SubtitleUnit + sub = Subtitle(**self._parse_metadata()) + for unit in self._parse(**kwargs): + try: + sub.append(SubtitleUnit(**unit['data'])) + except TypeError: + # We may have malformed units + self.add_error( + self._current_line_num + 1, + 1, + self._current_line, + """Wrongly parsed unit, might be a result of a previous + error.""" + ) + return sub + + @staticmethod + def from_data(data, encoding=None, language=None, **kwargs): + """Returns a parser that can parse 'data' in raw string.""" + data = Parser._normalize_data(data) + encoding, encoding_confidence = encodings.detect( + data, encoding, language) + data.seek(0) + + for parser in Parser.__subclasses__(): + if not parser.can_parse(data): + continue + parser = parser(**kwargs) + parser._data = io.TextIOWrapper( + data, encoding, newline='', errors='replace') + parser.encoding = encoding + parser.encoding_confidence = encoding_confidence + return parser + raise NoParserError("Could not find parser.") + + @staticmethod + def from_format(format, **kwargs): + """Returns a parser with 'name'.""" + for parser in Parser.__subclasses__(): + if parser.FORMAT == format: + return parser(**kwargs) + raise NoParserError("Could not find parser.") + + def __del__(self): + # Detach _data + if self._data: + try: + self._data.detach() + except ValueError: + # We may have an already closed underlying file object + pass + + # Iteration methods + def _next_line(self): + line = self._data.readline() + if not line: + return False + self._current_line_num += 1 + + self._current_line = 
line.rstrip() + self._read_lines.append(line) + + return True + + def _fetch_line(self, line): + if line > self._current_line_num: + raise ValueError("Cannot seek forward.") + + return self._read_lines[line].rstrip() + + def _rewind(self): + self._current_line_num = -1 + self._read_lines = [] + self._current_line = None + self._data.seek(0) diff --git a/pysubtools/parsers/encodings.py b/pysubtools/parsers/encodings.py index 42bc238..8b3462f 100644 --- a/pysubtools/parsers/encodings.py +++ b/pysubtools/parsers/encodings.py @@ -9,119 +9,127 @@ invalid_chars = u'\x9e' similar_encodings = { - 'ISO-8859-2': ['windows-1250'], - 'windows-1255': ['windows-1256'], - 'GB2312': ['GB18030'], - # Just a try - 'EUC-TW': ['BIG5-TW'], + 'ISO-8859-2': ['windows-1250'], + 'windows-1255': ['windows-1256'], + 'GB2312': ['GB18030'], + # Just a try + 'EUC-TW': ['BIG5-TW'], } + class EncodingError(Exception): - def __init__(self, message, tried_encodings = [], *args, **kwargs): - self.tried_encodings = tried_encodings - super(EncodingError, self).__init__(message, *args, **kwargs) + def __init__(self, message, tried_encodings=[], *args, **kwargs): + self.tried_encodings = tried_encodings + super(EncodingError, self).__init__(message, *args, **kwargs) + def guess_from_lang(lang): - """Specify ISO-639-1 language to guess probable encoding.""" - guesses = { - 'sl': ['windows-1250'], - 'pl': ['windows-1250'], - 'ko': ['euckr'], - 'ja': ['sjis'], - 'ar': ['windows-1256'], - 'el': ['windows-1253'], - 'zh': ['big5'], - 'he': ['windows-1255'], - 'ru': ['koi8-r'], - 'es': ['windows-1252'], - 'fr': ['windows-1252'], - 'bg': ['windows-1251'], - 'mk': ['windows-1251'], - 'th': ['windows-874'], - 'uk': ['koi8-u'], - 'sr': ['windows-1251'], - 'vi': ['windows-1258'], - 'fa': ['windows-1256'], - 'fi': ['iso8859-15'], - 'es': ['iso8859-15'], - 'pt': ['iso8859-15'], - 'da': ['iso8859-15'], - 'pt-br': ['iso8859-15'], - } - - # Revert to chardet - return guesses.get(lang, []) + """Specify ISO-639-1 language 
to guess probable encoding.""" + guesses = { + 'sl': ['windows-1250'], + 'pl': ['windows-1250'], + 'ko': ['euckr'], + 'ja': ['sjis'], + 'ar': ['windows-1256'], + 'el': ['windows-1253'], + 'zh': ['big5'], + 'he': ['windows-1255'], + 'ru': ['koi8-r'], + 'es': ['windows-1252'], + 'fr': ['windows-1252'], + 'bg': ['windows-1251'], + 'mk': ['windows-1251'], + 'th': ['windows-874'], + 'uk': ['koi8-u'], + 'sr': ['windows-1251'], + 'vi': ['windows-1258'], + 'fa': ['windows-1256'], + 'fi': ['iso8859-15'], + 'pt': ['iso8859-15'], + 'da': ['iso8859-15'], + 'pt-br': ['iso8859-15'], + } + + # Revert to chardet + return guesses.get(lang, []) + def can_decode(data, encoding): - reader = None - proper = False - try: - reader = io.TextIOWrapper(data, encoding, newline = '') - proper = True - # Go through data - for line in reader: - for char in invalid_chars: - if char in line: - proper = False - break - except (UnicodeDecodeError, LookupError): + reader = None proper = False - - if reader: - reader.detach() - data.seek(0) - return proper - -def detect(data, encoding = None, language = None): - """ - Tries to detect encoding for specified 'data'. Will return a tuple (encoding, confidence). - Confidence may be None, which means the encoding was detected from provided language or - encoding hint, or it stumbled over a unicode BOM. 
- """ - if not isinstance(data, (io.BytesIO, io.BufferedReader)): - raise TypeError("Needs to be a buffered file object.") - - tried_encodings = set() - - # Check for BOM (100% confidence) - test_data = data.read(8) - data.seek(0) - if test_data.startswith(codecs.BOM_UTF8): - return 'utf-8-sig', None - elif test_data.startswith(codecs.BOM_UTF16): - return 'utf16', None - - encodings = [] - if encoding: - encodings.append(encoding) - if language: - encodings += guess_from_lang(language) - - # Autodetect encoding - detected = chardet.detect(data.read()) - data.seek(0) - if detected and detected['encoding']: - encodings.append((detected['encoding'], detected['confidence'])) - if not encodings: - raise EncodingError("Have no clue where to start.") - - # Reverse order - encodings.reverse() - while True: - encoding = encodings.pop() - if can_decode(data, encoding if not isinstance(encoding, tuple) else encoding[0]): - # We've found it! - break - tried_encodings.add(encoding if not isinstance(encoding, tuple) else encoding[0]) - - similar = similar_encodings.get(encoding if not isinstance(encoding, tuple) else encoding[0]) - if similar: - encodings += list(set(similar).difference(tried_encodings)) + try: + reader = io.TextIOWrapper(data, encoding, newline='') + proper = True + # Go through data + for line in reader: + for char in invalid_chars: + if char in line: + proper = False + break + except (UnicodeDecodeError, LookupError): + proper = False + + if reader: + reader.detach() + data.seek(0) + return proper + + +def detect(data, encoding=None, language=None): + """ + Tries to detect encoding for specified 'data'. Will return a tuple + (encoding, confidence). Confidence may be None, which means the encoding + was detected from provided language or encoding hint, or it stumbled + over a unicode BOM. 
+ """ + if not isinstance(data, (io.BytesIO, io.BufferedReader)): + raise TypeError("Needs to be a buffered file object.") + + tried_encodings = set() + + # Check for BOM (100% confidence) + test_data = data.read(8) + data.seek(0) + if test_data.startswith(codecs.BOM_UTF8): + return 'utf-8-sig', None + elif test_data.startswith(codecs.BOM_UTF16): + return 'utf16', None + + encodings = [] + if encoding: + encodings.append(encoding) + if language: + encodings += guess_from_lang(language) + + # Autodetect encoding + detected = chardet.detect(data.read()) + data.seek(0) + if detected and detected['encoding']: + encodings.append((detected['encoding'], detected['confidence'])) if not encodings: - # We lost :( - raise EncodingError("Could not detect proper encoding", list(tried_encodings)) - - if not isinstance(encoding, tuple): - encoding = (encoding, None) - - return encoding + raise EncodingError("Have no clue where to start.") + + # Reverse order + encodings.reverse() + while True: + encoding = encodings.pop() + if can_decode(data, encoding if not isinstance(encoding, tuple) else + encoding[0]): + # We've found it! 
+ break + tried_encodings.add(encoding if not isinstance( + encoding, tuple) else encoding[0]) + + similar = similar_encodings.get( + encoding if not isinstance(encoding, tuple) else encoding[0]) + if similar: + encodings += list(set(similar).difference(tried_encodings)) + if not encodings: + # We lost :( + raise EncodingError( + "Could not detect proper encoding", list(tried_encodings)) + + if not isinstance(encoding, tuple): + encoding = (encoding, None) + + return encoding diff --git a/pysubtools/parsers/microdvd.py b/pysubtools/parsers/microdvd.py index 6c8fb0b..380c05c 100644 --- a/pysubtools/parsers/microdvd.py +++ b/pysubtools/parsers/microdvd.py @@ -8,229 +8,241 @@ from .base import Parser from ..subtitle import Frame, SubtitleLine + def update_dict(d, s): - """Update with recursion.""" - for key in s.keys(): - if isinstance(d.get(key), dict) and isinstance(s[key], dict): - update_dict(d[key], s[key]) - else: - d[key] = s[key] + """Update with recursion.""" + for key in s.keys(): + if isinstance(d.get(key), dict) and isinstance(s[key], dict): + update_dict(d[key], s[key]) + else: + d[key] = s[key] + class MicroDVDParser(Parser): - """Parser for SubRip. - """ - FORMAT = 'MicroDVD' - FORMAT_RE = re.compile(r'^\{(?P\d+)\}\{(?P\d+)\}(?P
(:?\{[^}]+\})*)(?P.*)$', re.M) - HEADER_RE = re.compile(r'^\{DEFAULT\}(?P
(:?\{[^}]+\})*)$') - - @classmethod - def _can_parse(cls, data): - # Go through first few lines - can = False - for i in range(0, 10): - line = data.readline() - if isinstance(line, bytes): - line = line.decode(errors = 'replace') - can = bool(cls.FORMAT_RE.search(line)) - if can: - break - data.seek(0) - return can - - def _parse_header(self, header, global_only = False): - # TODO Add FPS heuristic (first line as fps) - output = { - 'local': {} - } - ################################################################# - # Supported header tags (lowercase represent global and local): # - # * y - font-style # - # * i - italics # - # * b - bold # - # * u - underlined # - # * s - stroked # - # * f - font-family # - # * s - font-size # - # * c - color ($BBGGRR) -> #RRGGBB # - # * P - position x,y -> {'x': x, 'y': y} # - # * H - charset - unused (we've already set it) # - ################################################################# - if header is None: - header = u'' - # Break the header into several ones - for h in header.replace('{', '').split('}')[:-1]: - k, v = h.split(':') - if global_only and k.islower(): - continue - k = k.strip() - v = v.strip() - t = { - 'styles': { - '*': { - 'text-decoration': [] - } - } - } - - if k.lower() == 'y': - # Font style - for i in v.split(','): - i = i.strip() - if i == 'b': - t['styles']['*']['font-weight'] = 'bold' - elif i == 'i': - t['styles']['*']['text-style'] = 'italic' - elif i == 'u': - t['styles']['*']['text-decoration'].append('underline') - elif i == 's': - t['styles']['*']['text-decoration'].append('line-through') - else: - self.add_warning(self._current_line_num + 1, - 1, - self._fetch_line(self._current_line_num), - u"Unknown style tag {}.".format(i)) - t['styles']['*']['text-decoration'] = ' '.join(t['styles']['*']['text-decoration']) - elif k.lower() == 'f': - # Font family - t['styles']['*']['font-family'] = v.strip() - elif k.lower() == 's': - # Font size - t['styles']['*']['font-size'] = v.strip() + ('px' if 
v.strip().isdigit() else '') - elif k.lower() == 'c': - # Text color - v = v.strip() - if re.match('^\$[0-9a-fA-F]{6}$', v): - t['styles']['*']['color'] = '#' + v[5:] + v[3:5] + v[1:3] - else: - self.add_warning(self._current_line_num + 1, - 1, - self._fetch_line(self._current_line_num), - u"Wrong color format {}.".format(v)) - elif k == 'P': - # Position - m = re.match('^\s*(\d+)\s*,\s*(\d+)\s*$', v) - if not m: - self.add_warning(self._current_line_num + 1, - 1, - self._fetch_line(self._current_line_num), - u"Malformed position {}.".format(v)) - else: - t['position'] = { - 'x': int(m.group(1)), - 'y': int(m.group(2)) - } - elif k == 'H': - # Silently ignore since it is charset setting - pass - else: - self.add_warning(self._current_line_num + 1, - 1, - self._fetch_line(self._current_line_num), - u"Unknwon header {}.".format(k)) - - if not t['styles']['*']['text-decoration']: - del t['styles']['*']['text-decoration'] - - if k.islower(): - update_dict(output['local'], t) - else: - update_dict(output, t) - - if global_only: - del output['local'] - - return output - - def _to_header_dict(self, h): - if not h: - return {} - - try: - h = h[1:-1] - # Take pair out, and strip them - d = dict([(j.strip() for j in i.split(':')) for i in h.split('}{')]) - # Take only local ones - return {(k, v) for k, v in d.items() if k.islower()} - except ValueError: - # Cannot parse this header, probably it is wrong - line = self._fetch_line(self._current_line_num) - self.add_warning(self._current_line_num + 1, line.index(h), line, "It looks like a line header but it's not.") - return {} - - def _from_header_dict(self, h): - if not h: - return '' - return '{' + '}{'.join([':'.join([k, v]) for k, v in h.items()]) + '}' - - def _parse_metadata(self): - # Need for default header lines - self._skip_lines = set([]) - - output = {} - # Scan the whole subtitle for global metadata - while self._next_line(): - m = self.HEADER_RE.match(self._current_line.strip()) - if m: - 
output.update(self._parse_header(m.group('header'), global_only = True)) - self._skip_lines.add(self._current_line_num) - # Rewind - self._rewind() - return output - - def _parse(self, fps = None, **kwargs): - while self._next_line(): - if self._current_line_num in self._skip_lines: - # We have a metadata line - continue - - m = self.FORMAT_RE.match(self._current_line.strip()) - if not m: - self.add_error(self._current_line_num + 1, 1, self._current_line, "Could not parse line") - else: - if not m.group('text'): - self.add_warning(self._current_line_num + 1, - 1, - self._fetch_line(self._current_line_num), - "Empty unit.") - - start, end = int(m.group('start')), int(m.group('end')) - if fps: - start /= fps - end /= fps - else: - start, end = Frame(start), Frame(end) - # Parse main header - header = self._parse_header(m.groupdict().get('header', '')) - h_inherit = [self._to_header_dict(m.groupdict().get('header', ''))] - # Go through lines and parse out headers - lines = [] - for l in m.group('text').split('|'): - if l.startswith('{'): - h_i = l.index('}') + 1 - # We have a local header - h = self._to_header_dict(l[:h_i]) - else: - h_i = 0 - h = {} - h_inherit.append(h) - # Construct local header - h = {} - for i in h_inherit: - h.update(i) - h = self._parse_header(self._from_header_dict(h))['local'] - - # Construct line - lines.append(SubtitleLine(l[h_i:], **h)) - # Parse unit - data = { - 'start': start, - 'end': end, - 'lines': lines, - } - # Add unit metadata - header.pop('local') - data.update(header) - # Pass along the unit data - yield { - 'data': data + """Parser for SubRip. + """ + FORMAT = 'MicroDVD' + FORMAT_RE = re.compile(r'^\{(?P\d+)\}\{(?P\d+)\}(?P
(:?' + r'\{[^}]+\})*)(?P<text>.*)$', re.M)
+    HEADER_RE = re.compile(r'^\{DEFAULT\}(?P<header>
(:?\{[^}]+\})*)$') + + @classmethod + def _can_parse(cls, data): + # Go through first few lines + can = False + for i in range(0, 10): + line = data.readline() + if isinstance(line, bytes): + line = line.decode(errors='replace') + can = bool(cls.FORMAT_RE.search(line)) + if can: + break + data.seek(0) + return can + + def _parse_header(self, header, global_only=False): + # TODO Add FPS heuristic (first line as fps) + output = { + 'local': {} } + ################################################################# + # Supported header tags (lowercase represent global and local): # + # * y - font-style # + # * i - italics # + # * b - bold # + # * u - underlined # + # * s - stroked # + # * f - font-family # + # * s - font-size # + # * c - color ($BBGGRR) -> #RRGGBB # + # * P - position x,y -> {'x': x, 'y': y} # + # * H - charset - unused (we've already set it) # + ################################################################# + if header is None: + header = u'' + # Break the header into several ones + for h in header.replace('{', '').split('}')[:-1]: + k, v = h.split(':') + if global_only and k.islower(): + continue + k = k.strip() + v = v.strip() + t = { + 'styles': { + '*': { + 'text-decoration': [] + } + } + } + + if k.lower() == 'y': + # Font style + for i in v.split(','): + i = i.strip() + if i == 'b': + t['styles']['*']['font-weight'] = 'bold' + elif i == 'i': + t['styles']['*']['text-style'] = 'italic' + elif i == 'u': + t['styles']['*']['text-decoration'].append('underline') + elif i == 's': + t['styles']['*']['text-decoration'].append( + 'line-through') + else: + self.add_warning(self._current_line_num + 1, + 1, + self._fetch_line( + self._current_line_num), + u"Unknown style tag {}.".format(i)) + t['styles']['*']['text-decoration'] = ' '.join( + t['styles']['*']['text-decoration']) + elif k.lower() == 'f': + # Font family + t['styles']['*']['font-family'] = v.strip() + elif k.lower() == 's': + # Font size + t['styles']['*']['font-size'] = v.strip() + \ + 
('px' if v.strip().isdigit() else '') + elif k.lower() == 'c': + # Text color + v = v.strip() + if re.match('^\$[0-9a-fA-F]{6}$', v): + t['styles']['*']['color'] = '#' + v[5:] + v[3:5] + v[1:3] + else: + self.add_warning(self._current_line_num + 1, + 1, + self._fetch_line(self._current_line_num), + u"Wrong color format {}.".format(v)) + elif k == 'P': + # Position + m = re.match('^\s*(\d+)\s*,\s*(\d+)\s*$', v) + if not m: + self.add_warning(self._current_line_num + 1, + 1, + self._fetch_line(self._current_line_num), + u"Malformed position {}.".format(v)) + else: + t['position'] = { + 'x': int(m.group(1)), + 'y': int(m.group(2)) + } + elif k == 'H': + # Silently ignore since it is charset setting + pass + else: + self.add_warning(self._current_line_num + 1, + 1, + self._fetch_line(self._current_line_num), + u"Unknwon header {}.".format(k)) + + if not t['styles']['*']['text-decoration']: + del t['styles']['*']['text-decoration'] + + if k.islower(): + update_dict(output['local'], t) + else: + update_dict(output, t) + + if global_only: + del output['local'] + + return output + + def _to_header_dict(self, h): + if not h: + return {} + + try: + h = h[1:-1] + # Take pair out, and strip them + d = dict([(j.strip() for j in i.split(':')) + for i in h.split('}{')]) + # Take only local ones + return {(k, v) for k, v in d.items() if k.islower()} + except ValueError: + # Cannot parse this header, probably it is wrong + line = self._fetch_line(self._current_line_num) + self.add_warning(self._current_line_num + 1, line.index(h), + line, "It looks like a line header but it's not.") + return {} + + def _from_header_dict(self, h): + if not h: + return '' + return '{' + '}{'.join([':'.join([k, v]) for k, v in h.items()]) + '}' + + def _parse_metadata(self): + # Need for default header lines + self._skip_lines = set([]) + + output = {} + # Scan the whole subtitle for global metadata + while self._next_line(): + m = self.HEADER_RE.match(self._current_line.strip()) + if m: + 
output.update(self._parse_header( + m.group('header'), global_only=True)) + self._skip_lines.add(self._current_line_num) + # Rewind + self._rewind() + return output + + def _parse(self, fps=None, **kwargs): + while self._next_line(): + if self._current_line_num in self._skip_lines: + # We have a metadata line + continue + + m = self.FORMAT_RE.match(self._current_line.strip()) + if not m: + self.add_error(self._current_line_num + 1, 1, + self._current_line, "Could not parse line") + else: + if not m.group('text'): + self.add_warning(self._current_line_num + 1, + 1, + self._fetch_line(self._current_line_num), + "Empty unit.") + + start, end = int(m.group('start')), int(m.group('end')) + if fps: + start /= fps + end /= fps + else: + start, end = Frame(start), Frame(end) + # Parse main header + header = self._parse_header(m.groupdict().get('header', '')) + h_inherit = [self._to_header_dict( + m.groupdict().get('header', ''))] + # Go through lines and parse out headers + lines = [] + for l in m.group('text').split('|'): + if l.startswith('{'): + h_i = l.index('}') + 1 + # We have a local header + h = self._to_header_dict(l[:h_i]) + else: + h_i = 0 + h = {} + h_inherit.append(h) + # Construct local header + h = {} + for i in h_inherit: + h.update(i) + h = self._parse_header(self._from_header_dict(h))['local'] + + # Construct line + lines.append(SubtitleLine(l[h_i:], **h)) + # Parse unit + data = { + 'start': start, + 'end': end, + 'lines': lines, + } + # Add unit metadata + header.pop('local') + data.update(header) + # Pass along the unit data + yield { + 'data': data + } diff --git a/pysubtools/parsers/subrip.py b/pysubtools/parsers/subrip.py index 7579685..6da4e41 100644 --- a/pysubtools/parsers/subrip.py +++ b/pysubtools/parsers/subrip.py @@ -4,301 +4,331 @@ from __future__ import unicode_literals import re -import io +# import io from state_machine import acts_as_state_machine, before, after,\ - State, Event, InvalidStateTransition + State, Event, 
InvalidStateTransition from .base import Parser + @acts_as_state_machine class SubRipStateMachine(object): - name = 'SubRip State machine' - - class Skip(Exception): - pass - - # Let us define some states - start = State(initial = True) - unit = State() - unit_text = State() - finished = State() - - # And events - found_sequence = Event(from_states = [start], to_state = unit) - found_header = Event(from_states = [unit, unit_text, start], to_state = unit_text) - found_text = Event(from_states = [unit_text, start], to_state = unit_text) - found_empty = Event(from_states = [unit_text, start, unit], to_state = start) - done = Event(from_states = [unit, unit_text, start], to_state = finished) - - # Regular expressions - # Components - _time = re.compile(r'(?:\d{1,2}:){2}\d{1,2},\d{1,3}') - - # Parts of unit - _sequence = re.compile(r'^\s*\d+\s*$') - _header = re.compile(r'^\s*([0-9:,.]+\s*-->\s*[0-9:,.]+)\s*(.*)$') - _tagged_header = re.compile(r'^\{([^}]*)\}') - - # Tagged properties - _tag_position = re.compile(r'\s*\\pos\(\s*(\d+)\s*,\s*(\d+)\s*\)\s*') - - def __init__(self, parser): - self.parser = parser - self._parsed = None - self.temp = None - self.paused = False - - @property - def read_lines(self): - return self.parser._read_lines - - @property - def current_line_num(self): - return self.parser._current_line_num - - @property - def current_line(self): - return self.parser._current_line - - @current_line.setter - def current_line(self, value): - self.parser._current_line = value - - def next_line(self): - return self.parser._next_line() - - def fetch_line(self, line): - return self.parser._fetch_line(line) - - def pause(self): - # Pauses for one iteration - self.paused = True - - def need_pause(self): - p = self.paused - self.paused = False - return p - - # Main iteration - def iterate(self): - # Main loop that decides what to do - if not self.need_pause(): - if not self.next_line(): - self.done() - return - - if not self.current_line.strip(): - 
self.found_empty() - return - if self.is_start: - m = self._sequence.match(self.current_line) - if m: - self.found_sequence() - return - m = self._header.match(self.current_line) - if m: - self.found_header() - return - - # We have text (presuming) - self.found_text() - - @before('found_sequence') - def validate_unit(self): - previous_seq = self.temp['sequence'] if self.temp else 0 - - sequence = int(self.current_line.strip()) - if sequence - previous_seq != 1: - self.pause() - self.current_line = str(previous_seq + 1) - self.parser.add_warning(self.current_line_num + 1, 1, self.fetch_line(self.current_line_num), "Sequence number out of sync") - raise self.Skip - - if self.current_state == self.unit_text: - # We need to remove last empty line - del self.temp['data']['lines'][-1] - - @after('found_sequence') - def create_unit(self): - self._parsed = self.temp - self.temp = { - 'sequence': int(self.current_line.strip()), - 'data': { - 'lines': [] - }, - } - - @before('found_header') - def validate_header(self): - if self.is_unit_text: - self.parser.add_warning(self.current_line_num + 1, 1, self.current_line, "Duplicated time information, ignoring.") - raise self.Skip - if self.is_start: - self.fix_sequence_skip() - self.parser.add_warning(self.current_line_num + 1, 1, self.current_line, "New unit starts without a sequence.") - - if '.' 
in self.current_line: - # Stay on same line - self.pause() - original = self.fetch_line(self.current_line_num) - col = self.current_line.index('.') - self.current_line = self.current_line.replace('.', ',', 1) - self.parser.add_warning(self.current_line_num + 1, col + 1, original, 'Used dot as decimal separator instead of comma.') - raise self.Skip - # Re-check header - m = self._header.match(self.current_line) - if m.group(2): - original = self.fetch_line(self.current_line_num) - # Found garbage - column = self.current_line.index(m.group(2)) + 1 - self.current_line = m.group(1) - # Re-try - self.pause() - self.parser.add_warning(self.current_line_num + 1, column, original, 'Header has unrecognized content at the end.') - raise self.Skip - - # Check it - start, end = self.current_line.split('-->') - start = self._time.match(start.strip()) - end = self._time.match(end.strip()) - - if not start or not end: - self.parser.add_error(self.current_line_num + 1, 1, self.fetch_line(self.current_line_num), "Could not parse timings.") - raise self.Skip - - return True - - @after('found_header') - def parse_time(self): - # It is safe to do it now - start, end = self.current_line.split('-->') - start = self._time.match(start.strip()) - end = self._time.match(end.strip()) - - start, end = start.group(0).split(':'), end.group(0).split(':') - - convert = lambda x: int(x[0]) * 3600 + int(x[1]) * 60 + float(x[2].replace(',', '.')) - self.temp['data'].update(dict( - start = convert(start), - end = convert(end) - )) - - def fix_sequence_skip(self): - self._parsed = self.temp - self.temp = { - 'sequence': (self.temp['sequence'] if self.temp else 0) + 1, - 'data': { - 'lines': [] - }, - } - - @before('found_text') - def validate_text(self): - if self.is_start: - if self.temp: - # Add empty line - self.temp['data']['lines'] += [u''] - else: - self.parser.add_warning(self.current_line_num + 1, 1, self.current_line, "Junk before first unit.") - raise self.Skip - - @after('found_text') - def 
insert_text(self): - # Check for tagged header inside text - tagged = self._tagged_header.match(self.current_line) - if tagged: - # Remove it - self.current_line = self.current_line[len(tagged.group(0)):] - tagged = tagged.group(1) - # Parse it further - pos = self._tag_position.search(tagged) - if pos: - self.temp['data']['position'] = { - 'x': int(pos.group(1)), - 'y': int(pos.group(2)) + name = 'SubRip State machine' + + class Skip(Exception): + pass + + # Let us define some states + start = State(initial=True) + unit = State() + unit_text = State() + finished = State() + + # And events + found_sequence = Event(from_states=[start], to_state=unit) + found_header = Event( + from_states=[unit, unit_text, start], to_state=unit_text) + found_text = Event(from_states=[unit_text, start], to_state=unit_text) + found_empty = Event(from_states=[unit_text, start, unit], to_state=start) + done = Event(from_states=[unit, unit_text, start], to_state=finished) + + # Regular expressions + # Components + _time = re.compile(r'(?:\d{1,2}:){2}\d{1,2},\d{1,3}') + + # Parts of unit + _sequence = re.compile(r'^\s*\d+\s*$') + _header = re.compile(r'^\s*([0-9:,.]+\s*-->\s*[0-9:,.]+)\s*(.*)$') + _tagged_header = re.compile(r'^\{([^}]*)\}') + + # Tagged properties + _tag_position = re.compile(r'\s*\\pos\(\s*(\d+)\s*,\s*(\d+)\s*\)\s*') + + def __init__(self, parser): + self.parser = parser + self._parsed = None + self.temp = None + self.paused = False + + @property + def read_lines(self): + return self.parser._read_lines + + @property + def current_line_num(self): + return self.parser._current_line_num + + @property + def current_line(self): + return self.parser._current_line + + @current_line.setter + def current_line(self, value): + self.parser._current_line = value + + def next_line(self): + return self.parser._next_line() + + def fetch_line(self, line): + return self.parser._fetch_line(line) + + def pause(self): + # Pauses for one iteration + self.paused = True + + def 
need_pause(self): + p = self.paused + self.paused = False + return p + + # Main iteration + def iterate(self): + # Main loop that decides what to do + if not self.need_pause(): + if not self.next_line(): + self.done() + return + + if not self.current_line.strip(): + self.found_empty() + return + if self.is_start: + m = self._sequence.match(self.current_line) + if m: + self.found_sequence() + return + m = self._header.match(self.current_line) + if m: + self.found_header() + return + + # We have text (presuming) + self.found_text() + + @before('found_sequence') + def validate_unit(self): + previous_seq = self.temp['sequence'] if self.temp else 0 + + sequence = int(self.current_line.strip()) + if sequence - previous_seq != 1: + self.pause() + self.current_line = str(previous_seq + 1) + self.parser.add_warning(self.current_line_num + 1, 1, + self.fetch_line( + self.current_line_num), + "Sequence number out of sync") + raise self.Skip + + if self.current_state == self.unit_text: + # We need to remove last empty line + del self.temp['data']['lines'][-1] + + @after('found_sequence') + def create_unit(self): + self._parsed = self.temp + self.temp = { + 'sequence': int(self.current_line.strip()), + 'data': { + 'lines': [] + }, } - tagged = tagged.replace(pos.group(0), '', 1) - - self.temp['data']['lines'] += [i.rstrip() for i in self.current_line.split('|')] - - # Unknown TAG headers - if tagged: - self.parser.add_warning(self.current_line_num + 1, 1, self.fetch_line(self.current_line_num), 'Tagged header not fully parsed.') - raise self.Skip - - @before('found_empty') - def validate_empty(self): - if self.current_state == self.start: - if self.temp: - # Add empty line to text (since previous line was a text) - self.temp['data']['lines'] += [u''] - else: - self.parser.add_warning(self.current_line_num + 1, 1, self.fetch_line(self.current_line_num), "Have empty line before first unit.") - raise self.Skip - elif self.is_unit: - self.parser.add_warning(self.current_line_num + 
1, 1, self.fetch_line(self.current_line_num), "Have empty line between sequence number and timings.") - raise self.Skip - - @after('found_empty') - def insert_empty(self): - pass - - @before('done') - def final_unit(self): - self._missing_line = self.current_state != self.start - - @after('done') - def final_unit(self): - self._parsed = self.temp - self.temp = None - if self._missing_line: - original = self.fetch_line(self.current_line_num) - self.parser.add_warning(self.current_line_num + 1, len(self.current_line), original, 'Missing empty line after unit.') - raise self.Skip - - def parsed(self): - if not self._parsed: - return None - parsed = self._parsed - self._parsed = None - return parsed -class SubRipParser(Parser): - """Parser for SubRip. - """ - FORMAT = 'SubRip' - FORMAT_RE = re.compile(r'^(?:[^:]+:){2}[^- ]+\s+-->\s+(?:[^:]+:){2}.*$') - - @classmethod - def _can_parse(cls, data): - # Go through first few lines - can = False - for i in range(0, 10): - line = data.readline() - if isinstance(line, bytes): - line = line.decode('latin').replace('\x00', '') - - can = bool(cls.FORMAT_RE.search(line)) - if can: - break - data.seek(0) - return can - - def _parse(self, **kwargs): - machine = SubRipStateMachine(self) - - # We have a state machine. Let us start. - while True: - try: - if machine.current_state != machine.finished: - machine.iterate() - parsed = machine.parsed() - if parsed: - yield parsed - if machine.current_state == machine.finished: - break - except SubRipStateMachine.Skip: - # Just skip + @before('found_header') + def validate_header(self): + if self.is_unit_text: + self.parser.add_warning( + self.current_line_num + 1, 1, self.current_line, + "Duplicated time information, ignoring.") + raise self.Skip + if self.is_start: + self.fix_sequence_skip() + self.parser.add_warning( + self.current_line_num + 1, 1, self.current_line, + "New unit starts without a sequence.") + + if '.' 
in self.current_line: + # Stay on same line + self.pause() + original = self.fetch_line(self.current_line_num) + col = self.current_line.index('.') + self.current_line = self.current_line.replace('.', ',', 1) + self.parser.add_warning( + self.current_line_num + 1, col + 1, original, + 'Used dot as decimal separator instead of comma.') + raise self.Skip + # Re-check header + m = self._header.match(self.current_line) + if m.group(2): + original = self.fetch_line(self.current_line_num) + # Found garbage + column = self.current_line.index(m.group(2)) + 1 + self.current_line = m.group(1) + # Re-try + self.pause() + self.parser.add_warning( + self.current_line_num + 1, column, + original, 'Header has unrecognized content at the end.') + raise self.Skip + + # Check it + start, end = self.current_line.split('-->') + start = self._time.match(start.strip()) + end = self._time.match(end.strip()) + + if not start or not end: + self.parser.add_error( + self.current_line_num + 1, 1, self.fetch_line( + self.current_line_num), "Could not parse timings.") + raise self.Skip + + return True + + @after('found_header') + def parse_time(self): + # It is safe to do it now + start, end = self.current_line.split('-->') + start = self._time.match(start.strip()) + end = self._time.match(end.strip()) + + start, end = start.group(0).split(':'), end.group(0).split(':') + + def convert(x): return int(x[0]) * 3600 + \ + int(x[1]) * 60 + float(x[2].replace(',', '.')) + self.temp['data'].update(dict( + start=convert(start), + end=convert(end) + )) + + def fix_sequence_skip(self): + self._parsed = self.temp + self.temp = { + 'sequence': (self.temp['sequence'] if self.temp else 0) + 1, + 'data': { + 'lines': [] + }, + } + + @before('found_text') + def validate_text(self): + if self.is_start: + if self.temp: + # Add empty line + self.temp['data']['lines'] += [u''] + else: + self.parser.add_warning( + self.current_line_num + 1, 1, self.current_line, + "Junk before first unit.") + raise self.Skip + + 
@after('found_text') + def insert_text(self): + # Check for tagged header inside text + tagged = self._tagged_header.match(self.current_line) + if tagged: + # Remove it + self.current_line = self.current_line[len(tagged.group(0)):] + tagged = tagged.group(1) + # Parse it further + pos = self._tag_position.search(tagged) + if pos: + self.temp['data']['position'] = { + 'x': int(pos.group(1)), + 'y': int(pos.group(2)) + } + tagged = tagged.replace(pos.group(0), '', 1) + + self.temp['data']['lines'] += [i.rstrip() + for i in self.current_line.split('|')] + + # Unknown TAG headers + if tagged: + self.parser.add_warning( + self.current_line_num + 1, 1, self.fetch_line( + self.current_line_num), 'Tagged header not fully parsed.') + raise self.Skip + + @before('found_empty') + def validate_empty(self): + if self.current_state == self.start: + if self.temp: + # Add empty line to text (since previous line was a text) + self.temp['data']['lines'] += [u''] + else: + self.parser.add_warning( + self.current_line_num + 1, 1, self.fetch_line( + self.current_line_num), + "Have empty line before first unit.") + raise self.Skip + elif self.is_unit: + self.parser.add_warning( + self.current_line_num + 1, 1, self.fetch_line( + self.current_line_num), + "Have empty line between sequence number and timings.") + raise self.Skip + + @after('found_empty') + def insert_empty(self): pass - except InvalidStateTransition: - self.add_error(machine.current_line_num + 1, 1, machine.current_line, "Unparsable line") + + @before('done') + def final_unit(self): + self._missing_line = self.current_state != self.start + + @after('done') + def final_unit(self): + self._parsed = self.temp + self.temp = None + if self._missing_line: + original = self.fetch_line(self.current_line_num) + self.parser.add_warning(self.current_line_num + 1, len( + self.current_line), original, 'Missing empty line after unit.') + raise self.Skip + + def parsed(self): + if not self._parsed: + return None + parsed = self._parsed + 
self._parsed = None + return parsed + + +class SubRipParser(Parser): + """Parser for SubRip. + """ + FORMAT = 'SubRip' + FORMAT_RE = re.compile(r'^(?:[^:]+:){2}[^- ]+\s+-->\s+(?:[^:]+:){2}.*$') + + @classmethod + def _can_parse(cls, data): + # Go through first few lines + can = False + for i in range(0, 10): + line = data.readline() + if isinstance(line, bytes): + line = line.decode('latin').replace('\x00', '') + + can = bool(cls.FORMAT_RE.search(line)) + if can: + break + data.seek(0) + return can + + def _parse(self, **kwargs): + machine = SubRipStateMachine(self) + + # We have a state machine. Let us start. + while True: + try: + if machine.current_state != machine.finished: + machine.iterate() + parsed = machine.parsed() + if parsed: + yield parsed + if machine.current_state == machine.finished: + break + except SubRipStateMachine.Skip: + # Just skip + pass + except InvalidStateTransition: + self.add_error(machine.current_line_num + 1, 1, + machine.current_line, "Unparsable line") diff --git a/pysubtools/subtitle.py b/pysubtools/subtitle.py index 93dc2b0..e1b2517 100644 --- a/pysubtools/subtitle.py +++ b/pysubtools/subtitle.py @@ -8,625 +8,677 @@ import yaml from .utils import UnicodeMixin -def prepare_reader(f): - try: - is_str = isinstance(f, basestring) - except NameError: - # Python3 compat - is_str = isinstance(f, str) - - if is_str: - f = io.BufferedReader(io.open(f, 'rb')) - - try: - if isinstance(f, file): - f = io.BufferedReader(io.FileIO(f.fileno(), closefd = False)) - except NameError: - # No need in Python3 - pass - - if not isinstance(f, io.BufferedIOBase): - raise TypeError("Load method accepts filename or file object.") - return io.TextIOWrapper(f) - -class HumanTime(yaml.YAMLObject, UnicodeMixin): - yaml_loader = yaml.SafeLoader - yaml_dumper = yaml.SafeDumper - - yaml_tag = '!human_time' - - def __init__(self, hours = 0, minutes = 0, seconds = 0.): - self.hours = int(hours) - self.minutes = int(minutes) - self.seconds = float(seconds) - - 
@classmethod - def from_yaml(cls, loader, node): - value = loader.construct_scalar(node) - return float(cls.from_string(value)) - - @classmethod - def to_yaml(cls, dumper, data): - if isinstance(data, (int, float)): - data = cls.from_seconds(data) - - try: - return dumper.represent_scalar('!human_time', unicode(data)) - except NameError: - return dumper.represent_scalar('!human_time', str(data)) - - @classmethod - def from_seconds(cls, time): - obj = cls() - time = float(time) - obj.hours = int(time // 3600) - time -= obj.hours * 3600 - obj.minutes = int(time // 60) - time -= obj.minutes * 60 - obj.seconds = time - return obj - - @classmethod - def from_string(cls, time): - obj = cls() +def prepare_reader(f): try: - is_str = isinstance(time, basestring) + is_str = isinstance(f, basestring) except NameError: - is_str = isinstance(time, str) + # Python3 compat + is_str = isinstance(f, str) if is_str: - time = time.split(':') - obj.hours = int(time[0]) - obj.minutes = int(time[1]) - obj.seconds = float(time[2]) - else: - raise TypeError("Unknown time format.") - - return obj - - def __unicode__(self): - return '{:02d}:{:02d}:{:06.3f}'.format(self.hours, - self.minutes, - self.seconds) - - def __float__(self): - return self.to_seconds() - - def __int__(self): - return int(self.to_seconds()) - - def to_seconds(self): - return self.hours * 3600 + self.minutes * 60 + self.seconds - -class Frame(yaml.YAMLObject, UnicodeMixin): - yaml_loader = yaml.SafeLoader - yaml_dumper = yaml.SafeDumper - - yaml_tag = '!frame' - - def __init__(self, frame): - self._frame = frame - - @classmethod - def from_yaml(cls, loader, node): - value = loader.construct_scalar(node) - return cls(int(value)) - - @classmethod - def to_yaml(cls, dumper, data): - if isinstance(data, int): - data = cls(data) + f = io.BufferedReader(io.open(f, 'rb')) try: - return dumper.represent_scalar('!frame', unicode(data._frame)) + if isinstance(f, file): + f = io.BufferedReader(io.FileIO(f.fileno(), closefd=False)) 
except NameError: - # Python3 compat - return dumper.represent_scalar('!frame', str(data._frame)) - - def __int__(self): - raise ValueError("Cannot convert frame to time without specified FPS.") - - def __float__(self): - raise ValueError("Cannot convert frame to time without specified FPS.") + # No need in Python3 + pass - def __eq__(self, value): - return self._frame == value + if not isinstance(f, io.BufferedIOBase): + raise TypeError("Load method accepts filename or file object.") + return io.TextIOWrapper(f) - def __hash__(self): - return hash(self._frame) - def __gt__(self, value): - return self._frame > value - - def __lt__(self, value): - return self._frame < value - - def __repr__(self): - return 'Frame({})'.format(self._frame) +class HumanTime(yaml.YAMLObject, UnicodeMixin): + yaml_loader = yaml.SafeLoader + yaml_dumper = yaml.SafeDumper + + yaml_tag = '!human_time' + + def __init__(self, hours=0, minutes=0, seconds=0.): + self.hours = int(hours) + self.minutes = int(minutes) + self.seconds = float(seconds) + + @classmethod + def from_yaml(cls, loader, node): + value = loader.construct_scalar(node) + return float(cls.from_string(value)) + + @classmethod + def to_yaml(cls, dumper, data): + if isinstance(data, (int, float)): + data = cls.from_seconds(data) + + try: + return dumper.represent_scalar('!human_time', unicode(data)) + except NameError: + return dumper.represent_scalar('!human_time', str(data)) + + @classmethod + def from_seconds(cls, time): + obj = cls() + time = float(time) + obj.hours = int(time // 3600) + time -= obj.hours * 3600 + obj.minutes = int(time // 60) + time -= obj.minutes * 60 + obj.seconds = time + return obj + + @classmethod + def from_string(cls, time): + obj = cls() + + try: + is_str = isinstance(time, basestring) + except NameError: + is_str = isinstance(time, str) + + if is_str: + time = time.split(':') + obj.hours = int(time[0]) + obj.minutes = int(time[1]) + obj.seconds = float(time[2]) + else: + raise TypeError("Unknown 
time format.") -class SubtitleLine(UnicodeMixin, object): - """ - Class representing a line inside SubtitleUnit. It acts as an ordinary - unicode objects, but has an ability to store additional metadata. - """ - # Unhashable - __hash__ = None - - def __init__(self, text, **kwargs): - self.text = text - # Update with additional metadata - self.__dict__.update(kwargs) - - def export(self): - """Returns line in format for export.""" - output = dict(self.__dict__) - text = output.pop('text', '') - if not output: - output = text - else: - output['text'] = text - return output - - @classmethod - def from_export(cls, obj): - return cls(**obj) - - def __unicode__(self): - return self.text - - if sys.version_info[0] >= 3: # Python 3 - def __repr__(self): - return "SubtitleLine({}{})".format( - self.text, - (', ' + ', '.join([' = '.join([k, str(v)]) for k, v in self.meta.items()])) if self.meta else '' - ) - else: # Python 2 - def __repr__(self): - return "SubtitleLine({}{})".format( - self.text, - (', ' + ', '.join([' = '.join([k, unicode(v)]) for k, v in self.meta.items()])) if self.meta else '' - ).encode('utf8') - - def __eq__(self, other): - if not isinstance(other, SubtitleLine): - return False - return self.__dict__ == other.__dict__ - - def __len__(self): - return len(self.text) - - @property - def meta(self): - d = dict(self.__dict__) - # Remove important part of metadata - d.pop('text') - return d + return obj -class SubtitleLines(list): - """Modified list class for special tratment of lines.""" - __slots__ = () + def __unicode__(self): + return '{:02d}:{:02d}:{:06.3f}'.format(self.hours, + self.minutes, + self.seconds) - def __new__(cls, l = []): - obj = super(SubtitleLines, cls).__new__(cls) - for i in l: - obj.append(i) - return obj + def __float__(self): + return self.to_seconds() - @staticmethod - def _validate(value): - try: - if isinstance(value, unicode): - value = SubtitleLine(value) - except NameError: - # Python3 compat - if isinstance(value, str): - 
value = SubtitleLine(value) + def __int__(self): + return int(self.to_seconds()) - if not isinstance(value, SubtitleLine): - raise TypeError("Subtitle line needs to be unicode instead of '{}'".format(type(value))) - return value + def to_seconds(self): + return self.hours * 3600 + self.minutes * 60 + self.seconds - def append(self, value): - value = self._validate(value) - super(SubtitleLines, self).append(value) - def __setitem__(self, index, value): - value = self._validate(value) - super(SubtitleLines, self).__setattr__(index, value) +class Frame(yaml.YAMLObject, UnicodeMixin): + yaml_loader = yaml.SafeLoader + yaml_dumper = yaml.SafeDumper -class SubtitleUnit(object): - """Class for holding time and text data of a subtitle unit.""" - # Unhashable - __hash__ = None + yaml_tag = '!frame' - def __init__(self, start, end, lines = None, **meta): - self.start = float(start) if not isinstance(start, Frame) else start - self.end = float(end) if not isinstance(end, Frame) else end - self._lines = SubtitleLines() + def __init__(self, frame): + self._frame = frame - self.__dict__.update(meta) + @classmethod + def from_yaml(cls, loader, node): + value = loader.construct_scalar(node) + return cls(int(value)) - if lines is not None: - if not isinstance(lines, (list, set)): - lines = list(lines) + @classmethod + def to_yaml(cls, dumper, data): + if isinstance(data, int): + data = cls(data) - for line in lines: - self._lines.append(line) + try: + return dumper.represent_scalar('!frame', unicode(data._frame)) + except NameError: + # Python3 compat + return dumper.represent_scalar('!frame', str(data._frame)) - def distance(self, other): - """Calculates signed distance with other subtitle unit.""" - if not isinstance(other, SubtitleUnit): - raise TypeError("Can calculate distance only with SubtitleUnit and not '{}'".format(type(other))) + def __int__(self): + raise ValueError("Cannot convert frame to time without specified FPS.") - return other.start - self.start + def 
__float__(self): + raise ValueError("Cannot convert frame to time without specified FPS.") - def __iter__(self): - return self._lines.__iter__() + def __eq__(self, value): + return self._frame == value - def __setitem__(self, index, value): - self._lines[index] = value + def __hash__(self): + return hash(self._frame) - def __getitem__(self, index): - return self._lines[index] + def __gt__(self, value): + return self._frame > value - def append(self, value): - self._lines.append(value) + def __lt__(self, value): + return self._frame < value - @property - def lines(self): - try: - return map(unicode, self._lines) - except NameError: - # Python3 compat - return map(str, self._lines) - - @property - def duration(self): - """Returns duration of subtitle unit in seconds.""" - return self.end - self.start - - @property - def length(self): - """Returns length of the SubtitleUnit (in characters).""" - return sum((len(i) for i in self._lines)) - - def move(self, distance): - """Moves subtitle unit by 'distance' seconds.""" - if not isinstance(distance, (int, long, float)): - raise TypeError("Need type of int, long or float instead of '{}'".format(type(distance))) - self.start += distance - self.end += distance - - def get_moved(self, distance): - """Same as SubtitleUnit.move, just returns a copy while itself is unchanged.""" - clone = SubtitleUnit(**self.__dict__) - clone.move(distance) - return clone - - def stretch(self, factor): - """Stretches the unit for 'factor'. 
- """ - if not isinstance(factor, (int, long, float)): - raise TypeError("Need type of int, long or float instead of '{}'".format(type(factor))) - self.start *= factor - self.end *= factor - - def get_stretched(self, factor): - """Same as SubtitleUnit.stretch, just returns a copy while itself is unchanged.""" - clone = SubtitleUnit(**self.__dict__) - clone.stretch(factor) - return clone - - @property - def meta(self): - d = dict(self.__dict__) - # Remove important part of metadata and lines - d.pop('start') - d.pop('end') - d.pop('_lines') - return d - - def __sub__(self, other): - """See SubtitleUnit.get_moved.""" - if not isinstance(other, (int, long, float)): - raise TypeError("Need type of int, long or float instead of '{}'".format(type(other))) - return self.get_moved(-1 * other) - - def __add__(self, other): - """See SubtitleUnit.get_moved.""" - return self.get_moved(other) - - def __isub__(self, other): - """Same as SubtitleUnit.move.""" - if not isinstance(other, (int, long, float)): - raise TypeError("Need type of int, long or float instead of '{}'".format(type(other))) - self.move(-1 * other) - - def __iadd__(self, other): - """Same as SubtitleUnit.move""" - self.move(other) - - def __mul__(self, other): - """See SubtitleUnit.get_stretched.""" - return self.get_stretched(other) - - def __imul__(self, other): - """See SubtitleUnit.stretch.""" - self.stretch(other) - - def __eq__(self, other): - if not isinstance(other, SubtitleUnit): - raise TypeError("Can compare only with other SubtitleUnit, provided with '{}'".format(type(other))) - - return self.__dict__ == other.__dict__ - - def __len__(self): - return len(self._lines) - - if sys.version_info[0] >= 3: # Python 3 def __repr__(self): - d = dict(self.__dict__) - # Get known attributes - start = d.pop('start') - end = d.pop('end') - lines = d.pop('_lines') - return "SubtitleUnit({}, {}, {}, {})".format(start, end, lines, d) - else: # Python2 - def __repr__(self): - d = dict(self.__dict__) - # Get known 
attributes - start = d.pop('start') - end = d.pop('end') - lines = d.pop('_lines') - return b"SubtitleUnit({}, {}, {}, {})".format(start, end, repr(lines), d) - - def to_dict(self, human_time = True): - """Returns subtitle unit as a dict (with some human readable things).""" - output = {} - output.update(self.__dict__) - # Overide custom attributes - output['start'] = HumanTime.from_seconds(self.start) if human_time and not isinstance(self.start, Frame) else self.start - output['end'] = HumanTime.from_seconds(self.end) if human_time and not isinstance(self.end, Frame) else self.end - # And lines - output['lines'] = [i.export() for i in self._lines] - - # Remove lines - output.pop('_lines') - return output - - @classmethod - def from_dict(cls, input): - """Creates SubtitleUnit from specified 'input' dict.""" - input = dict(input) - lines = input.pop('lines', []) - try: - lines = [ - i if isinstance(i, unicode) else i.decode('utf-8') if isinstance(i, bytes) else SubtitleLine.from_export(i) for i in lines - ] - except NameError: - # Python3 compat - lines = [ - i if isinstance(i, str) else i.decode('utf-8') if isinstance(i, bytes) else SubtitleLine.from_export(i) for i in lines - ] + return 'Frame({})'.format(self._frame) - return cls( - lines = SubtitleLines(lines), - **input - ) -class Subtitle(object): - """ - The whole subtitle. - - To load a subtitle in non-native format, use parsers.Parser.from_data. - """ - # Unhashable - __hash__ = None - - def __init__(self, units = [], **meta): - self._units = [] - self.__dict__.update(meta) - for unit in units: - self.append(unit) - - def add_unit(self, unit): - """Adds a new 'unit' and sorts the units. 
If adding many units, use append instead.""" - self.append(unit) - self.order() - - def order(self): - """Maintains order of subtitles.""" - self._units.sort(key = lambda x: x.start) - - def check_overlaps(self): - """Checks for overlaps and returns them in list.""" - overlaps = [] - for current_unit in self._units[:-1]: - i = self._units.index(current_unit) - for next_unit in self._units[i + 1:]: - if current_unit.end > next_unit.start: - overlaps.append((i, self._units.index(next_unit))) - else: - break - - return overlaps - - def remove(self, unit): - """Proxy for internal storage.""" - if not isinstance(unit, SubtitleUnit): - raise TypeError("Can remove only SubtitleUnit, you passed '{}'".format(type(unit))) - - self._units.remove(unit) - - def index(self, unit): - """Proxy for internal storage.""" - if not isinstance(unit, SubtitleUnit): - raise TypeError("Can index only SubtitleUnit, you passed '{}'".format(type(unit))) - - return self._units.index(unit) - - def insert(self, index, unit): - """Proxy for internal storage.""" - if not isinstance(unit, SubtitleUnit): - raise TypeError("Can add only SubtitleUnit, you passed '{}'".format(type(unit))) - - return self._units.insert(index, unit) - - def append(self, unit): - """Proxy for internal storage.""" - if not isinstance(unit, SubtitleUnit): - raise TypeError("Can add only SubtitleUnit, you passed '{}'".format(type(unit))) - - return self._units.append(unit) - - def __getitem__(self, index): - """Proxy for internal storage.""" - return self._units[index] - - def __setitem__(self, index, unit): - """Proxy for internal storage.""" - if not isinstance(unit, SubtitleUnit): - raise TypeError("Can add only SubtitleUnit, you passed '{}'".format(type(unit))) - - self._units[index] = unit - - def __delitem__(self, index): - """Proxy for internal storage.""" - del self._units[index] - - def __len__(self): - """Proxy for internal storage.""" - return len(self._units) - - def __iter__(self): - """Proxy for internal 
storage.""" - return iter(self._units) - - def __reversed__(self): - """Proxy for internal storage.""" - return reversed(self._units) - - def __eq__(self, other): - """Proxy for internal storage.""" - if self.__dict__ != other.__dict__: - print(self.__dict__, other.__dict__) - return self.__dict__ == other.__dict__ - - def __contains__(self, unit): - """Proxy for internal storage.""" - # TODO make possible to test with string? - return unit in self._units - - @property - def meta(self): - # Remove non-metadata from dict - d = dict(self.__dict__) - d.pop('_units', None) - return d - - @classmethod - def from_dict(cls, data): - """Creates Subtitle object from dict, parsed from YAML.""" - if data is None: - data = {} - data = dict(data) - data['units'] = [SubtitleUnit.from_dict(i) for i in data.get('units') or []] - return cls(**data) - - @classmethod - def from_file(cls, input): - """ - Loads a subtitle from file in YAML format. If have multiple documents, - set 'multi' to True. Do note, when multi is set to True, this method - returns a generator object. - """ - input = prepare_reader(input) - - # Read - obj = cls.from_yaml(input) - - # Detach wrapper - input.detach() - - # Done - if obj: - return obj - - @classmethod - def from_file_multi(cls, input): - """Loads multiple subtitles from file 'input'. 
It returns a generator object.""" - input = prepare_reader(input) - - for i in cls.from_multi_yaml(input): - # Needed to prevent input from closing - yield i - - # Detach wrapper - input.detach() - - # Done - - @classmethod - def from_yaml(cls, input): - """Loads a subtitle from YAML format, uses safe loader.""" - # Construct a python dict - data = yaml.safe_load(input) - - # Return our subtitle - return cls.from_dict(data) - - @classmethod - def from_multi_yaml(cls, input): - """Loads multiple subtitles from YAML format, uses safe loader.""" - for data in yaml.safe_load_all(input): - yield cls.from_dict(data) - - def dump(self, output = None, human_time = True, allow_unicode = True): - """Dumps this subtitle in YAML format with safe dumper.""" - # Construct a python dict - obj = dict(self.__dict__) - obj['units'] = [i.to_dict(human_time) for i in obj.pop('_units')] - # Dump it - return yaml.safe_dump(obj, output, encoding = 'utf-8', - allow_unicode = allow_unicode, - indent = 2, - explicit_start = True, - default_flow_style = False) - - def save(self, output, human_time = True, close = True, allow_unicode = True): +class SubtitleLine(UnicodeMixin, object): """ - Saves the subtitle in native (YAML) format. If 'output' is file object, it will - be closed if 'close' set to True after save is done. + Class representing a line inside SubtitleUnit. It acts as an ordinary + unicode objects, but has an ability to store additional metadata. 
""" - try: - is_str = isinstance(output, basestring) - except NameError: - # Python3 compat - is_str = isinstance(output, str) + # Unhashable + __hash__ = None + + def __init__(self, text, **kwargs): + self.text = text + # Update with additional metadata + self.__dict__.update(kwargs) + + def export(self): + """Returns line in format for export.""" + output = dict(self.__dict__) + text = output.pop('text', '') + if not output: + output = text + else: + output['text'] = text + return output + + @classmethod + def from_export(cls, obj): + return cls(**obj) + + def __unicode__(self): + return self.text + + if sys.version_info[0] >= 3: # Python 3 + def __repr__(self): + return "SubtitleLine({}{})".format( + self.text, + (', ' + ', '.join([' = '.join([k, str(v)]) + for k, v in self.meta.items()])) + if self.meta else '' + ) + else: # Python 2 + def __repr__(self): + return "SubtitleLine({}{})".format( + self.text, + (', ' + ', '.join([' = '.join([k, unicode(v)]) + for k, v in self.meta.items()])) + if self.meta else '' + ).encode('utf8') + + def __eq__(self, other): + if not isinstance(other, SubtitleLine): + return False + return self.__dict__ == other.__dict__ + + def __len__(self): + return len(self.text) + + @property + def meta(self): + d = dict(self.__dict__) + # Remove important part of metadata + d.pop('text') + return d - if is_str: - try: - output = io.BufferedWriter(io.open(output, 'wb')) - except IOError: - # TODO Custom exception - raise - try: - if isinstance(output, file): - output = io.BufferedWriter(io.FileIO(output.fileno()), closefd = close) - except NameError: - # No need in Python3 - pass - if not isinstance(output, io.BufferedIOBase): - raise TypeError("Save method accepts filename or file object.") - # Put a text wrapper around it - output = io.TextIOWrapper(output, encoding = 'utf-8') +class SubtitleLines(list): + """Modified list class for special tratment of lines.""" + __slots__ = () + + def __new__(cls, ls=[]): + obj = super(SubtitleLines, 
cls).__new__(cls) + for i in ls: + obj.append(i) + return obj + + @staticmethod + def _validate(value): + try: + if isinstance(value, unicode): + value = SubtitleLine(value) + except NameError: + # Python3 compat + if isinstance(value, str): + value = SubtitleLine(value) + + if not isinstance(value, SubtitleLine): + raise TypeError( + "Subtitle line needs to be unicode instead of '{}'". + format(type(value))) + return value + + def append(self, value): + value = self._validate(value) + super(SubtitleLines, self).append(value) + + def __setitem__(self, index, value): + value = self._validate(value) + super(SubtitleLines, self).__setattr__(index, value) - self.dump(output, human_time = human_time, - allow_unicode = allow_unicode) - if close: - output.close() - else: - output.detach() +class SubtitleUnit(object): + """Class for holding time and text data of a subtitle unit.""" + # Unhashable + __hash__ = None + + def __init__(self, start, end, lines=None, **meta): + self.start = float(start) if not isinstance(start, Frame) else start + self.end = float(end) if not isinstance(end, Frame) else end + self._lines = SubtitleLines() + + self.__dict__.update(meta) + + if lines is not None: + if not isinstance(lines, (list, set)): + lines = list(lines) + + for line in lines: + self._lines.append(line) + + def distance(self, other): + """Calculates signed distance with other subtitle unit.""" + if not isinstance(other, SubtitleUnit): + raise TypeError( + "Can calculate distance only with SubtitleUnit and not '{}'". 
+ format(type(other))) + + return other.start - self.start + + def __iter__(self): + return self._lines.__iter__() + + def __setitem__(self, index, value): + self._lines[index] = value + + def __getitem__(self, index): + return self._lines[index] + + def append(self, value): + self._lines.append(value) + + @property + def lines(self): + try: + return map(unicode, self._lines) + except NameError: + # Python3 compat + return map(str, self._lines) + + @property + def duration(self): + """Returns duration of subtitle unit in seconds.""" + return self.end - self.start + + @property + def length(self): + """Returns length of the SubtitleUnit (in characters).""" + return sum((len(i) for i in self._lines)) + + def move(self, distance): + """Moves subtitle unit by 'distance' seconds.""" + if not isinstance(distance, (int, long, float)): + raise TypeError( + "Need type of int, long or float instead of '{}'". + format(type(distance))) + self.start += distance + self.end += distance + + def get_moved(self, distance): + """Same as SubtitleUnit.move, just returns a copy while + itself is unchanged.""" + clone = SubtitleUnit(**self.__dict__) + clone.move(distance) + return clone + + def stretch(self, factor): + """Stretches the unit for 'factor'. + """ + if not isinstance(factor, (int, long, float)): + raise TypeError( + "Need type of int, long or float instead of '{}'". 
+ format(type(factor))) + self.start *= factor + self.end *= factor + + def get_stretched(self, factor): + """Same as SubtitleUnit.stretch, just returns a copy while + itself is unchanged.""" + clone = SubtitleUnit(**self.__dict__) + clone.stretch(factor) + return clone + + @property + def meta(self): + d = dict(self.__dict__) + # Remove important part of metadata and lines + d.pop('start') + d.pop('end') + d.pop('_lines') + return d + + def __sub__(self, other): + """See SubtitleUnit.get_moved.""" + if not isinstance(other, (int, long, float)): + raise TypeError( + "Need type of int, long or float instead of '{}'". + format(type(other))) + return self.get_moved(-1 * other) + + def __add__(self, other): + """See SubtitleUnit.get_moved.""" + return self.get_moved(other) + + def __isub__(self, other): + """Same as SubtitleUnit.move.""" + if not isinstance(other, (int, long, float)): + raise TypeError( + "Need type of int, long or float instead of '{}'". + format(type(other))) + self.move(-1 * other) + + def __iadd__(self, other): + """Same as SubtitleUnit.move""" + self.move(other) + + def __mul__(self, other): + """See SubtitleUnit.get_stretched.""" + return self.get_stretched(other) + + def __imul__(self, other): + """See SubtitleUnit.stretch.""" + self.stretch(other) + + def __eq__(self, other): + if not isinstance(other, SubtitleUnit): + raise TypeError( + "Can compare only with other SubtitleUnit, provided with '{}'". 
+ format(type(other))) + + return self.__dict__ == other.__dict__ + + def __len__(self): + return len(self._lines) + + if sys.version_info[0] >= 3: # Python 3 + def __repr__(self): + d = dict(self.__dict__) + # Get known attributes + start = d.pop('start') + end = d.pop('end') + lines = d.pop('_lines') + return "SubtitleUnit({}, {}, {}, {})".format(start, end, lines, d) + else: # Python2 + def __repr__(self): + d = dict(self.__dict__) + # Get known attributes + start = d.pop('start') + end = d.pop('end') + lines = d.pop('_lines') + return b"SubtitleUnit({}, {}, {}, {})".format(start, end, + repr(lines), d) + + def to_dict(self, human_time=True): + """Returns subtitle unit as a dict + (with some human readable things).""" + output = {} + output.update(self.__dict__) + # Overide custom attributes + output['start'] = HumanTime.from_seconds( + self.start) if (human_time and not + isinstance(self.start, Frame)) else self.start + output['end'] = HumanTime.from_seconds( + self.end) if (human_time and not + isinstance(self.end, Frame)) else self.end + # And lines + output['lines'] = [i.export() for i in self._lines] + + # Remove lines + output.pop('_lines') + return output + + @classmethod + def from_dict(cls, input): + """Creates SubtitleUnit from specified 'input' dict.""" + input = dict(input) + lines = input.pop('lines', []) + try: + lines = [ + i if isinstance(i, unicode) else i.decode('utf-8') + if isinstance(i, bytes) else SubtitleLine.from_export(i) + for i in lines + ] + except NameError: + # Python3 compat + lines = [ + i if isinstance(i, str) else i.decode('utf-8') + if isinstance(i, bytes) else SubtitleLine.from_export(i) + for i in lines + ] + + return cls( + lines=SubtitleLines(lines), + **input + ) + + +class Subtitle(object): + """ + The whole subtitle. + + To load a subtitle in non-native format, use parsers.Parser.from_data. 
+ """ + # Unhashable + __hash__ = None + + def __init__(self, units=[], **meta): + self._units = [] + self.__dict__.update(meta) + for unit in units: + self.append(unit) + + def add_unit(self, unit): + """Adds a new 'unit' and sorts the units. If adding many units, + use append instead.""" + self.append(unit) + self.order() + + def order(self): + """Maintains order of subtitles.""" + self._units.sort(key=lambda x: x.start) + + def check_overlaps(self): + """Checks for overlaps and returns them in list.""" + overlaps = [] + for current_unit in self._units[:-1]: + i = self._units.index(current_unit) + for next_unit in self._units[i + 1:]: + if current_unit.end > next_unit.start: + overlaps.append((i, self._units.index(next_unit))) + else: + break + + return overlaps + + def remove(self, unit): + """Proxy for internal storage.""" + if not isinstance(unit, SubtitleUnit): + raise TypeError( + "Can remove only SubtitleUnit, you passed '{}'". + format(type(unit))) + + self._units.remove(unit) + + def index(self, unit): + """Proxy for internal storage.""" + if not isinstance(unit, SubtitleUnit): + raise TypeError( + "Can index only SubtitleUnit, you passed '{}'". + format(type(unit))) + + return self._units.index(unit) + + def insert(self, index, unit): + """Proxy for internal storage.""" + if not isinstance(unit, SubtitleUnit): + raise TypeError( + "Can add only SubtitleUnit, you passed '{}'". + format(type(unit))) + + return self._units.insert(index, unit) + + def append(self, unit): + """Proxy for internal storage.""" + if not isinstance(unit, SubtitleUnit): + raise TypeError( + "Can add only SubtitleUnit, you passed '{}'". + format(type(unit))) + + return self._units.append(unit) + + def __getitem__(self, index): + """Proxy for internal storage.""" + return self._units[index] + + def __setitem__(self, index, unit): + """Proxy for internal storage.""" + if not isinstance(unit, SubtitleUnit): + raise TypeError( + "Can add only SubtitleUnit, you passed '{}'". 
+ format(type(unit))) + + self._units[index] = unit + + def __delitem__(self, index): + """Proxy for internal storage.""" + del self._units[index] + + def __len__(self): + """Proxy for internal storage.""" + return len(self._units) + + def __iter__(self): + """Proxy for internal storage.""" + return iter(self._units) + + def __reversed__(self): + """Proxy for internal storage.""" + return reversed(self._units) + + def __eq__(self, other): + """Proxy for internal storage.""" + if self.__dict__ != other.__dict__: + print(self.__dict__, other.__dict__) + return self.__dict__ == other.__dict__ + + def __contains__(self, unit): + """Proxy for internal storage.""" + # TODO make possible to test with string? + return unit in self._units + + @property + def meta(self): + # Remove non-metadata from dict + d = dict(self.__dict__) + d.pop('_units', None) + return d + + @classmethod + def from_dict(cls, data): + """Creates Subtitle object from dict, parsed from YAML.""" + if data is None: + data = {} + data = dict(data) + data['units'] = [SubtitleUnit.from_dict( + i) for i in data.get('units') or []] + return cls(**data) + + @classmethod + def from_file(cls, input): + """ + Loads a subtitle from file in YAML format. If have multiple documents, + set 'multi' to True. Do note, when multi is set to True, this method + returns a generator object. + """ + input = prepare_reader(input) + + # Read + obj = cls.from_yaml(input) + + # Detach wrapper + input.detach() + + # Done + if obj: + return obj + + @classmethod + def from_file_multi(cls, input): + """Loads multiple subtitles from file 'input'. 
It returns + a generator object.""" + input = prepare_reader(input) + + for i in cls.from_multi_yaml(input): + # Needed to prevent input from closing + yield i + + # Detach wrapper + input.detach() + + # Done + + @classmethod + def from_yaml(cls, input): + """Loads a subtitle from YAML format, uses safe loader.""" + # Construct a python dict + data = yaml.safe_load(input) + + # Return our subtitle + return cls.from_dict(data) + + @classmethod + def from_multi_yaml(cls, input): + """Loads multiple subtitles from YAML format, uses safe loader.""" + for data in yaml.safe_load_all(input): + yield cls.from_dict(data) + + def dump(self, output=None, human_time=True, allow_unicode=True): + """Dumps this subtitle in YAML format with safe dumper.""" + # Construct a python dict + obj = dict(self.__dict__) + obj['units'] = [i.to_dict(human_time) for i in obj.pop('_units')] + # Dump it + return yaml.safe_dump(obj, output, encoding='utf-8', + allow_unicode=allow_unicode, + indent=2, + explicit_start=True, + default_flow_style=False) + + def save(self, output, human_time=True, close=True, allow_unicode=True): + """ + Saves the subtitle in native (YAML) format. If 'output' is + file object, it will be closed if 'close' set to True after + save is done. 
+ """ + try: + is_str = isinstance(output, basestring) + except NameError: + # Python3 compat + is_str = isinstance(output, str) + + if is_str: + try: + output = io.BufferedWriter(io.open(output, 'wb')) + except IOError: + # TODO Custom exception + raise + try: + if isinstance(output, file): + output = io.BufferedWriter( + io.FileIO(output.fileno()), closefd=close) + except NameError: + # No need in Python3 + pass + + if not isinstance(output, io.BufferedIOBase): + raise TypeError("Save method accepts filename or file object.") + # Put a text wrapper around it + output = io.TextIOWrapper(output, encoding='utf-8') + + self.dump(output, human_time=human_time, + allow_unicode=allow_unicode) + + if close: + output.close() + else: + output.detach() diff --git a/pysubtools/utils.py b/pysubtools/utils.py index d754e6d..4f2d906 100644 --- a/pysubtools/utils.py +++ b/pysubtools/utils.py @@ -6,21 +6,24 @@ import gzip import sys + class PatchedGzipFile(gzip.GzipFile): - """ - A patched gzip file to be able to use TextIOWrapper - around it. Not needed in Python 3.3+ - """ - def read1(self, n): - return self.read(n) + """ + A patched gzip file to be able to use TextIOWrapper + around it. 
Not needed in Python 3.3+ + """ + + def read1(self, n): + return self.read(n) + class UnicodeMixin(object): - """Mixin class to handle defining the proper __str__/__unicode__ - methods in Python 2 or 3.""" + """Mixin class to handle defining the proper __str__/__unicode__ + methods in Python 2 or 3.""" - if sys.version_info[0] >= 3: # Python 3 - def __str__(self): - return self.__unicode__() - else: # Python 2 - def __str__(self): - return self.__unicode__().encode('utf8') + if sys.version_info[0] >= 3: # Python 3 + def __str__(self): + return self.__unicode__() + else: # Python 2 + def __str__(self): + return self.__unicode__().encode('utf8') diff --git a/setup.py b/setup.py index 7527e5b..7c946b2 100644 --- a/setup.py +++ b/setup.py @@ -1,20 +1,21 @@ # -*- coding: utf8 -*- from setuptools import setup, find_packages -setup ( - name = "PySubTools", - version = "0.1.5", - packages = find_packages(), - test_suite = 'tests', +setup( + name="PySubTools", + version="0.1.5", + packages=find_packages(), + test_suite='tests', - install_requires = ['chardet', + install_requires=['chardet', 'state-machine', 'pyyaml'], - # metadata for upload to PyPI - author = "Gregor Kališnik, Unimatrix", - author_email = "gregor@kalisnik.si, info@unimatrix.si", - description = "A set of parsers and exports for subtitles in various formats", - license = "BSD", - keywords = "parsing exporting subtitles srt", + # metadata for upload to PyPI + author="Gregor Kališnik, Unimatrix", + author_email="gregor@kalisnik.si, info@unimatrix.si", + description="A set of parsers and exports for subtitles in various " + "formats", + license="BSD", + keywords="parsing exporting subtitles srt", ) diff --git a/tests/pysubtools_test.py b/tests/pysubtools_test.py index 63b4c86..a91c978 100644 --- a/tests/pysubtools_test.py +++ b/tests/pysubtools_test.py @@ -14,237 +14,242 @@ from pysubtools.exporters import Exporter from pysubtools.utils import PatchedGzipFile as GzipFile + class TestCase(unittest.TestCase): - 
def test_sif(self): - """Test 'Subtitle Intermediate Format' loaders and dumpers.""" - # Make a test subtitle with two lines - subtitle = Subtitle() - subtitle.add_unit(SubtitleUnit( - start = 15, - end = 30, - lines = ['First line with \u0161'] - )) - subtitle.add_unit(SubtitleUnit( - start = 65, - end = 89, - position = { - 'x': 50, - 'y': 30, - }, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! \u017e'] - )) - subtitle[0][0].special = True - subtitle[0][0].complex = { - 'a': {'4': 2}, - 'b': [1,2,3] - } - subtitle[1][1].test = 'test' - - # Write it - tmpfd, tmp = tempfile.mkstemp() - tmpfd2, tmp2 = tempfile.mkstemp() - subtitle.save(io.BufferedWriter(io.FileIO(tmpfd, mode = 'w'))) - subtitle.save(io.BufferedWriter(io.FileIO(tmpfd2, mode = 'w')), human_time = False) - - # Load it and test - assert Subtitle.from_file(tmp) == subtitle - assert Subtitle.from_file(tmp2) == subtitle - - # Remove temp files - os.unlink(tmp) - os.unlink(tmp2) - - # And some minor things - # repr - just checking for exceptions - repr(subtitle) - for u in subtitle: - repr(u) - for l in u: - repr(l) - - def test_sif_gz(self): - """Test gzipped 'Subtitle Intermediate Format' loaders and dumpers (just wrapped around GzipFile).""" - # Make a test subtitle with two lines - subtitle = Subtitle() - subtitle.add_unit(SubtitleUnit( - start = 15, - end = 30, - lines = [u'First line with \u0161'] - )) - subtitle.add_unit(SubtitleUnit( - start = 65, - end = 89, - lines = [u'Another, but a two liner \u010d', - u'Yes, I said two liner! 
\u017e'] - )) - - # Write it - tmpfd, tmp = tempfile.mkstemp() - tmpfd2, tmp2 = tempfile.mkstemp() - subtitle.save(GzipFile(tmp, mode = 'wb')) - subtitle.save(GzipFile(tmp2, mode = 'wb'), human_time = False) - - # Load it and test - assert Subtitle.from_file(GzipFile(tmp, mode = 'rb')) == subtitle - assert Subtitle.from_file(GzipFile(tmp2, mode = 'rb')) == subtitle - - # Remove temp files - os.unlink(tmp) - os.unlink(tmp2) - - def test_multi_sif_gz(self): - """Test multiple gzipped subtitles.""" - # Make a test subtitle with two lines - subtitle = Subtitle() - subtitle.add_unit(SubtitleUnit( - start = 15, - end = 30, - lines = ['First line with \u0161'] - )) - subtitle.add_unit(SubtitleUnit( - start = 65, - end = 89, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! \u017e'] - )) - subtitle2 = Subtitle() - subtitle2.add_unit(SubtitleUnit( - start = 16, - end = 31, - lines = ['First line with \u0161'] - )) - subtitle2.add_unit(SubtitleUnit( - start = 66, - end = 90, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! \u017e'] - )) - subtitle3 = Subtitle() - subtitle3.add_unit(SubtitleUnit( - start = 17, - end = 32, - lines = ['First line with \u0161'] - )) - subtitle3.add_unit(SubtitleUnit( - start = 67, - end = 91, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! 
\u017e'] - )) - - # Write it (several times) - tmpfd, tmp = tempfile.mkstemp() - tmpf = GzipFile(tmp, mode = 'wb') - subtitle.save(tmpf, close = False) - subtitle2.save(tmpf, close = False) - subtitle3.save(tmpf) - - # Load it and test - tmpf = GzipFile(tmp, mode = 'rb') - assert list(Subtitle.from_file_multi(tmpf)) == [subtitle, - subtitle2, - subtitle3] - tmpf.close() - - # Remove temp files - os.unlink(tmp) - - def test_parsers(self): - """Test parsers.""" - def format_test(root, suffix, format): - for filename in (i for i in os.listdir(root) if i.endswith(suffix)): - with open(os.path.join(root, filename), 'rb') as f: - parser = Parser.from_format(format, stop_level = None) - parsed = parser.parse(f) - - d = dict( - encoding = parser.encoding, - warnings = parser.warnings, - errors = parser.errors - ) - - result = os.path.join(root, filename[:-4]) - if os.path.isfile(result + '.msgs.yaml'): - loaded_d = yaml.load(open(result + '.msgs.yaml', 'r')) - sub = Subtitle.from_file(result + '.sif') - else: - # Write it - yaml.dump(d, open(result + '.msgs.yaml', 'w'), - default_flow_style = False) - parsed.save(result + '.sif', allow_unicode = False) - continue - - assert d == loaded_d - assert sub == parsed - - # Go through all subrips - format_test('./tests/data/srt', '.srt', 'SubRip') - format_test('./tests/data/microdvd', '.sub', 'MicroDVD') - - def test_encoding(self): - """Tests if internal encoder tester works as it should (premature IO closures are the concern)""" - f = open('./tests/data/corner/encoding_detection.srt', 'rb') - - # Test all possible paths - parser = Parser.from_format('SubRip') - - # As fileobj - sub1 = parser.parse(f) - - # As from_data with fileobj - f.seek(0) - parser = parser.from_data(f) - sub3 = parser.parse() - - # As string - f.seek(0) - d = f.read() - sub2 = parser.parse(d) - - # As from_data with string - parser = parser.from_data(d) - sub4 = parser.parse() - - # All of them must be the same - assert sub1 == sub2 == sub3 == sub4 - - f = 
open('./tests/data/corner/encoding_error.srt', 'rb') - try: - sub = parser.parse(f) - except encodings.EncodingError as e: - assert e.tried_encodings == [] - - def test_subrip_export(self): - """Tests SubRip exporter on a simple subtitle.""" - subtitle = Subtitle() - subtitle.add_unit(SubtitleUnit( - start = 15, - end = 30, - lines = ['First line with \u0161'] - )) - subtitle.add_unit(SubtitleUnit( - start = 65, - end = 89, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! \u017e'] - )) - subtitle.add_unit(SubtitleUnit( - start = 3665, - end = 3689, - lines = ['Another, but a two liner \u010d', - 'Yes, I said two liner! \u017e'] - )) - - # Construct exporter - exporter = Exporter.from_format('SubRip') - - # Export - buf = io.BytesIO() - exporter.export(buf, subtitle) - - # Now, check the outputted subtitle - assert buf.getvalue() == b"""1\r + def test_sif(self): + """Test 'Subtitle Intermediate Format' loaders and dumpers.""" + # Make a test subtitle with two lines + subtitle = Subtitle() + subtitle.add_unit(SubtitleUnit( + start=15, + end=30, + lines=['First line with \u0161'] + )) + subtitle.add_unit(SubtitleUnit( + start=65, + end=89, + position={ + 'x': 50, + 'y': 30, + }, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! 
\u017e'] + )) + subtitle[0][0].special = True + subtitle[0][0].complex = { + 'a': {'4': 2}, + 'b': [1, 2, 3] + } + subtitle[1][1].test = 'test' + + # Write it + tmpfd, tmp = tempfile.mkstemp() + tmpfd2, tmp2 = tempfile.mkstemp() + subtitle.save(io.BufferedWriter(io.FileIO(tmpfd, mode='w'))) + subtitle.save(io.BufferedWriter( + io.FileIO(tmpfd2, mode='w')), human_time=False) + + # Load it and test + assert Subtitle.from_file(tmp) == subtitle + assert Subtitle.from_file(tmp2) == subtitle + + # Remove temp files + os.unlink(tmp) + os.unlink(tmp2) + + # And some minor things + # repr - just checking for exceptions + repr(subtitle) + for u in subtitle: + repr(u) + for l in u: + repr(l) + + def test_sif_gz(self): + """Test gzipped 'Subtitle Intermediate Format' loaders and dumpers + (just wrapped around GzipFile).""" + # Make a test subtitle with two lines + subtitle = Subtitle() + subtitle.add_unit(SubtitleUnit( + start=15, + end=30, + lines=[u'First line with \u0161'] + )) + subtitle.add_unit(SubtitleUnit( + start=65, + end=89, + lines=[u'Another, but a two liner \u010d', + u'Yes, I said two liner! \u017e'] + )) + + # Write it + tmpfd, tmp = tempfile.mkstemp() + tmpfd2, tmp2 = tempfile.mkstemp() + subtitle.save(GzipFile(tmp, mode='wb')) + subtitle.save(GzipFile(tmp2, mode='wb'), human_time=False) + + # Load it and test + assert Subtitle.from_file(GzipFile(tmp, mode='rb')) == subtitle + assert Subtitle.from_file(GzipFile(tmp2, mode='rb')) == subtitle + + # Remove temp files + os.unlink(tmp) + os.unlink(tmp2) + + def test_multi_sif_gz(self): + """Test multiple gzipped subtitles.""" + # Make a test subtitle with two lines + subtitle = Subtitle() + subtitle.add_unit(SubtitleUnit( + start=15, + end=30, + lines=['First line with \u0161'] + )) + subtitle.add_unit(SubtitleUnit( + start=65, + end=89, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! 
\u017e'] + )) + subtitle2 = Subtitle() + subtitle2.add_unit(SubtitleUnit( + start=16, + end=31, + lines=['First line with \u0161'] + )) + subtitle2.add_unit(SubtitleUnit( + start=66, + end=90, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! \u017e'] + )) + subtitle3 = Subtitle() + subtitle3.add_unit(SubtitleUnit( + start=17, + end=32, + lines=['First line with \u0161'] + )) + subtitle3.add_unit(SubtitleUnit( + start=67, + end=91, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! \u017e'] + )) + + # Write it (several times) + tmpfd, tmp = tempfile.mkstemp() + tmpf = GzipFile(tmp, mode='wb') + subtitle.save(tmpf, close=False) + subtitle2.save(tmpf, close=False) + subtitle3.save(tmpf) + + # Load it and test + tmpf = GzipFile(tmp, mode='rb') + assert list(Subtitle.from_file_multi(tmpf)) == [subtitle, + subtitle2, + subtitle3] + tmpf.close() + + # Remove temp files + os.unlink(tmp) + + def test_parsers(self): + """Test parsers.""" + def format_test(root, suffix, format): + for filename in (i for i in os.listdir(root) + if i.endswith(suffix)): + with open(os.path.join(root, filename), 'rb') as f: + parser = Parser.from_format(format, stop_level=None) + parsed = parser.parse(f) + + d = dict( + encoding=parser.encoding, + warnings=parser.warnings, + errors=parser.errors + ) + + result = os.path.join(root, filename[:-4]) + if os.path.isfile(result + '.msgs.yaml'): + loaded_d = yaml.load(open(result + '.msgs.yaml', 'r')) + sub = Subtitle.from_file(result + '.sif') + else: + # Write it + yaml.dump(d, open(result + '.msgs.yaml', 'w'), + default_flow_style=False) + parsed.save(result + '.sif', allow_unicode=False) + continue + + assert d == loaded_d + assert sub == parsed + + # Go through all subrips + format_test('./tests/data/srt', '.srt', 'SubRip') + format_test('./tests/data/microdvd', '.sub', 'MicroDVD') + + def test_encoding(self): + """Tests if internal encoder tester works as it should + (premature IO closures are the 
concern)""" + f = open('./tests/data/corner/encoding_detection.srt', 'rb') + + # Test all possible paths + parser = Parser.from_format('SubRip') + + # As fileobj + sub1 = parser.parse(f) + + # As from_data with fileobj + f.seek(0) + parser = parser.from_data(f) + sub3 = parser.parse() + + # As string + f.seek(0) + d = f.read() + sub2 = parser.parse(d) + + # As from_data with string + parser = parser.from_data(d) + sub4 = parser.parse() + + # All of them must be the same + assert sub1 == sub2 == sub3 == sub4 + + f = open('./tests/data/corner/encoding_error.srt', 'rb') + try: + sub = parser.parse(f) + except encodings.EncodingError as e: + assert e.tried_encodings == [] + + def test_subrip_export(self): + """Tests SubRip exporter on a simple subtitle.""" + subtitle = Subtitle() + subtitle.add_unit(SubtitleUnit( + start=15, + end=30, + lines=['First line with \u0161'] + )) + subtitle.add_unit(SubtitleUnit( + start=65, + end=89, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! \u017e'] + )) + subtitle.add_unit(SubtitleUnit( + start=3665, + end=3689, + lines=['Another, but a two liner \u010d', + 'Yes, I said two liner! \u017e'] + )) + + # Construct exporter + exporter = Exporter.from_format('SubRip') + + # Export + buf = io.BytesIO() + exporter.export(buf, subtitle) + + # Now, check the outputted subtitle + assert buf.getvalue() == b"""1\r 00:00:15,000 --> 00:00:30,000\r First line with \xc5\xa1\r \r @@ -259,12 +264,12 @@ def test_subrip_export(self): Yes, I said two liner! 
\xc5\xbe\r """ - # Now we try with different encoding - buf = io.BytesIO() - exporter = Exporter.from_format('SubRip', encoding = 'cp1250') - exporter.export(buf, subtitle) + # Now we try with different encoding + buf = io.BytesIO() + exporter = Exporter.from_format('SubRip', encoding='cp1250') + exporter.export(buf, subtitle) - assert buf.getvalue() == b"""1\r + assert buf.getvalue() == b"""1\r 00:00:15,000 --> 00:00:30,000\r First line with \x9a\r \r @@ -279,68 +284,70 @@ def test_subrip_export(self): Yes, I said two liner! \x9e\r """ - def test_subtitle_lines(self): - """Tests API of the subtitle lines.""" - sub = Subtitle() - sub.append(SubtitleUnit( - 0, 1, ['First line', 'Second line'] - )) - - # Check line access - assert str(sub[0][0]) == 'First line' - assert str(sub[0][1]) == 'Second line' - - # Add some metadata - sub[0][0].styles = { - 'color': 'red' - } - sub[0][1].styles = { - 'color': 'blue' - } - - # Check them up - assert sub[0][0].styles == { - 'color': 'red' - } - assert sub[0][1].styles == { - 'color': 'blue' - } - - # Update lines - sub[0][0].text = 'Just a line' - sub[0][1].text = 'Just another line' - - assert str(sub[0][0]) == 'Just a line' - assert str(sub[0][1]) == 'Just another line' - - # Metadata should still be there - assert sub[0][0].styles == { - 'color': 'red' - } - assert sub[0][1].styles == { - 'color': 'blue' - } - - def test_lookup(self): - """Some encoding python cannot read, we need to make sure it won't make a low-level error.""" - f = open('./tests/data/corner/lookup_error.srt', 'rb') - parser = Parser.from_format('SubRip') - - # This should work now (also, this subtitle has some special chars that without EUC-TW => BIG5-TW would not work) - sub = parser.parse(f, encoding = 'bullshit') - - def test_high_mem_srt(self): - """Tests a issue of high memory usage on subRip parser.""" - f = open('./tests/data/corner/high_mem.srt', 'rb') - parser = Parser.from_format('SubRip', stop_level = None) - - # This line should not break the 
parser - sub = parser.parse(f) - - def test_srt_autodetect(self): - """Found a example of srt not detected by autodetection feature.""" - f = open('./tests/data/corner/autodetect.srt', 'rb') - parser = Parser.from_data(f) - - # Will it parse? - sub = parser.parse(f) + def test_subtitle_lines(self): + """Tests API of the subtitle lines.""" + sub = Subtitle() + sub.append(SubtitleUnit( + 0, 1, ['First line', 'Second line'] + )) + + # Check line access + assert str(sub[0][0]) == 'First line' + assert str(sub[0][1]) == 'Second line' + + # Add some metadata + sub[0][0].styles = { + 'color': 'red' + } + sub[0][1].styles = { + 'color': 'blue' + } + + # Check them up + assert sub[0][0].styles == { + 'color': 'red' + } + assert sub[0][1].styles == { + 'color': 'blue' + } + + # Update lines + sub[0][0].text = 'Just a line' + sub[0][1].text = 'Just another line' + + assert str(sub[0][0]) == 'Just a line' + assert str(sub[0][1]) == 'Just another line' + + # Metadata should still be there + assert sub[0][0].styles == { + 'color': 'red' + } + assert sub[0][1].styles == { + 'color': 'blue' + } + + def test_lookup(self): + """Some encoding python cannot read, we need to make sure + it won't make a low-level error.""" + f = open('./tests/data/corner/lookup_error.srt', 'rb') + parser = Parser.from_format('SubRip') + + # This should work now (also, this subtitle has some special + # chars that without EUC-TW => BIG5-TW would not work) + sub = parser.parse(f, encoding='bullshit') + + def test_high_mem_srt(self): + """Tests a issue of high memory usage on subRip parser.""" + f = open('./tests/data/corner/high_mem.srt', 'rb') + parser = Parser.from_format('SubRip', stop_level=None) + + # This line should not break the parser + sub = parser.parse(f) + + def test_srt_autodetect(self): + """Found a example of srt not detected by autodetection feature.""" + f = open('./tests/data/corner/autodetect.srt', 'rb') + parser = Parser.from_data(f) + + # Will it parse? 
+ sub = parser.parse(f) From 5fb86d9eb9b889390fdaaa812a6032acf393b48e Mon Sep 17 00:00:00 2001 From: Tit Date: Thu, 20 Jun 2019 14:14:31 +0200 Subject: [PATCH 2/2] Resolving comments on Github Resolving multiple comments from Github regarding coding style and additional flake8 linter errors. --- pysubtools/__init__.py | 2 -- pysubtools/exporters/base.py | 12 ++++++---- pysubtools/exporters/subrip.py | 3 ++- pysubtools/parsers/base.py | 5 ++-- pysubtools/parsers/encodings.py | 14 +++++------ pysubtools/parsers/microdvd.py | 8 +++---- pysubtools/subtitle.py | 42 ++++++++++++++++----------------- tests/pysubtools_test.py | 2 +- 8 files changed, 44 insertions(+), 44 deletions(-) diff --git a/pysubtools/__init__.py b/pysubtools/__init__.py index a663e9d..8c5b24f 100644 --- a/pysubtools/__init__.py +++ b/pysubtools/__init__.py @@ -3,8 +3,6 @@ from __future__ import print_function from __future__ import unicode_literals -# from zipfile import ZipExtFile - from . import parsers from . import exporters from .subtitle import Subtitle, SubtitleUnit, SubtitleLine diff --git a/pysubtools/exporters/base.py b/pysubtools/exporters/base.py index 273e864..5e295cb 100644 --- a/pysubtools/exporters/base.py +++ b/pysubtools/exporters/base.py @@ -22,7 +22,8 @@ def from_format(format, **options): if exporter.FORMAT == format: return exporter(**options) raise NoExporterFound( - "Could not find exporter with name '{}'.".format(format)) + "Could not find exporter with name '{}'.".format(format) + ) def __init__(self, **options): self._init(**options) @@ -60,7 +61,7 @@ def export(self, output, subtitle): raise TypeError("Can export only Subtitle objects.") try: - basestring + global basestring except NameError: # Python3 compat basestring = str @@ -71,8 +72,11 @@ def export(self, output, subtitle): try: if isinstance(output, file): output = io.BufferedWriter( - io.FileIO(output.fileno(), closefd=False, - mode=output.mode)) + io.FileIO( + output.fileno(), + closefd=False, + mode=output.mode) 
+ ) except NameError: # Python3 does not need this pass diff --git a/pysubtools/exporters/subrip.py b/pysubtools/exporters/subrip.py index 641bc03..924e4c2 100644 --- a/pysubtools/exporters/subrip.py +++ b/pysubtools/exporters/subrip.py @@ -52,7 +52,8 @@ def _export_unit(self, unit): # TODO 3D positions output.append("{} --> {}".format( self._convert_time(unit.start), - self._convert_time(unit.end)).encode(self._encoding)) + self._convert_time(unit.end) + ).encode(self._encoding)) # Text output.append(self._line_ending.join( [i.encode(self._encoding, 'ignore') for i in unit.lines])) diff --git a/pysubtools/parsers/base.py b/pysubtools/parsers/base.py index d962afe..563fca3 100644 --- a/pysubtools/parsers/base.py +++ b/pysubtools/parsers/base.py @@ -4,7 +4,6 @@ from __future__ import unicode_literals import io -# import functools from . import encodings @@ -68,8 +67,8 @@ def __init__(self, stop_level='error'): self._current_line = None def _add_msg(self, level, line_number, column, line, description): - if (self._stop_level and self.LEVELS.index(level) >= - self.LEVELS.index(self._stop_level)): + if (self._stop_level and self.LEVELS.index(level) + >= self.LEVELS.index(self._stop_level)): if level == 'warning': raise ParseWarning(line_number, column, line, description) elif level == 'error': diff --git a/pysubtools/parsers/encodings.py b/pysubtools/parsers/encodings.py index 8b3462f..a81f5de 100644 --- a/pysubtools/parsers/encodings.py +++ b/pysubtools/parsers/encodings.py @@ -112,16 +112,16 @@ def detect(data, encoding=None, language=None): # Reverse order encodings.reverse() while True: - encoding = encodings.pop() - if can_decode(data, encoding if not isinstance(encoding, tuple) else - encoding[0]): + encoding = encoding_to_use = encodings.pop() + if isinstance(encoding, tuple) is not True: + encoding_to_use = encoding[0] + + if can_decode(data, encoding_to_use): # We've found it! 
break - tried_encodings.add(encoding if not isinstance( - encoding, tuple) else encoding[0]) + tried_encodings.add(encoding_to_use) - similar = similar_encodings.get( - encoding if not isinstance(encoding, tuple) else encoding[0]) + similar = similar_encodings.get(encoding_to_use) if similar: encodings += list(set(similar).difference(tried_encodings)) if not encodings: diff --git a/pysubtools/parsers/microdvd.py b/pysubtools/parsers/microdvd.py index 380c05c..4b43f05 100644 --- a/pysubtools/parsers/microdvd.py +++ b/pysubtools/parsers/microdvd.py @@ -101,12 +101,12 @@ def _parse_header(self, header, global_only=False): t['styles']['*']['font-family'] = v.strip() elif k.lower() == 's': # Font size - t['styles']['*']['font-size'] = v.strip() + \ - ('px' if v.strip().isdigit() else '') + t['styles']['*']['font-size'] = v.strip() + + ('px' if v.strip().isdigit() else '') elif k.lower() == 'c': # Text color v = v.strip() - if re.match('^\$[0-9a-fA-F]{6}$', v): + if re.match(r'^\$[0-9a-fA-F]{6}$', v): t['styles']['*']['color'] = '#' + v[5:] + v[3:5] + v[1:3] else: self.add_warning(self._current_line_num + 1, @@ -115,7 +115,7 @@ def _parse_header(self, header, global_only=False): u"Wrong color format {}.".format(v)) elif k == 'P': # Position - m = re.match('^\s*(\d+)\s*,\s*(\d+)\s*$', v) + m = re.match(r'^\s*(\d+)\s*,\s*(\d+)\s*$', v) if not m: self.add_warning(self._current_line_num + 1, 1, diff --git a/pysubtools/subtitle.py b/pysubtools/subtitle.py index e1b2517..1e99e70 100644 --- a/pysubtools/subtitle.py +++ b/pysubtools/subtitle.py @@ -216,9 +216,9 @@ class SubtitleLines(list): """Modified list class for special tratment of lines.""" __slots__ = () - def __new__(cls, ls=[]): + def __new__(cls, lines_=[]): obj = super(SubtitleLines, cls).__new__(cls) - for i in ls: + for i in lines_: obj.append(i) return obj @@ -234,8 +234,8 @@ def _validate(value): if not isinstance(value, SubtitleLine): raise TypeError( - "Subtitle line needs to be unicode instead of '{}'". 
- format(type(value))) + "Subtitle line needs to be unicode instead of '{}'" + .format(type(value))) return value def append(self, value): @@ -270,9 +270,8 @@ def distance(self, other): """Calculates signed distance with other subtitle unit.""" if not isinstance(other, SubtitleUnit): raise TypeError( - "Can calculate distance only with SubtitleUnit and not '{}'". - format(type(other))) - + "Can calculate distance only with SubtitleUnit and not '{}'" + .format(type(other))) return other.start - self.start def __iter__(self): @@ -309,8 +308,8 @@ def move(self, distance): """Moves subtitle unit by 'distance' seconds.""" if not isinstance(distance, (int, long, float)): raise TypeError( - "Need type of int, long or float instead of '{}'". - format(type(distance))) + "Need type of int, long or float instead of '{}'" + .format(type(distance))) self.start += distance self.end += distance @@ -326,8 +325,8 @@ def stretch(self, factor): """ if not isinstance(factor, (int, long, float)): raise TypeError( - "Need type of int, long or float instead of '{}'". - format(type(factor))) + "Need type of int, long or float instead of '{}'" + .format(type(factor))) self.start *= factor self.end *= factor @@ -351,8 +350,8 @@ def __sub__(self, other): """See SubtitleUnit.get_moved.""" if not isinstance(other, (int, long, float)): raise TypeError( - "Need type of int, long or float instead of '{}'". - format(type(other))) + "Need type of int, long or float instead of '{}'" + .format(type(other))) return self.get_moved(-1 * other) def __add__(self, other): @@ -363,8 +362,8 @@ def __isub__(self, other): """Same as SubtitleUnit.move.""" if not isinstance(other, (int, long, float)): raise TypeError( - "Need type of int, long or float instead of '{}'". 
- format(type(other))) + "Need type of int, long or float instead of '{}'" + .format(type(other))) self.move(-1 * other) def __iadd__(self, other): @@ -382,8 +381,8 @@ def __imul__(self, other): def __eq__(self, other): if not isinstance(other, SubtitleUnit): raise TypeError( - "Can compare only with other SubtitleUnit, provided with '{}'". - format(type(other))) + "Can compare only with other SubtitleUnit, provided with '{}'" + .format(type(other))) return self.__dict__ == other.__dict__ @@ -415,11 +414,10 @@ def to_dict(self, human_time=True): output.update(self.__dict__) # Overide custom attributes output['start'] = HumanTime.from_seconds( - self.start) if (human_time and not - isinstance(self.start, Frame)) else self.start - output['end'] = HumanTime.from_seconds( - self.end) if (human_time and not - isinstance(self.end, Frame)) else self.end + self.start) if (human_time and not isinstance(self.start, Frame)) \ + else self.start + output['end'] = HumanTime.from_seconds(self.end) if \ + (human_time and not isinstance(self.end, Frame)) else self.end # And lines output['lines'] = [i.export() for i in self._lines] diff --git a/tests/pysubtools_test.py b/tests/pysubtools_test.py index a91c978..5ff3f08 100644 --- a/tests/pysubtools_test.py +++ b/tests/pysubtools_test.py @@ -216,7 +216,7 @@ def test_encoding(self): f = open('./tests/data/corner/encoding_error.srt', 'rb') try: - sub = parser.parse(f) + sub1 = parser.parse(f) except encodings.EncodingError as e: assert e.tried_encodings == []