photon-os-advisories/generate.py

231 lines
8.1 KiB
Python

import sys
from glob import glob
import subprocess
import markdown
import json
import canonicaljson
import urllib.request
from datetime import datetime
import copy
import os
import re
from bs4 import BeautifulSoup
# Pattern for CVE identifiers: "CVE-", a four-digit year, then a 4-7 digit number.
CVE_REGEX = r"CVE-\d{4}-\d{4,7}"
# Name (with leading slash) of the per-release index file; get_osv appends it
# to ADVISORIES_DIR after filling in {version}.
FILE_FORMAT = "/Security-Updates-{version}.md"
# URL template used for the ADVISORY reference of each generated record.
ADVISORY_URL = "https://github.com/vmware/photon/wiki/Security-Update-{slug}"
# Release numbers that are processed: 1, 2, 3 and 4 (the upper bound is exclusive).
PHOTON_VERSIONS = range(1, 5)
# Directory holding the advisory markdown files; also used as the working
# directory for git (presumably a checkout of the Photon wiki -- confirm).
ADVISORIES_DIR = "photon-wiki"
def last_modified_date(file):
    """Return the time of the newest git commit that touched ``file``.

    ``git log`` is run with ``ADVISORIES_DIR`` as its working directory and
    asked for the committer timestamp (``%ct``, seconds since the epoch) of
    the single most recent commit affecting the path.

    Args:
        file: Path of the file, relative to ``ADVISORIES_DIR``.

    Returns:
        A naive ``datetime`` holding that timestamp expressed in UTC.

    Raises:
        subprocess.CalledProcessError: If the git command exits non-zero.
        ValueError: If git prints nothing that parses as an integer.
    """
    git_command = ["git", "log", "--date=iso-strict", "-1", "--format=%ct", "--", file]
    raw_output = subprocess.check_output(git_command, cwd=ADVISORIES_DIR)
    commit_epoch = int(raw_output.decode("utf-8").strip())
    return datetime.utcfromtimestamp(commit_epoch)
def created_date(file):
    """Read the issue date recorded inside an advisory file.

    The file ``ADVISORIES_DIR/file`` is scanned line by line. The first line
    that starts with ``"Issue"`` is split on ``": "`` and the text after the
    separator is parsed as a ``YYYY-MM-DD`` date.

    Args:
        file: Name of the advisory file, relative to ``ADVISORIES_DIR``.

    Returns:
        The parsed ``datetime``, or ``None`` when no line starts with
        ``"Issue"``.
    """
    advisory_path = ADVISORIES_DIR + "/" + file
    with open(advisory_path) as handle:
        for text in handle:
            if not text.startswith("Issue"):
                continue
            date_text = text.split(": ")[1].strip()
            return datetime.strptime(date_text, "%Y-%m-%d")
    return None
def advisory_slug(os_version, advisory):
    """Build the ``<os_version>.0-<number>`` slug for an advisory.

    The number is whatever follows the last ``-`` in ``advisory``; it is
    parsed as a float and truncated to an int, which drops leading zeros.

    Args:
        os_version: Release number placed in front of ``.0``.
        advisory: Advisory identifier whose last dash-separated field is
            numeric.

    Returns:
        The slug string, e.g. ``"3.0-185"``.
    """
    trailing_field = advisory.rsplit("-", 1)[-1]
    sequence_number = int(float(trailing_field))
    return f"{os_version}.0-{sequence_number}"
def get_osv(cve_data_all_versions):
    """Yield one advisory record per row of each release's index table.

    For every release in ``PHOTON_VERSIONS`` the index markdown file is
    rendered to HTML and each ``<tr>`` of the table body is read as five
    cells: advisory id, severity, published date, packages and CVEs. The
    packages cell is parsed as JSON after single quotes are swapped for
    double quotes; CVE ids are extracted with ``CVE_REGEX``.

    Args:
        cve_data_all_versions: Mapping of CVE id to a list of dicts that
            carry at least ``os``, ``pkg`` and ``res_ver``.

    Yields:
        Dicts with the keys ``id``, ``modified``, ``published``,
        ``related``, ``affected`` and ``references``.
    """

    def cve_fixed_version(package, cves, os_version, advisory):
        # Gather every fixed version recorded for this package on this
        # release under any of the advisory's CVEs; exactly one is expected.
        candidates = set()
        for cve in cves:
            for entry in cve_data_all_versions.get(cve, []):
                if entry and entry["os"] == os_version and entry["pkg"] == package:
                    candidates.add(entry["res_ver"])
        if len(candidates) == 1:
            return candidates.pop()
        joined = ", ".join(list(candidates))
        print(f"[{advisory}] Invalid Versions: {package} ({joined})")
        return None

    def affected(pkg, cves, os_version, advisory):
        # Describe one affected package; the version range is only attached
        # when a single fixed version could be determined.
        entry = {
            "package": {
                "ecosystem": f"Photon OS:{os_version}.0",
                "name": pkg,
                "purl": f"pkg:rpm/vmware/{pkg}?distro=photon-{os_version}",
            }
        }
        fixed_version = cve_fixed_version(pkg, cves, os_version, advisory)
        if not fixed_version:
            return entry
        # NOTE(review): the OSV schema describes `ranges` as a list of range
        # objects; this emits a single object -- confirm what consumers expect.
        entry["ranges"] = {
            "events": [
                {"introduced": "0"},
                {"fixed": fixed_version},
            ],
            "type": "ECOSYSTEM",
        }
        return entry

    for os_version in PHOTON_VERSIONS:
        filename = FILE_FORMAT.format(version=os_version)
        print(f"Parsing {filename}")
        with open(ADVISORIES_DIR + filename, "r") as index_file:
            index_markdown = index_file.read()
        table_html = markdown.markdown(
            index_markdown, extensions=["markdown.extensions.tables"]
        )
        soup = BeautifulSoup(table_html, "html.parser")

        for row in soup.find("tbody").find_all("tr"):
            cells = [cell.text for cell in row.find_all("td")]
            # Unpacking insists on exactly five cells per row.
            advisory, _severity, _published_date, packages, cves = cells
            packages = json.loads(packages.replace("'", '"'))
            cves = re.findall(CVE_REGEX, cves)

            slug = advisory_slug(os_version, advisory)
            advisory_file = f"Security-Update-{slug}.md"
            modified = last_modified_date(advisory_file)
            published = created_date(advisory_file)

            yield {
                "id": advisory,
                "modified": modified.isoformat("T", timespec='seconds') + "Z",
                "published": published.isoformat("T", timespec='seconds') + "Z",
                "related": cves,
                "affected": [
                    affected(pkg, cves, os_version, advisory) for pkg in packages
                ],
                "references": [
                    {"type": "ADVISORY", "url": ADVISORY_URL.format(slug=slug)}
                ],
            }
def merge_advisories(advisory_file, data):
    """Merge freshly generated advisory ``data`` into the JSON stored on disk.

    The stored advisory is loaded and copied, and the ``affected``,
    ``related`` and ``references`` lists are combined with those in ``data``
    with duplicates removed. ``published`` becomes the earlier of the two
    timestamps and ``modified`` the later one.

    Args:
        advisory_file: Path of the existing advisory JSON file.
        data: Newly generated advisory dict with the same ``id``.

    Returns:
        The merged advisory dict, or ``None`` when none of ``affected``,
        ``references``, ``related`` or ``published`` differs from the stored
        file. When something did change but ``modified`` did not, ``modified``
        is set to the current UTC time so downstream consumers notice.

    Raises:
        ValueError: If the stored advisory and ``data`` have different ids.
    """
    timestamp_format = "%Y-%m-%dT%H:%M:%SZ"

    def dedup_dicts(items):
        # Canonical JSON bytes make the (unhashable) dicts comparable.
        # dict.fromkeys keeps first-seen order; iterating a plain set of bytes
        # would follow per-process hash randomisation, reshuffling the list on
        # every run and showing up below as a spurious "important change".
        encoded_items = (canonicaljson.encode_canonical_json(item) for item in items)
        return [json.loads(encoded) for encoded in dict.fromkeys(encoded_items)]

    def parse_timestamp(text):
        return datetime.strptime(text, timestamp_format)

    def render_timestamp(moment):
        return moment.isoformat("T", timespec='seconds') + "Z"

    # Read the advisory currently on disk and work on a copy so the original
    # stays available for change detection.
    with open(advisory_file, "r") as f:
        original = json.load(f)
    current = copy.deepcopy(original)

    # Explicit check rather than `assert`, which is stripped under `python -O`.
    if current["id"] != data["id"]:
        raise ValueError(
            f"Advisory id mismatch: {current['id']!r} != {data['id']!r}"
        )

    # Stored entries first, then new ones, so the existing order is preserved
    # and untouched advisories compare equal to what is already on disk.
    for key in ('affected', 'related', 'references'):
        combined = list(current[key] or []) + list(data[key] or [])
        if combined:
            current[key] = dedup_dicts(combined)

    # Pick the earlier published date and the later modified date.
    current["published"] = render_timestamp(
        min(parse_timestamp(current["published"]), parse_timestamp(data["published"]))
    )
    current["modified"] = render_timestamp(
        max(parse_timestamp(current["modified"]), parse_timestamp(data["modified"]))
    )

    # Only these keys count as important; a change to `modified` alone does not.
    important_change = any(
        canonicaljson.encode_canonical_json(original[key])
        != canonicaljson.encode_canonical_json(current[key])
        for key in ("affected", "references", "related", "published")
    )
    if not important_change:
        return None

    # If there were important changes but modified hasn't changed, bump the
    # timestamp so downstream can pick up the changes.
    if original['modified'] == current['modified']:
        current['modified'] = datetime.utcnow().isoformat("T", timespec='seconds') + "Z"
    return current
def fetch_cve_metadata(PHOTON_VERSIONS):
    """Download the per-release CVE metadata and index it by CVE id.

    For each release the JSON document is fetched and every row is tagged
    with ``os`` (the release number). Its ``cve_id`` is removed and used as
    the grouping key, and its ``aff_ver`` is dropped once it is confirmed to
    be the expected "all versions before <res_ver> are vulnerable" sentence.

    Args:
        PHOTON_VERSIONS: Iterable of release numbers to fetch.

    Returns:
        Dict mapping each CVE id to the list of rows recorded for it across
        all fetched releases.

    Raises:
        Exception: If a row's ``aff_ver`` is anything other than the expected
            sentence (the row is printed first).
    """
    rows_by_cve = {}
    for branch in PHOTON_VERSIONS:
        url = f"https://packages.vmware.com/photon/photon_cve_metadata/cve_data_photon{branch}.0.json"
        with urllib.request.urlopen(url) as response:
            payload = response.read()
        rows = json.loads(payload.decode())

        for row in rows:
            row["os"] = branch
            cve = row.pop("cve_id")

            reported_range = row["aff_ver"]
            expected_range = f"all versions before {row['res_ver']} are vulnerable"
            if reported_range != expected_range:
                print(row)
                raise Exception("Unimplemented affected version range")
            del row["aff_ver"]

            rows_by_cve.setdefault(cve, []).append(row)

        print(f"[+] CVE metadata for Photon OS {branch}.0: Added {len(rows)} CVEs")
    return rows_by_cve
def __main__(advisory_id=None):
    """Generate or refresh the JSON files under ``advisories/``.

    Every record produced by ``get_osv`` is written to
    ``advisories/<id>.json``. When that file already exists the record is
    first merged with it, and nothing is written if the merge returns
    ``None``. Afterwards, each ``.json`` file in ``advisories`` whose id was
    not produced in this run is reported.

    Args:
        advisory_id: When given, only the record with this id is written;
            all ids are still collected for the stale-file report.
    """
    cve_metadata = fetch_cve_metadata(PHOTON_VERSIONS)
    upstream_ids = set()

    for record in get_osv(cve_metadata):
        record_id = record['id']
        upstream_ids.add(record_id)

        # Single-advisory mode: skip everything except the requested id.
        if advisory_id and record_id != advisory_id:
            continue

        target = f"advisories/{record_id}.json"
        if os.path.exists(target):
            record = merge_advisories(target, record)
        if not record:
            continue
        with open(target, "wb") as out:
            out.write(canonicaljson.encode_pretty_printed_json(record))

    # Report advisories that are no longer in the upstream data. The actual
    # deletion (os.unlink) is deliberately left disabled.
    suffix_length = len(".json")
    for entry in os.listdir("advisories"):
        if not entry.endswith(".json"):
            continue
        if entry[:-suffix_length] in upstream_ids:
            continue
        print(f"[-] Removing {entry}")
if __name__ == "__main__":
    # An optional first command-line argument limits the run to one advisory id.
    cli_arguments = sys.argv[1:]
    if cli_arguments:
        __main__(cli_arguments[0])
    else:
        __main__()