09a71e6a75
Currently, the CPE XML database is parsed into a Python dict, which is then pickled into a local file, to speed up the processing of further invocations. However, it turns out that since the initial implementation, we have switched the XML parsing from the out of tree xmltodict module to the standard ElementTree one, which has made the parsing much faster. The pickle caching only saves 6 seconds, on something that takes more than 13 minutes total. In addition, this pickle caching consumes a significant amount of RAM, causing the Python process to be OOM-killed on a server with 4 GB of RAM. So let's just drop this caching entirely. Signed-off-by: Thomas Petazzoni <thomas.petazzoni@bootlin.com> Signed-off-by: Yann E. MORIN <yann.morin.1998@free.fr>
176 lines
7.7 KiB
Python
176 lines
7.7 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import xml.etree.ElementTree as ET
|
|
from xml.etree.ElementTree import Element, SubElement
|
|
import gzip
|
|
import os
|
|
import pickle
|
|
import requests
|
|
import time
|
|
from xml.dom import minidom
|
|
|
|
VALID_REFS = ['VENDOR', 'VERSION', 'CHANGE_LOG', 'PRODUCT', 'PROJECT', 'ADVISORY']
|
|
|
|
CPEDB_URL = "https://static.nvd.nist.gov/feeds/xml/cpe/dictionary/official-cpe-dictionary_v2.3.xml.gz"
|
|
|
|
ns = {
|
|
'': 'http://cpe.mitre.org/dictionary/2.0',
|
|
'cpe-23': 'http://scap.nist.gov/schema/cpe-extension/2.3',
|
|
'xml': 'http://www.w3.org/XML/1998/namespace'
|
|
}
|
|
|
|
|
|
class CPE:
|
|
def __init__(self, cpe_str, titles, refs):
|
|
self.cpe_str = cpe_str
|
|
self.titles = titles
|
|
self.references = refs
|
|
self.cpe_cur_ver = "".join(self.cpe_str.split(":")[5:6])
|
|
|
|
def update_xml_dict(self):
|
|
ET.register_namespace('', 'http://cpe.mitre.org/dictionary/2.0')
|
|
cpes = Element('cpe-list')
|
|
cpes.set('xmlns:cpe-23', "http://scap.nist.gov/schema/cpe-extension/2.3")
|
|
cpes.set('xmlns:ns6', "http://scap.nist.gov/schema/scap-core/0.1")
|
|
cpes.set('xmlns:scap-core', "http://scap.nist.gov/schema/scap-core/0.3")
|
|
cpes.set('xmlns:config', "http://scap.nist.gov/schema/configuration/0.1")
|
|
cpes.set('xmlns:xsi', "http://www.w3.org/2001/XMLSchema-instance")
|
|
cpes.set('xmlns:meta', "http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2")
|
|
cpes.set('xsi:schemaLocation', " ".join(["http://scap.nist.gov/schema/cpe-extension/2.3",
|
|
"https://scap.nist.gov/schema/cpe/2.3/cpe-dictionary-extension_2.3.xsd",
|
|
"http://cpe.mitre.org/dictionary/2.0",
|
|
"https://scap.nist.gov/schema/cpe/2.3/cpe-dictionary_2.3.xsd",
|
|
"http://scap.nist.gov/schema/cpe-dictionary-metadata/0.2",
|
|
"https://scap.nist.gov/schema/cpe/2.1/cpe-dictionary-metadata_0.2.xsd",
|
|
"http://scap.nist.gov/schema/scap-core/0.3",
|
|
"https://scap.nist.gov/schema/nvd/scap-core_0.3.xsd",
|
|
"http://scap.nist.gov/schema/configuration/0.1",
|
|
"https://scap.nist.gov/schema/nvd/configuration_0.1.xsd",
|
|
"http://scap.nist.gov/schema/scap-core/0.1",
|
|
"https://scap.nist.gov/schema/nvd/scap-core_0.1.xsd"]))
|
|
item = SubElement(cpes, 'cpe-item')
|
|
cpe_short_name = CPE.short_name(self.cpe_str)
|
|
cpe_new_ver = CPE.version_update(self.cpe_str)
|
|
|
|
item.set('name', 'cpe:/' + cpe_short_name)
|
|
self.titles[0].text.replace(self.cpe_cur_ver, cpe_new_ver)
|
|
for title in self.titles:
|
|
item.append(title)
|
|
if self.references:
|
|
item.append(self.references)
|
|
cpe23item = SubElement(item, 'cpe-23:cpe23-item')
|
|
cpe23item.set('name', self.cpe_str)
|
|
|
|
# Generate the XML as a string
|
|
xmlstr = ET.tostring(cpes)
|
|
|
|
# And use minidom to pretty print the XML
|
|
return minidom.parseString(xmlstr).toprettyxml(encoding="utf-8").decode("utf-8")
|
|
|
|
@staticmethod
|
|
def version(cpe):
|
|
return cpe.split(":")[5]
|
|
|
|
@staticmethod
|
|
def product(cpe):
|
|
return cpe.split(":")[4]
|
|
|
|
@staticmethod
|
|
def short_name(cpe):
|
|
return ":".join(cpe.split(":")[2:6])
|
|
|
|
@staticmethod
|
|
def version_update(cpe):
|
|
return ":".join(cpe.split(":")[5:6])
|
|
|
|
@staticmethod
|
|
def no_version(cpe):
|
|
return ":".join(cpe.split(":")[:5])
|
|
|
|
|
|
class CPEDB:
|
|
def __init__(self, nvd_path):
|
|
self.all_cpes = dict()
|
|
self.all_cpes_no_version = dict()
|
|
self.nvd_path = nvd_path
|
|
|
|
def get_xml_dict(self):
|
|
print("CPE: Setting up NIST dictionary")
|
|
if not os.path.exists(os.path.join(self.nvd_path, "cpe")):
|
|
os.makedirs(os.path.join(self.nvd_path, "cpe"))
|
|
|
|
cpe_dict_local = os.path.join(self.nvd_path, "cpe", os.path.basename(CPEDB_URL))
|
|
if not os.path.exists(cpe_dict_local) or os.stat(cpe_dict_local).st_mtime < time.time() - 86400:
|
|
print("CPE: Fetching xml manifest from [" + CPEDB_URL + "]")
|
|
cpe_dict = requests.get(CPEDB_URL)
|
|
open(cpe_dict_local, "wb").write(cpe_dict.content)
|
|
|
|
print("CPE: Unzipping xml manifest...")
|
|
nist_cpe_file = gzip.GzipFile(fileobj=open(cpe_dict_local, 'rb'))
|
|
print("CPE: Converting xml manifest to dict...")
|
|
tree = ET.parse(nist_cpe_file)
|
|
all_cpedb = tree.getroot()
|
|
self.parse_dict(all_cpedb)
|
|
|
|
def parse_dict(self, all_cpedb):
|
|
# Cycle through the dict and build two dict to be used for custom
|
|
# lookups of partial and complete CPE objects
|
|
# The objects are then used to create new proposed XML updates if
|
|
# if is determined one is required
|
|
# Out of the different language titles, select English
|
|
for cpe in all_cpedb.findall(".//{http://cpe.mitre.org/dictionary/2.0}cpe-item"):
|
|
cpe_titles = []
|
|
for title in cpe.findall('.//{http://cpe.mitre.org/dictionary/2.0}title[@xml:lang="en-US"]', ns):
|
|
title.tail = None
|
|
cpe_titles.append(title)
|
|
|
|
# Some older CPE don't include references, if they do, make
|
|
# sure we handle the case of one ref needing to be packed
|
|
# in a list
|
|
cpe_ref = cpe.find(".//{http://cpe.mitre.org/dictionary/2.0}references")
|
|
if cpe_ref:
|
|
for ref in cpe_ref.findall(".//{http://cpe.mitre.org/dictionary/2.0}reference"):
|
|
ref.tail = None
|
|
ref.text = ref.text.upper()
|
|
if ref.text not in VALID_REFS:
|
|
ref.text = ref.text + "-- UPDATE this entry, here are some examples and just one word should be used -- " + ' '.join(VALID_REFS) # noqa E501
|
|
cpe_ref.tail = None
|
|
cpe_ref.text = None
|
|
|
|
cpe_str = cpe.find(".//{http://scap.nist.gov/schema/cpe-extension/2.3}cpe23-item").get('name')
|
|
item = CPE(cpe_str, cpe_titles, cpe_ref)
|
|
cpe_str_no_version = CPE.no_version(cpe_str)
|
|
# This dict must have a unique key for every CPE version
|
|
# which allows matching to the specific obj data of that
|
|
# NIST dict entry
|
|
self.all_cpes.update({cpe_str: item})
|
|
# This dict has one entry for every CPE (w/o version) to allow
|
|
# partial match (no valid version) check (the obj is saved and
|
|
# used as seed for suggested xml updates. By updating the same
|
|
# non-version'd entry, it assumes the last update here is the
|
|
# latest version in the NIST dict)
|
|
self.all_cpes_no_version.update({cpe_str_no_version: item})
|
|
|
|
def find_partial(self, cpe_str):
|
|
cpe_str_no_version = CPE.no_version(cpe_str)
|
|
if cpe_str_no_version in self.all_cpes_no_version:
|
|
return cpe_str_no_version
|
|
|
|
def find_partial_obj(self, cpe_str):
|
|
cpe_str_no_version = CPE.no_version(cpe_str)
|
|
if cpe_str_no_version in self.all_cpes_no_version:
|
|
return self.all_cpes_no_version[cpe_str_no_version]
|
|
|
|
def find_partial_latest_version(self, cpe_str_partial):
|
|
cpe_obj = self.find_partial_obj(cpe_str_partial)
|
|
return cpe_obj.cpe_cur_ver
|
|
|
|
def find(self, cpe_str):
|
|
if self.find_partial(cpe_str):
|
|
if cpe_str in self.all_cpes:
|
|
return cpe_str
|
|
|
|
def gen_update_xml(self, cpe_str):
|
|
cpe = self.find_partial_obj(cpe_str)
|
|
return cpe.update_xml_dict()
|