Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

from app.agent.agent_types import LLMStats
from app.agent.experience.work_type import WorkType
from app.agent.linking_and_ranking_pipeline.infer_icatus_activities import InferIcatusActivitiesTool
from app.agent.linking_and_ranking_pipeline.cluster_responsibilities_tool import ClusterResponsibilitiesTool
from .infer_icatus_activities.utils import IcatusClassificationLevel
from .infer_occupation_tool import InferOccupationTool
from .pick_top_skills_tool import PickTopSkillsTool
from .skill_linking_tool import SkillLinkingTool
Expand Down Expand Up @@ -96,6 +98,12 @@ class ExperiencePipelineConfig(BaseModel):
to the skills associated with the occupations found."
"""

icatus_classification_level: IcatusClassificationLevel = IcatusClassificationLevel.FIRST_LEVEL
"""
Default is ClassificationLevel.FIRST_LEVEL
The classification level to use when inferring the icatus activities from the experience title and responsibilities.
"""

model_config = ConfigDict(
extra="forbid"
)
Expand Down Expand Up @@ -143,6 +151,10 @@ def __init__(self, *, config: ExperiencePipelineConfig, search_services: SearchS
self._search_services = search_services
self._cluster_responsibilities_tool = ClusterResponsibilitiesTool()
self._infer_occupations_tool = InferOccupationTool(search_services.occupation_skill_search_service)
self._infer_icatus_activities_tool = InferIcatusActivitiesTool(
search_services.occupation_skill_search_service,
classification_level=config.icatus_classification_level
)
self._skills_linking_tool = SkillLinkingTool(search_services.skill_search_service)
self._top_skills_picker = PickTopSkillsTool()
self._logger = logging.getLogger(__class__.__name__)
Expand Down Expand Up @@ -289,14 +301,26 @@ async def handle_cluster(self, *,
llm_stats = []
# 2.1 Infer the occupations and associated skills

inferred_occupations_response = await self._infer_occupations_tool.execute(experience_title=experience_title,
company=company_name,
work_type=work_type,
responsibilities=responsibilities,
country_of_interest=country_of_interest,
number_of_titles=config.number_of_occupation_alt_titles,
top_k=config.number_of_occupations_per_cluster,
top_p=config.number_of_occupations_candidates_per_title)
if work_type == WorkType.UNSEEN_UNPAID:
inferred_occupations_response = await self._infer_icatus_activities_tool.execute(
experience_title=experience_title,
company=company_name,
work_type=work_type,
responsibilities=responsibilities,
country_of_interest=country_of_interest,
number_of_titles=config.number_of_occupation_alt_titles,
top_k=config.number_of_occupations_per_cluster,
top_p=config.number_of_occupations_candidates_per_title,
)
else:
inferred_occupations_response = await self._infer_occupations_tool.execute(experience_title=experience_title,
company=company_name,
work_type=work_type,
responsibilities=responsibilities,
country_of_interest=country_of_interest,
number_of_titles=config.number_of_occupation_alt_titles,
top_k=config.number_of_occupations_per_cluster,
top_p=config.number_of_occupations_candidates_per_title)
llm_stats.extend(inferred_occupations_response.llm_stats)
occupation_labels = [esco_occupation.occupation.preferredLabel for esco_occupation in inferred_occupations_response.esco_occupations]
# 2.2 Link responsibilities to the associated skills
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .infer_icatus_activities_tool import InferIcatusActivitiesTool
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import json
import logging
from typing import Optional

from textwrap import dedent

from pydantic import BaseModel

from app.agent.agent_types import LLMStats
from app.agent.llm_caller import LLMCaller
from common_libs.llm.generative_models import GeminiGenerativeLLM
from common_libs.llm.models_utils import LLMConfig, MODERATE_TEMPERATURE_GENERATION_CONFIG, JSON_GENERATION_CONFIG

from .utils import IcatusClassificationLevel, IcatusFirstLevelNode, IcatusSecondLevelNode, TopLevelDivision

class ClassificationLLMResponse(BaseModel):
icatus_node: TopLevelDivision | IcatusFirstLevelNode | IcatusSecondLevelNode
llm_stats: list[LLMStats]


class _ClassificationLLMOutput(BaseModel):
reasoning: Optional[str]
dependent: bool
code: str


def _get_prompt(*,
experience_title: str,
responsibilities: list[str],
):
return dedent(""" \
<Input>
'Experience Title': {experience_title}
'Responsibilities': {responsibilities}
</Input>
""").format(
experience_title=experience_title,
responsibilities=json.dumps(responsibilities),
)


class _IcatusClassificationLLM:
def __init__(self,
classification_level: IcatusClassificationLevel,
logger: logging.Logger):
self.classification_level = classification_level
self._llm = GeminiGenerativeLLM(
system_instructions=classification_level.get_prompt(),
# Even if we are generating a JSON output, we still need to set the generation config to MODERATE_TEMPERATURE_GENERATION_CONFIG
# as we want to generate a more creative response.
config=LLMConfig(generation_config=MODERATE_TEMPERATURE_GENERATION_CONFIG | JSON_GENERATION_CONFIG)
)
self._llm_caller: LLMCaller[_ClassificationLLMOutput] = LLMCaller[_ClassificationLLMOutput](
model_response_type=_ClassificationLLMOutput)
self._logger = logger

async def execute(
self, *,
experience_title: str,
responsibilities: list[str],
) -> ClassificationLLMResponse:
"""
Returns a classified Icatus Node depending on the experience title and responsibilities.
"""

prompt = _get_prompt(
experience_title=experience_title,
responsibilities=responsibilities,
)
llm_response, llm_stats = await self._llm_caller.call_llm(
llm=self._llm,
llm_input=prompt,
logger=self._logger
)
if not llm_response.code:
self._logger.warning("Failed to classify the experience.")
raise ValueError("Experience was not classified correctly.")
return ClassificationLLMResponse(
icatus_node=self.classification_level.get_node_from_code(llm_response.code),
llm_stats=llm_stats
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import asyncio
import logging

from app.agent.experience.work_type import WorkType
from app.agent.linking_and_ranking_pipeline.infer_occupation_tool.infer_occupation_tool import InferOccupationTool, InferOccupationToolOutput
from app.countries import Country

from ._icatus_classification_llm import _IcatusClassificationLLM
from ..infer_occupation_tool._contextualization_llm import _ContextualizationLLM
from .utils import IcatusTerminalNode, TopLevelDivision, IcatusClassificationLevel
from app.vector_search.esco_search_service import OccupationSkillSearchService


class InferIcatusActivitiesTool:
def __init__(
self,
occupation_skill_search_service: OccupationSkillSearchService,
classification_level: IcatusClassificationLevel
):
self._logger = logging.getLogger(self.__class__.__name__)
self._occupation_skill_search_service = occupation_skill_search_service
self._infer_occupations_tool = InferOccupationTool(self._occupation_skill_search_service)
self.classification_level = classification_level

async def find_occupations(self, nodes: list[IcatusTerminalNode]):
occupation_data = await asyncio.gather(*(self._occupation_skill_search_service.get_by_esco_code(code=node.code) for node in nodes))
return occupation_data

def find_terminal_icatus_nodes(self, classification_response):
if self.classification_level == IcatusClassificationLevel.TOP_LEVEL and classification_response.icatus_node == TopLevelDivision.VOLUNTEERING: # Volunteering
return [node for node in IcatusTerminalNode if node.value.startswith("I5")]
elif self.classification_level == IcatusClassificationLevel.TOP_LEVEL:
return [node for node in IcatusTerminalNode if not node.value.startswith("I5")]
else:
return classification_response.icatus_node.get_terminal_nodes()

async def execute(self, *,
experience_title: str,
company: str,
work_type: WorkType,
responsibilities: list[str],
country_of_interest: Country,
number_of_titles: int,
top_k: int,
top_p: int,
) -> InferOccupationToolOutput:

# 1. Classify the experience as one of the icatus skills
icatus_classification_llm = _IcatusClassificationLLM(
classification_level=self.classification_level,
logger=self._logger
)
classification_response = await icatus_classification_llm.execute(
experience_title=experience_title,
responsibilities=responsibilities,
)
# 2. Consider different possibilities based on the output

terminal_icatus_nodes = self.find_terminal_icatus_nodes(classification_response)
icatus_occupations = await self.find_occupations(terminal_icatus_nodes)
contextualization_llm = _ContextualizationLLM(
country_of_interest=country_of_interest,
number_of_titles=number_of_titles,
logger=self._logger)
icatus_contextualization_response = await contextualization_llm.execute(
experience_title=experience_title,
company=company,
work_type=work_type,
responsibilities=responsibilities,
number_of_titles=number_of_titles
)
if classification_response.icatus_node.is_volunteering() and len(icatus_occupations)< top_k:
# 2.1 If the classification is volunteering, proceed with the usual inference
# and pre-attaches existing volunteering occupations
esco_occupations = await self._infer_occupations_tool.execute(
experience_title=experience_title,
company=company,
work_type=work_type,
responsibilities=responsibilities,
country_of_interest=country_of_interest,
number_of_titles=number_of_titles,
top_k=top_k - len(terminal_icatus_nodes),
top_p=top_p
)
return InferOccupationToolOutput(
contextual_titles=icatus_contextualization_response.contextual_titles + esco_occupations.contextual_titles,
esco_occupations=icatus_occupations+esco_occupations.esco_occupations,
responsibilities=responsibilities,
llm_stats=classification_response.llm_stats +icatus_contextualization_response.llm_stats
)

# 2.2 If the classification is not volunteering return a pre-fixed list of occupations
# with their contextualized titles

return InferOccupationToolOutput(contextual_titles=icatus_contextualization_response.contextual_titles,
esco_occupations=icatus_occupations,
responsibilities=responsibilities,
llm_stats=classification_response.llm_stats + icatus_contextualization_response.llm_stats
)
Loading