#!/usr/bin/env python

# Copyright (C) 2009 by Thomas Petazzoni
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

import argparse
import datetime
import fnmatch
import os
from collections import defaultdict
import re
import subprocess
import requests  # URL checking
import json
import ijson
import certifi
import distutils.version
import time
import gzip
from urllib3 import HTTPSConnectionPool
from urllib3.exceptions import HTTPError
from multiprocessing import Pool

NVD_START_YEAR = 2002
NVD_JSON_VERSION = "1.0"
NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION

# Matches the "$(eval $(<infra>-package))" line of a package .mk file.
INFRA_RE = re.compile(r"\$\(eval \$\(([a-z-]*)-package\)\)")
# Matches a Config.in line that consists only of an http(s) URL.
URL_RE = re.compile(r"\s*https?://\S*\s*$")

RM_API_STATUS_ERROR = 1
RM_API_STATUS_FOUND_BY_DISTRO = 2
RM_API_STATUS_FOUND_BY_PATTERN = 3
RM_API_STATUS_NOT_FOUND = 4

# Used to make multiple requests to the same host. It is global
# because it's used by sub-processes.
http_pool = None


class Package:
    """A Buildroot package, identified by its name and the path of its .mk file."""

    # Class-level tables shared by all instances; they are filled in
    # once by package_init_make_info() and keyed by pkgvar().
    all_licenses = list()
    all_license_files = list()
    all_versions = dict()
    all_ignored_cves = dict()

    def __init__(self, name, path):
        self.name = name
        self.path = path
        self.infras = None
        self.has_license = False
        self.has_license_files = False
        self.has_hash = False
        self.patch_count = 0
        self.warnings = 0
        self.current_version = None
        self.url = None
        self.url_status = None
        self.url_worker = None
        self.cves = list()
        # (status, version, id), see check_package_latest_version()
        self.latest_version = (RM_API_STATUS_ERROR, None, None)

    def pkgvar(self):
        """Returns the make variable prefix of the package (upper case, '-' replaced by '_')."""
        return self.name.upper().replace("-", "_")

    def set_url(self):
        """
        Fills in the .url and .url_status fields from the first URL-only
        line found in the Config.* files next to the .mk file.
        """
        self.url_status = "No Config.in"
        pkgdir = os.path.dirname(self.path)
        for filename in os.listdir(pkgdir):
            if not fnmatch.fnmatch(filename, 'Config.*'):
                continue
            with open(os.path.join(pkgdir, filename), "r") as fp:
                for config_line in fp:
                    if URL_RE.match(config_line):
                        self.url = config_line.strip()
                        self.url_status = "Found"
                        return
            # A Config.* file exists but carries no URL; keep looking
            # in the remaining Config.* files.
            self.url_status = "Missing"

    def set_infra(self):
        """
        Fills in the .infras field with a list of ("host"|"target", infra) tuples
        """
        self.infras = list()
        with open(self.path, 'r') as f:
            lines = f.readlines()
        for line in lines:
            match = INFRA_RE.match(line)
            if not match:
                continue
            infra = match.group(1)
            if infra.startswith("host-"):
                self.infras.append(("host", infra[5:]))
            else:
                self.infras.append(("target", infra))

    def set_license(self):
        """
        Fills in the .has_license and .has_license_files fields
        """
        var = self.pkgvar()
        if var in self.all_licenses:
            self.has_license = True
        if var in self.all_license_files:
            self.has_license_files = True

    def set_hash_info(self):
        """
        Fills in the .has_hash field
        """
        hashpath = self.path.replace(".mk", ".hash")
        self.has_hash = os.path.exists(hashpath)

    def set_patch_count(self):
        """
        Fills in the .patch_count field with the number of *.patch files
        found in the package directory and all its sub-directories
        """
        self.patch_count = 0
        pkgdir = os.path.dirname(self.path)
        for subdir, _, _ in os.walk(pkgdir):
            self.patch_count += len(fnmatch.filter(os.listdir(subdir), '*.patch'))

    def set_current_version(self):
        """
        Fills in the .current_version field
        """
        var = self.pkgvar()
        if var in self.all_versions:
            self.current_version = self.all_versions[var]

    def set_check_package_warnings(self):
        """
        Fills in the .warnings field by running ./utils/check-package on
        the .mk, .hash and Config.in[.host] files of the package
        """
        cmd = ["./utils/check-package"]
        pkgdir = os.path.dirname(self.path)
        for root, _, files in os.walk(pkgdir):
            for f in files:
                if f.endswith(".mk") or f.endswith(".hash") or f == "Config.in" or f == "Config.in.host":
                    cmd.append(os.path.join(root, f))
        # The warning summary is read from stderr, hence [1].
        o = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[1]
        lines = o.splitlines()
        for line in lines:
            m = re.match("^([0-9]*) warnings generated", line.decode())
            if m:
                self.warnings = int(m.group(1))
                return

    def is_cve_ignored(self, cve):
        """
        Tells if the CVE is ignored by the package
        """
        return cve in self.all_ignored_cves.get(self.pkgvar(), [])

    def __eq__(self, other):
        return self.path == other.path

    def __lt__(self, other):
        return self.path < other.path

    def __str__(self):
        return "%s (path='%s', license='%s', license_files='%s', hash='%s', patches=%d)" % \
            (self.name, self.path, self.has_license, self.has_license_files, self.has_hash, self.patch_count)


class CVE:
    """An accessor class for CVE Items in NVD files"""
    def __init__(self, nvd_cve):
        """Initialize a CVE from its NVD JSON representation"""
        self.nvd_cve = nvd_cve

    @staticmethod
    def download_nvd_year(nvd_path, year):
        """
        Makes sure the NVD feed of the given year is available in
        nvd_path, downloading it if needed, and returns the path of the
        compressed JSON file. Raises requests.HTTPError on a failed
        download.
        """
        metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
        path_metaf = os.path.join(nvd_path, metaf)
        jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
        path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)

        # If the database file is less than a day old, we assume the NVD data
        # locally available is recent enough.
        if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
            return path_jsonf_gz

        # If not, we download the meta file
        url = "%s/%s" % (NVD_BASE_URL, metaf)
        print("Getting %s" % url)
        page_meta = requests.get(url)
        page_meta.raise_for_status()

        # If the meta file already existed, we compare the existing
        # one with the data newly downloaded. If they are different,
        # we need to re-download the database.
        # If the database does not exist locally, we need to redownload it in
        # any case.
        if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
            with open(path_metaf, "r") as f:
                meta_known = f.read()
            if page_meta.text == meta_known:
                return path_jsonf_gz

        # Grab the compressed JSON NVD, and write files to disk
        url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
        print("Getting %s" % url)
        page_json = requests.get(url)
        page_json.raise_for_status()
        with open(path_jsonf_gz, "wb") as f:
            f.write(page_json.content)
        with open(path_metaf, "w") as f:
            f.write(page_meta.text)
        return path_jsonf_gz

    @classmethod
    def read_nvd_dir(cls, nvd_dir):
        """
        Iterate over all the CVEs contained in NIST Vulnerability Database
        feeds since NVD_START_YEAR. If the files are missing or outdated in
        nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
        """
        for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
            filename = CVE.download_nvd_year(nvd_dir, year)
            # ijson parses lazily, so decompression and parsing errors
            # only show up while iterating: the whole iteration has to
            # be covered by the try block for the hint to be printed.
            try:
                with gzip.GzipFile(filename) as gz:
                    for cve in ijson.items(gz, 'CVE_Items.item'):
                        yield cls(cve['cve'])
            except Exception:
                print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
                raise

    def each_product(self):
        """Iterate over each product section of this cve"""
        for vendor in self.nvd_cve['affects']['vendor']['vendor_data']:
            for product in vendor['product']['product_data']:
                yield product

    @property
    def identifier(self):
        """The CVE unique identifier"""
        return self.nvd_cve['CVE_data_meta']['ID']

    @property
    def pkg_names(self):
        """The set of package names referred by this CVE definition"""
        return set(p['product_name'] for p in self.each_product())

    def affects(self, br_pkg):
        """
        True if the Buildroot Package object passed as argument is affected
        by this CVE.
        """
        if br_pkg.is_cve_ignored(self.identifier):
            return False

        for product in self.each_product():
            if product['product_name'] != br_pkg.name:
                continue

            for v in product['version']['version_data']:
                if v["version_affected"] == "=":
                    if br_pkg.current_version == v["version_value"]:
                        return True
                elif v["version_affected"] == "<=":
                    # LooseVersion() only gets a .version attribute when
                    # it was given a non-empty string to parse.
                    pkg_version = distutils.version.LooseVersion(br_pkg.current_version)
                    if not hasattr(pkg_version, "version"):
                        print("Cannot parse package '%s' version '%s'" % (br_pkg.name, br_pkg.current_version))
                        continue
                    cve_affected_version = distutils.version.LooseVersion(v["version_value"])
                    if not hasattr(cve_affected_version, "version"):
                        print("Cannot parse CVE affected version '%s'" % v["version_value"])
                        continue
                    try:
                        affected = pkg_version <= cve_affected_version
                    except TypeError:
                        # Mixed numeric/alphabetic components cannot be
                        # ordered; skip this entry instead of aborting.
                        print("Cannot compare package '%s' version '%s' with CVE affected version '%s'" %
                              (br_pkg.name, br_pkg.current_version, v["version_value"]))
                        continue
                    # A non-matching entry must not hide the remaining
                    # ones, so only a match ends the search.
                    if affected:
                        return True
                else:
                    print("version_affected: %s" % v['version_affected'])
        return False


def get_pkglist(npackages, package_list):
    """
    Builds the list of Buildroot packages, returning a list of Package
    objects. Only the .name and .path fields of the Package object are
    initialized.

    npackages: limit to N packages
    package_list: limit to those packages in this list
    """
    WALK_USEFUL_SUBDIRS = ["boot", "linux", "package", "toolchain"]
    WALK_EXCLUDES = ["boot/common.mk",
                     "linux/linux-ext-.*.mk",
                     "package/freescale-imx/freescale-imx.mk",
                     "package/gcc/gcc.mk",
                     "package/gstreamer/gstreamer.mk",
                     "package/gstreamer1/gstreamer1.mk",
                     "package/gtk2-themes/gtk2-themes.mk",
                     "package/matchbox/matchbox.mk",
                     "package/opengl/opengl.mk",
                     "package/qt5/qt5.mk",
                     "package/x11r7/x11r7.mk",
                     "package/doc-asciidoc.mk",
                     "package/pkg-.*.mk",
                     "package/nvidia-tegra23/nvidia-tegra23.mk",
                     "toolchain/toolchain-external/pkg-toolchain-external.mk",
                     "toolchain/toolchain-external/toolchain-external.mk",
                     "toolchain/toolchain.mk",
                     "toolchain/helpers.mk",
                     "toolchain/toolchain-wrapper.mk"]
    packages = list()
    count = 0
    for root, _, files in os.walk("."):
        rootdir = root.split("/")
        if len(rootdir) < 2:
            continue
        if rootdir[1] not in WALK_USEFUL_SUBDIRS:
            continue
        for f in files:
            if not f.endswith(".mk"):
                continue
            # Strip ending ".mk"
            pkgname = f[:-3]
            if package_list and pkgname not in package_list:
                continue
            pkgpath = os.path.join(root, f)
            # pkgpath[2:] strips the initial './'
            if any(re.match(exclude, pkgpath[2:]) for exclude in WALK_EXCLUDES):
                continue
            p = Package(pkgname, pkgpath)
            packages.append(p)
            count += 1
            if npackages and count == npackages:
                return packages
    return packages


def package_init_make_info():
    """
    Fills in the Package.all_* class tables from the output of
    "make printvars".
    """
    # Fetch all variables at once
    variables = subprocess.check_output(["make", "BR2_HAVE_DOT_CONFIG=y", "-s", "printvars",
                                         "VARS=%_LICENSE %_LICENSE_FILES %_VERSION %_IGNORE_CVES"])
    variable_list = variables.decode().splitlines()

    # We process first the host package VERSION, and then the target
    # package VERSION. This means that if a package exists in both
    # target and host variants, with different values (eg. version
    # numbers (unlikely)), we'll report the target one.
    variable_list = [x[5:] for x in variable_list if x.startswith("HOST_")] + \
                    [x for x in variable_list if not x.startswith("HOST_")]

    for item in variable_list:
        # Get variable name and value. Only the first '=' separates
        # them: the value itself may contain '=' characters.
        pkgvar, sep, value = item.partition("=")
        if not sep:
            continue

        # Strip the suffix according to the variable
        if pkgvar.endswith("_LICENSE"):
            # If value is "unknown", no license details available
            if value == "unknown":
                continue
            pkgvar = pkgvar[:-8]
            Package.all_licenses.append(pkgvar)

        elif pkgvar.endswith("_LICENSE_FILES"):
            if pkgvar.endswith("_MANIFEST_LICENSE_FILES"):
                continue
            pkgvar = pkgvar[:-14]
            Package.all_license_files.append(pkgvar)

        elif pkgvar.endswith("_VERSION"):
            if pkgvar.endswith("_DL_VERSION"):
                continue
            pkgvar = pkgvar[:-8]
            Package.all_versions[pkgvar] = value

        elif pkgvar.endswith("_IGNORE_CVES"):
            pkgvar = pkgvar[:-12]
            Package.all_ignored_cves[pkgvar] = value.split()


def check_url_status_worker(url, url_status):
    """
    Returns the new URL status: "Ok", "Invalid(<code>)" or "Invalid(Err)"
    when there is a URL to check, the unchanged url_status otherwise.
    """
    if url_status != "Missing" and url_status != "No Config.in":
        try:
            url_status_code = requests.head(url, timeout=30).status_code
            if url_status_code >= 400:
                return "Invalid(%s)" % str(url_status_code)
        except requests.exceptions.RequestException:
            return "Invalid(Err)"
        return "Ok"
    return url_status


def check_package_urls(packages):
    """Updates the .url_status field of all Package objects, in parallel."""
    pool = Pool(processes=64)
    try:
        for pkg in packages:
            pkg.url_worker = pool.apply_async(check_url_status_worker, (pkg.url, pkg.url_status))
        for pkg in packages:
            pkg.url_status = pkg.url_worker.get(timeout=3600)
            # The AsyncResult is of no use once its value has been read.
            del pkg.url_worker
    finally:
        pool.terminate()


def release_monitoring_get_latest_version_by_distro(pool, name):
    """Looks the package up through the Buildroot distro mapping of release-monitoring.org."""
    try:
        req = pool.request('GET', "/api/project/Buildroot/%s" % name)
    except HTTPError:
        return (RM_API_STATUS_ERROR, None, None)

    if req.status != 200:
        return (RM_API_STATUS_NOT_FOUND, None, None)

    data = json.loads(req.data)

    if 'version' in data:
        return (RM_API_STATUS_FOUND_BY_DISTRO, data['version'], data['id'])
    else:
        return (RM_API_STATUS_FOUND_BY_DISTRO, None, data['id'])


def release_monitoring_get_latest_version_by_guess(pool, name):
    """Looks for a release-monitoring.org project having exactly the package name."""
    try:
        req = pool.request('GET', "/api/projects/?pattern=%s" % name)
    except HTTPError:
        return (RM_API_STATUS_ERROR, None, None)

    if req.status != 200:
        return (RM_API_STATUS_NOT_FOUND, None, None)

    data = json.loads(req.data)

    projects = data['projects']
    projects.sort(key=lambda x: x['id'])

    for p in projects:
        if p['name'] == name and 'version' in p:
            return (RM_API_STATUS_FOUND_BY_PATTERN, p['version'], p['id'])

    return (RM_API_STATUS_NOT_FOUND, None, None)


def check_package_latest_version_worker(name):
    """Wrapper to try both by name then by guess"""
    print(name)
    res = release_monitoring_get_latest_version_by_distro(http_pool, name)
    if res[0] == RM_API_STATUS_NOT_FOUND:
        res = release_monitoring_get_latest_version_by_guess(http_pool, name)
    return res


def check_package_latest_version(packages):
    """
    Fills in the .latest_version field of all Package objects

    This field has a special format:
      (status, version, id)
    with:
    - status: one of RM_API_STATUS_ERROR,
      RM_API_STATUS_FOUND_BY_DISTRO, RM_API_STATUS_FOUND_BY_PATTERN,
      RM_API_STATUS_NOT_FOUND
    - version: string containing the latest version known by
      release-monitoring.org for this package
    - id: string containing the id of the project corresponding to this
      package, as known by release-monitoring.org
    """
    global http_pool
    http_pool = HTTPSConnectionPool('release-monitoring.org', port=443,
                                    cert_reqs='CERT_REQUIRED', ca_certs=certifi.where(),
                                    timeout=30)
    worker_pool = Pool(processes=64)
    try:
        results = worker_pool.map(check_package_latest_version_worker, (pkg.name for pkg in packages))
    finally:
        worker_pool.terminate()
        # Back to the module's initial state
        http_pool = None
    for pkg, r in zip(packages, results):
        pkg.latest_version = r


def check_package_cves(nvd_path, packages):
    """
    Fills in the .cves field of the Package objects.

    nvd_path: directory holding the local copy of the NVD feeds,
      created if missing
    packages: dict of Package objects, indexed by package name
    """
    if not os.path.isdir(nvd_path):
        os.makedirs(nvd_path)

    for cve in CVE.read_nvd_dir(nvd_path):
        for pkg_name in cve.pkg_names:
            if pkg_name in packages and cve.affects(packages[pkg_name]):
                packages[pkg_name].cves.append(cve.identifier)


def calculate_stats(packages):
    """Returns a dict of counters summarizing the state of all packages."""
    stats = defaultdict(int)
    for pkg in packages:
        # If packages have multiple infra, take the first one. For the
        # vast majority of packages, the target and host infra are the
        # same. There are very few packages that use a different infra
        # for the host and target variants.
        if len(pkg.infras) > 0:
            infra = pkg.infras[0][1]
            stats["infra-%s" % infra] += 1
        else:
            stats["infra-unknown"] += 1
        if pkg.has_license:
            stats["license"] += 1
        else:
            stats["no-license"] += 1
        if pkg.has_license_files:
            stats["license-files"] += 1
        else:
            stats["no-license-files"] += 1
        if pkg.has_hash:
            stats["hash"] += 1
        else:
            stats["no-hash"] += 1
        if pkg.latest_version[0] == RM_API_STATUS_FOUND_BY_DISTRO:
            stats["rmo-mapping"] += 1
        else:
            stats["rmo-no-mapping"] += 1
        if not pkg.latest_version[1]:
            stats["version-unknown"] += 1
        elif pkg.latest_version[1] == pkg.current_version:
            stats["version-uptodate"] += 1
        else:
            stats["version-not-uptodate"] += 1
        stats["patches"] += pkg.patch_count
        stats["total-cves"] += len(pkg.cves)
        if len(pkg.cves) != 0:
            stats["pkg-cves"] += 1
    return stats


# NOTE(review): the HTML markup below (and in the dump_html_* functions)
# was reconstructed so that every format string has as many placeholders
# as arguments; the CSS selectors are the class names emitted by
# dump_html_pkg(). Confirm tags and colors against the published page.
html_header = """
<html>
<head>
<style type=\"text/css\">
table {
  width: 100%;
}
td {
  border: 1px solid black;
}
td.centered {
  text-align: center;
}
td.wrong {
  background: #ff9a69;
}
td.correct {
  background: #d2ffc4;
}
td.nopatches {
  background: #d2ffc4;
}
td.somepatches {
  background: #ffd870;
}
td.lotsofpatches {
  background: #ff9a69;
}
td.good_url {
  background: #d2ffc4;
}
td.missing_url {
  background: #ffd870;
}
td.invalid_url {
  background: #ff9a69;
}
td.version-good {
  background: #d2ffc4;
}
td.version-needs-update {
  background: #ff9a69;
}
td.version-unknown {
  background: #ffd870;
}
td.version-error {
  background: #ccc;
}
</style>
<title>Statistics of Buildroot packages</title>
</head>
<body>

<a href=\"#results\">Results</a><br/>

"""

html_footer = """
</body>
</html>
"""


def infra_str(infra_list):
    """Returns the HTML text describing the infrastructure(s) of a package."""
    if not infra_list:
        return "Unknown"
    elif len(infra_list) == 1:
        return "<b>%s</b><br/>%s" % (infra_list[0][1], infra_list[0][0])
    elif infra_list[0][1] == infra_list[1][1]:
        return "<b>%s</b><br/>%s + %s" % \
            (infra_list[0][1], infra_list[0][0], infra_list[1][0])
    else:
        return "<b>%s</b> (%s)<br/><b>%s</b> (%s)" % \
            (infra_list[0][1], infra_list[0][0],
             infra_list[1][1], infra_list[1][0])


def boolean_str(b):
    """Returns "Yes" or "No" according to the truth value of b."""
    if b:
        return "Yes"
    else:
        return "No"


def dump_html_pkg(f, pkg):
    """Writes the table row describing pkg to the file object f."""
    f.write(" <tr>\n")
    f.write("  <td>%s</td>\n" % pkg.path[2:])

    # Patch count
    td_class = ["centered"]
    if pkg.patch_count == 0:
        td_class.append("nopatches")
    elif pkg.patch_count < 5:
        td_class.append("somepatches")
    else:
        td_class.append("lotsofpatches")
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), str(pkg.patch_count)))

    # Infrastructure
    infra = infra_str(pkg.infras)
    td_class = ["centered"]
    if infra == "Unknown":
        td_class.append("wrong")
    else:
        td_class.append("correct")
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), infra))

    # License
    td_class = ["centered"]
    if pkg.has_license:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_license)))

    # License files
    td_class = ["centered"]
    if pkg.has_license_files:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_license_files)))

    # Hash
    td_class = ["centered"]
    if pkg.has_hash:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), boolean_str(pkg.has_hash)))

    # Current version (None when make reported no <PKG>_VERSION)
    current_version = pkg.current_version or ""
    if len(current_version) > 20:
        current_version = current_version[:20] + "..."
    f.write("  <td class=\"centered\">%s</td>\n" % current_version)

    # Latest version. Start from a fresh class list, so that the cell
    # does not inherit the correct/wrong class of the hash cell.
    td_class = ["centered"]
    if pkg.latest_version[0] == RM_API_STATUS_ERROR:
        td_class.append("version-error")
    if pkg.latest_version[1] is None:
        td_class.append("version-unknown")
    elif pkg.latest_version[1] != pkg.current_version:
        td_class.append("version-needs-update")
    else:
        td_class.append("version-good")

    if pkg.latest_version[0] == RM_API_STATUS_ERROR:
        latest_version_text = "<b>Error</b>"
    elif pkg.latest_version[0] == RM_API_STATUS_NOT_FOUND:
        latest_version_text = "<b>Not found</b>"
    else:
        if pkg.latest_version[1] is None:
            latest_version_text = "<b>Found, but no version</b>"
        else:
            latest_version_text = "<a href=\"https://release-monitoring.org/project/%s\"><b>%s</b></a>" % \
                (pkg.latest_version[2], str(pkg.latest_version[1]))

        latest_version_text += "<br/>"

        if pkg.latest_version[0] == RM_API_STATUS_FOUND_BY_DISTRO:
            latest_version_text += "found by distro"
        else:
            latest_version_text += "found by guess"

    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), latest_version_text))

    # Warnings
    td_class = ["centered"]
    if pkg.warnings == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write("  <td class=\"%s\">%d</td>\n" %
            (" ".join(td_class), pkg.warnings))

    # URL status
    td_class = ["centered"]
    url_str = pkg.url_status
    if pkg.url_status == "Missing" or pkg.url_status == "No Config.in":
        td_class.append("missing_url")
    elif pkg.url_status.startswith("Invalid"):
        td_class.append("invalid_url")
        url_str = "<a href=\"%s\">%s</a>" % (pkg.url, pkg.url_status)
    else:
        td_class.append("good_url")
        url_str = "<a href=\"%s\">Link</a>" % pkg.url
    f.write("  <td class=\"%s\">%s</td>\n" %
            (" ".join(td_class), url_str))

    # CVEs
    td_class = ["centered"]
    if len(pkg.cves) == 0:
        td_class.append("correct")
    else:
        td_class.append("wrong")
    f.write("  <td class=\"%s\">\n" % " ".join(td_class))
    for cve in pkg.cves:
        f.write("   <a href=\"https://security-tracker.debian.org/tracker/%s\">%s</a><br/>\n" % (cve, cve))
    f.write("  </td>\n")

    f.write(" </tr>\n")


def dump_html_all_pkgs(f, packages):
    """Writes the table of all packages, sorted by path, to the file object f."""
    f.write("""
<table>
<tr>
<td>Package</td>
<td class=\"centered\">Patch count</td>
<td class=\"centered\">Infrastructure</td>
<td class=\"centered\">License</td>
<td class=\"centered\">License files</td>
<td class=\"centered\">Hash file</td>
<td class=\"centered\">Current version</td>
<td class=\"centered\">Latest version</td>
<td class=\"centered\">Warnings</td>
<td class=\"centered\">Upstream URL</td>
<td class=\"centered\">CVEs</td>
</tr>
""")
    for pkg in sorted(packages):
        dump_html_pkg(f, pkg)
    f.write("</table>")


def dump_html_stats(f, stats):
    """Writes the table of global statistics to the file object f."""
    # Target of the "Results" link of html_header
    f.write("<a id=\"results\"></a>\n")
    f.write("<table>\n")
    infras = [infra[6:] for infra in stats.keys() if infra.startswith("infra-")]
    for infra in infras:
        f.write(" <tr><td>Packages using the <i>%s</i> infrastructure</td><td>%s</td></tr>\n" %
                (infra, stats["infra-%s" % infra]))
    rows = [
        ("Packages having license information", "license"),
        ("Packages not having license information", "no-license"),
        ("Packages having license files information", "license-files"),
        ("Packages not having license files information", "no-license-files"),
        ("Packages having a hash file", "hash"),
        ("Packages not having a hash file", "no-hash"),
        ("Total number of patches", "patches"),
        ("Packages having a mapping on <i>release-monitoring.org</i>", "rmo-mapping"),
        ("Packages lacking a mapping on <i>release-monitoring.org</i>", "rmo-no-mapping"),
        ("Packages that are up-to-date", "version-uptodate"),
        ("Packages that are not up-to-date", "version-not-uptodate"),
        ("Packages with no known upstream version", "version-unknown"),
        ("Packages affected by CVEs", "pkg-cves"),
        ("Total number of CVEs affecting all packages", "total-cves"),
    ]
    for label, key in rows:
        f.write(" <tr><td>%s</td><td>%s</td></tr>\n" % (label, stats[key]))
    f.write("</table>\n")


def dump_html_gen_info(f, date, commit):
    """Writes the generation date and Git commit to the file object f."""
    # Updated on Mon Feb 19 08:12:08 CET 2018, Git commit aa77030b8f5e41f1c53eb1c1ad664b8c814ba032
    f.write("<p><i>Updated on %s, git commit %s</i></p>\n" % (str(date), commit))


def dump_html(packages, stats, date, commit, output):
    """Writes the complete HTML report to the file named output."""
    with open(output, 'w') as f:
        f.write(html_header)
        dump_html_all_pkgs(f, packages)
        dump_html_stats(f, stats)
        dump_html_gen_info(f, date, commit)
        f.write(html_footer)


def dump_json(packages, stats, date, commit, output):
    """Writes the packages and statistics as JSON to the file named output."""
    # Format packages as a dictionary instead of a list
    # Exclude the fields that do not hold real data
    excluded_fields = ['url_worker', 'name']
    pkgs = {
        pkg.name: {
            k: v
            for k, v in pkg.__dict__.items()
            if k not in excluded_fields
        } for pkg in packages
    }
    # Aggregate infrastructures into a single dict entry
    statistics = {
        k: v
        for k, v in stats.items()
        if not k.startswith('infra-')
    }
    statistics['infra'] = {k[6:]: v for k, v in stats.items() if k.startswith('infra-')}
    # The actual structure to dump, add commit and date to it
    final = {'packages': pkgs,
             'stats': statistics,
             'commit': commit,
             'date': str(date)}

    with open(output, 'w') as f:
        json.dump(final, f, indent=2, separators=(',', ': '))
        f.write('\n')


def parse_args():
    """Parses the command line; at least one of --html and --json is required."""
    parser = argparse.ArgumentParser()
    output = parser.add_argument_group('output', 'Output file(s)')
    output.add_argument('--html', dest='html', action='store',
                        help='HTML output file')
    output.add_argument('--json', dest='json', action='store',
                        help='JSON output file')
    packages = parser.add_mutually_exclusive_group()
    packages.add_argument('-n', dest='npackages', type=int, action='store',
                          help='Number of packages')
    packages.add_argument('-p', dest='packages', action='store',
                          help='List of packages (comma separated)')
    parser.add_argument('--nvd-path', dest='nvd_path',
                        help='Path to the local NVD database')
    args = parser.parse_args()
    if not args.html and not args.json:
        parser.error('at least one of --html or --json (or both) is required')
    return args


def __main__():
    """Collects the information about all packages and writes the requested reports."""
    args = parse_args()
    if args.packages:
        package_list = args.packages.split(",")
    else:
        package_list = None
    date = datetime.datetime.utcnow()
    commit = subprocess.check_output(['git', 'rev-parse',
                                      'HEAD']).splitlines()[0].decode()
    print("Build package list ...")
    packages = get_pkglist(args.npackages, package_list)
    print("Getting package make info ...")
    package_init_make_info()
    print("Getting package details ...")
    for pkg in packages:
        pkg.set_infra()
        pkg.set_license()
        pkg.set_hash_info()
        pkg.set_patch_count()
        pkg.set_check_package_warnings()
        pkg.set_current_version()
        pkg.set_url()
    print("Checking URL status")
    check_package_urls(packages)
    print("Getting latest versions ...")
    check_package_latest_version(packages)
    if args.nvd_path:
        print("Checking packages CVEs")
        check_package_cves(args.nvd_path, {p.name: p for p in packages})
    print("Calculate stats")
    stats = calculate_stats(packages)
    if args.html:
        print("Write HTML")
        dump_html(packages, stats, date, commit, args.html)
    if args.json:
        print("Write JSON")
        dump_json(packages, stats, date, commit, args.json)


# Only run when executed as a script: with the multiprocessing "spawn"
# start method, the worker processes import this module again.
if __name__ == "__main__":
    __main__()