diff --git a/src/endpoints.go b/src/endpoints.go
index b859470..8453d2a 100644
--- a/src/endpoints.go
+++ b/src/endpoints.go
@@ -49,9 +49,21 @@ func (ep *Endpoints) ExecuteTask(ctx *Context, rw web.ResponseWriter, req *web.R
 
 	t := task.NewTask(id, request.Type)
 
-	if err := ep.taskManager.Run(t); err != nil {
+	err, data := ep.taskManager.Run(t)
+	if err != nil {
 		http.Error(rw, err.Error(), http.StatusBadRequest)
+		// http.Error does not end the request: return so that no success
+		// body is written after the error response.
+		return
 	}
+
+	respBytes, err := json.Marshal(data)
+	if err != nil {
+		http.Error(rw, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	_, _ = rw.Write(respBytes)
 }
 
 type ExecuteTaskRequest struct {
diff --git a/src/kube/client.go b/src/kube/client.go
index 7e3c566..946f6a7 100644
--- a/src/kube/client.go
+++ b/src/kube/client.go
@@ -3,19 +3,23 @@ package kube
 import (
 	"errors"
 	"fmt"
-
-	batchv1 "k8s.io/api/batch/v1"
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	apps "k8s.io/api/apps/v1"
+	batch "k8s.io/api/batch/v1"
+	core "k8s.io/api/core/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
-
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 )
 
 type Client interface {
-	RunBatchJob(job *batchv1.Job) error
-	WatchBatchJobs(options v1.ListOptions) (watch.Interface, error)
-	GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error)
+	RunBatchJob(job *batch.Job) error
+	WatchBatchJobs(options meta.ListOptions) (watch.Interface, error)
+	GetBatchJobs(options meta.ListOptions) (*batch.JobList, error)
+	RunDeployment(deployment *apps.Deployment) error
+	WatchDeployments(options meta.ListOptions) (watch.Interface, error)
+	GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error)
+	CreateService(service *core.Service) (*core.Service, error)
 }
 
 type client struct {
@@ -38,18 +42,41 @@ func NewClient(kubeConfigPath string) *client {
 	}
 }
 
-func (c *client) RunBatchJob(job *batchv1.Job) error {
+func (c *client) RunBatchJob(job *batch.Job) error {
 	jobClient := c.clientset.BatchV1().Jobs("default")
 	_, err := jobClient.Create(job)
 	return err
 }
 
-func (c *client) WatchBatchJobs(options v1.ListOptions) (watch.Interface, error) {
+func (c *client) WatchBatchJobs(options meta.ListOptions) (watch.Interface, error) {
 	jobClient := c.clientset.BatchV1().Jobs("default")
 	return jobClient.Watch(options)
 }
 
-func (c *client) GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) {
+func (c *client) GetBatchJobs(options meta.ListOptions) (*batch.JobList, error) {
 	jobClient := c.clientset.BatchV1().Jobs("default")
 	return jobClient.List(options)
 }
+
+func (c *client) RunDeployment(deployment *apps.Deployment) error {
+	deploymentClient := c.clientset.AppsV1().Deployments("default")
+	_, err := deploymentClient.Create(deployment)
+	return err
+}
+
+func (c *client) WatchDeployments(options meta.ListOptions) (watch.Interface, error) {
+	deploymentClient := c.clientset.AppsV1().Deployments("default")
+	return deploymentClient.Watch(options)
+}
+
+func (c *client) GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error) {
+	deploymentClient := c.clientset.AppsV1().Deployments("default")
+	return deploymentClient.List(options)
+}
+
+func (c *client) CreateService(service *core.Service) (*core.Service, error) {
+	serviceClient := c.clientset.CoreV1().Services("default")
+	createdService, err := serviceClient.Create(service)
+	//log.Print(createdService.Spec.Ports[0].NodePort)
+	return createdService, err
+}
diff --git a/src/main.go b/src/main.go
index caf1688..412faf8 100644
--- a/src/main.go
+++ b/src/main.go
@@ -10,6 +10,7 @@ import (
 	"github.com/Web-networks/execution-system/kube"
 	"github.com/Web-networks/execution-system/task"
 	"github.com/Web-networks/execution-system/task/applying"
+	"github.com/Web-networks/execution-system/task/jupyter"
 	"github.com/Web-networks/execution-system/task/learning"
 	"github.com/gocraft/web"
 )
@@ -21,6 +22,7 @@ func main() {
 	taskManager := task.CreateManagerFromKubernetesState(
 		learning.NewHandler(kubeClient, conf),
 		applying.NewHandler(kubeClient),
+		jupyter.NewHandler(kubeClient),
 	)
 
 	router := web.New(Context{})
diff --git a/src/task/applying/handler.go b/src/task/applying/handler.go
index b78b3b6..893b143 100644
--- a/src/task/applying/handler.go
+++ b/src/task/applying/handler.go
@@ -24,7 +24,7 @@ func (spec *ApplyingTaskSpecification) Type() task.TaskType {
 func (spec *ApplyingTaskSpecification) GenerateWorkload(t *task.Task) interface{} {
 	return &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: t.KubeJobName(),
+			Name: t.KubeWorkloadName(),
 			Labels: map[string]string{
 				task.ManagedByLabel: task.ManagedByValue,
 				task.TaskTypeLabel:  task.ApplyingType,
diff --git a/src/task/basehandlers/batch_handler.go b/src/task/basehandlers/batch_handler.go
index 1899770..d540683 100644
--- a/src/task/basehandlers/batch_handler.go
+++ b/src/task/basehandlers/batch_handler.go
@@ -6,8 +6,8 @@ import (
 
 	"github.com/Web-networks/execution-system/kube"
 	"github.com/Web-networks/execution-system/task"
-	batchv1 "k8s.io/api/batch/v1"
-	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	batch "k8s.io/api/batch/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
 )
 
@@ -37,7 +37,7 @@ func (h *BatchWorkloadTaskTypeHandler) RestoreTasks() ([]*task.Task, error) {
 	return h.tasksFromJobs(jobs), nil
 }
 
-func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batchv1.JobList) []*task.Task {
+func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batch.JobList) []*task.Task {
 	var tasks []*task.Task
 	for _, job := range jobs.Items {
 		tasks = append(tasks, newTaskFromWorkload(&job))
@@ -45,14 +45,14 @@ func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batchv1.JobList) []*t
 	return tasks
 }
 
-func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batchv1.JobList, error) {
-	return h.kubeClient.GetBatchJobs(v1.ListOptions{
+func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batch.JobList, error) {
+	return h.kubeClient.GetBatchJobs(meta.ListOptions{
 		// TODO: add managed-by
 		LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
 	})
 }
 
-func newTaskFromWorkload(job *batchv1.Job) *task.Task {
+func newTaskFromWorkload(job *batch.Job) *task.Task {
 	t := &task.Task{
 		ID:   idFromKubeJob(job),
 		Type: typeFromKubeJob(job),
@@ -61,17 +61,17 @@ func newTaskFromWorkload(job *batchv1.Job) *task.Task {
 	return t
 }
 
-func idFromKubeJob(job *batchv1.Job) string {
+func idFromKubeJob(job *batch.Job) string {
 	labels := job.ObjectMeta.GetObjectMeta().GetLabels()
 	return labels[task.TaskIDLabel]
 }
 
-func typeFromKubeJob(job *batchv1.Job) task.TaskType {
+func typeFromKubeJob(job *batch.Job) task.TaskType {
 	labels := job.ObjectMeta.GetObjectMeta().GetLabels()
 	return labels[task.TaskTypeLabel]
 }
 
-func stateFromKubeJob(job *batchv1.Job) task.TaskState {
+func stateFromKubeJob(job *batch.Job) task.TaskState {
 	if job.Status.Failed > 0 {
 		return task.Failed
 	} else if job.Status.Active > 0 {
@@ -84,7 +84,7 @@ func stateFromKubeJob(job *batchv1.Job) task.TaskState {
 }
 
 func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) {
-	tasksWatcher, err := h.kubeClient.WatchBatchJobs(v1.ListOptions{
+	tasksWatcher, err := h.kubeClient.WatchBatchJobs(meta.ListOptions{
 		// TODO: add managed-by
 		LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
 	})
@@ -97,7 +97,7 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal
 		for event := range tasksWatcher.ResultChan() {
 			switch event.Type {
 			case watch.Modified:
-				job := event.Object.DeepCopyObject().(*batchv1.Job)
+				job := event.Object.DeepCopyObject().(*batch.Job)
 
 				taskID := idFromKubeJob(job)
 				newState := stateFromKubeJob(job)
@@ -109,11 +109,11 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal
 	}()
 }
 
-func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) error {
-	workload := h.spec.GenerateWorkload(task).(*batchv1.Job)
+func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) (error, string) {
+	workload := h.spec.GenerateWorkload(task).(*batch.Job)
 	err := h.kubeClient.RunBatchJob(workload)
 	if err != nil {
-		return err
+		return err, ""
 	}
-	return nil
+	return nil, ""
 }
diff --git a/src/task/basehandlers/deployment_handler.go b/src/task/basehandlers/deployment_handler.go
new file mode 100644
index 0000000..d0b91e1
--- /dev/null
+++ b/src/task/basehandlers/deployment_handler.go
@@ -0,0 +1,157 @@
+package basehandlers
+
+import (
+	"fmt"
+	"log"
+
+	"github.com/Web-networks/execution-system/kube"
+	"github.com/Web-networks/execution-system/task"
+	apps "k8s.io/api/apps/v1"
+	core "k8s.io/api/core/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/watch"
+)
+
+// DeploymentHandler runs tasks as a Kubernetes Deployment plus a Service,
+// both generated by a TaskWithServiceSpecification.
+type DeploymentHandler struct {
+	kubeClient kube.Client
+	spec       TaskWithServiceSpecification
+}
+
+var _ task.TaskTypeHandler = (*DeploymentHandler)(nil)
+
+// NewDeploymentHandler returns a handler that creates the workloads
+// described by spec through kubeClient.
+func NewDeploymentHandler(kubeClient kube.Client, spec TaskWithServiceSpecification) *DeploymentHandler {
+	return &DeploymentHandler{
+		kubeClient: kubeClient,
+		spec:       spec,
+	}
+}
+
+// Type reports the task type of the underlying specification.
+func (h *DeploymentHandler) Type() task.TaskType {
+	return h.spec.Type()
+}
+
+// RestoreTasks lists the deployments labelled with this handler's task
+// type and converts each of them into a task.
+func (h *DeploymentHandler) RestoreTasks() ([]*task.Task, error) {
+	deployments, err := h.restoreDeploymentsFromKube()
+	if err != nil {
+		return nil, err
+	}
+	return h.tasksFromDeployments(deployments), nil
+}
+
+// tasksFromDeployments builds one task per deployment, taking ID, type and
+// state from the deployment's labels and status.
+func (h *DeploymentHandler) tasksFromDeployments(deployments *apps.DeploymentList) []*task.Task {
+	var tasks []*task.Task
+	for i := range deployments.Items {
+		deployment := &deployments.Items[i]
+		t := &task.Task{
+			ID:   idFromKubeDeployment(deployment),
+			Type: typeFromKubeDeployment(deployment),
+		}
+		t.SetState(stateFromKubeDeployment(deployment))
+		tasks = append(tasks, t)
+	}
+	return tasks
+}
+
+// listOptions selects the deployments labelled with this handler's task
+// type.
+func (h *DeploymentHandler) listOptions() meta.ListOptions {
+	return meta.ListOptions{
+		// TODO: add managed-by
+		LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
+	}
+}
+
+// restoreDeploymentsFromKube fetches the deployments matched by listOptions.
+func (h *DeploymentHandler) restoreDeploymentsFromKube() (*apps.DeploymentList, error) {
+	return h.kubeClient.GetDeployments(h.listOptions())
+}
+
+// idFromKubeDeployment reads the task ID label of deployment.
+func idFromKubeDeployment(deployment *apps.Deployment) string {
+	return deployment.GetLabels()[task.TaskIDLabel]
+}
+
+// typeFromKubeDeployment reads the task type label of deployment.
+func typeFromKubeDeployment(deployment *apps.Deployment) task.TaskType {
+	return deployment.GetLabels()[task.TaskTypeLabel]
+}
+
+// stateFromKubeDeployment maps the deployment status to a task state. The
+// task is Running only when at least one replica exists and all of them are
+// available: a deployment whose status is still empty reports 0 == 0 and
+// must stay Initializing.
+func stateFromKubeDeployment(deployment *apps.Deployment) task.TaskState {
+	status := deployment.Status
+	if status.Replicas > 0 && status.AvailableReplicas == status.Replicas {
+		return task.Running
+	}
+	return task.Initializing
+}
+
+// WatchTasks starts a goroutine that calls cb with the task ID and the new
+// state for every Modified event of a matching deployment. It panics when
+// the watch cannot be started.
+func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) {
+	tasksWatcher, err := h.kubeClient.WatchDeployments(h.listOptions())
+	if err != nil {
+		panic(fmt.Sprintf("failed to start watching for %s tasks: %v", h.spec.Type(), err))
+	}
+
+	go func() {
+		log.Printf("watcher: start watching!")
+		for event := range tasksWatcher.ResultChan() {
+			if event.Type != watch.Modified {
+				continue
+			}
+			deployment, ok := event.Object.(*apps.Deployment)
+			if !ok {
+				log.Printf("watcher: unexpected %T in %s event", event.Object, event.Type)
+				continue
+			}
+			cb(idFromKubeDeployment(deployment), stateFromKubeDeployment(deployment))
+		}
+	}()
+}
+
+// Run creates the Service and then the Deployment generated for task. On
+// success the returned string carries the NodePort assigned to the Service.
+func (h *DeploymentHandler) Run(task *task.Task) (error, string) {
+	workload, ok := h.spec.GenerateWorkload(task).(*apps.Deployment)
+	if !ok {
+		return fmt.Errorf("%s specification did not generate a deployment", h.spec.Type()), ""
+	}
+	service, ok := h.spec.GenerateService(task).(*core.Service)
+	if !ok {
+		return fmt.Errorf("%s specification did not generate a service", h.spec.Type()), ""
+	}
+
+	createdService, err := h.kubeClient.CreateService(service)
+	if err != nil {
+		log.Print(err)
+		return err, ""
+	}
+	if createdService == nil || len(createdService.Spec.Ports) == 0 {
+		return fmt.Errorf("service %s was created without ports", service.Name), ""
+	}
+	nodePort := createdService.Spec.Ports[0].NodePort
+	log.Printf("created service with NodePort=%v", nodePort)
+
+	// NOTE(review): a failure below leaves the Service created above behind;
+	// kube.Client offers no way to delete it.
+	err = h.kubeClient.RunDeployment(workload)
+	if err != nil {
+		log.Print(err)
+		return err, ""
+	}
+
+	return nil, fmt.Sprintf("\"NodePort\": %v", nodePort)
+}
diff --git a/src/task/basehandlers/task_specification.go b/src/task/basehandlers/task_specification.go
index 1d51f5c..0664796 100644
--- a/src/task/basehandlers/task_specification.go
+++ b/src/task/basehandlers/task_specification.go
@@ -6,3 +6,10 @@ type TaskSpecification interface {
 	Type() task.TaskType
 	GenerateWorkload(t *task.Task) interface{}
 }
+
+// TaskWithServiceSpecification is a TaskSpecification that additionally
+// generates a Service alongside its workload.
+type TaskWithServiceSpecification interface {
+	TaskSpecification
+	GenerateService(t *task.Task) interface{}
+}
diff --git a/src/task/handler.go b/src/task/handler.go
index 8636450..a4fceb7 100644
--- a/src/task/handler.go
+++ b/src/task/handler.go
@@ -8,7 +8,7 @@ type TaskTypeHandler interface {
 
 	// runtime
 	WatchTasks(cb OnTaskStateModifiedCallback)
-	Run(task *Task) error
+	Run(task *Task) (error, string)
 }
 
 type OnTaskStateModifiedCallback = func(id string, newState TaskState)
diff --git a/src/task/jupyter/handler.go b/src/task/jupyter/handler.go
new file mode 100644
index 0000000..19a6b32
--- /dev/null
+++ b/src/task/jupyter/handler.go
@@ -0,0 +1,112 @@
+package jupyter
+
+import (
+	"github.com/Web-networks/execution-system/kube"
+	"github.com/Web-networks/execution-system/task"
+	"github.com/Web-networks/execution-system/task/basehandlers"
+	apps "k8s.io/api/apps/v1"
+	core "k8s.io/api/core/v1"
+	meta "k8s.io/apimachinery/pkg/apis/meta/v1"
+)
+
+// jupyterPort is the port declared on the notebook container; the Service
+// exposes the same port.
+const jupyterPort = 8888
+
+// NewHandler returns the handler that runs jupyter tasks through kubeClient.
+func NewHandler(kubeClient kube.Client) task.TaskTypeHandler {
+	return basehandlers.NewDeploymentHandler(kubeClient, &JupyterTaskSpecification{})
+}
+
+// JupyterTaskSpecification generates the Deployment and the Service of a
+// jupyter task.
+type JupyterTaskSpecification struct{}
+
+var _ basehandlers.TaskWithServiceSpecification = (*JupyterTaskSpecification)(nil)
+
+// Type reports task.JupyterType.
+func (spec *JupyterTaskSpecification) Type() task.TaskType {
+	return task.JupyterType
+}
+
+// taskLabels returns the labels put on the Deployment, its pods and the
+// Service of t; the selectors of both objects match pods through them.
+func taskLabels(t *task.Task) map[string]string {
+	return map[string]string{
+		task.ManagedByLabel: task.ManagedByValue,
+		task.TaskTypeLabel:  task.JupyterType,
+		task.TaskIDLabel:    t.ID,
+	}
+}
+
+// GenerateWorkload builds the single-container notebook Deployment for t.
+func (spec *JupyterTaskSpecification) GenerateWorkload(t *task.Task) interface{} {
+	labels := taskLabels(t)
+	return &apps.Deployment{
+		ObjectMeta: meta.ObjectMeta{
+			Name:   t.KubeWorkloadName(),
+			Labels: labels,
+		},
+		Spec: apps.DeploymentSpec{
+			Selector: &meta.LabelSelector{
+				MatchLabels: labels,
+			},
+			Template: core.PodTemplateSpec{
+				ObjectMeta: meta.ObjectMeta{
+					Labels: labels,
+				},
+				Spec: core.PodSpec{
+					Containers: []core.Container{
+						{
+							Name:  t.Type,
+							Image: "networksidea/image_generation:jupyter-1.0.5",
+							Ports: []core.ContainerPort{
+								{
+									Name:          "http",
+									Protocol:      core.ProtocolTCP,
+									ContainerPort: jupyterPort,
+								},
+							},
+							Env: []core.EnvVar{
+								{
+									// FIXME(security): every notebook gets the
+									// same hard-coded token, and the notebook
+									// is reachable through a NodePort; use a
+									// per-task secret instead.
+									Name:  "JUPYTER_TOKEN",
+									Value: "abcd",
+								},
+							},
+						},
+					},
+					RestartPolicy: core.RestartPolicyAlways,
+					ImagePullSecrets: []core.LocalObjectReference{
+						{
+							Name: "dockerpullsecrets",
+						},
+					},
+				},
+			},
+		},
+	}
+}
+
+// GenerateService builds the NodePort Service that selects the pods of t.
+func (spec *JupyterTaskSpecification) GenerateService(t *task.Task) interface{} {
+	labels := taskLabels(t)
+	return &core.Service{
+		ObjectMeta: meta.ObjectMeta{
+			Name:   t.KubeWorkloadName(),
+			Labels: labels,
+		},
+		Spec: core.ServiceSpec{
+			Selector: labels,
+			Type:     core.ServiceTypeNodePort,
+			Ports: []core.ServicePort{
+				{
+					Port: jupyterPort,
+				},
+			},
+		},
+	}
+}
diff --git a/src/task/jupyter/jupyter.go b/src/task/jupyter/jupyter.go
deleted file mode 100644
index 4259b01..0000000
--- a/src/task/jupyter/jupyter.go
+++ /dev/null
@@ -1 +0,0 @@
-package jupyter
diff --git a/src/task/learning/handler.go b/src/task/learning/handler.go
index 511db03..6b492f3 100644
--- a/src/task/learning/handler.go
+++ b/src/task/learning/handler.go
@@ -29,7 +29,7 @@ func (spec *LearningTaskSpecification) Type() task.TaskType {
 func (spec *LearningTaskSpecification) GenerateWorkload(t *task.Task) interface{} {
 	return &batchv1.Job{
 		ObjectMeta: metav1.ObjectMeta{
-			Name: t.KubeJobName(),
+			Name: t.KubeWorkloadName(),
 			Labels: map[string]string{
 				task.ManagedByLabel: task.ManagedByValue,
 				task.TaskTypeLabel:  task.LearningType,
diff --git a/src/task/manager.go b/src/task/manager.go
index 2f46257..e09f4c2 100644
--- a/src/task/manager.go
+++ b/src/task/manager.go
@@ -27,23 +27,23 @@ func (tm *TaskManager) onTaskStateChanged(id string, state TaskState) {
 	tm.tasks[id].SetState(state)
 }
 
-func (tm *TaskManager) Run(task *Task) error {
+func (tm *TaskManager) Run(task *Task) (error, string) {
 	handler, ok := tm.typeHandlers[task.Type]
 	if !ok {
-		return errors.New(fmt.Sprintf("unsupported task type '%s'", task.Type))
+		return errors.New(fmt.Sprintf("unsupported task type '%s'", task.Type)), ""
 	}
 
 	if _, ok := tm.tasks[task.ID]; ok {
-		return errors.New(fmt.Sprintf("task with id %s already exists", task.ID))
+		return errors.New(fmt.Sprintf("task with id %s already exists", task.ID)), ""
 	}
 
-	err := handler.Run(task)
+	err, data := handler.Run(task)
 	if err != nil {
-		return err
+		return err, data
 	}
 
 	tm.tasks[task.ID] = task
-	return nil
+	return nil, data
 }
 
 func (tm *TaskManager) TaskStateByID(id string) TaskState {
diff --git a/src/task/task.go b/src/task/task.go
index 4446684..4f63160 100644
--- a/src/task/task.go
+++ b/src/task/task.go
@@ -15,9 +15,9 @@ const TaskIDLabel = "bigone.demist.ru/task-id"
 type TaskType = string
 
 const (
-	LearningType      = "learning"
-	ApplyingType      = "applying"
-	RemoteJupyterType = "jupyter"
+	LearningType = "learning"
+	ApplyingType = "applying"
+	JupyterType  = "jupyter"
 )
 
 type TaskState = string
@@ -56,6 +56,6 @@ func (t *Task) SetState(state TaskState) {
 	atomic.StorePointer(&t.state, unsafe.Pointer(&state))
 }
 
-func (t *Task) KubeJobName() string {
+func (t *Task) KubeWorkloadName() string {
 	return fmt.Sprintf("%s-%s", t.Type, t.ID)
 }