Canonicalization improvements

Do not delete existing advisories, but merge things properly.
This commit is contained in:
Nemo 2023-04-28 14:09:07 +05:30
parent 98a24690d1
commit fee22eda9d
2 changed files with 44 additions and 25 deletions

View File

@@ -14,6 +14,9 @@ jobs:
with: with:
ref: ${{ github.head_ref }} ref: ${{ github.head_ref }}
submodules: true submodules: true
# pypy is much faster at generating lots of JSON
# and canonicaljson also uses pypy-json instead
# of simplejson (https://github.com/matrix-org/python-canonicaljson/pull/25/files)
- uses: actions/setup-python@v4 - uses: actions/setup-python@v4
with: with:
python-version: 'pypy3.9' python-version: 'pypy3.9'

View File

@@ -3,6 +3,7 @@ from glob import glob
import subprocess import subprocess
import markdown import markdown
import json import json
import canonicaljson
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
import copy import copy
@@ -52,12 +53,14 @@ def get_osv(cve_data_all_versions):
def cve_fixed_version(package, cves, os_version, advisory): def cve_fixed_version(package, cves, os_version, advisory):
# list of fixed versions with a matching # list of fixed versions with a matching
# CVE/pkg/OS combination # CVE/pkg/OS combination
fixed_versions = set([ fixed_versions = set(
x["res_ver"] [
for cve in cves x["res_ver"]
for x in cve_data_all_versions.get(cve, list()) for cve in cves
if (x and x["os"] == os_version and x["pkg"] == package) for x in cve_data_all_versions.get(cve, list())
]) if (x and x["os"] == os_version and x["pkg"] == package)
]
)
# There should only be a single such reference # There should only be a single such reference
if len(fixed_versions) != 1: if len(fixed_versions) != 1:
f = ", ".join(list(fixed_versions)) f = ", ".join(list(fixed_versions))
@@ -96,7 +99,7 @@ def get_osv(cve_data_all_versions):
{"introduced": "0"}, {"introduced": "0"},
{"fixed": fixed_version}, {"fixed": fixed_version},
], ],
"type": "ECOSYSTEM" "type": "ECOSYSTEM",
} }
return r return r
@@ -105,10 +108,7 @@ def get_osv(cve_data_all_versions):
"modified": modified.isoformat("T") + "Z", "modified": modified.isoformat("T") + "Z",
"published": published.isoformat("T") + "Z", "published": published.isoformat("T") + "Z",
"related": cves, "related": cves,
"affected": [ "affected": [affected(pkg, cves, os_version) for pkg in packages],
affected(pkg, cves, os_version)
for pkg in packages
],
"references": [ "references": [
{"type": "ADVISORY", "url": ADVISORY_URL.format(slug=slug)} {"type": "ADVISORY", "url": ADVISORY_URL.format(slug=slug)}
], ],
@@ -116,18 +116,24 @@ def get_osv(cve_data_all_versions):
def merge_advisories(advisory_file, data): def merge_advisories(advisory_file, data):
def dedup_dicts(items):
dedupped = [ json.loads(i) for i in set(canonicaljson.encode_canonical_json(item) for item in items)]
return dedupped
# read the current advisory data as json # read the current advisory data as json
with open(advisory_file, "r") as f: with open(advisory_file, "r") as f:
original = json.load(f) original = json.load(f)
current = copy.deepcopy(original) current = copy.deepcopy(original)
# merge the data # merge the data
assert current["id"] == data["id"] assert current["id"] == data["id"]
current["affected"].extend(data["affected"]) # Add any new data, but use a set, to avoid
current["references"].extend(data["references"]) # duplicate entries
current["related"].extend(data["related"]) for key in ['affected', 'references', 'related']:
if current[key]:
# Make sure no CVE references are duplicated current[key].extend(data[key])
current["related"] = list(set(current["related"])).sort() current[key] = dedup_dicts(current[key])
elif data[key]:
current[key] = data[key]
# Pick the earlier published date # Pick the earlier published date
# and the later modified date # and the later modified date
@@ -149,9 +155,13 @@ def merge_advisories(advisory_file, data):
no_important_changes = True no_important_changes = True
# One of the important keys has changed # One of the important keys has changed
for key in ["id", "affected", "references", "related", "published"]: for key in ["affected", "references", "related", "published"]:
if current[key] != original[key]: if canonicaljson.encode_canonical_json(
original[key]
) != canonicaljson.encode_canonical_json(current[key]):
print(f"Found changes in {current['id']} / {key}")
no_important_changes = False no_important_changes = False
break
if no_important_changes: if no_important_changes:
return None return None
@@ -182,19 +192,25 @@ def fetch_cve_metadata(PHOTON_VERSIONS):
print(f"[+] CVE metadata for Photon OS {branch}.0: Added {len(data)} CVEs") print(f"[+] CVE metadata for Photon OS {branch}.0: Added {len(data)} CVEs")
return cve_metadata return cve_metadata
def __main__():
def __main__(advisory_id = None):
cve_metadata = fetch_cve_metadata(PHOTON_VERSIONS) cve_metadata = fetch_cve_metadata(PHOTON_VERSIONS)
for advisory in glob("advisories/*.json"):
os.remove(advisory)
for d in get_osv(cve_metadata): for d in get_osv(cve_metadata):
# If we are only running for a single advisory
# Check and continue if it doesn't match
if advisory_id and d['id'] != advisory_id:
continue
fn = f"advisories/{d['id']}.json" fn = f"advisories/{d['id']}.json"
if os.path.exists(fn): if os.path.exists(fn):
d = merge_advisories(fn, d) d = merge_advisories(fn, d)
if d: if d:
with open(fn, "w") as f: with open(fn, "wb") as f:
f.write(json.dumps(d, indent=4, sort_keys=True)) f.write(canonicaljson.encode_pretty_printed_json(d))
if __name__ == "__main__": if __name__ == "__main__":
__main__() if len(sys.argv) >=2:
__main__(sys.argv[1])
else:
__main__()