Slurm cloud integration via external services
The udp CHPL_COMM_SUBSTRATE is the most portable GASNet communication substrate; it uses UDP and runs on any network with a TCP/IP stack.
Deploying Chapel programs (in this case Arkouda) on Slurm via the C spawner and srun requires a batch script such as this one:
#!/bin/bash
#
#SBATCH --job-name=arkouda-3-node
#SBATCH --output=/tmp/arkouda.out
#SBATCH --mem=4096
#SBATCH --ntasks=3
#SBATCH --uid=1000
#SBATCH --nodes=3
export CHPL_COMM_SUBSTRATE=udp
export GASNET_MASTERIP=ace
export SSH_SERVERS='finkel einhorn shickadance'
export GASNET_SPAWNFN=C
export GASNET_CSPAWN_CMD="srun -N%N %C"
/tmp/arkouda_server -nl 3
Chapel jobs can also be deployed via Slurm using the S spawner:
#!/bin/bash
#
#SBATCH --job-name=arkouda-3-node
#SBATCH --output=/tmp/arkouda.out
#SBATCH --mem=4096
#SBATCH --ntasks=3
#SBATCH --uid=1000
#SBATCH --nodes=3
export CHPL_COMM_SUBSTRATE=udp
export GASNET_MASTERIP=ace
export SSH_SERVERS='finkel einhorn shickadance'
export GASNET_SPAWNFN=S
/tmp/arkouda_server -nl 3
Important note: to ensure one Chapel process per node, two parameters must be present: (1) --nodes, indicating the number of hosts, and (2) --ntasks, set to the same number as --nodes.
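The rule in the note above can be checked mechanically before a script is submitted. The following is a minimal sketch, not part of the original setup: the helper names `parse_sbatch_directives` and `one_process_per_node` are hypothetical, and it only understands directives written in the `#SBATCH --key=value` form used on this page.

```python
import re
from typing import Dict


def parse_sbatch_directives(script: str) -> Dict[str, str]:
    """Collect '#SBATCH --key=value' directives from a batch script."""
    directives = {}
    for line in script.splitlines():
        match = re.match(r"^#SBATCH\s+--([\w-]+)=(\S+)", line.strip())
        if match:
            directives[match.group(1)] = match.group(2)
    return directives


def one_process_per_node(script: str) -> bool:
    """True when --nodes and --ntasks are both present and equal."""
    directives = parse_sbatch_directives(script)
    nodes = directives.get("nodes")
    ntasks = directives.get("ntasks")
    return nodes is not None and nodes == ntasks


example = """#!/bin/bash
#SBATCH --job-name=arkouda-3-node
#SBATCH --ntasks=3
#SBATCH --nodes=3
/tmp/arkouda_server -nl 3
"""
print(one_process_per_node(example))
```

The values are compared as strings, so a node range such as `--nodes=3-5` is reported as a mismatch rather than interpreted.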
Defining an external service enables access to an HPC app on Slurm, such as Arkouda, via slurm-jupyter in Kubernetes. As explained in this excellent article, there are two elements to accessing an external, non-Kubernetes service: (1) a Service definition with no selector and (2) an Endpoints definition that maps the Service to an external IP address/hostname and port. Examples of each are as follows:
apiVersion: v1
kind: Service
metadata:
  name: arkouda
spec:
  ports:
    - protocol: TCP
      port: 5555
      targetPort: 5555
---
apiVersion: v1
kind: Endpoints
metadata:
  name: arkouda
subsets:
  - addresses:
      - ip: 19.168.1.11
    ports:
      - port: 5555
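The two definitions only work together when the Endpoints object carries the same name as the Service and the Service has no selector. A minimal sketch of that consistency check follows, working on the manifests as plain dictionaries; the function name `check_external_service` is hypothetical and the check covers only these basic conditions.

```python
from typing import Dict, List


def check_external_service(service: Dict, endpoints: Dict) -> List[str]:
    """Return a list of problems found in a Service/Endpoints pair."""
    problems = []
    if service["metadata"]["name"] != endpoints["metadata"]["name"]:
        problems.append("Service and Endpoints must share the same name")
    if service.get("spec", {}).get("selector"):
        problems.append("Service must not define a selector")
    addresses = [a for s in endpoints.get("subsets", [])
                 for a in s.get("addresses", [])]
    if not addresses:
        problems.append("Endpoints must list at least one address")
    return problems


# The same objects as in the manifests above, written as dictionaries
service = {
    "apiVersion": "v1",
    "kind": "Service",
    "metadata": {"name": "arkouda"},
    "spec": {"ports": [{"protocol": "TCP", "port": 5555, "targetPort": 5555}]},
}
endpoints = {
    "apiVersion": "v1",
    "kind": "Endpoints",
    "metadata": {"name": "arkouda"},
    "subsets": [{"addresses": [{"ip": "19.168.1.11"}],
                 "ports": [{"port": 5555}]}],
}
print(check_external_service(service, endpoints))
```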
To fix a slurmd node in "draining" state, use the method discussed here:
scontrol
scontrol: update NodeName=node10 State=DOWN Reason="undraining"
scontrol: update NodeName=node10 State=RESUME
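The same two updates can be scripted instead of typed into the interactive scontrol prompt. This is a sketch only: it assumes scontrol is on the PATH of the host running it and that passing the update as command-line arguments is acceptable in your environment. The helper names `undrain_commands` and `undrain` are hypothetical, and the default is a dry run that only prints the commands.

```python
import subprocess
from typing import List


def undrain_commands(node: str) -> List[List[str]]:
    """Build the two scontrol invocations used to bring a drained node back."""
    return [
        ["scontrol", "update", f"NodeName={node}", "State=DOWN", "Reason=undraining"],
        ["scontrol", "update", f"NodeName={node}", "State=RESUME"],
    ]


def undrain(node: str, dry_run: bool = True) -> None:
    """Print the commands, or run them in order when dry_run is False."""
    for command in undrain_commands(node):
        if dry_run:
            print(" ".join(command))
        else:
            # check=True raises CalledProcessError if scontrol reports a failure
            subprocess.run(command, check=True)


undrain("node10")
```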
The Service and Endpoints objects can also be created through the Kubernetes REST API with curl:
curl -k \
  -X POST \
  --cert $CERT_FILE \
  --key $KEY_FILE \
  --cacert $K8S_CACERT_FILE \
  -d @- \
  -H 'Accept: application/json' \
  -H 'Content-Type: application/json' \
  $K8S_HOST/api/v1/namespaces/$NAMESPACE/services <<EOF
{
  "apiVersion": "v1",
  "kind": "Service",
  "metadata": {
    "name": "$SERVICE_NAME"
  },
  "spec": {"ports": [{"port": $SERVICE_PORT,"protocol": "TCP","targetPort": $TARGET_SERVICE_PORT}]}
}
EOF
curl -k \
  -X POST \
  --cert $CERT_FILE \
  --key $KEY_FILE \
  --cacert $CACERT_FILE \
  -d @- \
  -H 'Accept: application/json' \
  -H 'Content-Type: application/json' \
  $K8S_HOST/api/v1/namespaces/$NAMESPACE/endpoints <<EOF
{
  "kind": "Endpoints",
  "apiVersion": "v1",
  "metadata": {
    "name": "$ENDPOINT_NAME"
  },
  "subsets": [
    {
      "addresses": [
        {
          "ip": "$ENDPOINT_IP"
        }
      ],
      "ports": [
        {
          "port": $ENDPOINT_PORT,
          "protocol": "TCP"
        }
      ]
    }
  ]
}
EOF
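The request bodies sent by the curl calls above depend on values being substituted into the JSON. One way to avoid shell quoting problems is to build the URL and body in Python and serialize with `json.dumps`. The sketch below only constructs the requests and does not send them; the function names and the host `https://k8s.example:6443` are placeholders, not values from this setup.

```python
import json
from typing import Tuple


def service_request(host: str, namespace: str, name: str,
                    port: int, target_port: int) -> Tuple[str, str]:
    """Return the URL and JSON body for creating a selector-less Service."""
    url = f"{host}/api/v1/namespaces/{namespace}/services"
    body = {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name},
        "spec": {"ports": [{"port": port, "protocol": "TCP",
                            "targetPort": target_port}]},
    }
    return url, json.dumps(body)


def endpoints_request(host: str, namespace: str, name: str,
                      ip: str, port: int) -> Tuple[str, str]:
    """Return the URL and JSON body for creating the matching Endpoints."""
    url = f"{host}/api/v1/namespaces/{namespace}/endpoints"
    body = {
        "apiVersion": "v1",
        "kind": "Endpoints",
        "metadata": {"name": name},
        "subsets": [{"addresses": [{"ip": ip}],
                     "ports": [{"port": port, "protocol": "TCP"}]}],
    }
    return url, json.dumps(body)


# Placeholder host; both objects use the same name so they are linked
svc_url, svc_body = service_request("https://k8s.example:6443", "default",
                                    "arkouda", 5555, 5555)
ep_url, ep_body = endpoints_request("https://k8s.example:6443", "default",
                                    "arkouda", "19.168.1.11", 5555)
print(svc_url)
print(svc_body)
print(ep_url)
print(ep_body)
```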
Slurm job metadata can be conveniently shared with Kubernetes objects, such as k8s external services, via Kubernetes annotations, which are an attribute of an object's metadata. In the Python client, annotations are a Dict[str,str] and can be set as a V1ObjectMeta instance attribute as follows:
from kubernetes.client import V1ObjectMeta
metadata = V1ObjectMeta()
metadata.annotations = {'foo':'bar'}
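Because annotations are string-to-string mappings, numeric values such as a Slurm job id have to be converted before they are attached. A minimal sketch of such a conversion follows; the helper name `to_annotations` is hypothetical and the sample values are made up for illustration.

```python
from typing import Any, Dict


def to_annotations(values: Dict[str, Any]) -> Dict[str, str]:
    """Convert arbitrary metadata values into the str-to-str form annotations need."""
    return {str(key): str(value) for key, value in values.items()}


# Illustrative values only
annotations = to_annotations({'slurm_job_id': 729, 'slurm_job': 'arkouda-3-node'})
print(annotations)
```

The resulting dictionary can then be assigned to `metadata.annotations` in the same way as the `{'foo':'bar'}` example.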
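import os
from slurm_rest import Configuration, ApiClient, ApiException
from slurm_rest.apis import SlurmApi

def getSlurmClient(userName : str, token : str, restdHost : str) -> SlurmApi:
    '''
    Builds a slurmrestd API client that authenticates with a user name and token.
    '''
    configuration = Configuration(host=restdHost)
    configuration.api_key['user'] = userName
    configuration.api_key['token'] = token
    configuration.discard_unknown_keys = True
    api_client = ApiClient(configuration)
    return SlurmApi(api_client)

# Assumption: the token was exported beforehand as SLURM_JWT (for example from the output of `scontrol token`)
token = os.environ['SLURM_JWT']
client = getSlurmClient(userName='slurm', token=token, restdHost='http://einhorn:6820')
# Retrieve job 729 and keep the first entry of the returned job list
job = client.slurmctld_get_job(729).to_dict()['jobs'][0]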
from kubernetes.client import Configuration, ApiClient, CoreV1Api, AppsV1Api, V1Pod, \
    V1Service, V1ServiceSpec, V1ServicePort, V1Endpoints, V1EndpointSubset, \
    V1EndpointAddress, CoreV1EndpointPort, V1ObjectMeta
# The Kubernetes calls below raise the Kubernetes client's own ApiException
from kubernetes.client.rest import ApiException
from collections import namedtuple
from typing import Dict, List

class IntegrationError(Exception):
    '''
    Generic integration error class used to re-raise lib-specific exceptions and errors
    '''
    pass
SlurmJobSummary = namedtuple('SlurmJobSummary', ['jobId', 'jobName','nodes','userId'])

def getSlurmJobAnnotations(job : Dict) -> Dict:
    # getSlurmJobSummary is not part of this listing; it is expected to map the job dict to a SlurmJobSummary
    summary = getSlurmJobSummary(job)
    return {'slurm_job_id':str(summary.jobId),
            'slurm_job':summary.jobName,
            'slurm_nodes':summary.nodes,
            'slurm_user_id':str(summary.userId)
           }
def createExternalService(coreClient : CoreV1Api, serviceName : str, externalServiceName : str, port : int,
                          targetPort : int, ip : str, namespace : str='default',
                          metadata : V1ObjectMeta=None) -> None:
    '''
    Creates a k8s Service without an app selector and with an Endpoints object to enable connectivity to
    services hosted outside of Kubernetes.
    '''
    service = V1Service()
    service.kind = "Service"
    if metadata:
        metadata.name=serviceName
        service.metadata = metadata
    else:
        service.metadata = V1ObjectMeta(name=serviceName)
    spec = V1ServiceSpec()
    spec.ports = [V1ServicePort(protocol='TCP', port=port,
                                target_port=targetPort)]
    service.spec = spec
    try:
        coreClient.create_namespaced_service(namespace=namespace,
                                             body=service)
    except ApiException as e:
        raise IntegrationError(e)
    createEndpoints(coreClient=coreClient, serviceName=serviceName, ip=ip, port=port, namespace=namespace)
def createEndpoints(coreClient : CoreV1Api, serviceName : str, ip : str, port : int,
                    namespace : str='default') -> None:
    '''
    Creates a k8s Endpoints object that, in combination with a Service object w/ no app selector,
    enables connectivity to services external to Kubernetes.
    '''
    endpoints = V1Endpoints()
    metadata = V1ObjectMeta()
    # The Endpoints name must match the Service name for the two to be linked
    metadata.name=serviceName
    endpoints.metadata=metadata
    subset=V1EndpointSubset()
    address = V1EndpointAddress(ip=ip)
    subset.addresses=[address]
    endpointPort = CoreV1EndpointPort(port=port)
    subset.ports = [endpointPort]
    endpoints.subsets = [subset]
    # Execute k8s API call
    try:
        coreClient.create_namespaced_endpoints(namespace=namespace, body=endpoints)
    except ApiException as e:
        raise IntegrationError(e)
# Create Annotations from Slurm job
metadata = V1ObjectMeta()
metadata.annotations = getSlurmJobAnnotations(job)
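The listing calls getSlurmJobSummary without showing it. The sketch below is one hypothetical way to write it, so that the annotation step can be tried without a cluster. It assumes the job dictionary returned by the Slurm REST client uses the keys `job_id`, `name`, `nodes` and `user_id`; verify these against the response of your slurmrestd version. The sample job values are made up for illustration.

```python
from collections import namedtuple
from typing import Dict

SlurmJobSummary = namedtuple('SlurmJobSummary', ['jobId', 'jobName', 'nodes', 'userId'])


def getSlurmJobSummary(job: Dict) -> SlurmJobSummary:
    """Pick the fields of interest out of a Slurm job dict (key names are assumed)."""
    return SlurmJobSummary(jobId=job['job_id'],
                           jobName=job['name'],
                           nodes=job['nodes'],
                           userId=job['user_id'])


def getSlurmJobAnnotations(job: Dict) -> Dict[str, str]:
    """Turn a Slurm job dict into str-to-str Kubernetes annotations."""
    summary = getSlurmJobSummary(job)
    return {'slurm_job_id': str(summary.jobId),
            'slurm_job': summary.jobName,
            'slurm_nodes': summary.nodes,
            'slurm_user_id': str(summary.userId)}


# Made-up job record with the assumed key names
sample_job = {'job_id': 729, 'name': 'arkouda-3-node',
              'nodes': 'finkel,einhorn,shickadance', 'user_id': 1000}
print(getSlurmJobAnnotations(sample_job))
```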