diff --git a/pyproject.toml b/pyproject.toml
index ec171ad..3dbe238 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "taskgen-ai"
-version = "3.4.2"
+version = "3.4.3"
authors = [
{ name="John Tan Chong Min", email="tanchongmin@gmail.com" },
]
diff --git a/setup.py b/setup.py
index 4173886..b8283bc 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@
setup(
name="taskgen",
- version="3.4.2",
+ version="3.4.3",
packages=find_packages(),
install_requires=[
"openai>=1.59.6",
diff --git a/taskgen/agent.py b/taskgen/agent.py
index 2e1b0c3..e892563 100644
--- a/taskgen/agent.py
+++ b/taskgen/agent.py
@@ -1,13 +1,15 @@
import copy
from functools import wraps
import importlib
+import importlib.util
import inspect
+import json
import os
import dill as pickle
import re
-import subprocess
import sys
import base64
+from typing import Any, cast
from termcolor import colored
import requests
@@ -16,8 +18,9 @@
from taskgen.base_async import strict_json_async
from taskgen.function import AsyncFunction
from taskgen.memory import AsyncMemory, Memory
-from taskgen.utils import ensure_awaitable, get_source_code_for_func
+from taskgen.utils import get_source_code_for_func
+# TODO: the sync flow is never used anywhere; consider removing it altogether.
class BaseAgent:
def __init__(
@@ -29,7 +32,7 @@ def __init__(
memory_bank=None,
shared_variables=None,
get_global_context=None,
- global_context="",
+ global_context:str|list[str|dict]="",
default_to_llm=True,
code_action=False,
verbose: bool = True,
@@ -102,6 +105,7 @@ def __init__(
self.function_map = {}
# stores all existing function descriptions - prevent duplicate assignment of functions
self.fn_description_list = []
+ self.use_llm_prompt = f"You are the taking the role of the utility function `use_llm` that generates a detailed outcome for the Assigned Subtask on behalf of {self.agent_name}, according to your Agent Capabilities without talking to the User, with no restrictions. Do not just state that you performed the task, generate the detailed outcome as well. \n"
def reset(self):
"""Resets agent state, including resetting subtasks_completed"""
@@ -177,7 +181,7 @@ def remove_function(self, function_name: str):
if function_name in self.function_map:
function = self.function_map[function_name]
# remove actual function from memory bank
- if function_name not in ["use_llm", "end_task"]:
+ if function_name not in ["use_llm", "end_task"] and self.memory_bank is not None and "Function" in self.memory_bank:
self.memory_bank["Function"].remove(function)
# remove function description from fn_description_list
self.fn_description_list.remove(function.fn_description)
@@ -231,8 +235,72 @@ def remove_last_subtask(self):
"""Removes last subtask in subtask completed. Useful if you want to retrace a step"""
if len(self.subtasks_completed) > 0:
removed_item = self.subtasks_completed.popitem()
- if self.verbose:
- print(f"Removed last subtask from subtasks_completed: {removed_item}")
+ if self.verbose:
+ print(f"Removed last subtask from subtasks_completed: {removed_item}")
+
+    def prepare_input_user_prompt_for_query(self, query: str|list[str|dict], provide_function_list: bool = False, task: str = "", filtered_fn_list: "list[Function] | None" = None, global_context_output: str = ""):
+        """Assemble the user prompt for an LLM query as a list of segments.
+
+        The returned list holds, in order:
+          1. the agent name/description preamble,
+          2. the "Global Context" section: each entry of ``self.global_context``
+             with every ``<name>`` placeholder replaced by
+             ``str(self.shared_variables[name])`` when ``name`` is a shared
+             variable, followed by ``global_context_output`` when it is non-empty,
+          3. the Equipped Functions listing (only if ``provide_function_list``),
+          4. the query segments.
+
+        Args:
+            query: A single string, or a list whose elements are strings or
+                dicts of the form ``{"text": ..., "caching": ...}``.
+            provide_function_list: Whether to include the output of
+                ``self.list_functions`` in the prompt (default: False).
+            task: When non-empty, every compulsory function in
+                ``self.function_map`` is added after the functions in
+                ``filtered_fn_list`` before they are listed.
+            filtered_fn_list: Functions already selected for ``task``.
+                ``None`` (the default) means no pre-selected functions.
+                The list passed in is never modified.
+            global_context_output: Extra global-context text appended after
+                the entries of ``self.global_context``.
+
+        Returns:
+            The list of prompt segments (strings and dicts).
+        """
+        # Work on a private copy. Appending to the argument itself changed the
+        # caller's list and, with a shared `[]` default, made compulsory
+        # functions accumulate from one call to the next.
+        functions_for_listing = list(filtered_fn_list) if filtered_fn_list is not None else []
+        if task != "":
+            functions_for_listing.extend(
+                function for function in self.function_map.values() if function.is_compulsory
+            )
+
+        # Deep copy so that placeholder substitution inside dict entries never
+        # alters self.global_context.
+        global_context_items = copy.deepcopy(self.global_context)
+        if isinstance(global_context_items, str):
+            global_context_items = [global_context_items]
+
+        updated_global_context: list[str|dict] = ["Global Context:\n", "\n\n"]
+        for context in global_context_items:
+            if isinstance(context, dict):
+                # a dict without a "text" key is rendered as its JSON dump
+                context_text = context.get("text", json.dumps(context))
+            else:
+                context_text = context
+            for match in re.findall(r"<(.*?)>", context_text):
+                if match in self.shared_variables:
+                    context_text = context_text.replace(
+                        f"<{match}>", str(self.shared_variables[match])
+                    )
+            if isinstance(context, dict) and "text" in context:
+                # keep the dict (and its other keys) but with the substituted text
+                context["text"] = context_text
+                updated_global_context.append(context)
+            else:
+                updated_global_context.append(context_text)
+
+        # add in the global context function's output
+        if global_context_output:
+            updated_global_context.append(global_context_output)
+        updated_global_context.append("\n\n")
+
+        user_prompt_list = [f"""You are an agent named {self.agent_name} with the following description: ```{self.agent_description}```\n"""]
+        user_prompt_list.extend(updated_global_context)
+        if provide_function_list:
+            user_prompt_list.append(f"You have the following Equipped Functions available:\n```{self.list_functions(functions_for_listing)}```\n")
+        user_prompt_list.extend([query] if isinstance(query, str) else query)
+        return user_prompt_list
+
+    def get_subtasks_completed_as_query_list(self):
+        """Render ``self.subtasks_completed`` as a list of prompt segments.
+
+        The list starts with a "Subtasks Completed" header, then one
+        ``"<subtask>: <result> \n"`` entry per completed subtask, then a
+        trailing separator. The entry for the last subtask is wrapped in a dict
+        ``{"text": ..., "caching": True}``; all other entries are plain strings.
+        """
+        entries = list(self.subtasks_completed.items())
+        final_position = len(entries) - 1
+        segments = ["Subtasks Completed:\n", "\n"]
+        for position, (name, outcome) in enumerate(entries):
+            line = f"{name}: {outcome} \n"
+            # only the final entry carries the caching marker
+            segments.append({"text": line, "caching": True} if position == final_position else line)
+        segments.append(" \n")
+        return segments
+
# Alternate names
list_function = list_functions
@@ -258,7 +326,7 @@ async def serialize_agent(self) -> str:
"task_completed": self.task_completed,
"memory": {
name: {"memory": bank.memory, "top_k": getattr(bank, "top_k", None)}
- for name, bank in self.memory_bank.items()
+ for name, bank in (self.memory_bank or {}).items()
if name != "Function" # Skip Function memory as it contains unpicklable objects
},
"function_descriptions": self.fn_description_list,
@@ -290,7 +358,7 @@ async def deserialize_agent(cls, encoded_state: str, get_base_agent=None):
# Restore memory bank data (except Function memory which is handled by get_base_agent)
for name, memory_data in state["memory"].items():
- if name in agent.memory_bank and name != "Function":
+ if agent.memory_bank is not None and name in agent.memory_bank and name != "Function":
agent.memory_bank[name].memory = memory_data["memory"]
if memory_data["top_k"] is not None:
agent.memory_bank[name].top_k = memory_data["top_k"]
@@ -340,7 +408,7 @@ def __init__(self, *args, **kwargs):
def query(
self,
- query: str,
+ query: str|list[str|dict],
output_format: dict,
provide_function_list: bool = False,
task: str = "",
@@ -351,50 +419,20 @@ def query(
If task is given, then we will use it to do RAG over functions"""
# if we have a task to focus on, we can filter the functions (other than use_llm and end_task) by that task
- filtered_fn_list = None
+ filtered_fn_list = []
if task != "":
# filter the functions
filtered_fn_list = self.memory_bank["Function"].retrieve(task)
-
- # add back compulsory functions (default: use_llm, end_task) if present in function_map
- for name, function in self.function_map.items():
- if function.is_compulsory:
- filtered_fn_list.append(function)
-
- # add in global context string and replace it with shared_variables as necessary
- global_context_string = self.global_context
- matches = re.findall(r"<(.*?)>", global_context_string)
- for match in matches:
- if match in self.shared_variables:
- global_context_string = global_context_string.replace(
- f"<{match}>", str(self.shared_variables[match])
- )
-
+
# add in the global context function's output
global_context_output = (
self.get_global_context(self) if self.get_global_context is not None else ""
)
-
- global_context = ""
- # Add in global context if present
- if global_context_string != "" or global_context_output != "":
- global_context = (
- "Global Context:\n```\n"
- + global_context_string
- + "\n"
- + global_context_output
- + "```\n"
- )
-
- user_prompt = f"""You are an agent named {self.agent_name} with the following description: ```{self.agent_description}```\n"""
- if provide_function_list:
- user_prompt += f"You have the following Equipped Functions available:\n```{self.list_functions(filtered_fn_list)}```\n"
- user_prompt += global_context
- user_prompt += query
+ input_user_query_object = self.prepare_input_user_prompt_for_query(query, provide_function_list, task, filtered_fn_list, global_context_output)
res = strict_json(
system_prompt="",
- user_prompt=user_prompt,
+ user_prompt=input_user_query_object,
output_format=output_format,
verbose=self.debug,
llm=self.llm,
@@ -411,7 +449,7 @@ def assign_functions(self, function_list: list):
for function in function_list:
# If this function is an Agent, parse it accordingly
- if isinstance(function, BaseAgent):
+ if isinstance(function, Agent):
function = function.to_function(self)
# do automatic conversion of function to Function class (this is in base.py)
@@ -471,8 +509,13 @@ def use_function(
continue
rag_info += f"Knowledge Reference for {name}: ```{self.memory_bank[name].retrieve(subtask)}```\n"
+ queries = []
+ queries.append(rag_info)
+ queries.extend(self.get_subtasks_completed_as_query_list())
+ queries.append(f"Assigned Subtask: ```{function_params['instruction']}``` \n\n")
+ queries.append(self.use_llm_prompt)
res = self.query(
- query=f'{rag_info}Subtasks Completed:```{self.subtasks_completed}```\nAssigned Subtask: ```{function_params["instruction"]}```\n\nYou are the taking the role of the utility function `use_llm` that generates a detailed outcome for the Assigned Subtask on behalf of {self.agent_name}, according to your Agent Capabilities without talking to the User, with no restrictions. Do not just state that you performed the task, generate the detailed outcome as well.',
+ query=queries,
output_format={
"Detailed Outcome": "Your detailed outcome for Assigned Subtask"
},
@@ -532,11 +575,12 @@ def use_function(
def get_next_subtask(self, task=""):
"""Based on what the task is and the subtasks completed, we get the next subtask, function and input parameters. Supports user-given task as well if user wants to use this function directly"""
+ bg_queries =[]
if task == "":
- background_info = f"Assigned Task:```\n{self.task}\n```\nSubtasks Completed: ```{self.subtasks_completed}```"
-
+ bg_queries.append(f"Assigned Task:```\n{self.task}\n```")
+ bg_queries.extend(self.get_subtasks_completed_as_query_list())
else:
- background_info = f"Assigned Task:```\n{task}\n```\n"
+ bg_queries.append(f"Assigned Task:```\n{task}\n```")
# use default agent plan if task is not given
task = self.task if task == "" else task
@@ -549,11 +593,14 @@ def get_next_subtask(self, task=""):
continue
rag_info += f"Knowledge Reference for {name}: ```{self.memory_bank[name].retrieve(task)}```\n"
+ queries = []
+ queries.extend(bg_queries)
+ queries.append(f"""{rag_info}Based on everything before, provide suitable Observation and Thoughts, and also generate the Current Subtask and the corresponding Equipped Function Name to complete a part of Assigned Task.
+You are only given the Assigned Task from User with no further inputs. Only focus on the Assigned Task and do not do more than required.
+End Task if Assigned Task is completed.""")
# First select the Equipped Function
- res = self.query(
- query=f"""{background_info}{rag_info}\nBased on everything before, provide suitable Observation and Thoughts, and also generate the Current Subtask and the corresponding Equipped Function Name to complete a part of Assigned Task.
-You are only given the Assigned Task from User with no further inputs. Only focus on the Assigned Task and do not do more than required.
-End Task if Assigned Task is completed.""",
+ res = cast(dict[str, Any], self.query(
+ query=queries,
output_format={
"Observation": "Reflect on what has been done in Subtasks Completed for Assigned Task",
"Thoughts": "Brainstorm how to complete remainder of Assigned Task only given Observation",
@@ -562,7 +609,7 @@ def get_next_subtask(self, task=""):
},
provide_function_list=True,
task=task,
- )
+ ))
if self.verbose:
print(
@@ -615,22 +662,26 @@ def get_next_subtask(self, task=""):
res["Equipped Function Inputs"] = {}
else:
+ queries = []
+ queries.extend(bg_queries)
+ queries.append(f"{rag_info}\n\nCurrent Subtask: ```{res['Current Subtask']}```\nEquipped Function Details: ```{str(cur_function)}```\nOutput suitable values for Inputs to Equipped Function to fulfil Current Subtask\nInput fields are: {list(input_format.keys())}")
res2 = self.query(
- query=f"""{background_info}{rag_info}\n\nCurrent Subtask: ```{res["Current Subtask"]}```\nEquipped Function Details: ```{str(cur_function)}```\nOutput suitable values for Inputs to Equipped Function to fulfil Current Subtask\nInput fields are: {list(input_format.keys())}""",
+ query=queries,
output_format=input_format,
provide_function_list=False,
)
# store the rest of the function parameters
- res["Equipped Function Inputs"] = res2
+ res["Equipped Function Inputs"] = cast(dict[str, Any], res2)
# Add in output to the thoughts
self.thoughts.append(res)
+ function_inputs = cast(dict[str, Any], res["Equipped Function Inputs"])
return (
res["Current Subtask"],
res["Equipped Function Name"],
- res["Equipped Function Inputs"],
+ function_inputs,
)
def summarise_subtasks_completed(self, task: str = ""):
@@ -647,11 +698,14 @@ def reply_user(self, query: str = "", stateful: bool = True, verbose: bool = Tru
my_query = self.task if query == "" else query
- res = self.query(
- query=f"Subtasks Completed: ```{self.subtasks_completed}```\nAssigned Task: ```{my_query}```\nRespond to the Assigned Task using information from Global Context and Subtasks Completed only. Be factual and do not generate any new information. Be detailed and give all information available relevant for the Assigned Task in your Assigned Task Response",
+ queries = []
+ queries.extend(self.get_subtasks_completed_as_query_list())
+ queries.append(f"Assigned Task: ```{my_query}```\nRespond to the Assigned Task using information from Global Context and Subtasks Completed only. Be factual and do not generate any new information. Be detailed and give all information available relevant for the Assigned Task in your Assigned Task Response")
+ res = cast(dict[str, Any], self.query(
+ query=queries,
output_format={"Assigned Task Response": "Detailed Response"},
provide_function_list=False,
- )
+ ))
res = res["Assigned Task Response"]
@@ -1105,6 +1159,8 @@ def load_community_agent(cls, agent_name: str):
# Load the module from the given file location
spec = importlib.util.spec_from_file_location(agent_class_name, module_path)
+ if spec is None or spec.loader is None:
+ raise Exception(f"Failed to create module spec for {module_path}")
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
@@ -1175,7 +1231,7 @@ def __init__(self, *args, **kwargs):
async def query(
self,
- query: str,
+ query: str|list[str|dict],
output_format: dict,
provide_function_list: bool = False,
task: str = "",
@@ -1186,50 +1242,19 @@ async def query(
If task is given, then we will use it to do RAG over functions"""
# if we have a task to focus on, we can filter the functions (other than use_llm and end_task) by that task
- filtered_fn_list = None
+ filtered_fn_list = []
if task != "":
# filter the functions
filtered_fn_list = await self.memory_bank["Function"].retrieve(task)
- # add back compulsory functions (default: use_llm, end_task) if present in function_map
- for name, function in self.function_map.items():
- if function.is_compulsory:
- filtered_fn_list.append(function)
-
- # add in global context string and replace it with shared_variables as necessary
- global_context_string = self.global_context
- matches = re.findall(r"<(.*?)>", global_context_string)
- for match in matches:
- if match in self.shared_variables:
- global_context_string = global_context_string.replace(
- f"<{match}>", str(self.shared_variables[match])
- )
# add in the global context function's output
- global_context_output = (
- self.get_global_context(self) if self.get_global_context is not None else ""
- )
-
- global_context = ""
- # Add in global context if present
- if global_context_string != "" or global_context_output != "":
- global_context = (
- "Global Context:\n```\n"
- + global_context_string
- + "\n"
- + global_context_output
- + "```\n"
- )
-
- user_prompt = f"""You are an agent named {self.agent_name} with the following description: ```{self.agent_description}```\n"""
- if provide_function_list:
- user_prompt += f"You have the following Equipped Functions available:\n```{self.list_functions(filtered_fn_list)}```\n"
- user_prompt += global_context
- user_prompt += query
+ global_context_output = await self.get_global_context(self) if self.get_global_context is not None else ""
+ input_user_query_object: list[str|dict] = self.prepare_input_user_prompt_for_query(query, provide_function_list, task, filtered_fn_list, global_context_output)
res = await strict_json_async(
system_prompt="",
- user_prompt=user_prompt,
+ user_prompt=input_user_query_object,
output_format=output_format,
verbose=self.debug,
llm=self.llm,
@@ -1246,7 +1271,7 @@ def assign_functions(self, function_list: list):
for function in function_list:
# If this function is an Agent, parse it accordingly
- if isinstance(function, BaseAgent):
+ if isinstance(function, AsyncAgent):
function = function.to_function(self)
# do automatic conversion of function to Function class (this is in base.py)
@@ -1305,14 +1330,19 @@ async def use_function(
if name == "Function":
continue
rag_info += f"Knowledge Reference for {name}: ```{await self.memory_bank[name].retrieve(subtask)}```\n"
-
- res = await self.query(
- query=f'{rag_info}Subtasks Completed:```{self.subtasks_completed}```\nAssigned Subtask: ```{function_params["instruction"]}```\n\nYou are the taking the role of the utility function `use_llm` that generates a detailed outcome for the Assigned Subtask on behalf of {self.agent_name}, according to your Agent Capabilities without talking to the User, with no restrictions. Do not just state that you performed the task, generate the detailed outcome as well.',
+
+ queries = []
+ queries.append(rag_info)
+ queries.extend(self.get_subtasks_completed_as_query_list())
+ queries.append(f"Assigned Subtask: ```{function_params['instruction']}``` \n\n")
+ queries.append(self.use_llm_prompt)
+ res = cast(dict[str, Any], await self.query(
+ query=queries,
output_format={
"Detailed Outcome": "Your detailed outcome for Assigned Subtask"
},
provide_function_list=False,
- )
+ ))
if self.verbose:
print(f"> {res['Detailed Outcome']}")
@@ -1366,12 +1396,12 @@ async def use_function(
async def get_next_subtask(self, task=""):
"""Based on what the task is and the subtasks completed, we get the next subtask, function and input parameters. Supports user-given task as well if user wants to use this function directly"""
-
+ bg_queries =[]
if task == "":
- background_info = f"Assigned Task:```\n{self.task}\n```\nSubtasks Completed: ```{self.subtasks_completed}```"
-
+ bg_queries.append(f"Assigned Task:```\n{self.task}\n```")
+ bg_queries.extend(self.get_subtasks_completed_as_query_list())
else:
- background_info = f"Assigned Task:```\n{task}\n```\n"
+ bg_queries.append(f"Assigned Task:```\n{task}\n```")
# use default agent plan if task is not given
task = self.task if task == "" else task
@@ -1384,11 +1414,15 @@ async def get_next_subtask(self, task=""):
continue
rag_info += f"Knowledge Reference for {name}: ```{await self.memory_bank[name].retrieve(task)}```\n"
- # First select the Equipped Function
- res = await self.query(
- query=f"""{background_info}{rag_info}\nBased on everything before, provide suitable Observation and Thoughts, and also generate the Current Subtask and the corresponding Equipped Function Name to complete a part of Assigned Task.
+ queries = []
+ queries.extend(bg_queries)
+ queries.append(f"""{rag_info}Based on everything before, provide suitable Observation and Thoughts, and also generate the Current Subtask and the corresponding Equipped Function Name to complete a part of Assigned Task.
You are only given the Assigned Task from User with no further inputs. Only focus on the Assigned Task and do not do more than required.
-End Task if Assigned Task is completed.""",
+End Task if Assigned Task is completed.""")
+
+ # First select the Equipped Function
+ res = cast(dict[str, Any], await self.query(
+ query=queries,
output_format={
"Observation": "Reflect on what has been done in Subtasks Completed for Assigned Task",
"Thoughts": "Brainstorm how to complete remainder of Assigned Task only given Observation",
@@ -1397,7 +1431,7 @@ async def get_next_subtask(self, task=""):
},
provide_function_list=True,
task=task,
- )
+ ))
if self.verbose:
print(
@@ -1450,22 +1484,26 @@ async def get_next_subtask(self, task=""):
res["Equipped Function Inputs"] = {}
else:
+ queries = []
+ queries.extend(bg_queries)
+ queries.append(f"{rag_info}\n\nCurrent Subtask: ```{res['Current Subtask']}```\nEquipped Function Details: ```{str(cur_function)}```\nOutput suitable values for Inputs to Equipped Function to fulfil Current Subtask\nInput fields are: {list(input_format.keys())}")
res2 = await self.query(
- query=f"""{background_info}{rag_info}\n\nCurrent Subtask: ```{res["Current Subtask"]}```\nEquipped Function Details: ```{str(cur_function)}```\Output suitable values for Inputs to Equipped Function to fulfil Current Subtask\nInput fields are: {list(input_format.keys())}""",
+ query=queries,
output_format=input_format,
provide_function_list=False,
)
# store the rest of the function parameters
- res["Equipped Function Inputs"] = res2
+ res["Equipped Function Inputs"] = cast(dict[str, Any], res2)
# Add in output to the thoughts
self.thoughts.append(res)
+ function_inputs = cast(dict[str, Any], res["Equipped Function Inputs"])
return (
res["Current Subtask"],
res["Equipped Function Name"],
- res["Equipped Function Inputs"],
+ function_inputs,
)
async def summarise_subtasks_completed(self, task: str = ""):
@@ -1484,11 +1522,14 @@ async def reply_user(
my_query = self.task if query == "" else query
- res = await self.query(
- query=f"Subtasks Completed: ```{self.subtasks_completed}```\nAssigned Task: ```{my_query}```\nRespond to the Assigned Task in detail using information from Global Context and Subtasks Completed only. Be factual and do not generate any new information. Be detailed and give all information available relevant for the Assigned Task in your Assigned Task Response",
+ queries = []
+ queries.extend(self.get_subtasks_completed_as_query_list())
+ queries.append(f"Assigned Task: ```{my_query}```\nRespond to the Assigned Task in detail using information from Global Context and Subtasks Completed only. Be factual and do not generate any new information. Be detailed and give all information available relevant for the Assigned Task in your Assigned Task Response")
+ res = cast(dict[str, Any], await self.query(
+ query=queries,
output_format={"Assigned Task Response": "Detailed Response"},
provide_function_list=False,
- )
+ ))
res = res["Assigned Task Response"]
@@ -1538,7 +1579,7 @@ async def run(
# otherwise do the task
for i in range(num_subtasks):
# Determine next subtask, or if task is complete. Always execute if it is the first subtask
- subtask, function_name, function_params = await self.get_next_subtask()
+ subtask, function_name, function_params = await self.get_next_subtask()
if function_name == "end_task":
self.task_completed = True
if self.verbose:
@@ -1697,7 +1738,8 @@ async def __call__(self, instruction: str):
)
agent_copy.subtasks_completed = {}
- output = await agent_copy.run(instruction, self.meta_agent.overall_task)
+ # Type assertion: agent_copy is an AsyncAgent, so run is async
+ output = await cast(AsyncAgent, agent_copy).run(instruction, self.meta_agent.overall_task)
# append result of inner agent to meta agent
agent_copy.verbose = False
@@ -1728,4 +1770,4 @@ async def wrapper(*args, **kwargs):
result = await method(*args, **all_kwargs)
return result
- return wrapper
+ return wrapper
\ No newline at end of file
diff --git a/taskgen/base.py b/taskgen/base.py
index 1df1c99..17591e1 100644
--- a/taskgen/base.py
+++ b/taskgen/base.py
@@ -3,6 +3,7 @@
import ast
from typing import Tuple
+# TODO: the sync flow is never used anywhere; consider removing it altogether.
### Helper Functions ###
@@ -14,11 +15,11 @@ def convert_to_list(field: str, **kwargs) -> list:
res = chat(system_msg, user_msg, **kwargs)
# Extract out list items
- field = re.findall(r'\(%item\)\s*(.*?)\n*(?=\(%item\)|$)', res, flags=re.DOTALL)
- return field
+ items = re.findall(r'\(%item\)\s*(.*?)\n*(?=\(%item\)|$)', res, flags=re.DOTALL)
+ return items
-def convert_to_dict(field: str, keys: dict, delimiter: str) -> dict:
+def convert_to_dict(field: str, keys:list, delimiter: str) -> dict:
'''Converts the string field into a dictionary with keys by splitting on '{delimiter}{key}{delimiter}' '''
output_d = {}
for key in keys:
@@ -201,7 +202,7 @@ def type_check_and_convert(field, key, data_type, **kwargs):
-def check_datatype(field, key: dict, data_type: str, **kwargs):
+def check_datatype(field, key, data_type: str, **kwargs):
''' Ensures that output field of the key of JSON dictionary is of data_type
Currently supports int, float, str, code, enum, lists, nested lists, dict, dict with keys
Takes in **kwargs for the LLM model
@@ -265,7 +266,7 @@ def check_datatype(field, key: dict, data_type: str, **kwargs):
-def check_key(field: str, output_format, new_output_format, delimiter: str, delimiter_num: int, **kwargs):
+def check_key(field, output_format, new_output_format, delimiter: str, delimiter_num: int, **kwargs):
''' Check whether each key in dict, or elements in list of new_output_format is present in field, and whether they meet the right data type requirements, then convert field to the right data type
If needed, calls LLM model with parameters **kwargs to correct the output format for improperly formatted list
output_format is user-given output format at each level, new_output_format is with delimiters in keys, and angle brackets surrounding values
@@ -278,7 +279,7 @@ def check_key(field: str, output_format, new_output_format, delimiter: str, deli
# this is the processed output dictionary for that particular layer in the output structure
output_d = {}
# check key appears for each element in the output
- output_d = convert_to_dict(field, output_format.keys(), cur_delimiter)
+ output_d = convert_to_dict(field, list(output_format.keys()), cur_delimiter)
# after creating dictionary, step into next layer
for key, value in output_d.items():
@@ -368,22 +369,23 @@ def wrap_with_angle_brackets(d: dict, delimiter: str, delimiter_num: int) -> dic
else:
return d
-def chat(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-mini', temperature: float = 0, verbose: bool = False, host: str = 'openai', llm = None, **kwargs):
+def chat(system_prompt: str, user_prompt: str|list[str], model: str = 'gpt-4o-mini', temperature: float = 0, verbose: bool = False, host: str = 'openai', llm = None, **kwargs):
r"""Performs a chat with the host's LLM model with system prompt, user prompt, model, verbose and kwargs
Returns the output string res
- system_prompt: String. Write in whatever you want the LLM to become. e.g. "You are a \"
- - user_prompt: String. The user input. Later, when we use it as a function, this is the function input
+ - user_prompt: String or list of strings. The user input. Later, when we use it as a function, this is the function input
- model: String. The LLM model to use for json generation
- verbose: Boolean (default: False). Whether or not to print out the system prompt, user prompt, GPT response
- host: String. The provider of the LLM
- llm: User-made llm function.
- Inputs:
- system_prompt: String. Write in whatever you want the LLM to become. e.g. "You are a \"
- - user_prompt: String. The user input. Later, when we use it as a function, this is the function input
+ - user_prompt: String or list of strings. The user input. Later, when we use it as a function, this is the function input
- Output:
- res: String. The response of the LLM call
- **kwargs: Dict. Additional arguments for LLM chat
"""
+ res =""
if llm is not None:
''' If you specified your own LLM, then we just feed in the system and user prompt
LLM function should take in system prompt (str) and user prompt (str), and output a response (str) '''
@@ -401,6 +403,8 @@ def chat(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-mini', tempe
from openai import OpenAI
client = OpenAI()
+ if isinstance(user_prompt, list):
+ user_prompt = "\n".join(user_prompt)
response = client.chat.completions.create(
model=model,
temperature = temperature,
@@ -421,14 +425,14 @@ def chat(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-mini', tempe
### Main Functions ###
-def strict_json(system_prompt: str, user_prompt: str, output_format: dict, return_as_json = False, custom_checks: dict = None, check_data = None, delimiter: str = '###', num_tries: int = 3, openai_json_mode: bool = False, **kwargs):
+def strict_json(system_prompt: str, user_prompt: str|list[str], output_format: dict, return_as_json = False, custom_checks: dict|None = None, check_data = None, delimiter: str = '###', num_tries: int = 3, openai_json_mode: bool = False, **kwargs):
r""" Ensures that OpenAI will always adhere to the desired output JSON format defined in output_format.
Uses rule-based iterative feedback to ask GPT to self-correct.
Keeps trying up to num_tries it it does not. Returns empty JSON if unable to after num_tries iterations.
Inputs (compulsory):
- system_prompt: String. Write in whatever you want GPT to become. e.g. "You are a \"
- - user_prompt: String. The user input. Later, when we use it as a function, this is the function input
+ - user_prompt: String or list of strings. The user input. Later, when we use it as a function, this is the function input
- output_format: Dict. JSON format with the key as the output key, and the value as the output description
Inputs (optional):
@@ -459,8 +463,11 @@ def strict_json(system_prompt: str, user_prompt: str, output_format: dict, retur
output_format_prompt = "\nOutput in the following json string format: " + str(output_format) + "\nBe concise."
- my_system_prompt = str(system_prompt) + output_format_prompt
- my_user_prompt = str(user_prompt)
+ my_system_prompt = str(system_prompt)
+ if isinstance(user_prompt, list):
+ my_user_prompt = user_prompt + [output_format_prompt]
+ else:
+ my_user_prompt = [user_prompt] + [output_format_prompt]
res = chat(my_system_prompt, my_user_prompt, response_format = {"type": "json_object"}, **kwargs)
@@ -487,8 +494,11 @@ def strict_json(system_prompt: str, user_prompt: str, output_format: dict, retur
Ensure the following output keys are present in the json: {' '.join(list(new_output_format.keys()))}'''
for i in range(num_tries):
- my_system_prompt = str(system_prompt) + output_format_prompt + error_msg
- my_user_prompt = str(user_prompt)
+ my_system_prompt = str(system_prompt)
+ if isinstance(user_prompt, list):
+ my_user_prompt = user_prompt + [output_format_prompt] + [error_msg]
+ else:
+ my_user_prompt = [user_prompt] + [output_format_prompt] + [error_msg]
# Use OpenAI to get a response
res = chat(my_system_prompt, my_user_prompt, **kwargs)
@@ -514,7 +524,7 @@ def strict_json(system_prompt: str, user_prompt: str, output_format: dict, retur
# do checks for keys and output format, remove escape characters so code can be run
end_dict = check_key(res, output_format, new_output_format, delimiter, delimiter_num = 1, **kwargs)
-
+ assert isinstance(end_dict, dict)
# run user defined custom checks now
for key in end_dict:
if key in custom_checks:
diff --git a/taskgen/base_async.py b/taskgen/base_async.py
index e9a8571..5e341b2 100644
--- a/taskgen/base_async.py
+++ b/taskgen/base_async.py
@@ -18,8 +18,8 @@ async def convert_to_list_async(field: str, **kwargs) -> list:
res = await chat_async(system_msg, user_msg, **kwargs)
# Extract out list items
- field = re.findall(r'\(%item\)\s*(.*?)\n*(?=\(%item\)|$)', res, flags=re.DOTALL)
- return field
+ items = re.findall(r'\(%item\)\s*(.*?)\n*(?=\(%item\)|$)', res, flags=re.DOTALL)
+ return items
@@ -41,7 +41,7 @@ async def llm_check_async(field, llm_check_msg: str, **kwargs) -> Tuple[bool, st
return requirement_met, action_needed
-async def check_datatype_async(field, key: dict, data_type: str, **kwargs):
+async def check_datatype_async(field, key, data_type: str, **kwargs):
''' Ensures that output field of the key of JSON dictionary is of data_type
Currently supports int, float, str, code, enum, lists, nested lists, dict, dict with keys
Takes in **kwargs for the LLM model
@@ -104,7 +104,7 @@ async def check_datatype_async(field, key: dict, data_type: str, **kwargs):
-async def check_key_async(field: str, output_format, new_output_format, delimiter: str, delimiter_num: int, **kwargs):
+async def check_key_async(field, output_format, new_output_format, delimiter: str, delimiter_num: int, **kwargs):
''' Check whether each key in dict, or elements in list of new_output_format is present in field, and whether they meet the right data type requirements, then convert field to the right data type
If needed, calls LLM model with parameters **kwargs to correct the output format for improperly formatted list
output_format is user-given output format at each level, new_output_format is with delimiters in keys, and angle brackets surrounding values
@@ -117,7 +117,7 @@ async def check_key_async(field: str, output_format, new_output_format, delimite
# this is the processed output dictionary for that particular layer in the output structure
output_d = {}
# check key appears for each element in the output
- output_d = convert_to_dict(field, output_format.keys(), cur_delimiter)
+ output_d = convert_to_dict(field, list(output_format.keys()), cur_delimiter)
# after creating dictionary, step into next layer
for key, value in output_d.items():
@@ -169,11 +169,11 @@ async def check_key_async(field: str, output_format, new_output_format, delimite
-async def chat_async(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-mini', temperature: float = 0, verbose: bool = False, host: str = 'openai', llm= None, **kwargs):
+async def chat_async(system_prompt: str, user_prompt: str|list[str|dict], model: str = 'gpt-4o-mini', temperature: float = 0, verbose: bool = False, host: str = 'openai', llm= None, **kwargs):
r"""Performs a chat with the host's LLM model with system prompt, user prompt, model, verbose and kwargs
Returns the output string res
- system_prompt: String. Write in whatever you want the LLM to become. e.g. "You are a \"
- - user_prompt: String. The user input. Later, when we use it as a function, this is the function input
+ - user_prompt: String, or list of strings and/or dictionaries (a dictionary's "text" value is used when present). The user input. Later, when we use it as a function, this is the function input
- model: String. The LLM model to use for json generation
- verbose: Boolean (default: False). Whether or not to print out the system prompt, user prompt, GPT response
- host: String. The provider of the LLM
@@ -185,6 +185,7 @@ async def chat_async(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-
- res: String. The response of the LLM call
- **kwargs: Dict. Additional arguments for LLM chat
"""
+ res = ""
if llm is not None:
ensure_awaitable(llm, 'llm')
''' If you specified your own LLM, then we just feed in the system and user prompt
@@ -203,6 +204,8 @@ async def chat_async(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-
from openai import AsyncOpenAI
client = AsyncOpenAI()
+ if isinstance(user_prompt, list):
+ user_prompt = "\n".join([prompt.get("text", "") if isinstance(prompt, dict) and prompt.get("text", "") else str(prompt) for prompt in user_prompt])
response = await client.chat.completions.create(
model=model,
temperature = temperature,
@@ -226,14 +229,14 @@ async def chat_async(system_prompt: str, user_prompt: str, model: str = 'gpt-4o-
### Main Functions ###
-async def strict_json_async(system_prompt: str, user_prompt: str, output_format: dict, return_as_json = False, custom_checks: dict = None, check_data = None, delimiter: str = '###', num_tries: int = 3, openai_json_mode: bool = False, **kwargs):
+async def strict_json_async(system_prompt: str, user_prompt: str|list[str|dict], output_format: dict, return_as_json = False, custom_checks: dict|None = None, check_data = None, delimiter: str = '###', num_tries: int = 3, openai_json_mode: bool = False, **kwargs):
r""" Ensures that OpenAI will always adhere to the desired output JSON format defined in output_format.
Uses rule-based iterative feedback to ask GPT to self-correct.
Keeps trying up to num_tries it it does not. Returns empty JSON if unable to after num_tries iterations.
Inputs (compulsory):
- system_prompt: String. Write in whatever you want GPT to become. e.g. "You are a \"
- - user_prompt: String. The user input. Later, when we use it as a function, this is the function input
+ - user_prompt: String or list of strings or dictionaries. The user input. Later, when we use it as a function, this is the function input
- output_format: Dict. JSON format with the key as the output key, and the value as the output description
Inputs (optional):
@@ -264,8 +267,14 @@ async def strict_json_async(system_prompt: str, user_prompt: str, output_format:
output_format_prompt = "\nOutput in the following json string format: " + str(output_format) + "\nBe concise."
- my_system_prompt = str(system_prompt) + output_format_prompt
- my_user_prompt = str(user_prompt)
+ my_system_prompt = str(system_prompt)
+ my_user_prompt: list[str | dict] = []
+ if isinstance(user_prompt, list):
+ my_user_prompt.extend(user_prompt)
+ my_user_prompt.append(output_format_prompt)
+ else:
+ my_user_prompt.append(user_prompt)
+ my_user_prompt.append(output_format_prompt)
res = await chat_async(my_system_prompt, my_user_prompt, response_format = {"type": "json_object"}, **kwargs)
@@ -292,9 +301,16 @@ async def strict_json_async(system_prompt: str, user_prompt: str, output_format:
Ensure the following output keys are present in the json: {' '.join(list(new_output_format.keys()))}'''
for i in range(num_tries):
- my_system_prompt = str(system_prompt) + output_format_prompt + error_msg
- my_user_prompt = str(user_prompt)
-
+ my_system_prompt = str(system_prompt)
+ my_user_prompt = []
+ if isinstance(user_prompt, list):
+ my_user_prompt.extend(user_prompt)
+ my_user_prompt.append(output_format_prompt)
+ my_user_prompt.append(error_msg)
+ else:
+ my_user_prompt.append(user_prompt)
+ my_user_prompt.append(output_format_prompt)
+ my_user_prompt.append(error_msg)
# Use OpenAI to get a response
res = await chat_async(my_system_prompt, my_user_prompt, **kwargs)
@@ -320,6 +336,7 @@ async def strict_json_async(system_prompt: str, user_prompt: str, output_format:
# do checks for keys and output format, remove escape characters so code can be run
end_dict = await check_key_async(res, output_format, new_output_format, delimiter, delimiter_num = 1, **kwargs)
+ assert isinstance(end_dict, dict)
# run user defined custom checks now
for key in end_dict:
if key in custom_checks:
diff --git a/taskgen/function.py b/taskgen/function.py
index 149e5ba..66e96dd 100644
--- a/taskgen/function.py
+++ b/taskgen/function.py
@@ -1,6 +1,7 @@
+import json
import re
import inspect
-from typing import get_type_hints
+from typing import Any, cast, get_type_hints
from taskgen.base import strict_json
from taskgen.base_async import strict_json_async
@@ -112,7 +113,7 @@ def get_fn_output(my_function) -> dict:
class BaseFunction:
def __init__(self,
fn_description: str = '',
- output_format: dict = None,
+ output_format: dict|None = None,
examples = None,
external_fn = None,
is_compulsory = False,
@@ -289,11 +290,11 @@ def __init__(self, *args, **kwargs):
self.fn_name = self.external_fn.__name__
# otherwise, generate name out
else:
- res = strict_json(system_prompt = "Output a function name to summarise the usage of this function.",
+ res = cast(dict[str, Any], strict_json(system_prompt = "Output a function name to summarise the usage of this function.",
user_prompt = str(self.fn_description),
output_format = {"Thoughts": "What function does", "Name": "Function name with _ separating words that summarises what function does"},
- llm = self.llm,
- **self.kwargs)
+ llm = self.llm,
+ **self.kwargs))
self.fn_name = res['Name']
# change instance's name to function's name
self.__name__ = self.fn_name
@@ -322,7 +323,7 @@ def __call__(self, *args, **kwargs):
# If strict_json function, do the function.
if self.external_fn is None:
res = strict_json(system_prompt = self.fn_description,
- user_prompt = function_kwargs,
+ user_prompt = json.dumps(function_kwargs),
output_format = self.output_format,
llm = self.llm,
**self.kwargs, **strict_json_kwargs)
@@ -373,11 +374,11 @@ def __init__(self, *args, **kwargs):
async def async_init(self):
''' This generates the name for the function using strict_json_async '''
if self.fn_name is None:
- res = await strict_json_async(system_prompt = "Output a function name to summarise the usage of this function.",
+ res = cast(dict[str, Any], await strict_json_async(system_prompt = "Output a function name to summarise the usage of this function.",
user_prompt = str(self.fn_description),
output_format = {"Thoughts": "What function does", "Name": "Function name with _ separating words that summarises what function does"},
llm = self.llm,
- **self.kwargs)
+ **self.kwargs))
self.fn_name = res['Name']
# change instance's name to function's name
@@ -410,11 +411,11 @@ async def __call__(self, *args, **kwargs):
# If strict_json function, do the function.
if self.external_fn is None:
- res = await strict_json_async(system_prompt = self.fn_description,
- user_prompt = function_kwargs,
+ res = cast(dict[str, Any], await strict_json_async(system_prompt = self.fn_description,
+ user_prompt = json.dumps(function_kwargs),
output_format = self.output_format,
llm = self.llm,
- **self.kwargs, **strict_json_kwargs)
+ **self.kwargs, **strict_json_kwargs))
# Else run the external function
else:
diff --git a/taskgen/memory.py b/taskgen/memory.py
index 4f25a4d..e06803b 100644
--- a/taskgen/memory.py
+++ b/taskgen/memory.py
@@ -1,9 +1,5 @@
-from ast import List
import asyncio
-import hashlib
-import os
-import time
-from typing import Any
+from typing import Any, cast
from docx import Document
from langchain_text_splitters import RecursiveCharacterTextSplitter
# import chromadb
@@ -45,7 +41,7 @@ def reset(self):
"""Clears all memories"""
@abstractmethod
- def retrieve(self, task: str):
+ def retrieve(self, task: str) -> Any:
"""Retrieves some memories according to task"""
pass
@@ -122,7 +118,7 @@ class BaseMemory(MemoryTemplate):
def __init__(
self,
- memory: list = None,
+ memory: list|None = None,
top_k: int = 3,
mapper=lambda x: x,
approach="retrieve_by_ranker",
@@ -152,9 +148,9 @@ def append(self, memory_list, mapper=None):
memory_list = [memory_list]
self.memory.extend(memory_list)
- def remove(self, memory_to_remove):
+ def remove(self, existing_memory):
"""Removes a memory"""
- self.memory.remove(memory_to_remove)
+ self.memory.remove(existing_memory)
def reset(self):
"""Clears all memory"""
@@ -166,7 +162,7 @@ def isempty(self) -> bool:
def get_python_representation(self, include_memory_elements) -> str:
"""Returns a string representation of the object for debugging"""
- return f"Memory(memory={self.memory if include_memory_elements else []}, top_k={self.top_k}, mapper={get_source_code_for_func(self.mapper)}, approach='{self.approach}', ranker={self.ranker.get_python_representation() if hasattr(self.ranker, 'get_python_representation') else 'None'})"
+ return f"Memory(memory={self.memory if include_memory_elements else []}, top_k={self.top_k}, mapper={get_source_code_for_func(self.mapper)}, approach='{self.approach}', ranker={self.ranker.get_python_representation() if self.ranker is not None and hasattr(self.ranker, 'get_python_representation') else 'None'})"
class Memory(BaseMemory):
@@ -221,14 +217,14 @@ def retrieve_by_ranker(self, task: str) -> list:
def retrieve_by_llm(self, task: str) -> list:
"""Performs retrieval via LLMs
Returns the key list as well as the value list"""
- res = strict_json(
+ res = cast(dict[str, Any], strict_json(
f'You are to output the top {self.top_k} most similar list items in Memory relevant to this: ```{task}```\nMemory: {[f"{i}. {self.mapper(mem)}" for i, mem in enumerate(self.memory)]}',
"",
output_format={
f"top_{self.top_k}_list": f"Indices of top {self.top_k} most similar list items in Memory, type: list[int]"
},
llm=self.llm,
- )
+ ))
top_k_indices = res[f"top_{self.top_k}_list"]
return [self.memory[index] for index in top_k_indices]
@@ -290,14 +286,14 @@ async def retrieve_by_ranker(self, task: str) -> list:
async def retrieve_by_llm(self, task: str) -> list:
"""Performs retrieval via LLMs
Returns the key list as well as the value list"""
- res = await strict_json_async(
+ res = cast(dict[str, Any], await strict_json_async(
f'You are to output the top {self.top_k} most similar list items in Memory relevant to this: ```{task}```\nMemory: {[f"{i}. {self.mapper(mem)}" for i, mem in enumerate(self.memory)]}',
"",
output_format={
f"top_{self.top_k}_list": f"Indices of top {self.top_k} most similar list items in Memory, type: list[int]"
},
llm=self.llm,
- )
+ ))
top_k_indices = res[f"top_{self.top_k}_list"]
return [self.memory[index] for index in top_k_indices]
diff --git a/taskgen/utils.py b/taskgen/utils.py
index 96b5d30..1916fc2 100644
--- a/taskgen/utils.py
+++ b/taskgen/utils.py
@@ -8,7 +8,10 @@ def get_source_code_for_func(fn):
if fn.__name__ == "":
source_line = inspect.getsource(fn)
source_line = source_line.split('#')[0]
- match = re.search(r"\blambda\b[^:]+:.*", source_line).group(0)
+ match = re.search(r"\blambda\b[^:]+:.*", source_line)
+ if match is None:
+ raise ValueError(f"Could not extract lambda function from source: {source_line}")
+ match = match.group(0)
splits = [s for s in match.split(",") if s != ""]
fn_code = splits[0]
idx = 1
diff --git a/taskgen/wrapper.py b/taskgen/wrapper.py
index aae2c50..e8341b4 100644
--- a/taskgen/wrapper.py
+++ b/taskgen/wrapper.py
@@ -1,3 +1,4 @@
+from typing import Any, cast
from taskgen.agent import Agent
from taskgen.base import strict_json
@@ -22,7 +23,7 @@ class ConversationWrapper(Agent):
- ConversationWrapper uses `chat()` which chats with the Agent and the Agent will perform actions and reply the chat message'''
- def __init__(self, agent: Agent, persistent_memory: dict = None, person = 'User', conversation = None, num_past_conversation: int = 5, verbose: bool = True):
+ def __init__(self, agent: Agent, persistent_memory: dict|None = None, person = 'User', conversation = None, num_past_conversation: int = 5, verbose: bool = True):
# Initialize the parent Agent
super().__init__(**agent.__dict__) # Inherit all of the attributes of the passed agent
# Initiatlize the functions
@@ -69,7 +70,7 @@ def chat(self, cur_msg):
self.reset()
## Replies the person
- res = self.query(f'''Summary of Past Conversation: ```{self.shared_variables['Summary of Conversation']}```
+ res = cast(dict[str, Any], self.query(f'''Summary of Past Conversation: ```{self.shared_variables['Summary of Conversation']}```
Past Conversation: ```{self.shared_variables['Conversation'][-self.num_past_conversation:]}```
Latest Input from {self.person}: ```{cur_msg}```
Actions Done for Latest Input: ```{actions_done}```
@@ -82,7 +83,7 @@ def chat(self, cur_msg):
output_format = {"Thoughts": f"How to reply",
f"Reply to {self.person}": f"Your reply as {self.agent_name}",
- "Summary of Conversation": "Summarise key points of entire conversation in at most two sentences, building on previous Summary"})
+ "Summary of Conversation": "Summarise key points of entire conversation in at most two sentences, building on previous Summary"}))
# Update the Summary of Conversation and Append the conversation
self.shared_variables['Summary of Conversation'] = res['Summary of Conversation']
@@ -126,7 +127,7 @@ class ConversableAgent:
- **Summary of Conversation**: A summary of the current conversation
- ConversableAgent uses `chat()` which chats with the Agent and the Agent will perform actions and reply the chat message'''
- def __init__(self, agent: Agent, persistent_memory: dict = None, person = 'User', conversation = None, num_past_conversation: int = 5, verbose: bool = True):
+ def __init__(self, agent: Agent, persistent_memory: dict|None = None, person = 'User', conversation = None, num_past_conversation: int = 5, verbose: bool = True):
self.agent = agent
self.persistent_memory = persistent_memory
self.num_past_conversation = num_past_conversation