diff --git a/.github/workflows/python-package-windows.yml b/.github/workflows/python-package-windows.yml index ac9d9d90..3c48ef05 100644 --- a/.github/workflows/python-package-windows.yml +++ b/.github/workflows/python-package-windows.yml @@ -12,7 +12,7 @@ on: jobs: build: - runs-on: windows-latest + runs-on: windows-2019 strategy: matrix: python-version: [3.8, 3.9, '3.10'] diff --git a/.gitignore b/.gitignore index 50310f50..29f52038 100755 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,6 @@ examples/*.json examples/*.html # Image/Video files -*.png *.mp4 # Data files diff --git a/benchmark/.gitignore b/benchmark/.gitignore new file mode 100644 index 00000000..d762f466 --- /dev/null +++ b/benchmark/.gitignore @@ -0,0 +1,2 @@ +run-*.yml +plot.yml diff --git a/benchmark/Dockerfile b/benchmark/Dockerfile new file mode 100644 index 00000000..2897c3c8 --- /dev/null +++ b/benchmark/Dockerfile @@ -0,0 +1,31 @@ +FROM nvidia/cuda:12.2.0-runtime-ubuntu22.04 + +RUN echo 'debconf debconf/frontend select Noninteractive' | debconf-set-selections && \ + apt-get update && \ + apt-get install -y \ + python3 python3-pip \ + && \ + apt-get autoremove && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +COPY ./requirements.txt giotto-deep/ + +RUN cd giotto-deep && \ + pip3 install --no-cache-dir --disable-pip-version-check -r requirements.txt + +COPY ./benchmark/requirements.txt giotto-deep/requirements.txt + +RUN cd giotto-deep && \ + pip3 install --no-cache-dir --disable-pip-version-check -r requirements.txt + +COPY ./setup.py giotto-deep/ +COPY ./setup.cfg giotto-deep/ +COPY ./README.md giotto-deep/ +COPY ./gdeep giotto-deep/gdeep/ +COPY ./examples giotto-deep/examples/ +COPY ./benchmark giotto-deep/benchmark/ + +RUN cd giotto-deep && pip3 install --no-cache-dir --disable-pip-version-check -e . + +ENTRYPOINT [ "python3", "/giotto-deep/benchmark/benchmark.py" ] diff --git a/benchmark/README.md b/benchmark/README.md new file mode 100644 index 00000000..499fa99f --- /dev/null +++ b/benchmark/README.md @@ -0,0 +1,415 @@ +# Run benchmark on GKE + +The benchmark uses: + +- Google Cloud Artifact Registry +- Google Cloud Kubernetes Engine +- Google Cloud Kubernetes Engine Workload +- Google Cloud Storage Buckets +- Google Cloud IAM +- Google Cloud Service accounts + +The benchmark is located in `benchmark/`. + +To configure the commands of this doc, populate the variables below: + +```console +$ PROJECT_NAME="" +$ PROJECT_ID="" +$ CLUSTER_NAME="" +$ CLUSTER_ZONE="" +$ CLUSTER_NODE="${CLUSTER_ZONE}-a" +$ POOL_PREFIX="" +$ BUCKET="" +$ SA_KUBE="" +$ SA_GCLOUD="" +$ ARTIFACT_REGISTRY="" +$ IMAGE_NAME="giotto-deep-benchmark:latest" +$ IMAGE_FULLPATH="${CLUSTER_ZONE}-docker.pkg.dev/${PROJECT_ID}/${ARTIFACT_REGISTRY}/${IMAGE_NAME}" + +$ echo "\n\nOn <${PROJECT_ID}>, for project <${PROJECT_NAME}> use cluster <${CLUSTER_NAME}> on <${CLUSTER_ZONE}> with location <${CLUSTER_NODE}>. Pools have prefix <${POOL_PREFIX}>. The container image <${IMAGE_FULLPATH}> is used and stored in <${ARTIFACT_REGISTRY}>. Kubernetes Service Account is <${SA_KUBE}> and GCP Service Account is <${SA_GCLOUD}>." +``` + +## Available models + +### Orbit 5k + +The batch size may be changed up to 32. + +### Orbit 5k big + +This model defines the batch size maximum based on the number of maximum number of GPUs found. +It is useless to change manually the batch size. One must keep the default batch size used by the model. + +### BERT + BERT big + +The batch size may be changed up to 32. + +## Build deployment + +The Docker image is built on [nvidia/cuda](https://hub.docker.com/r/nvidia/cuda) `runtime` image. +See also doc [Push and pull images](https://cloud.google.com/artifact-registry/docs/docker/pushing-and-pulling). + +Execute this step from the root of the project. + +```console +$ cp benchmark/Dockerfile . +$ docker builder build -t ${IMAGE_FULLPATH} . +$ docker push ${IMAGE_FULLPATH} +$ rm -f Dockerfile +``` + +## Run deployment on GKE + +Some docs: + +- https://sysdig.com/blog/kubernetes-limits-requests/ +- https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/ +- https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/ + +Execute this step from `benchmark/`. + +The pod `giotto-deep-benchmark` in `pod-run.yml` uses an [empty dir memory volume](https://kubernetes.io/docs/concepts/storage/volumes/#emptydir) +to increase the *shared memory*. + +Generate pod configurations with `genpods.py`. +Example for running *orbit5k* with no parallelisation, *FSDP SHARD GRAD OP*, and *pipeline*, and batch sizes 4 to 32, on nodes with 2 and 4 Nvidia T4: + +```console +python genpods.py -i $IMAGE_FULLPATH -b $BUCKET -s $SA_KUBE run -c 2 4 -g t4 -m orbit5k -p none fsdp_shard_grad_op pipeline -z 2 32 +``` + +Run the pod on a node with 2 GPUs. + +```console +$ kubectl apply -f run-orbit5k-t4-2.yml +``` + +Monitor the execution of the pod, adapt ``, ``, and ``. +The correct termination status is *Succeeded* or *Completed*. +When the benchmark is done, the script logs `BENCHMARK DONE. [...]`. + +```console +$ kubectl get pod +$ gcloud logging read "resource.labels.cluster_name=${CLUSTER_NAME} AND resource.labels.namespace_name=default AND resource.labels.container_name=giotto-deep-benchmark---" --limit=3 --format=json | jq '.[].textPayload' +``` + +Retrieve the results from the storage bucket. + +Another subcommand of `benchmark.py`, `plot`, allows to plot aggregated results of different runs. +Generate the pod configuration with `genpods.py`. + +```console +$ python genpods.py -i $IMAGE_FULLPATH -b $BUCKET -s $SA_KUBE plot +$ kubectl apply -f pod-plot.yml +``` + +## Download data from storage bucket + +```console +$ gsutil -m cp -R gs://$BUCKET /path/to/data +``` + +## Monitor resources + +```console +$ kubectl top pod +$ kubectl top node +``` + +## Create cluster + +Some docs: + +- https://cloud.google.com/kubernetes-engine/docs/how-to/gpus +- https://cloud.google.com/compute/docs/gpus/gpu-regions-zones +- https://cloud.google.com/compute/docs/machine-resource +- https://cloud.google.com/compute/docs/general-purpose-machines +- https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver +- https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity +- https://cloud.google.com/iam/docs/service-accounts-create#iam-service-accounts-create-gcloud +- https://cloud.google.com/compute/docs/accelerator-optimized-machines +- https://cloud.google.com/kubernetes-engine/docs/how-to/node-auto-provisioning#gpu_limits + +To create this cluster, one need at least the following rights: + +- Artifact Registry Administrator +- Compute Admin +- IAM Workload Identity Pool Admin +- IAP-secured Tunnel User +- Kubernetes Engine Admin +- Kubernetes Engine Cluster Admin +- Logging Admin +- Security Admin ? +- Service Account Admin +- Storage Admin +- Storage Object Admin ? +- Workload Manager Admin + +Install [gcloud](https://cloud.google.com/sdk/docs/install#deb). +Install [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/#install-using-native-package-management). + +```console +$ kubectl version --client +$ sudo apt install google-cloud-sdk-gke-gcloud-auth-plugin +$ gke-gcloud-auth-plugin --version +$ gcloud auth login +$ gcloud services enable container.googleapis.com +$ gcloud services enable compute.googleapis.com +$ gcloud config set project ${PROJECT_ID} +$ gcloud compute accelerator-types list | grep europe | grep T4 +$ gcloud compute accelerator-types list | grep europe | grep A100 +$ gcloud compute machine-types list | grep europe + +-> Create cluster + +$ gcloud container clusters create ${CLUSTER_NAME} \ + --zone ${CLUSTER_ZONE} \ + --num-nodes 1 \ + --workload-pool ${PROJECT_ID}.svc.id.goog \ + --addons GcsFuseCsiDriver +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ gcloud container clusters get-credentials ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl cluster-info +$ kubectl get namespaces +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} +$ kubectl create serviceaccount ${SA_KUBE} --namespace default + +-> Update default pool + +$ gcloud container node-pools update default-pool \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} + +$ gcloud container node-pools update default-pool \ + --cluster ${CLUSTER_NAME} \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} + +$ gcloud container node-pools update default-pool \ + --cluster ${CLUSTER_NAME} \ + --zone ${CLUSTER_ZONE} \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU T4 node pool with 2 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-t4-2 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 1 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type n1-standard-8 \ + --accelerator count=2,type=nvidia-tesla-t4,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU T4 node pool with 4 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-t4-4 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type n1-standard-8 \ + --accelerator count=4,type=nvidia-tesla-t4,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU A100 node pool with 2 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-a100-2 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type a2-highgpu-2g \ + --accelerator count=2,type=nvidia-tesla-a100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU A100 node pool with 4 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-a100-4 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type a2-highgpu-4g \ + --accelerator count=4,type=nvidia-tesla-a100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU A100 node pool with 8 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-a100-8 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type a2-highgpu-8g \ + --accelerator count=8,type=nvidia-tesla-a100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} + +-> Create GPU V100 node pool with 2 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-v100-2 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type n1-standard-8 \ + --accelerator count=2,type=nvidia-tesla-v100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID}-> Create GPU T4 node pool with 2 GPUs + +-> Create GPU V100 node pool with 4 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-v100-4 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type n1-standard-8 \ + --accelerator count=4,type=nvidia-tesla-v100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID}-> Create GPU T4 node pool with 2 GPUs + +-> Create GPU V100 node pool with 8 GPUs + +$ gcloud container node-pools create ${POOL_PREFIX}-v100-8 \ + --cluster ${CLUSTER_NAME} \ + --workload-metadata GKE_METADATA \ + --zone ${CLUSTER_ZONE} \ + --node-locations ${CLUSTER_NODE} \ + --num-nodes 0 \ + --min-nodes 0 \ + --max-nodes 2 \ + --enable-autoscaling \ + --machine-type n1-standard-8 \ + --accelerator count=8,type=nvidia-tesla-v100,gpu-driver-version=default +$ gcloud container clusters describe ${CLUSTER_NAME} --zone ${CLUSTER_ZONE} +$ kubectl get node +$ xdg-open https://console.cloud.google.com/kubernetes/clusters/details/${CLUSTER_ZONE}/${CLUSTER_NAME}/nodes\?project\=${PROJECT_ID} +``` + +Turn on [Google Artifact Registry](https://cloud.google.com/artifact-registry) on GCP's project. +See [Quick start](https://cloud.google.com/artifact-registry/docs/docker). + +Create docker artifact repository. + +```console +$ gcloud auth configure-docker ${CLUSTER_ZONE}-docker.pkg.dev +$ gcloud artifacts repositories create ${ARTIFACT_REGISTRY} \ + --repository-format=docker \ + --location=${CLUSTER_ZONE} +$ docker tag nvidia/cuda:11.0.3-runtime-ubuntu20.04 ${CLUSTER_ZONE}-docker.pkg.dev/${PROJECT_ID}/${ARTIFACT_REGISTRY}/nvidia/cuda:11.0.3-runtime-ubuntu20.04 +$ docker push ${CLUSTER_ZONE}-docker.pkg.dev/${PROJECT_ID}/${ARTIFACT_REGISTRY}/nvidia/cuda:11.0.3-runtime-ubuntu20.04 +$ xdg-open https://console.cloud.google.com/artifacts/docker/${PROJECT_ID}/${CLUSTER_ZONE}/${ARTIFACT_REGISTRY}\?project\=${PROJECT_ID} +$ cat << EOF > test-pod-from-artifactory.yml +apiVersion: v1 +kind: Pod +metadata: + name: test-pool-t4-from-artifactory +spec: + containers: + - name: my-gpu-container-from-artifactory + image: ${CLUSTER_ZONE}-docker.pkg.dev/${PROJECT_ID}/${ARTIFACT_REGISTRY}/nvidia/cuda:11.0.3-runtime-ubuntu20.04 + command: ["/bin/bash", "-c", "--"] + args: ["while true; do sleep 600; done;"] + resources: + limits: + nvidia.com/gpu: 2 + nodeSelector: + cloud.google.com/gke-accelerator: nvidia-tesla-t4 +EOF +$ kubectl apply -f test-pod-from-artifactory.yml +``` + +Create a [bucket](https://cloud.google.com/storage/docs/creating-buckets#storage-create-bucket-cli) +using the [standard storage class](https://cloud.google.com/storage/docs/storage-classes) +and configure the [Cloud Storage FUSE CSI driver](https://cloud.google.com/kubernetes-engine/docs/how-to/persistent-volumes/cloud-storage-fuse-csi-driver). + +```console +$ gcloud iam service-accounts create ${SA_GCLOUD} \ + --display-name="${PROJECT_NAME} Service Account" \ + --project=${PROJECT_ID} + +$ xdg-open https://console.cloud.google.com/iam-admin/serviceaccounts\?project\=${PROJECT_ID} + +$ gcloud storage buckets create gs://${BUCKET} \ + --project=${PROJECT_ID} \ + --default-storage-class=STANDARD \ + --location=${CLUSTER_ZONE} \ + --uniform-bucket-level-access + +$ xdg-open https://console.cloud.google.com/storage/browser\?project\=${PROJECT_ID} + +$ gcloud storage buckets add-iam-policy-binding gs://${BUCKET} \ + --member "serviceAccount:${SA_GCLOUD}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role "roles/storage.objectAdmin" + +$ gcloud projects add-iam-policy-binding ${PROJECT_ID} \ + --member "serviceAccount:${SA_GCLOUD}@${PROJECT_ID}.iam.gserviceaccount.com" \ + --role "roles/storage.objectAdmin" + +$ gcloud iam service-accounts add-iam-policy-binding ${SA_GCLOUD}@${PROJECT_ID}.iam.gserviceaccount.com \ + --role roles/iam.workloadIdentityUser \ + --member "serviceAccount:${PROJECT_ID}.svc.id.goog[default/${SA_KUBE}]" + +$ kubectl annotate serviceaccount ${SA_KUBE} \ + --namespace default \ + iam.gke.io/gcp-service-account=${SA_GCLOUD}@${PROJECT_ID}.iam.gserviceaccount.com +``` + +## Debug pods + +Connect to a pod with the following command. +Pods contain two containers, one running giotto-deep stuff, another running the sidecar for GKE fuse protocol. +It is thus necessary to indicate which pod and which container to connect to. + +```console +$ kubectl exec -it giotto-deep-plot -n default -c giotto-deep-plot -- /bin/bash +``` diff --git a/benchmark/benchmark.py b/benchmark/benchmark.py new file mode 100644 index 00000000..70832b19 --- /dev/null +++ b/benchmark/benchmark.py @@ -0,0 +1,659 @@ +import argparse +import csv +import dataclasses +import datetime +import enum +import math +import multiprocessing as pmp +import pathlib +import sys +import typing + +import matplotlib.pyplot as plt +import torch + +from gdeep.trainer.trainer import ParallelismType +from gdeep.utility_examples.fsdp import ShardingStrategyEx + +sys.path.append("../examples") +from examples import parallel_bert, parallel_orbit_5k + + +class Parallelism(enum.Enum): + none = enum.auto() + fsdp_full_shard = enum.auto() + fsdp_shard_grad_op = enum.auto() + fsdp_no_shard = enum.auto() + pipeline = enum.auto() + + def __str__(self): + return self.name + + @staticmethod + def from_string(s: str): + try: + return Parallelism[s] + except KeyError: + raise ValueError() + + def to_text(self): + if self is Parallelism.none: + return "None" + elif self is Parallelism.fsdp_full_shard: + return "FSDP Full Shard" + elif self is Parallelism.fsdp_shard_grad_op: + return "FSDP Shard Grad Op" + elif self is Parallelism.fsdp_no_shard: + return "FSDP No Shard" + elif self is Parallelism.pipeline: + return "Pipeline" + else: + return "?" + + def to_parallelism_type(self) -> ParallelismType: + if self is Parallelism.none: + return ParallelismType._NONE + elif self in ( + Parallelism.fsdp_full_shard, + Parallelism.fsdp_shard_grad_op, + Parallelism.fsdp_no_shard, + ): + return ParallelismType.FSDP + elif self is Parallelism.pipeline: + return ParallelismType.PIPELINE + else: + raise ValueError(f"Unknown {self}") + + def to_sharding_strategy(self) -> ShardingStrategyEx: + if self is Parallelism.fsdp_full_shard: + return ShardingStrategyEx.FULL_SHARD + elif self is Parallelism.fsdp_shard_grad_op: + return ShardingStrategyEx.SHARD_GRAD_OP + elif self is Parallelism.fsdp_no_shard: + return ShardingStrategyEx.NO_SHARD + else: + return ShardingStrategyEx.SHARD_GRAD_OP + + def colour(self) -> str: + # https://matplotlib.org/stable/gallery/color/named_colors.html#css-colors + if self is Parallelism.none: + return "blue" + elif self is Parallelism.fsdp_full_shard: + return "darkorange" + elif self is Parallelism.fsdp_shard_grad_op: + return "green" + elif self is Parallelism.fsdp_no_shard: + return "magenta" + elif self is Parallelism.pipeline: + return "red" + else: + return "black" + + +class Models(enum.Enum): + none = enum.auto() + orbit5k = enum.auto() + orbit5kbig = enum.auto() + bert = enum.auto() + bertbig = enum.auto() + + def __str__(self): + return self.name + + @staticmethod + def from_string(s: str): + try: + return Models[s] + except KeyError: + raise ValueError() + + +class RunData: + CSV_FIELDS = [ + "start_time", + "end_time", + "run_time", + "model", + "parallel", + "epochs", + "batch_size", + "loss", + "accuracy", + "gpu_count", + "gpu_model", + ] + + def __init__( + self, + start_time: datetime.datetime, + end_time: datetime.datetime, + model: Models, + parallel: Parallelism, + epochs: int, + batch_size: int, + loss: float, + accuracy: float, + gpu_count: int, + gpu_model: str, + ): + self.start_time = start_time + self.end_time = end_time + self.run_time = end_time - start_time + self.model = model + self.parallel = parallel + self.epochs = epochs + self.batch_size = batch_size + self.loss = loss + self.accuracy = accuracy + self.gpu_count = gpu_count + self.gpu_model = gpu_model + + @classmethod + def load( + cls, + start_time: str, + end_time: str, + model: str, + parallel: str, + epochs: str, + batch_size: str, + loss: str, + accuracy: str, + gpu_count: str, + gpu_model: str, + ): + return cls( + datetime.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f"), + datetime.datetime.strptime(end_time, "%Y-%m-%d %H:%M:%S.%f"), + Models.from_string(model), + Parallelism.from_string(parallel), + int(epochs), + int(batch_size), + float(loss), + float(accuracy), + int(gpu_count), + gpu_model, + ) + + def rt_mms(self): + return self.run_time.total_seconds() + + def write_row(self, writer: csv.DictWriter): + values = {x: None for x in RunData.CSV_FIELDS} + values[RunData.CSV_FIELDS[0]] = str(self.start_time) + values[RunData.CSV_FIELDS[1]] = str(self.end_time) + values[RunData.CSV_FIELDS[2]] = str(self.rt_mms()) + values[RunData.CSV_FIELDS[3]] = self.model + values[RunData.CSV_FIELDS[4]] = self.parallel + values[RunData.CSV_FIELDS[5]] = self.epochs + values[RunData.CSV_FIELDS[6]] = self.batch_size + values[RunData.CSV_FIELDS[7]] = self.loss + values[RunData.CSV_FIELDS[8]] = self.accuracy + values[RunData.CSV_FIELDS[9]] = self.gpu_count + values[RunData.CSV_FIELDS[10]] = self.gpu_model + writer.writerow(values) + + @staticmethod + def write_header(fp) -> csv.DictWriter: + writer = csv.DictWriter(fp, dialect="unix", fieldnames=RunData.CSV_FIELDS) + writer.writeheader() + return writer + + def __repr__(self): + return ( + f"{self.__class__.__name__}({self.start_time}, {self.end_time}, {self.model}" + f", {self.parallel}, {self.epochs}, {self.batch_size}, {self.gpu_model}, {self.gpu_count}, {self.accuracy})" + ) + + def same(self, o: "RunData") -> bool: + return all( + [ + self.model == o.model, + self.parallel == o.parallel, + self.batch_size == o.batch_size, + self.gpu_count == o.gpu_count, + self.gpu_model == o.gpu_model, + ] + ) + + def gt(self, o: "RunData") -> bool: + return self.end_time > o.end_time + + +@dataclasses.dataclass +class RunResult: + start_time: datetime.datetime + end_time: datetime.datetime + loss: float + accuracy: float + + +BATCH_SIZE_VALUES = (1, 2, 4, 8, 16, 32, 64) + +# https://matplotlib.org/stable/gallery/lines_bars_and_markers/marker_reference.html +# https://matplotlib.org/stable/gallery/lines_bars_and_markers/linestyles.html +PLOT_LINES = [ + "solid", + "dashed", + "dotted", + "dashdot", +] +PLOT_MARKERS = [ + "x", + "+", + ".", + "*", + "H", + "s", +] +PLOT_IMG_WIDTH = 10 +PLOT_IMG_HEIGHT = 5 +PLOT_IMG_MARGIN_LEFT = 0.1 + + +def gen_plot_lines(): + for e in PLOT_LINES: + yield e + + +def gen_plot_markers(): + for e in PLOT_MARKERS: + yield e + + +def device_name(model: str, count: int) -> str: + return f"{count} {model}" + + +def device_filename(model: str, count: int) -> str: + return f"{model} {count}".lower().replace(" ", "-") + + +def nofn(args): + return 0, 0 + + +def wrap(fn, args, q): + start_time = datetime.datetime.now() + loss, accuracy = fn(args) + end_time = datetime.datetime.now() + q.put(RunResult(start_time, end_time, loss, accuracy)) + + +def identity(string): + return string + + +def run_training( + model: Models, + parallel: Parallelism, + batch_size: int, + epochs: int, + device_name: str, + device_count: int, + device_model: str, +) -> RunData: + args = argparse.ArgumentParser() + args.register("type", None, identity) + fn = nofn + + if model is Models.none: + pass + elif model in (Models.orbit5k, Models.orbit5kbig): + if model is Models.orbit5kbig: + batch_size = 4 + args.big_model = True + else: + args.big_model = False + args.batch_size = batch_size + args.n_epochs = epochs + args.parallel = parallel.to_parallelism_type() + args.sharding = parallel.to_sharding_strategy() + fn = parallel_orbit_5k.main + elif model in (Models.bert, Models.bertbig): + if model is Models.bertbig: + args.big_model = True + else: + args.big_model = False + args.batch_size = batch_size + args.n_epochs = epochs + args.parallel = parallel.to_parallelism_type() + args.sharding = parallel.to_sharding_strategy() + args.download = False + parallel_bert.download_dataset() + fn = parallel_bert.main + + sys.stdout.write( + "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++\n" + ) + sys.stdout.write( + f"BENCHMARK RUNNING ON {device_name}... parallelism {parallel} with batch size {batch_size}...\n" + ) + sys.stdout.flush() + + # Spawn a new python interpreter to ensure a nice release of resources + mp = pmp.get_context("spawn") + rq = mp.SimpleQueue() + process = mp.Process(target=wrap, args=(fn, args, rq), daemon=False) + process.start() + process.join() + if process.exitcode != 0: + raise Exception(f"Train process exited with exitcode {process.exitcode}") + r = rq.get() + + # if model is not Models.none and parallel is Parallelism.pipeline: + # torch.distributed.rpc.shutdown() + + return RunData( + r.start_time, + r.end_time, + model, + parallel, + epochs, + batch_size, + r.loss, + r.accuracy, + device_count, + device_model, + ) + + +def uniq(data: typing.List[RunData]): + """Keep the most recent elements of each class. + + Some elements of the list may be of the same class but of different generation + time, e.g. some benchmark runs that were restarted. + """ + data2 = [] + idx = 0 + # parse every element in the list (unless those removed during the process) + while idx < len(data): + jdx = idx + 1 + keep = data[idx] # set current data as kept + # parse every further element in the list (unless those removed during the process) + while jdx < len(data): + # if the currently kept element and the current element are of the same "class" ... + if data[jdx].same(keep): + # ... compare if the current element is greater than the kept one ... + if data[jdx].gt(keep): + # ... and keep and remove the current element if it is greater + keep = data.pop(jdx) + else: + # ... or only remove the current element if it is not greater + del data[jdx] + else: + jdx += 1 + data2.append(keep) + idx += 1 + return data2 + + +def plot_training(run_data: typing.List[RunData], imgfile: pathlib.Path, dev_name: str): + plt_data = {} + for d in run_data: + if d.parallel not in plt_data: + plt_data[d.parallel] = {} + plt_data[d.parallel][d.batch_size] = d.rt_mms() + + fig = plt.figure(figsize=(PLOT_IMG_WIDTH, PLOT_IMG_HEIGHT)) + ax = fig.add_subplot(1, 1, 1) + plots = [] + legends = [] + for parallel, v in plt_data.items(): + (p,) = ax.plot( + v.keys(), + v.values(), + linestyle=PLOT_LINES[0], + linewidth=1.5, + color=parallel.colour(), + marker=PLOT_MARKERS[0], + ) + plots.append(p) + legends.append(parallel.to_text()) + + ax.legend(plots, legends, loc="upper right") + ax.set_title(f"{d.model} -- Run time per batch size -- {dev_name}") + ax.set_xlabel("Batch size") + ax.set_ylabel("Run time [s]") + fig.subplots_adjust(left=PLOT_IMG_MARGIN_LEFT) + plt.savefig(str(imgfile)) + + +def plot_csv( + run_data: typing.List[RunData], + img_dir: pathlib.Path, + now: datetime.datetime, +): + template = f"plot-{now.strftime('%Y-%m-%d-%H-%M-%S')}" + data = {} + for d in run_data: + if d.model not in data: + data[d.model] = {} + if d.gpu_model not in data[d.model]: + data[d.model][d.gpu_model] = {} + if d.gpu_count not in data[d.model][d.gpu_model]: + data[d.model][d.gpu_model][d.gpu_count] = {} + if d.parallel not in data[d.model][d.gpu_model][d.gpu_count]: + data[d.model][d.gpu_model][d.gpu_count][d.parallel] = {} + data[d.model][d.gpu_model][d.gpu_count][d.parallel][d.batch_size] = d.rt_mms() + + # Plot parallelism for model/gpu-model/gpu-count + for model, v_model in data.items(): + for gpu_model, v_gpu_model in v_model.items(): + for gpu_n, v_gpu_n in v_gpu_model.items(): + fig = plt.figure(figsize=(PLOT_IMG_WIDTH, PLOT_IMG_HEIGHT)) + ax = fig.add_subplot(1, 1, 1) + plots = [] + legends = [] + for parallel, values in v_gpu_n.items(): + (p,) = ax.plot( + values.keys(), + values.values(), + linestyle=PLOT_LINES[0], + linewidth=1.5, + color=parallel.colour(), + marker=PLOT_MARKERS[0], + ) + plots.append(p) + legends.append(parallel.to_text()) + ax.legend(plots, legends, loc="upper right") + ax.set_title( + f"{model} -- Run time per batch size -- {gpu_n} {gpu_model}" + ) + ax.set_xlabel("Batch size") + ax.set_ylabel("Run time [s]") + fig.subplots_adjust(left=PLOT_IMG_MARGIN_LEFT) + img_name = ( + template + f"-{model}-{device_filename(gpu_model, gpu_n)}.png" + ) + plt.savefig(str(img_dir.joinpath(img_name))) + + # Plot parallelism for model/gpu-model/gpu-count + linestyles = {} + markers = {} + gen_lines = gen_plot_lines() + gen_markers = gen_plot_markers() + for model, v_model in data.items(): + fig = plt.figure(figsize=(PLOT_IMG_WIDTH, PLOT_IMG_HEIGHT)) + ax = fig.add_subplot(1, 1, 1) + plots = [] + legends = [] + for gpu_model, v_gpu_model in v_model.items(): + if gpu_model not in linestyles: + linestyles[gpu_model] = next(gen_lines) + for gpu_n, v_gpu_n in v_gpu_model.items(): + if gpu_n not in markers: + markers[gpu_n] = next(gen_markers) + for parallel, values in v_gpu_n.items(): + (p,) = ax.plot( + values.keys(), + values.values(), + linestyle=linestyles[gpu_model], + linewidth=1.5, + color=parallel.colour(), + marker=markers[gpu_n], + ) + plots.append(p) + legends.append(f"{parallel.to_text()}, {gpu_n} {gpu_model}") + ax.legend(plots, legends, loc="upper right") + ax.set_title(f"{model} -- Run time per batch size") + ax.set_xlabel("Batch size") + ax.set_ylabel("Run time [s]") + fig.subplots_adjust(left=PLOT_IMG_MARGIN_LEFT) + img_name = template + f"-{model}.png" + plt.savefig(str(img_dir.joinpath(img_name))) + + +def main_plot(args): + data = [] + if args.files is not None: + files = args.files + else: + files = pathlib.Path(args.csvdir).glob("*.csv") + for csvfile in files: + with open(csvfile, "r") as csvfp: + skip = True + reader = csv.reader(csvfp, dialect="unix") + for row in reader: + if skip: + skip = False + else: + data.append( + RunData.load( + row[0], + row[1], + row[3], + row[4], + row[5], + row[6], + row[7], + row[8], + row[9], + row[10], + ) + ) + data = uniq(data) + plot_csv(data, pathlib.Path(csvfile).parent, datetime.datetime.now()) + + +def main_run(args): + # Get GPU data + use_cuda = torch.cuda.is_available() + if use_cuda: + dev_count = torch.cuda.device_count() + dev_model = torch.cuda.get_device_name(0) + else: + dev_count = 0 + dev_model = "cpu" + dev_name = device_name(dev_model, dev_count) + dev_filename = device_filename(dev_model, dev_count) + + # Parse arguments + min_exp_batch_size = int(math.log2(min(args.batch_size))) + max_exp_batch_size = int(math.log2(max(args.batch_size))) + filename_template = f"benchmark-{args.model}-{dev_filename}-{datetime.datetime.now().strftime('%Y-%m-%d-%H-%M-%S')}" + + # Setup result files + dir = pathlib.Path(args.csvdir) + dir.mkdir(exist_ok=True) + csvfile = dir.joinpath(f"{filename_template}.csv") + pltfile = dir.joinpath(f"{filename_template}.png") + sys.stdout.write(f"BENCHMARK RUNNING ON {dev_name}...\n") + sys.stdout.flush() + with open(csvfile, "w", newline="") as csvfp: + csvw = RunData.write_header(csvfp) + + # Run trainings + data = [] + for parallel in args.parallel: + for exp_batch_size in range(min_exp_batch_size, max_exp_batch_size + 1): + try: + run_data = run_training( + args.model, + parallel, + int(math.pow(2, exp_batch_size)), + args.n_epochs, + dev_name, + dev_count, + dev_model, + ) + run_data.write_row(csvw) + data.append(run_data) + except Exception as e: + sys.stdout.write(f"BENCHMARK RUN FAILED: {e}\n") + sys.stdout.flush() + + # Plot trainings data + if len(data): + plot_training(data, pltfile, dev_name) + + # End of script + sys.stdout.write( + f"BENCHMARK DONE.\nLOG FILE IS {csvfile}\nPLOT FILE IS {pltfile}\n" + ) + sys.stdout.flush() + + +def main(): + parser = argparse.ArgumentParser(description="Giotto-deep benchmark tool") + + subparsers = parser.add_subparsers(required=True) + + parser_run = subparsers.add_parser("run", help="Run a benchmark") + parser_run.set_defaults(func=main_run) + parser_run.add_argument( + "-m", + "--model", + required=True, + type=Models.from_string, + choices=[x for x in Models], + help="Model to run", + ) + parser_run.add_argument( + "-p", + "--parallel", + type=Parallelism.from_string, + choices=[x for x in Parallelism], + nargs="+", + default=Parallelism.none, + help="Parallelism type(s); default is %(default)s", + ) + parser_run.add_argument( + "-b", + "--batch-size", + type=int, + nargs=2, + choices=BATCH_SIZE_VALUES, + default=(4, 4), + metavar=("MINVAL", "MAXVAL"), + help=f"Batch size range; possible values are {BATCH_SIZE_VALUES}; default is %(default)s", + ) + parser_run.add_argument( + "-n", + "--n-epochs", + type=int, + default=3, + metavar="N", + help="Number of epochs; default is %(default)s", + ) + parser_run.add_argument("-d", "--csvdir", required=True, help="CSV files directory") + + parser_plot = subparsers.add_parser("plot", help="Plot benchmark results") + parser_plot.set_defaults(func=main_plot) + grp1 = parser_plot.add_mutually_exclusive_group(required=True) + grp1.add_argument( + "-d", "--csvdir", help="CSV files directory; use all files in the directory" + ) + grp1.add_argument( + "-f", + "--files", + nargs="*", + help="CSV files to plot; use these specific files; require full path", + ) + + args = parser.parse_args() + args.func(args) + + +if __name__ == "__main__": + main() diff --git a/benchmark/generate_pods.py b/benchmark/generate_pods.py new file mode 100644 index 00000000..894ba25b --- /dev/null +++ b/benchmark/generate_pods.py @@ -0,0 +1,142 @@ +import argparse +import enum +import string + +import benchmark + + +class GPUs(enum.Enum): + a100 = enum.auto() + v100 = enum.auto() + t4 = enum.auto() + + def __str__(self): + return self.name + + @staticmethod + def from_string(s: str): + try: + return GPUs[s] + except KeyError: + raise ValueError(f"Unknown GPU {s}") + + def fullname(self) -> str: + if self is GPUs.a100: + return "nvidia-tesla-a100" + elif self is GPUs.v100: + return "nvidia-tesla-v100" + elif self is GPUs.t4: + return "nvidia-tesla-t4" + else: + raise Exception(f"fullname missing for {self.name}") + + +def main(): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(required=True) + + parser.add_argument("-i", "--image", required=True, help="Container image") + parser.add_argument("-b", "--bucket", required=True, help="Storage bucket") + parser.add_argument("-s", "--ksa", required=True, help="Kubernetes Service Account") + parser.add_argument( + "-d", "--dir", default="", help="Subdirectory in the bucket to store/read data" + ) + + parser_run = subparsers.add_parser("run", help="Generate pods to run benchmarks") + parser_run.set_defaults(func=main_run) + parser_run.add_argument( + "-c", "--gpu-count", required=True, type=int, nargs="*", help="GPU count" + ) + parser_run.add_argument( + "-g", + "--gpu-model", + required=True, + type=GPUs.from_string, + choices=[x for x in GPUs], + help="GPU model", + ) + parser_run.add_argument( + "-m", + "--model", + required=True, + type=benchmark.Models.from_string, + choices=[x for x in benchmark.Models], + help="Model", + ) + parser_run.add_argument( + "-p", + "--parallel", + required=True, + type=benchmark.Parallelism.from_string, + choices=[x for x in benchmark.Parallelism], + nargs="+", + help="Parallelism type(s)", + ) + parser_run.add_argument( + "-z", + "--batch-size", + type=int, + nargs=2, + choices=benchmark.BATCH_SIZE_VALUES, + metavar=("MINVAL", "MAXVAL"), + help=f"Batch size range; possible values are {benchmark.BATCH_SIZE_VALUES}", + ) + + parser_plot = subparsers.add_parser( + "plot", help="Generate pods to plot benchmarks results" + ) + parser_plot.set_defaults(func=main_plot) + + args = parser.parse_args() + args.func(args) + + +def main_run(args): + values = { + "image": args.image, + "bucket": args.bucket, + "ksa": args.ksa, + "subdir": args.dir, + "gpu_count": 0, + "gpu_model": args.gpu_model.fullname(), + "gpu_name": str(args.gpu_model), + "model": str(args.model), + "parallel": ", ".join([f'"{x}"' for x in args.parallel]), + "batch_size": "", + } + + with open("pod-template-run.yml", "r") as fp: + ymlt = string.Template(fp.read()) + + if args.batch_size is not None: + args.batch_size.insert(0, "-b") + values["batch_size"] = ", " + ", ".join([f'"{x}"' for x in args.batch_size]) + + for gpu_count in args.gpu_count: + values["gpu_count"] = gpu_count + ymlv = ymlt.substitute(values) + filename = f"run-{args.model}-{args.gpu_model}-{gpu_count}.yml" + with open(filename, "w") as fp: + fp.write(ymlv) + print(f"kubectl apply -f {filename}") + + +def main_plot(args): + values = { + "image": args.image, + "bucket": args.bucket, + "ksa": args.ksa, + "subdir": args.dir, + } + + with open("pod-template-plot.yml", "r") as fp: + ymlt = string.Template(fp.read()) + ymlv = ymlt.substitute(values) + filename = "plot.yml" + with open(filename, "w") as fp: + fp.write(ymlv) + print(f"kubectl apply -f {filename}") + + +if __name__ == "__main__": + main() diff --git a/benchmark/pod-template-plot.yml b/benchmark/pod-template-plot.yml new file mode 100644 index 00000000..d75d7495 --- /dev/null +++ b/benchmark/pod-template-plot.yml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Pod +metadata: + name: giotto-deep-plot + namespace: default + labels: + env: giotto-deep + app: giotto-deep-plot + annotations: + gke-gcsfuse/volumes: "true" +spec: + terminationGracePeriodSeconds: 60 + volumes: + - name: gcs-fuse-csi-ephemeral + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: $bucket + containers: + - name: giotto-deep-plot + image: $image + args: ["plot", "-d", "/var/lib/data/$subdir"] + volumeMounts: + - mountPath: "/var/lib/data" + name: gcs-fuse-csi-ephemeral + imagePullPolicy: Always + serviceAccountName: $ksa + restartPolicy: Never diff --git a/benchmark/pod-template-run.yml b/benchmark/pod-template-run.yml new file mode 100644 index 00000000..12c9b90c --- /dev/null +++ b/benchmark/pod-template-run.yml @@ -0,0 +1,39 @@ +apiVersion: v1 +kind: Pod +metadata: + name: giotto-deep-benchmark-$model-$gpu_name-$gpu_count + namespace: default + labels: + env: giotto-deep + app: giotto-deep-benchmark + annotations: + gke-gcsfuse/volumes: "true" +spec: + terminationGracePeriodSeconds: 60 + volumes: + - name: gcs-fuse-csi-ephemeral + csi: + driver: gcsfuse.csi.storage.gke.io + volumeAttributes: + bucketName: $bucket + - name: shared-memory + emptyDir: + medium: Memory + sizeLimit: 16Gi + containers: + - name: giotto-deep-benchmark-$model-$gpu_name-$gpu_count + image: $image + args: ["run", "-m", "$model", "-d", "/var/lib/data/$subdir", "-p", $parallel$batch_size] + resources: + limits: + nvidia.com/gpu: $gpu_count + volumeMounts: + - mountPath: "/var/lib/data" + name: gcs-fuse-csi-ephemeral + - mountPath: /dev/shm + name: shared-memory + imagePullPolicy: Always + serviceAccountName: $ksa + restartPolicy: Never + nodeSelector: + cloud.google.com/gke-accelerator: $gpu_model diff --git a/benchmark/requirements.txt b/benchmark/requirements.txt new file mode 100644 index 00000000..6ccafc3f --- /dev/null +++ b/benchmark/requirements.txt @@ -0,0 +1 @@ +matplotlib diff --git a/docs/source/_img/giotto_trainer_fsdp.png b/docs/source/_img/giotto_trainer_fsdp.png new file mode 100644 index 00000000..d2fa8f9b Binary files /dev/null and b/docs/source/_img/giotto_trainer_fsdp.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-bert-nvidia-geforce-rtx-3090-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-bert-nvidia-geforce-rtx-3090-2.png new file mode 100644 index 00000000..6767d24e Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-bert-nvidia-geforce-rtx-3090-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-2.png new file mode 100644 index 00000000..d1bef660 Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-4.png b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-4.png new file mode 100644 index 00000000..5a29ea52 Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-t4-4.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-v100-sxm2-16gb-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-v100-sxm2-16gb-2.png new file mode 100644 index 00000000..cf8376cd Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-bert-tesla-v100-sxm2-16gb-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-bertbig-tesla-t4-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-bertbig-tesla-t4-2.png new file mode 100644 index 00000000..011f58ad Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-bertbig-tesla-t4-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-nvidia-geforce-rtx-3090-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-nvidia-geforce-rtx-3090-2.png new file mode 100644 index 00000000..a75c1b4c Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-nvidia-geforce-rtx-3090-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-2.png b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-2.png new file mode 100644 index 00000000..d30ee34f Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-2.png differ diff --git a/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-8.png b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-8.png new file mode 100644 index 00000000..8e5cd626 Binary files /dev/null and b/docs/source/_img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-8.png differ diff --git a/docs/source/conf.py b/docs/source/conf.py index 9900ab0e..26efbf38 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -42,6 +42,7 @@ # This pattern also affects html_static_path and html_extra_path. exclude_patterns = [] # type: ignore +numfig = True # -- Options for HTML output ------------------------------------------------- diff --git a/docs/source/index.rst b/docs/source/index.rst index 2af490c5..5ca8c2d3 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -13,6 +13,7 @@ Welcome to giotto-deep's documentation! introduction installation distributed + parallel modules/index References diff --git a/docs/source/parallel.rst b/docs/source/parallel.rst new file mode 100644 index 00000000..3fe05cab --- /dev/null +++ b/docs/source/parallel.rst @@ -0,0 +1,255 @@ +.. _FSDP documentation: https://pytorch.org/docs/stable/fsdp.html +.. _FSDP tutorial: https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html#how-fsdp-works +.. _Advanced FSDP tutorial: https://pytorch.org/tutorials/intermediate/FSDP_adavnced_tutorial.html +.. _FSDP wrappers: https://github.com/pytorch/pytorch/blob/main/torch/distributed/fsdp/wrap.py +.. _Python pickler guidelines: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled +.. _Data Parallelism: https://lightning.ai/courses/deep-learning-fundamentals/9.0-overview-techniques-for-speeding-up-model-training/unit-9.3-deep-dive-into-data-parallelism/ +.. _ZeRO: https://arxiv.org/abs/1910.02054 + +.. _parallel: + +##################### +Parallel training +##################### + +************ +Introduction +************ + +When multiple GPUs are available, it is possible to use more than one to train the same model. Currently, two methods of multi-GPU training are available in giotto-deep and provide different benefits: `Data parallelism`_ and Pipeline parallelism + +********************** +Data parallelism: FSDP +********************** + +Data parallelism consists in training multiple copies of the model on partitions of the dataset. For example: one GPU may be responsible for training one copy of the model on one half of the training dataset while another trains another copy on the second half of the dataset. At the end or multiple times during a training epoch, both copies are merged to obtain a new model technically trained on the whole dataset. This method relies on the assumption that the merging of the half-trained models is fast enough and results in good enough improvements to compensate the half-training. The principle is the same for more GPUs but the performance may not improve significantly past a certain point. This method is called Distributed Data Parallelism (DDP) + +A more complex method consists in sharding the model to distribute it across multiple GPUs. During the training, one GPU can request the shard it needs from the GPU responsible for this shard, do its calculations and then discard the shard. Once the gradients for this shard is calculated in the backward pass, the shard is retrieved again, calculations are done and the updated shard is sent back to the responsible GPU (as explained in the `FSDP tutorial`_). More communication between the GPUs is required as a result but this approach results in lower peak memory use. This algorithm is called `ZeRO`_ and exists in 2 variants in pytorch's FSDP feature: ZERO2 (SHARD_GRAD_OP) doesn't discard the shard after the forward pass and thus saves time in communication but requires more memory, ZERO3 (FULL_SHARD) discards the shard everytime after it is done with its calculations and thus requires more communication but less memory. In itself, using one of those 2 algorithms may not result in better training time but the freed memory can be used to increase the batch size (which usually results in faster epochs) or increase the size of the model + +Some optimisations allow those algorithms to increase in performance even more: +* Mixed precision: Converts the weights, gradients or the transmitted data to a lower precision to speed up calculation (potentially using the hardware support of the GPU) or transmission of data between the GPUs +* Backward prefetch: Allows shards to be recovered in different ways to optimise memory usage or performance +* ... +More optimisations are discussed in the `Advanced FSDP tutorial`_ + +The Data parallelism algorithms and optimisations to use are very dependant on the model and training method but can sensibly improve the training time when chosen and configured correctly. + +=============================== +Implementation into giotto-deep +=============================== + +Those algorithms are implemented into giotto-deep using pytorch's FSDP tool. The implementation's architecture is explained in the diagram below. + +.. image:: _img/giotto_trainer_fsdp.png + +What the diagram shows is that for each device used for the training, giotto-deep's Trainer will create a new process that executes a subinstance of Trainer. The members of the base instance (model, dataloaders,...) are deepcopied into the subinstances so each subinstance may work on its members without affecting the other processes. Contrary to a classic copy (sometimes called shallow copy) that only copies the references present in the object being copied, deep copies copy the memory content of the object into new memory. This process is more complex as it is recursive (referenced objects must be deepcopied too) but allow for truly independant copies that do not risk being invalidated by changes made to the original object. The training occurs on each process on its dedicated device using the part of the dataloader assigned to it thanks to a sampler. Once the training is complete, the partially trained models are "aggregated" to form a new model technically trained on the whole dataset. The model is then retrieved by the subprocess with rank 0 and stored in a temporary file where it is recovered by the base instance to update the model. + +This architecture was selected because it was the best compromise between the amount of changes that needed to be made to the API and inner workings of giotto-deep, the amount of changes necessary in the user's code and the flexibility in FSDP's confguration. Less changes would have been required to the Trainer had we chosen to simply make it compatible with torchrun (pytorch utility script to launch a python script in multiple processes) but some adaptations still would have been necessary not to break necessary features and this puts more work in the hands of the user. A simpler Trainer API to support parallelisation is possible but it comes at the expense of configurability which plays a crucial role in the performance of FSDP + +However, the implementation choices made come with a few limitations: + +* Most examples of "native" FSDP (FSDP not used in a library) that can be found online show the model and dataset being instanced in each subprocess. This allows each subprocess to possess an independant instance of the model and dataloader. However, due to giotto-deep's API which expects to be given already instanced model and dataset, this wasn't possible. +* In order to use FSDP within giotto-deep nonetheless, we had to deepcopy and send the received instances of the model and dataloaders to each subprocess. This means that the dataloaders and models MUST be serialisable using pytorch's pickler (`Python pickler guidelines`_). +* Currently, some features of Pytorch (ex:Map_style_dataset) and giotto-deep (ex:TransformingDataset) aren't serialisable and thus cannot be used as is when trying to train a model with FSDP through giotto-deep. + +=============================== +Using FSDP with giotto-deep +=============================== + +To use one of those algorithms, import and instantiate the `Parallelism` class with the following informations: + +* p_type: The algorithm to use, defined in the `ParallelismType` enum + + * `PIPELINE` + * `FSDP` +* devices: List of the GPUs available on the machine. The list can be generated using `list(range(torch.cuda.device_count()))`. If no list is provided, the class will look for the devices itself +* nb_device: The actual number of GPUs from the devices list to use for the training. Not providing this parameter or providing a value smaller than 1 results in all the devices (found or provided) to be used. Values higher than the number of devices (found or provided) will result in an error +* config_fsdp: Dictionnary containing the arguments for the instantiation of FullyShardedDataParallel as per the official `FSDP documentation`_. This allows the user to configure FSDP as he wishes. The device_id parameter of FSDP is automatically handled + +The instance can then be given to the `parallel` argument of the `train` function + +.. code-block:: + + # FSDP not used for training + valloss, valacc = train.train(SGD, + args.n_epochs, + args.cv, + {"lr": 0.001, "momentum": 0.9}) + + # FSDP configured and used for training + devices = list(range(torch.cuda.device_count())) + + config_fsdp = { + "sharding_strategy": ShardingStrategy.SHARD_GRAD_OP, + "auto_wrap_policy": always_wrap_policy, + } + + + parallelism = Parallelism(ParallelismType.FSDP, + devices, + len(devices), + config_fsdp=config_fsdp) + + valloss, valacc = train.train(SGD, + args.n_epochs, + args.cv, + {"lr": 0.001, "momentum": 0.9}, + parallel=parallelism) + +FSDP in giotto-deep works with profiling and cross-validation but not with parallel TPUs. + +.. warning:: + As FSDP uses multiprocessing, it is necessary to use the idiom `if __name__ == __main__:` for the main code. This also implies that the model and datasets should be serialisable (which is not the case of 'to_map_style_dataset' datasets for example) + +.. warning:: + When using FSDP with a sharding strategy that isn't `NO_SHARD` (DDP), always provide a wrapper found in `FSDP wrappers`_ or an appropriate Callable. Wrappers are Callables that take a module and return a boolean to indicate if this module should be sharded based on some rule. Not using any wrapper when trying to shard will behave as if no parallelism is used at all + +.. note:: + When using FSDP, the batch size given to the dataloader is used by each GPU. For example, using a batch size of 4 with FSDP and 2 GPUs effectively corresponds to using a batch size of 8 without FSDP + +.. note:: + FSDP's ability to accelerate the training of a model depends on the model and FSDP's configuration. Giotto-deep provides a working (although with some caveats) implementation but some modifications to the model as well as some trial and error with the configuration may be needed in order to fully profit off its capabilities + +========================= +Compatibility adaptations +========================= + +Due to some constraints posed by giotto-deep's API, some models aren't compatible as is with giotto-deep and/or its FSDP implementation. Two such examples are +* Huggingface's T5 model: giotto-deep expects the model to take what comes out of the dataset as is (if a simple tensor is given) or in order (if a list of tensor is given) but this model can do different things depending on which parameters are fed in its forward method. This implies that the received list of tensor may be given to the first, third and sixth parameter of the forward method (for example). Giotto-deep doesn't allow such "argument juggling". Moreover, giotto-deep expects the loss of a given prediction to be calculated by giotto-deep itself using a provided loss_fn function. T5, however, provides the loss for the current prediction as a member of the dictionnary returned by the forward method. +* Giotto-deep's QATransformer model: This model heavily relies on some features that aren't serializable. However, our implementation of FSDP into giotto-deep relies on serialisation due to giotto-deep's Trainer API that requires the provided datasets and model to be already instanced. FSDP, on the other hand, needs to do its work on different instances of the dataset and model, so a lot of examples show each process instantiating their own copy of the model and dataset. In our implementation, we decided to deepcopy the Trainer's parameters and send them to each process to make sure each process has an independant copy of what it needs but this requires the used features and classes to be serializable and a lot of them aren't in this example + +However, in order to provide more example models for FSDP, attempts were made to adapt giotto-deep or even pytorch to respect those constraints. Each modification made for those models are listed here (as well as their result) in the hope that they may provide some insight into what needs to be done going forward for the development of giotto-deep. + +* T5 + * **Trainer's ``__init__``**: Allow for an optionnal loss_fn. The absence of loss function serves as condition in the rest of the code to detect that we are training the T5 model + * **Method ``_send_to_device``**: When given a list of tensors without loss_fn, send tensors 0, 1 and 2 to ``input_ids``, ``attention_mask`` and ``labels`` respectively + * **Method ``_inner_train_loop``**: After ``_send_to_device`` when training T5, ``pred`` is a dict which contains ``logits`` and ``loss`` which must be stored in ``pred`` and ``loss`` respectively for the rest of the computation to work + * **Bypass model return and validation**: T5 training is only used for benchmarking and thus do not require the model to be returned or validated + * Result: trainable model but accuracy results are off and recovering the state_dict to store the model once trained blocks the program ad eternam. Only usable for benchmarking, to prove that FSDP's implementation inside giotto-deep is functionnal +* QATransformer + * **Make ``_MapStyleDataset`` picklable**: Torchtext provides a ``to_map_style_dataset`` function that transforms an iterable dataset into a map dataset (see pytorch's documentation for more info on the different dataset types). However, the ``_MapStyleDataset`` class it uses is defined inside the function which makes it unpicklable. Simply moving the class definition out of the function fixes the problem + * **Make ``TransformingDataset`` picklable**: Setting up serialisation for this class requires ``__getstate__`` and ``__setstate__`` methods to be defined + * **Make question_answering.py's classes and functions picklable**: Make sure all classes and functions used in the file are declared at the root of the file to make them picklable + * Result: Model not functionnal due to FSDP generating sparse tensors for unknown reasons. Sparse tensors do not implement ``backward`` which results in an error + +============== +Known problems +============== + +* Using FSPD with ``FULL_SHARD`` sharding strategy trains without problems but renders the program idle ad eternam once the trained model is returned using ``state_dict`` (trainer.py: ``parallel_train``). To minimize the risks of problems, only use ``SHARD_GRAD_OP`` or ``NO_SHARD`` sharding strategies. +* With some models, FSDP generates sparse tensors which don't implement ``backward`` and thus raise errors + + +*********************************** +Benchmarks +*********************************** + +Benchmarks were run on several multi-GPUs configurations to show and measure the benefit of the proposed parallelisation strategies. + +The system configurations used are: + +- 2x `Nvidia GeForce RTX 3090 `__ on a local machine + with NVIDIA-SMI 525.147.05, Driver Version: 525.147.05, CUDA Version: 12.0 +- 2-4x `Nvidia Tesla T4 `__ on `Google Kubernetes Engine `__ + with a Docker image based on `nvidia/cuda:12.2.0-runtime-ubuntu22.04 `__ +- 2-8x `Nvidia V100 `__ on `Google Kubernetes Engine `__ + with a Docker image based on `nvidia/cuda:12.2.0-runtime-ubuntu22.04 `__ + +The benchmark environment is available within this repository in folder :file:`benchmark/`. +The main script is :file:`benchmark.py` which executes pre-configured models in a standard way to measure the execution time of a model while +changing the batch size and parallelisation technique. The model and the number of GPUs is given as input. +The script adapts the model's arguments according to the needs of the model. +Figures are output to show the execution time vs batch size for every parallelisation technique. +A second batch of figures can be created once all benchmarks are finished; this allows to combine several single results to, for example, +show executions on different number of GPUs on the same graph. + +Other interesting scripts are :file:`genpods.py` and :file:`Dockerfile` which are able to generate Kubernetes pods and Docker images to execute the benchmarks in a Kubernetes cluster. + +The parallelisation techniques, that are also written as legend on the figures presented in this chapter, are: + +- *None*: the model was run on one GPU (without parallelisation) +- *FSDP Shard Grad Op*: the model was run on multiple GPUs using FSDP Shard Grad Op +- *FSDP No Shard*: the model was run on multiple GPUs using FSDP No Shard +- *Pipeline*: the model was run on multiple GPUs using the `pipeline tools `__ + +The default batch sizes are 2, 4, 8, 16, and 32. Each model is trained for 3 epochs. + +=============================== +Orbit 5k +=============================== + +For model orbit 5k, the benchmark environment exploits the example :file:`examples/parallel_orbit_5k.py`. + +On :numref:`benchmark-orbit5-2v100`, 3 distinct behaviours come out. +The line on the top, drawn by the pipeline execution, is the slowest execution. This is expected as the pipeline was designed to increase the total amount of memory used by the model instead of running the model faster. +The line in the middle, drawn by the non-parallel execution, shows the time required to train the model on one GPU. +The line at the bottom, drawn by the two FSDP executions, show that sharding the model on two GPUs reduces the execution time. + +.. _benchmark-orbit5-2v100: +.. figure:: _img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-2.png + + Orbit5k --- 2x V100 + +On :numref:`benchmark-orbit5-2rtx3090`, the behaviour is the same for the pipeline and non-parallel executions. +The FSDP executions differ, however. And the execution with FSDP Shard Grad Op tends to join the non-parallel line. +This behaviour simply shows that, depending on the model and on the GPUs (available memory), different results are possible. + +.. _benchmark-orbit5-2rtx3090: +.. figure:: _img/plot-2023-11-23-12-16-58-orbit5k-nvidia-geforce-rtx-3090-2.png + + Orbit5k --- 2x GeForce RTX 3090 + +:numref:`benchmark-orbit5-8v100` shows the difference between the non-parallel execution and the FSDP executions on 8 GPUs. + +.. _benchmark-orbit5-8v100: +.. figure:: _img/plot-2023-11-23-12-16-58-orbit5k-tesla-v100-sxm2-16gb-8.png + + Orbit5k --- 8x V100 + +The orbit 5k is small and there is limited benefit to parallelise its execution. +However, it stays a good example and runs smoothly with ``giotto-deep``. + +=============================== +BERT +=============================== + +For model BERT, the benchmark environment exploits the example :file:`examples/parallel_bert.py`. + +:numref:`benchmark-bert-2v100`, :numref:`benchmark-bert-2t4`, and :numref:`benchmark-bert-2rtx3090` present the execution of the BERT model on two GPUs, on V100, on T4 and on RTX 3090. +These three figures show again that the results of a model may depend on the GPU model used. + +.. _benchmark-bert-2v100: +.. figure:: _img/plot-2023-11-23-12-16-58-bert-tesla-v100-sxm2-16gb-2.png + + BERT --- 2x V100 + +.. _benchmark-bert-2t4: +.. figure:: _img/plot-2023-11-23-12-16-58-bert-tesla-t4-2.png + + BERT --- 2x Tesla T4 + +.. _benchmark-bert-2rtx3090: +.. figure:: _img/plot-2023-11-23-12-16-58-bert-nvidia-geforce-rtx-3090-2.png + + BERT --- 2x GeForce RTX 3090 + +:numref:`benchmark-bert-4t4` and :numref:`benchmark-bertbig-2t4` present the execution of BERT on 4x Tesla T4 and BERT Big on 2x Tesla T4. +Each time showing an improvement of the execution time when using parallelisation. + +.. _benchmark-bert-4t4: +.. figure:: _img/plot-2023-11-23-12-16-58-bert-tesla-t4-4.png + + BERT --- 4x Tesla T4 + +.. _benchmark-bertbig-2t4: +.. figure:: _img/plot-2023-11-23-12-16-58-bertbig-tesla-t4-2.png + + BERT Big --- 2x Tesla T4 + +=============================== +Conclusion +=============================== + +The user is, in most cases, able to reduce the compute time with multiple GPUs. +The mileage varies, however, with the model specific features. +The parallelisation capacity of the model is thus a key element. diff --git a/examples/basic_tutorial_BERT.ipynb b/examples/basic_tutorial_BERT.ipynb index e1f9f7cd..e5b800c6 100644 --- a/examples/basic_tutorial_BERT.ipynb +++ b/examples/basic_tutorial_BERT.ipynb @@ -343,7 +343,7 @@ "metadata": {}, "outputs": [], "source": [ - "pipe = Trainer(model, (dl_tr, dl_val), loss, writer)" + "pipe = Trainer(model, (dl_tr,), loss, writer)" ] }, { diff --git a/examples/basic_tutorial_ensemble_learning.ipynb b/examples/basic_tutorial_ensemble_learning.ipynb index 07ec381b..9512d37b 100644 --- a/examples/basic_tutorial_ensemble_learning.ipynb +++ b/examples/basic_tutorial_ensemble_learning.ipynb @@ -183,7 +183,7 @@ "# initlaise the loss function\n", "loss_fn = nn.CrossEntropyLoss()\n", "# initialise the pipelien class\n", - "pipe = Trainer(model, (dl_tr, dl_ts), loss_fn, writer)\n", + "pipe = Trainer(model, (dl_tr,), loss_fn, writer)\n", "\n", "# initialise the SAM optimiser\n", "optim = SAMOptimizer(SGD) # this is a class, not an instance!\n", diff --git a/examples/basic_tutorial_hpo_and_benchmarking.ipynb b/examples/basic_tutorial_hpo_and_benchmarking.ipynb index f0faaf3b..526e7b9f 100644 --- a/examples/basic_tutorial_hpo_and_benchmarking.ipynb +++ b/examples/basic_tutorial_hpo_and_benchmarking.ipynb @@ -179,7 +179,7 @@ "\n", "# initialise pipeline class\n", "pipe = Trainer(\n", - " model, [dl_tr, None], loss_fn, writer, k_fold_class=StratifiedKFold(2, shuffle=True)\n", + " model, [dl_tr], loss_fn, writer, k_fold_class=StratifiedKFold(2, shuffle=True)\n", ")\n" ] }, @@ -356,7 +356,7 @@ "\n", "temp_dict = {}\n", "temp_dict[\"name\"] = \"double_tori\"\n", - "temp_dict[\"dataloaders\"] = (dl_tr, dl_ts)\n", + "temp_dict[\"dataloaders\"] = (dl_tr,)\n", "\n", "dataloaders_dicts.append(temp_dict)\n" ] diff --git a/examples/basic_tutorial_image.ipynb b/examples/basic_tutorial_image.ipynb index 5d45d1af..84a15a06 100644 --- a/examples/basic_tutorial_image.ipynb +++ b/examples/basic_tutorial_image.ipynb @@ -207,7 +207,7 @@ "loss_fn = nn.CrossEntropyLoss()\n", "\n", "# initilise the trainer class\n", - "pipe = Trainer(model, (dl_tr, dl_ts), loss_fn, writer)\n", + "pipe = Trainer(model, (dl_tr,), loss_fn, writer)\n", "\n", "# train the model\n", "pipe.train(\n", diff --git a/examples/basic_tutorial_question_answering.ipynb b/examples/basic_tutorial_question_answering.ipynb index 58306da7..81b1235b 100644 --- a/examples/basic_tutorial_question_answering.ipynb +++ b/examples/basic_tutorial_question_answering.ipynb @@ -289,7 +289,7 @@ "outputs": [], "source": [ "# prepare a pipeline class with the model, dataloaders loss_fn and tensorboard writer\n", - "pipe = Trainer(model, (dl_tr2, dl_ts2), loss_fn, writer)\n", + "pipe = Trainer(model, (dl_tr2,), loss_fn, writer)\n", "\n", "# train the model\n", "pipe.train(SGD, 3, False, {\"lr\": 0.01}, {\"batch_size\": 16})\n" diff --git a/examples/basic_tutorial_regression.ipynb b/examples/basic_tutorial_regression.ipynb index 0fef9098..2196d35a 100644 --- a/examples/basic_tutorial_regression.ipynb +++ b/examples/basic_tutorial_regression.ipynb @@ -177,7 +177,7 @@ "source": [ "loss_fn = nn.MSELoss()\n", "\n", - "pipe = Trainer(model, (dl_tr, dl_val), loss_fn, writer, l1_norm)\n", + "pipe = Trainer(model, (dl_tr,), loss_fn, writer, l1_norm)\n", "\n", "# train the model with learning rate scheduler\n", "pipe.train(\n", diff --git a/examples/basic_tutorial_tabular.ipynb b/examples/basic_tutorial_tabular.ipynb index 2a2d28d8..67088936 100644 --- a/examples/basic_tutorial_tabular.ipynb +++ b/examples/basic_tutorial_tabular.ipynb @@ -156,7 +156,7 @@ "# initialise the pipelien class\n", "pipe = Trainer(\n", " model,\n", - " (dl_tr, dl_val),\n", + " (dl_tr,),\n", " loss_fn,\n", " writer,\n", " k_fold_class=StratifiedKFold(3, shuffle=True),\n", diff --git a/examples/basic_tutorial_text_classification.ipynb b/examples/basic_tutorial_text_classification.ipynb index 5528da82..7b8f9017 100644 --- a/examples/basic_tutorial_text_classification.ipynb +++ b/examples/basic_tutorial_text_classification.ipynb @@ -247,7 +247,7 @@ "source": [ "loss_fn = nn.CrossEntropyLoss()\n", "\n", - "pipe = Trainer(model, (dl_tr2, dl_ts2), loss_fn, writer)\n", + "pipe = Trainer(model, (dl_tr2,), loss_fn, writer)\n", "\n", "# train the model\n", "pipe.train(SGD, 7, False, {\"lr\": 0.01}, {\"batch_size\": 20})\n" diff --git a/examples/basic_tutorial_translation.ipynb b/examples/basic_tutorial_translation.ipynb index 5c19bc9d..07ca112b 100644 --- a/examples/basic_tutorial_translation.ipynb +++ b/examples/basic_tutorial_translation.ipynb @@ -405,7 +405,7 @@ "outputs": [], "source": [ "# prepare a pipeline class with the model, dataloaders loss_fn and tensorboard writer\n", - "pipe = Trainer(model, (dl_tr, dl_val), loss_fn, writer)\n", + "pipe = Trainer(model, (dl_tr,), loss_fn, writer)\n", "\n", "# train the model\n", "pipe.train(\n", diff --git a/examples/compactification.ipynb b/examples/compactification.ipynb index bca3526d..6b5dce09 100644 --- a/examples/compactification.ipynb +++ b/examples/compactification.ipynb @@ -135,7 +135,7 @@ "# train NN\n", "model = FFNet(arch=[3, 3])\n", "\n", - "pipe = Trainer(model, (dl_tr, dl_ts), nn.CrossEntropyLoss(), writer)\n", + "pipe = Trainer(model, (dl_tr,), nn.CrossEntropyLoss(), writer)\n", "\n", "pipe.train(SGD, 5, False, {\"lr\": 0.01}, {\"batch_size\": 1})\n" ] diff --git a/examples/decision_boundary_tori.ipynb b/examples/decision_boundary_tori.ipynb index eb3b0277..9ed2d6bc 100644 --- a/examples/decision_boundary_tori.ipynb +++ b/examples/decision_boundary_tori.ipynb @@ -126,7 +126,7 @@ "# train NN\n", "model = FFNet(arch=[3, 10, 10, 2])\n", "print(model)\n", - "pipe = Trainer(model, (dl_tr, dl_ts), nn.CrossEntropyLoss(), writer)\n", + "pipe = Trainer(model, (dl_tr,), nn.CrossEntropyLoss(), writer)\n", "pipe.train(SGD, 5, False, {\"lr\": 0.01}, {\"batch_size\": 1})\n" ] }, diff --git a/examples/deploying_custom_regularizers.ipynb b/examples/deploying_custom_regularizers.ipynb index e414c054..22417d38 100644 --- a/examples/deploying_custom_regularizers.ipynb +++ b/examples/deploying_custom_regularizers.ipynb @@ -22,7 +22,9 @@ "cell_type": "code", "execution_count": null, "id": "313734c5", - "metadata": {}, + "metadata": { + "tags": [] + }, "outputs": [], "source": [ "import numpy as np\n", @@ -40,7 +42,7 @@ "from gdeep.utility import DEVICE\n", "from gdeep.search import HyperParameterOptimization\n", "from gdeep.models import FFNet\n", - "writer = GiottoSummaryWriter()" + "writer = GiottoSummaryWriter()\n" ] }, { @@ -77,7 +79,7 @@ " for parameter in model.parameters():\n", " total = total + self.lambda1 * torch.norm(parameter, 1) \\\n", " + self.lambda2 * torch.norm(parameter, 2)**2\n", - " return total " + " return total\n" ] }, { @@ -115,7 +117,7 @@ "X=np.stack([z1,z2],1)\n", "y=y.reshape(-1,1)\n", "y=y.astype(float)\n", - "X=X.astype(float)" + "X=X.astype(float)\n" ] }, { @@ -139,7 +141,7 @@ "\n", "class Net(nn.Module):\n", " def __init__(self,featdim='2'):\n", - " super(Net, self).__init__() \n", + " super(Net, self).__init__()\n", " self.flatten = nn.Flatten()\n", " self.linear_relu_stack = nn.Sequential(\n", " nn.Linear(eval(featdim), 1, bias=False),\n", @@ -148,7 +150,7 @@ " def forward(self, x):\n", " x = self.flatten(x)\n", " logits = self.linear_relu_stack(x)\n", - " return logits" + " return logits\n" ] }, { @@ -158,7 +160,7 @@ "metadata": {}, "outputs": [], "source": [ - "network=Net('2')" + "network=Net('2')\n" ] }, { @@ -169,7 +171,7 @@ "outputs": [], "source": [ "def l2_norm(prediction, y):\n", - " return torch.norm(prediction - y, p=2).to(DEVICE)" + " return torch.norm(prediction - y, p=2).to(DEVICE)\n" ] }, { @@ -181,7 +183,7 @@ "source": [ "loss_fn = nn.MSELoss()\n", "pipe = Trainer(network, (dl_tr, dl_val), loss_fn, writer,l2_norm)\n", - "pipe.train(SGD, 20, False, {\"lr\": 0.1})" + "pipe.train(SGD, 20, False, {\"lr\": 0.1})\n" ] }, { @@ -191,7 +193,7 @@ "metadata": {}, "outputs": [], "source": [ - "pipe2 = Trainer(network, (dl_tr, dl_val), loss_fn, writer,l2_norm,regularizer=TihonovRegularizer(0.2,p=1))" + "pipe2 = Trainer(network, (dl_tr, dl_val), loss_fn, writer,l2_norm,regularizer=TihonovRegularizer(0.2,p=1))\n" ] }, { @@ -201,7 +203,7 @@ "metadata": {}, "outputs": [], "source": [ - "pipe2.train(SGD, 20, False, {\"lr\": 0.01})" + "pipe2.train(SGD, 20, False, {\"lr\": 0.01})\n" ] }, { @@ -227,7 +229,7 @@ "optimizers_params = {\"lr\": [0.01]}\n", "dataloaders_params = {}\n", "models_hyperparams = {}\n", - "regularization_params={'regularizer':[reg],'lamda':[0.05,0.5,0.01],'p':[1]}" + "regularization_params={'regularizer':[reg],'lamda':[0.05,0.5,0.01],'p':[1]}\n" ] }, { @@ -247,7 +249,7 @@ " models_hyperparams,\n", " regularization_params=regularization_params,\n", " n_accumulated_grads=0,\n", - ")" + ")\n" ] }, { @@ -273,7 +275,7 @@ "optimizers_params = {\"lr\": [0.01]}\n", "dataloaders_params = {}\n", "models_hyperparams = {}\n", - "regularization_params={'reg':[reg], 'lambda1':[0.15,0.85,0.01],'lambda2':[0.0001,0.1,0.01]}" + "regularization_params={'reg':[reg], 'lambda1':[0.15,0.85,0.01],'lambda2':[0.0001,0.1,0.01]}\n" ] }, { @@ -292,7 +294,7 @@ " models_hyperparams,\n", " regularization_params=regularization_params,\n", " n_accumulated_grads=0,\n", - ")" + ")\n" ] }, { @@ -317,7 +319,7 @@ "S=1000\n", "X=np.linspace(0,2*np.pi,S)\n", "y=3*np.sin(X)+0.5*rng.standard_normal(S)\n", - "plt.plot(X,y)" + "plt.plot(X,y)\n" ] }, { @@ -332,8 +334,8 @@ "tensor_x_t=tensor_x_t.float()\n", "tensor_y_t = torch.from_numpy(train_y).reshape(-1, 1)\n", "tensor_y_t=tensor_y_t.float()\n", - "tensor_x_v = torch.Tensor(val_x)\n", - "tensor_y_v = torch.from_numpy(val_y)\n", + "tensor_x_v = torch.Tensor(val_x).reshape(-1, 1)\n", + "tensor_y_v = torch.from_numpy(val_y).reshape(-1, 1)\n", "train_dataset = TensorDataset(tensor_x_t,tensor_y_t)\n", "dl_tr = DataLoader(train_dataset,batch_size=10)\n", "val_dataset = TensorDataset(tensor_x_v,tensor_y_v)\n", @@ -348,7 +350,7 @@ " return self.seqmodel(x)\n", "\n", "\n", - "model = model1()" + "model = model1()\n" ] }, { @@ -368,7 +370,7 @@ "source": [ "loss_fn = nn.MSELoss()\n", "pipe = Trainer(model, (dl_tr, dl_val), loss_fn, writer,l2_norm)\n", - "pipe.train(SGD, 200, False, {\"lr\": 0.1})" + "pipe.train(SGD, 200, False, {\"lr\": 0.1})\n" ] }, { @@ -380,7 +382,7 @@ "source": [ "resp=pipe.model(dl_tr.dataset.tensors[0])\n", "X_t=dl_tr.dataset.tensors[0].detach().numpy().reshape(-1)\n", - "y_t=resp.detach().numpy().reshape(-1)" + "y_t=resp.detach().numpy().reshape(-1)\n" ] }, { @@ -399,7 +401,7 @@ "outputs": [], "source": [ "ind=np.argsort(X_t)\n", - "plt.plot(X_t[ind],y_t[ind])" + "plt.plot(X_t[ind],y_t[ind])\n" ] }, { @@ -429,10 +431,10 @@ " We penalized the squared values of the function at the points where it attains value higher than 2.\n", " \"\"\"\n", " res=model(self.X.reshape(-1,1)).reshape(-1)\n", - " inds1=torch.where(res>2) \n", + " inds1=torch.where(res>2)\n", " X1=self.X[inds1]\n", " res1=model(X1.reshape(-1,1)).reshape(-1)\n", - " return torch.sum(res1**2)" + " return torch.sum(res1**2)\n" ] }, { @@ -442,7 +444,7 @@ "metadata": {}, "outputs": [], "source": [ - "reg=CapReg(lamda=1/(2*S))" + "reg=CapReg(lamda=1/(2*S))\n" ] }, { @@ -470,7 +472,7 @@ "metadata": {}, "outputs": [], "source": [ - "pipe2.train(SGD, 10, False, {\"lr\": 0.1})" + "pipe2.train(SGD, 10, False, {\"lr\": 0.1})\n" ] }, { @@ -491,22 +493,20 @@ "resp=pipe2.model(dl_tr.dataset.tensors[0])\n", "X_t2=dl_tr.dataset.tensors[0].detach().numpy().reshape(-1)\n", "y_t2=resp.detach().numpy().reshape(-1)\n", - "ind2=np.argsort(X_t2)" + "ind2=np.argsort(X_t2)\n" ] }, { "cell_type": "code", "execution_count": null, "id": "3caaba36", - "metadata": { - "scrolled": false - }, + "metadata": {}, "outputs": [], "source": [ "plt.plot(X_t[ind],y_t[ind], label = 'unregularized')\n", "plt.plot(X_t2[ind2],y_t2[ind2], label = 'regularized')\n", "plt.legend()\n", - "plt.show()" + "plt.show()\n" ] }, { @@ -534,7 +534,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.9.17" + "version": "3.8.16" } }, "nbformat": 4, diff --git a/examples/orbit_5k_pipeline.ipynb b/examples/orbit_5k_pipeline.ipynb index f554b61b..cf566277 100644 --- a/examples/orbit_5k_pipeline.ipynb +++ b/examples/orbit_5k_pipeline.ipynb @@ -202,7 +202,7 @@ "\n", "loss_function = nn.CrossEntropyLoss()\n", "\n", - "trainer = Trainer(wrapped_model, [dl_train, dl_train], loss_function, writer) \n", + "trainer = Trainer(wrapped_model, [dl_train,], loss_function, writer) \n", "\n", "\n" ] diff --git a/examples/parallel_bert.py b/examples/parallel_bert.py new file mode 100644 index 00000000..3e3d00ba --- /dev/null +++ b/examples/parallel_bert.py @@ -0,0 +1,218 @@ +import os +import argparse +import functools +import pandas as pd +from pathlib import Path +import numpy as np +import urllib.request +import zipfile + +import torch +import torch.nn as nn +from torch.optim import Adam +from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy +from torch.distributed.fsdp import BackwardPrefetch + +from gdeep.search.hpo import GiottoSummaryWriter +from gdeep.data.datasets import FromArray, DataLoaderBuilder +from gdeep.trainer.trainer import Trainer, Parallelism +import gdeep.utility_examples.args + +from sklearn.model_selection import train_test_split +from transformers import ( + BertTokenizer, + BertForSequenceClassification, +) +from transformers.models.bert.modeling_bert import BertLayer + + +def download_dataset(): + if not Path("cola_public").exists(): + req = urllib.request.urlretrieve( + "https://nyu-mll.github.io/CoLA/cola_public_1.1.zip" + ) + with zipfile.ZipFile(req[0], "r") as zip_ref: + zip_ref.extractall() + + +def main(args): + n_sentences_to_consider = 4000 + + tmp_path = Path("./cola_public") / "raw" / "in_domain_train.tsv" + df = pd.read_csv( + tmp_path, + delimiter="\t", + header=None, + names=["sentence_source", "label", "label_notes", "sentence"], + ) + # Get the lists of sentences and their labels. + sentences = df.sentence.values + labels = df.label.values + + sentences = sentences[0:n_sentences_to_consider] + labels = labels[0:n_sentences_to_consider] + tokenizer = BertTokenizer.from_pretrained("bert-base-uncased", do_lower_case=True) + # Tokenize all of the sentences and map the tokens to thier word IDs. + input_ids = [] + + for sentence in sentences: + + encoded_sentence = tokenizer.encode(sentence, add_special_tokens=True) + + # Add the encoded sentence to the list. + input_ids.append(encoded_sentence) + + # Print sentence 0, now as a list of IDs. + print("Original: ", sentences[0]) + print("Token IDs:", input_ids[0]) + + print("Max length: ", max([len(sen) for sen in input_ids])) + + MAX_LEN = 64 + + def pad_sequences( + sequences, + maxlen=None, + dtype="int32", + padding="pre", + truncating="pre", + value=0.0, + ): + if not hasattr(sequences, "__len__"): + raise ValueError("`sequences` must be iterable.") + num_samples = len(sequences) + + lengths = [] + sample_shape = () + flag = True + for x in sequences: + try: + lengths.append(len(x)) + if flag and len(x): + sample_shape = np.asarray(x).shape[1:] + flag = False + except TypeError as e: + raise ValueError( + "`sequences` must be a list of iterables. " + f"Found non-iterable: {str(x)}" + ) from e + + if maxlen is None: + maxlen = np.max(lengths) + + is_dtype_str = np.issubdtype(dtype, np.str_) or np.issubdtype( + dtype, np.unicode_ + ) + if isinstance(value, str) and dtype != object and not is_dtype_str: + raise ValueError( + f"`dtype` {dtype} is not compatible with `value`'s type: " + f"{type(value)}\nYou should set `dtype=object` for variable length " + "strings." + ) + + x = np.full((num_samples, maxlen) + sample_shape, value, dtype=dtype) + for idx, s in enumerate(sequences): + if not len(s): + continue # empty list/array was found + if truncating == "pre": + trunc = s[-maxlen:] + elif truncating == "post": + trunc = s[:maxlen] + else: + raise ValueError(f'Truncating type "{truncating}" not understood') + + # check `trunc` has expected shape + trunc = np.asarray(trunc, dtype=dtype) + if trunc.shape[1:] != sample_shape: + raise ValueError( + f"Shape of sample {trunc.shape[1:]} of sequence at " + f"position {idx} is different from expected shape " + f"{sample_shape}" + ) + + if padding == "post": + x[idx, : len(trunc)] = trunc + elif padding == "pre": + x[idx, -len(trunc) :] = trunc + else: + raise ValueError(f'Padding type "{padding}" not understood') + return x + + input_ids = pad_sequences( + input_ids, + maxlen=MAX_LEN, + dtype="long", + value=0, + truncating="post", + padding="post", + ) + + train_inputs, validation_inputs, train_labels, validation_labels = train_test_split( + input_ids, labels, random_state=13, test_size=0.1 + ) + + dl_builder = DataLoaderBuilder( + ( + FromArray(train_inputs, train_labels), + FromArray(validation_inputs, validation_labels), + ) + ) + + dl_tr, dl_val, _ = dl_builder.build( + ({"batch_size": args.batch_size}, {"batch_size": args.batch_size}) + ) + + if args.big_model: + model = BertForSequenceClassification.from_pretrained( + "bert-large-uncased", + num_labels=2, + output_attentions=True, + output_hidden_states=False, + ) + else: + model = BertForSequenceClassification.from_pretrained( + "bert-base-uncased", + num_labels=2, + output_attentions=True, + output_hidden_states=False, + ) + + # Define the trainer + + writer = GiottoSummaryWriter() + loss_function = nn.CrossEntropyLoss() + trainer = Trainer(model, (dl_tr, dl_val), loss_function, writer) + devices = list(range(torch.cuda.device_count())) + config_fsdp = { + "sharding_strategy": args.sharding.to_sharding_strategy(), + "auto_wrap_policy": functools.partial( + transformer_auto_wrap_policy, + transformer_layer_cls={ + BertLayer, + }, + ), + "backward_prefetch": BackwardPrefetch.BACKWARD_PRE, + } + parallel = Parallelism( + args.parallel, devices, len(devices), config_fsdp=config_fsdp, pipeline_chunks=2 + ) + + # train the model + + return trainer.train(Adam, args.n_epochs, parallel=parallel) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description="BERT Example") + gdeep.utility_examples.args.add_default_arguments(parser) + gdeep.utility_examples.args.add_big_model(parser) + parser.add_argument( + "--download", + action="store_true", + help="Download dataset if it does not exist already", + ) + args = parser.parse_args() + if args.download: + download_dataset() + main(args) diff --git a/examples/parallel_orbit_5k.py b/examples/parallel_orbit_5k.py new file mode 100644 index 00000000..0dd6ecb5 --- /dev/null +++ b/examples/parallel_orbit_5k.py @@ -0,0 +1,128 @@ +from typing import Tuple +from dataclasses import dataclass +import argparse +import functools + +import torch +import torch.nn as nn +from torch.optim import Adam +from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy + +from gdeep.search.hpo import GiottoSummaryWriter +from gdeep.topology_layers import PersformerWrapper, persformer_block +from gdeep.topology_layers.persformer_config import PoolerType +from gdeep.trainer.trainer import Trainer, Parallelism +from gdeep.data.datasets import OrbitsGenerator, DataLoaderKwargs +import gdeep.utility_examples.args + + +def main(args): + # Generate a configuration file with the parameters of the desired dataset + @dataclass + class Orbit5kConfig: + batch_size_train: int = args.batch_size + num_orbits_per_class: int = 1000 + validation_percentage: float = 0.0 + test_percentage: float = 0.0 + num_jobs: int = 8 + dynamical_system: str = "classical_convention" + homology_dimensions: Tuple[int, int] = (0, 1) + dtype: str = "float32" + arbitrary_precision: bool = False + + config_data = Orbit5kConfig() + + # Define the OrbitsGenerator Class with the above parameters + + og = OrbitsGenerator( + num_orbits_per_class=config_data.num_orbits_per_class, + homology_dimensions=config_data.homology_dimensions, + validation_percentage=config_data.validation_percentage, + test_percentage=config_data.test_percentage, + n_jobs=config_data.num_jobs, + dynamical_system=config_data.dynamical_system, + dtype=config_data.dtype, + ) + + # Define the data loader + + dataloaders_dicts = DataLoaderKwargs( + train_kwargs={ + "batch_size": config_data.batch_size_train, + }, + val_kwargs={"batch_size": 4}, + test_kwargs={"batch_size": 3}, + ) + + if len(config_data.homology_dimensions) == 0: + dl_train, _, _ = og.get_dataloader_orbits(dataloaders_dicts) + else: + dl_train, _, _ = og.get_dataloader_persistence_diagrams(dataloaders_dicts) + + # Define the model by using a Wrapper for the Persformer model + + if args.big_model: + # Big model + wrapped_model = PersformerWrapper( + num_attention_layers=8, + num_attention_heads=32, + input_size=2 + 2, + output_size=5, + pooler_type=PoolerType.ATTENTION, + hidden_size=128, + intermediate_size=128, + ) + config_mha = [ + {"embed_dim": 128, "num_heads": 32, "dropout": 0.1, "batch_first": True} + ] * 9 + else: + # Small model + wrapped_model = PersformerWrapper( + num_attention_layers=2, + num_attention_heads=8, + input_size=2 + 2, + output_size=5, + pooler_type=PoolerType.ATTENTION, + hidden_size=16, + intermediate_size=16, + ) + config_mha = [ + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True} + ] * 5 + + # Define the trainer + + writer = GiottoSummaryWriter() + loss_function = nn.CrossEntropyLoss() + trainer = Trainer(wrapped_model, [dl_train, dl_train], loss_function, writer) + devices = list(range(torch.cuda.device_count())) + config_fsdp = { + "sharding_strategy": args.sharding.to_sharding_strategy(), + "auto_wrap_policy": functools.partial( + transformer_auto_wrap_policy, + transformer_layer_cls={ + persformer_block.PersformerBlock, + }, + ), + } + parallel = Parallelism( + args.parallel, + devices, + len(devices), + config_fsdp=config_fsdp, + config_mha=config_mha, + pipeline_chunks=2, + ) + + # train the model + + return trainer.train(Adam, args.n_epochs, parallel=parallel) + + +if __name__ == "__main__": + + parser = argparse.ArgumentParser(description="Orbit 5k example") + gdeep.utility_examples.args.add_default_arguments(parser) + gdeep.utility_examples.args.add_big_model(parser) + args = parser.parse_args() + main(args) diff --git a/examples/pipeline_basic_image.py b/examples/pipeline_basic_image.py new file mode 100644 index 00000000..512b0f17 --- /dev/null +++ b/examples/pipeline_basic_image.py @@ -0,0 +1,179 @@ +import numpy as np +import torch +from torch import nn +import torch.nn.functional as F +from torch.optim import SGD +from torch.utils.data.sampler import SubsetRandomSampler +import torchvision.models as models +from gtda.diagrams import BettiCurve +from gtda.plotting import plot_betti_surfaces + +from gdeep.data.datasets import DatasetBuilder, DataLoaderBuilder +from gdeep.models import FFNet +from gdeep.visualization import persistence_diagrams_of_activations +from gdeep.data.preprocessors import ToTensorImage +from gdeep.trainer.trainer import Trainer, Parallelism, ParallelismType +from gdeep.models import ModelExtractor +from gdeep.analysis.interpretability import Interpreter +from gdeep.visualization import Visualiser +from gdeep.search import GiottoSummaryWriter +import argparse + +parser = argparse.ArgumentParser(description="Pipeline enabling") +parser.add_argument("--pipeline", default=False, action="store_true") +args = parser.parse_args() +pipeline_enabling = args.pipeline + +if pipeline_enabling: + print("Pipeline as been enabled") +else: + print("Pipeline is not enabled") + +writer = GiottoSummaryWriter() + +db = DatasetBuilder(name="CIFAR10") +ds_tr, ds_val, ds_ts = db.build(download=True) +NUMBER_OF_CLASSES = 10 + +transformation = ToTensorImage((32, 32)) +transformation.fit_to_dataset( + ds_tr +) # this is useless for this transformation, but in general this is the API + +transformed_ds_tr = transformation.attach_transform_to_dataset(ds_tr) +transformed_ds_val = transformation.attach_transform_to_dataset(ds_val) +transformed_ds_ts = transformation.attach_transform_to_dataset(ds_ts) + +# use only 320 images from cifar10 for training +train_indices = list(range(32 * 10)) +val_indices = list(range(32 * 5)) +test_indices = list(range(32 * 5)) +dl_tr, dl_val, dl_ts = DataLoaderBuilder( + (transformed_ds_tr, transformed_ds_val, transformed_ds_ts) +).build( + ( + {"batch_size": 32, "sampler": SubsetRandomSampler(train_indices)}, + {"batch_size": 32, "sampler": SubsetRandomSampler(val_indices)}, + {"batch_size": 32, "sampler": SubsetRandomSampler(test_indices)}, + ) +) + + +class Net(nn.Module): + def __init__(self): + super().__init__() + self.conv1 = nn.Conv2d(3, 6, 5) + self.pool = nn.MaxPool2d(2, 2) + self.conv2 = nn.Conv2d(6, 16, 5) + self.fc1 = nn.Linear(16 * 5 * 5, 120) + self.fc2 = nn.Linear(120, 84) + self.fc3 = nn.Linear(84, 10) + + def forward(self, x): + x = self.pool(F.relu(self.conv1(x))) + x = self.pool(F.relu(self.conv2(x))) + x = torch.flatten(x, 1) # flatten all dimensions except batch + x = F.relu(self.fc1(x)) + x = F.relu(self.fc2(x)) + x = self.fc3(x) + return x + + +model = Net() + +# define the loss function +loss_fn = nn.CrossEntropyLoss() + +# initilise the trainer class +pipe = Trainer(model, (dl_tr,), loss_fn, writer) + +devices = list(range(torch.cuda.device_count())) +parallel = Parallelism( + ParallelismType.PIPELINE, devices, len(devices), pipeline_chunks=2 +) + +if pipeline_enabling: + # train the model + pipe.train( + SGD, + 3, + False, + {"lr": 0.01}, + {"batch_size": 32, "sampler": SubsetRandomSampler(train_indices)}, + parallel=parallel, + ) + +else: + pipe.train( + SGD, + 3, + False, + {"lr": 0.01}, + {"batch_size": 32, "sampler": SubsetRandomSampler(train_indices)}, + ) + + +pipe.evaluate_classification(NUMBER_OF_CLASSES) + +# initialise the interpreter +inter = Interpreter(pipe.model, method="GuidedGradCam") + +# define a signle datum +datum = next(iter(dl_tr))[0][0].reshape(1, 3, 32, 32) + +# define the layer of which we are interested in displaying the features +layer = pipe.model.conv2 + +# we will test against this class +class_ = 0 + +# interpret the image +output = inter.interpret(datum, class_, layer) + +# visualise the interpreter +vs = Visualiser(pipe) +try: + vs.plot_interpreter_image(inter) +except AssertionError: + print("The heatmap is made of all zeros...") + + +# we now use another model: Saliency maps +inter2 = Interpreter(pipe.model, method="Saliency") + +# interpret the mage +output = inter2.interpret(datum, class_) + +# visualise the results +vs = Visualiser(pipe) +try: + vs.plot_interpreter_image(inter2) +except AssertionError: + print("The heatmap is made of all zeros...") + + +vs.plot_3d_dataset() + +me = ModelExtractor(pipe.model, loss_fn) + +list_of_layers = me.get_layers_param() + +for k, item in list_of_layers.items(): + print(k, item.shape) + +# the decision boundary will be available on tensorboard, in the projectors section. +x = next(iter(dl_tr))[0][0] +if x.dtype is not torch.int64: # cannot backpropagate on integers! + res = me.get_decision_boundary(x, n_epochs=1) + res.shape + +x = next(iter(dl_tr))[0] +list_activations = me.get_activations(x) +len(list_activations) + +batch = next(iter(dl_tr)) # a whole batch! +if batch[0].dtype is torch.float: # cannot backpropagate on integers! + for gradient in me.get_gradients(batch)[1]: + print(gradient.shape) + +vs.plot_persistence_diagrams(batch) diff --git a/examples/pipeline_orbit5k.py b/examples/pipeline_orbit5k.py new file mode 100644 index 00000000..10a784e1 --- /dev/null +++ b/examples/pipeline_orbit5k.py @@ -0,0 +1,192 @@ +# Include necessary general imports +import os +from typing import Tuple +from dataclasses import dataclass +import matplotlib.pyplot as plt + +# Torch imports + +import torch +import torch.nn as nn + +# Gdeep imports + +from gdeep.data import PreprocessingPipeline +from gdeep.data.datasets import PersistenceDiagramFromFiles +from gdeep.data.datasets.base_dataloaders import ( + DataLoaderBuilder, + DataLoaderParamsTuples, +) +from gdeep.data.datasets.persistence_diagrams_from_graphs_builder import ( + PersistenceDiagramFromGraphBuilder, +) +from gdeep.data.persistence_diagrams.one_hot_persistence_diagram import ( + OneHotEncodedPersistenceDiagram, + collate_fn_persistence_diagrams, +) +from gdeep.data.preprocessors import ( + FilterPersistenceDiagramByHomologyDimension, + FilterPersistenceDiagramByLifetime, + NormalizationPersistenceDiagram, +) +from gdeep.search.hpo import GiottoSummaryWriter +from gdeep.topology_layers import Persformer, PersformerConfig, PersformerWrapper +from gdeep.topology_layers.persformer_config import PoolerType +from gdeep.trainer.trainer import Trainer, Parallelism, ParallelismType +from gdeep.search import HyperParameterOptimization +from gdeep.utility import DEFAULT_GRAPH_DIR, PoolerType +from gdeep.utility.utils import autoreload_if_notebook +from gdeep.analysis.interpretability import Interpreter +from sklearn.model_selection import train_test_split +from torch.optim import Adam +from torch.utils.data import Subset +from gdeep.visualization import Visualiser +from gdeep.data.datasets import OrbitsGenerator, DataLoaderKwargs + +import argparse + +parser = argparse.ArgumentParser(description="Pipeline enabling") +parser.add_argument("--pipeline", default=False, action="store_true") +args = parser.parse_args() +pipeline_enabling = args.pipeline + +if pipeline_enabling: + print("Pipeline as been enabled") +else: + print("Pipeline is not enabled") + +# autoreload_if_notebook() + +# Generate a configuration file with the parameters of the desired dataset +@dataclass +class Orbit5kConfig: + batch_size_train: int = 4 + num_orbits_per_class: int = 32 + validation_percentage: float = 0.0 + test_percentage: float = 0.0 + num_jobs: int = 8 + dynamical_system: str = "classical_convention" + homology_dimensions: Tuple[int, int] = (0, 1) # type: ignore + dtype: str = "float32" + arbitrary_precision: bool = False + + +config_data = Orbit5kConfig() + +# Define the OrbitsGenerator Class with the above parameters + +og = OrbitsGenerator( + num_orbits_per_class=config_data.num_orbits_per_class, + homology_dimensions=config_data.homology_dimensions, + validation_percentage=config_data.validation_percentage, + test_percentage=config_data.test_percentage, + n_jobs=config_data.num_jobs, + dynamical_system=config_data.dynamical_system, + dtype=config_data.dtype, +) + + +# Define the data loader + +dataloaders_dicts = DataLoaderKwargs( + train_kwargs={ + "batch_size": config_data.batch_size_train, + }, + val_kwargs={"batch_size": 4}, + test_kwargs={"batch_size": 3}, +) + +if len(config_data.homology_dimensions) == 0: + dl_train, _, _ = og.get_dataloader_orbits(dataloaders_dicts) +else: + dl_train, _, _ = og.get_dataloader_persistence_diagrams(dataloaders_dicts) + + +# Get the orbits point clouds + +point_clouds = og.get_orbits() + +# For each rho value, plot one point cloud + +rho_values = [2.5, 3.5, 4.0, 4.1, 4.3] +fig, ax = plt.subplots(ncols=len(rho_values), figsize=(20, 3)) + +for i in range(len(rho_values)): + x, y = ( + point_clouds[i * config_data.num_orbits_per_class, :, 0], + point_clouds[i * config_data.num_orbits_per_class, :, 1], + ) + ax[i].scatter(x, y) + ax[i].set_title("Example of orbit for rho = " + str(rho_values[i])) + +# Define the model by using a Wrapper for the Persformer model + +wrapped_model = PersformerWrapper( + num_attention_layers=2, + num_attention_heads=8, + input_size=2 + 2, + output_size=5, + pooler_type=PoolerType.ATTENTION, + hidden_size=16, + intermediate_size=16, +) + +# Define the trainer + +writer = GiottoSummaryWriter() + +loss_function = nn.CrossEntropyLoss() + +trainer = Trainer( + wrapped_model, + [ + dl_train, + ], + loss_function, + writer, +) + +if pipeline_enabling: + configs = [ + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True}, + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True}, + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True}, + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True}, + {"embed_dim": 16, "num_heads": 8, "dropout": 0.1, "batch_first": True}, + ] + devices = list(range(torch.cuda.device_count())) + parallel = Parallelism( + ParallelismType.PIPELINE, + devices, + len(devices), + pipeline_chunks=2, + config_mha=configs, + ) + n_epoch = 1 + + trainer.train(Adam, n_epoch, parallel=parallel) + + +else: + # train the model for one epoch + n_epoch = 1 + + trainer.train(Adam, n_epoch) + + +# Initialize the Interpreter class in Saliency mode + +inter = Interpreter(trainer.model, method="Saliency") + +# Get a datum and its corresponding class + +batch = next(iter(dl_train)) +datum = batch[0][0].reshape(1, *(batch[0][0].shape)) +class_ = batch[1][0].item() + +# interpret the diagram +x, attr = inter.interpret(x=datum, y=class_) + +# visualise the results +vs = Visualiser(trainer) +vs.plot_attributions_persistence_diagrams(inter) diff --git a/examples/topo_deep_neural_networks.ipynb b/examples/topo_deep_neural_networks.ipynb index f42a8279..b05f6fd3 100644 --- a/examples/topo_deep_neural_networks.ipynb +++ b/examples/topo_deep_neural_networks.ipynb @@ -159,7 +159,7 @@ "for i in range(len(activation_functions)):\n", " model_temp = FFNet(arch = architecture, activation = activation_functions[i])\n", " writer_temp = GiottoSummaryWriter(log_dir='runs/' + model_temp.__class__.__name__ + activation_string[i])\n", - " trainer_temp = Trainer(model_temp, [dl_tr, dl_ts], loss_function, writer_temp)\n", + " trainer_temp = Trainer(model_temp, [dl_tr,], loss_function, writer_temp)\n", " models.append(model_temp)\n", " writers.append(writer_temp)\n", " trainers.append(trainer_temp)" diff --git a/gdeep/data/datasets/build_datasets.py b/gdeep/data/datasets/build_datasets.py index 94d8b523..6ba9e7fd 100644 --- a/gdeep/data/datasets/build_datasets.py +++ b/gdeep/data/datasets/build_datasets.py @@ -23,7 +23,7 @@ def __init__(self, dataset_name: str): self.dataset_name = dataset_name def __call__(self, **kwargs): # type: ignore - if 'root' not in kwargs: + if "root" not in kwargs: kwargs = dict(root=DEFAULT_DOWNLOAD_DIR, **kwargs) return datasets.__getattribute__(self.dataset_name)( # type: ignore **kwargs # type: ignore @@ -38,7 +38,7 @@ def __init__(self, dataset_name: str): self.dataset_name = dataset_name def __call__(self, **kwargs): # type: ignore - if 'root' not in kwargs: + if "root" not in kwargs: kwargs = dict(root=DEFAULT_DOWNLOAD_DIR, **kwargs) return textdatasets.__getattribute__(self.dataset_name)( # type: ignore **kwargs # type: ignore diff --git a/gdeep/models/extractor.py b/gdeep/models/extractor.py index e9fb4a1e..661e184c 100644 --- a/gdeep/models/extractor.py +++ b/gdeep/models/extractor.py @@ -1,15 +1,17 @@ -from typing import List, Dict, Callable, Tuple, Union from copy import copy +from typing import Callable, Dict, List, Tuple, Union import torch -from ..analysis.decision_boundary import GradientFlowDecisionBoundaryCalculator -from ..analysis.decision_boundary import UniformlySampledPoint -from . import SaveLayerOutput from gdeep.utility import DEVICE - from gdeep.utility.custom_types import Tensor +from ..analysis.decision_boundary import ( + GradientFlowDecisionBoundaryCalculator, + UniformlySampledPoint, +) +from . import SaveLayerOutput + class ModelExtractor: """This class wraps nn.Modules to extract @@ -36,10 +38,14 @@ class ModelExtractor: """ def __init__( - self, model: torch.nn.Module, loss_fn: Callable[[Tensor, Tensor], Tensor] + self, + model: torch.nn.Module, + loss_fn: Callable[[Tensor, Tensor], Tensor], + pipeline_train: bool = False, ) -> None: # self.model = model - self.model = model.to(DEVICE) + if not pipeline_train: + self.model = model.to(DEVICE) self.loss_fn = loss_fn def _send_to_device( diff --git a/gdeep/search/tests/test_benchmark.py b/gdeep/search/tests/test_benchmark.py index b526c6e7..4f4a8a78 100644 --- a/gdeep/search/tests/test_benchmark.py +++ b/gdeep/search/tests/test_benchmark.py @@ -87,7 +87,7 @@ def forward(self, x): dl_tr, dl_ts, _ = DataLoaderBuilder([ds_tr, ds_val]).build([{"batch_size": 48}, {"batch_size": 32}]) # type: ignore -temp_dict = {"name": "double_tori", "dataloaders": [dl_tr, dl_ts]} # type: ignore +temp_dict = {"name": "double_tori", "dataloaders": [dl_tr]} # type: ignore dataloaders_dicts.append(temp_dict) diff --git a/gdeep/search/tests/test_hpo.py b/gdeep/search/tests/test_hpo.py index 1acecee0..6514ed32 100644 --- a/gdeep/search/tests/test_hpo.py +++ b/gdeep/search/tests/test_hpo.py @@ -73,7 +73,7 @@ def test_hpo_failure(): # initialise pipeline class pipe = Trainer( model, - [dl_tr, None], # type: ignore + [dl_tr], # type: ignore loss_fn, writer, k_fold_class=StratifiedKFold(2, shuffle=True), @@ -96,7 +96,7 @@ def test_hpo_cross_val(): # initialise pipeline class pipe = Trainer( model, - [dl_tr, None], # type: ignore + [dl_tr], # type: ignore loss_fn, writer, k_fold_class=StratifiedKFold(2, shuffle=True), @@ -133,7 +133,7 @@ def test_hpo_accumulated_grads(): loss_fn = nn.CrossEntropyLoss() # initialise pipeline class - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # initialise gridsearch search = HyperParameterOptimization(pipe, "accuracy", 2, best_not_last=True) @@ -166,7 +166,7 @@ def test_hpo_loss(): loss_fn = nn.CrossEntropyLoss() # initialise pipeline class - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # initialise gridsearch search = HyperParameterOptimization(pipe, "loss", 2, best_not_last=True) @@ -190,7 +190,7 @@ def test_hpo_string_parameters(): loss_fn = nn.CrossEntropyLoss() # initialise pipeline class - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # initialise gridsearch search = HyperParameterOptimization(pipe, "loss", 2) @@ -243,7 +243,7 @@ def collate_fn(batch_tuple: List): # loss function loss_fn = nn.CrossEntropyLoss() # pipeline - pipe = Trainer(model, [dl_train, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_train], loss_fn, writer) # type: ignore # initialise gridsearch search = HyperParameterOptimization(pipe, "loss", 2, best_not_last=True) @@ -266,7 +266,7 @@ def test_regularizer_optimization(): loss_fn = nn.CrossEntropyLoss() # initialise pipeline class - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # initialise gridsearch search = HyperParameterOptimization(pipe, "loss", 2, best_not_last=True) diff --git a/gdeep/trainer/__init__.py b/gdeep/trainer/__init__.py index 4f9df601..27659e55 100644 --- a/gdeep/trainer/__init__.py +++ b/gdeep/trainer/__init__.py @@ -7,6 +7,12 @@ from .regularizer import TopologicalRegularizerData -__all__ = ["Trainer", "accuracy", "TrainerConfig", - "Regularizer", "TopologicalRegularizer", - "TihonovRegularizer", "TopologicalRegularizerData"] +__all__ = [ + "Trainer", + "accuracy", + "TrainerConfig", + "Regularizer", + "TopologicalRegularizer", + "TihonovRegularizer", + "TopologicalRegularizerData", +] diff --git a/gdeep/trainer/regularizer.py b/gdeep/trainer/regularizer.py index 1f06898f..51d636fc 100644 --- a/gdeep/trainer/regularizer.py +++ b/gdeep/trainer/regularizer.py @@ -1,4 +1,4 @@ -#from gdeep.trainer import Trainer +# from gdeep.trainer import Trainer import os import copy import time @@ -71,7 +71,7 @@ def _local_homology_preprocess( n_neighbors=n_neighbors, homology_dimensions=homology_dimensions ) mod_pe = make_pipeline( - PersistenceEntropy(), FunctionTransformer(func=lambda X: 2 ** X) + PersistenceEntropy(), FunctionTransformer(func=lambda X: 2**X) ) pipe = make_pipeline(kn_lh, mod_pe) loc_dim_features = pipe.fit_transform(X) @@ -281,13 +281,13 @@ def _compute_critical_points( the complexity of the decision boundary: These are the ones that are active at the db: so born before it and die after it. Recall f: X -> [0,1], maps data to class 1 probability (class 0 probability) - + In that case the decision boundary: f^{-1}(1/2) - + We can simplify the boundary either by pushing the homology generator above it (delay its birth) or pushing the annihilator below it (expedite its death). We should do whichever is more economical, i.e., closer to the cutoff (1/2). Next we do just that. - + In the following code variable name containing 'rel_inds' is used to refer to indices with relative to the critical points. (i.e. rel_inds = 0 is the first critical pair) 'inds' without this qualifier are indices with respect to the coordinates (inds = 0: first vertex of coords) diff --git a/gdeep/trainer/tests/test_trainer.py b/gdeep/trainer/tests/test_trainer.py index c6c6d7cf..1e774f83 100644 --- a/gdeep/trainer/tests/test_trainer.py +++ b/gdeep/trainer/tests/test_trainer.py @@ -69,7 +69,7 @@ def test_trainer_from_array(): # tb writer writer = GiottoSummaryWriter() # pipeline - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # then one needs to train the model using the pipeline! pipe.train(SGD, 2, True, {"lr": 0.001}, n_accumulated_grads=2) @@ -86,13 +86,14 @@ def test_trainer_collate(): # tb writer writer = GiottoSummaryWriter() # pipeline - pipe = Trainer(model, [dl_tr, None], loss_fn, writer) # type: ignore + pipe = Trainer(model, [dl_tr], loss_fn, writer) # type: ignore # then one needs to train the model using the pipeline! pipe.train(SGD, 2, True, {"lr": 0.001}, n_accumulated_grads=2) # evaluation assert len(pipe.evaluate_classification(2)) == 3 - + + def test_regularizer_params_get_populated(): """ Test to Verify that the regularizer parameters get assigned @@ -134,7 +135,7 @@ def test_regularization_pipeline(): # pipeline reg = TihonovRegularizer(random.random(), p=random.randint(1, 10)) # pipeline - pipe = Trainer(model, [dl_tr, None], loss_fn, writer, regularizer=reg) + pipe = Trainer(model, [dl_tr], loss_fn, writer, regularizer=reg) pipe.train(SGD, 2, True, {"lr": 0.001}, n_accumulated_grads=2) diff --git a/gdeep/trainer/trainer.py b/gdeep/trainer/trainer.py index 34bd5782..a33aa03e 100644 --- a/gdeep/trainer/trainer.py +++ b/gdeep/trainer/trainer.py @@ -3,6 +3,7 @@ import time from functools import wraps import warnings +import gdeep.utility.multiprocessing as gmp from typing import ( Tuple, Optional, @@ -14,21 +15,30 @@ Union, TYPE_CHECKING, ) +import tempfile import torch.nn.functional as f from torch.optim import Optimizer import torch +import torch.distributed as dist +from torch.distributed.fsdp import ( + FullyShardedDataParallel as FSDP, + FullStateDictConfig, + StateDictType, + ShardingStrategy, +) from optuna.trial._base import BaseTrial import numpy as np from tqdm import tqdm from sklearn.model_selection._split import BaseCrossValidator from sklearn.model_selection import KFold, train_test_split -from torch.utils.data.sampler import SubsetRandomSampler +from gdeep.utility.sampler import GiottoSampler from torch.utils.data import DataLoader import optuna from datetime import datetime from torch.utils.tensorboard.writer import SummaryWriter from torch.optim.lr_scheduler import _LRScheduler # noqa +from enum import Enum, auto from ..utility.optimization import MissingClosureError from gdeep.models import ModelExtractor @@ -37,11 +47,14 @@ from gdeep.trainer.regularizer import Regularizer -#if TYPE_CHECKING: + +# if TYPE_CHECKING: # from gdeep.trainer.regularizer import Regularizer from .metrics import accuracy from gdeep.utility.custom_types import Tensor +from pipeline_tool.pipeline_config import PipelineConfig +from pipeline_tool.pipeline_tool import SkippableTracing try: import torch_xla.core.xla_model as xm # type: ignore @@ -78,6 +91,175 @@ def wrapper(*args, **kwargs): return wrapper +class ParallelismType(Enum): + """Type of multi-GPU parallelism to use for training""" + + _NONE = auto() + _DP = auto() + FSDP = auto() + PIPELINE = auto() + + def from_str(string): + if string.upper() == "FSDP": + return ParallelismType.FSDP + elif string.upper() == "PIPELINE": + return ParallelismType.PIPELINE + else: + return ParallelismType._NONE + + +class Parallelism: + """Stores the necessary informations to perform parallel training using the Trainer class. Some attributes are only used for FSDP (p_type = DDP, FSDP_ZERO2, FSDP_ZERO3) or pipeline_tool (p_type = PIPELINE) and are indicated as such in their description. No indication means they are used by both tools + Args: + p_type: + Type of parallelism training algorithm to use + devices: + Indices of the available GPUs to use for training + nb_device: + Actual number of GPUs to use from the ones referenced in devices. Asking for more GPUs than referenced in devices will result in an exception + config_fsdp: + [FSDP] FSDP configuration dictionary. See pytorch's FSDP documentation for more details + config_mha: + [pipeline_tool] Multihead Attention layers configuration in the transformer (if the model used is a transformer) + pipeline_chunks: + [pipeline_tool] Number of chunks to split the model into for pipelining + """ + + def __init__( + self, + p_type: ParallelismType, + devices: Tuple[int] = None, + nb_device: int = 0, + config_fsdp: Dict[str, Any] = {}, + config_mha: List[Dict[str, Any]] = [], + pipeline_chunks: int = 4, + ) -> None: + self.p_type = p_type + self.world_size = nb_device + self.rank = 0 + self.config_mha = config_mha + self.pipeline_chunks = pipeline_chunks + self.config_fsdp = config_fsdp + + if ( + p_type == ParallelismType.FSDP + and "sharding_strategy" in config_fsdp.keys() + and config_fsdp["sharding_strategy"] == ShardingStrategy.NO_SHARD + and not "auto_wrap_policy" in config_fsdp.keys() + ): + print( + "WARNING: You seem to be trying to use FSDP with sharding without providing any wrapping policy. \ + This will result in the model being unable to be sharded between the GPUs. Only use FSDP without \ + wrapping policy with sharding_strategy == ShardingStrategy.NO_SHARD" + ) + + if devices is None: + devices = list(range(torch.cuda.device_count())) + self.devices = [ + torch.device("cuda", x) for x in devices + ] # Convert device indices into torch devices + + # Verify proper use of devices + if self.world_size > len(self.devices): + raise ValueError("Cannot use more devices than those referenced in devices") + if self.world_size < 1: + self.world_size = len(self.devices) + + +def setup_env(): + """Setup the environment necessary for parallel training with RPC""" + os.environ["MASTER_ADDR"] = "localhost" + os.environ["MASTER_PORT"] = "12355" + + +def setup_fsdp(rank, world_size): + setup_env() + + # initialize the process group + dist.init_process_group("nccl", rank=rank, world_size=world_size) + + +def cleanup_fsdp(): + """Cleanup the parallel training environment""" + dist.destroy_process_group() + + +def parallel_train(rank, args, return_queue): + """Creates, configures and uses a sub instance of the Trainer class to train a model using parallel training + Args: + rank: + Index of the process used to execute this function + args: + Parameters to pass to the Trainer class + return_queue: + Multiprocessing queue to use to send return values back + """ + train_args = copy.deepcopy( + args + ) # Deepcopy arguments so they can be used independantly from those in other processes + train_args[ + "parallel" + ].p_type = ParallelismType._DP # Signal that this is a subinstance of Trainer + train_args["parallel"].devices = [ + args["parallel"].devices[rank] + ] # Use the device corresponding to the process + train_args["parallel"].rank = rank # Memorise rank (for in-class log filtering) + setup_fsdp(rank, len(args["parallel"].devices)) # Setup environment + torch.cuda.set_device( + rank + ) # Inform pytorch of the device that will be used by this process + + # Create Trainer subinstance + trainer = Trainer( + train_args["model"], + train_args["dataloaders"], + train_args["loss_fn"], + train_args["writer"](), + train_args["training_metric"], + train_args["k_fold_class"], + train_args["print_every"], + ) + + # Train + valloss, valacc = trainer.train( + train_args["optimizer"], + train_args["n_epochs"], + train_args["cross_validation"], + train_args["optimizers_param"], + train_args["dataloaders_param"], + train_args["lr_scheduler"], + train_args["scheduler_params"], + train_args["optuna_params"], + train_args["profiling"], + train_args["parallel_tpu"], + train_args["keep_training"], + train_args["store_grad_layer_hist"], + train_args["n_accumulated_grads"], + train_args["writer_tag"], + train_args["parallel"], + ) + + # if rank = 0, send valloss, valacc, and model parameters (if requested) to super instance + if return_queue is not None: + return_vals = [valloss, valacc] + if args["return_model"]: + save_policy = FullStateDictConfig(offload_to_cpu=True, rank0_only=True) + tmpf = tempfile.mkstemp() # Create temporary file + with FSDP.state_dict_type( + trainer.model, StateDictType.FULL_STATE_DICT, save_policy + ): + torch.save( + trainer.model.state_dict(), tmpf[1] + ) # Store Trainable parameters in the temporary file + os.close(tmpf[0]) + return_vals.append( + tmpf[1] + ) # Send temporary file path to super instance for recovery + return_queue.put(return_vals) + + cleanup_fsdp() + + class Trainer: """This is the generic class that allows the user to benchmark models over architectures @@ -88,8 +270,7 @@ class Trainer: model : standard torch model dataloaders (list of utils.DataLoader): - list of standard torch DataLoaders, e.g. - `[dl_tr, dl_val, dl_ts]` + list of standard torch DataLoaders, e.g. `[dl_tr, dl_val, dl_ts]`. The list may contain 1, 2 or 3 dataloaders. In the case where 1 datalaoder is given, it will be automatically split into a training (80%) and a validation (20%) dataloader. When 3 dataloaders are given, the third dataloader is never used loss_fn : loss function to average over batches writer : @@ -196,6 +377,8 @@ def __init__( # def tmploss(X,y): # return loss_fn(X,y) + self.regularizer.regularization_penalty(self.model) # self.train_loss_fn = tmploss + # device + self.device = DEVICE if not k_fold_class: self.k_fold_class = KFold(5, shuffle=True) else: @@ -204,14 +387,16 @@ def __init__( def _set_initial_model(self) -> None: """This private method is used to set the initial_model""" - self.initial_model = copy.deepcopy(self.model) + if self.parallel.p_type != ParallelismType._DP: + self.initial_model = copy.deepcopy(self.model) def _reset_model(self) -> None: """Private method to reset the initial model weights. This function is essential for the cross-validation procedure. """ - self.model = copy.deepcopy(self.initial_model) + if self.parallel.p_type != ParallelismType._DP: + self.model = copy.deepcopy(self.initial_model) def _optimisation_step( self, @@ -226,7 +411,7 @@ def _optimisation_step( if self.n_accumulated_grads < 2: # usual case for stochastic gradient descent self.optimizer.zero_grad() loss.backward() - if DEVICE.type == "xla": + if self.device.type == "xla": xm.optimizer_step( self.optimizer, barrier=True ) # Note: Cloud TPU-specific code! @@ -242,7 +427,7 @@ def _optimisation_step( if ( batch + 1 ) % self.n_accumulated_grads == 0: # do the optimization step only after the accumulations - if DEVICE.type == "xla": + if self.device.type == "xla": xm.optimizer_step( self.optimizer, barrier=True ) # Note: Cloud TPU-specific code! @@ -255,26 +440,27 @@ def _optimisation_step( except (MissingClosureError,): self.optimizer.step(closure) # type: ignore self.optimizer.zero_grad() + epoch_loss += loss.item() if batch % self.print_every == 0: - epoch_loss += loss.item() - print( - f"Batch training loss: {epoch_loss / (batch + 1)}", - f" \tBatch training {self.training_metric.__name__}: ", - batch_metric, - " \t[", - batch + 1, - "/", - steps, - "] ", - end="\r", - ) + if self.parallel.rank == 0: + print( + f"Batch training loss: {epoch_loss / (batch + 1)}", + f" \tBatch training {self.training_metric.__name__}: ", + batch_metric, + " \t[", + batch + 1, + "/", + steps, + "] ", + end="\r", + ) return epoch_loss def _send_to_device( self, x: Union[Tensor, List[Tensor]], y: Tensor ) -> Tuple[Tensor, Union[Tensor, List[Tensor]], Tensor]: """use this private method to send the - ``x`` and ``y`` to the ``DEVICE``. + ``x`` and ``y`` to the ``self.device``. Args: x: @@ -288,19 +474,33 @@ def _send_to_device( """ new_x: List[Tensor] = [] - if isinstance(x, tuple) or isinstance(x, list): - for xi in x: - new_x.append(xi.to(DEVICE)) - x = new_x - prediction = self.model(*x) - if hasattr(prediction, "logits"): # unwrapper for HuggingFace BERT model - prediction = prediction.logits # unwrapper for HuggingFace BERT model + + if self.parallel.p_type == ParallelismType.PIPELINE: + x = x.to(0) + y = y.to(self.nb_gpus - 1) + prediction = self.model(x).local_value() else: - x = x.to(DEVICE) - prediction = self.model(x) - if hasattr(prediction, "logits"): # unwrapper for HuggingFace BERT model - prediction = prediction.logits # unwrapper for HuggingFace BERT model - y = y.to(DEVICE) + if isinstance(x, tuple) or isinstance(x, list): + for xi in x: + new_x.append(xi.to(self.device)) + x = new_x + prediction = self.model(*x) + if hasattr( + prediction, "logits" + ): # unwrapper for HuggingFace BERT model + prediction = ( + prediction.logits + ) # unwrapper for HuggingFace BERT model + else: + x = x.to(self.device) + prediction = self.model(x) + if hasattr( + prediction, "logits" + ): # unwrapper for HuggingFace BERT model + prediction = ( + prediction.logits + ) # unwrapper for HuggingFace BERT model + y = y.to(self.device) return prediction, x, y @@ -318,6 +518,7 @@ def _inner_train_loop( self.prof.start() metric_list = [] epoch_loss = 0.0 + ddp_loss = torch.zeros(2).to(self.device) for batch, (X, y) in enumerate(dl_tr): def closure() -> Tensor: @@ -328,11 +529,12 @@ def closure() -> Tensor: pred, X, y = self._send_to_device(X, y) batch_metric = self.training_metric(pred, y) metric_list.append(batch_metric) - loss = self.loss_fn(pred,y) + loss = self.loss_fn(pred, y) if self.regularizer is not None: penalty = self.regularizer.regularization_penalty(self.model) loss += penalty - + ddp_loss[0] += loss.item() + ddp_loss[1] += len(X) # Save to tensorboard try: self.writer.add_scalar( # type: ignore @@ -364,7 +566,8 @@ def closure() -> Tensor: if self.prof is not None: self.prof.step() - + if self.parallel.p_type == ParallelismType._DP: + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) if self.prof is not None: self.prof.stop() @@ -381,7 +584,10 @@ def _train_loop( """private method to run a single training loop """ - self.model = self.model.to(DEVICE) + if ( + self.parallel.p_type == ParallelismType._NONE + ): # Once setup, FSDP handles the model's device + self.model = self.model.to(self.device) self.model.train() try: length: int = len(dl_tr.sampler.indices) # type: ignore @@ -418,7 +624,10 @@ def _val_loop( """private method to run a single validation loop """ - self.model = self.model.to(DEVICE) + if ( + self.parallel.p_type == ParallelismType._NONE + ): # FSDP handles the model's device + self.model = self.model.to(self.device) try: size = len(dl_val.sampler.indices) # type: ignore except AttributeError: @@ -514,6 +723,7 @@ def _inner_loop( pred_list = [] batch_metric_list = [] loss = 0.0 + ddp_loss = torch.zeros(3).to(self.device) with torch.no_grad(): for X, y in dl: pred, X, y = self._send_to_device(X, y) @@ -521,9 +731,15 @@ def _inner_loop( class_probs_batch = [f.softmax(el, dim=0) for el in pred] class_probs.append(class_probs_batch) loss += self.loss_fn(pred, y).item() + ddp_loss[0] += loss batch_metric = self.training_metric(pred, y) batch_metric_list.append(batch_metric) class_label.append(y) + pred = pred.argmax(dim=1, keepdim=True) + ddp_loss[1] += loss + ddp_loss[2] += len(X) + if self.parallel.p_type == ParallelismType._DP: + dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM) epoch_metric = sum(batch_metric_list) / len(batch_metric_list) epoch_loss = loss / len(batch_metric_list) return pred_list, epoch_loss, epoch_metric @@ -533,11 +749,9 @@ def _init_profiler( ) -> None: """initialise the profler for profiling""" # profiling - active: int = 10 - if not cross_validation: - active = n_epochs - 2 - else: - active = k_folds * (n_epochs - 2) + active: int = 2 + if cross_validation: + active *= k_folds if profiling: try: @@ -547,7 +761,7 @@ def _init_profiler( torch.profiler.ProfilerActivity.CUDA, # type: ignore ], schedule=torch.profiler.schedule( # type: ignore - wait=1, warmup=1, active=active, repeat=1 + wait=1, warmup=3, active=active, repeat=2 ), on_trace_ready=torch.profiler.tensorboard_trace_handler( # type: ignore os.path.join( @@ -557,14 +771,14 @@ def _init_profiler( self.model.__class__.__name__ + str(datetime.today()) ).replace(":", "-"), ), - worker_name="worker", + # worker_name="worker", ), record_shapes=True, - profile_memory=True - # with_stack=True + profile_memory=True, + with_stack=True, ) - except AssertionError: - pass + except AssertionError as e: + print(f"PID({os.getpid()}) Error: {e}") def _init_optimizer_and_scheduler( self, @@ -616,6 +830,7 @@ def train( store_grad_layer_hist: bool = False, n_accumulated_grads: int = 0, writer_tag: str = "", + parallel: Optional[Parallelism] = None, ) -> Tuple[float, float]: """Function to run all the training cycles. @@ -629,7 +844,7 @@ def train( dataloaders_param: dictionary of the dataloaders parameters, e.g. `{'batch_size': 32}`. If ``None``, then - the parameters of the training and validation + the parameters of the validation (if any) or training dataloaders will be used. optimizers_param: dictionary of the optimizers @@ -661,6 +876,8 @@ def train( Only a positive number will be taken into account writer_tag: the tensorboard writer tag + parallel: + The type of parallel training algorithm to use. If None, the default basic training will be used, else, the algorithm selected in the p_type member of the Parallelism object will be used. The Parallelism object must also contain a list of usable GPU indices as well as the number of those GPUs to actually use for the training Returns: (float, float): @@ -669,6 +886,17 @@ def train( is ignored. On the other hand, if there `cross_validation = False` then the test loss and accuracy is returned. """ + + if parallel_tpu and parallel is not None: + print( + f"Parallel TPU and parallel training cannot be enabled at the same time. Choose only one of them." + ) + exit() + + if parallel is None: + parallel = Parallelism(ParallelismType._NONE) + self.parallel = parallel + self.nb_gpus = self.parallel.world_size self.n_accumulated_grads = n_accumulated_grads self.store_grad_layer_hist = store_grad_layer_hist # to start the training from where we left @@ -682,7 +910,7 @@ def train( # dataloaders_param initialisation if dataloaders_param is None: - if self.dataloaders[1] is not None: + if len(self.dataloaders) > 1: dataloaders_param_val = Trainer.copy_dataloader_params( self.dataloaders[1] ) @@ -727,9 +955,16 @@ def train( except KeyError: pass + if self.parallel.p_type == ParallelismType._NONE: + world_size = 1 + rank = 0 + else: + world_size = self.parallel.world_size + rank = self.parallel.rank + # validation being the 20% in the case of 2 # dataloders without crossvalidation - if len(self.dataloaders) == 3: # type: ignore + if len(self.dataloaders) >= 2: # type: ignore try: val_idx = self.dataloaders[1].sampler.indices # type: ignore except AttributeError: @@ -739,7 +974,7 @@ def train( self.dataloaders[1].dataset, # pin_memory=True, **dataloaders_param_val, - sampler=SubsetRandomSampler(val_idx), + sampler=GiottoSampler(val_idx, world_size=world_size, rank=rank), ) try: tr_idx = self.dataloaders[0].sampler.indices # type: ignore @@ -750,7 +985,7 @@ def train( self.dataloaders[0].dataset, # pin_memory=True, **dataloaders_param_tr, - sampler=SubsetRandomSampler(tr_idx), + sampler=GiottoSampler(tr_idx, world_size=world_size, rank=rank), ) else: try: @@ -763,13 +998,13 @@ def train( self.dataloaders[0].dataset, # pin_memory=True, **dataloaders_param_val, - sampler=SubsetRandomSampler(val_idx), + sampler=GiottoSampler(val_idx, world_size=world_size, rank=rank), ) dl_tr = torch.utils.data.DataLoader( # type: ignore self.dataloaders[0].dataset, # pin_memory=True, **dataloaders_param_tr, - sampler=SubsetRandomSampler(tr_idx), + sampler=GiottoSampler(tr_idx, world_size=world_size, rank=rank), ) if cross_validation: @@ -804,6 +1039,14 @@ def train( scheduler_params, ) + if ( + self.parallel.p_type == ParallelismType.PIPELINE + and not os.name == "nt" + ): + self._pipelined_model( + self.parallel.pipeline_chunks, self.parallel.config_mha, dl_tr + ) + # re-initialise data loaders if len(self.dataloaders) == 3: warnings.warn( @@ -813,18 +1056,56 @@ def train( self.dataloaders[0].dataset, # pin_memory=True, **dataloaders_param_tr, - sampler=SubsetRandomSampler(tr_idx), + sampler=GiottoSampler(tr_idx), ) dl_val = torch.utils.data.DataLoader( # type: ignore self.dataloaders[0].dataset, # pin_memory=True, **dataloaders_param_val, - sampler=SubsetRandomSampler(val_idx), + sampler=GiottoSampler(val_idx), ) + # print n-th fold print("\n\n********** Fold ", fold + 1, "**************") + + if ( + self.parallel.p_type == ParallelismType.FSDP + ): # Must only get there with parallelism type != _DP + child_args = { + "model": self.model, + "dataloaders": self.dataloaders, + "loss_fn": self.loss_fn, + "writer": type(self.writer), + "training_metric": self.training_metric, + "k_fold_class": self.k_fold_class, + "print_every": self.print_every, + "parallel": self.parallel, + "optimizer": optimizer, + "n_epochs": n_epochs, + "cross_validation": False, + "optimizers_param": optimizers_param, + "dataloaders_param": dataloaders_param, + "lr_scheduler": lr_scheduler, + "scheduler_params": scheduler_params, + "optuna_params": optuna_params, + "profiling": profiling, + "parallel_tpu": parallel_tpu, + "keep_training": keep_training, + "store_grad_layer_hist": store_grad_layer_hist, + "n_accumulated_grads": n_accumulated_grads, + "writer_tag": writer_tag, + "return_model": False, + } + + return_vals = gmp.spawn( + parallel_train, + args=(child_args,), + nprocs=self.parallel.world_size, + ) + valloss, valacc = return_vals[0], return_vals[1] + # the training and validation loop - if parallel_tpu == False: + elif parallel_tpu == False: valloss, valacc = self._training_loops( n_epochs, dl_tr, @@ -874,6 +1155,53 @@ def train( valacc = torch.mean(torch.tensor(mean_val_acc)).item() else: + if self.parallel.p_type == ParallelismType.FSDP: + child_args = { + "model": self.model, + "dataloaders": self.dataloaders, + "loss_fn": self.loss_fn, + "writer": type(self.writer), + "training_metric": self.training_metric, + "k_fold_class": self.k_fold_class, + "print_every": self.print_every, + "parallel": self.parallel, + "optimizer": optimizer, + "n_epochs": n_epochs, + "cross_validation": cross_validation, + "optimizers_param": optimizers_param, + "dataloaders_param": dataloaders_param, + "lr_scheduler": lr_scheduler, + "scheduler_params": scheduler_params, + "optuna_params": optuna_params, + "profiling": profiling, + "parallel_tpu": parallel_tpu, + "keep_training": keep_training, + "store_grad_layer_hist": store_grad_layer_hist, + "n_accumulated_grads": n_accumulated_grads, + "writer_tag": writer_tag, + "return_model": True, + } + + return_vals = gmp.spawn( + parallel_train, args=(child_args,), nprocs=self.parallel.world_size + ) + self.model.load_state_dict(torch.load(return_vals[2])) + os.remove(return_vals[2]) + self.check_has_trained = True + return return_vals[0], return_vals[1] + elif self.parallel.p_type == ParallelismType._DP: + self.device = self.parallel.devices[0] + # Setup FSDP + print(f"PID={os.getpid()}: sending model to device {self.device}") + if "device_id" not in self.parallel.config_fsdp.keys(): + self.parallel.config_fsdp["device_id"] = self.device + else: + print( + 'WARNING: You provided a custom value for "device_id" in the FSDP config. This will \ + prevent the use of multiple GPUs. Only do this if you know what you are doing' + ) + self.model = FSDP(self.model, **self.parallel.config_fsdp) + self._init_optimizer_and_scheduler( keep_training, cross_validation, @@ -883,6 +1211,11 @@ def train( scheduler_params, ) + if self.parallel.p_type == ParallelismType.PIPELINE and not os.name == "nt": + self._pipelined_model( + self.parallel.pipeline_chunks, self.parallel.config_mha, dl_tr + ) + if not parallel_tpu: valloss, valacc = self._training_loops( n_epochs, @@ -917,6 +1250,20 @@ def train( # check for training self.check_has_trained = True + if self.parallel.p_type == ParallelismType.PIPELINE: + trained_weights = {} + for (_, value_src), (key, _) in zip( + self.model.state_dict().items(), self.model_saved.state_dict().items() + ): + trained_weights[key] = value_src + # Load the new weights on the base model + self.model_saved.load_state_dict(trained_weights) + # Set the model back for next steps + self.model = self.model_saved + self.model_saved = None + self.pipeline_train = False + self.model.to(DEVICE) + # put the mean of the cross_val return valloss, valacc @@ -971,7 +1318,8 @@ def _training_loops( self.val_epoch = t self.train_epoch = t self._train_loop(dl_tr, writer_tag) - me = ModelExtractor(self.model, self.loss_fn) + pipeline_train = self.parallel.p_type == ParallelismType.PIPELINE + me = ModelExtractor(self.model, self.loss_fn, pipeline_train) if self.store_grad_layer_hist: try: lista = me.get_layers_param() @@ -1251,6 +1599,9 @@ def evaluate_classification( (float, float, 2darray): the accuracy, loss and confusion matrix. """ + self.model.to( + self.device + ) # Send model to device in case it wasn't trained on it previously. WARNING: model does not fit on GPU -> OOM if dl is None: dl = self.dataloaders[0] class_probs: List[List[Tensor]] = [] @@ -1326,3 +1677,47 @@ def copy_dataloader_params( "prefetch_factor": original_dataloader.prefetch_factor, "persistent_workers": original_dataloader.persistent_workers, } + + def _pipelined_model(self, nb_chunks, config_mha, dl_tr): + setup_env() + + input_shape = None + output_shape = None + dtype = None + + for input, label in dl_tr: + input_shape = input.shape + dtype = str(label.dtype) + dtype = dtype.split(".", 1)[1] + output_shape = label.shape + break + + config = PipelineConfig( + input_shape=input_shape, + output_shape=output_shape, + data_type=dtype, + config_mha=config_mha, + ) + # Generate the piped model + trace = SkippableTracing(self.nb_gpus, self.model, config) + torch.distributed.rpc.init_rpc("worker", rank=0, world_size=1) + model_pipe = trace.get_modules() + try: + from torch.distributed.pipeline.sync import Pipe + + model_pipe = Pipe(model_pipe, nb_chunks) + except ImportError: + print("Windows does not support distributed computing") + pass + + # Get weights from the model and set them in the piped model + self.saved_weights = {} + for (_, value_src), (key, _) in zip( + self.model.state_dict().items(), model_pipe.state_dict().items() + ): + self.saved_weights[key] = value_src + model_pipe.load_state_dict(self.saved_weights) + + # Save the base model. We only use the piped model for training + self.model_saved = self.model + self.model = model_pipe diff --git a/gdeep/utility/multiprocessing.py b/gdeep/utility/multiprocessing.py new file mode 100644 index 00000000..ed4cb4e5 --- /dev/null +++ b/gdeep/utility/multiprocessing.py @@ -0,0 +1,32 @@ +import multiprocessing as pmp + +import torch.multiprocessing as tmp +from torch.multiprocessing.spawn import _wrap + + +# This code is heavily inspired from +# https://github.com/pytorch/pytorch/blob/v1.13.1/torch/multiprocessing/spawn.py#L178 +def spawn(fn, args=(), nprocs=1): + mp = pmp.get_context("spawn") + error_queues = [] + processes = [] + return_queue = mp.SimpleQueue() + for i in range(nprocs): + error_queue = mp.SimpleQueue() + wrap_args = (*args, return_queue if i == 0 else None) + process = mp.Process( + target=_wrap, + args=(fn, i, wrap_args, error_queue), + daemon=False, + ) + process.start() + error_queues.append(error_queue) + processes.append(process) + + context = tmp.ProcessContext(processes, error_queues) + + # Loop on join until it returns True or raises an exception. + while not context.join(): + pass + + return return_queue.get() diff --git a/gdeep/utility/sampler.py b/gdeep/utility/sampler.py new file mode 100644 index 00000000..1caf8d64 --- /dev/null +++ b/gdeep/utility/sampler.py @@ -0,0 +1,54 @@ +from torch.utils.data.sampler import Sampler +import torch +from typing import Optional, TypeVar, Iterator, Sequence +import math + +T_co = TypeVar("T_co", covariant=True) + + +class GiottoSampler(Sampler[T_co]): + def __init__( + self, + indices: Sequence[T_co], + shuffle: bool = True, + world_size: int = 1, + rank: int = 0, + ) -> None: + self.indices = indices + self.shuffle = shuffle + self.world_size = world_size + self.rank = rank + self.indices_per_rank = math.ceil(len(self.indices) / self.world_size) + self.total_size = self.indices_per_rank * self.world_size + self.padding_size = self.total_size - len(self.indices) + self.epoch = 0 + + def __iter__(self) -> Iterator[T_co]: + # Shuffle if needed + if self.shuffle: + g = torch.Generator() + g.manual_seed(self.epoch) + curr_indices_perm = torch.randperm(len(self.indices), generator=g).tolist() + else: + curr_indices_perm = list(self.indices) + + # Add padding to indices list if needed + if self.padding_size <= len(self.indices): + curr_indices_perm += curr_indices_perm[: self.padding_size] + else: + curr_indices_perm += ( + curr_indices_perm + * math.ceil(self.padding_size / len(curr_indices_perm)) + )[: self.padding_size] + + # Subsample indices for current rank + indices = curr_indices_perm[self.rank : self.total_size : self.world_size] + + for i in indices: + yield self.indices[i] + + def __len__(self) -> int: + return self.indices_per_rank + + def new_epoch(self): + self.epoch += 1 diff --git a/gdeep/utility/utils.py b/gdeep/utility/utils.py index 44c034d3..91a2eb7e 100644 --- a/gdeep/utility/utils.py +++ b/gdeep/utility/utils.py @@ -100,6 +100,7 @@ def save_model_and_optimizer( .replace("(", "") .replace(":", "") .replace(")", "") + .replace(" ", "") + "-" + trial_id + ".pth", diff --git a/gdeep/utility_examples/__init__.py b/gdeep/utility_examples/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/gdeep/utility_examples/args.py b/gdeep/utility_examples/args.py new file mode 100644 index 00000000..5705a232 --- /dev/null +++ b/gdeep/utility_examples/args.py @@ -0,0 +1,39 @@ +"""Modify arguments of examples.""" + +import argparse +from gdeep.trainer.trainer import ParallelismType +from gdeep.utility_examples.fsdp import ShardingStrategyEx + + +def add_default_arguments(parser: argparse.ArgumentParser): + parser.add_argument( + "--parallel", + type=ParallelismType.from_str, + default=ParallelismType._NONE, + help="Parallelism type to use for training (default: none)", + ) + parser.add_argument( + "--batch-size", + type=int, + default=4, + metavar="N", + help="input batch size for training (default: %(default)s)", + ) + parser.add_argument( + "--n-epochs", + type=int, + default=1, + metavar="N", + help="Number of epochs to train for (default: %(default)s)", + ) + parser.add_argument( + "--sharding", + type=ShardingStrategyEx.from_str, + choices=[x for x in ShardingStrategyEx], + default=ShardingStrategyEx.SHARD_GRAD_OP, + help="Sharding strategy for FSDP (default: %(default)s)", + ) + + +def add_big_model(parser: argparse.ArgumentParser): + parser.add_argument("--big-model", action="store_true", help="Use the big model") diff --git a/gdeep/utility_examples/fsdp.py b/gdeep/utility_examples/fsdp.py new file mode 100644 index 00000000..e334960b --- /dev/null +++ b/gdeep/utility_examples/fsdp.py @@ -0,0 +1,28 @@ +import enum +from torch.distributed.fsdp import ShardingStrategy + + +class ShardingStrategyEx(enum.Enum): + FULL_SHARD = enum.auto() + SHARD_GRAD_OP = enum.auto() + NO_SHARD = enum.auto() + + def __str__(self): + return self.name + + @staticmethod + def from_str(s: str): + try: + return ShardingStrategyEx[s] + except KeyError: + raise ValueError(f"Unknown {s}") + + def to_sharding_strategy(self) -> ShardingStrategy: + if self is ShardingStrategyEx.FULL_SHARD: + return ShardingStrategy.FULL_SHARD + elif self is ShardingStrategyEx.SHARD_GRAD_OP: + return ShardingStrategy.SHARD_GRAD_OP + elif self is ShardingStrategyEx.NO_SHARD: + return ShardingStrategy.NO_SHARD + else: + raise ValueError(f"Unknown {self}") diff --git a/requirements.txt b/requirements.txt index 16937e01..d490a510 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,16 +5,16 @@ plotly matplotlib<=3.6.3 matplotlib_venn scikit-learn==1.1.1 -torch==1.12.1 +torch==1.13.1 spacy scipy networkx torch-tb-profiler torchdiffeq torchensemble -torchtext -torchvision -torchdata +torchtext==0.14.1 +torchvision==0.14.1 +torchdata==0.5.1 transformers einops tensorboard @@ -41,3 +41,4 @@ jsonpickle typing_extensions; python_version == '3.7' gudhi pre-commit +pipeline-tool