From e9c6f134498fb0c33758fe732e5cd1a51268643d Mon Sep 17 00:00:00 2001 From: Matvey Volkov Date: Wed, 20 May 2020 18:34:37 +0300 Subject: [PATCH 1/5] jupyter basic --- src/main.go | 2 ++ src/task/jupyter/handler.go | 56 +++++++++++++++++++++++++++++++++++++ src/task/jupyter/jupyter.go | 1 - src/task/task.go | 6 ++-- 4 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 src/task/jupyter/handler.go delete mode 100644 src/task/jupyter/jupyter.go diff --git a/src/main.go b/src/main.go index caf1688..412faf8 100644 --- a/src/main.go +++ b/src/main.go @@ -10,6 +10,7 @@ import ( "github.com/Web-networks/execution-system/kube" "github.com/Web-networks/execution-system/task" "github.com/Web-networks/execution-system/task/applying" + "github.com/Web-networks/execution-system/task/jupyter" "github.com/Web-networks/execution-system/task/learning" "github.com/gocraft/web" ) @@ -21,6 +22,7 @@ func main() { taskManager := task.CreateManagerFromKubernetesState( learning.NewHandler(kubeClient, conf), applying.NewHandler(kubeClient), + jupyter.NewHandler(kubeClient), ) router := web.New(Context{}) diff --git a/src/task/jupyter/handler.go b/src/task/jupyter/handler.go new file mode 100644 index 0000000..149dd78 --- /dev/null +++ b/src/task/jupyter/handler.go @@ -0,0 +1,56 @@ +package jupyter + +import ( + "github.com/Web-networks/execution-system/kube" + "github.com/Web-networks/execution-system/task" + "github.com/Web-networks/execution-system/task/basehandlers" + batchv1 "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func NewHandler(kubeClient kube.Client) task.TaskTypeHandler { + return basehandlers.NewBatchHandler(kubeClient, &JupyterTaskSpecification{}) +} + +type JupyterTaskSpecification struct{} + +var _ basehandlers.TaskSpecification = (*JupyterTaskSpecification)(nil) + +func (spec *JupyterTaskSpecification) Type() task.TaskType { + return task.JupyterType +} + +func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} { + return &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: t.KubeJobName(), + Labels: map[string]string{ + task.ManagedByLabel: task.ManagedByValue, + task.TaskTypeLabel: task.JupyterType, + task.TaskIDLabel: t.ID, + }, + }, + Spec: batchv1.JobSpec{ + Template: v1.PodTemplateSpec{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "jupyter", + Image: "busybox", + Command: []string{"sleep", "30"}, // sleep for 30 seconds + Ports: []v1.ContainerPort{ + { + Name: "http", + Protocol: v1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + }, + RestartPolicy: v1.RestartPolicyNever, + }, + }, + }, + } +} diff --git a/src/task/jupyter/jupyter.go b/src/task/jupyter/jupyter.go deleted file mode 100644 index 4259b01..0000000 --- a/src/task/jupyter/jupyter.go +++ /dev/null @@ -1 +0,0 @@ -package jupyter diff --git a/src/task/task.go b/src/task/task.go index 4446684..e269083 100644 --- a/src/task/task.go +++ b/src/task/task.go @@ -15,9 +15,9 @@ const TaskIDLabel = "bigone.demist.ru/task-id" type TaskType = string const ( - LearningType = "learning" - ApplyingType = "applying" - RemoteJupyterType = "jupyter" + LearningType = "learning" + ApplyingType = "applying" + JupyterType = "jupyter" ) type TaskState = string From 799affde7141b00779bf069050c6aaa3de943cbc Mon Sep 17 00:00:00 2001 From: Matvey Volkov Date: Thu, 21 May 2020 01:02:21 +0300 Subject: [PATCH 2/5] finally added deployment type --- src/kube/client.go | 20 ++++ src/task/basehandlers/deployment_handler.go | 114 ++++++++++++++++++++ src/task/jupyter/handler.go | 31 +++--- src/task/task.go | 4 + 4 files changed, 157 insertions(+), 12 deletions(-) create mode 100644 src/task/basehandlers/deployment_handler.go diff --git a/src/kube/client.go b/src/kube/client.go index 7e3c566..dcec87d 100644 --- a/src/kube/client.go +++ b/src/kube/client.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + apps "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" @@ -16,6 +17,9 @@ type Client interface { RunBatchJob(job *batchv1.Job) error WatchBatchJobs(options v1.ListOptions) (watch.Interface, error) GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) + RunDeployment(deployment *apps.Deployment) error + WatchDeployments(options v1.ListOptions) (watch.Interface, error) + GetDeployments(options v1.ListOptions) (*apps.DeploymentList, error) } type client struct { @@ -53,3 +57,19 @@ func (c *client) GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) jobClient := c.clientset.BatchV1().Jobs("default") return jobClient.List(options) } + +func (c *client) RunDeployment(deployment *apps.Deployment) error { + deploymentClient := c.clientset.AppsV1().Deployments("default") + _, err := deploymentClient.Create(deployment) + return err +} + +func (c *client) WatchDeployments(options v1.ListOptions) (watch.Interface, error) { + deploymentClient := c.clientset.AppsV1().Deployments("default") + return deploymentClient.Watch(options) +} + +func (c *client) GetDeployments(options v1.ListOptions) (*apps.DeploymentList, error) { + deploymentClient := c.clientset.AppsV1().Deployments("default") + return deploymentClient.List(options) +} diff --git a/src/task/basehandlers/deployment_handler.go b/src/task/basehandlers/deployment_handler.go new file mode 100644 index 0000000..280976e --- /dev/null +++ b/src/task/basehandlers/deployment_handler.go @@ -0,0 +1,114 @@ +package basehandlers + +import ( + "fmt" + "log" + + "github.com/Web-networks/execution-system/kube" + "github.com/Web-networks/execution-system/task" + apps "k8s.io/api/apps/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" +) + +type DeploymentHandler struct { + kubeClient kube.Client + spec TaskSpecification +} + +var _ task.TaskTypeHandler = (*DeploymentHandler)(nil) + +func NewDeploymentHandler(kubeClient kube.Client, spec TaskSpecification) *DeploymentHandler { + return &DeploymentHandler{ + kubeClient: kubeClient, + spec: spec, + } +} + +func (h *DeploymentHandler) Type() task.TaskType { + return h.spec.Type() +} + +func (h *DeploymentHandler) RestoreTasks() ([]*task.Task, error) { + deployments, err := h.restoreDeploymentsFromKube() + if err != nil { + return nil, err + } + return h.tasksFromDeployments(deployments), nil +} + +func (h *DeploymentHandler) tasksFromDeployments(deployments *apps.DeploymentList) []*task.Task { + var tasks []*task.Task + for _, deployment := range deployments.Items { + t := &task.Task{ + ID: idFromKubeDeployment(&deployment), + Type: typeFromKubeDeployment(&deployment), + } + t.SetState(stateFromKubeDeployment(&deployment)) + log.Printf("task restored = %v", t) // todo: clean logs + tasks = append(tasks, t) + } + return tasks +} + +func (h *DeploymentHandler) restoreDeploymentsFromKube() (*apps.DeploymentList, error) { + return h.kubeClient.GetDeployments(v1.ListOptions{ + // TODO: add managed-by + LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), + }) +} + +func idFromKubeDeployment(deployment *apps.Deployment) string { + labels := deployment.ObjectMeta.GetObjectMeta().GetLabels() + return labels[task.TaskIDLabel] +} + +func typeFromKubeDeployment(deployment *apps.Deployment) task.TaskType { + labels := deployment.ObjectMeta.GetObjectMeta().GetLabels() + return labels[task.TaskTypeLabel] +} + +func stateFromKubeDeployment(deployment *apps.Deployment) task.TaskState { + log.Printf("status: available=%v, replicas=%v, unavailable=%v", + deployment.Status.AvailableReplicas, + deployment.Status.Replicas, + deployment.Status.UnavailableReplicas) // todo: clean logs + if deployment.Status.AvailableReplicas == deployment.Status.Replicas { + return task.Running + } else { + return task.Initializing + } +} + +func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) { + tasksWatcher, err := h.kubeClient.WatchDeployments(v1.ListOptions{ + // TODO: add managed-by + LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), + }) + if err != nil { + panic(fmt.Sprintf("failed to start watching for %s tasks", h.spec.Type())) + } + + go func() { + log.Printf("watcher: start watching!") + for event := range tasksWatcher.ResultChan() { + switch event.Type { + case watch.Modified: + deployment := event.Object.DeepCopyObject().(*apps.Deployment) + + taskID := idFromKubeDeployment(deployment) + newState := stateFromKubeDeployment(deployment) + cb(taskID, newState) + } + } + }() +} + +func (h *DeploymentHandler) Run(task *task.Task) error { + workload := h.spec.GenerateWorkload(task).(*apps.Deployment) + err := h.kubeClient.RunDeployment(workload) + if err != nil { + return err + } + return nil +} diff --git a/src/task/jupyter/handler.go b/src/task/jupyter/handler.go index 149dd78..d53aaa4 100644 --- a/src/task/jupyter/handler.go +++ b/src/task/jupyter/handler.go @@ -4,13 +4,13 @@ import ( "github.com/Web-networks/execution-system/kube" "github.com/Web-networks/execution-system/task" "github.com/Web-networks/execution-system/task/basehandlers" - batchv1 "k8s.io/api/batch/v1" + apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func NewHandler(kubeClient kube.Client) task.TaskTypeHandler { - return basehandlers.NewBatchHandler(kubeClient, &JupyterTaskSpecification{}) + return basehandlers.NewDeploymentHandler(kubeClient, &JupyterTaskSpecification{}) } type JupyterTaskSpecification struct{} @@ -22,21 +22,28 @@ func (spec *JupyterTaskSpecification) Type() task.TaskType { } func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} { - return &batchv1.Job{ + labels := map[string]string{ + task.ManagedByLabel: task.ManagedByValue, + task.TaskTypeLabel: task.JupyterType, + task.TaskIDLabel: t.ID, + } + return &apps.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: t.KubeJobName(), - Labels: map[string]string{ - task.ManagedByLabel: task.ManagedByValue, - task.TaskTypeLabel: task.JupyterType, - task.TaskIDLabel: t.ID, - }, + Name: t.KubeDeploymentName(), + Labels: labels, }, - Spec: batchv1.JobSpec{ + Spec: apps.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "jupyter", + Name: t.Type, Image: "busybox", Command: []string{"sleep", "30"}, // sleep for 30 seconds Ports: []v1.ContainerPort{ @@ -48,7 +55,7 @@ func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} }, }, }, - RestartPolicy: v1.RestartPolicyNever, + RestartPolicy: v1.RestartPolicyAlways, }, }, }, diff --git a/src/task/task.go b/src/task/task.go index e269083..e6f1afe 100644 --- a/src/task/task.go +++ b/src/task/task.go @@ -59,3 +59,7 @@ func (t *Task) SetState(state TaskState) { func (t *Task) KubeJobName() string { return fmt.Sprintf("%s-%s", t.Type, t.ID) } + +func (t *Task) KubeDeploymentName() string { + return fmt.Sprintf("%s-%s", t.Type, t.ID) +} From 4b1069cf3bf253c3425ea792a6b9a4abdc889408 Mon Sep 17 00:00:00 2001 From: Matvey Volkov Date: Thu, 21 May 2020 10:43:46 +0300 Subject: [PATCH 3/5] minor ref --- src/task/applying/handler.go | 2 +- src/task/basehandlers/deployment_handler.go | 5 ----- src/task/jupyter/handler.go | 2 +- src/task/learning/handler.go | 2 +- src/task/task.go | 6 +----- 5 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/task/applying/handler.go b/src/task/applying/handler.go index b78b3b6..893b143 100644 --- a/src/task/applying/handler.go +++ b/src/task/applying/handler.go @@ -24,7 +24,7 @@ func (spec *ApplyingTaskSpecification) Type() task.TaskType { func (spec *ApplyingTaskSpecification) GenerateWorkload(t *task.Task) interface{} { return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: t.KubeJobName(), + Name: t.KubeWorkloadName(), Labels: map[string]string{ task.ManagedByLabel: task.ManagedByValue, task.TaskTypeLabel: task.ApplyingType, diff --git a/src/task/basehandlers/deployment_handler.go b/src/task/basehandlers/deployment_handler.go index 280976e..eddd353 100644 --- a/src/task/basehandlers/deployment_handler.go +++ b/src/task/basehandlers/deployment_handler.go @@ -45,7 +45,6 @@ func (h *DeploymentHandler) tasksFromDeployments(deployments *apps.DeploymentLis Type: typeFromKubeDeployment(&deployment), } t.SetState(stateFromKubeDeployment(&deployment)) - log.Printf("task restored = %v", t) // todo: clean logs tasks = append(tasks, t) } return tasks @@ -69,10 +68,6 @@ func typeFromKubeDeployment(deployment *apps.Deployment) task.TaskType { } func stateFromKubeDeployment(deployment *apps.Deployment) task.TaskState { - log.Printf("status: available=%v, replicas=%v, unavailable=%v", - deployment.Status.AvailableReplicas, - deployment.Status.Replicas, - deployment.Status.UnavailableReplicas) // todo: clean logs if deployment.Status.AvailableReplicas == deployment.Status.Replicas { return task.Running } else { diff --git a/src/task/jupyter/handler.go b/src/task/jupyter/handler.go index d53aaa4..6943c67 100644 --- a/src/task/jupyter/handler.go +++ b/src/task/jupyter/handler.go @@ -29,7 +29,7 @@ func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} } return &apps.Deployment{ ObjectMeta: metav1.ObjectMeta{ - Name: t.KubeDeploymentName(), + Name: t.KubeWorkloadName(), Labels: labels, }, Spec: apps.DeploymentSpec{ diff --git a/src/task/learning/handler.go b/src/task/learning/handler.go index 511db03..6b492f3 100644 --- a/src/task/learning/handler.go +++ b/src/task/learning/handler.go @@ -29,7 +29,7 @@ func (spec *LearningTaskSpecification) Type() task.TaskType { func (spec *LearningTaskSpecification) GenerateWorkload(t *task.Task) interface{} { return &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ - Name: t.KubeJobName(), + Name: t.KubeWorkloadName(), Labels: map[string]string{ task.ManagedByLabel: task.ManagedByValue, task.TaskTypeLabel: task.LearningType, diff --git a/src/task/task.go b/src/task/task.go index e6f1afe..4f63160 100644 --- a/src/task/task.go +++ b/src/task/task.go @@ -56,10 +56,6 @@ func (t *Task) SetState(state TaskState) { atomic.StorePointer(&t.state, unsafe.Pointer(&state)) } -func (t *Task) KubeJobName() string { - return fmt.Sprintf("%s-%s", t.Type, t.ID) -} - -func (t *Task) KubeDeploymentName() string { +func (t *Task) KubeWorkloadName() string { return fmt.Sprintf("%s-%s", t.Type, t.ID) } From 27bdccd5ff138d7cdb1a318d3da87d4ad291e36f Mon Sep 17 00:00:00 2001 From: Matvey Volkov Date: Thu, 21 May 2020 21:21:40 +0300 Subject: [PATCH 4/5] deployment for jupyter image, service --- src/kube/client.go | 35 ++++++----- src/task/basehandlers/batch_handler.go | 24 ++++---- src/task/basehandlers/deployment_handler.go | 25 ++++++-- src/task/basehandlers/task_specification.go | 5 ++ src/task/jupyter/handler.go | 65 ++++++++++++++++----- 5 files changed, 106 insertions(+), 48 deletions(-) diff --git a/src/kube/client.go b/src/kube/client.go index dcec87d..946f6a7 100644 --- a/src/kube/client.go +++ b/src/kube/client.go @@ -3,23 +3,23 @@ package kube import ( "errors" "fmt" - apps "k8s.io/api/apps/v1" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + batch "k8s.io/api/batch/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" ) type Client interface { - RunBatchJob(job *batchv1.Job) error - WatchBatchJobs(options v1.ListOptions) (watch.Interface, error) - GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) + RunBatchJob(job *batch.Job) error + WatchBatchJobs(options meta.ListOptions) (watch.Interface, error) + GetBatchJobs(options meta.ListOptions) (*batch.JobList, error) RunDeployment(deployment *apps.Deployment) error - WatchDeployments(options v1.ListOptions) (watch.Interface, error) - GetDeployments(options v1.ListOptions) (*apps.DeploymentList, error) + WatchDeployments(options meta.ListOptions) (watch.Interface, error) + GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error) + CreateService(service *core.Service) (*core.Service, error) } type client struct { @@ -42,18 +42,18 @@ func NewClient(kubeConfigPath string) *client { } } -func (c *client) RunBatchJob(job *batchv1.Job) error { +func (c *client) RunBatchJob(job *batch.Job) error { jobClient := c.clientset.BatchV1().Jobs("default") _, err := jobClient.Create(job) return err } -func (c *client) WatchBatchJobs(options v1.ListOptions) (watch.Interface, error) { +func (c *client) WatchBatchJobs(options meta.ListOptions) (watch.Interface, error) { jobClient := c.clientset.BatchV1().Jobs("default") return jobClient.Watch(options) } -func (c *client) GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) { +func (c *client) GetBatchJobs(options meta.ListOptions) (*batch.JobList, error) { jobClient := c.clientset.BatchV1().Jobs("default") return jobClient.List(options) } @@ -64,12 +64,19 @@ func (c *client) RunDeployment(deployment *apps.Deployment) error { return err } -func (c *client) WatchDeployments(options v1.ListOptions) (watch.Interface, error) { +func (c *client) WatchDeployments(options meta.ListOptions) (watch.Interface, error) { deploymentClient := c.clientset.AppsV1().Deployments("default") return deploymentClient.Watch(options) } -func (c *client) GetDeployments(options v1.ListOptions) (*apps.DeploymentList, error) { +func (c *client) GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error) { deploymentClient := c.clientset.AppsV1().Deployments("default") return deploymentClient.List(options) } + +func (c *client) CreateService(service *core.Service) (*core.Service, error) { + serviceClient := c.clientset.CoreV1().Services("default") + createdService, err := serviceClient.Create(service) + //log.Print(createdService.Spec.Ports[0].NodePort) + return createdService, err +} diff --git a/src/task/basehandlers/batch_handler.go b/src/task/basehandlers/batch_handler.go index 1899770..444fa53 100644 --- a/src/task/basehandlers/batch_handler.go +++ b/src/task/basehandlers/batch_handler.go @@ -6,8 +6,8 @@ import ( "github.com/Web-networks/execution-system/kube" "github.com/Web-networks/execution-system/task" - batchv1 "k8s.io/api/batch/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + batch "k8s.io/api/batch/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) @@ -37,7 +37,7 @@ func (h *BatchWorkloadTaskTypeHandler) RestoreTasks() ([]*task.Task, error) { return h.tasksFromJobs(jobs), nil } -func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batchv1.JobList) []*task.Task { +func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batch.JobList) []*task.Task { var tasks []*task.Task for _, job := range jobs.Items { tasks = append(tasks, newTaskFromWorkload(&job)) @@ -45,14 +45,14 @@ func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batchv1.JobList) []*t return tasks } -func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batchv1.JobList, error) { - return h.kubeClient.GetBatchJobs(v1.ListOptions{ +func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batch.JobList, error) { + return h.kubeClient.GetBatchJobs(meta.ListOptions{ // TODO: add managed-by LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), }) } -func newTaskFromWorkload(job *batchv1.Job) *task.Task { +func newTaskFromWorkload(job *batch.Job) *task.Task { t := &task.Task{ ID: idFromKubeJob(job), Type: typeFromKubeJob(job), @@ -61,17 +61,17 @@ func newTaskFromWorkload(job *batchv1.Job) *task.Task { return t } -func idFromKubeJob(job *batchv1.Job) string { +func idFromKubeJob(job *batch.Job) string { labels := job.ObjectMeta.GetObjectMeta().GetLabels() return labels[task.TaskIDLabel] } -func typeFromKubeJob(job *batchv1.Job) task.TaskType { +func typeFromKubeJob(job *batch.Job) task.TaskType { labels := job.ObjectMeta.GetObjectMeta().GetLabels() return labels[task.TaskTypeLabel] } -func stateFromKubeJob(job *batchv1.Job) task.TaskState { +func stateFromKubeJob(job *batch.Job) task.TaskState { if job.Status.Failed > 0 { return task.Failed } else if job.Status.Active > 0 { @@ -84,7 +84,7 @@ func stateFromKubeJob(job *batchv1.Job) task.TaskState { } func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) { - tasksWatcher, err := h.kubeClient.WatchBatchJobs(v1.ListOptions{ + tasksWatcher, err := h.kubeClient.WatchBatchJobs(meta.ListOptions{ // TODO: add managed-by LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), }) @@ -97,7 +97,7 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal for event := range tasksWatcher.ResultChan() { switch event.Type { case watch.Modified: - job := event.Object.DeepCopyObject().(*batchv1.Job) + job := event.Object.DeepCopyObject().(*batch.Job) taskID := idFromKubeJob(job) newState := stateFromKubeJob(job) @@ -110,7 +110,7 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal } func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) error { - workload := h.spec.GenerateWorkload(task).(*batchv1.Job) + workload := h.spec.GenerateWorkload(task).(*batch.Job) err := h.kubeClient.RunBatchJob(workload) if err != nil { return err diff --git a/src/task/basehandlers/deployment_handler.go b/src/task/basehandlers/deployment_handler.go index eddd353..001a5e0 100644 --- a/src/task/basehandlers/deployment_handler.go +++ b/src/task/basehandlers/deployment_handler.go @@ -7,18 +7,19 @@ import ( "github.com/Web-networks/execution-system/kube" "github.com/Web-networks/execution-system/task" apps "k8s.io/api/apps/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/watch" ) type DeploymentHandler struct { kubeClient kube.Client - spec TaskSpecification + spec TaskWithServiceSpecification } var _ task.TaskTypeHandler = (*DeploymentHandler)(nil) -func NewDeploymentHandler(kubeClient kube.Client, spec TaskSpecification) *DeploymentHandler { +func NewDeploymentHandler(kubeClient kube.Client, spec TaskWithServiceSpecification) *DeploymentHandler { return &DeploymentHandler{ kubeClient: kubeClient, spec: spec, @@ -51,7 +52,7 @@ func (h *DeploymentHandler) tasksFromDeployments(deployments *apps.DeploymentLis } func (h *DeploymentHandler) restoreDeploymentsFromKube() (*apps.DeploymentList, error) { - return h.kubeClient.GetDeployments(v1.ListOptions{ + return h.kubeClient.GetDeployments(meta.ListOptions{ // TODO: add managed-by LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), }) @@ -76,7 +77,7 @@ func stateFromKubeDeployment(deployment *apps.Deployment) task.TaskState { } func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) { - tasksWatcher, err := h.kubeClient.WatchDeployments(v1.ListOptions{ + tasksWatcher, err := h.kubeClient.WatchDeployments(meta.ListOptions{ // TODO: add managed-by LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()), }) @@ -101,9 +102,21 @@ func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) { func (h *DeploymentHandler) Run(task *task.Task) error { workload := h.spec.GenerateWorkload(task).(*apps.Deployment) - err := h.kubeClient.RunDeployment(workload) + service := h.spec.GenerateService(task).(*core.Service) + + createdService, err := h.kubeClient.CreateService(service) if err != nil { + log.Print(err) return err + } else { + log.Printf("created service with NodePort=%v", createdService.Spec.Ports[0].NodePort) } + + err = h.kubeClient.RunDeployment(workload) + if err != nil { + log.Print(err) + return err + } + return nil } diff --git a/src/task/basehandlers/task_specification.go b/src/task/basehandlers/task_specification.go index 1d51f5c..0664796 100644 --- a/src/task/basehandlers/task_specification.go +++ b/src/task/basehandlers/task_specification.go @@ -6,3 +6,8 @@ type TaskSpecification interface { Type() task.TaskType GenerateWorkload(t *task.Task) interface{} } +type TaskWithServiceSpecification interface { + Type() task.TaskType + GenerateWorkload(t *task.Task) interface{} + GenerateService(t *task.Task) interface{} +} diff --git a/src/task/jupyter/handler.go b/src/task/jupyter/handler.go index 6943c67..19a6b32 100644 --- a/src/task/jupyter/handler.go +++ b/src/task/jupyter/handler.go @@ -5,8 +5,8 @@ import ( "github.com/Web-networks/execution-system/task" "github.com/Web-networks/execution-system/task/basehandlers" apps "k8s.io/api/apps/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) func NewHandler(kubeClient kube.Client) task.TaskTypeHandler { @@ -15,7 +15,7 @@ func NewHandler(kubeClient kube.Client) task.TaskTypeHandler { type JupyterTaskSpecification struct{} -var _ basehandlers.TaskSpecification = (*JupyterTaskSpecification)(nil) +var _ basehandlers.TaskWithServiceSpecification = (*JupyterTaskSpecification)(nil) func (spec *JupyterTaskSpecification) Type() task.TaskType { return task.JupyterType @@ -28,34 +28,67 @@ func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} task.TaskIDLabel: t.ID, } return &apps.Deployment{ - ObjectMeta: metav1.ObjectMeta{ + ObjectMeta: meta.ObjectMeta{ Name: t.KubeWorkloadName(), Labels: labels, }, Spec: apps.DeploymentSpec{ - Selector: &metav1.LabelSelector{ + Selector: &meta.LabelSelector{ MatchLabels: labels, }, - Template: v1.PodTemplateSpec{ - ObjectMeta: metav1.ObjectMeta{ + Template: core.PodTemplateSpec{ + ObjectMeta: meta.ObjectMeta{ Labels: labels, }, - Spec: v1.PodSpec{ - Containers: []v1.Container{ + Spec: core.PodSpec{ + Containers: []core.Container{ { - Name: t.Type, - Image: "busybox", - Command: []string{"sleep", "30"}, // sleep for 30 seconds - Ports: []v1.ContainerPort{ + Name: t.Type, + Image: "networksidea/image_generation:jupyter-1.0.5", + Ports: []core.ContainerPort{ { Name: "http", - Protocol: v1.ProtocolTCP, - ContainerPort: 80, + Protocol: core.ProtocolTCP, + ContainerPort: 8888, }, }, + Env: []core.EnvVar{ + { + Name: "JUPYTER_TOKEN", + Value: "abcd", + }, + }, + }, + }, + RestartPolicy: core.RestartPolicyAlways, + ImagePullSecrets: []core.LocalObjectReference{ + { + Name: "dockerpullsecrets", }, }, - RestartPolicy: v1.RestartPolicyAlways, + }, + }, + }, + } +} + +func (spec *JupyterTaskSpecification) GenerateService(t *task.Task) interface{} { + labels := map[string]string{ + task.ManagedByLabel: task.ManagedByValue, + task.TaskTypeLabel: task.JupyterType, + task.TaskIDLabel: t.ID, + } + return &core.Service{ + ObjectMeta: meta.ObjectMeta{ + Name: t.KubeWorkloadName(), + Labels: labels, + }, + Spec: core.ServiceSpec{ + Selector: labels, + Type: "NodePort", + Ports: []core.ServicePort{ + { + Port: 8888, }, }, }, From 19d4fa7b9b0e8e5f0a4f1a98892f8af9a466ad21 Mon Sep 17 00:00:00 2001 From: salamantos Date: Tue, 16 Jun 2020 10:22:27 +0300 Subject: [PATCH 5/5] return data for task creation endpoint --- src/endpoints.go | 10 +++++++++- src/task/basehandlers/batch_handler.go | 6 +++--- src/task/basehandlers/deployment_handler.go | 8 ++++---- src/task/handler.go | 2 +- src/task/manager.go | 12 ++++++------ 5 files changed, 23 insertions(+), 15 deletions(-) diff --git a/src/endpoints.go b/src/endpoints.go index b859470..8453d2a 100644 --- a/src/endpoints.go +++ b/src/endpoints.go @@ -49,9 +49,17 @@ func (ep *Endpoints) ExecuteTask(ctx *Context, rw web.ResponseWriter, req *web.R t := task.NewTask(id, request.Type) - if err := ep.taskManager.Run(t); err != nil { + err, data := ep.taskManager.Run(t) + if err != nil { http.Error(rw, err.Error(), http.StatusBadRequest) } + + respBytes, err := json.Marshal(data) + if err != nil { + http.Error(rw, err.Error(), http.StatusInternalServerError) + } + + _, _ = rw.Write(respBytes) } type ExecuteTaskRequest struct { diff --git a/src/task/basehandlers/batch_handler.go b/src/task/basehandlers/batch_handler.go index 444fa53..d540683 100644 --- a/src/task/basehandlers/batch_handler.go +++ b/src/task/basehandlers/batch_handler.go @@ -109,11 +109,11 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal }() } -func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) error { +func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) (error, string) { workload := h.spec.GenerateWorkload(task).(*batch.Job) err := h.kubeClient.RunBatchJob(workload) if err != nil { - return err + return err, "" } - return nil + return nil, "" } diff --git a/src/task/basehandlers/deployment_handler.go b/src/task/basehandlers/deployment_handler.go index 001a5e0..d0b91e1 100644 --- a/src/task/basehandlers/deployment_handler.go +++ b/src/task/basehandlers/deployment_handler.go @@ -100,14 +100,14 @@ func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) { }() } -func (h *DeploymentHandler) Run(task *task.Task) error { +func (h *DeploymentHandler) Run(task *task.Task) (error, string) { workload := h.spec.GenerateWorkload(task).(*apps.Deployment) service := h.spec.GenerateService(task).(*core.Service) createdService, err := h.kubeClient.CreateService(service) if err != nil { log.Print(err) - return err + return err, "" } else { log.Printf("created service with NodePort=%v", createdService.Spec.Ports[0].NodePort) } @@ -115,8 +115,8 @@ func (h *DeploymentHandler) Run(task *task.Task) error { err = h.kubeClient.RunDeployment(workload) if err != nil { log.Print(err) - return err + return err, "" } - return nil + return nil, fmt.Sprintf("\"NodePort\": %v", createdService.Spec.Ports[0].NodePort) } diff --git a/src/task/handler.go b/src/task/handler.go index 8636450..a4fceb7 100644 --- a/src/task/handler.go +++ b/src/task/handler.go @@ -8,7 +8,7 @@ type TaskTypeHandler interface { // runtime WatchTasks(cb OnTaskStateModifiedCallback) - Run(task *Task) error + Run(task *Task) (error, string) } type OnTaskStateModifiedCallback = func(id string, newState TaskState) diff --git a/src/task/manager.go b/src/task/manager.go index 2f46257..e09f4c2 100644 --- a/src/task/manager.go +++ b/src/task/manager.go @@ -27,23 +27,23 @@ func (tm *TaskManager) onTaskStateChanged(id string, state TaskState) { tm.tasks[id].SetState(state) } -func (tm *TaskManager) Run(task *Task) error { +func (tm *TaskManager) Run(task *Task) (error, string) { handler, ok := tm.typeHandlers[task.Type] if !ok { - return errors.New(fmt.Sprintf("unsupported task type '%s'", task.Type)) + return errors.New(fmt.Sprintf("unsupported task type '%s'", task.Type)), "" } if _, ok := tm.tasks[task.ID]; ok { - return errors.New(fmt.Sprintf("task with id %s already exists", task.ID)) + return errors.New(fmt.Sprintf("task with id %s already exists", task.ID)), "" } - err := handler.Run(task) + err, data := handler.Run(task) if err != nil { - return err + return err, data } tm.tasks[task.ID] = task - return nil + return nil, data } func (tm *TaskManager) TaskStateByID(id string) TaskState {