diff --git a/python/langsmith/client.py b/python/langsmith/client.py index c211a3a3..a29b3354 100644 --- a/python/langsmith/client.py +++ b/python/langsmith/client.py @@ -57,6 +57,8 @@ logger = logging.getLogger(__name__) _urllib3_logger = logging.getLogger("urllib3.connectionpool") +X_API_KEY = "x-api-key" + def _is_localhost(url: str) -> bool: """Check if the URL is localhost. @@ -287,18 +289,30 @@ def _get_tracing_sampling_rate() -> float | None: return sampling_rate +def _get_env(var_names: Sequence[str], default: Optional[str] = None) -> Optional[str]: + for var_name in var_names: + var = os.getenv(var_name) + if var is not None: + return var + return default + + def _get_api_key(api_key: Optional[str]) -> Optional[str]: - api_key = api_key or os.getenv("LANGSMITH_API_KEY", os.getenv("LANGCHAIN_API_KEY")) - if api_key is None or not api_key.strip(): + api_key_ = ( + api_key + if api_key is not None + else _get_env(("LANGSMITH_API_KEY", "LANGCHAIN_API_KEY")) + ) + if api_key_ is None or not api_key_.strip(): return None - return api_key.strip().strip('"').strip("'") + return api_key_.strip().strip('"').strip("'") def _get_api_url(api_url: Optional[str]) -> str: - _api_url = api_url or os.getenv( - "LANGSMITH_ENDPOINT", - os.getenv( - "LANGCHAIN_ENDPOINT", + _api_url = api_url or cast( + str, + _get_env( + ("LANGSMITH_ENDPOINT", "LANGCHAIN_ENDPOINT"), "https://api.smith.langchain.com", ), ) @@ -307,6 +321,25 @@ def _get_api_url(api_url: Optional[str]) -> str: return _api_url.strip().strip('"').strip("'").rstrip("/") +def _get_write_api_urls(_write_api_urls: Optional[Dict[str, str]]) -> Dict[str, str]: + _write_api_urls = _write_api_urls or json.loads( + os.getenv("LANGSMITH_RUNS_ENDPOINTS", "{}") + ) + processed_write_api_urls = {} + for url, api_key in _write_api_urls.items(): + processed_url = url.strip() + if not processed_url: + raise ls_utils.LangSmithUserError( + "LangSmith runs API URL within LANGSMITH_RUNS_ENDPOINTS cannot be empty" + ) + processed_url = processed_url.strip().strip('"').strip("'").rstrip("/") + processed_api_key = api_key.strip().strip('"').strip("'") + _validate_api_key_if_hosted(processed_url, processed_api_key) + processed_write_api_urls[processed_url] = processed_api_key + + return processed_write_api_urls + + def _as_uuid(value: ID_TYPE, var: Optional[str] = None) -> uuid.UUID: try: return uuid.UUID(value) if not isinstance(value, uuid.UUID) else value @@ -358,6 +391,7 @@ class Client: "_hide_inputs", "_hide_outputs", "_info", + "_write_api_urls", ] def __init__( @@ -373,6 +407,7 @@ def __init__( hide_inputs: Optional[Union[Callable[[dict], dict], bool]] = None, hide_outputs: Optional[Union[Callable[[dict], dict], bool]] = None, info: Optional[Union[dict, ls_schemas.LangSmithInfo]] = None, + api_urls: Optional[Dict[str, str]] = None, ) -> None: """Initialize a Client instance. @@ -403,17 +438,45 @@ def __init__( info: Optional[ls_schemas.LangSmithInfo] The information about the LangSmith API. If not provided, it will be fetched from the API. + api_urls: Optional[Dict[str, str]] + A dictionary of write API URLs and their corresponding API keys. + Useful for multi-tenant setups. Data is only read from the first + URL in the dictionary. However, ONLY Runs are written (POST and PATCH) + to all URLs in the dictionary. Feedback, sessions, datasets, examples, + annotation queues and evaluation results are only written to the first. Raises: ------ LangSmithUserError If the API key is not provided when using the hosted service. + If both api_url and api_urls are provided. """ + if api_url and api_urls: + raise ls_utils.LangSmithUserError( + "You cannot provide both api_url and api_urls." + ) + + if ( + os.getenv("LANGSMITH_ENDPOINT") or os.getenv("LANGCHAIN_ENDPOINT") + ) and os.getenv("LANGSMITH_RUNS_ENDPOINTS"): + raise ls_utils.LangSmithUserError( + "You cannot provide both LANGSMITH_ENDPOINT / LANGCHAIN_ENDPOINT " + "and LANGSMITH_RUNS_ENDPOINTS." + ) + self.tracing_sample_rate = _get_tracing_sampling_rate() self._sampled_post_uuids: set[uuid.UUID] = set() - self.api_key = _get_api_key(api_key) - self.api_url = _get_api_url(api_url) - _validate_api_key_if_hosted(self.api_url, self.api_key) + self._write_api_urls: Mapping[str, Optional[str]] = _get_write_api_urls( + api_urls + ) + if self._write_api_urls: + self.api_url = next(iter(self._write_api_urls)) + self.api_key: Optional[str] = self._write_api_urls[self.api_url] + else: + self.api_url = _get_api_url(api_url) + self.api_key = _get_api_key(api_key) + _validate_api_key_if_hosted(self.api_url, self.api_key) + self._write_api_urls = {self.api_url: self.api_key} self.retry_config = retry_config or _default_retry_config() self.timeout_ms = timeout_ms or 10000 self._web_url = web_url @@ -514,7 +577,7 @@ def _headers(self) -> Dict[str, str]: """ headers = {"User-Agent": f"langsmith-py/{langsmith.__version__}"} if self.api_key: - headers["x-api-key"] = self.api_key + headers[X_API_KEY] = self.api_key return headers @property @@ -1036,21 +1099,23 @@ def create_run( self._create_run(run_create) def _create_run(self, run_create: dict): - headers = { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", - } - self.request_with_retries( - "post", - f"{self.api_url}/runs", - request_kwargs={ - "data": _dumps_json(run_create), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - to_ignore=(ls_utils.LangSmithConflictError,), - ) + for api_url, api_key in self._write_api_urls.items(): + headers = { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + X_API_KEY: api_key, + } + self.request_with_retries( + "post", + f"{api_url}/runs", + request_kwargs={ + "data": _dumps_json(run_create), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + to_ignore=(ls_utils.LangSmithConflictError,), + ) def _hide_run_inputs(self, inputs: dict): if self._hide_inputs is False: @@ -1186,22 +1251,24 @@ def handle_429(response: requests.Response, attempt: int) -> bool: return False try: - self.request_with_retries( - "post", - f"{self.api_url}/runs/batch", - request_kwargs={ - "data": body, - "timeout": self.timeout_ms / 1000, - "headers": { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", + for api_url, api_key in self._write_api_urls.items(): + self.request_with_retries( + "post", + f"{api_url}/runs/batch", + request_kwargs={ + "data": body, + "timeout": self.timeout_ms / 1000, + "headers": { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + X_API_KEY: api_key, + }, }, - }, - to_ignore=(ls_utils.LangSmithConflictError,), - stop_after_attempt=3, - handle_response=handle_429, - ) + to_ignore=(ls_utils.LangSmithConflictError,), + stop_after_attempt=3, + handle_response=handle_429, + ) except Exception as e: logger.warning(f"Failed to batch ingest runs: {repr(e)}") @@ -1275,21 +1342,23 @@ def update_run( return self._update_run(data) def _update_run(self, run_update: dict) -> None: - headers = { - **self._headers, - "Accept": "application/json", - "Content-Type": "application/json", - } + for api_url, api_key in self._write_api_urls.items(): + headers = { + **self._headers, + "Accept": "application/json", + "Content-Type": "application/json", + X_API_KEY: api_key, + } - self.request_with_retries( - "patch", - f"{self.api_url}/runs/{run_update['id']}", - request_kwargs={ - "data": _dumps_json(run_update), - "headers": headers, - "timeout": self.timeout_ms / 1000, - }, - ) + self.request_with_retries( + "patch", + f"{api_url}/runs/{run_update['id']}", + request_kwargs={ + "data": _dumps_json(run_update), + "headers": headers, + "timeout": self.timeout_ms / 1000, + }, + ) def _load_child_runs(self, run: ls_schemas.Run) -> ls_schemas.Run: """Load child runs for a given run. diff --git a/python/langsmith/schemas.py b/python/langsmith/schemas.py index ed18c2ca..abaa6d99 100644 --- a/python/langsmith/schemas.py +++ b/python/langsmith/schemas.py @@ -221,7 +221,7 @@ class RunBase(BaseModel): """List of events associated with the run, like start and end events.""" - inputs: dict + inputs: dict = Field(default_factory=dict) """Inputs used for the run.""" outputs: Optional[dict] = None @@ -301,7 +301,8 @@ def __init__(self, _host_url: Optional[str] = None, **kwargs: Any) -> None: """Initialize a Run object.""" if not kwargs.get("trace_id"): kwargs = {"trace_id": kwargs.get("id"), **kwargs} - super().__init__(**kwargs) + inputs = kwargs.pop("inputs", None) or {} + super().__init__(**kwargs, inputs=inputs) self._host_url = _host_url if not self.dotted_order.strip() and not self.parent_run_id: self.dotted_order = f"{self.start_time.isoformat()}{self.id}" diff --git a/python/tests/unit_tests/test_client.py b/python/tests/unit_tests/test_client.py index 98eef13a..dcc6f545 100644 --- a/python/tests/unit_tests/test_client.py +++ b/python/tests/unit_tests/test_client.py @@ -99,7 +99,7 @@ def test_validate_api_url(monkeypatch: pytest.MonkeyPatch) -> None: def test_validate_api_key(monkeypatch: pytest.MonkeyPatch) -> None: # Scenario 1: Both LANGCHAIN_API_KEY and LANGSMITH_API_KEY are set, - # but api_key is not + # but api_key is not monkeypatch.setenv("LANGCHAIN_API_KEY", "env_langchain_api_key") monkeypatch.setenv("LANGSMITH_API_KEY", "env_langsmith_api_key") @@ -107,7 +107,7 @@ def test_validate_api_key(monkeypatch: pytest.MonkeyPatch) -> None: assert client.api_key == "env_langsmith_api_key" # Scenario 2: Both LANGCHAIN_API_KEY and LANGSMITH_API_KEY are set, - # and api_key is set + # and api_key is set monkeypatch.setenv("LANGCHAIN_API_KEY", "env_langchain_api_key") monkeypatch.setenv("LANGSMITH_API_KEY", "env_langsmith_api_key") @@ -129,6 +129,36 @@ def test_validate_api_key(monkeypatch: pytest.MonkeyPatch) -> None: assert client.api_key == "env_langsmith_api_key" +def test_validate_multiple_urls(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("LANGCHAIN_ENDPOINT", "https://api.smith.langchain-endpoint.com") + monkeypatch.setenv("LANGSMITH_ENDPOINT", "https://api.smith.langsmith-endpoint.com") + monkeypatch.setenv("LANGSMITH_RUNS_ENDPOINTS", "{}") + + with pytest.raises(ls_utils.LangSmithUserError): + Client() + + monkeypatch.undo() + with pytest.raises(ls_utils.LangSmithUserError): + Client( + api_url="https://api.smith.langchain.com", + api_key="123", + api_urls={"https://api.smith.langchain.com": "123"}, + ) + + data = { + "https://api.smith.langsmith-endpoint_1.com": "123", + "https://api.smith.langsmith-endpoint_2.com": "456", + "https://api.smith.langsmith-endpoint_3.com": "789", + } + monkeypatch.delenv("LANGCHAIN_ENDPOINT", raising=False) + monkeypatch.delenv("LANGSMITH_ENDPOINT", raising=False) + monkeypatch.setenv("LANGSMITH_RUNS_ENDPOINTS", json.dumps(data)) + client = Client() + assert client._write_api_urls == data + assert client.api_url == "https://api.smith.langsmith-endpoint_1.com" + assert client.api_key == "123" + + def test_headers(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.delenv("LANGCHAIN_API_KEY", raising=False) client = Client(api_url="http://localhost:1984", api_key="123") @@ -238,22 +268,22 @@ def test_get_api_key() -> None: def test_get_api_url() -> None: - assert _get_api_url("http://provided.url", "api_key") == "http://provided.url" + assert _get_api_url("http://provided.url") == "http://provided.url" with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}): - assert _get_api_url(None, "api_key") == "http://env.url" + assert _get_api_url(None) == "http://env.url" with patch.dict(os.environ, {}, clear=True): - assert _get_api_url(None, "api_key") == "https://api.smith.langchain.com" + assert _get_api_url(None) == "https://api.smith.langchain.com" with patch.dict(os.environ, {}, clear=True): - assert _get_api_url(None, None) == "https://api.smith.langchain.com" + assert _get_api_url(None) == "https://api.smith.langchain.com" with patch.dict(os.environ, {"LANGCHAIN_ENDPOINT": "http://env.url"}): - assert _get_api_url(None, None) == "http://env.url" + assert _get_api_url(None) == "http://env.url" with pytest.raises(ls_utils.LangSmithUserError): - _get_api_url(" ", "api_key") + _get_api_url(" ") def test_create_run_unicode() -> None: