diff --git a/vulnerabilities/api_v2.py b/vulnerabilities/api_v2.py index 74975b819..6e0ab9213 100644 --- a/vulnerabilities/api_v2.py +++ b/vulnerabilities/api_v2.py @@ -9,6 +9,7 @@ from django.db.models import Prefetch +from django.db.models import Q from django_filters import rest_framework as filters from drf_spectacular.utils import OpenApiParameter from drf_spectacular.utils import extend_schema @@ -25,15 +26,9 @@ from rest_framework.reverse import reverse from rest_framework.throttling import AnonRateThrottle -from vulnerabilities.models import AdvisoryReference -from vulnerabilities.models import AdvisorySeverity -from vulnerabilities.models import AdvisoryV2 -from vulnerabilities.models import AdvisoryWeakness from vulnerabilities.models import CodeFix from vulnerabilities.models import CodeFixV2 -from vulnerabilities.models import ImpactedPackage from vulnerabilities.models import Package -from vulnerabilities.models import PackageV2 from vulnerabilities.models import PipelineRun from vulnerabilities.models import PipelineSchedule from vulnerabilities.models import Vulnerability @@ -41,7 +36,6 @@ from vulnerabilities.models import VulnerabilitySeverity from vulnerabilities.models import Weakness from vulnerabilities.throttling import PermissionBasedUserRateThrottle -from vulnerabilities.utils import group_advisories_by_content class CharInFilter(filters.BaseInFilter, filters.CharFilter): @@ -58,16 +52,6 @@ class Meta: fields = ["cwe_id", "name", "description"] -class AdvisoryWeaknessSerializer(serializers.ModelSerializer): - cwe_id = serializers.CharField() - name = serializers.CharField() - description = serializers.CharField() - - class Meta: - model = AdvisoryWeakness - fields = ["cwe_id", "name", "description"] - - class VulnerabilityReferenceV2Serializer(serializers.ModelSerializer): url = serializers.CharField() reference_type = serializers.CharField() @@ -78,29 +62,6 @@ class Meta: fields = ["url", "reference_type", "reference_id"] -class AdvisoryReferenceSerializer(serializers.ModelSerializer): - url = serializers.CharField() - reference_type = serializers.CharField() - reference_id = serializers.CharField() - - class Meta: - model = AdvisoryReference - fields = ["url", "reference_type", "reference_id"] - - -class AdvisorySeveritySerializer(serializers.ModelSerializer): - class Meta: - model = AdvisorySeverity - fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] - - def to_representation(self, instance): - data = super().to_representation(instance) - published_at = data.get("published_at", None) - if not published_at: - data.pop("published_at") - return data - - class VulnerabilitySeverityV2Serializer(serializers.ModelSerializer): class Meta: model = VulnerabilitySeverity @@ -141,58 +102,6 @@ def get_aliases(self, obj): return [alias.alias for alias in obj.aliases.all()] -class AdvisoryV2Serializer(serializers.ModelSerializer): - aliases = serializers.SerializerMethodField() - weaknesses = AdvisoryWeaknessSerializer(many=True) - references = AdvisoryReferenceSerializer(many=True) - severities = AdvisorySeveritySerializer(many=True) - advisory_id = serializers.CharField(source="avid", read_only=True) - related_ssvc_trees = serializers.SerializerMethodField() - - def get_related_ssvc_trees(self, obj): - related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") - source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") - - seen = set() - result = [] - - for ssvc in list(related_ssvcs) + list(source_ssvcs): - key = (ssvc.vector, ssvc.source_advisory_id) - if key in seen: - continue - seen.add(key) - - result.append( - { - "vector": ssvc.vector, - "decision": ssvc.decision, - "options": ssvc.options, - "source_url": ssvc.source_advisory.url, - } - ) - - return result - - class Meta: - model = AdvisoryV2 - fields = [ - "advisory_id", - "url", - "aliases", - "summary", - "severities", - "weaknesses", - "references", - "exploitability", - "weighted_severity", - "risk_score", - "related_ssvc_trees", - ] - - def get_aliases(self, obj): - return [alias.alias for alias in obj.aliases.all()] - - class VulnerabilityListSerializer(serializers.ModelSerializer): url = serializers.SerializerMethodField() @@ -333,107 +242,6 @@ def get_fixing_vulnerabilities(self, obj): return [vuln.vulnerability_id for vuln in obj.fixing_vulnerabilities.all()] -class PackageV3Serializer(serializers.ModelSerializer): - purl = serializers.CharField(source="package_url") - risk_score = serializers.FloatField(read_only=True) - affected_by_vulnerabilities = serializers.SerializerMethodField() - fixing_vulnerabilities = serializers.SerializerMethodField() - next_non_vulnerable_version = serializers.SerializerMethodField() - latest_non_vulnerable_version = serializers.SerializerMethodField() - - class Meta: - model = Package - fields = [ - "purl", - "affected_by_vulnerabilities", - "fixing_vulnerabilities", - "next_non_vulnerable_version", - "latest_non_vulnerable_version", - "risk_score", - ] - - def get_affected_by_vulnerabilities(self, package): - """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" - impacts = package.affected_in_impacts.select_related("advisory").prefetch_related( - "fixed_by_packages" - ) - - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_by_avid = {adv.avid: adv for adv in latest_advisories} - impact_by_avid = {} - - advisories = [] - for impact in impacts: - avid = impact.advisory.avid - advisory = advisory_by_avid.get(avid) - if not advisory: - continue - advisories.append(advisory) - impact_by_avid[avid] = impact - - grouped_advisories = group_advisories_by_content(advisories=advisories) - - advs = [] - - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) - - result = [] - - for advisory in advs: - primary_advisory = advisory["primary"] - avid = primary_advisory.avid - impact = impact_by_avid.get(avid) - if not impact: - continue - result.append( - { - "advisory_id": primary_advisory.avid, - "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], - } - ) - - return result - - def get_fixing_vulnerabilities(self, package): - impacts = package.fixed_in_impacts.select_related("advisory") - - avids = {impact.advisory.avid for impact in impacts if impact.advisory_id} - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - grouped_advisories = group_advisories_by_content(advisories=latest_advisories) - - advs = [] - - for hash in grouped_advisories: - advs.append(grouped_advisories[hash]) - - result = [] - - for advisory in advs: - primary_advisory = advisory["primary"] - result.append( - { - "advisory_id": primary_advisory.avid, - "duplicate_advisory_ids": [adv.avid for adv in advisory["secondary"]], - } - ) - - return result - - def get_next_non_vulnerable_version(self, package): - if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: - return next_non_vulnerable.version - - def get_latest_non_vulnerable_version(self, package): - if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: - return latest_non_vulnerable.version - - class PackageurlListSerializer(serializers.Serializer): purls = serializers.ListField( child=serializers.CharField(), @@ -462,27 +270,6 @@ class PackageV2FilterSet(filters.FilterSet): purl = filters.CharFilter(field_name="package_url") -class AdvisoryPackageV2FilterSet(filters.FilterSet): - affected_by_advisory = filters.CharFilter( - field_name="affected_in_impacts__advisory__avid", - label="Affected By Advisory ID", - help_text="Filter packages affected by a specific Advisory ID.", - ) - - fixing_advisory = filters.CharFilter( - field_name="fixed_in_impacts__advisory__avid", - label="Fixed By Advisory ID", - help_text="Filter packages fixed by a specific Advisory ID.", - ) - - purls = CharInFilter( - field_name="package_url", - lookup_expr="in", - label="Package URL", - help_text="Filter by one or more Package URLs. Multi-value supported (comma-separated).", - ) - - class PackageV2ViewSet(viewsets.ReadOnlyModelViewSet): queryset = Package.objects.all().prefetch_related( Prefetch( @@ -1062,339 +849,3 @@ def get_view_name(self): if self.detail: return "Pipeline Instance" return "Pipeline Jobs" - - -class PackageV3ViewSet(viewsets.ReadOnlyModelViewSet): - queryset = PackageV2.objects.all() - serializer_class = PackageV3Serializer - filter_backends = [filters.DjangoFilterBackend] - filterset_class = AdvisoryPackageV2FilterSet - throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] - - def get_queryset(self): - return ( - super() - .get_queryset() - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - def list(self, request, *args, **kwargs): - queryset = self.filter_queryset(self.get_queryset()) - page = self.paginate_queryset(queryset) - - packages = page if page is not None else queryset - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - advisory_data = {adv.avid: AdvisoryV2Serializer(adv).data for adv in latest_advisories} - - serializer = self.get_serializer(packages, many=True) - - if page is not None: - return self.get_paginated_response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - return Response( - { - "packages": serializer.data, - "advisories_by_id": advisory_data, - } - ) - - @extend_schema( - request=PackageurlListSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageurlListSerializer, - filter_backends=[], - pagination_class=None, - ) - def bulk_lookup(self, request): - """ - Return the response for exact PackageURLs requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, - ) - - purls = serializer.validated_data.get("purls") - - packages = ( - PackageV2.objects.for_purls(purls) - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages" - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - avids = set() - - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - @extend_schema( - request=PackageBulkSearchRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=PackageBulkSearchRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def bulk_search(self, request): - """ - Lookup for vulnerable packages using many Package URLs at once. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A non-empty 'purls' list of PURLs is required.", - }, - ) - - validated_data = serializer.validated_data - purls = validated_data.get("purls") - purl_only = validated_data.get("purl_only", False) - plain_purl = validated_data.get("plain_purl", False) - - if plain_purl: - purl_objects = [PackageURL.from_string(purl) for purl in purls] - plain_purl_objects = [ - PackageURL( - type=purl.type, - namespace=purl.namespace, - name=purl.name, - version=purl.version, - ) - for purl in purl_objects - ] - plain_purls = [str(purl) for purl in plain_purl_objects] - - query = ( - PackageV2.objects.filter(plain_package_url__in=plain_purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related( - "advisory" - ).prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - - packages = query - - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - # Using order by and distinct because there will be - # many fully qualified purl for a single plain purl - vulnerable_purls = query.vulnerable().only("plain_package_url") - vulnerable_purls = [str(package.plain_package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) - - query = ( - PackageV2.objects.filter(package_url__in=purls) - .order_by("plain_package_url") - .distinct("plain_package_url") - .prefetch_related( - Prefetch( - "affected_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory").prefetch_related( - "fixed_by_packages", - ), - ), - Prefetch( - "fixed_in_impacts", - queryset=ImpactedPackage.objects.select_related("advisory"), - ), - ) - .with_is_vulnerable() - ) - packages = query - - avids = set() - for package in packages: - for impact in package.affected_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - for impact in package.fixed_in_impacts.all(): - if impact.advisory_id: - avids.add(impact.advisory.avid) - - latest_advisories = AdvisoryV2.objects.latest_for_avids(avids) - advisory_data = { - adv.avid: AdvisoryV2Serializer(adv, context={"request": request}).data - for adv in latest_advisories - } - - if not purl_only: - package_data = PackageV3Serializer( - packages, - many=True, - context={"request": request}, - ).data - return Response( - { - "packages": package_data, - "advisories_by_id": advisory_data, - } - ) - - vulnerable_purls = query.vulnerable().only("package_url") - vulnerable_purls = [str(package.package_url) for package in vulnerable_purls] - return Response(data=vulnerable_purls) - - @action(detail=False, methods=["get"]) - def all(self, request): - """ - Return a list of Package URLs of vulnerable packages. - """ - vulnerable_purls = ( - PackageV2.objects.vulnerable() - .only("package_url") - .order_by("package_url") - .distinct() - .values_list("package_url", flat=True) - ) - return Response(vulnerable_purls) - - @extend_schema( - request=LookupRequestSerializer, - responses={200: PackageV2Serializer(many=True)}, - ) - @action( - detail=False, - methods=["post"], - serializer_class=LookupRequestSerializer, - filter_backends=[], - pagination_class=None, - ) - def lookup(self, request): - """ - Return the response for exact PackageURL requested for. - """ - serializer = self.serializer_class(data=request.data) - if not serializer.is_valid(): - return Response( - status=status.HTTP_400_BAD_REQUEST, - data={ - "error": serializer.errors, - "message": "A 'purl' is required.", - }, - ) - validated_data = serializer.validated_data - purl = validated_data.get("purl") - - qs = self.get_queryset().for_purls([purl]).with_is_vulnerable() - return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data) diff --git a/vulnerabilities/api_v3.py b/vulnerabilities/api_v3.py new file mode 100644 index 000000000..14a3effa7 --- /dev/null +++ b/vulnerabilities/api_v3.py @@ -0,0 +1,400 @@ +# +# Copyright (c) nexB Inc. and others. All rights reserved. +# VulnerableCode is a trademark of nexB Inc. +# SPDX-License-Identifier: Apache-2.0 +# See http://www.apache.org/licenses/LICENSE-2.0 for the license text. +# See https://github.com/aboutcode-org/vulnerablecode for support or download. +# See https://aboutcode.org for more information about nexB OSS projects. +# + +from urllib.parse import urlencode + +from django_filters import rest_framework as filters +from packageurl import PackageURL +from rest_framework import serializers +from rest_framework import viewsets +from rest_framework.reverse import reverse +from rest_framework.throttling import AnonRateThrottle + +from vulnerabilities.models import AdvisoryReference +from vulnerabilities.models import AdvisorySeverity +from vulnerabilities.models import AdvisoryV2 +from vulnerabilities.models import AdvisoryWeakness +from vulnerabilities.models import PackageV2 +from vulnerabilities.throttling import PermissionBasedUserRateThrottle +from vulnerabilities.utils import group_advisories_by_content + + +class PackageQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + details = serializers.BooleanField(default=False) + approximate = serializers.BooleanField(default=False) + + def validate(self, data): + if not data["purls"]: + if data["details"] or data["approximate"]: + raise serializers.ValidationError( + "details and approximate must be false when purls is empty" + ) + return data + + +class AdvisoryQuerySerializer(serializers.Serializer): + purls = serializers.ListField( + child=serializers.CharField(), + required=False, + default=list, + ) + + def validate(self, data): + if not data["purls"]: + raise serializers.ValidationError("purls is required") + return data + + +class AdvisoryReferenceSerializer(serializers.ModelSerializer): + url = serializers.CharField() + reference_type = serializers.CharField() + reference_id = serializers.CharField() + + class Meta: + model = AdvisoryReference + fields = ["url", "reference_type", "reference_id"] + + +class AdvisorySeveritySerializer(serializers.ModelSerializer): + class Meta: + model = AdvisorySeverity + fields = ["url", "value", "scoring_system", "scoring_elements", "published_at"] + + def to_representation(self, instance): + data = super().to_representation(instance) + published_at = data.get("published_at", None) + if not published_at: + data.pop("published_at") + return data + + +class AdvisoryWeaknessSerializer(serializers.ModelSerializer): + cwe_id = serializers.CharField() + name = serializers.CharField() + description = serializers.CharField() + + class Meta: + model = AdvisoryWeakness + fields = ["cwe_id", "name", "description"] + + +class AdvisoryV3Serializer(serializers.ModelSerializer): + aliases = serializers.SerializerMethodField() + weaknesses = AdvisoryWeaknessSerializer(many=True) + references = AdvisoryReferenceSerializer(many=True) + severities = AdvisorySeveritySerializer(many=True) + advisory_id = serializers.CharField(source="avid", read_only=True) + related_ssvc_trees = serializers.SerializerMethodField() + + def get_related_ssvc_trees(self, obj): + related_ssvcs = obj.related_ssvcs.all().select_related("source_advisory") + source_ssvcs = obj.source_ssvcs.all().select_related("source_advisory") + + seen = set() + result = [] + + for ssvc in list(related_ssvcs) + list(source_ssvcs): + key = (ssvc.vector, ssvc.source_advisory_id) + if key in seen: + continue + seen.add(key) + + result.append( + { + "vector": ssvc.vector, + "decision": ssvc.decision, + "options": ssvc.options, + "source_url": ssvc.source_advisory.url, + } + ) + + return result + + class Meta: + model = AdvisoryV2 + fields = [ + "advisory_id", + "url", + "aliases", + "summary", + "severities", + "weaknesses", + "references", + "exploitability", + "weighted_severity", + "risk_score", + "related_ssvc_trees", + ] + + def get_aliases(self, obj): + return [alias.alias for alias in obj.aliases.all()] + + +class PackageV3Serializer(serializers.ModelSerializer): + purl = serializers.CharField(source="package_url") + risk_score = serializers.FloatField(read_only=True) + affected_by_vulnerabilities = serializers.SerializerMethodField() + affected_by_vulnerabilities_url = serializers.SerializerMethodField() + fixing_vulnerabilities = serializers.SerializerMethodField() + fixing_vulnerabilities_url = serializers.SerializerMethodField() + next_non_vulnerable_version = serializers.SerializerMethodField() + latest_non_vulnerable_version = serializers.SerializerMethodField() + + class Meta: + model = PackageV2 + fields = [ + "purl", + "affected_by_vulnerabilities", + "affected_by_vulnerabilities_url", + "fixing_vulnerabilities", + "fixing_vulnerabilities_url", + "next_non_vulnerable_version", + "latest_non_vulnerable_version", + "risk_score", + ] + + def to_representation(self, instance): + data = super().to_representation(instance) + + if data.get("affected_by_vulnerabilities") is None: + data.pop("affected_by_vulnerabilities", None) + else: + data.pop("affected_by_vulnerabilities_url", None) + + if data.get("fixing_vulnerabilities") is None: + data.pop("fixing_vulnerabilities", None) + else: + data.pop("fixing_vulnerabilities_url", None) + + return data + + def get_affected_by_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("affected-by-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_fixing_vulnerabilities_url(self, obj): + request = self.context.get("request") + if not request: + return None + + base = reverse("fixing-advisories-list") + url = request.build_absolute_uri(base) + + return f"{url}?{urlencode({'purl': obj.package_url})}" + + def get_affected_by_vulnerabilities(self, package): + """Return a dictionary with advisory as keys and their details, including fixed_by_packages.""" + advisories_qs = AdvisoryV2.objects.latest_affecting_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.affected_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "fixed_by_packages": [pkg.purl for pkg in impact.fixed_by_packages.all()], + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_fixing_vulnerabilities(self, package): + advisories_qs = AdvisoryV2.objects.latest_fixed_by_advisories_for_purl(package.package_url) + + advisories = list(advisories_qs[:101]) + if len(advisories) > 100: + return None + + advisory_by_avid = {adv.avid: adv for adv in advisories} + avids = advisory_by_avid.keys() + + impacts = ( + package.fixed_in_impacts.filter(advisory__avid__in=avids) + .select_related("advisory") + .prefetch_related("fixed_by_packages") + ) + + impact_by_avid = {impact.advisory.avid: impact for impact in impacts} + + grouped = group_advisories_by_content(advisories) + + result = [] + for entry in grouped.values(): + primary = entry["primary"] + impact = impact_by_avid.get(primary.avid) + if not impact: + continue + + result.append( + { + "advisory_id": primary.avid, + "duplicate_advisory_ids": [a.avid for a in entry["secondary"]], + } + ) + + return result + + def get_next_non_vulnerable_version(self, package): + if next_non_vulnerable := package.get_non_vulnerable_versions()[0]: + return next_non_vulnerable.version + + def get_latest_non_vulnerable_version(self, package): + if latest_non_vulnerable := package.get_non_vulnerable_versions()[-1]: + return latest_non_vulnerable.version + + +class PackageV3ViewSet(viewsets.GenericViewSet): + queryset = PackageV2.objects.all() + serializer_class = PackageV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + details = serializer.validated_data["details"] + approximate = serializer.validated_data["approximate"] + + if not purls: + vulnerable_purls = ( + PackageV2.objects.vulnerable() + .only("package_url") + .distinct() + .values_list("package_url", flat=True) + .order_by("package_url") + ) + page = self.paginate_queryset(vulnerable_purls) + return self.get_paginated_response(page) + + plain_purls = None + + if approximate: + plain_purls = [ + str( + PackageURL( + type=p.type, + namespace=p.namespace, + name=p.name, + version=p.version, + ) + ) + for p in map(PackageURL.from_string, purls) + ] + + if not details: + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .values_list("plain_package_url", flat=True) + .distinct() + .order_by("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .values_list("package_url", flat=True) + .distinct() + .order_by("package_url") + ) + + page = self.paginate_queryset(query) + return self.get_paginated_response(page) + + if approximate: + query = ( + PackageV2.objects.filter(plain_package_url__in=plain_purls) + .order_by("plain_package_url") + .distinct("plain_package_url") + ) + else: + query = ( + PackageV2.objects.filter(package_url__in=purls) + .order_by("package_url") + .distinct("package_url") + ) + + page = self.paginate_queryset(query) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class AdvisoryV3ViewSet(viewsets.GenericViewSet): + queryset = AdvisoryV2.objects.all() + serializer_class = AdvisoryV3Serializer + filter_backends = [filters.DjangoFilterBackend] + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def create(self, request, *args, **kwargs): + serializer = PackageQuerySerializer(data=request.data) + serializer.is_valid(raise_exception=True) + + purls = serializer.validated_data["purls"] + + latest_advisories = AdvisoryV2.objects.latest_advisories_for_purls(purls=purls) + + page = self.paginate_queryset(latest_advisories) + serializer = self.get_serializer(page, many=True, context={"request": request}) + return self.get_paginated_response(serializer.data) + + +class PackageAdvisoriesViewSet(viewsets.ReadOnlyModelViewSet): + serializer_class = AdvisoryV3Serializer + relation = None + throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle] + + def get_queryset(self): + purl = self.request.query_params.get("purl") + + if not purl: + return AdvisoryV2.objects.none() + + return AdvisoryV2.objects.filter(**{self.relation: purl}).latest_per_avid() + + +class FixingAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__fixed_by_packages__package_url" + + +class AffectedByAdvisoriesViewSet(PackageAdvisoriesViewSet): + relation = "impacted_packages__affecting_packages__package_url" diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index 1981a2861..5ebd79184 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -2876,6 +2876,36 @@ def latest_per_avid(self): def latest_for_avids(self, avids): return self.filter(avid__in=avids).latest_per_avid() + def latest_affecting_advisories_for_purl(self, purl): + return self.filter( + impacted_packages__affecting_packages__package_url=purl + ).latest_per_avid() + + def latest_affecting_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__affecting_packages__package_url__in=purls + ).latest_per_avid() + + def latest_fixed_by_advisories_for_purl(self, purl): + return self.filter(impacted_packages__fixed_by_packages__package_url=purl).latest_per_avid() + + def latest_fixed_by_advisories_for_purls(self, purls): + return self.filter( + impacted_packages__fixed_by_packages__package_url__in=purls + ).latest_per_avid() + + def latest_advisories_for_purl(self, purl): + return self.filter( + Q(impacted_packages__affecting_packages__package_url=purl) + | Q(impacted_packages__fixed_by_packages__package_url=purl) + ).latest_per_avid() + + def latest_advisories_for_purls(self, purls): + return self.filter( + Q(impacted_packages__affecting_packages__package_url__in=purls) + | Q(impacted_packages__fixed_by_packages__package_url__in=purls) + ).latest_per_avid() + class AdvisoryV2(models.Model): """ diff --git a/vulnerablecode/urls.py b/vulnerablecode/urls.py index 49948a3b9..08d1371d7 100644 --- a/vulnerablecode/urls.py +++ b/vulnerablecode/urls.py @@ -20,10 +20,13 @@ from vulnerabilities.api import CPEViewSet from vulnerabilities.api import PackageViewSet from vulnerabilities.api import VulnerabilityViewSet +from vulnerabilities.api_v3 import AdvisoryV3ViewSet +from vulnerabilities.api_v3 import AffectedByAdvisoriesViewSet from vulnerabilities.api_v2 import CodeFixV2ViewSet from vulnerabilities.api_v2 import CodeFixViewSet +from vulnerabilities.api_v3 import FixingAdvisoriesViewSet from vulnerabilities.api_v2 import PackageV2ViewSet -from vulnerabilities.api_v2 import PackageV3ViewSet +from vulnerabilities.api_v3 import PackageV3ViewSet from vulnerabilities.api_v2 import PipelineScheduleV2ViewSet from vulnerabilities.api_v2 import VulnerabilityV2ViewSet from vulnerabilities.views import AdminLoginView @@ -70,6 +73,11 @@ def __init__(self, *args, **kwargs): api_v3_router = OptionalSlashRouter() api_v3_router.register("packages", PackageV3ViewSet, basename="package-v3") +api_v3_router.register("advisories", AdvisoryV3ViewSet, basename="advisory-v3") +api_v3_router.register( + "affected-by-advisories", AffectedByAdvisoriesViewSet, basename="affected-by-advisories" +) +api_v3_router.register("fixing-advisories", FixingAdvisoriesViewSet, basename="fixing-advisories") urlpatterns = [ path("admin/login/", AdminLoginView.as_view(), name="admin-login"),