diff --git a/dlt/common/pipeline.py b/dlt/common/pipeline.py
index 4f0e4f368d..2c258ec8a6 100644
--- a/dlt/common/pipeline.py
+++ b/dlt/common/pipeline.py
@@ -15,7 +15,6 @@
     NamedTuple,
     Optional,
     Protocol,
-    Sequence,
     Tuple,
     TypeVar,
     Mapping,
@@ -23,7 +22,7 @@
 )
 from typing_extensions import NotRequired
 
-from dlt.common.typing import TypedDict
+from dlt.common.typing import TypedDict, Unpack
 from dlt.common.configuration import configspec
 from dlt.common.configuration import known_sections
 from dlt.common.configuration.container import Container
@@ -46,7 +45,7 @@
     NormalizeMetrics,
     StepMetrics,
 )
-from dlt.common.schema import Schema
+from dlt.common.schema import Schema, TAnySchemaColumns, TTableFormat
 from dlt.common.schema.typing import (
     TColumnSchema,
     TWriteDispositionConfig,
@@ -474,6 +473,48 @@ class TSourceState(TPipelineState):
     sources: Dict[str, Dict[str, Any]]  # type: ignore[misc]
 
 
+class SupportsPipelineRunArgs(TypedDict, total=False):
+    destination: Optional[TDestinationReferenceArg]
+    """A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
+        If not provided, the value passed to `dlt.pipeline` will be used."""
+    staging: Optional[TDestinationReferenceArg]
+    """The staging dataset."""
+    dataset_name: Optional[str]
+    """A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
+        If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`."""
+    credentials: Optional[Any]
+    """Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
+        In most cases this should be set to None, which lets `dlt` use `secrets.toml` or environment variables to infer the right credentials values."""
+    table_name: Optional[str]
+    """The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute.
+        The behavior of this argument depends on the type of the `data`:
+        * generator functions - the function name is used as table name, `table_name` overrides this default
+        * `@dlt.resource` - resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
+        * `@dlt.source` - source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into a single table."""
+    write_disposition: Optional[TWriteDispositionConfig]
+    """Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
+        Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. `merge` will deduplicate and merge data based on `primary_key` and `merge_key` hints. Defaults to `append`.
+        Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`.
+        Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten."""
+    columns: Optional[TAnySchemaColumns]
+    """A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema."""
+    primary_key: Optional[TColumnNames]
+    """A column name or a list of column names that comprise a private key. Typically used with "merge" write disposition to deduplicate loaded data."""
+    schema: Optional[Schema]
+    """An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself."""
+    loader_file_format: Optional[TLoaderFileFormat]
+    """The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination."""
+    table_format: Optional[TTableFormat]
+    """The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations."""
+    schema_contract: Optional[TSchemaContract]
+    """On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None."""
+    refresh: Optional[TRefreshMode]
+    """Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
+        * `drop_sources` - Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased)
+        * `drop_resources` - Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
+        * `drop_data` - Wipe all data and resource state for all resources being processed. Schema is not modified."""
+
+
 class SupportsPipeline(Protocol):
     """A protocol with core pipeline operations that lets high level abstractions ie. sources to access pipeline methods and properties"""
 
@@ -508,21 +549,7 @@ def set_local_state_val(self, key: str, value: Any) -> None:
     def get_local_state_val(self, key: str) -> Any:
         """Gets value from local state. Local state is not synchronized with destination."""
 
-    def run(
-        self,
-        data: Any = None,
-        *,
-        destination: TDestinationReferenceArg = None,
-        dataset_name: str = None,
-        credentials: Any = None,
-        table_name: str = None,
-        write_disposition: TWriteDispositionConfig = None,
-        columns: Sequence[TColumnSchema] = None,
-        primary_key: TColumnNames = None,
-        schema: Schema = None,
-        loader_file_format: TLoaderFileFormat = None,
-        schema_contract: TSchemaContract = None,
-    ) -> LoadInfo: ...
+    def run(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ...
 
     def _set_context(self, is_active: bool) -> None:
         """Called when pipeline context activated or deactivate"""
@@ -532,19 +559,7 @@ def _make_schema_with_default_name(self) -> Schema:
 
 
 class SupportsPipelineRun(Protocol):
-    def __call__(
-        self,
-        *,
-        destination: TDestinationReferenceArg = None,
-        dataset_name: str = None,
-        credentials: Any = None,
-        table_name: str = None,
-        write_disposition: TWriteDispositionConfig = None,
-        columns: Sequence[TColumnSchema] = None,
-        schema: Schema = None,
-        loader_file_format: TLoaderFileFormat = None,
-        schema_contract: TSchemaContract = None,
-    ) -> LoadInfo: ...
+    def __call__(self, data: Any = None, **kwargs: Unpack[SupportsPipelineRunArgs]) -> LoadInfo: ...
 
 
 @configspec
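For reference, a minimal sketch (hypothetical names, not dlt code) of how `Unpack` over a `TypedDict` types `**kwargs` per PEP 692, which is the mechanism the `SupportsPipelineRunArgs` protocol changes above rely on:

    from typing import Any, Optional
    from typing_extensions import TypedDict, Unpack

    class RunArgs(TypedDict, total=False):
        table_name: Optional[str]
        dataset_name: Optional[str]

    def run(data: Any = None, **kwargs: Unpack[RunArgs]) -> None:
        # Keys are checked statically; at runtime kwargs is still a plain dict.
        print(kwargs.get("table_name"))

    run([1, 2, 3], table_name="items")    # OK: known key
    # run([1, 2, 3], tble_name="items")   # rejected by mypy/pyright: unknown key
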
diff --git a/dlt/common/schema/__init__.py b/dlt/common/schema/__init__.py
index 9cb5e2ab76..8533962aa0 100644
--- a/dlt/common/schema/__init__.py
+++ b/dlt/common/schema/__init__.py
@@ -8,6 +8,8 @@
     TColumnHint,
     TColumnSchema,
     TColumnSchemaBase,
+    TAnySchemaColumns,
+    TTableFormat,
 )
 from dlt.common.schema.typing import COLUMN_HINTS
 from dlt.common.schema.schema import Schema, DEFAULT_SCHEMA_CONTRACT_MODE
@@ -23,6 +25,8 @@
     "TColumnHint",
     "TColumnSchema",
     "TColumnSchemaBase",
+    "TAnySchemaColumns",
+    "TTableFormat",
     "COLUMN_HINTS",
     "Schema",
     "verify_schema_hash",
diff --git a/dlt/pipeline/__init__.py b/dlt/pipeline/__init__.py
index 175bee33b3..37afe832ae 100644
--- a/dlt/pipeline/__init__.py
+++ b/dlt/pipeline/__init__.py
@@ -10,12 +10,18 @@
     TSchemaContract,
 )
 
-from dlt.common.typing import TSecretStrValue, Any
+from dlt.common.typing import TSecretStrValue, Any, Unpack
 from dlt.common.configuration import with_config
 from dlt.common.configuration.container import Container
 from dlt.common.configuration.inject import get_orig_args, last_config
 from dlt.common.destination import TLoaderFileFormat, Destination, TDestinationReferenceArg
-from dlt.common.pipeline import LoadInfo, PipelineContext, get_dlt_pipelines_dir, TRefreshMode
+from dlt.common.pipeline import (
+    LoadInfo,
+    PipelineContext,
+    get_dlt_pipelines_dir,
+    TRefreshMode,
+    SupportsPipelineRunArgs,
+)
 from dlt.common.runtime import apply_runtime_config, init_telemetry
 
 from dlt.pipeline.configuration import PipelineConfiguration, ensure_correct_pipeline_kwargs
@@ -235,18 +241,7 @@ def attach(
 
 def run(
     data: Any,
-    *,
-    destination: TDestinationReferenceArg = None,
-    staging: TDestinationReferenceArg = None,
-    dataset_name: str = None,
-    table_name: str = None,
-    write_disposition: TWriteDispositionConfig = None,
-    columns: Sequence[TColumnSchema] = None,
-    schema: Schema = None,
-    loader_file_format: TLoaderFileFormat = None,
-    table_format: TTableFormat = None,
-    schema_contract: TSchemaContract = None,
-    refresh: Optional[TRefreshMode] = None,
+    **kwargs: Unpack[SupportsPipelineRunArgs],
 ) -> LoadInfo:
     """Loads the data in `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
 
@@ -266,61 +261,14 @@ def run(
     Next it will make sure that data from the previous is fully processed. If not, `run` method normalizes and loads pending data items.
     Only then the new data from `data` argument is extracted, normalized and loaded.
 
-    Args:
-        data (Any): Data to be loaded to destination
-
-        destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
-            If not provided, the value passed to `dlt.pipeline` will be used.
-
-        dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
-            If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`
-
-        table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute.
-            The behavior of this argument depends on the type of the `data`:
-            * generator functions: the function name is used as table name, `table_name` overrides this default
-            * `@dlt.resource`: resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
-            * `@dlt.source`: source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table.
-
-        write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
-            Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append".
-            Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`.
-            Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten.
-
-        columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
-
-        schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.
-
-        loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
-
-        table_format (Literal["delta", "iceberg"], optional): The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.
-
-        schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None.
-
-        refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
-            * `drop_sources`: Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased)
-            * `drop_resources`: Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
-            * `drop_data`: Wipe all data and resource state for all resources being processed. Schema is not modified.
-
     Raises:
         PipelineStepFailed: when a problem happened during `extract`, `normalize` or `load` steps.
     Returns:
         LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
     """
-    destination = Destination.from_reference(destination)
-    return pipeline().run(
-        data,
-        destination=destination,
-        staging=staging,
-        dataset_name=dataset_name,
-        table_name=table_name,
-        write_disposition=write_disposition,
-        columns=columns,
-        schema=schema,
-        loader_file_format=loader_file_format,
-        table_format=table_format,
-        schema_contract=schema_contract,
-        refresh=refresh,
-    )
+    destination = Destination.from_reference(kwargs.pop("destination", None))
+    kwargs["destination"] = destination
+    return pipeline().run(data, **kwargs)
 
 
 # plug default tracking module
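Call sites of the module-level `dlt.run` keep their old keyword names; the function now only normalizes `destination` via `Destination.from_reference` before delegating to `pipeline().run`. A hedged usage sketch (destination and table names are illustrative, assuming the duckdb extra is installed):

    import dlt

    info = dlt.run(
        [{"id": 1}, {"id": 2}],
        destination="duckdb",          # normalized with Destination.from_reference()
        table_name="items",
        write_disposition="replace",
    )
    print(info)
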
diff --git a/dlt/pipeline/pipeline.py b/dlt/pipeline/pipeline.py
index 84d8c2c65b..b38169c097 100644
--- a/dlt/pipeline/pipeline.py
+++ b/dlt/pipeline/pipeline.py
@@ -17,6 +17,7 @@
     ContextManager,
     Union,
 )
+from typing_extensions import Unpack
 
 import dlt
 from dlt.common import logger
@@ -84,6 +85,7 @@
     LoadInfo,
     NormalizeInfo,
     PipelineContext,
+    SupportsPipelineRunArgs,
     TStepInfo,
     SupportsPipeline,
     TPipelineLocalState,
@@ -618,20 +620,7 @@ def load(
     def run(
         self,
         data: Any = None,
-        *,
-        destination: TDestinationReferenceArg = None,
-        staging: TDestinationReferenceArg = None,
-        dataset_name: str = None,
-        credentials: Any = None,
-        table_name: str = None,
-        write_disposition: TWriteDispositionConfig = None,
-        columns: TAnySchemaColumns = None,
-        primary_key: TColumnNames = None,
-        schema: Schema = None,
-        loader_file_format: TLoaderFileFormat = None,
-        table_format: TTableFormat = None,
-        schema_contract: TSchemaContract = None,
-        refresh: Optional[TRefreshMode] = None,
+        **kwargs: Unpack[SupportsPipelineRunArgs],
     ) -> LoadInfo:
         """Loads the data from `data` argument into the destination specified in `destination` and dataset specified in `dataset_name`.
 
@@ -651,51 +640,16 @@ def run(
         Next it will make sure that data from the previous is fully processed. If not, `run` method normalizes, loads pending data items and **exits**
         If there was no pending data, new data from `data` argument is extracted, normalized and loaded.
 
-        Args:
-            data (Any): Data to be loaded to destination
-
-            destination (str | DestinationReference, optional): A name of the destination to which dlt will load the data, or a destination module imported from `dlt.destination`.
-                If not provided, the value passed to `dlt.pipeline` will be used.
-
-            dataset_name (str, optional): A name of the dataset to which the data will be loaded. A dataset is a logical group of tables ie. `schema` in relational databases or folder grouping many files.
-                If not provided, the value passed to `dlt.pipeline` will be used. If not provided at all then defaults to the `pipeline_name`
-
-            credentials (Any, optional): Credentials for the `destination` ie. database connection string or a dictionary with google cloud credentials.
-                In most cases should be set to None, which lets `dlt` to use `secrets.toml` or environment variables to infer right credentials values.
-
-            table_name (str, optional): The name of the table to which the data should be loaded within the `dataset`. This argument is required for a `data` that is a list/Iterable or Iterator without `__name__` attribute.
-                The behavior of this argument depends on the type of the `data`:
-                * generator functions - the function name is used as table name, `table_name` overrides this default
-                * `@dlt.resource` - resource contains the full table schema and that includes the table name. `table_name` will override this property. Use with care!
-                * `@dlt.source` - source contains several resources each with a table schema. `table_name` will override all table names within the source and load the data into single table.
-
-            write_disposition (TWriteDispositionConfig, optional): Controls how to write data to a table. Accepts a shorthand string literal or configuration dictionary.
-                Allowed shorthand string literals: `append` will always add new data at the end of the table. `replace` will replace existing data with new data. `skip` will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append".
-                Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide `write_disposition={"disposition": "merge", "strategy": "scd2"}`.
-                Please note that in case of `dlt.resource` the table schema value will be overwritten and in case of `dlt.source`, the values in all resources will be overwritten.
-
-            columns (Sequence[TColumnSchema], optional): A list of column schemas. Typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
-
-            primary_key (str | Sequence[str]): A column name or a list of column names that comprise a private key. Typically used with "merge" write disposition to deduplicate loaded data.
-
-            schema (Schema, optional): An explicit `Schema` object in which all table schemas will be grouped. By default `dlt` takes the schema from the source (if passed in `data` argument) or creates a default one itself.
-
-            loader_file_format (Literal["jsonl", "insert_values", "parquet"], optional): The file format the loader will use to create the load package. Not all file_formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
-
-            table_format (Literal["delta", "iceberg"], optional): The table format used by the destination to store tables. Currently you can select table format on filesystem and Athena destinations.
-
-            schema_contract (TSchemaContract, optional): On override for the schema contract settings, this will replace the schema contract settings for all tables in the schema. Defaults to None.
-
-            refresh (str | TRefreshMode): Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
-                * `drop_sources` - Drop tables and source and resource state for all sources currently being processed in `run` or `extract` methods of the pipeline. (Note: schema history is erased)
-                * `drop_resources`-  Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
-                * `drop_data` - Wipe all data and resource state for all resources being processed. Schema is not modified.
-
         Raises:
             PipelineStepFailed: when a problem happened during `extract`, `normalize` or `load` steps.
         Returns:
             LoadInfo: Information on loaded data including the list of package ids and failed job statuses. Please not that `dlt` will not raise if a single job terminally fails. Such information is provided via LoadInfo.
         """
+        destination = kwargs.get("destination", None)
+        credentials = kwargs.get("credentials", None)
+        staging = kwargs.get("staging", None)
+        dataset_name = kwargs.get("dataset_name", None)
+        loader_file_format = kwargs.get("loader_file_format", None)
 
         signals.raise_if_signalled()
         self.activate()
@@ -731,14 +685,14 @@ def run(
         if data is not None:
             self.extract(
                 data,
-                table_name=table_name,
-                write_disposition=write_disposition,
-                columns=columns,
-                primary_key=primary_key,
-                schema=schema,
-                table_format=table_format,
-                schema_contract=schema_contract,
-                refresh=refresh or self.refresh,
+                table_name=kwargs.get("table_name", None),
+                write_disposition=kwargs.get("write_disposition", None),
+                columns=kwargs.get("columns", None),
+                primary_key=kwargs.get("primary_key", None),
+                schema=kwargs.get("schema", None),
+                table_format=kwargs.get("table_format", None),
+                schema_contract=kwargs.get("schema_contract", None),
+                refresh=kwargs.get("refresh", None) or self.refresh,
             )
             self.normalize(loader_file_format=loader_file_format)
             return self.load(destination, dataset_name, credentials=credentials)
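
Inside `Pipeline.run` the keyword arguments are now read with `kwargs.get(...)`, so any key the caller omits falls back to None, and `refresh` falls back to `self.refresh`, matching the old explicit defaults. A hedged end-to-end sketch of driving `Pipeline.run` with the same keywords (pipeline, destination and dataset names are illustrative):

    import dlt

    pipeline = dlt.pipeline(
        pipeline_name="demo",
        destination="duckdb",
        dataset_name="demo_data",
    )

    # Omitted keywords (e.g. schema, table_format) are simply absent from kwargs
    # and resolve to None via kwargs.get(...) inside run().
    load_info = pipeline.run(
        [{"id": 1, "name": "a"}],
        table_name="items",
        primary_key="id",
        write_disposition={"disposition": "merge"},
    )
    print(load_info)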