Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import collabora_importer as collabora_importer_v2
from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
Expand Down Expand Up @@ -118,6 +119,7 @@
retiredotnet_importer_v2.RetireDotnetImporterPipeline,
ubuntu_osv_importer_v2.UbuntuOSVImporterPipeline,
alpine_linux_importer_v2.AlpineLinuxImporterPipeline,
collabora_importer_v2.CollaboraImporterPipeline,
nvd_importer.NVDImporterPipeline,
github_importer.GitHubAPIImporterPipeline,
gitlab_importer.GitLabImporterPipeline,
Expand Down
118 changes: 118 additions & 0 deletions vulnerabilities/pipelines/v2_importers/collabora_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import logging
from typing import Iterable

import dateparser
import requests

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import SCORING_SYSTEMS

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# GitHub REST API endpoint that lists the security advisories of the
# CollaboraOnline/online repository.
COLLABORA_URL = "https://api.github.com/repos/CollaboraOnline/online/security-advisories"


class CollaboraImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """Collect Collabora Online security advisories from the GitHub Security Advisory API."""

    pipeline_id = "collabora_importer"
    spdx_license_expression = "LicenseRef-scancode-proprietary-license"
    license_url = "https://github.com/CollaboraOnline/online/security/advisories"
    precedence = 200

    @classmethod
    def steps(cls):
        """Return the pipeline steps: a single collect-and-store step."""
        return (cls.collect_and_store_advisories,)

    def advisories_count(self) -> int:
        """Return 0; no advisory count is computed ahead of the collection."""
        return 0

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        """
        Yield one AdvisoryDataV2 per published advisory returned by COLLABORA_URL,
        following the ``Link: rel="next"`` pagination header until it is absent.

        Any failure to fetch or decode a page is logged as an error and ends the
        collection; advisories yielded from earlier pages are kept.
        """
        url = COLLABORA_URL
        params = {"state": "published", "per_page": 100}
        while url:
            try:
                resp = requests.get(url, params=params, timeout=30)
                resp.raise_for_status()
                # Decode inside the guarded section so that a malformed body is
                # logged like any other fetch failure instead of escaping.
                items = resp.json()
            except Exception as e:
                # Deliberately broad: any failure ends the collection gracefully.
                logger.error("Failed to fetch Collabora advisories from %s: %s", url, e)
                break

            if not isinstance(items, list):
                # Iterating a non-list payload (such as an error object) would
                # feed its keys to parse_advisory.
                logger.error(
                    "Unexpected Collabora advisories payload from %s: expected a list, got %s",
                    url,
                    type(items).__name__,
                )
                break

            for item in items:
                if not isinstance(item, dict):
                    logger.warning("Skipping non-object Collabora advisory entry from %s", url)
                    continue
                advisory = parse_advisory(item)
                if advisory:
                    yield advisory

            # cursor is already embedded in the next URL
            url = resp.links.get("next", {}).get("url")
            params = None


def parse_advisory(data: dict):
    """
    Return an AdvisoryDataV2 built from ``data``, a GitHub security advisory
    object, or None if the GHSA ID is missing.

    The GHSA ID becomes the advisory id and the CVE id, when present, its only
    alias. The CVSS v3 vector and score, when both present, produce a single
    severity. ``cwe_ids`` entries such as "CWE-79" are stored as integers and
    malformed entries are skipped. The whole ``data`` object is kept as JSON in
    ``original_advisory_text``.
    """
    ghsa_id = data.get("ghsa_id") or ""
    if not ghsa_id:
        return None

    cve_id = data.get("cve_id") or ""
    aliases = [cve_id] if cve_id else []

    summary = data.get("summary") or ""
    html_url = data.get("html_url") or ""
    references = [ReferenceV2(url=html_url)] if html_url else []

    date_published = None
    published_at = data.get("published_at") or ""
    if published_at:
        date_published = dateparser.parse(published_at)
        if date_published is None:
            logger.warning("Could not parse date %r for %s", published_at, ghsa_id)

    severities = []
    cvss_v3 = (data.get("cvss_severities") or {}).get("cvss_v3") or {}
    cvss_vector = cvss_v3.get("vector_string") or ""
    cvss_score = cvss_v3.get("score")
    # Compare against None rather than truthiness: 0.0 is a valid CVSS score
    # and must not be dropped when a vector is present.
    if cvss_vector and cvss_score is not None:
        system = (
            SCORING_SYSTEMS["cvssv3.1"]
            if cvss_vector.startswith("CVSS:3.1/")
            else SCORING_SYSTEMS["cvssv3"]
        )
        severities.append(
            VulnerabilitySeverity(
                system=system,
                value=str(cvss_score),
                scoring_elements=cvss_vector,
            )
        )

    weaknesses = []
    for cwe_str in data.get("cwe_ids") or []:
        if not isinstance(cwe_str, str):
            continue
        # cwe_ids entries are like "CWE-79"; extract the integer part
        suffix = cwe_str[4:] if cwe_str.upper().startswith("CWE-") else ""
        # Restrict to ASCII digits: str.isdigit() alone also accepts characters
        # such as superscript digits that int() rejects.
        if suffix.isascii() and suffix.isdigit():
            weaknesses.append(int(suffix))

    return AdvisoryDataV2(
        advisory_id=ghsa_id,
        aliases=aliases,
        summary=summary,
        affected_packages=[],
        references=references,
        date_published=date_published,
        severities=severities,
        weaknesses=weaknesses,
        url=html_url,
        original_advisory_text=json.dumps(data, indent=2, ensure_ascii=False),
    )
153 changes: 153 additions & 0 deletions vulnerabilities/tests/test_collabora_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import json
import os
from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import patch

from vulnerabilities.pipelines.v2_importers.collabora_importer import CollaboraImporterPipeline
from vulnerabilities.pipelines.v2_importers.collabora_importer import parse_advisory

# Directory of the Collabora JSON fixtures, located next to this test module.
TEST_DATA = os.path.join(os.path.dirname(__file__), "test_data", "collabora")


def load_json(filename):
    """Return the parsed JSON content of the ``filename`` fixture stored under TEST_DATA."""
    fixture_path = os.path.join(TEST_DATA, filename)
    with open(fixture_path, encoding="utf-8") as fixture:
        return json.loads(fixture.read())


class TestCollaboraImporter(TestCase):
    """Unit tests for ``parse_advisory`` driven by the Collabora JSON fixtures."""

    @staticmethod
    def _mock1_with(**overrides):
        # Fresh copy of the collabora_mock1.json fixture with some fields replaced.
        return {**load_json("collabora_mock1.json"), **overrides}

    def test_parse_advisory_with_cvss31(self):
        # mock1: GHSA-68v6-r6qq-mmq2, CVSS 3.1 score 5.3, no CWEs
        result = parse_advisory(load_json("collabora_mock1.json"))
        self.assertIsNotNone(result)
        self.assertEqual(result.advisory_id, "GHSA-68v6-r6qq-mmq2")
        self.assertIn("CVE-2026-23623", result.aliases)
        self.assertEqual(len(result.severities), 1)
        severity = result.severities[0]
        self.assertEqual(severity.value, "5.3")
        self.assertIn("CVSS:3.1/", severity.scoring_elements)
        self.assertEqual(result.weaknesses, [])
        self.assertEqual(len(result.references), 1)
        self.assertIsNotNone(result.date_published)

    def test_parse_advisory_with_cvss30_and_cwe(self):
        # mock2: GHSA-7582-pwfh-3pwr, CVSS 3.0 score 9.0, CWE-79
        result = parse_advisory(load_json("collabora_mock2.json"))
        self.assertIsNotNone(result)
        self.assertEqual(result.advisory_id, "GHSA-7582-pwfh-3pwr")
        self.assertIn("CVE-2023-34088", result.aliases)
        self.assertEqual(len(result.severities), 1)
        severity = result.severities[0]
        self.assertEqual(severity.value, "9.0")
        self.assertIn("CVSS:3.0/", severity.scoring_elements)
        self.assertEqual(result.weaknesses, [79])

    def test_parse_advisory_missing_ghsa_id_returns_none(self):
        payload = {"cve_id": "CVE-2024-0001", "summary": "test"}
        self.assertIsNone(parse_advisory(payload))

    def test_parse_advisory_no_cve_id_has_empty_aliases(self):
        result = parse_advisory(self._mock1_with(cve_id=None))
        self.assertIsNotNone(result)
        self.assertEqual(result.aliases, [])

    def test_parse_advisory_no_cvss_has_empty_severities(self):
        empty_cvss = {
            "cvss_v3": {"vector_string": None, "score": None},
            "cvss_v4": None,
        }
        result = parse_advisory(self._mock1_with(cvss_severities=empty_cvss))
        self.assertIsNotNone(result)
        self.assertEqual(result.severities, [])

    def test_parse_advisory_multiple_cwes(self):
        payload = self._mock1_with(cwe_ids=["CWE-79", "CWE-89", "CWE-200"])
        result = parse_advisory(payload)
        self.assertIsNotNone(result)
        self.assertEqual(result.weaknesses, [79, 89, 200])

    def test_parse_advisory_malformed_cwe_skipped(self):
        payload = self._mock1_with(cwe_ids=["CWE-abc", "INVALID", "CWE-79", ""])
        result = parse_advisory(payload)
        self.assertIsNotNone(result)
        self.assertEqual(result.weaknesses, [79])

    def test_parse_advisory_no_html_url_empty_references(self):
        result = parse_advisory(self._mock1_with(html_url=None))
        self.assertIsNotNone(result)
        self.assertEqual(result.references, [])
        self.assertEqual(result.url, "")

    def test_parse_advisory_summary_stored(self):
        payload = load_json("collabora_mock1.json")
        result = parse_advisory(payload)
        self.assertIsNotNone(result)
        self.assertIsInstance(result.summary, str)
        self.assertEqual(result.summary, payload["summary"])

    def test_parse_advisory_original_text_is_json(self):
        payload = load_json("collabora_mock1.json")
        result = parse_advisory(payload)
        self.assertIsNotNone(result)
        round_tripped = json.loads(result.original_advisory_text)
        self.assertEqual(round_tripped["ghsa_id"], payload["ghsa_id"])


class TestCollaboraImporterPipeline(TestCase):
    """Tests for ``CollaboraImporterPipeline.collect_advisories`` with ``requests.get`` mocked."""

    # Dotted path of the ``requests.get`` reference patched in every test below.
    _REQUESTS_GET = "vulnerabilities.pipelines.v2_importers.collabora_importer.requests.get"

    def _mock_response(self, data, next_url=None):
        # Fake HTTP response: ``data`` is the JSON payload, ``next_url`` the next page link.
        links = {}
        if next_url:
            links = {"next": {"url": next_url}}
        response = MagicMock()
        response.json.return_value = data
        response.raise_for_status.return_value = None
        response.links = links
        return response

    @patch(_REQUESTS_GET)
    def test_collect_advisories_single_page(self, mock_get):
        fixture = load_json("collabora_mock1.json")
        mock_get.return_value = self._mock_response([fixture])

        collected = list(CollaboraImporterPipeline().collect_advisories())

        self.assertEqual(len(collected), 1)
        self.assertEqual(collected[0].advisory_id, fixture["ghsa_id"])

    @patch(_REQUESTS_GET)
    def test_collect_advisories_pagination(self, mock_get):
        first = load_json("collabora_mock1.json")
        second = load_json("collabora_mock2.json")
        page_one = self._mock_response([first], next_url="https://api.github.com/page2")
        page_two = self._mock_response([second])
        mock_get.side_effect = [page_one, page_two]

        collected = list(CollaboraImporterPipeline().collect_advisories())

        self.assertEqual(len(collected), 2)
        self.assertEqual(collected[0].advisory_id, first["ghsa_id"])
        self.assertEqual(collected[1].advisory_id, second["ghsa_id"])

    @patch(_REQUESTS_GET)
    def test_collect_advisories_http_error_logs_and_stops(self, mock_get):
        mock_get.side_effect = Exception("connection refused")
        logger_name = "vulnerabilities.pipelines.v2_importers.collabora_importer"

        with self.assertLogs(logger_name, level="ERROR") as captured:
            collected = list(CollaboraImporterPipeline().collect_advisories())

        self.assertEqual(collected, [])
        self.assertTrue(any("connection refused" in line for line in captured.output))
Loading