diff --git a/.github/workflows/update.yaml b/.github/workflows/update.yaml index 7c179e3..d15119e 100644 --- a/.github/workflows/update.yaml +++ b/.github/workflows/update.yaml @@ -14,6 +14,9 @@ jobs: with: ref: ${{ github.head_ref }} submodules: true + # pypy is much faster at generating lots of JSON + # and canonicaljson also uses pypy-json instead + # of simplejson (https://github.com/matrix-org/python-canonicaljson/pull/25/files) - uses: actions/setup-python@v4 with: python-version: 'pypy3.9' diff --git a/generate.py b/generate.py index 4844567..9e831ef 100644 --- a/generate.py +++ b/generate.py @@ -3,6 +3,7 @@ from glob import glob import subprocess import markdown import json +import canonicaljson import urllib.request from datetime import datetime import copy @@ -52,12 +53,14 @@ def get_osv(cve_data_all_versions): def cve_fixed_version(package, cves, os_version, advisory): # list of fixed versions with a matching # CVE/pkg/OS combination - fixed_versions = set([ - x["res_ver"] - for cve in cves - for x in cve_data_all_versions.get(cve, list()) - if (x and x["os"] == os_version and x["pkg"] == package) - ]) + fixed_versions = set( + [ + x["res_ver"] + for cve in cves + for x in cve_data_all_versions.get(cve, list()) + if (x and x["os"] == os_version and x["pkg"] == package) + ] + ) # There should only be a single such reference if len(fixed_versions) != 1: f = ", ".join(list(fixed_versions)) @@ -96,7 +99,7 @@ def get_osv(cve_data_all_versions): {"introduced": "0"}, {"fixed": fixed_version}, ], - "type": "ECOSYSTEM" + "type": "ECOSYSTEM", } return r @@ -105,10 +108,7 @@ def get_osv(cve_data_all_versions): "modified": modified.isoformat("T") + "Z", "published": published.isoformat("T") + "Z", "related": cves, - "affected": [ - affected(pkg, cves, os_version) - for pkg in packages - ], + "affected": [affected(pkg, cves, os_version) for pkg in packages], "references": [ {"type": "ADVISORY", "url": ADVISORY_URL.format(slug=slug)} ], @@ -116,18 +116,24 @@ def get_osv(cve_data_all_versions): def merge_advisories(advisory_file, data): + + def dedup_dicts(items): + dedupped = [ json.loads(i) for i in set(canonicaljson.encode_canonical_json(item) for item in items)] + return dedupped # read the current advisory data as json with open(advisory_file, "r") as f: original = json.load(f) current = copy.deepcopy(original) # merge the data assert current["id"] == data["id"] - current["affected"].extend(data["affected"]) - current["references"].extend(data["references"]) - current["related"].extend(data["related"]) - - # Make sure no CVE references are duplicated - current["related"] = list(set(current["related"])).sort() + # Add any new data, but use a set, to avoid + # duplicate entries + for key in ['affected', 'references', 'related']: + if current[key]: + current[key].extend(data[key]) + current[key] = dedup_dicts(current[key]) + elif data[key]: + current[key] = data[key] # Pick the earlier published date # and the later modified date @@ -149,9 +155,13 @@ def merge_advisories(advisory_file, data): no_important_changes = True # One of the important keys has changed - for key in ["id", "affected", "references", "related", "published"]: - if current[key] != original[key]: + for key in ["affected", "references", "related", "published"]: + if canonicaljson.encode_canonical_json( + original[key] + ) != canonicaljson.encode_canonical_json(current[key]): + print(f"Found changes in {current['id']} / {key}") no_important_changes = False + break if no_important_changes: return None @@ -182,19 +192,25 @@ def fetch_cve_metadata(PHOTON_VERSIONS): print(f"[+] CVE metadata for Photon OS {branch}.0: Added {len(data)} CVEs") return cve_metadata -def __main__(): + +def __main__(advisory_id = None): cve_metadata = fetch_cve_metadata(PHOTON_VERSIONS) - for advisory in glob("advisories/*.json"): - os.remove(advisory) for d in get_osv(cve_metadata): + # If we are only running for a single advisory + # Check and continue if it doesn't match + if advisory_id and d['id'] != advisory_id: + continue fn = f"advisories/{d['id']}.json" if os.path.exists(fn): d = merge_advisories(fn, d) if d: - with open(fn, "w") as f: - f.write(json.dumps(d, indent=4, sort_keys=True)) + with open(fn, "wb") as f: + f.write(canonicaljson.encode_pretty_printed_json(d)) if __name__ == "__main__": - __main__() + if len(sys.argv) >=2: + __main__(sys.argv[1]) + else: + __main__()