diff --git a/src/fosslight_source/_kb_client.py b/src/fosslight_source/_kb_client.py new file mode 100644 index 0000000..e299092 --- /dev/null +++ b/src/fosslight_source/_kb_client.py @@ -0,0 +1,155 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2020 LG Electronics Inc. +# SPDX-License-Identifier: Apache-2.0 + +import json +import logging +import time +import urllib.error +import urllib.request +from typing import Dict, List + +import fosslight_util.constant as constant + +logger = logging.getLogger(constant.LOGGER_NAME) + +_SCAN_JOB_POLL_INTERVAL_SEC = 1.0 +_SCAN_JOB_POLL_MAX_INTERVAL_SEC = 10.0 +_SCAN_JOB_REQUEST_TIMEOUT_SEC = 30 +_SCAN_JOB_MIN_WAIT_SEC = 300 +_SCAN_JOB_PER_HASH_SEC = 35 + + +def _kb_request( + kb_url: str, + path: str, + *, + method: str = "GET", + payload: dict | None = None, + kb_token: str = "", + timeout: int = _SCAN_JOB_REQUEST_TIMEOUT_SEC, +) -> dict: + data = None + if payload is not None: + data = json.dumps(payload).encode("utf-8") + request = urllib.request.Request(f"{kb_url.rstrip('/')}/{path.lstrip('/')}", data=data, method=method) + request.add_header("Accept", "application/json") + if payload is not None: + request.add_header("Content-Type", "application/json") + if kb_token: + request.add_header("Authorization", f"Bearer {kb_token}") + + with urllib.request.urlopen(request, timeout=timeout) as response: + body = response.read().decode() + return json.loads(body) if body else {} + + +def _estimate_job_wait_timeout(file_hash_count: int) -> float: + return float(max(_SCAN_JOB_MIN_WAIT_SEC, file_hash_count * _SCAN_JOB_PER_HASH_SEC)) + + +def _coerce_count(value, default: int) -> int: + if value is None: + return default + try: + count = int(value) + except (TypeError, ValueError): + return default + return count if count >= 0 else default + + +def fetch_origin_urls_via_scan_job( + file_hashes: List[str], + kb_url: str, + kb_token: str, +) -> Dict[str, str]: + """ + POST /scan/jobs 후 완료될 때까지 polling하여 file_hash -> origin_url 맵을 반환합니다. + """ + unique_hashes = list(dict.fromkeys(h for h in file_hashes if h)) + if not unique_hashes: + return {} + + create_payload = {"file_hashes": unique_hashes} + try: + created = _kb_request(kb_url, "scan/jobs", method="POST", payload=create_payload, kb_token=kb_token) + except urllib.error.HTTPError as e: + logger.warning(f"KB scan job create failed: HTTP {e.code} {e.reason}") + return {} + except urllib.error.URLError as e: + logger.warning(f"KB scan job create failed: {e}") + return {} + except (json.JSONDecodeError, Exception) as e: + logger.warning(f"KB scan job create failed: {e}") + return {} + + job_id = created.get("job_id", "") + if not job_id: + logger.warning("KB scan job create response missing job_id") + return {} + + fallback_count = len(unique_hashes) + accepted = _coerce_count( + created.get("accepted"), + _coerce_count(created.get("total"), fallback_count), + ) + skipped = _coerce_count(created.get("skipped"), 0) + logger.info( + f"KB scan job created: job_id={job_id}, total={created.get('total', fallback_count)}, " + f"accepted={accepted}, skipped={skipped}" + ) + if skipped: + logger.warning(f"KB scan job rate-limited: {skipped} file_hash(es) skipped by server") + if accepted == 0: + return {} + + deadline = time.monotonic() + _estimate_job_wait_timeout(accepted) + interval = _SCAN_JOB_POLL_INTERVAL_SEC + origin_urls: Dict[str, str] = {} + + while time.monotonic() < deadline: + try: + status = _kb_request(kb_url, f"scan/jobs/{job_id}", kb_token=kb_token) + except urllib.error.HTTPError as e: + if e.code == 404: + logger.warning(f"KB scan job not found: {job_id}") + return {} + logger.warning(f"KB scan job status failed: HTTP {e.code}") + time.sleep(interval) + interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC) + continue + except urllib.error.URLError as e: + logger.warning(f"KB scan job status failed: {e}") + time.sleep(interval) + interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC) + continue + except (json.JSONDecodeError, Exception) as e: + logger.warning(f"KB scan job status parse failed: {e}") + time.sleep(interval) + interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC) + continue + + job_status = status.get("status", "") + if job_status == "completed": + for row in status.get("results", []): + if not isinstance(row, dict): + continue + file_hash = row.get("file_hash", "") + if row.get("success") and row.get("output") and file_hash: + origin_urls[file_hash] = row["output"] + logger.info( + f"KB scan job completed: job_id={job_id}, " + f"matched={len(origin_urls)}, failed={status.get('failed', 0)}" + ) + return origin_urls + + if job_status == "failed": + logger.warning(f"KB scan job failed: job_id={job_id}") + return {} + + time.sleep(interval) + interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC) + + logger.warning(f"KB scan job timed out: job_id={job_id}") + return origin_urls diff --git a/src/fosslight_source/_scan_item.py b/src/fosslight_source/_scan_item.py index b3d9cd9..8deb036 100644 --- a/src/fosslight_source/_scan_item.py +++ b/src/fosslight_source/_scan_item.py @@ -6,11 +6,7 @@ import os import logging import re -import json -import base64 import hashlib -import urllib.request -import urllib.error import fosslight_util.constant as constant from fosslight_util.oss_item import FileItem, OssItem, get_checksum_sha1 @@ -63,7 +59,7 @@ def __init__(self, value: str) -> None: self.oss_version = "" self.checksum = get_checksum_sha1(value) - self.kb_origin_url = "" # URL from OSS KB (_get_origin_url_from_md5_hash) + self.kb_origin_url = "" # URL from OSS KB self.kb_evidence = "" # Evidence from KB API (exact_match or code snippet) def __del__(self) -> None: @@ -124,37 +120,18 @@ def _get_hash(self, path_to_scan: str = "") -> tuple: logger.debug(f"Failed to compute MD5 for {self.source_name_or_path}: {e}") return md5_hex, wfp - def _get_origin_url_from_md5_hash( - self, md5_hash: str, wfp: str = "", kb_url: str = DEFAULT_KB_URL, kb_token: str = "" - ) -> str: - """Return origin_url from KB API.""" - try: - payload = {"file_hash": md5_hash} - if wfp and wfp.strip(): - payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii") - request = urllib.request.Request( - f"{kb_url}query", data=json.dumps(payload).encode('utf-8'), method='POST' - ) - request.add_header('Accept', 'application/json') - request.add_header('Content-Type', 'application/json') - if kb_token: - request.add_header('Authorization', f'Bearer {kb_token}') - - with urllib.request.urlopen(request, timeout=10) as response: - data = json.loads(response.read().decode()) - if isinstance(data, dict): - return_code = data.get('return_code', -1) - if return_code == 0: - output = data.get('output', '') - if output: - return output - except urllib.error.URLError as e: - logger.debug(f"Failed to fetch origin_url from API for MD5 hash {md5_hash}: {e}") - except json.JSONDecodeError as e: - logger.debug(f"Failed to parse API response for MD5 hash {md5_hash}: {e}") - except Exception as e: - logger.debug(f"Error getting origin_url for MD5 hash {md5_hash}: {e}") - return "" + def _apply_kb_origin_url(self, origin_url: str) -> tuple[str, str, str]: + """KB origin URL을 반영하고 (oss_name, oss_version, download_url)을 반환합니다.""" + self.kb_origin_url = origin_url + self.kb_evidence = "exact_match" + extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url) + if extracted_name: + self.oss_name = extracted_name + if extracted_version: + self.oss_version = extracted_version + download_url = repo_url if repo_url else origin_url + self.download_location = [download_url] + return self.oss_name, self.oss_version, download_url def _extract_oss_info_from_url(self, url: str) -> tuple: """ @@ -196,7 +173,9 @@ def _extract_oss_info_from_url(self, url: str) -> tuple: return "", "", "" def set_oss_item( - self, path_to_scan: str = "", run_kb: bool = False, kb_url: str = DEFAULT_KB_URL, kb_token: str = "" + self, + path_to_scan: str = "", + kb_origin_urls: dict[str, str] | None = None, ) -> None: self.oss_items = [] if self.download_location: @@ -207,21 +186,15 @@ def set_oss_item( self.oss_items.append(item) else: item = OssItem(self.oss_name, self.oss_version, self.licenses) - if run_kb and not self.is_license_text: - md5_hash, wfp = self._get_hash(path_to_scan) + if kb_origin_urls and not self.is_license_text: + md5_hash = getattr(self, "_cached_kb_md5", "") + if not md5_hash: + md5_hash, _wfp = self._get_hash(path_to_scan) if md5_hash: - origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp, kb_url, kb_token) + origin_url = kb_origin_urls.get(md5_hash, "") if origin_url: - self.kb_origin_url = origin_url - self.kb_evidence = "exact_match" - extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url) - if extracted_name: - self.oss_name = extracted_name - if extracted_version: - self.oss_version = extracted_version - download_url = repo_url if repo_url else origin_url - self.download_location = [download_url] - item = OssItem(self.oss_name, self.oss_version, self.licenses, download_url) + oss_name, oss_version, download_url = self._apply_kb_origin_url(origin_url) + item = OssItem(oss_name, oss_version, self.licenses, download_url) item.copyright = "\n".join(self.copyright) item.comment = self.comment diff --git a/src/fosslight_source/cli.py b/src/fosslight_source/cli.py index e88e18f..e225196 100755 --- a/src/fosslight_source/cli.py +++ b/src/fosslight_source/cli.py @@ -29,6 +29,7 @@ from .run_spdx_extractor import get_spdx_downloads from .run_manifest_extractor import get_manifest_licenses from ._scan_item import SourceItem, resolve_kb_config +from ._kb_client import fetch_origin_urls_via_scan_job from fosslight_util.oss_item import ScannerItem from typing import Tuple from ._scan_item import is_manifest_file @@ -330,6 +331,47 @@ def mark_oss_info_correction_files_as_excluded(scan_results: list) -> None: item.comment = OSS_INFO_CORRECTION_COMMENT +def _collect_kb_file_hashes( + scancode_result: list, + path_to_scan: str, + excluded_files: set, + hide_progress: bool, +) -> tuple[list[str], list[tuple[SourceItem, str]]]: + """scancode 결과 및 walk 대상 파일의 MD5 목록과 (extra_item, md5) 후보를 수집합니다.""" + file_hashes: list[str] = [] + extra_candidates: list[tuple[SourceItem, str]] = [] + + for item in scancode_result: + if item.is_license_text: + continue + md5_hash, _wfp = item._get_hash(path_to_scan) + if md5_hash: + item._cached_kb_md5 = md5_hash + file_hashes.append(md5_hash) + + import tqdm + abs_path_to_scan = os.path.abspath(path_to_scan) + scancode_paths = {item.source_name_or_path for item in scancode_result} + + files_to_scan = [] + for root, _dirs, files in os.walk(path_to_scan): + for file in files: + files_to_scan.append(os.path.join(root, file)) + + for file_path in tqdm.tqdm(files_to_scan, desc="KB Hashing", disable=hide_progress): + rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/") + if rel_path in scancode_paths or rel_path in excluded_files: + continue + extra_item = SourceItem(rel_path) + md5_hash, _wfp = extra_item._get_hash(path_to_scan) + if md5_hash: + extra_item._cached_kb_md5 = md5_hash + file_hashes.append(md5_hash) + extra_candidates.append((extra_item, md5_hash)) + + return file_hashes, extra_candidates + + def merge_results( scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {}, path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {}, @@ -381,30 +423,25 @@ def merge_results( new_result_item.is_manifest_file = True scancode_result.append(new_result_item) + kb_origin_urls: dict[str, str] = {} + extra_candidates: list[tuple[SourceItem, str]] = [] + if run_kb: + file_hashes, extra_candidates = _collect_kb_file_hashes( + scancode_result, path_to_scan, excluded_files, hide_progress + ) + if file_hashes: + kb_origin_urls = fetch_origin_urls_via_scan_job(file_hashes, kb_url, kb_token) + for item in scancode_result: - item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token) + item.set_oss_item(path_to_scan, kb_origin_urls=kb_origin_urls) # Add OSSItem for files in path_to_scan that are not in scancode_result # when KB returns an origin URL for their MD5 hash (skip excluded_files) if run_kb: - import tqdm - abs_path_to_scan = os.path.abspath(path_to_scan) - scancode_paths = {item.source_name_or_path for item in scancode_result} - - files_to_scan = [] - for root, _dirs, files in os.walk(path_to_scan): - for file in files: - files_to_scan.append(os.path.join(root, file)) - - for file_path in tqdm.tqdm(files_to_scan, desc="KB Scanning", disable=hide_progress): - rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/") - if rel_path in scancode_paths or rel_path in excluded_files: - continue - extra_item = SourceItem(rel_path) - extra_item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token) + for extra_item, _md5_hash in extra_candidates: + extra_item.set_oss_item(path_to_scan, kb_origin_urls=kb_origin_urls) if extra_item.download_location: scancode_result.append(extra_item) - scancode_paths.add(rel_path) return scancode_result