kumquat-buildroot/support/scripts/pkg-stats
Thomas Petazzoni 4f0868fa64 support/scripts/pkg-stats: show progress of upstream URL and latest version
This commit slightly improves the output of pkg-stats by showing the
progress of the upstream URL checks and latest version retrieval, on a
package basis:

Checking URL status
[0001/0062] curlpp
[0002/0062] cmocka
[0003/0062] snappy
[0004/0062] nload
[...]
[0060/0062] librtas
[0061/0062] libsilk
[0062/0062] jhead
Getting latest versions ...
[0001/0064] libglob
[0002/0064] perl-http-daemon
[0003/0064] shadowsocks-libev
[...]
[0061/0064] lua-flu
[0062/0064] python-aiohttp-security
[0063/0064] ljlinenoise
[0064/0064] matchbox-lib

Note that the above sample was run on 64 packages. Only 62 packages
appear for the URL status check, because packages that do not have any
URL in their Config.in file, or don't have any Config.in file at all,
are not checked and therefore not accounted.

Signed-off-by: Thomas Petazzoni <thomas.petazzoni@bootlin.com>
2020-08-11 22:30:52 +02:00

1114 lines
38 KiB
Python
Executable File

#!/usr/bin/env python3
# Copyright (C) 2009 by Thomas Petazzoni <thomas.petazzoni@free-electrons.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
import aiohttp
import argparse
import asyncio
import datetime
import fnmatch
import os
from collections import defaultdict
import re
import subprocess
import requests # NVD database download
import json
import ijson
import distutils.version
import time
import gzip
import sys
sys.path.append('utils/')
from getdeveloperlib import parse_developers # noqa: E402
NVD_START_YEAR = 2002
NVD_JSON_VERSION = "1.0"
NVD_BASE_URL = "https://nvd.nist.gov/feeds/json/cve/" + NVD_JSON_VERSION
INFRA_RE = re.compile(r"\$\(eval \$\(([a-z-]*)-package\)\)")
URL_RE = re.compile(r"\s*https?://\S*\s*$")
RM_API_STATUS_ERROR = 1
RM_API_STATUS_FOUND_BY_DISTRO = 2
RM_API_STATUS_FOUND_BY_PATTERN = 3
RM_API_STATUS_NOT_FOUND = 4
CVE_AFFECTS = 1
CVE_DOESNT_AFFECT = 2
CVE_UNKNOWN = 3
class Defconfig:
def __init__(self, name, path):
self.name = name
self.path = path
self.developers = None
def set_developers(self, developers):
"""
Fills in the .developers field
"""
self.developers = [
developer.name
for developer in developers
if developer.hasfile(self.path)
]
def get_defconfig_list():
"""
Builds the list of Buildroot defconfigs, returning a list of Defconfig
objects.
"""
return [
Defconfig(name[:-len('_defconfig')], os.path.join('configs', name))
for name in os.listdir('configs')
if name.endswith('_defconfig')
]
class Package:
all_licenses = dict()
all_license_files = list()
all_versions = dict()
all_ignored_cves = dict()
# This is the list of all possible checks. Add new checks to this list so
# a tool that post-processeds the json output knows the checks before
# iterating over the packages.
status_checks = ['cve', 'developers', 'hash', 'license',
'license-files', 'patches', 'pkg-check', 'url', 'version']
def __init__(self, name, path):
self.name = name
self.path = path
self.pkg_path = os.path.dirname(path)
self.infras = None
self.license = None
self.has_license = False
self.has_license_files = False
self.has_hash = False
self.patch_files = []
self.warnings = 0
self.current_version = None
self.url = None
self.url_worker = None
self.cves = list()
self.latest_version = {'status': RM_API_STATUS_ERROR, 'version': None, 'id': None}
self.status = {}
def pkgvar(self):
return self.name.upper().replace("-", "_")
def set_url(self):
"""
Fills in the .url field
"""
self.status['url'] = ("warning", "no Config.in")
for filename in os.listdir(os.path.dirname(self.path)):
if fnmatch.fnmatch(filename, 'Config.*'):
fp = open(os.path.join(os.path.dirname(self.path), filename), "r")
for config_line in fp:
if URL_RE.match(config_line):
self.url = config_line.strip()
self.status['url'] = ("ok", "found")
fp.close()
return
self.status['url'] = ("error", "missing")
fp.close()
@property
def patch_count(self):
return len(self.patch_files)
@property
def has_valid_infra(self):
try:
if self.infras[0][1] == 'virtual':
return False
except IndexError:
return False
return True
def set_infra(self):
"""
Fills in the .infras field
"""
self.infras = list()
with open(self.path, 'r') as f:
lines = f.readlines()
for l in lines:
match = INFRA_RE.match(l)
if not match:
continue
infra = match.group(1)
if infra.startswith("host-"):
self.infras.append(("host", infra[5:]))
else:
self.infras.append(("target", infra))
def set_license(self):
"""
Fills in the .status['license'] and .status['license-files'] fields
"""
if not self.has_valid_infra:
self.status['license'] = ("na", "no valid package infra")
self.status['license-files'] = ("na", "no valid package infra")
return
var = self.pkgvar()
self.status['license'] = ("error", "missing")
self.status['license-files'] = ("error", "missing")
if var in self.all_licenses:
self.license = self.all_licenses[var]
self.status['license'] = ("ok", "found")
if var in self.all_license_files:
self.status['license-files'] = ("ok", "found")
def set_hash_info(self):
"""
Fills in the .status['hash'] field
"""
if not self.has_valid_infra:
self.status['hash'] = ("na", "no valid package infra")
self.status['hash-license'] = ("na", "no valid package infra")
return
hashpath = self.path.replace(".mk", ".hash")
if os.path.exists(hashpath):
self.status['hash'] = ("ok", "found")
else:
self.status['hash'] = ("error", "missing")
def set_patch_count(self):
"""
Fills in the .patch_count, .patch_files and .status['patches'] fields
"""
if not self.has_valid_infra:
self.status['patches'] = ("na", "no valid package infra")
return
pkgdir = os.path.dirname(self.path)
for subdir, _, _ in os.walk(pkgdir):
self.patch_files = fnmatch.filter(os.listdir(subdir), '*.patch')
if self.patch_count == 0:
self.status['patches'] = ("ok", "no patches")
elif self.patch_count < 5:
self.status['patches'] = ("warning", "some patches")
else:
self.status['patches'] = ("error", "lots of patches")
def set_current_version(self):
"""
Fills in the .current_version field
"""
var = self.pkgvar()
if var in self.all_versions:
self.current_version = self.all_versions[var]
def set_check_package_warnings(self):
"""
Fills in the .warnings and .status['pkg-check'] fields
"""
cmd = ["./utils/check-package"]
pkgdir = os.path.dirname(self.path)
self.status['pkg-check'] = ("error", "Missing")
for root, dirs, files in os.walk(pkgdir):
for f in files:
if f.endswith(".mk") or f.endswith(".hash") or f == "Config.in" or f == "Config.in.host":
cmd.append(os.path.join(root, f))
o = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()[1]
lines = o.splitlines()
for line in lines:
m = re.match("^([0-9]*) warnings generated", line.decode())
if m:
self.warnings = int(m.group(1))
if self.warnings == 0:
self.status['pkg-check'] = ("ok", "no warnings")
else:
self.status['pkg-check'] = ("error", "{} warnings".format(self.warnings))
return
def is_cve_ignored(self, cve):
"""
Tells if the CVE is ignored by the package
"""
return cve in self.all_ignored_cves.get(self.pkgvar(), [])
def set_developers(self, developers):
"""
Fills in the .developers and .status['developers'] field
"""
self.developers = [
dev.name
for dev in developers
if dev.hasfile(self.path)
]
if self.developers:
self.status['developers'] = ("ok", "{} developers".format(len(self.developers)))
else:
self.status['developers'] = ("warning", "no developers")
def is_status_ok(self, name):
return self.status[name][0] == 'ok'
def __eq__(self, other):
return self.path == other.path
def __lt__(self, other):
return self.path < other.path
def __str__(self):
return "%s (path='%s', license='%s', license_files='%s', hash='%s', patches=%d)" % \
(self.name, self.path, self.is_status_ok('license'),
self.is_status_ok('license-files'), self.status['hash'], self.patch_count)
class CVE:
"""An accessor class for CVE Items in NVD files"""
def __init__(self, nvd_cve):
"""Initialize a CVE from its NVD JSON representation"""
self.nvd_cve = nvd_cve
@staticmethod
def download_nvd_year(nvd_path, year):
metaf = "nvdcve-%s-%s.meta" % (NVD_JSON_VERSION, year)
path_metaf = os.path.join(nvd_path, metaf)
jsonf_gz = "nvdcve-%s-%s.json.gz" % (NVD_JSON_VERSION, year)
path_jsonf_gz = os.path.join(nvd_path, jsonf_gz)
# If the database file is less than a day old, we assume the NVD data
# locally available is recent enough.
if os.path.exists(path_jsonf_gz) and os.stat(path_jsonf_gz).st_mtime >= time.time() - 86400:
return path_jsonf_gz
# If not, we download the meta file
url = "%s/%s" % (NVD_BASE_URL, metaf)
print("Getting %s" % url)
page_meta = requests.get(url)
page_meta.raise_for_status()
# If the meta file already existed, we compare the existing
# one with the data newly downloaded. If they are different,
# we need to re-download the database.
# If the database does not exist locally, we need to redownload it in
# any case.
if os.path.exists(path_metaf) and os.path.exists(path_jsonf_gz):
meta_known = open(path_metaf, "r").read()
if page_meta.text == meta_known:
return path_jsonf_gz
# Grab the compressed JSON NVD, and write files to disk
url = "%s/%s" % (NVD_BASE_URL, jsonf_gz)
print("Getting %s" % url)
page_json = requests.get(url)
page_json.raise_for_status()
open(path_jsonf_gz, "wb").write(page_json.content)
open(path_metaf, "w").write(page_meta.text)
return path_jsonf_gz
@classmethod
def read_nvd_dir(cls, nvd_dir):
"""
Iterate over all the CVEs contained in NIST Vulnerability Database
feeds since NVD_START_YEAR. If the files are missing or outdated in
nvd_dir, a fresh copy will be downloaded, and kept in .json.gz
"""
for year in range(NVD_START_YEAR, datetime.datetime.now().year + 1):
filename = CVE.download_nvd_year(nvd_dir, year)
try:
content = ijson.items(gzip.GzipFile(filename), 'CVE_Items.item')
except: # noqa: E722
print("ERROR: cannot read %s. Please remove the file then rerun this script" % filename)
raise
for cve in content:
yield cls(cve['cve'])
def each_product(self):
"""Iterate over each product section of this cve"""
for vendor in self.nvd_cve['affects']['vendor']['vendor_data']:
for product in vendor['product']['product_data']:
yield product
@property
def identifier(self):
"""The CVE unique identifier"""
return self.nvd_cve['CVE_data_meta']['ID']
@property
def pkg_names(self):
"""The set of package names referred by this CVE definition"""
return set(p['product_name'] for p in self.each_product())
def affects(self, br_pkg):
"""
True if the Buildroot Package object passed as argument is affected
by this CVE.
"""
if br_pkg.is_cve_ignored(self.identifier):
return CVE_DOESNT_AFFECT
for product in self.each_product():
if product['product_name'] != br_pkg.name:
continue
for v in product['version']['version_data']:
if v["version_affected"] == "=":
if br_pkg.current_version == v["version_value"]:
return CVE_AFFECTS
elif v["version_affected"] == "<=":
pkg_version = distutils.version.LooseVersion(br_pkg.current_version)
if not hasattr(pkg_version, "version"):
print("Cannot parse package '%s' version '%s'" % (br_pkg.name, br_pkg.current_version))
continue
cve_affected_version = distutils.version.LooseVersion(v["version_value"])
if not hasattr(cve_affected_version, "version"):
print("Cannot parse CVE affected version '%s'" % v["version_value"])
continue
try:
affected = pkg_version <= cve_affected_version
break
except TypeError:
return CVE_UNKNOWN
if affected:
return CVE_AFFECTS
else:
return CVE_DOESNT_AFFECT
else:
print("version_affected: %s" % v['version_affected'])
return CVE_DOESNT_AFFECT
def get_pkglist(npackages, package_list):
"""
Builds the list of Buildroot packages, returning a list of Package
objects. Only the .name and .path fields of the Package object are
initialized.
npackages: limit to N packages
package_list: limit to those packages in this list
"""
WALK_USEFUL_SUBDIRS = ["boot", "linux", "package", "toolchain"]
WALK_EXCLUDES = ["boot/common.mk",
"linux/linux-ext-.*.mk",
"package/freescale-imx/freescale-imx.mk",
"package/gcc/gcc.mk",
"package/gstreamer/gstreamer.mk",
"package/gstreamer1/gstreamer1.mk",
"package/gtk2-themes/gtk2-themes.mk",
"package/matchbox/matchbox.mk",
"package/opengl/opengl.mk",
"package/qt5/qt5.mk",
"package/x11r7/x11r7.mk",
"package/doc-asciidoc.mk",
"package/pkg-.*.mk",
"package/nvidia-tegra23/nvidia-tegra23.mk",
"toolchain/toolchain-external/pkg-toolchain-external.mk",
"toolchain/toolchain-external/toolchain-external.mk",
"toolchain/toolchain.mk",
"toolchain/helpers.mk",
"toolchain/toolchain-wrapper.mk"]
packages = list()
count = 0
for root, dirs, files in os.walk("."):
rootdir = root.split("/")
if len(rootdir) < 2:
continue
if rootdir[1] not in WALK_USEFUL_SUBDIRS:
continue
for f in files:
if not f.endswith(".mk"):
continue
# Strip ending ".mk"
pkgname = f[:-3]
if package_list and pkgname not in package_list:
continue
pkgpath = os.path.join(root, f)
skip = False
for exclude in WALK_EXCLUDES:
# pkgpath[2:] strips the initial './'
if re.match(exclude, pkgpath[2:]):
skip = True
continue
if skip:
continue
p = Package(pkgname, pkgpath)
packages.append(p)
count += 1
if npackages and count == npackages:
return packages
return packages
def package_init_make_info():
# Fetch all variables at once
variables = subprocess.check_output(["make", "BR2_HAVE_DOT_CONFIG=y", "-s", "printvars",
"VARS=%_LICENSE %_LICENSE_FILES %_VERSION %_IGNORE_CVES"])
variable_list = variables.decode().splitlines()
# We process first the host package VERSION, and then the target
# package VERSION. This means that if a package exists in both
# target and host variants, with different values (eg. version
# numbers (unlikely)), we'll report the target one.
variable_list = [x[5:] for x in variable_list if x.startswith("HOST_")] + \
[x for x in variable_list if not x.startswith("HOST_")]
for l in variable_list:
# Get variable name and value
pkgvar, value = l.split("=")
# Strip the suffix according to the variable
if pkgvar.endswith("_LICENSE"):
# If value is "unknown", no license details available
if value == "unknown":
continue
pkgvar = pkgvar[:-8]
Package.all_licenses[pkgvar] = value
elif pkgvar.endswith("_LICENSE_FILES"):
if pkgvar.endswith("_MANIFEST_LICENSE_FILES"):
continue
pkgvar = pkgvar[:-14]
Package.all_license_files.append(pkgvar)
elif pkgvar.endswith("_VERSION"):
if pkgvar.endswith("_DL_VERSION"):
continue
pkgvar = pkgvar[:-8]
Package.all_versions[pkgvar] = value
elif pkgvar.endswith("_IGNORE_CVES"):
pkgvar = pkgvar[:-12]
Package.all_ignored_cves[pkgvar] = value.split()
check_url_count = 0
async def check_url_status(session, pkg, npkgs, retry=True):
global check_url_count
try:
async with session.get(pkg.url) as resp:
if resp.status >= 400:
pkg.status['url'] = ("error", "invalid {}".format(resp.status))
check_url_count += 1
print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
return
except (aiohttp.ClientError, asyncio.TimeoutError):
if retry:
return await check_url_status(session, pkg, npkgs, retry=False)
else:
pkg.status['url'] = ("error", "invalid (err)")
check_url_count += 1
print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
return
pkg.status['url'] = ("ok", "valid")
check_url_count += 1
print("[%04d/%04d] %s" % (check_url_count, npkgs, pkg.name))
async def check_package_urls(packages):
tasks = []
connector = aiohttp.TCPConnector(limit_per_host=5)
async with aiohttp.ClientSession(connector=connector, trust_env=True) as sess:
packages = [p for p in packages if p.status['url'][0] == 'ok']
for pkg in packages:
tasks.append(check_url_status(sess, pkg, len(packages)))
await asyncio.wait(tasks)
def check_package_latest_version_set_status(pkg, status, version, identifier):
pkg.latest_version = {
"status": status,
"version": version,
"id": identifier,
}
if pkg.latest_version['status'] == RM_API_STATUS_ERROR:
pkg.status['version'] = ('warning', "Release Monitoring API error")
elif pkg.latest_version['status'] == RM_API_STATUS_NOT_FOUND:
pkg.status['version'] = ('warning', "Package not found on Release Monitoring")
if pkg.latest_version['version'] is None:
pkg.status['version'] = ('warning', "No upstream version available on Release Monitoring")
elif pkg.latest_version['version'] != pkg.current_version:
pkg.status['version'] = ('error', "The newer version {} is available upstream".format(pkg.latest_version['version']))
else:
pkg.status['version'] = ('ok', 'up-to-date')
async def check_package_get_latest_version_by_distro(session, pkg, retry=True):
url = "https://release-monitoring.org//api/project/Buildroot/%s" % pkg.name
try:
async with session.get(url) as resp:
if resp.status != 200:
return False
data = await resp.json()
version = data['version'] if 'version' in data else None
check_package_latest_version_set_status(pkg,
RM_API_STATUS_FOUND_BY_DISTRO,
version,
data['id'])
return True
except (aiohttp.ClientError, asyncio.TimeoutError):
if retry:
return await check_package_get_latest_version_by_distro(session, pkg, retry=False)
else:
return False
async def check_package_get_latest_version_by_guess(session, pkg, retry=True):
url = "https://release-monitoring.org/api/projects/?pattern=%s" % pkg.name
try:
async with session.get(url) as resp:
if resp.status != 200:
return False
data = await resp.json()
# filter projects that have the right name and a version defined
projects = [p for p in data['projects'] if p['name'] == pkg.name and 'version' in p]
projects.sort(key=lambda x: x['id'])
if len(projects) > 0:
check_package_latest_version_set_status(pkg,
RM_API_STATUS_FOUND_BY_DISTRO,
projects[0]['version'],
projects[0]['id'])
return True
except (aiohttp.ClientError, asyncio.TimeoutError):
if retry:
return await check_package_get_latest_version_by_guess(session, pkg, retry=False)
else:
return False
check_latest_count = 0
async def check_package_latest_version_get(session, pkg, npkgs):
global check_latest_count
if await check_package_get_latest_version_by_distro(session, pkg):
check_latest_count += 1
print("[%04d/%04d] %s" % (check_latest_count, npkgs, pkg.name))
return
if await check_package_get_latest_version_by_guess(session, pkg):
check_latest_count += 1
print("[%04d/%04d] %s" % (check_latest_count, npkgs, pkg.name))
return
check_package_latest_version_set_status(pkg,
RM_API_STATUS_NOT_FOUND,
None, None)
check_latest_count += 1
print("[%04d/%04d] %s" % (check_latest_count, npkgs, pkg.name))
async def check_package_latest_version(packages):
"""
Fills in the .latest_version field of all Package objects
This field is a dict and has the following keys:
- status: one of RM_API_STATUS_ERROR,
RM_API_STATUS_FOUND_BY_DISTRO, RM_API_STATUS_FOUND_BY_PATTERN,
RM_API_STATUS_NOT_FOUND
- version: string containing the latest version known by
release-monitoring.org for this package
- id: string containing the id of the project corresponding to this
package, as known by release-monitoring.org
"""
for pkg in [p for p in packages if not p.has_valid_infra]:
pkg.status['version'] = ("na", "no valid package infra")
tasks = []
connector = aiohttp.TCPConnector(limit_per_host=5)
async with aiohttp.ClientSession(connector=connector, trust_env=True) as sess:
packages = [p for p in packages if p.has_valid_infra]
for pkg in packages:
tasks.append(check_package_latest_version_get(sess, pkg, len(packages)))
await asyncio.wait(tasks)
def check_package_cves(nvd_path, packages):
if not os.path.isdir(nvd_path):
os.makedirs(nvd_path)
for cve in CVE.read_nvd_dir(nvd_path):
for pkg_name in cve.pkg_names:
if pkg_name in packages and cve.affects(packages[pkg_name]) == CVE_AFFECTS:
packages[pkg_name].cves.append(cve.identifier)
def calculate_stats(packages):
stats = defaultdict(int)
stats['packages'] = len(packages)
for pkg in packages:
# If packages have multiple infra, take the first one. For the
# vast majority of packages, the target and host infra are the
# same. There are very few packages that use a different infra
# for the host and target variants.
if len(pkg.infras) > 0:
infra = pkg.infras[0][1]
stats["infra-%s" % infra] += 1
else:
stats["infra-unknown"] += 1
if pkg.is_status_ok('license'):
stats["license"] += 1
else:
stats["no-license"] += 1
if pkg.is_status_ok('license-files'):
stats["license-files"] += 1
else:
stats["no-license-files"] += 1
if pkg.is_status_ok('hash'):
stats["hash"] += 1
else:
stats["no-hash"] += 1
if pkg.latest_version['status'] == RM_API_STATUS_FOUND_BY_DISTRO:
stats["rmo-mapping"] += 1
else:
stats["rmo-no-mapping"] += 1
if not pkg.latest_version['version']:
stats["version-unknown"] += 1
elif pkg.latest_version['version'] == pkg.current_version:
stats["version-uptodate"] += 1
else:
stats["version-not-uptodate"] += 1
stats["patches"] += pkg.patch_count
stats["total-cves"] += len(pkg.cves)
if len(pkg.cves) != 0:
stats["pkg-cves"] += 1
return stats
html_header = """
<head>
<script src=\"https://www.kryogenix.org/code/browser/sorttable/sorttable.js\"></script>
<style type=\"text/css\">
table {
width: 100%;
}
td {
border: 1px solid black;
}
td.centered {
text-align: center;
}
td.wrong {
background: #ff9a69;
}
td.correct {
background: #d2ffc4;
}
td.nopatches {
background: #d2ffc4;
}
td.somepatches {
background: #ffd870;
}
td.lotsofpatches {
background: #ff9a69;
}
td.good_url {
background: #d2ffc4;
}
td.missing_url {
background: #ffd870;
}
td.invalid_url {
background: #ff9a69;
}
td.version-good {
background: #d2ffc4;
}
td.version-needs-update {
background: #ff9a69;
}
td.version-unknown {
background: #ffd870;
}
td.version-error {
background: #ccc;
}
</style>
<title>Statistics of Buildroot packages</title>
</head>
<a href=\"#results\">Results</a><br/>
<p id=\"sortable_hint\"></p>
"""
html_footer = """
</body>
<script>
if (typeof sorttable === \"object\") {
document.getElementById(\"sortable_hint\").innerHTML =
\"hint: the table can be sorted by clicking the column headers\"
}
</script>
</html>
"""
def infra_str(infra_list):
if not infra_list:
return "Unknown"
elif len(infra_list) == 1:
return "<b>%s</b><br/>%s" % (infra_list[0][1], infra_list[0][0])
elif infra_list[0][1] == infra_list[1][1]:
return "<b>%s</b><br/>%s + %s" % \
(infra_list[0][1], infra_list[0][0], infra_list[1][0])
else:
return "<b>%s</b> (%s)<br/><b>%s</b> (%s)" % \
(infra_list[0][1], infra_list[0][0],
infra_list[1][1], infra_list[1][0])
def boolean_str(b):
if b:
return "Yes"
else:
return "No"
def dump_html_pkg(f, pkg):
f.write(" <tr>\n")
f.write(" <td>%s</td>\n" % pkg.path[2:])
# Patch count
td_class = ["centered"]
if pkg.patch_count == 0:
td_class.append("nopatches")
elif pkg.patch_count < 5:
td_class.append("somepatches")
else:
td_class.append("lotsofpatches")
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), str(pkg.patch_count)))
# Infrastructure
infra = infra_str(pkg.infras)
td_class = ["centered"]
if infra == "Unknown":
td_class.append("wrong")
else:
td_class.append("correct")
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), infra_str(pkg.infras)))
# License
td_class = ["centered"]
if pkg.is_status_ok('license'):
td_class.append("correct")
else:
td_class.append("wrong")
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), boolean_str(pkg.is_status_ok('license'))))
# License files
td_class = ["centered"]
if pkg.is_status_ok('license-files'):
td_class.append("correct")
else:
td_class.append("wrong")
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), boolean_str(pkg.is_status_ok('license-files'))))
# Hash
td_class = ["centered"]
if pkg.is_status_ok('hash'):
td_class.append("correct")
else:
td_class.append("wrong")
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), boolean_str(pkg.is_status_ok('hash'))))
# Current version
if len(pkg.current_version) > 20:
current_version = pkg.current_version[:20] + "..."
else:
current_version = pkg.current_version
f.write(" <td class=\"centered\">%s</td>\n" % current_version)
# Latest version
if pkg.latest_version['status'] == RM_API_STATUS_ERROR:
td_class.append("version-error")
if pkg.latest_version['version'] is None:
td_class.append("version-unknown")
elif pkg.latest_version['version'] != pkg.current_version:
td_class.append("version-needs-update")
else:
td_class.append("version-good")
if pkg.latest_version['status'] == RM_API_STATUS_ERROR:
latest_version_text = "<b>Error</b>"
elif pkg.latest_version['status'] == RM_API_STATUS_NOT_FOUND:
latest_version_text = "<b>Not found</b>"
else:
if pkg.latest_version['version'] is None:
latest_version_text = "<b>Found, but no version</b>"
else:
latest_version_text = "<a href=\"https://release-monitoring.org/project/%s\"><b>%s</b></a>" % \
(pkg.latest_version['id'], str(pkg.latest_version['version']))
latest_version_text += "<br/>"
if pkg.latest_version['status'] == RM_API_STATUS_FOUND_BY_DISTRO:
latest_version_text += "found by <a href=\"https://release-monitoring.org/distro/Buildroot/\">distro</a>"
else:
latest_version_text += "found by guess"
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), latest_version_text))
# Warnings
td_class = ["centered"]
if pkg.warnings == 0:
td_class.append("correct")
else:
td_class.append("wrong")
f.write(" <td class=\"%s\">%d</td>\n" %
(" ".join(td_class), pkg.warnings))
# URL status
td_class = ["centered"]
url_str = pkg.status['url'][1]
if pkg.status['url'][0] in ("error", "warning"):
td_class.append("missing_url")
if pkg.status['url'][0] == "error":
td_class.append("invalid_url")
url_str = "<a href=%s>%s</a>" % (pkg.url, pkg.status['url'][1])
else:
td_class.append("good_url")
url_str = "<a href=%s>Link</a>" % pkg.url
f.write(" <td class=\"%s\">%s</td>\n" %
(" ".join(td_class), url_str))
# CVEs
td_class = ["centered"]
if len(pkg.cves) == 0:
td_class.append("correct")
else:
td_class.append("wrong")
f.write(" <td class=\"%s\">\n" % " ".join(td_class))
for cve in pkg.cves:
f.write(" <a href=\"https://security-tracker.debian.org/tracker/%s\">%s<br/>\n" % (cve, cve))
f.write(" </td>\n")
f.write(" </tr>\n")
def dump_html_all_pkgs(f, packages):
f.write("""
<table class=\"sortable\">
<tr>
<td>Package</td>
<td class=\"centered\">Patch count</td>
<td class=\"centered\">Infrastructure</td>
<td class=\"centered\">License</td>
<td class=\"centered\">License files</td>
<td class=\"centered\">Hash file</td>
<td class=\"centered\">Current version</td>
<td class=\"centered\">Latest version</td>
<td class=\"centered\">Warnings</td>
<td class=\"centered\">Upstream URL</td>
<td class=\"centered\">CVEs</td>
</tr>
""")
for pkg in sorted(packages):
dump_html_pkg(f, pkg)
f.write("</table>")
def dump_html_stats(f, stats):
f.write("<a id=\"results\"></a>\n")
f.write("<table>\n")
infras = [infra[6:] for infra in stats.keys() if infra.startswith("infra-")]
for infra in infras:
f.write(" <tr><td>Packages using the <i>%s</i> infrastructure</td><td>%s</td></tr>\n" %
(infra, stats["infra-%s" % infra]))
f.write(" <tr><td>Packages having license information</td><td>%s</td></tr>\n" %
stats["license"])
f.write(" <tr><td>Packages not having license information</td><td>%s</td></tr>\n" %
stats["no-license"])
f.write(" <tr><td>Packages having license files information</td><td>%s</td></tr>\n" %
stats["license-files"])
f.write(" <tr><td>Packages not having license files information</td><td>%s</td></tr>\n" %
stats["no-license-files"])
f.write(" <tr><td>Packages having a hash file</td><td>%s</td></tr>\n" %
stats["hash"])
f.write(" <tr><td>Packages not having a hash file</td><td>%s</td></tr>\n" %
stats["no-hash"])
f.write(" <tr><td>Total number of patches</td><td>%s</td></tr>\n" %
stats["patches"])
f.write("<tr><td>Packages having a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n" %
stats["rmo-mapping"])
f.write("<tr><td>Packages lacking a mapping on <i>release-monitoring.org</i></td><td>%s</td></tr>\n" %
stats["rmo-no-mapping"])
f.write("<tr><td>Packages that are up-to-date</td><td>%s</td></tr>\n" %
stats["version-uptodate"])
f.write("<tr><td>Packages that are not up-to-date</td><td>%s</td></tr>\n" %
stats["version-not-uptodate"])
f.write("<tr><td>Packages with no known upstream version</td><td>%s</td></tr>\n" %
stats["version-unknown"])
f.write("<tr><td>Packages affected by CVEs</td><td>%s</td></tr>\n" %
stats["pkg-cves"])
f.write("<tr><td>Total number of CVEs affecting all packages</td><td>%s</td></tr>\n" %
stats["total-cves"])
f.write("</table>\n")
def dump_html_gen_info(f, date, commit):
# Updated on Mon Feb 19 08:12:08 CET 2018, Git commit aa77030b8f5e41f1c53eb1c1ad664b8c814ba032
f.write("<p><i>Updated on %s, git commit %s</i></p>\n" % (str(date), commit))
def dump_html(packages, stats, date, commit, output):
with open(output, 'w') as f:
f.write(html_header)
dump_html_all_pkgs(f, packages)
dump_html_stats(f, stats)
dump_html_gen_info(f, date, commit)
f.write(html_footer)
def dump_json(packages, defconfigs, stats, date, commit, output):
# Format packages as a dictionnary instead of a list
# Exclude local field that does not contains real date
excluded_fields = ['url_worker', 'name']
pkgs = {
pkg.name: {
k: v
for k, v in pkg.__dict__.items()
if k not in excluded_fields
} for pkg in packages
}
defconfigs = {
d.name: {
k: v
for k, v in d.__dict__.items()
} for d in defconfigs
}
# Aggregate infrastructures into a single dict entry
statistics = {
k: v
for k, v in stats.items()
if not k.startswith('infra-')
}
statistics['infra'] = {k[6:]: v for k, v in stats.items() if k.startswith('infra-')}
# The actual structure to dump, add commit and date to it
final = {'packages': pkgs,
'stats': statistics,
'defconfigs': defconfigs,
'package_status_checks': Package.status_checks,
'commit': commit,
'date': str(date)}
with open(output, 'w') as f:
json.dump(final, f, indent=2, separators=(',', ': '))
f.write('\n')
def resolvepath(path):
return os.path.abspath(os.path.expanduser(path))
def parse_args():
parser = argparse.ArgumentParser()
output = parser.add_argument_group('output', 'Output file(s)')
output.add_argument('--html', dest='html', type=resolvepath,
help='HTML output file')
output.add_argument('--json', dest='json', type=resolvepath,
help='JSON output file')
packages = parser.add_mutually_exclusive_group()
packages.add_argument('-n', dest='npackages', type=int, action='store',
help='Number of packages')
packages.add_argument('-p', dest='packages', action='store',
help='List of packages (comma separated)')
parser.add_argument('--nvd-path', dest='nvd_path',
help='Path to the local NVD database', type=resolvepath)
args = parser.parse_args()
if not args.html and not args.json:
parser.error('at least one of --html or --json (or both) is required')
return args
def __main__():
args = parse_args()
if args.packages:
package_list = args.packages.split(",")
else:
package_list = None
date = datetime.datetime.utcnow()
commit = subprocess.check_output(['git', 'rev-parse',
'HEAD']).splitlines()[0].decode()
print("Build package list ...")
packages = get_pkglist(args.npackages, package_list)
print("Getting developers ...")
developers = parse_developers()
print("Build defconfig list ...")
defconfigs = get_defconfig_list()
for d in defconfigs:
d.set_developers(developers)
print("Getting package make info ...")
package_init_make_info()
print("Getting package details ...")
for pkg in packages:
pkg.set_infra()
pkg.set_license()
pkg.set_hash_info()
pkg.set_patch_count()
pkg.set_check_package_warnings()
pkg.set_current_version()
pkg.set_url()
pkg.set_developers(developers)
print("Checking URL status")
loop = asyncio.get_event_loop()
loop.run_until_complete(check_package_urls(packages))
print("Getting latest versions ...")
loop = asyncio.get_event_loop()
loop.run_until_complete(check_package_latest_version(packages))
if args.nvd_path:
print("Checking packages CVEs")
check_package_cves(args.nvd_path, {p.name: p for p in packages})
print("Calculate stats")
stats = calculate_stats(packages)
if args.html:
print("Write HTML")
dump_html(packages, stats, date, commit, args.html)
if args.json:
print("Write JSON")
dump_json(packages, defconfigs, stats, date, commit, args.json)
__main__()