Dynamic Resource Scheduler for Distributed Deep Learning Training on Kubeflow
Distributed deep learning is widely used today because it lets training scale beyond a single machine. One of the tools used to train distributed deep learning models is Kubeflow, which runs on top of Kubernetes. Kubeflow provides utilities and workflows that ease managing, training, and deploying machine learning models. Kubeflow uses tf-operator as a custom controller to manage job scheduling. I tried to improve Kubeflow's scheduling capability by developing a custom controller, inspired by DRAGON and OASIS, to replace the existing tf-operator.
Distributed Machine Learning
Unlike centralized machine learning, distributed machine learning splits the learning process between two kinds of entities: parameter server(s) and workers. The data is distributed in smaller chunks to the workers, and each worker calculates a partial gradient and sends it back to the parameter server(s). The parameter server(s) then calculate the new global parameters asynchronously before the next iteration starts.
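The parameter server pattern can be illustrated with a minimal sketch in plain Python. It assumes a one-parameter linear model y = w * x with squared loss, and the names `ParameterServer`, `worker_gradient`, and `train` are made up for this illustration. The loop runs sequentially, so it only mimics asynchronous updates by applying each worker's gradient as soon as it is computed.

```python
from dataclasses import dataclass


@dataclass
class ParameterServer:
    """Holds the global parameter and applies gradients as they arrive."""
    w: float = 0.0
    lr: float = 0.02  # learning rate, chosen small enough for this toy data

    def pull(self) -> float:
        return self.w

    def push(self, grad: float) -> None:
        # Apply one worker's partial gradient immediately.
        self.w -= self.lr * grad


def worker_gradient(w: float, chunk: list[tuple[float, float]]) -> float:
    """Mean gradient of 0.5 * (w * x - y) ** 2 over one worker's data chunk."""
    return sum((w * x - y) * x for x, y in chunk) / len(chunk)


def train(data: list[tuple[float, float]], num_workers: int, iterations: int) -> float:
    # Distribute the data in smaller chunks, one chunk per worker.
    chunks = [data[i::num_workers] for i in range(num_workers)]
    server = ParameterServer()
    for _ in range(iterations):
        for chunk in chunks:
            grad = worker_gradient(server.pull(), chunk)
            server.push(grad)
    return server.pull()


data = [(float(x), 3.0 * x) for x in range(1, 9)]  # generated with w = 3.0
learned_w = train(data, num_workers=4, iterations=200)
```

With this toy data the learned parameter settles at the value used to generate it. In a real cluster the workers would run in separate pods and push gradients over the network, which is where the communication overhead discussed later comes from.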
Kubeflow on Kubernetes
Kubeflow runs on top of Kubernetes, an orchestrator for containerized workloads and services. Kubernetes can be used as a platform to train models by using a custom resource, TFJob, to represent a training job. Each TFJob holds information about how many pods will be deployed for that particular job, similar to how a Deployment works. The difference is that a job terminates as soon as it has finished.
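To make the shape of a TFJob concrete, here is a rough sketch of a manifest written as a Python dictionary. The field names follow the `kubeflow.org/v1` TFJob manifests as I understand them, so check them against the operator version you run. The job name, image, and the helper names `replica_spec` and `pod_count` are hypothetical.

```python
def replica_spec(replicas: int) -> dict:
    """Build one replica group (PS or Worker) of a TFJob."""
    return {
        "replicas": replicas,
        "restartPolicy": "OnFailure",
        "template": {
            "spec": {
                "containers": [
                    # Hypothetical image; TFJob examples name this container "tensorflow".
                    {"name": "tensorflow", "image": "example.com/mnist-train:latest"}
                ]
            }
        },
    }


tfjob = {
    "apiVersion": "kubeflow.org/v1",
    "kind": "TFJob",
    "metadata": {"name": "mnist-train"},  # hypothetical job name
    "spec": {
        "tfReplicaSpecs": {
            "PS": replica_spec(1),      # parameter server(s)
            "Worker": replica_spec(2),  # workers
        }
    },
}


def pod_count(job: dict) -> int:
    """Total number of pods the job asks for across all replica groups."""
    return sum(spec["replicas"] for spec in job["spec"]["tfReplicaSpecs"].values())
```

Here `pod_count(tfjob)` is 3: one parameter server and two workers. The `replicas` value of the `Worker` group is the number a scheduler would change when it scales a job up or down.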
Dynamic Scheduler Basic Ideas
The scheduler uses two basic ideas to improve the scheduling of jobs on Kubeflow.
- Gang Scheduling
- Priority Queueing
Because of how the Kubernetes scheduler works, it load balances the pods deployed for a job across multiple nodes. This is fine for normal applications, but one of the biggest disadvantages of distributed machine learning is the time overhead needed for the parameter server(s) and workers to communicate and synchronize.
In this scheduler, gang scheduling means grouping all of a job's workers onto a single node. This minimizes the communication overhead of each job and therefore reduces the overall training time.
The minimum and maximum number of worker replicas are predetermined in the configuration file. With autoscaling, the scheduler can dictate the number of workers deployed for every running job in the cluster. Intuitively, increasing the number of workers should decrease the training time. This is not always true: there is a tipping point beyond which adding workers only increases the training time because of the overhead.
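The tipping point can be illustrated with a toy cost model. The formula below is an assumption made purely for illustration and is not taken from the experiments: compute time shrinks as `compute / workers`, while communication overhead grows as `overhead * workers`. The function names and default constants are hypothetical.

```python
def training_time(workers: int, compute: float = 120.0, overhead: float = 3.0) -> float:
    """Toy model: parallel compute time plus per-worker communication overhead."""
    return compute / workers + overhead * workers


def best_worker_count(min_workers: int, max_workers: int) -> int:
    """Worker count within the configured bounds that minimizes the toy training time."""
    return min(range(min_workers, max_workers + 1), key=training_time)


best = best_worker_count(1, 10)
```

Under these made-up constants the minimum falls at 6 workers, and going on to 10 workers is slower than staying at 6. The point is only that the maximum replica count in the configuration should sit near that minimum rather than far beyond it.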
Prioritizing one job over another can be beneficial, since it lets us control which job runs first. Combined with autoscaling, it can also be used to control which jobs are scaled up and which are scaled down.
A job that enters the system is inserted into the queue based on its priority. The scheduler then checks whether any high priority jobs are present in the queue. If there are none, it schedules the first feasible job (adapted First Come First Served) using gang scheduling, then calls the scale-up function to maximize the number of workers. If there is a high priority job in the queue, the scheduler schedules it first. If no resources are available for the high priority job, the scheduler calls the scale-down function to make room for it. If the scale-down succeeds, the high priority job is scheduled. If not, the scheduler calls the scale-up function to make use of the residual resources that are left unused.
Each job is given a weight using a simple algorithm based on its priority and resource requirements. Jobs requiring more CPU and memory are prioritized over jobs requiring less. The scheduler also takes into account jobs that have been assigned a high priority by users, regardless of their resource requirements.
INPUT: Job queue Q, job J
OUTPUT: Job queue Q with job J
if len(Q) == 0 do
    append J to Q
    return Q
end if
R = J CPU requirement + J Memory requirement
U = J Utility
for q in Q do
    r = q CPU requirement + q Memory requirement
    u = q Utility
    if (U > u) or (U == u and R > r) do
        f = jobs in Q positioned before q
        F = q and the jobs in Q positioned after it
        Q = f + [J] + F
        return Q
    end if
end for
append J to Q
return Q
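The weighted insertion can be sketched in Python as follows. This is an illustrative reading of the pseudocode, not the controller's actual code: the `Job` fields and the `outranks` and `enqueue` names are assumptions, and CPU and memory are simply added together as the pseudocode does, without normalizing units.

```python
from dataclasses import dataclass


@dataclass
class Job:
    name: str
    utility: int   # priority assigned by the user
    cpu: float     # requested CPU
    memory: float  # requested memory

    @property
    def resources(self) -> float:
        # Same crude sum the pseudocode uses; a real weight would normalize units.
        return self.cpu + self.memory


def outranks(a: Job, b: Job) -> bool:
    """True if job a should be placed ahead of job b in the queue."""
    return a.utility > b.utility or (a.utility == b.utility and a.resources > b.resources)


def enqueue(queue: list[Job], job: Job) -> list[Job]:
    """Insert job in front of the first queued job that it outranks."""
    for index, queued in enumerate(queue):
        if outranks(job, queued):
            queue.insert(index, job)
            return queue
    queue.append(job)  # lowest weight so far, or the queue was empty
    return queue


queue: list[Job] = []
enqueue(queue, Job("small", utility=1, cpu=1, memory=2))
enqueue(queue, Job("big", utility=1, cpu=4, memory=8))
enqueue(queue, Job("urgent", utility=5, cpu=1, memory=1))
order = [job.name for job in queue]
```

The resulting order is urgent, big, small: the user-assigned utility wins first, and resource requirements only break ties. Jobs with an identical weight keep their arrival order.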
For gang scheduling, the scheduler iterates over every node in the cluster to find a node that can satisfy all of a job's parameter server(s) and workers. If it cannot find such a node, it falls back to scheduling the pods in the regular manner.
INPUT: Pod requests PR, node resources NR
for n in NR do
    oneNodeOk = True
    freeCPU = n free CPU
    freeMemory = n free Memory
    for r in PR do
        if (freeCPU < r required CPU) or (freeMemory < r required Memory) do
            oneNodeOk = False
            break
        end if
        freeCPU -= r required CPU
        freeMemory -= r required Memory
    end for
    if oneNodeOk do break
end for
if oneNodeOk do schedule all r in PR on node n
else do schedule each r on the first available node
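The node search can be sketched in Python as below. This is a simplified illustration under stated assumptions: the `Node` and `PodRequest` types and the `find_gang_node` and `place` helpers are hypothetical, and the regular fallback is approximated as "first node with enough room for each pod" rather than the real Kubernetes scheduler's behavior.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    name: str
    free_cpu: float
    free_memory: float


@dataclass
class PodRequest:
    name: str
    cpu: float
    memory: float


def find_gang_node(pods: list[PodRequest], nodes: list[Node]) -> Optional[Node]:
    """Return the first node that can host every pod of the job, or None."""
    for node in nodes:
        cpu, memory = node.free_cpu, node.free_memory  # trial copy, node untouched
        for pod in pods:
            if cpu < pod.cpu or memory < pod.memory:
                break  # this node cannot hold the whole gang
            cpu -= pod.cpu
            memory -= pod.memory
        else:
            return node  # inner loop finished without a break: everything fits
    return None


def place(pods: list[PodRequest], nodes: list[Node]) -> dict[str, str]:
    """Map each pod name to a node name, preferring a single node for all pods."""
    gang_node = find_gang_node(pods, nodes)
    placement: dict[str, str] = {}
    for pod in pods:
        target = gang_node
        if target is None:
            # Regular fallback: first node with enough room for this pod.
            target = next(
                (n for n in nodes if n.free_cpu >= pod.cpu and n.free_memory >= pod.memory),
                None,
            )
        if target is None:
            raise RuntimeError(f"no node can host pod {pod.name}")
        target.free_cpu -= pod.cpu
        target.free_memory -= pod.memory
        placement[pod.name] = target.name
    return placement
```

The trial copy of the free resources matters: the node's real counters are only decremented once a placement has been decided, so a failed attempt on one node does not distort the check on the next.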
To scale down, the scheduler iterates over every job that is currently running and reduces the number of workers of every job that has a lower weight than the high priority job to be scheduled. It keeps decreasing the number of workers until one of the following holds:
- The available resources can satisfy the high priority job → successful scale-down.
- The number of workers of every job with lower priority has been reduced to its minimum value → failed scale-down.
INPUT: High priority job J, running queue Q, node resources NR
OUTPUT: Success S
S = False
for q in Q do
    i = 0
    maxDeleteCount = q current replica count - q minimum replica count
    for n in NR do
        if J can be scheduled in n do
            S = True
            return S
        end if
        if i >= maxDeleteCount do break
        res = node resources of n
        res free CPU += q required CPU
        res free Memory += q required Memory
        i += 1
    end for
end for
if J can be scheduled in any n in NR do
    S = True
end if
return S
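A Python sketch of the scale-down step is given below. It follows the prose description (only lower-weight jobs lose workers, and never below their minimum) rather than the exact loop order of the pseudocode. The data model is an assumption: each running job records which node hosts each of its workers, and the incoming job is described only by its weight and its total CPU and memory request.

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    free_cpu: float
    free_memory: float


@dataclass
class RunningJob:
    weight: int
    min_workers: int
    worker_cpu: float
    worker_memory: float
    worker_nodes: list[str] = field(default_factory=list)  # node name per worker


def fits(nodes: dict[str, Node], cpu: float, memory: float) -> bool:
    """True if at least one node can host the whole incoming job."""
    return any(n.free_cpu >= cpu and n.free_memory >= memory for n in nodes.values())


def scale_down(weight: int, cpu: float, memory: float,
               running: list[RunningJob], nodes: dict[str, Node]) -> bool:
    """Remove workers from lower-weight jobs until the incoming job fits."""
    for job in running:
        if job.weight >= weight:
            continue  # never shrink a job of equal or higher weight
        while len(job.worker_nodes) > job.min_workers:
            if fits(nodes, cpu, memory):
                return True
            # Remove one worker and give its resources back to its node.
            node = nodes[job.worker_nodes.pop()]
            node.free_cpu += job.worker_cpu
            node.free_memory += job.worker_memory
    return fits(nodes, cpu, memory)
```

Note that this sketch mutates the cluster state even when it ends in failure. A real controller would probably plan the removals first and apply them only if the plan makes the high priority job schedulable.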
To scale up, the scheduler iterates over every job that is currently running and increases the number of workers of each job up to its maximum value.
INPUT: Running queue Q, node resources NR
OUTPUT: Success S
S = False
for n in NR do
    stop = False
    for q in Q do
        maxScaleUpNum = q maximum replica count - q current replica count
        r = q replica request for worker
        for j in 1..maxScaleUpNum do
            if (n free CPU < r required CPU) or (n free Memory < r required Memory) do
                stop = True
                break
            end if
            n free CPU -= r required CPU
            n free Memory -= r required Memory
            initialize one new worker in cluster
            S = True
        end for
        if stop do break
    end for
    if S do break
end for
return S
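The scale-up step might look like the following in Python. The sketch follows the prose (grow every running job toward its maximum while resources last) and does not reproduce the early exits of the pseudocode. The `Node` and `RunningJob` fields are assumptions, and jobs are visited in queue order on the assumption that the running queue is already sorted by weight.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    free_cpu: float
    free_memory: float


@dataclass
class RunningJob:
    name: str
    workers: int       # current worker replica count
    max_workers: int   # maximum from the configuration file
    worker_cpu: float
    worker_memory: float


def scale_up(running: list[RunningJob], nodes: list[Node]) -> bool:
    """Add workers to running jobs until each hits its maximum or nothing fits."""
    scaled = False
    for job in running:
        for node in nodes:
            while (job.workers < job.max_workers
                   and node.free_cpu >= job.worker_cpu
                   and node.free_memory >= job.worker_memory):
                # Reserve the resources and count one new worker on this node.
                node.free_cpu -= job.worker_cpu
                node.free_memory -= job.worker_memory
                job.workers += 1
                scaled = True
    return scaled
```

The return value reports whether any worker was added, so a caller can tell a pass that changed the cluster from one that found nothing to do.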
The main scheduling loop implements the architecture described above.
INPUT: Waiting queue W, running queue R, node resources NR
scaleDownFlag = False
h = nil
if len(W) > 0 do
    U = max waiting job utility
    u = max running job utility
    if U > u do
        h = first job in W
        if h can be scheduled do
            schedule h using gang scheduling
        else do
            run ScaleDown function
            if ScaleDown is successful do
                scaleDownFlag = True
            end if
        end if
    end if
end if
if h == nil or scaleDownFlag do
    for w in W do
        if w can be scheduled do
            schedule w using gang scheduling
            break
        end if
    end for
end if
run ScaleUp function
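The control flow of one scheduling pass can be sketched in Python with the individual steps injected as callables, so that only the decision logic is shown. The function name `scheduling_pass` and all parameter names are hypothetical. Treating an empty running queue as having the lowest possible utility is also an assumption.

```python
from typing import Any, Callable


def scheduling_pass(
    waiting: list,
    running: list,
    utility: Callable[[Any], float],
    can_schedule: Callable[[Any], bool],
    schedule: Callable[[Any], None],   # expected to move the job from waiting to running
    scale_down: Callable[[Any], bool],
    scale_up: Callable[[], Any],
) -> None:
    """One pass: high priority job first, else first feasible job, then scale up."""
    head = None
    scaled_down = False
    if waiting:
        top_waiting = max(utility(job) for job in waiting)
        top_running = max((utility(job) for job in running), default=float("-inf"))
        if top_waiting > top_running:
            head = waiting[0]  # the queue is kept sorted by weight
            if can_schedule(head):
                schedule(head)
            else:
                scaled_down = scale_down(head)
    if head is None or scaled_down:
        # Adapted First Come First Served: take the first job that fits.
        for job in list(waiting):
            if can_schedule(job):
                schedule(job)
                break
    scale_up()  # hand any leftover resources to the running jobs
```

Injecting the steps keeps the sketch short and makes the ordering easy to test with stubs. In a controller these callables would be backed by the queue, gang scheduling, scale-down, and scale-up routines.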
I ran three simple experiments to compare the scheduler with two other schedulers: DRAGON and tf-operator. The experiments were run on Google Kubernetes Engine with 3 nodes. Each node has 8 vCPUs and 30 GB of memory, for a total of 24 vCPUs and 90 GB of memory in the cluster. Each experiment used 4 jobs, with the specifications listed in the table below.
The experiments show that the implemented scheduler performed better than the others in every experiment. The scheduler excels in environments where every scheduled job has low resource requirements relative to the resources available in the cluster. This is because the scheduler makes maximal use of the scale-up function to decrease the training time of jobs.
Implementing a dynamic resource scheduler that combines DRAGON’s and OASIS’s approaches, a weighted autoscaling scheduler with gang scheduling capabilities, resulted in faster deep learning training. The experiments show an increase in training speed of up to 26.56% due to the weighted autoscaling and gang scheduling functionality.