From bd8588ec1d7fb3bc3fec0e78bd17bf819a4edb42 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Wed, 11 Mar 2026 14:45:08 +0530 Subject: [PATCH 1/2] Change API design Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 569 ++++++++++++++------------------------ vulnerabilities/models.py | 30 ++ vulnerablecode/urls.py | 8 + 3 files changed, 240 insertions(+), 367 deletions(-) diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 74975b819..8cbd8ccc0 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,7 +8,10 @@ # +from urllib.parse import urlencode + from django.db.models import Prefetch +from django.db.models import Q from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema @@ -20,6 +23,7 @@ from rest_framework import viewsets from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action +from rest_framework.exceptions import MethodNotAllowed from rest_framework.permissions import BasePermission from rest_framework.response import Response from rest_framework.reverse import reverse @@ -141,7 +145,7 @@ def get_aliases(self, obj): return [alias.alias for alias in obj.aliases.all()] -class AdvisoryV2Serializer(serializers.ModelSerializer): +class AdvisoryV3Serializer(serializers.ModelSerializer): aliases = serializers.SerializerMethodField() weaknesses = AdvisoryWeaknessSerializer(many=True) references = AdvisoryReferenceSerializer(many=True) @@ -337,7 +341,9 @@ class PackageV3Serializer(serializers.ModelSerializer): purl = serializers.CharField(source="package_url") risk_score = serializers.FloatField(read_only=True) affected_by_vulnerabilities = serializers.SerializerMethodField() + affected_by_vulnerabilities_url = serializers.SerializerMethodField() fixing_vulnerabilities = serializers.SerializerMethodField() + fixing_vulnerabilities_url = serializers.SerializerMethodField() next_non_vulnerable_version = serializers.SerializerMethodField() latest_non_vulnerable_version = serializers.SerializerMethodField() @@ -346,80 +352,118 @@ class Meta: fields = [ "purl", "affected_by_vulnerabilities", + "affected_by_vulnerabilities_url", "fixing_vulnerabilities", + "fixing_vulnerabilities_url", "next_non_vulnerable_version", "latest_non_vulnerable_version", "risk_score", ] + def to_representation(self, instance): + data = super().to_representation(instance) + + if data.get("affected_by_vulnerabilities") is None: + data.pop("affected_by_vulnerabilities", None) + else: + data.pop("affected_by_vulnerabilities_url", None) + + if data.get("fixing_vulnerabilities") is None: + data.pop("fixing_vulnerabilities", None) + else: + data.pop("fixing_vulnerabilities_url", None) + + return data + + def get_affected_by_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("affected-by-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_fixing_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("fixing-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + def get_affected_by_vulnerabilities(self, package): """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" - impacts = package.affected_in_impacts.select_related("advisory").prefetch_related( - "fixed_by_packages" - ) + advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url) - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_by_avid = {adv.avid: adv for adv in latest_advisories} - impact_by_avid = {} - - advisories = [] - for impact in impacts: - avid = impact.advisory.avid - advisory = advisory_by_avid.get(avid) - if not advisory: - continue - advisories.append(advisory) - impact_by_avid[avid] = impact + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() - grouped_advisories = group_advisories_by_content(advisories=advisories) + impacts = ( + package.affected_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) - advs = [] + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) + grouped = group_advisories_by_content(advisories) result = [] - - for advisory in advs: - primary_advisory = advisory["primary"] - avid = primary_advisory.avid - impact = impact_by_avid.get(avid) + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) if not impact: continue + result.append( { - "advisory_id": primary_advisory.avid, + "advisory_id": primary.avid, "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], } ) return result def get_fixing_vulnerabilities(self, package): - impacts = package.fixed_in_impacts.select_related("advisory") + advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() - grouped_advisories = group_advisories_by_content(advisories=latest_advisories) + impacts = ( + package.fixed_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) - advs = [] + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) + grouped = group_advisories_by_content(advisories) result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue - for advisory in advs: - primary_advisory = advisory["primary"] result.append( { - "advisory_id": primary_advisory.avid, - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], + "advisory_id": primary.avid, + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], } ) @@ -462,27 +506,6 @@ class PackageV2FilterSet(filters.FilterSet): purl = filters.CharFilter(field_name="package_url") -class AdvisoryPackageV2FilterSet(filters.FilterSet): - affected_by_advisory = filters.CharFilter( - field_name="affected_in_impacts__advisory__avid", - label="Affected By Advisory ID", - help_text="Filter packages affected by a specific Advisory ID.", - ) - - fixing_advisory = filters.CharFilter( - field_name="fixed_in_impacts__advisory__avid", - label="Fixed By Advisory ID", - help_text="Filter packages fixed by a specific Advisory ID.", - ) - - purls = CharInFilter( - field_name="package_url", - lookup_expr="in", - label="Package URL", - help_text="Filter by one or more Package URLs. Multi-value supported (comma-separated).", - ) - - class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all().prefetch_related( Prefetch( @@ -1064,337 +1087,149 @@ def get_view_name(self): return "Pipeline Jobs" -class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet): - queryset = PackageV2.objects.all() - serializer_class = PackageV3Serializer - filter_backends = [filters.DjangoFilterBackend] - filterset_class = AdvisoryPackageV2FilterSet - throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - - def get_queryset(self): - return ( - super() - .get_queryset() - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - def list(self, request, *args, **kwargs): - queryset = self.filter_queryset(self.get_queryset()) - page = self.paginate_queryset(queryset) - - packages = page if page is not None else queryset - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in latest_advisories} - - serializer = self.get_serializer(packages, many=True) - - if page is not None: - return self.get_paginated_response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - return Response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - @extend_schema( - request=PackageurlListSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageurlListSerializer, - filter_backends=[], - pagination_class=None, +class PackageQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, ) - def bulk_lookup(self, request): - """ - Return the response for exact PackageURLs requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, - ) - - purls = serializer.validated_data.get("purls") - - packages = ( - PackageV2.objects.for_purls(purls) - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages" - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) + details = serializers.BooleanField(default=False) + approximate = serializers.BooleanField(default=False) + + def validate(self, data): + if not data["purls"]: + if data["details"] or data["approximate"]: + raise serializers.ValidationError( + "details and approximate must be false when purls is empty" + ) + return data - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) +class AdvisoryQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } + def validate(self, data): + if not data["purls"]: + raise serializers.ValidationError("purls is required") + return data - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) +class PackageV3ViewSet(viewsets.GenericViewSet): + queryset = PackageV2.objects.all() + serializer_class = PackageV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - @extend_schema( - request=PackageBulkSearchRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageBulkSearchRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def bulk_search(self, request): - """ - Lookup for vulnerable packages using many Package URLs at once. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + details = serializer.validated_data["details"] + approximate = serializer.validated_data["approximate"] + + if not purls: + vulnerable_purls = ( + PackageV2.objects.vulnerable() + .only("package_url") + .distinct() + .values_list("package_url", flat=True) + .order_by("package_url") ) - - validated_data = serializer.validated_data - purls = validated_data.get("purls") - purl_only = validated_data.get("purl_only", False) - plain_purl = validated_data.get("plain_purl", False) - - if plain_purl: - purl_objects = [PackageURL.from_string(purl) for purl in purls] - plain_purl_objects = [ - PackageURL( - type=purl.type, - namespace=purl.namespace, - name=purl.name, - version=purl.version, + page = self.paginate_queryset(vulnerable_purls) + return self.get_paginated_response(page) + + plain_purls = None + + if approximate: + plain_purls = [ + str( + PackageURL( + type=p.type, + namespace=p.namespace, + name=p.name, + version=p.version, + ) ) - for purl in purl_objects + for p in map(PackageURL.from_string, purls) ] - plain_purls = [str(purl) for purl in plain_purl_objects] + if not details: + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .values_list("plain_package_url", flat=True) + .distinct() + .order_by("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .values_list("package_url", flat=True) + .distinct() + .order_by("package_url") + ) + + page = self.paginate_queryset(query) + return self.get_paginated_response(page) + + if approximate: query = ( PackageV2.objects.filter(plain_package_url__in=plain_purls) .order_by("plain_package_url") .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related( - "advisory" - ).prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .order_by("package_url") + .distinct("package_url") ) - packages = query + page = self.paginate_queryset(query) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) +class AdvisoryV3ViewSet(viewsets.GenericViewSet): + queryset = AdvisoryV2.objects.all() + serializer_class = AdvisoryV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - # Using order by and distinct because there will be - # many fully qualified purl for a single plain purl - vulnerable_purls = query.vulnerable().only("plain_package_url") - vulnerable_purls = [str(package.plain_package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) - query = ( - PackageV2.objects.filter(package_url__in=purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - packages = query + purls = serializer.validated_data["purls"] - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } + latest_advisories = AdvisoryV2.objects.latest_advisories_for_purls(purls=purls) - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) + page = self.paginate_queryset(latest_advisories) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) - vulnerable_purls = query.vulnerable().only("package_url") - vulnerable_purls = [str(package.package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) - @action(detail=False, methods=["get"]) - def all(self, request): - """ - Return a list of Package URLs of vulnerable packages. - """ - vulnerable_purls = ( - PackageV2.objects.vulnerable() - .only("package_url") - .order_by("package_url") - .distinct() - .values_list("package_url", flat=True) - ) - return Response(vulnerable_purls) +class PackageAdvisoriesViewSet(viewsets.ReadOnlyModelViewSet): + serializer_class = AdvisoryV3Serializer + relation = None - @extend_schema( - request=LookupRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=LookupRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def lookup(self, request): - """ - Return the response for exact PackageURL requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A 'purl' is required.", - }, - ) - validated_data = serializer.validated_data - purl = validated_data.get("purl") + def get_queryset(self): + purl = self.request.query_params.get("purl") - qs = self.get_queryset().for_purls([purl]).with_is_vulnerable() - return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data) + if not purl: + return AdvisoryV2.objects.none() + + return AdvisoryV2.objects.filter(**{self.relation: purl}).latest_per_avid() + + +class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__fixed_by_packages__package_url" + + +class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__affecting_packages__package_url" diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 1981a2861..5ebd79184 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2876,6 +2876,36 @@ def latest_per_avid(self): def latest_for_avids(self, avids): return self.filter(avid__in=avids).latest_per_avid() + def latest_affecting_advisories_for_purl(self, purl): + return self.filter( + impacted_packages__affecting_packages__package_url=purl + ).latest_per_avid() + + def latest_affecting_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__affecting_packages__package_url__in=purls + ).latest_per_avid() + + def latest_fixed_by_advisories_for_purl(self, purl): + return self.filter(impacted_packages__fixed_by_packages__package_url=purl).latest_per_avid() + + def latest_fixed_by_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__fixed_by_packages__package_url__in=purls + ).latest_per_avid() + + def latest_advisories_for_purl(self, purl): + return self.filter( + Q(impacted_packages__affecting_packages__package_url=purl) + | Q(impacted_packages__fixed_by_packages__package_url=purl) + ).latest_per_avid() + + def latest_advisories_for_purls(self, purls): + return self.filter( + Q(impacted_packages__affecting_packages__package_url__in=purls) + | Q(impacted_packages__fixed_by_packages__package_url__in=purls) + ).latest_per_avid() + class AdvisoryV2(models.Model): """ diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 49948a3b9..0fcee200b 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -20,8 +20,11 @@ from vulnerabilities.api import CPEViewSet from vulnerabilities.api import PackageViewSet from vulnerabilities.api import VulnerabilityViewSet +from vulnerabilities.api_v2 import AdvisoryV3ViewSet +from vulnerabilities.api_v2 import AffectedByAdvisoriesViewSet from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet +from vulnerabilities.api_v2 import FixingAdvisoriesViewSet from vulnerabilities.api_v2 import PackageV2ViewSet from vulnerabilities.api_v2 import PackageV3ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet @@ -70,6 +73,11 @@ def __init__(self, *args, **kwargs): api_v3_router = OptionalSlashRouter() api_v3_router.register("packages", PackageV3ViewSet, basename="package-v3") +api_v3_router.register("advisories", AdvisoryV3ViewSet, basename="advisory-v3") +api_v3_router.register( + "affected-by-advisories", AffectedByAdvisoriesViewSet, basename="affected-by-advisories" +) +api_v3_router.register("fixing-advisories", FixingAdvisoriesViewSet, basename="fixing-advisories") urlpatterns = [ path("admin/login/", AdminLoginView.as_view(), name="admin-login"), From 4b3dfc2907ca23ce3ed0d76c791691a194c0a806 Mon Sep 17 00:00:00 2001 From: Tushar Goel Date: Wed, 11 Mar 2026 15:06:42 +0530 Subject: [PATCH 2/2] Restructure V3 API Signed-off-by: Tushar Goel --- vulnerabilities/api_v2.py | 384 ------------------------------------ vulnerabilities/api_v3.py | 400 ++++++++++++++++++++++++++++++++++++++ vulnerablecode/urls.py | 8 +- 3 files changed, 404 insertions(+), 388 deletions(-) create mode 100644 vulnerabilities/api_v3.py diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 8cbd8ccc0..6e0ab9213 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -8,8 +8,6 @@ # -from urllib.parse import urlencode - from django.db.models import Prefetch from django.db.models import Q from django_filters import rest_framework as filters @@ -23,21 +21,14 @@ from rest_framework import viewsets from rest_framework.authentication import SessionAuthentication from rest_framework.decorators import action -from rest_framework.exceptions import MethodNotAllowed from rest_framework.permissions import BasePermission from rest_framework.response import Response from rest_framework.reverse import reverse from rest_framework.throttling import AnonRateThrottle -from vulnerabilities.models import AdvisoryReference -from vulnerabilities.models import AdvisorySeverity -from vulnerabilities.models import AdvisoryV2 -from vulnerabilities.models import AdvisoryWeakness from vulnerabilities.models import CodeFix from vulnerabilities.models import CodeFixV2 -from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import Package -from vulnerabilities.models import PackageV2 from vulnerabilities.models import PipelineRun from vulnerabilities.models import PipelineSchedule from vulnerabilities.models import Vulnerability @@ -45,7 +36,6 @@ from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness from vulnerabilities.throttling import PermissionBasedUserRateThrottle -from vulnerabilities.utils import group_advisories_by_content class CharInFilter(filters.BaseInFilter, filters.CharFilter): @@ -62,16 +52,6 @@ class Meta: fields = ["cwe_id", "name", "description"] -class AdvisoryWeaknessSerializer(serializers.ModelSerializer): - cwe_id = serializers.CharField() - name = serializers.CharField() - description = serializers.CharField() - - class Meta: - model = AdvisoryWeakness - fields = ["cwe_id", "name", "description"] - - class VulnerabilityReferenceV2Serializer(serializers.ModelSerializer): url = serializers.CharField() reference_type = serializers.CharField() @@ -82,29 +62,6 @@ class Meta: fields = ["url", "reference_type", "reference_id"] -class AdvisoryReferenceSerializer(serializers.ModelSerializer): - url = serializers.CharField() - reference_type = serializers.CharField() - reference_id = serializers.CharField() - - class Meta: - model = AdvisoryReference - fields = ["url", "reference_type", "reference_id"] - - -class AdvisorySeveritySerializer(serializers.ModelSerializer): - class Meta: - model = AdvisorySeverity - fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] - - def to_representation(self, instance): - data = super().to_representation(instance) - published_at = data.get("published_at", None) - if not published_at: - data.pop("published_at") - return data - - class VulnerabilitySeverityV2Serializer(serializers.ModelSerializer): class Meta: model = VulnerabilitySeverity @@ -145,58 +102,6 @@ def get_aliases(self, obj): return [alias.alias for alias in obj.aliases.all()] -class AdvisoryV3Serializer(serializers.ModelSerializer): - aliases = serializers.SerializerMethodField() - weaknesses = AdvisoryWeaknessSerializer(many=True) - references = AdvisoryReferenceSerializer(many=True) - severities = AdvisorySeveritySerializer(many=True) - advisory_id = serializers.CharField(source="avid", read_only=True) - related_ssvc_trees = serializers.SerializerMethodField() - - def get_related_ssvc_trees(self, obj): - related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") - source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") - - seen = set() - result = [] - - for ssvc in list(related_ssvcs) + list(source_ssvcs): - key = (ssvc.vector, ssvc.source_advisory_id) - if key in seen: - continue - seen.add(key) - - result.append( - { - "vector": ssvc.vector, - "decision": ssvc.decision, - "options": ssvc.options, - "source_url": ssvc.source_advisory.url, - } - ) - - return result - - class Meta: - model = AdvisoryV2 - fields = [ - "advisory_id", - "url", - "aliases", - "summary", - "severities", - "weaknesses", - "references", - "exploitability", - "weighted_severity", - "risk_score", - "related_ssvc_trees", - ] - - def get_aliases(self, obj): - return [alias.alias for alias in obj.aliases.all()] - - class VulnerabilityListSerializer(serializers.ModelSerializer): url = serializers.SerializerMethodField() @@ -337,147 +242,6 @@ def get_fixing_vulnerabilities(self, obj): return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()] -class PackageV3Serializer(serializers.ModelSerializer): - purl = serializers.CharField(source="package_url") - risk_score = serializers.FloatField(read_only=True) - affected_by_vulnerabilities = serializers.SerializerMethodField() - affected_by_vulnerabilities_url = serializers.SerializerMethodField() - fixing_vulnerabilities = serializers.SerializerMethodField() - fixing_vulnerabilities_url = serializers.SerializerMethodField() - next_non_vulnerable_version = serializers.SerializerMethodField() - latest_non_vulnerable_version = serializers.SerializerMethodField() - - class Meta: - model = Package - fields = [ - "purl", - "affected_by_vulnerabilities", - "affected_by_vulnerabilities_url", - "fixing_vulnerabilities", - "fixing_vulnerabilities_url", - "next_non_vulnerable_version", - "latest_non_vulnerable_version", - "risk_score", - ] - - def to_representation(self, instance): - data = super().to_representation(instance) - - if data.get("affected_by_vulnerabilities") is None: - data.pop("affected_by_vulnerabilities", None) - else: - data.pop("affected_by_vulnerabilities_url", None) - - if data.get("fixing_vulnerabilities") is None: - data.pop("fixing_vulnerabilities", None) - else: - data.pop("fixing_vulnerabilities_url", None) - - return data - - def get_affected_by_vulnerabilities_url(self, obj): - request = self.context.get("request") - if not request: - return None - - base = reverse("affected-by-advisories-list") - url = request.build_absolute_uri(base) - - return f"{url}?{urlencode({'purl': obj.package_url})}" - - def get_fixing_vulnerabilities_url(self, obj): - request = self.context.get("request") - if not request: - return None - - base = reverse("fixing-advisories-list") - url = request.build_absolute_uri(base) - - return f"{url}?{urlencode({'purl': obj.package_url})}" - - def get_affected_by_vulnerabilities(self, package): - """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" - advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url) - - advisories = list(advisories_qs[:101]) - if len(advisories) > 100: - return None - - advisory_by_avid = {adv.avid: adv for adv in advisories} - avids = advisory_by_avid.keys() - - impacts = ( - package.affected_in_impacts.filter(advisory__avid__in=avids) - .select_related("advisory") - .prefetch_related("fixed_by_packages") - ) - - impact_by_avid = {impact.advisory.avid: impact for impact in impacts} - - grouped = group_advisories_by_content(advisories) - - result = [] - for entry in grouped.values(): - primary = entry["primary"] - impact = impact_by_avid.get(primary.avid) - if not impact: - continue - - result.append( - { - "advisory_id": primary.avid, - "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], - "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], - } - ) - - return result - - def get_fixing_vulnerabilities(self, package): - advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) - - advisories = list(advisories_qs[:101]) - if len(advisories) > 100: - return None - - advisory_by_avid = {adv.avid: adv for adv in advisories} - avids = advisory_by_avid.keys() - - impacts = ( - package.fixed_in_impacts.filter(advisory__avid__in=avids) - .select_related("advisory") - .prefetch_related("fixed_by_packages") - ) - - impact_by_avid = {impact.advisory.avid: impact for impact in impacts} - - grouped = group_advisories_by_content(advisories) - - result = [] - for entry in grouped.values(): - primary = entry["primary"] - impact = impact_by_avid.get(primary.avid) - if not impact: - continue - - result.append( - { - "advisory_id": primary.avid, - "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], - } - ) - - return result - - def get_next_non_vulnerable_version(self, package): - if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: - return next_non_vulnerable.version - - def get_latest_non_vulnerable_version(self, package): - if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: - return latest_non_vulnerable.version - - class PackageurlListSerializer(serializers.Serializer): purls = serializers.ListField( child=serializers.CharField(), @@ -1085,151 +849,3 @@ def get_view_name(self): if self.detail: return "Pipeline Instance" return "Pipeline Jobs" - - -class PackageQuerySerializer(serializers.Serializer): - purls = serializers.ListField( - child=serializers.CharField(), - required=False, - default=list, - ) - details = serializers.BooleanField(default=False) - approximate = serializers.BooleanField(default=False) - - def validate(self, data): - if not data["purls"]: - if data["details"] or data["approximate"]: - raise serializers.ValidationError( - "details and approximate must be false when purls is empty" - ) - return data - - -class AdvisoryQuerySerializer(serializers.Serializer): - purls = serializers.ListField( - child=serializers.CharField(), - required=False, - default=list, - ) - - def validate(self, data): - if not data["purls"]: - raise serializers.ValidationError("purls is required") - return data - - -class PackageV3ViewSet(viewsets.GenericViewSet): - queryset = PackageV2.objects.all() - serializer_class = PackageV3Serializer - filter_backends = [filters.DjangoFilterBackend] - throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - - def create(self, request, *args, **kwargs): - serializer = PackageQuerySerializer(data=request.data) - serializer.is_valid(raise_exception=True) - - purls = serializer.validated_data["purls"] - details = serializer.validated_data["details"] - approximate = serializer.validated_data["approximate"] - - if not purls: - vulnerable_purls = ( - PackageV2.objects.vulnerable() - .only("package_url") - .distinct() - .values_list("package_url", flat=True) - .order_by("package_url") - ) - page = self.paginate_queryset(vulnerable_purls) - return self.get_paginated_response(page) - - plain_purls = None - - if approximate: - plain_purls = [ - str( - PackageURL( - type=p.type, - namespace=p.namespace, - name=p.name, - version=p.version, - ) - ) - for p in map(PackageURL.from_string, purls) - ] - - if not details: - if approximate: - query = ( - PackageV2.objects.filter(plain_package_url__in=plain_purls) - .values_list("plain_package_url", flat=True) - .distinct() - .order_by("plain_package_url") - ) - else: - query = ( - PackageV2.objects.filter(package_url__in=purls) - .values_list("package_url", flat=True) - .distinct() - .order_by("package_url") - ) - - page = self.paginate_queryset(query) - return self.get_paginated_response(page) - - if approximate: - query = ( - PackageV2.objects.filter(plain_package_url__in=plain_purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - ) - else: - query = ( - PackageV2.objects.filter(package_url__in=purls) - .order_by("package_url") - .distinct("package_url") - ) - - page = self.paginate_queryset(query) - serializer = self.get_serializer(page, many=True, context={"request": request}) - return self.get_paginated_response(serializer.data) - - -class AdvisoryV3ViewSet(viewsets.GenericViewSet): - queryset = AdvisoryV2.objects.all() - serializer_class = AdvisoryV3Serializer - filter_backends = [filters.DjangoFilterBackend] - throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - - def create(self, request, *args, **kwargs): - serializer = PackageQuerySerializer(data=request.data) - serializer.is_valid(raise_exception=True) - - purls = serializer.validated_data["purls"] - - latest_advisories = AdvisoryV2.objects.latest_advisories_for_purls(purls=purls) - - page = self.paginate_queryset(latest_advisories) - serializer = self.get_serializer(page, many=True, context={"request": request}) - return self.get_paginated_response(serializer.data) - - -class PackageAdvisoriesViewSet(viewsets.ReadOnlyModelViewSet): - serializer_class = AdvisoryV3Serializer - relation = None - - def get_queryset(self): - purl = self.request.query_params.get("purl") - - if not purl: - return AdvisoryV2.objects.none() - - return AdvisoryV2.objects.filter(**{self.relation: purl}).latest_per_avid() - - -class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet): - relation = "impacted_packages__fixed_by_packages__package_url" - - -class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet): - relation = "impacted_packages__affecting_packages__package_url" diff --git a/vulnerabilities/api_v3.py b/vulnerabilities/api_v3.py new file mode 100644 index 000000000..14a3effa7 --- /dev/null +++ b/vulnerabilities/api_v3.py @@ -0,0 +1,400 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from urllib.parse import urlencode + +from django_filters import rest_framework as filters +from packageurl import PackageURL +from rest_framework import serializers +from rest_framework import viewsets +from rest_framework.reverse import reverse +from rest_framework.throttling import AnonRateThrottle + +from vulnerabilities.models import AdvisoryReference +from vulnerabilities.models import AdvisorySeverity +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import AdvisoryWeakness +from vulnerabilities.models import PackageV2 +from vulnerabilities.throttling import PermissionBasedUserRateThrottle +from vulnerabilities.utils import group_advisories_by_content + + +class PackageQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + details = serializers.BooleanField(default=False) + approximate = serializers.BooleanField(default=False) + + def validate(self, data): + if not data["purls"]: + if data["details"] or data["approximate"]: + raise serializers.ValidationError( + "details and approximate must be false when purls is empty" + ) + return data + + +class AdvisoryQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + + def validate(self, data): + if not data["purls"]: + raise serializers.ValidationError("purls is required") + return data + + +class AdvisoryReferenceSerializer(serializers.ModelSerializer): + url = serializers.CharField() + reference_type = serializers.CharField() + reference_id = serializers.CharField() + + class Meta: + model = AdvisoryReference + fields = ["url", "reference_type", "reference_id"] + + +class AdvisorySeveritySerializer(serializers.ModelSerializer): + class Meta: + model = AdvisorySeverity + fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] + + def to_representation(self, instance): + data = super().to_representation(instance) + published_at = data.get("published_at", None) + if not published_at: + data.pop("published_at") + return data + + +class AdvisoryWeaknessSerializer(serializers.ModelSerializer): + cwe_id = serializers.CharField() + name = serializers.CharField() + description = serializers.CharField() + + class Meta: + model = AdvisoryWeakness + fields = ["cwe_id", "name", "description"] + + +class AdvisoryV3Serializer(serializers.ModelSerializer): + aliases = serializers.SerializerMethodField() + weaknesses = AdvisoryWeaknessSerializer(many=True) + references = AdvisoryReferenceSerializer(many=True) + severities = AdvisorySeveritySerializer(many=True) + advisory_id = serializers.CharField(source="avid", read_only=True) + related_ssvc_trees = serializers.SerializerMethodField() + + def get_related_ssvc_trees(self, obj): + related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") + source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") + + seen = set() + result = [] + + for ssvc in list(related_ssvcs) + list(source_ssvcs): + key = (ssvc.vector, ssvc.source_advisory_id) + if key in seen: + continue + seen.add(key) + + result.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) + + return result + + class Meta: + model = AdvisoryV2 + fields = [ + "advisory_id", + "url", + "aliases", + "summary", + "severities", + "weaknesses", + "references", + "exploitability", + "weighted_severity", + "risk_score", + "related_ssvc_trees", + ] + + def get_aliases(self, obj): + return [alias.alias for alias in obj.aliases.all()] + + +class PackageV3Serializer(serializers.ModelSerializer): + purl = serializers.CharField(source="package_url") + risk_score = serializers.FloatField(read_only=True) + affected_by_vulnerabilities = serializers.SerializerMethodField() + affected_by_vulnerabilities_url = serializers.SerializerMethodField() + fixing_vulnerabilities = serializers.SerializerMethodField() + fixing_vulnerabilities_url = serializers.SerializerMethodField() + next_non_vulnerable_version = serializers.SerializerMethodField() + latest_non_vulnerable_version = serializers.SerializerMethodField() + + class Meta: + model = PackageV2 + fields = [ + "purl", + "affected_by_vulnerabilities", + "affected_by_vulnerabilities_url", + "fixing_vulnerabilities", + "fixing_vulnerabilities_url", + "next_non_vulnerable_version", + "latest_non_vulnerable_version", + "risk_score", + ] + + def to_representation(self, instance): + data = super().to_representation(instance) + + if data.get("affected_by_vulnerabilities") is None: + data.pop("affected_by_vulnerabilities", None) + else: + data.pop("affected_by_vulnerabilities_url", None) + + if data.get("fixing_vulnerabilities") is None: + data.pop("fixing_vulnerabilities", None) + else: + data.pop("fixing_vulnerabilities_url", None) + + return data + + def get_affected_by_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("affected-by-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_fixing_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("fixing-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_affected_by_vulnerabilities(self, package): + """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" + advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.affected_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_fixing_vulnerabilities(self, package): + advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.fixed_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_next_non_vulnerable_version(self, package): + if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: + return next_non_vulnerable.version + + def get_latest_non_vulnerable_version(self, package): + if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: + return latest_non_vulnerable.version + + +class PackageV3ViewSet(viewsets.GenericViewSet): + queryset = PackageV2.objects.all() + serializer_class = PackageV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + details = serializer.validated_data["details"] + approximate = serializer.validated_data["approximate"] + + if not purls: + vulnerable_purls = ( + PackageV2.objects.vulnerable() + .only("package_url") + .distinct() + .values_list("package_url", flat=True) + .order_by("package_url") + ) + page = self.paginate_queryset(vulnerable_purls) + return self.get_paginated_response(page) + + plain_purls = None + + if approximate: + plain_purls = [ + str( + PackageURL( + type=p.type, + namespace=p.namespace, + name=p.name, + version=p.version, + ) + ) + for p in map(PackageURL.from_string, purls) + ] + + if not details: + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .values_list("plain_package_url", flat=True) + .distinct() + .order_by("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .values_list("package_url", flat=True) + .distinct() + .order_by("package_url") + ) + + page = self.paginate_queryset(query) + return self.get_paginated_response(page) + + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .order_by("plain_package_url") + .distinct("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .order_by("package_url") + .distinct("package_url") + ) + + page = self.paginate_queryset(query) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class AdvisoryV3ViewSet(viewsets.GenericViewSet): + queryset = AdvisoryV2.objects.all() + serializer_class = AdvisoryV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + + latest_advisories = AdvisoryV2.objects.latest_advisories_for_purls(purls=purls) + + page = self.paginate_queryset(latest_advisories) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class PackageAdvisoriesViewSet(viewsets.ReadOnlyModelViewSet): + serializer_class = AdvisoryV3Serializer + relation = None + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def get_queryset(self): + purl = self.request.query_params.get("purl") + + if not purl: + return AdvisoryV2.objects.none() + + return AdvisoryV2.objects.filter(**{self.relation: purl}).latest_per_avid() + + +class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__fixed_by_packages__package_url" + + +class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__affecting_packages__package_url" diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 0fcee200b..08d1371d7 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -20,13 +20,13 @@ from vulnerabilities.api import CPEViewSet from vulnerabilities.api import PackageViewSet from vulnerabilities.api import VulnerabilityViewSet -from vulnerabilities.api_v2 import AdvisoryV3ViewSet -from vulnerabilities.api_v2 import AffectedByAdvisoriesViewSet +from vulnerabilities.api_v3 import AdvisoryV3ViewSet +from vulnerabilities.api_v3 import AffectedByAdvisoriesViewSet from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet -from vulnerabilities.api_v2 import FixingAdvisoriesViewSet +from vulnerabilities.api_v3 import FixingAdvisoriesViewSet from vulnerabilities.api_v2 import PackageV2ViewSet -from vulnerabilities.api_v2 import PackageV3ViewSet +from vulnerabilities.api_v3 import PackageV3ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet from vulnerabilities.api_v2 import VulnerabilityV2ViewSet from vulnerabilities.views import AdminLoginView