python动态注册执行action
代码核心思想解析
这段 Python 代码主要用于 注册、管理和执行异步操作(Action),结合 Pydantic
进行 参数校验,并 支持动态扩展和装饰器方式注册异步任务。以下是代码的详细解析。
@self.registry.action('Complete task', param_model=DoneAction)
async def done(params: DoneAction):return ActionResult(is_done=True, extracted_content=params.text)class Registry:"""Service for registering and managing actions"""def __init__(self, exclude_actions: list[str] = []):self.registry = ActionRegistry()self.telemetry = ProductTelemetry()self.exclude_actions = exclude_actionsdef _create_param_model(self, function: Callable) -> Type[BaseModel]:"""Creates a Pydantic model from function signature"""sig = signature(function)params = {name: (param.annotation, ... if param.default == param.empty else param.default)for name, param in sig.parameters.items()if name != 'browser' and name != 'page_extraction_llm' and name != 'available_file_paths'}# TODO: make the types here workreturn create_model(f'{function.__name__}_parameters',__base__=ActionModel,**params, # type: ignore)def action(self,description: str,param_model: Optional[Type[BaseModel]] = None,):"""Decorator for registering actions"""def decorator(func: Callable):# Skip registration if action is in exclude_actionsif func.__name__ in self.exclude_actions:return func# Create param model from function if not providedactual_param_model = param_model or self._create_param_model(func)# Wrap sync functions to make them asyncif not iscoroutinefunction(func):async def async_wrapper(*args, **kwargs):return await asyncio.to_thread(func, *args,