From c2cfea9426e6dd262fc9e34e5204f5e7c043b067 Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Mon, 5 Mar 2018 11:01:27 +0100 Subject: [PATCH 1/4] Add examples directory with first example script --- examples/.gitignore | 2 + examples/check_sec_updates.py | 225 ++++++++++++++++++++++++++++++++++ examples/test.py | 59 +++++++++ 3 files changed, 286 insertions(+) create mode 100644 examples/.gitignore create mode 100755 examples/check_sec_updates.py create mode 100644 examples/test.py diff --git a/examples/.gitignore b/examples/.gitignore new file mode 100644 index 0000000..a295864 --- /dev/null +++ b/examples/.gitignore @@ -0,0 +1,2 @@ +*.pyc +__pycache__ diff --git a/examples/check_sec_updates.py b/examples/check_sec_updates.py new file mode 100755 index 0000000..f5f2e92 --- /dev/null +++ b/examples/check_sec_updates.py @@ -0,0 +1,225 @@ +#!/usr/bin/python + +from sys import stdin, stdout, exit +from sys import path as syspath +syspath.append('..') +from rhsda import ApiClient +from re import match as rematch +from pprint import pprint +from datetime import datetime +from sqlalchemy import Column, String, Integer +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker +from pickle import dumps as pickle_dump +from pickle import loads as pickle_load +from json import dumps as json_dump +from json import loads as json_load +import logging +from logging import debug, info, warning, critical +from rpmUtils.miscutils import splitFilename + +exitvals = { + 'OK': 0, + 'WARNING': 1, + 'CRITICAL': 2, + 'UNKNOWN': 3, +} + +#logging.basicConfig(format = '%(message)s', level=logging.DEBUG) +logging.basicConfig(format = '%(message)s', level=logging.WARNING) + +engine = create_engine('sqlite:///updates_cache.db') +Base = declarative_base() +Session = sessionmaker(bind=engine) + +class CVE_cache(Base): + """ + Define our CVE cache table + name = RPM package name + date = the date we queried CVEs for + product = the product name we used to query the API + data = The serialized data we got from the API + """ + + __tablename__ = 'cvecache' + + id = Column(Integer, primary_key=True) + name = Column(String) + date = Column(String) # TODO: Well, not good, but for the moment... + product = Column(String) + data = Column(String) + + + def __repr__(self): + return "='%s', product='%s')>" % ( + self.name, + self.date, + self.product, + ) + + +def main(input = stdin, quiet = False): + try: + installed_packages = dict() + os_maj_version = None + os_name = None + api = ApiClient(logLevel='error') + + session = Session() + + severity = 'low' + output_text = '' + issues = 0 + + try: + Base.metadata.create_all(engine) + except Exception as e: + debug('Exception: %s' % e) + pass + + # Iterate of the lines from stdin + for line in input: + # Ignore lines that are commented out or empty lines + if line.startswith('#'): continue + if rematch(r"""^\s+$""", line): continue + + (buildtime, name, version, release) = line.rstrip().split(' ') + + # Ignore public gpg keys + if name == 'gpg-pubkey': continue + + # Check if it's the OS release package (redhat-release, centos-release, + # fedora-release [note: not supported!] + re_result = rematch(r"""^(\w+)-release.*""", name) + if re_result: + os_name = re_result.group(1) + re_result = rematch(r"""^(\d+).*""", version) + (os_maj_version, ) = re_result.group(1) + + # Figure out which package is the latest and skip packages with the same + # NVR, but different arch + if name in installed_packages: + if version == installed_packages[name]['version'] and \ + release == installed_packages[name]['release']: + # Skip. It's most probably the same package, but different + # arch. Eg. i686/x86_64 on x64 systems + continue + if buildtime < installed_packages[name]['buildtime']: + debug('%s-%s-%s (%s) is older than %s-%s-%s (%s)' % ( + name, version, release, buildtime, + name, + installed_packages[name]['version'], + installed_packages[name]['release'], + installed_packages[name]['buildtime']) + ) + continue + installed_packages[name] = { + 'buildtime': buildtime, + 'version': version, + 'release': release, + } + + info('This is a: %s %s' % (os_name, os_maj_version)) + for name in installed_packages: + search_date = datetime.fromtimestamp( + float(installed_packages[name]['buildtime']) + ).strftime("%Y-%m-%d") + + product = "(linux %s)" % os_maj_version + + # Check if we already have any information in the database + query = session.query(CVE_cache).filter_by( + name = name, + product = product, + date = search_date) + + data = None + # Nothing in DB, query online + if not query.count() > 0: + data = api.find_cves(after = search_date, + package = name, + product = product) + + new_data = [] + for item in data: + debug('Item: %s' % item) + packages = [] + for pkg in item['affected_packages']: + pkg_name = splitFilename(pkg)[0] + if pkg_name == name: + packages.append(pkg) + + # If some package names match, add it. + if packages: + new_data.append(item) + data = new_data + + + # Debug output (found something or not...) + if data: + debug('Issues for %s: %s' % (name, data)) + else: + debug('Nothing found for %s' % name) + + # Whatever the result was, we cache it + dbobj = CVE_cache(name = name, product = product, date = search_date, data = pickle_dump(data)) + session.add(dbobj) + session.commit() + + # Found data in DB, unpickle + else: + data = pickle_load(query.one().data) + if data: + debug('(Cached) issues for %s: %s' % (name, data)) + else: + debug('(Cached) Nothing found for %s' % name) + + # We have data, either fresh from the API (web) or from the + # database + if data: + output_text += '%s has security issues:' % name + for issue in data: + output_text += ' * %s (%s)\n - %s\n' % ( + issue['bugzilla_description'].rstrip().lstrip(), issue['severity'], issue['resource_url']) + if issue['severity'] == 'moderate' and severity != 'important': + severity = 'moderate' + elif issue['severity'] == 'important' and severity != 'important': + severity = 'important' + + issues += 1 + + session.close() + level = 'OK' + if output_text: + if severity == 'important' or severity == 'moderate': + level = 'CRITICAL' + elif severity == 'low': + level = 'WARNING' + if not quiet: + print('%s: %i security issues found in %i packages (highest severity: %s)' % (level, issues, len(installed_packages), severity)) + print(output_text) + exit(exitvals[level]) + + except KeyboardInterrupt as e: + stdout.flush() + if not quiet: + print('UNKNOWN: %s' % e) + exit(exitvals['UNKNOWN']) + except Exception as e: + stdout.flush() + if not quiet: + print('UNKNOWN: %s' % e) + exit(exitvals['UNKNOWN']) + + if not quiet: + print('OK: Obviously, no issues were found') + exit(exitvals['OK']) + +if __name__ == '__main__': + """ + Simple example script for rhsda, showing how to find CVEs that apply to + your packages. Generate the list with: + rpm -qa --qf '%{buildtime} %{name} %{version} %{release}\n' + """ + main() diff --git a/examples/test.py b/examples/test.py new file mode 100644 index 0000000..112393d --- /dev/null +++ b/examples/test.py @@ -0,0 +1,59 @@ +import unittest +import check_sec_updates +from StringIO import StringIO + +class TestExitCodes(unittest.TestCase): + + def test_ok(self): + s = StringIO( +""" +1488124984 redhat-release-server 6Server 6.9.0.4.el6 +1258685031 bc 1.06.95 1.el6 +1276766929 gstreamer 0.10.29 1.el6 +""") + with self.assertRaises(SystemExit) as cm: + check_sec_updates.main(input = s, quiet = True) + self.assertEqual(cm.exception.code, 0, 'Must exit with OK') + + def test_warning(self): + s = StringIO( +""" +1488124984 redhat-release-server 6Server 6.9.0.4.el6 +1457450163 mc 4.7.0.2 6.el6 + +# This one has CVEs +1346670317 libxslt 1.1.26 2.el6_3.1 + +1258685031 bc 1.06.95 1.el6 +1276766929 gstreamer 0.10.29 1.el6 +""") + with self.assertRaises(SystemExit) as cm: + check_sec_updates.main(input = s, quiet = True) + self.assertEqual(cm.exception.code, 1, 'Must exit with WARNING (1)') + + def test_critical(self): + s = StringIO( +""" +1488124984 redhat-release-server 6Server 6.9.0.4.el6 +# This one has an important CVE +1425043132 pcre 7.8 7.el6 +""") + with self.assertRaises(SystemExit) as cm: + check_sec_updates.main(input = s, quiet = True) + self.assertEqual(cm.exception.code, 2, 'Must exit with CRITICAL (2)') + + def test_unknown(self): + s = StringIO( +""" +1488124984 redhat-release-server 6Server 6.9.0.4.el6 +# This is a b0rken entry +1425043132 pcre +""") + with self.assertRaises(SystemExit) as cm: + check_sec_updates.main(input = s, quiet = True) + self.assertEqual(cm.exception.code, 3, 'Must exit with UNKNOWN (3)') + + + +if __name__ == '__main__': + unittest.main() From 2e6c59282bd682e5025a1d50c6c314da948b2109 Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Mon, 5 Mar 2018 11:25:13 +0100 Subject: [PATCH 2/4] Better search for include dirs and move cache DB to /var/tmp --- examples/check_sec_updates.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/examples/check_sec_updates.py b/examples/check_sec_updates.py index f5f2e92..43e228a 100755 --- a/examples/check_sec_updates.py +++ b/examples/check_sec_updates.py @@ -2,7 +2,12 @@ from sys import stdin, stdout, exit from sys import path as syspath -syspath.append('..') +from os.path import dirname, abspath +from os.path import join as pathjoin +curdir = dirname(abspath(__file__)) +topdir = abspath(pathjoin(curdir, '..')) +syspath.append(curdir) +syspath.append(topdir) from rhsda import ApiClient from re import match as rematch from pprint import pprint @@ -26,10 +31,12 @@ 'UNKNOWN': 3, } +tmppath = '/var/tmp' + #logging.basicConfig(format = '%(message)s', level=logging.DEBUG) logging.basicConfig(format = '%(message)s', level=logging.WARNING) -engine = create_engine('sqlite:///updates_cache.db') +engine = create_engine('sqlite:///%s/updates_cache.db' % tmppath) Base = declarative_base() Session = sessionmaker(bind=engine) From 7f1865a78218792fb9957f2f125ab5f26d3bd44b Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Mon, 5 Mar 2018 14:27:09 +0100 Subject: [PATCH 3/4] Use package that will fore sure yield a warning --- examples/test.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/examples/test.py b/examples/test.py index 112393d..2fc1cb9 100644 --- a/examples/test.py +++ b/examples/test.py @@ -18,14 +18,8 @@ def test_ok(self): def test_warning(self): s = StringIO( """ -1488124984 redhat-release-server 6Server 6.9.0.4.el6 -1457450163 mc 4.7.0.2 6.el6 - -# This one has CVEs -1346670317 libxslt 1.1.26 2.el6_3.1 - -1258685031 bc 1.06.95 1.el6 -1276766929 gstreamer 0.10.29 1.el6 +1176766929 redhat-release 5Server 5.9.0.5 +576766929 hypervkvpd 0 0.6 """) with self.assertRaises(SystemExit) as cm: check_sec_updates.main(input = s, quiet = True) From 589a2a71f5eab9d942660f9d0ac409ae994a305e Mon Sep 17 00:00:00 2001 From: Oliver Falk Date: Mon, 5 Mar 2018 14:27:54 +0100 Subject: [PATCH 4/4] It requires more logic... Compare packages version/release --- examples/check_sec_updates.py | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/examples/check_sec_updates.py b/examples/check_sec_updates.py index 43e228a..73894ad 100755 --- a/examples/check_sec_updates.py +++ b/examples/check_sec_updates.py @@ -22,7 +22,7 @@ from json import loads as json_load import logging from logging import debug, info, warning, critical -from rpmUtils.miscutils import splitFilename +from rpmUtils.miscutils import splitFilename, compareEVR exitvals = { 'OK': 0, @@ -70,6 +70,7 @@ def main(input = stdin, quiet = False): try: installed_packages = dict() os_maj_version = None + os_min_version = None os_name = None api = ApiClient(logLevel='error') @@ -98,11 +99,14 @@ def main(input = stdin, quiet = False): # Check if it's the OS release package (redhat-release, centos-release, # fedora-release [note: not supported!] - re_result = rematch(r"""^(\w+)-release.*""", name) + re_result = rematch(r"""^(fedora|redhat|centos)-release.*""", name) if re_result: os_name = re_result.group(1) + if os_name == 'fedora': + print('Fedora is not supported yet') + exit(0) re_result = rematch(r"""^(\d+).*""", version) - (os_maj_version, ) = re_result.group(1) + os_maj_version = re_result.group(1) # Figure out which package is the latest and skip packages with the same # NVR, but different arch @@ -133,7 +137,12 @@ def main(input = stdin, quiet = False): float(installed_packages[name]['buildtime']) ).strftime("%Y-%m-%d") - product = "(linux %s)" % os_maj_version + # Querying with minor release doesn't work (ATM?) + if os_min_version and 1 == 0: + product = "(linux %s.%s)" % (os_maj_version, os_min_version) + else: + product = "(linux %s)" % os_maj_version + debug('Query product: %s' % product) # Check if we already have any information in the database query = session.query(CVE_cache).filter_by( @@ -153,12 +162,19 @@ def main(input = stdin, quiet = False): debug('Item: %s' % item) packages = [] for pkg in item['affected_packages']: - pkg_name = splitFilename(pkg)[0] + # TODO? We do not care about epoch ATM + (pkg_name, pkg_version, pkg_release) = splitFilename(pkg)[0:3] + if pkg_name == name: - packages.append(pkg) + print('pkg: %s' % pkg_name) + if compareEVR([0, pkg_version, pkg_release], + [0, installed_packages[name]['version'], + installed_packages[name]['release']]) > 0: + packages.append(pkg) # If some package names match, add it. if packages: + item['affected_packages'] = packages new_data.append(item) data = new_data @@ -185,9 +201,9 @@ def main(input = stdin, quiet = False): # We have data, either fresh from the API (web) or from the # database if data: - output_text += '%s has security issues:' % name + output_text += '%s has security issues:\n' % name for issue in data: - output_text += ' * %s (%s)\n - %s\n' % ( + output_text += ' * %s (%s)\n - %s\n' % ( issue['bugzilla_description'].rstrip().lstrip(), issue['severity'], issue['resource_url']) if issue['severity'] == 'moderate' and severity != 'important': severity = 'moderate'