Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 155 additions & 0 deletions src/fosslight_source/_kb_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright (c) 2020 LG Electronics Inc.
# SPDX-License-Identifier: Apache-2.0

import json
import logging
import time
import urllib.error
import urllib.request
from typing import Dict, List

import fosslight_util.constant as constant

logger = logging.getLogger(constant.LOGGER_NAME)

_SCAN_JOB_POLL_INTERVAL_SEC = 1.0
_SCAN_JOB_POLL_MAX_INTERVAL_SEC = 10.0
_SCAN_JOB_REQUEST_TIMEOUT_SEC = 30
_SCAN_JOB_MIN_WAIT_SEC = 300
_SCAN_JOB_PER_HASH_SEC = 35


def _kb_request(
kb_url: str,
path: str,
*,
method: str = "GET",
payload: dict | None = None,
kb_token: str = "",
timeout: int = _SCAN_JOB_REQUEST_TIMEOUT_SEC,
) -> dict:
data = None
if payload is not None:
data = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(f"{kb_url.rstrip('/')}/{path.lstrip('/')}", data=data, method=method)
request.add_header("Accept", "application/json")
if payload is not None:
request.add_header("Content-Type", "application/json")
if kb_token:
request.add_header("Authorization", f"Bearer {kb_token}")

with urllib.request.urlopen(request, timeout=timeout) as response:
body = response.read().decode()
return json.loads(body) if body else {}


def _estimate_job_wait_timeout(file_hash_count: int) -> float:
return float(max(_SCAN_JOB_MIN_WAIT_SEC, file_hash_count * _SCAN_JOB_PER_HASH_SEC))


def _coerce_count(value, default: int) -> int:
if value is None:
return default
try:
count = int(value)
except (TypeError, ValueError):
return default
return count if count >= 0 else default


def fetch_origin_urls_via_scan_job(
file_hashes: List[str],
kb_url: str,
kb_token: str,
) -> Dict[str, str]:
"""
POST /scan/jobs 후 완료될 때까지 polling하여 file_hash -> origin_url 맵을 반환합니다.
"""
unique_hashes = list(dict.fromkeys(h for h in file_hashes if h))
if not unique_hashes:
return {}

create_payload = {"file_hashes": unique_hashes}
try:
created = _kb_request(kb_url, "scan/jobs", method="POST", payload=create_payload, kb_token=kb_token)
except urllib.error.HTTPError as e:
logger.warning(f"KB scan job create failed: HTTP {e.code} {e.reason}")
return {}
except urllib.error.URLError as e:
logger.warning(f"KB scan job create failed: {e}")
return {}
except (json.JSONDecodeError, Exception) as e:
logger.warning(f"KB scan job create failed: {e}")
return {}

job_id = created.get("job_id", "")
if not job_id:
logger.warning("KB scan job create response missing job_id")
return {}

fallback_count = len(unique_hashes)
accepted = _coerce_count(
created.get("accepted"),
_coerce_count(created.get("total"), fallback_count),
)
skipped = _coerce_count(created.get("skipped"), 0)
logger.info(
f"KB scan job created: job_id={job_id}, total={created.get('total', fallback_count)}, "
f"accepted={accepted}, skipped={skipped}"
)
if skipped:
logger.warning(f"KB scan job rate-limited: {skipped} file_hash(es) skipped by server")
if accepted == 0:
return {}

deadline = time.monotonic() + _estimate_job_wait_timeout(accepted)
interval = _SCAN_JOB_POLL_INTERVAL_SEC
origin_urls: Dict[str, str] = {}

while time.monotonic() < deadline:
try:
status = _kb_request(kb_url, f"scan/jobs/{job_id}", kb_token=kb_token)
except urllib.error.HTTPError as e:
if e.code == 404:
logger.warning(f"KB scan job not found: {job_id}")
return {}
logger.warning(f"KB scan job status failed: HTTP {e.code}")
time.sleep(interval)
interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC)
continue
except urllib.error.URLError as e:
logger.warning(f"KB scan job status failed: {e}")
time.sleep(interval)
interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC)
continue
except (json.JSONDecodeError, Exception) as e:
logger.warning(f"KB scan job status parse failed: {e}")
time.sleep(interval)
interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC)
continue

job_status = status.get("status", "")
if job_status == "completed":
for row in status.get("results", []):
if not isinstance(row, dict):
continue
file_hash = row.get("file_hash", "")
if row.get("success") and row.get("output") and file_hash:
origin_urls[file_hash] = row["output"]
logger.info(
f"KB scan job completed: job_id={job_id}, "
f"matched={len(origin_urls)}, failed={status.get('failed', 0)}"
)
return origin_urls

if job_status == "failed":
logger.warning(f"KB scan job failed: job_id={job_id}")
return {}

time.sleep(interval)
interval = min(interval * 1.5, _SCAN_JOB_POLL_MAX_INTERVAL_SEC)

logger.warning(f"KB scan job timed out: job_id={job_id}")
return origin_urls
73 changes: 23 additions & 50 deletions src/fosslight_source/_scan_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,7 @@
import os
import logging
import re
import json
import base64
import hashlib
import urllib.request
import urllib.error
import fosslight_util.constant as constant
from fosslight_util.oss_item import FileItem, OssItem, get_checksum_sha1

Expand Down Expand Up @@ -63,7 +59,7 @@ def __init__(self, value: str) -> None:
self.oss_version = ""

self.checksum = get_checksum_sha1(value)
self.kb_origin_url = "" # URL from OSS KB (_get_origin_url_from_md5_hash)
self.kb_origin_url = "" # URL from OSS KB
self.kb_evidence = "" # Evidence from KB API (exact_match or code snippet)

def __del__(self) -> None:
Expand Down Expand Up @@ -124,37 +120,18 @@ def _get_hash(self, path_to_scan: str = "") -> tuple:
logger.debug(f"Failed to compute MD5 for {self.source_name_or_path}: {e}")
return md5_hex, wfp

def _get_origin_url_from_md5_hash(
self, md5_hash: str, wfp: str = "", kb_url: str = DEFAULT_KB_URL, kb_token: str = ""
) -> str:
"""Return origin_url from KB API."""
try:
payload = {"file_hash": md5_hash}
if wfp and wfp.strip():
payload["wfp_base64"] = base64.b64encode(wfp.strip().encode("utf-8")).decode("ascii")
request = urllib.request.Request(
f"{kb_url}query", data=json.dumps(payload).encode('utf-8'), method='POST'
)
request.add_header('Accept', 'application/json')
request.add_header('Content-Type', 'application/json')
if kb_token:
request.add_header('Authorization', f'Bearer {kb_token}')

with urllib.request.urlopen(request, timeout=10) as response:
data = json.loads(response.read().decode())
if isinstance(data, dict):
return_code = data.get('return_code', -1)
if return_code == 0:
output = data.get('output', '')
if output:
return output
except urllib.error.URLError as e:
logger.debug(f"Failed to fetch origin_url from API for MD5 hash {md5_hash}: {e}")
except json.JSONDecodeError as e:
logger.debug(f"Failed to parse API response for MD5 hash {md5_hash}: {e}")
except Exception as e:
logger.debug(f"Error getting origin_url for MD5 hash {md5_hash}: {e}")
return ""
def _apply_kb_origin_url(self, origin_url: str) -> tuple[str, str, str]:
"""KB origin URL을 반영하고 (oss_name, oss_version, download_url)을 반환합니다."""
self.kb_origin_url = origin_url
self.kb_evidence = "exact_match"
extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url)
if extracted_name:
self.oss_name = extracted_name
if extracted_version:
self.oss_version = extracted_version
download_url = repo_url if repo_url else origin_url
self.download_location = [download_url]
return self.oss_name, self.oss_version, download_url

def _extract_oss_info_from_url(self, url: str) -> tuple:
"""
Expand Down Expand Up @@ -196,7 +173,9 @@ def _extract_oss_info_from_url(self, url: str) -> tuple:
return "", "", ""

def set_oss_item(
self, path_to_scan: str = "", run_kb: bool = False, kb_url: str = DEFAULT_KB_URL, kb_token: str = ""
self,
path_to_scan: str = "",
kb_origin_urls: dict[str, str] | None = None,
) -> None:
self.oss_items = []
if self.download_location:
Expand All @@ -207,21 +186,15 @@ def set_oss_item(
self.oss_items.append(item)
else:
item = OssItem(self.oss_name, self.oss_version, self.licenses)
if run_kb and not self.is_license_text:
md5_hash, wfp = self._get_hash(path_to_scan)
if kb_origin_urls and not self.is_license_text:
md5_hash = getattr(self, "_cached_kb_md5", "")
if not md5_hash:
md5_hash, _wfp = self._get_hash(path_to_scan)
if md5_hash:
origin_url = self._get_origin_url_from_md5_hash(md5_hash, wfp, kb_url, kb_token)
origin_url = kb_origin_urls.get(md5_hash, "")
if origin_url:
self.kb_origin_url = origin_url
self.kb_evidence = "exact_match"
extracted_name, extracted_version, repo_url = self._extract_oss_info_from_url(origin_url)
if extracted_name:
self.oss_name = extracted_name
if extracted_version:
self.oss_version = extracted_version
download_url = repo_url if repo_url else origin_url
self.download_location = [download_url]
item = OssItem(self.oss_name, self.oss_version, self.licenses, download_url)
oss_name, oss_version, download_url = self._apply_kb_origin_url(origin_url)
item = OssItem(oss_name, oss_version, self.licenses, download_url)

item.copyright = "\n".join(self.copyright)
item.comment = self.comment
Expand Down
71 changes: 54 additions & 17 deletions src/fosslight_source/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .run_spdx_extractor import get_spdx_downloads
from .run_manifest_extractor import get_manifest_licenses
from ._scan_item import SourceItem, resolve_kb_config
from ._kb_client import fetch_origin_urls_via_scan_job
from fosslight_util.oss_item import ScannerItem
from typing import Tuple
from ._scan_item import is_manifest_file
Expand Down Expand Up @@ -330,6 +331,47 @@ def mark_oss_info_correction_files_as_excluded(scan_results: list) -> None:
item.comment = OSS_INFO_CORRECTION_COMMENT


def _collect_kb_file_hashes(
scancode_result: list,
path_to_scan: str,
excluded_files: set,
hide_progress: bool,
) -> tuple[list[str], list[tuple[SourceItem, str]]]:
"""scancode 결과 및 walk 대상 파일의 MD5 목록과 (extra_item, md5) 후보를 수집합니다."""
file_hashes: list[str] = []
extra_candidates: list[tuple[SourceItem, str]] = []

for item in scancode_result:
if item.is_license_text:
continue
md5_hash, _wfp = item._get_hash(path_to_scan)
if md5_hash:
item._cached_kb_md5 = md5_hash
file_hashes.append(md5_hash)

import tqdm
abs_path_to_scan = os.path.abspath(path_to_scan)
scancode_paths = {item.source_name_or_path for item in scancode_result}

files_to_scan = []
for root, _dirs, files in os.walk(path_to_scan):
for file in files:
files_to_scan.append(os.path.join(root, file))

for file_path in tqdm.tqdm(files_to_scan, desc="KB Hashing", disable=hide_progress):
rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/")
if rel_path in scancode_paths or rel_path in excluded_files:
continue
extra_item = SourceItem(rel_path)
md5_hash, _wfp = extra_item._get_hash(path_to_scan)
if md5_hash:
extra_item._cached_kb_md5 = md5_hash
file_hashes.append(md5_hash)
extra_candidates.append((extra_item, md5_hash))

return file_hashes, extra_candidates


def merge_results(
scancode_result: list = [], scanoss_result: list = [], spdx_downloads: dict = {},
path_to_scan: str = "", run_kb: bool = False, manifest_licenses: dict = {},
Expand Down Expand Up @@ -381,30 +423,25 @@ def merge_results(
new_result_item.is_manifest_file = True
scancode_result.append(new_result_item)

kb_origin_urls: dict[str, str] = {}
extra_candidates: list[tuple[SourceItem, str]] = []
if run_kb:
file_hashes, extra_candidates = _collect_kb_file_hashes(
scancode_result, path_to_scan, excluded_files, hide_progress
)
if file_hashes:
kb_origin_urls = fetch_origin_urls_via_scan_job(file_hashes, kb_url, kb_token)

for item in scancode_result:
item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token)
item.set_oss_item(path_to_scan, kb_origin_urls=kb_origin_urls)

# Add OSSItem for files in path_to_scan that are not in scancode_result
# when KB returns an origin URL for their MD5 hash (skip excluded_files)
if run_kb:
import tqdm
abs_path_to_scan = os.path.abspath(path_to_scan)
scancode_paths = {item.source_name_or_path for item in scancode_result}

files_to_scan = []
for root, _dirs, files in os.walk(path_to_scan):
for file in files:
files_to_scan.append(os.path.join(root, file))

for file_path in tqdm.tqdm(files_to_scan, desc="KB Scanning", disable=hide_progress):
rel_path = os.path.relpath(file_path, abs_path_to_scan).replace("\\", "/")
if rel_path in scancode_paths or rel_path in excluded_files:
continue
extra_item = SourceItem(rel_path)
extra_item.set_oss_item(path_to_scan, run_kb, kb_url, kb_token)
for extra_item, _md5_hash in extra_candidates:
extra_item.set_oss_item(path_to_scan, kb_origin_urls=kb_origin_urls)
if extra_item.download_location:
scancode_result.append(extra_item)
scancode_paths.add(rel_path)

return scancode_result

Expand Down
Loading