Skip to content

Commit

Permalink
feat: allow multiple versions of one application
Browse files Browse the repository at this point in the history
  • Loading branch information
RDWimmers committed Jul 18, 2024
1 parent 40ea341 commit 0a9bb22
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 45 deletions.
10 changes: 8 additions & 2 deletions docs/tree/getting_started.rst
Original file line number Diff line number Diff line change
Expand Up @@ -72,17 +72,23 @@ The :class:`~.NucleiClient` can provide you with a list of `Nuclei` applications
print(client.applications)
And can also fetch the available versions for an application for you:

.. ipython:: python
versions = print(client.get_versions(app="PileCore"))
And can also fetch the available endpoints for an application for you:

.. ipython:: python
endpoints = print(client.get_endpoints("PileCore"))
endpoints = print(client.get_endpoints(app="PileCore", version="latest"))
You can also check the applications to which you have full access:

.. ipython:: python
endpoints = print(client.user_permissions)
permissions = print(client.user_permissions)
If an application is not listed here, your usage of the app is limited. Check the
documentation of the specific apps to see the limitations.
Expand Down
170 changes: 132 additions & 38 deletions src/nuclei/client/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from typing import Any, List, Literal, Optional, Union

import jwt
import requests

from nuclei import create_session

Expand All @@ -23,10 +22,23 @@
)

ROUTING = {
"PileCore": "https://crux-nuclei.com/api/pilecore/v2",
"VibraCore": "https://crux-nuclei.com/api/vibracore/v2",
"CPT Core": "https://crux-nuclei.com/api/cptcore/v1",
"ShallowCore": "https://crux-nuclei.com/api/shallowcore/v1",
"PileCore": {
"v2": "https://crux-nuclei.com/api/pilecore/v2",
"v3": "https://crux-nuclei.com/api/pilecore/v3",
"latest": "https://crux-nuclei.com/api/pilecore/v3",
},
"VibraCore": {
"v2": "https://crux-nuclei.com/api/vibracore/v2",
"latest": "https://crux-nuclei.com/api/vibracore/v2",
},
"CPT Core": {
"v1": "https://crux-nuclei.com/api/cptcore/v1",
"latest": "https://crux-nuclei.com/api/cptcore/v1",
},
"ShallowCore": {
"v1": "https://crux-nuclei.com/api/shallowcore/v1",
"latest": "https://crux-nuclei.com/api/shallowcore/v1",
},
}

DEFAULT_REQUEST_TIMEOUT = 5
Expand Down Expand Up @@ -60,14 +72,16 @@ def __init__(self) -> None:
# set default timeout
self.timeout = DEFAULT_REQUEST_TIMEOUT

def get_url(self, app: str) -> str:
def get_url(self, app: str, version: str) -> str:
"""
Get API's url
Parameters
----------
app : str
Application name
version : str
Application version
Returns
-------
Expand All @@ -76,20 +90,27 @@ def get_url(self, app: str) -> str:
Raises
-------
TypeError:
Wrong type for `app` argument
Wrong type for `app` or `version` argument
ValueError:
Wrong value for `app` argument
Wrong value for `app` or `version` argument
"""
if not isinstance(app, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

if app in self.applications:
return self.routing[app]
raise ValueError(
f"Application not available, please select one of the following valid applications {self.applications}"
)
if not isinstance(version, str):
raise TypeError(
f"Expected positional argument `version` to be of type <class 'str'>, but got type: {type(version)}"
)
if app not in self.applications:
raise ValueError(
f"Application not available, please select one of the following valid applications {self.applications}"
)
if version not in self.get_versions(app):
raise ValueError(
f"Application version not available, please select one of the following valid versions {self.get_versions(app)}"
)
return self.routing[app][version]

@property
def user_permissions(self) -> List[str | None]:
Expand Down Expand Up @@ -119,15 +140,38 @@ def applications(self) -> List[str]:
"""
return list(self.routing.keys())

def get_versions(self, app: str) -> List[str]:
"""
Provide available API's versions in the Nuclei landscape.
Parameters
----------
app : str
Application name
Returns
-------
out : list[str]
Versions of the API's
"""
if not isinstance(app, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

return list(self.routing[app].keys())

@lru_cache(16)
def _get_app_specification(self, app: str) -> dict:
def _get_app_specification(self, app: str, version: str) -> dict:
"""
Private methode to get the JSON schema of the API documentation.
Parameters
----------
app : str
Name of the API.
version : str
Application version
Returns
-------
Expand All @@ -138,17 +182,22 @@ def _get_app_specification(self, app: str) -> dict:
ConnectionError:
Application not available
TypeError:
Wrong type for `app` argument
Wrong type for `app` or `version` argument
ValueError:
Wrong value for `app` argument
Wrong value for `app` or `version` argument
"""
if not isinstance(app, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

response = requests.get(
self.get_url(app) + "/openapi.json", timeout=self.timeout
if not isinstance(version, str):
raise TypeError(
f"Expected positional argument `version` to be of type <class 'str'>, but got type: {type(version)}"
)

response = self.session.get(
self.get_url(app, version) + "/openapi.json", timeout=self.timeout
)
if response.status_code != 200:
raise ConnectionError(
Expand All @@ -158,14 +207,16 @@ def _get_app_specification(self, app: str) -> dict:
)
return response.json()

def get_application_version(self, app: str) -> str:
def get_application_version(self, app: str, version: str) -> str:
"""
Provide version of the API in the Nuclei landscape.
Parameters
----------
app : str
Name of the API.
version : str
Application version
Returns
-------
Expand All @@ -177,25 +228,32 @@ def get_application_version(self, app: str) -> str:
ConnectionError:
Application not available
TypeError:
Wrong type for `app` argument
Wrong type for `app` or `version` argument
ValueError:
Wrong value for `app` argument
Wrong value for `app` or `version` argument
"""
if not isinstance(app, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

return self._get_app_specification(app)["info"]["version"]
if not isinstance(version, str):
raise TypeError(
f"Expected positional argument `version` to be of type <class 'str'>, but got type: {type(version)}"
)

return self._get_app_specification(app, version)["info"]["version"]

def get_endpoints(self, app: str) -> List[str]:
def get_endpoints(self, app: str, version: str) -> List[str]:
"""
Get available endpoints of single API.
Parameters
----------
app : str
Name of the API.
version : str
Application version
Returns
-------
Expand All @@ -207,28 +265,33 @@ def get_endpoints(self, app: str) -> List[str]:
ConnectionError:
Application not available
TypeError:
Wrong type for `app` argument
Wrong type for `app` or `version` argument
ValueError:
Wrong value for `app` argument
Wrong value for `app` or `version` argument
"""
if not isinstance(app, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

return list(self._get_app_specification(app)["paths"].keys())
return list(self._get_app_specification(app, version)["paths"].keys())

def get_endpoint_type(self, app: str, endpoint: str) -> str:
def get_endpoint_type(self, app: str, version: str, endpoint: str) -> List[str]:
"""
Get HTTP methode used in endpoint.
Parameters
----------
app
name of the app
version : str
Application version
endpoint
url of the endpoint.
Returns
-------
"get" | "post"
List[str]
Raises
-------
Expand All @@ -244,22 +307,30 @@ def get_endpoint_type(self, app: str, endpoint: str) -> str:
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

if not isinstance(version, str):
raise TypeError(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(version)}"
)

if not isinstance(endpoint, str):
raise TypeError(
f"Expected positional argument `endpoint` to be of type <class 'str'>, but got type: {type(endpoint)}"
)

if endpoint in self.get_endpoints(app):
return list(self._get_app_specification(app)["paths"][endpoint].keys())[0]
if endpoint in self.get_endpoints(app, version):
return list(
self._get_app_specification(app, version)["paths"][endpoint].keys()
)
raise ValueError(
f"Endpoint name not valid, please select on of the following valid endpoints {self.get_endpoints(app)}"
f"Endpoint name not valid, please select on of the following valid endpoints {self.get_endpoints(app, version)}"
)

def call_endpoint(
self,
app: str,
endpoint: str,
methode: Literal["auto", "get", "post"] = "auto",
version: str = "latest",
schema: Optional[Union[dict, str]] = None,
return_response: bool = False,
) -> Any:
Expand All @@ -275,13 +346,17 @@ def call_endpoint(
Parameters
----------
app: str
Name of the API. call `get_applications` to obtain a list with all applications.
Name of the API. call `applications` to obtain a list with all applications.
endpoint: str
Name of the API's endpoint. call `get_endpoints` to obtain a list with all applications for a given API.
methode: str
default is auto
default is auto
HTTP methode used to call endpoint. When auto methode is selected the HTTP methode is
obtained from the openapi docs.
obtained from the openapi docs. Please note that this is the first one. call `get_endpoint_type`
to obtain list with all methods related to the endpoint.
version: str
default is latest
API version used. call `get_versions` to obtain a list with all version of a specific application.
schema: dict or json-string, optional
Default is None
The parameter schema for the API. Take a look at the API documentation.
Expand Down Expand Up @@ -321,11 +396,21 @@ def call_endpoint(
f"Expected positional argument `app` to be of type <class 'str'>, but got type: {type(app)}"
)

if app not in self.applications:
raise ValueError(
f"Application not available, please select one of the following valid applications {self.applications}"
)

if not isinstance(endpoint, str):
raise TypeError(
f"Expected positional argument `endpoint` to be of type <class 'str'>, but got type: {type(endpoint)}"
)

if endpoint not in self.get_endpoints(app, version):
raise ValueError(
f"Endpoint name not valid, please select on of the following valid endpoints {self.get_endpoints(app, version)}"
)

if not isinstance(methode, str):
raise TypeError(
f"Expected keyword-argument `methode` to be of type <class 'str'>, but got type: {type(methode)}"
Expand All @@ -334,6 +419,15 @@ def call_endpoint(
raise ValueError(
f'Expected value of keyword-argument `methode` to be one of ["auto", "get", "post"] , but got: {methode}'
)
if not isinstance(version, str):
raise TypeError(
f"Expected keyword-argument `version` to be of type <class 'str'>, but got type: {type(version)}"
)

if version not in self.get_versions(app):
raise ValueError(
f"Application version not available, please select one of the following valid versions {self.get_versions(app)}"
)

if not (schema is None or isinstance(schema, (str, dict))):
raise TypeError(
Expand All @@ -346,7 +440,7 @@ def call_endpoint(
)

if methode == "auto":
t = self.get_endpoint_type(app, endpoint)
t = self.get_endpoint_type(app=app, version=version, endpoint=endpoint)[0]
else:
t = methode

Expand All @@ -368,13 +462,13 @@ def call_endpoint(

if t.lower() == "get":
response = self.session.get(
self.get_url(app) + endpoint,
self.get_url(app=app, version=version) + endpoint,
params=utils.serialize_jsonifyable_object(schema),
timeout=self.timeout,
)
elif t.lower() == "post":
response = self.session.post(
self.get_url(app) + endpoint,
self.get_url(app=app, version=version) + endpoint,
json=utils.serialize_jsonifyable_object(schema),
timeout=self.timeout,
)
Expand Down
Loading

0 comments on commit 0a9bb22

Please sign in to comment.