kumquat-buildroot/support/scripts/cve.py
Yann E. MORIN 5f253e3e04 support/scripts/cve: fix running on older ijson versions
Commit 22b6945552 (support/scripts/cve.py: switch from NVD to FKIE for
the JSON files) had to change the decompressor from gz to xz, as the new
location is using xz compression.

That commit mentioned that it was spawning an external xz process to do
the decompression, on the pretence that "there is no xz decompressor in
Python stdlib."

Before version 3.1, ijson.items() only accepted a file-like object as
input (that file-like object could yield bytes() or str(), both were
supported). Starting with version 3.1, ijson.items() also accepts that
it be directly passed bytes() or str() directly. subprocess.check_output()
means we are now passing bytes() to ijson.items(), so it fails on ijson
versions before 3.1, with failures such as:

    [...]
      File "/usr/lib/python3/dist-packages/ijson/backends/python.py", line 25, in Lexer
        if type(f.read(0)) == bytetype:
    AttributeError: 'bytes' object has no attribute 'read'

Ubuntu 20.04, on which the pkg-stats run to generate the daily report,
only has ijson 2.3. More recent distros have more recent versions of
ijson, like Fedora 39 that has 3.2.3, recent enough to support being fed
bytes(). Commit 22b6945552 was tested on Fedora 39, so did not catch
the issue.

However, the reasoning in 22b6945552 is wrong: there *is* the lzma
module, at least since python 3.3 (that is, aeons ago), which is able to
read xz-compressed files; it also has an API similar to the gzip module,
and can provide a file-like object that exposes the decompressed data.

So, do just that: provide an lzma-wrapped file-like object to ijson, so
that we can eventually recover our daily reports that everything is
broken! :-]

Note that this construct still works on recent versions!

Reported-by: Thomas Petazzoni <thomas.petazzoni@bootlin.com>
Signed-off-by: Yann E. MORIN <yann.morin.1998@free.fr>
Cc: Arnout Vandecappelle (Essensium/Mind) <arnout@mind.be>
Signed-off-by: Peter Korsgaard <peter@korsgaard.com>
2024-02-29 18:47:16 +01:00

281 lines
9.8 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
# Copyright (C) 2020 by Gregory CLEMENT <gregory.clement@bootlin.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import datetime
import os
import requests # URL checking
import distutils.version
import lzma
import time
import sys
import operator
try:
import ijson
# backend is a module in < 2.5, a string in >= 2.5
if 'python' in getattr(ijson.backend, '__name__', ijson.backend):
try:
import ijson.backends.yajl2_cffi as ijson
except ImportError:
sys.stderr.write('Warning: Using slow ijson python backend\n')
except ImportError:
sys.stderr.write("You need ijson to parse NVD for CVE check\n")
exit(1)
sys.path.append('utils/')
NVD_START_YEAR = 1999
NVD_BASE_URL = "https://github.com/fkie-cad/nvd-json-data-feeds/releases/latest/download"
ops = {
'>=': operator.ge,
'>': operator.gt,
'<=': operator.le,
'<': operator.lt,
'=': operator.eq
}
# Check if two CPE IDs match each other
def cpe_matches(cpe1, cpe2):
cpe1_elems = cpe1.split(":")
cpe2_elems = cpe2.split(":")
remains = filter(lambda x: x[0] not in ["*", "-"] and x[1] not in ["*", "-"] and x[0] != x[1],
zip(cpe1_elems, cpe2_elems))
return len(list(remains)) == 0
def cpe_product(cpe):
return cpe.split(':')[4]
def cpe_version(cpe):
return cpe.split(':')[5]
class CVE:
"""An accessor class for CVE Items in NVD files"""
CVE_AFFECTS = 1
CVE_DOESNT_AFFECT = 2
CVE_UNKNOWN = 3
def __init__(self, nvd_cve):
"""Initialize a CVE from its NVD JSON representation"""
self.nvd_cve = nvd_cve
@staticmethod
def download_nvd_year(nvd_path, year):
metaf = "CVE-%s.meta" % year
path_metaf = os.path.join(nvd_path, metaf)
jsonf_xz = "CVE-%s.json.xz" % year
path_jsonf_xz = os.path.join(nvd_path, jsonf_xz)
# If the database file is less than a day old, we assume the NVD data
# locally available is recent enough.
if os.path.exists(path_jsonf_xz) and os.stat(path_jsonf_xz).st_mtime >= time.time() - 86400:
return path_jsonf_xz
# If not, we download the meta file
url = "%s/%s" % (NVD_BASE_URL, metaf)
print("Getting %s" % url)
page_meta = requests.get(url)
page_meta.raise_for_status()
# If the meta file already existed, we compare the existing
# one with the data newly downloaded. If they are different,
# we need to re-download the database.
# If the database does not exist locally, we need to redownload it in
# any case.
if os.path.exists(path_metaf) and os.path.exists(path_jsonf_xz):
meta_known = open(path_metaf, "r").read()
if page_meta.text == meta_known:
return path_jsonf_xz
# Grab the compressed JSON NVD, and write files to disk
url = "%s/%s" % (NVD_BASE_URL, jsonf_xz)
print("Getting %s" % url)
page_json = requests.get(url)
page_json.raise_for_status()
open(path_jsonf_xz, "wb").write(page_json.content)
open(path_metaf, "w").write(page_meta.text)
return path_jsonf_xz
@staticmethod
def sort_id(cve_ids):
def cve_key(cve_id):
year, id_ = cve_id.split('-')[1:]
return (int(year), int(id_))
return sorted(cve_ids, key=cve_key)
@classmethod
def read_nvd_dir(cls, nvd_dir):
"""
Iterate over all the CVEs contained in NIST Vulnerability Database
feeds since NVD_START_YEAR. If the files are missing or outdated in
nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
"""
for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
filename = CVE.download_nvd_year(nvd_dir, year)
try:
content = ijson.items(lzma.LZMAFile(filename), 'cve_items.item')
except: # noqa: E722
print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
raise
for cve in content:
yield cls(cve)
def each_product(self):
"""Iterate over each product section of this cve"""
for vendor in self.nvd_cve['cve']['affects']['vendor']['vendor_data']:
for product in vendor['product']['product_data']:
yield product
def parse_node(self, node):
"""
Parse the node inside the configurations section to extract the
cpe information usefull to know if a product is affected by
the CVE. Actually only the product name and the version
descriptor are needed, but we also provide the vendor name.
"""
# The node containing the cpe entries matching the CVE can also
# contain sub-nodes, so we need to manage it.
for child in node.get('children', ()):
for parsed_node in self.parse_node(child):
yield parsed_node
for cpe in node.get('cpeMatch', ()):
if not cpe['vulnerable']:
return
product = cpe_product(cpe['criteria'])
version = cpe_version(cpe['criteria'])
# ignore when product is '-', which means N/A
if product == '-':
return
op_start = ''
op_end = ''
v_start = ''
v_end = ''
if version != '*' and version != '-':
# Version is defined, this is a '=' match
op_start = '='
v_start = version
else:
# Parse start version, end version and operators
if 'versionStartIncluding' in cpe:
op_start = '>='
v_start = cpe['versionStartIncluding']
if 'versionStartExcluding' in cpe:
op_start = '>'
v_start = cpe['versionStartExcluding']
if 'versionEndIncluding' in cpe:
op_end = '<='
v_end = cpe['versionEndIncluding']
if 'versionEndExcluding' in cpe:
op_end = '<'
v_end = cpe['versionEndExcluding']
yield {
'id': cpe['criteria'],
'v_start': v_start,
'op_start': op_start,
'v_end': v_end,
'op_end': op_end
}
def each_cpe(self):
for nodes in self.nvd_cve.get('configurations', []):
for node in nodes['nodes']:
for cpe in self.parse_node(node):
yield cpe
@property
def identifier(self):
"""The CVE unique identifier"""
return self.nvd_cve['id']
@property
def affected_products(self):
"""The set of CPE products referred by this CVE definition"""
return set(cpe_product(p['id']) for p in self.each_cpe())
def affects(self, name, version, cve_ignore_list, cpeid=None):
"""
True if the Buildroot Package object passed as argument is affected
by this CVE.
"""
if self.identifier in cve_ignore_list:
return self.CVE_DOESNT_AFFECT
pkg_version = distutils.version.LooseVersion(version)
if not hasattr(pkg_version, "version"):
print("Cannot parse package '%s' version '%s'" % (name, version))
pkg_version = None
# if we don't have a cpeid, build one based on name and version
if not cpeid:
cpeid = "cpe:2.3:*:*:%s:%s:*:*:*:*:*:*:*" % (name, version)
# if we have a cpeid, use its version instead of the package
# version, as they might be different due to
# <pkg>_CPE_ID_VERSION
else:
pkg_version = distutils.version.LooseVersion(cpe_version(cpeid))
for cpe in self.each_cpe():
if not cpe_matches(cpe['id'], cpeid):
continue
if not cpe['v_start'] and not cpe['v_end']:
return self.CVE_AFFECTS
if not pkg_version:
continue
if cpe['v_start']:
try:
cve_affected_version = distutils.version.LooseVersion(cpe['v_start'])
inrange = ops.get(cpe['op_start'])(pkg_version, cve_affected_version)
except TypeError:
return self.CVE_UNKNOWN
# current package version is before v_start, so we're
# not affected by the CVE
if not inrange:
continue
if cpe['v_end']:
try:
cve_affected_version = distutils.version.LooseVersion(cpe['v_end'])
inrange = ops.get(cpe['op_end'])(pkg_version, cve_affected_version)
except TypeError:
return self.CVE_UNKNOWN
# current package version is after v_end, so we're
# not affected by the CVE
if not inrange:
continue
# We're in the version range affected by this CVE
return self.CVE_AFFECTS
return self.CVE_DOESNT_AFFECT