Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion scripts/cve_check_ng.py
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,10 @@ def main(args: dict):
logger.setLevel(logging.DEBUG)

bitbakeinfo = bitbake_runner.get_bitbake_information(args.image_name)
if bitbakeinfo["cve_db_v2_name"] is None:
logger.error("CVE database file name is not defined")
exit(1)

disable_plugins = create_disable_plugins_list(args.disable_plugins)

dpkg_status_file = (
Expand Down Expand Up @@ -365,7 +369,7 @@ def main(args: dict):
kev_info_list = fetch_kev_data(cve_data_dir)

# Create CVE report data
creator = NvdCveInfoListCreator(cve_data_dir, installed_packages, kev_info_list)
creator = NvdCveInfoListCreator(bitbakeinfo["cve_db_v2_name"], cve_data_dir, installed_packages, kev_info_list)
creator.create_cve_info_list(cve_check_merged_list)

cve_info_list = creator.get_nvd_info_list()
Expand Down
12 changes: 12 additions & 0 deletions scripts/lib/python/bitbake_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def get_bitbake_information(image):
pattern_repo_isar_dir = r'\nREPO_ISAR_DIR="([^"]*)"'
pattern_image_distro = r'\nDISTRO="([^"]*)"'
pattern_cve_db_predownload = r'\nCVE_DB_PREDOWNLOAD_URL="([^"]*)"'
pattern_cve_db_v2_predownload = r'\nCVE_DB_V2_PREDOWNLOAD_URL="([^"]*)"'
pattern_layer_emlinux = r'\nLAYERDIR_emlinux="([^"]*)"'

deploy_image_dir = re.findall(pattern_deploy_image_dir, output)[0]
Expand All @@ -82,6 +83,15 @@ def get_bitbake_information(image):
else:
cve_db_predownload = None

cve_db_v2_url = re.findall(pattern_cve_db_v2_predownload, output)
cve_db_v2_name = "nvd_cve_db_v2.db"
if not len(cve_db_v2_url) == 0:
cve_db_v2_predownload = cve_db_v2_url[0]
cve_db_v2_name = cve_db_v2_predownload.split("/")[-1]
else:
cve_db_v2_predownload = None


dpkg_status = f"{deploy_image_dir}/{image_full_name}.dpkg_status"
return {
"deploy_dir": deploy_dir,
Expand All @@ -95,5 +105,7 @@ def get_bitbake_information(image):
"repo_isar_dir": repo_isar_dir,
"image_distro": image_distro,
"cve_db_predownload": cve_db_predownload,
"cve_db_v2_predownload": cve_db_v2_predownload,
"cve_db_v2_name": cve_db_v2_name,
"emlinux_layer_dir": emlinux_layer_dir,
}
2 changes: 2 additions & 0 deletions scripts/lib/python/cve/cve_reporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def _write_text_report(self, cve_info_list: NvdCveNvdInfoList) -> list[str]:
f.write(f"CVE SUMMARY: {ci.summary}\n")
f.write(f"CVSS v2 BASE SCORE: {ci.scorev2}\n")
f.write(f"CVSS v3 BASE SCORE: {ci.scorev3}\n")
f.write(f"CVSS v4 BASE SCORE: {ci.scorev4}\n")
f.write(f"VECTOR: {ci.vector}\n")
f.write(f"VECTORSTRING: {ci.vector_string}\n")

Expand Down Expand Up @@ -162,6 +163,7 @@ def _create_issue_data(self, cve_info: NvdCveNvdInfo) -> list[dict]:
data["CVE SUMMARY"] = ci.summary
data["CVSS v2 BASE SCORE"] = ci.scorev2
data["CVSS v3 BASE SCORE"] = ci.scorev3
data["CVSS v4 BASE SCORE"] = ci.scorev4
data["VECTOR"] = ci.vector
data["VECTORSTRING"] = ci.vector_string

Expand Down
17 changes: 10 additions & 7 deletions scripts/lib/python/cve/nvd_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@

import sqlite3

CVE_DATABASE_NAME = "nvd_cve_db.db"


class NvdCveNvdInfo:
def __init__(
self,
Expand All @@ -28,6 +25,7 @@ def __init__(
summary: str,
scorev2: str,
scorev3: str,
scorev4: str,
vector: str,
vector_string: str,
status: str,
Expand All @@ -42,6 +40,7 @@ def __init__(
self.summary = summary.strip()
self.scorev2 = scorev2
self.scorev3 = scorev3
self.scorev4 = scorev4
self.vector = vector
self.vector_string = vector_string
self.status = status
Expand Down Expand Up @@ -159,11 +158,12 @@ def get_cve_status(self, src_pkg_name: str, cveid: str) -> str:
class NvdCveInfoListCreator:
def __init__(
self,
cve_db_name: str,
cve_data_dir: str,
package_info_list: PackageList,
kev_info_list: KevInfoList,
) -> None:
self.db_file = f"{cve_data_dir}/{CVE_DATABASE_NAME}"
self.db_file = f"{cve_data_dir}/{cve_db_name}"
self.nvd_info_list = NvdCveNvdInfoList()
self.package_info_list = package_info_list
self.kev_info_list = kev_info_list
Expand Down Expand Up @@ -214,7 +214,7 @@ def _get_cve_information(
status: str,
) -> NvdCveNvdInfo:
c = self.conn.cursor()
sql = f'SELECT VULNSTATUS, SUMMARY, SCOREV2, SCOREV3, VECTOR, VECTORSTRING FROM NVD WHERE ID="{cveid}"'
sql = f'SELECT VULNSTATUS, SUMMARY, SCOREV2, SCOREV3, SCOREV4, VECTOR, VECTORSTRING FROM NVD WHERE ID="{cveid}"'
cursor = c.execute(sql)
data = cursor.fetchone()
c.close()
Expand All @@ -223,6 +223,7 @@ def _get_cve_information(
summary = ""
scorev2 = "0.0"
scorev3 = "0.0"
scorev4 = "0.0"
vector = "UNKNOWN"
vector_string = "UNKNOWN"

Expand All @@ -233,8 +234,9 @@ def _get_cve_information(
summary = data[1]
scorev2 = data[2]
scorev3 = data[3]
vector = data[4]
vector_string = data[5]
scorev4 = data[4]
vector = data[5]
vector_string = data[6]

return NvdCveNvdInfo(
cveid,
Expand All @@ -244,6 +246,7 @@ def _get_cve_information(
summary,
scorev2,
scorev3,
scorev4,
vector,
vector_string,
vuln_status,
Expand Down
55 changes: 46 additions & 9 deletions scripts/lib/python/cve/plugin/eml_cve_nvd_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ def __init__(
cve_products,
)

self.predownload_url = self.bitbakeinfo["cve_db_predownload"]
# default database name
self.cve_db_name = self.bitbakeinfo["cve_db_v2_name"]
self.predownload_url = self.bitbakeinfo["cve_db_v2_predownload"]
self.predownload = self.args.cve_db_predownload

self.db_file = f"{self.cve_data_dir}/{nvd_lib.CVE_DATABASE_NAME}"
self.db_file = f"{self.cve_data_dir}/{self.cve_db_name}"
self.nvd_api_key = args.nvd_api_key

def update_database(self) -> bool:
Expand Down Expand Up @@ -171,7 +172,7 @@ def _initialize_nvd_cve_db(self, conn: sqlite3.Connection) -> None:

c.execute(
"CREATE TABLE IF NOT EXISTS NVD (ID TEXT UNIQUE, VULNSTATUS TEXT, SUMMARY TEXT, SCOREV2 TEXT, \
SCOREV3 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)"
SCOREV3 TEXT, SCOREV4 TEXT, MODIFIED INTEGER, VECTOR TEXT, VECTORSTRING TEXT)"
)

c.execute(
Expand Down Expand Up @@ -312,6 +313,15 @@ def _cpe_generator():
"insert into PRODUCTS values (?, ?, ?, ?, ?, ?, ?)", _cpe_generator()
).close()


def _column_exsits(self, conn, column_name) -> bool:
cur = conn.cursor()
cur.execute("PRAGMA table_info(NVD)")
columns = [row[1] for row in cur.fetchall()]
cur.close()
return column_name in columns


def _update_db(self, conn, elt):
"""
Update a single entry in the on-disk database
Expand All @@ -327,6 +337,10 @@ def _update_db(self, conn, elt):
else:
vulnStatus = ""

cvssv2 = 0.0
cvssv3 = 0.0
cvssv4 = 0.0

cveDesc = ""
for desc in elt["cve"]["descriptions"]:
if desc["lang"] == "en":
Expand All @@ -341,8 +355,8 @@ def _update_db(self, conn, elt):
]
cvssv2 = elt["cve"]["metrics"]["cvssMetricV2"][0]["cvssData"]["baseScore"]
except KeyError:
cvssv2 = 0.0
cvssv3 = None
pass

try:
accessVector = (
accessVector
Expand All @@ -355,6 +369,7 @@ def _update_db(self, conn, elt):
cvssv3 = elt["cve"]["metrics"]["cvssMetricV30"][0]["cvssData"]["baseScore"]
except KeyError:
pass

try:
accessVector = (
accessVector
Expand All @@ -370,18 +385,37 @@ def _update_db(self, conn, elt):
)
except KeyError:
pass

try:
accessVector = (
accessVector
or elt['cve']['metrics']['cvssMetricV40'][0]['cvssData']['attackVector']
)

vectorString = (
vectorString
or elt['cve']['metrics']['cvssMetricV40'][0]['cvssData']['vectorString']
)
cvssv4 = elt['cve']['metrics']['cvssMetricV40'][0]['cvssData']['baseScore']
except KeyError:
pass

accessVector = accessVector or "UNKNOWN"
vectorString = vectorString or "UNKNOWN"
cvssv3 = cvssv3 or 0.0

if not self._column_exsits(conn, "SCOREV4"):
logger.error(f"SCOREV4 column is not found in your NVD CVE database, please remove old one then create new database")
return False

conn.execute(
"insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?)",
"insert or replace into NVD values (?, ?, ?, ?, ?, ?, ?, ?, ?)",
[
cveId.strip(),
vulnStatus.strip(),
cveDesc.strip(),
cvssv2,
cvssv3,
cvssv4,
date.strip(),
accessVector.strip(),
vectorString.strip(),
Expand All @@ -400,6 +434,8 @@ def _update_db(self, conn, elt):
except KeyError:
logger.debug("CVE %s has no configurations" % cveId)

return True

def _nvd_request_next(self, url: str, request_args: Any) -> str:
"""
Request next part of the NVD dabase
Expand Down Expand Up @@ -464,7 +500,8 @@ def _fetch_all_cves(self, conn: sqlite3.Connection, last_modified: str) -> bool:
per_page = data["resultsPerPage"]
logger.debug(f"Got {per_page} entries")
for cve in data["vulnerabilities"]:
self._update_db(conn, cve)
if not self._update_db(conn, cve):
return False

if per_page == 0:
# no more data
Expand Down