Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,17 @@ func (ep *Endpoints) ExecuteTask(ctx *Context, rw web.ResponseWriter, req *web.R

t := task.NewTask(id, request.Type)

if err := ep.taskManager.Run(t); err != nil {
err, data := ep.taskManager.Run(t)
if err != nil {
http.Error(rw, err.Error(), http.StatusBadRequest)
}

respBytes, err := json.Marshal(data)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}

_, _ = rw.Write(respBytes)
}

type ExecuteTaskRequest struct {
Expand Down
47 changes: 37 additions & 10 deletions src/kube/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,23 @@ package kube
import (
"errors"
"fmt"

batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apps "k8s.io/api/apps/v1"
batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

type Client interface {
RunBatchJob(job *batchv1.Job) error
WatchBatchJobs(options v1.ListOptions) (watch.Interface, error)
GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error)
RunBatchJob(job *batch.Job) error
WatchBatchJobs(options meta.ListOptions) (watch.Interface, error)
GetBatchJobs(options meta.ListOptions) (*batch.JobList, error)
RunDeployment(deployment *apps.Deployment) error
WatchDeployments(options meta.ListOptions) (watch.Interface, error)
GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error)
CreateService(service *core.Service) (*core.Service, error)
}

type client struct {
Expand All @@ -38,18 +42,41 @@ func NewClient(kubeConfigPath string) *client {
}
}

func (c *client) RunBatchJob(job *batchv1.Job) error {
func (c *client) RunBatchJob(job *batch.Job) error {
jobClient := c.clientset.BatchV1().Jobs("default")
_, err := jobClient.Create(job)
return err
}

func (c *client) WatchBatchJobs(options v1.ListOptions) (watch.Interface, error) {
func (c *client) WatchBatchJobs(options meta.ListOptions) (watch.Interface, error) {
jobClient := c.clientset.BatchV1().Jobs("default")
return jobClient.Watch(options)
}

func (c *client) GetBatchJobs(options v1.ListOptions) (*batchv1.JobList, error) {
func (c *client) GetBatchJobs(options meta.ListOptions) (*batch.JobList, error) {
jobClient := c.clientset.BatchV1().Jobs("default")
return jobClient.List(options)
}

func (c *client) RunDeployment(deployment *apps.Deployment) error {
deploymentClient := c.clientset.AppsV1().Deployments("default")
_, err := deploymentClient.Create(deployment)
return err
}

func (c *client) WatchDeployments(options meta.ListOptions) (watch.Interface, error) {
deploymentClient := c.clientset.AppsV1().Deployments("default")
return deploymentClient.Watch(options)
}

func (c *client) GetDeployments(options meta.ListOptions) (*apps.DeploymentList, error) {
deploymentClient := c.clientset.AppsV1().Deployments("default")
return deploymentClient.List(options)
}

func (c *client) CreateService(service *core.Service) (*core.Service, error) {
serviceClient := c.clientset.CoreV1().Services("default")
createdService, err := serviceClient.Create(service)
//log.Print(createdService.Spec.Ports[0].NodePort)
return createdService, err
}
2 changes: 2 additions & 0 deletions src/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Web-networks/execution-system/kube"
"github.com/Web-networks/execution-system/task"
"github.com/Web-networks/execution-system/task/applying"
"github.com/Web-networks/execution-system/task/jupyter"
"github.com/Web-networks/execution-system/task/learning"
"github.com/gocraft/web"
)
Expand All @@ -21,6 +22,7 @@ func main() {
taskManager := task.CreateManagerFromKubernetesState(
learning.NewHandler(kubeClient, conf),
applying.NewHandler(kubeClient),
jupyter.NewHandler(kubeClient),
)

router := web.New(Context{})
Expand Down
2 changes: 1 addition & 1 deletion src/task/applying/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (spec *ApplyingTaskSpecification) Type() task.TaskType {
func (spec *ApplyingTaskSpecification) GenerateWorkload(t *task.Task) interface{} {
return &batchv1.Job{
ObjectMeta: metav1.ObjectMeta{
Name: t.KubeJobName(),
Name: t.KubeWorkloadName(),
Labels: map[string]string{
task.ManagedByLabel: task.ManagedByValue,
task.TaskTypeLabel: task.ApplyingType,
Expand Down
30 changes: 15 additions & 15 deletions src/task/basehandlers/batch_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (

"github.com/Web-networks/execution-system/kube"
"github.com/Web-networks/execution-system/task"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batch "k8s.io/api/batch/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

Expand Down Expand Up @@ -37,22 +37,22 @@ func (h *BatchWorkloadTaskTypeHandler) RestoreTasks() ([]*task.Task, error) {
return h.tasksFromJobs(jobs), nil
}

func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batchv1.JobList) []*task.Task {
func (h *BatchWorkloadTaskTypeHandler) tasksFromJobs(jobs *batch.JobList) []*task.Task {
var tasks []*task.Task
for _, job := range jobs.Items {
tasks = append(tasks, newTaskFromWorkload(&job))
}
return tasks
}

func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batchv1.JobList, error) {
return h.kubeClient.GetBatchJobs(v1.ListOptions{
func (h *BatchWorkloadTaskTypeHandler) restoreJobsFromKube() (*batch.JobList, error) {
return h.kubeClient.GetBatchJobs(meta.ListOptions{
// TODO: add managed-by
LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
})
}

func newTaskFromWorkload(job *batchv1.Job) *task.Task {
func newTaskFromWorkload(job *batch.Job) *task.Task {
t := &task.Task{
ID: idFromKubeJob(job),
Type: typeFromKubeJob(job),
Expand All @@ -61,17 +61,17 @@ func newTaskFromWorkload(job *batchv1.Job) *task.Task {
return t
}

func idFromKubeJob(job *batchv1.Job) string {
func idFromKubeJob(job *batch.Job) string {
labels := job.ObjectMeta.GetObjectMeta().GetLabels()
return labels[task.TaskIDLabel]
}

func typeFromKubeJob(job *batchv1.Job) task.TaskType {
func typeFromKubeJob(job *batch.Job) task.TaskType {
labels := job.ObjectMeta.GetObjectMeta().GetLabels()
return labels[task.TaskTypeLabel]
}

func stateFromKubeJob(job *batchv1.Job) task.TaskState {
func stateFromKubeJob(job *batch.Job) task.TaskState {
if job.Status.Failed > 0 {
return task.Failed
} else if job.Status.Active > 0 {
Expand All @@ -84,7 +84,7 @@ func stateFromKubeJob(job *batchv1.Job) task.TaskState {
}

func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) {
tasksWatcher, err := h.kubeClient.WatchBatchJobs(v1.ListOptions{
tasksWatcher, err := h.kubeClient.WatchBatchJobs(meta.ListOptions{
// TODO: add managed-by
LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
})
Expand All @@ -97,7 +97,7 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal
for event := range tasksWatcher.ResultChan() {
switch event.Type {
case watch.Modified:
job := event.Object.DeepCopyObject().(*batchv1.Job)
job := event.Object.DeepCopyObject().(*batch.Job)

taskID := idFromKubeJob(job)
newState := stateFromKubeJob(job)
Expand All @@ -109,11 +109,11 @@ func (h *BatchWorkloadTaskTypeHandler) WatchTasks(cb task.OnTaskStateModifiedCal
}()
}

func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) error {
workload := h.spec.GenerateWorkload(task).(*batchv1.Job)
func (h *BatchWorkloadTaskTypeHandler) Run(task *task.Task) (error, string) {
workload := h.spec.GenerateWorkload(task).(*batch.Job)
err := h.kubeClient.RunBatchJob(workload)
if err != nil {
return err
return err, ""
}
return nil
return nil, ""
}
122 changes: 122 additions & 0 deletions src/task/basehandlers/deployment_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package basehandlers

import (
"fmt"
"log"

"github.com/Web-networks/execution-system/kube"
"github.com/Web-networks/execution-system/task"
apps "k8s.io/api/apps/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

type DeploymentHandler struct {
kubeClient kube.Client
spec TaskWithServiceSpecification
}

var _ task.TaskTypeHandler = (*DeploymentHandler)(nil)

func NewDeploymentHandler(kubeClient kube.Client, spec TaskWithServiceSpecification) *DeploymentHandler {
return &DeploymentHandler{
kubeClient: kubeClient,
spec: spec,
}
}

func (h *DeploymentHandler) Type() task.TaskType {
return h.spec.Type()
}

func (h *DeploymentHandler) RestoreTasks() ([]*task.Task, error) {
deployments, err := h.restoreDeploymentsFromKube()
if err != nil {
return nil, err
}
return h.tasksFromDeployments(deployments), nil
}

func (h *DeploymentHandler) tasksFromDeployments(deployments *apps.DeploymentList) []*task.Task {
var tasks []*task.Task
for _, deployment := range deployments.Items {
t := &task.Task{
ID: idFromKubeDeployment(&deployment),
Type: typeFromKubeDeployment(&deployment),
}
t.SetState(stateFromKubeDeployment(&deployment))
tasks = append(tasks, t)
}
return tasks
}

func (h *DeploymentHandler) restoreDeploymentsFromKube() (*apps.DeploymentList, error) {
return h.kubeClient.GetDeployments(meta.ListOptions{
// TODO: add managed-by
LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
})
}

func idFromKubeDeployment(deployment *apps.Deployment) string {
labels := deployment.ObjectMeta.GetObjectMeta().GetLabels()
return labels[task.TaskIDLabel]
}

func typeFromKubeDeployment(deployment *apps.Deployment) task.TaskType {
labels := deployment.ObjectMeta.GetObjectMeta().GetLabels()
return labels[task.TaskTypeLabel]
}

func stateFromKubeDeployment(deployment *apps.Deployment) task.TaskState {
if deployment.Status.AvailableReplicas == deployment.Status.Replicas {
return task.Running
} else {
return task.Initializing
}
}

func (h *DeploymentHandler) WatchTasks(cb task.OnTaskStateModifiedCallback) {
tasksWatcher, err := h.kubeClient.WatchDeployments(meta.ListOptions{
// TODO: add managed-by
LabelSelector: fmt.Sprintf("%s=%s", task.TaskTypeLabel, h.spec.Type()),
})
if err != nil {
panic(fmt.Sprintf("failed to start watching for %s tasks", h.spec.Type()))
}

go func() {
log.Printf("watcher: start watching!")
for event := range tasksWatcher.ResultChan() {
switch event.Type {
case watch.Modified:
deployment := event.Object.DeepCopyObject().(*apps.Deployment)

taskID := idFromKubeDeployment(deployment)
newState := stateFromKubeDeployment(deployment)
cb(taskID, newState)
}
}
}()
}

func (h *DeploymentHandler) Run(task *task.Task) (error, string) {
workload := h.spec.GenerateWorkload(task).(*apps.Deployment)
service := h.spec.GenerateService(task).(*core.Service)

createdService, err := h.kubeClient.CreateService(service)
if err != nil {
log.Print(err)
return err, ""
} else {
log.Printf("created service with NodePort=%v", createdService.Spec.Ports[0].NodePort)
}

err = h.kubeClient.RunDeployment(workload)
if err != nil {
log.Print(err)
return err, ""
}

return nil, fmt.Sprintf("\"NodePort\": %v", createdService.Spec.Ports[0].NodePort)
}
5 changes: 5 additions & 0 deletions src/task/basehandlers/task_specification.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,8 @@ type TaskSpecification interface {
Type() task.TaskType
GenerateWorkload(t *task.Task) interface{}
}
type TaskWithServiceSpecification interface {
Type() task.TaskType
GenerateWorkload(t *task.Task) interface{}
GenerateService(t *task.Task) interface{}
}
2 changes: 1 addition & 1 deletion src/task/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ type TaskTypeHandler interface {

// runtime
WatchTasks(cb OnTaskStateModifiedCallback)
Run(task *Task) error
Run(task *Task) (error, string)
}

type OnTaskStateModifiedCallback = func(id string, newState TaskState)
Loading