diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..f1b92af --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +.git +frontend/node_modules diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..9534c46 --- /dev/null +++ b/.gitignore @@ -0,0 +1,93 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + +.remote-sync.json +dev_environment.sh +frontend/node_modules diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5a01426 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM ubuntu:16.04 + +RUN apt-get update && apt-get install -y apt-transport-https ca-certificates + +RUN echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial main restricted universe multiverse" > /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-updates main restricted universe multiverse" >> /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-backports main restricted universe multiverse" >> /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-security main restricted universe multiverse" >> /etc/apt/sources.list + +RUN apt-get update +RUN apt-get install -y wget python3.5 python3-pip git + +RUN python3.5 -m pip install kubernetes tornado aiohttp pymongo --ignore-installed -i https://pypi.tuna.tsinghua.edu.cn/simple + +ADD . 
/ktqueue
+WORKDIR /ktqueue
+
+RUN python3.5 -m pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
+CMD python3 /ktqueue/server.py
+
+EXPOSE 8080
diff --git a/README.md b/README.md
index 72be5ec..ef7f66a 100644
--- a/README.md
+++ b/README.md
@@ -1 +1,27 @@
-# ktq
\ No newline at end of file
+# KTQueue
+
+A Kubernetes task queue with GPU support
+
+# Features
+
+- support for GPU tasks
+- manually assign a task to a specific node
+- real-time logs on the web page & log version management
+- manually mount host paths into Pods
+- TensorBoard management & proxy
+- git clone repositories with an SSH key, a username & password, or GitHub OAuth
+- CPU & memory limits
+
+# Screenshot
+
+![screenshot](https://user-images.githubusercontent.com/1068203/28708229-10e6e19e-73ae-11e7-882f-f4fb6bff877a.png)
+
+# How to deploy
+
+See the deployment guide under the [deploy](./docs/deploy) directory.
+
+# How to build images for KTQueue
+
+You can use any framework you want as long as you have a suitable Docker image. Here is an example of building a Docker image for KTQueue:
+
+- [tensorflow](./docs/docker_image_example/tensorflow)
diff --git a/docs/deploy/.gitignore b/docs/deploy/.gitignore
new file mode 100644
index 0000000..cb5b539
--- /dev/null
+++ b/docs/deploy/.gitignore
@@ -0,0 +1 @@
+dep*.yaml
diff --git a/docs/deploy/NODE.md b/docs/deploy/NODE.md
new file mode 100644
index 0000000..b08c4c7
--- /dev/null
+++ b/docs/deploy/NODE.md
@@ -0,0 +1,42 @@
+# How to set up a KTQueue node
+
+## Brief
+
+1. Join the Kubernetes cluster and ensure `kubelet`'s GPU support is enabled.
+2. Mount CephFS at `/mnt/cephfs`.
+3. Install `nvidia-docker` and run it once.
+
+## GPU support of kubelet
+
+Kubernetes (> v1.6.0-beta.1) supports multiple GPUs; please follow the latest guide to enable GPU support.
+
+## Nvidia-Docker
+
+[nvidia-docker](https://github.com/NVIDIA/nvidia-docker) homepage
+
+CUDA inside a container needs the NVIDIA driver libraries such as `libcuda.so.1` to work, and KTQueue assumes the drivers are located at `/var/lib/nvidia-docker/volumes/nvidia_driver/`. So you need to install nvidia-docker and use it to run a CUDA image once, to ensure the drivers end up in the right place.
+
+1. Follow the installation instructions to install nvidia-docker.
+2. Run the `nvidia-smi` test with nvidia-docker.
+3. Make sure a driver version such as `367.57` is located at `/var/lib/nvidia-docker/volumes/nvidia_driver/`.
+
+## Troubleshooting
+
+- `nvidia-smi` prints the right output, but no drivers are found at `/var/lib/nvidia-docker/volumes/nvidia_driver/`
+
+Try updating nvidia-docker (> 1.0.0) and check the Docker volumes:
+
+> docker volume ls
+
+You may get a line like:
+
+```
+DRIVER          VOLUME NAME
+nvidia-docker   nvidia_driver_367.57
+```
+
+Try removing that volume and running `nvidia-docker run --rm nvidia/cuda nvidia-smi` again.
+
+- docker: Error response from daemon: create nvidia_driver_367.57: VolumeDriver.Create: internal error, check logs for details.
+
+> sudo chown nvidia-docker:nvidia-docker /var/lib/nvidia-docker/volumes/nvidia_driver
diff --git a/docs/deploy/README.md b/docs/deploy/README.md
new file mode 100644
index 0000000..d29f130
--- /dev/null
+++ b/docs/deploy/README.md
@@ -0,0 +1,115 @@
+# Deployment
+
+# Brief
+
+1. set up the Kubernetes and Ceph clusters
+2. add the ceph-secret to Kubernetes
+3. deploy MongoDB on top of Kubernetes and Ceph
+4. build the KTQueue Docker image
+5. 
deploy KTQueue
+
+To set up a KTQueue node, please read [How to set up a KTQueue node](./NODE.md).
+
+# File list
+
+- ktqueue.yaml
+- ktqueue-rbac.yaml
+- ceph-secret.yaml
+- mongodb-service.yaml
+- mongodb-dev.yaml
+- mongodb-production.yaml
+
+# Dependencies
+
+KTQueue requires Kubernetes and CephFS; make sure you already have them.
+
+# Prepare Ceph
+
+Kubernetes and other components need permission (i.e., a secret) to access Ceph.
+
+If you want to list the mons in your Ceph cluster, just type:
+
+> ceph mon stat
+
+Then, get the key for your ceph-secret:
+
+> sudo ceph auth get-key client.admin | base64
+
+Note: although the key returned by `ceph auth get-key` already looks like base64, Kubernetes expects the secret value to be base64-encoded again, which is what the `| base64` above does.
+
+Then update your ceph-secret.yaml and add your secret after `key:`:
+
+> cp ceph-secret.yaml dep-ceph-secret.yaml && vi dep-ceph-secret.yaml
+
+and import `dep-ceph-secret.yaml`:
+
+> kubectl create -f dep-ceph-secret.yaml
+
+# Deploy MongoDB
+
+## Create an RBD image
+
+MongoDB needs a `persistent volume` (just like a disk) to store data, so you should create one:
+
+> rbd create rbd/ktqueue-mongodb -s 10240
+
+> rbd info rbd/ktqueue-mongodb
+
+Try:
+
+> rbd map ktqueue-mongodb
+
+The current Linux kernel may not support all RBD features. If you get an error, refer to [this article](http://tonybai.com/2016/11/07/integrate-kubernetes-with-ceph-rbd/) and disable the unsupported features:
+
+> rbd feature disable ktqueue-mongodb exclusive-lock, object-map, fast-diff, deep-flatten
+
+## Create MongoDB services
+
+> cp mongodb-production.yaml dep-mongodb-production.yaml
+
+Create the MongoDB service:
+
+> kubectl create -f mongodb-service.yaml
+
+Create the MongoDB server:
+
+> kubectl create -f dep-mongodb-production.yaml
+
+# Deploy KTQueue
+
+## Mount cephfs
+
+The KTQueue daemon needs access to Ceph to clone code, store logs, etc., and KTQueue jobs need access to Ceph to store their output.
+
+So you should ensure that CephFS is mounted at `/mnt/cephfs` on every node where you want to run KTQueue jobs or the KTQueue daemon.
+
+You should modify `fstab` and add a CephFS mount (an example entry is shown at the end of this guide).
+
+## Build the image
+
+Build the front end:
+
+> cd frontend
+> npm install
+> npm run build
+
+Build the Docker image:
+
+> docker build -t ktqueue .
+
+You can modify `dep-ktqueue.yaml` (see the next step) and change the image name if you want.
+
+## Deploy
+
+> cp ktqueue.yaml dep-ktqueue.yaml
+
+If you want to change the IP/port of KTQueue or assign KTQueue to a specific node, you should modify `dep-ktqueue.yaml`.
+
+To select the node on which KTQueue runs, change `host_name_you_want` after `kubernetes.io/hostname` and uncomment that line.
+
+To select the IP/port, change `ip_you_want_to_access_from_outside` under `externalIPs`.
+
+Finish it:
+
+> kubectl create -f dep-ktqueue.yaml
+
+Enjoy!
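+
+# Appendix: example fstab entry for CephFS
+
+This is a minimal sketch of the CephFS `fstab` entry mentioned in the "Mount cephfs" step above. The monitor addresses, user name, and secret-file path are placeholders; adjust them for your own cluster.
+
+```
+# kernel CephFS mount; adjust monitors, name= and secretfile= for your cluster
+10.0.0.1:6789,10.0.0.2:6789:/  /mnt/cephfs  ceph  name=admin,secretfile=/etc/ceph/admin.secret,noatime,_netdev  0 0
+```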
diff --git a/docs/deploy/ceph-secret.yaml b/docs/deploy/ceph-secret.yaml
new file mode 100644
index 0000000..9c36bde
--- /dev/null
+++ b/docs/deploy/ceph-secret.yaml
@@ -0,0 +1,6 @@
+apiVersion: v1
+kind: Secret
+metadata:
+  name: ceph-secret
+data:
+  key:
diff --git a/docs/deploy/ktqueue-rbac.yaml b/docs/deploy/ktqueue-rbac.yaml
new file mode 100644
index 0000000..e0271ba
--- /dev/null
+++ b/docs/deploy/ktqueue-rbac.yaml
@@ -0,0 +1,40 @@
+# Create the clusterrole:
+# $ kubectl create -f ktqueue-rbac.yaml
+# Bind the ktqueue serviceaccount to the ktqueue clusterrole:
+# $ kubectl create clusterrolebinding ktqueue --clusterrole=ktqueue --serviceaccount=default:ktqueue
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1beta1
+metadata:
+  name: ktqueue
+rules:
+  - apiGroups:
+      - ""
+    resources:
+      - pods
+      - pods/log
+    verbs: ["*"]
+  - apiGroups:
+      - ""
+    resources:
+      - nodes
+    verbs:
+      - list
+  - apiGroups:
+      - "batch"
+    resources:
+      - jobs
+    verbs: ["*"]
+---
+apiVersion: rbac.authorization.k8s.io/v1beta1
+kind: ClusterRoleBinding
+metadata:
+  name: ktqueue
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: ClusterRole
+  name: ktqueue
+subjects:
+- kind: ServiceAccount
+  name: ktqueue
+  namespace: default
diff --git a/docs/deploy/ktqueue.yaml b/docs/deploy/ktqueue.yaml
new file mode 100644
index 0000000..2f1f6cb
--- /dev/null
+++ b/docs/deploy/ktqueue.yaml
@@ -0,0 +1,85 @@
+---
+apiVersion: v1
+kind: ServiceAccount
+metadata:
+  name: ktqueue
+---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+  name: ktqueue
+  labels:
+    app: ktqueue
+spec:
+  replicas: 1
+  selector:
+    matchLabels:
+      app: ktqueue
+  template:
+    metadata:
+      labels:
+        app: ktqueue
+    spec:
+      nodeSelector:
+        beta.kubernetes.io/arch: amd64
+        #kubernetes.io/hostname: host_name_you_want
+      serviceAccountName: ktqueue
+      containers:
+        - name: ktqueue
+          image: ktqueue
+          imagePullPolicy: IfNotPresent
+          command: [ "python3", "/ktqueue/server.py" ]
+          env:
+            - name: KTQ_AUTH_REQUIRED
+              value: '0'
+            - name: KTQ_COOKIE_SECRET
+              value: ''
+            - name: KTQ_OAUTH2_PROVIDER
+              value: github
+            - name: KTQ_OAUTH2_CLIENT_ID
+              value: ''
+            - name: KTQ_OAUTH2_CLIENT_SECRET
+              value: ''
+            - name: KTQ_OAUTH2_CALLBACK
+              value: ''
+          volumeMounts:
+            - name: cephfs
+              mountPath: /cephfs
+          ports:
+            - containerPort: 8080
+              protocol: TCP
+          securityContext:
+            privileged: true
+      volumes:
+        - name: 'cephfs'
+          hostPath:
+            path: /mnt/cephfs
+          # cephfs:
+          #   monitors:
+          #     - localhost:6789
+          #   user: admin
+          #   secretRef:
+          #     name: ceph-secret
+          #   readOnly: false
+
+---
+kind: Service
+apiVersion: v1
+metadata:
+  labels:
+    app: ktqueue
+  name: ktqueue
+spec:
+  type: NodePort
+  ports:
+    - port: 80
+      targetPort: 8080
+  # externalIPs:
+  #   - ip_you_want_to_access_from_outside
+  selector:
+    app: ktqueue
+---
+apiVersion: v1
+kind: Namespace
+metadata:
+  name: ktqueue
diff --git a/docs/deploy/mongodb-dev.yaml b/docs/deploy/mongodb-dev.yaml
new file mode 100644
index 0000000..2e05296
--- /dev/null
+++ b/docs/deploy/mongodb-dev.yaml
@@ -0,0 +1,27 @@
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  labels:
+    name: ktqueue-mongodb
+  name: ktqueue-mongodb-controller
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: ktqueue-mongodb
+    spec:
+      containers:
+      - image: mongo
+        name: mongo
+        ports:
+        - name: mongo
+          containerPort: 27017
+          hostPort: 27017
+        volumeMounts:
+        - name: mongo-persistent-storage
+          mountPath: /data/db
+      volumes:
+      - name: mongo-persistent-storage
+        hostPath:
+          path: /tmp/tq-mongo
diff --git 
a/docs/deploy/mongodb-production.yaml b/docs/deploy/mongodb-production.yaml
new file mode 100644
index 0000000..c5ba116
--- /dev/null
+++ b/docs/deploy/mongodb-production.yaml
@@ -0,0 +1,34 @@
+apiVersion: v1
+kind: ReplicationController
+metadata:
+  labels:
+    name: ktqueue-mongodb
+  name: ktqueue-mongodb-controller
+spec:
+  replicas: 1
+  template:
+    metadata:
+      labels:
+        name: ktqueue-mongodb
+    spec:
+      containers:
+      - image: mongo
+        name: mongo
+        ports:
+        - name: mongo
+          containerPort: 27017
+          hostPort: 27017
+        volumeMounts:
+        - name: mongo-persistent-storage
+          mountPath: /data/db
+      volumes:
+      - name: mongo-persistent-storage
+        rbd:
+          monitors:
+          - localhost:6789  # your mons here
+          pool: rbd
+          image: ktqueue-mongodb
+          secretRef:
+            name: ceph-secret
+          fsType: ext4
+          readOnly: false
diff --git a/docs/deploy/mongodb-service.yaml b/docs/deploy/mongodb-service.yaml
new file mode 100644
index 0000000..9ef4fa3
--- /dev/null
+++ b/docs/deploy/mongodb-service.yaml
@@ -0,0 +1,12 @@
+apiVersion: v1
+kind: Service
+metadata:
+  labels:
+    name: ktqueue-mongodb
+  name: ktqueue-mongodb
+spec:
+  ports:
+    - port: 27017
+      targetPort: 27017
+  selector:
+    name: ktqueue-mongodb
diff --git a/docs/docker_image_example/tensorflow/Dockerfile b/docs/docker_image_example/tensorflow/Dockerfile
new file mode 100644
index 0000000..672f02c
--- /dev/null
+++ b/docs/docker_image_example/tensorflow/Dockerfile
@@ -0,0 +1,24 @@
+# based on the official NVIDIA CUDA image
+FROM nvidia/cuda:8.0-cudnn6-devel-ubuntu16.04
+
+LABEL maintainer "comzyh "
+
+# needed if you want to use HTTPS apt sources
+RUN apt-get update && apt-get install -y apt-transport-https
+
+# use mirrors; Ubuntu 16.04 is used as an example
+RUN echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial main restricted universe multiverse" > /etc/apt/sources.list && \
+    echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-updates main restricted universe multiverse" >> /etc/apt/sources.list && \
+    echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-backports main restricted universe multiverse" >> /etc/apt/sources.list && \
+    echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-security main restricted universe multiverse" >> /etc/apt/sources.list
+
+# install python3 and pip
+RUN apt-get update
+RUN apt-get install -y wget python3.5 python3-pip locales
+
+# generate UTF-8 locales to avoid encoding errors when printing logs
+RUN locale-gen en_US.UTF-8
+
+# add the locally built TensorFlow package and install it
+ADD tensorflow*.whl /tmp/
+RUN python3.5 -m pip install --upgrade pip && python3.5 -m pip install --ignore-installed -i https://pypi.tuna.tsinghua.edu.cn/simple /tmp/tensorflow*.whl
diff --git a/docs/docker_image_example/tensorflow/README.md b/docs/docker_image_example/tensorflow/README.md
new file mode 100644
index 0000000..71bb0ad
--- /dev/null
+++ b/docs/docker_image_example/tensorflow/README.md
@@ -0,0 +1,26 @@
+# How to build a TensorFlow image for KTQueue
+
+# Build the TensorFlow package from source (optional)
+
+Generally, you can use the official TensorFlow package. 
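+
+If you take that route, a minimal sketch of the corresponding Dockerfile lines is shown below; it replaces the local-wheel `ADD`/`pip install` lines of the example Dockerfile in this directory, and it assumes the prebuilt `tensorflow-gpu` wheel on PyPI matches the CUDA/cuDNN versions of your base image.
+
+```
+# Sketch: install the prebuilt GPU package instead of a locally built wheel.
+# Pin a version that matches the CUDA/cuDNN of the base image.
+RUN python3.5 -m pip install --upgrade pip && \
+    python3.5 -m pip install --ignore-installed -i https://pypi.tuna.tsinghua.edu.cn/simple tensorflow-gpu
+```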
If you want to build your own Python wheel, please refer to [Installing TensorFlow from Sources](https://www.tensorflow.org/install/install_sources).
+
+TL;DR: a brief procedure to build the GPU package:
+
+Configure the package; enter `yes` when you are asked `Do you wish to build TensorFlow with CUDA support`:
+
+> ./configure
+
+Build TensorFlow with CUDA support:
+
+> bazel build --config=opt --config=cuda //tensorflow/tools/pip_package:build_pip_package
+
+Build the Python wheel (it is written to the directory you pass as an argument):
+
+> bazel-bin/tensorflow/tools/pip_package/build_pip_package /tmp/tensorflow_pkg
+
+# Build the Docker image for KTQueue
+
+1. Copy your wheel (`tensorflow-*.whl`) into this directory.
+2. Build the Docker image:
+
+> docker build . -t your_image_name
diff --git a/frontend/.babelrc b/frontend/.babelrc
new file mode 100644
index 0000000..6311841
--- /dev/null
+++ b/frontend/.babelrc
@@ -0,0 +1,7 @@
+{
+  "presets": [
+    ["latest", {
+      "es2015": { "modules": false }
+    }]
+  ]
+}
diff --git a/frontend/.eslintrc.json b/frontend/.eslintrc.json
new file mode 100644
index 0000000..29d89e0
--- /dev/null
+++ b/frontend/.eslintrc.json
@@ -0,0 +1,3 @@
+{
+  "extends": ["plugin:vue-libs/recommended"]
+}
diff --git a/frontend/.gitignore b/frontend/.gitignore
new file mode 100644
index 0000000..c369a4d
--- /dev/null
+++ b/frontend/.gitignore
@@ -0,0 +1,5 @@
+.DS_Store
+node_modules/
+dist/
+npm-debug.log
+yarn-error.log
diff --git a/frontend/README.md b/frontend/README.md
new file mode 100644
index 0000000..daf4e72
--- /dev/null
+++ b/frontend/README.md
@@ -0,0 +1,18 @@
+# ktqueue
+
+> KTQueue frontend
+
+## Build Setup
+
+``` bash
+# install dependencies
+npm install
+
+# serve with hot reload at localhost:8081
+npm run dev
+
+# build for production with minification
+npm run build
+```
+
+For a detailed explanation of how things work, consult the [docs for vue-loader](http://vuejs.github.io/vue-loader). 
diff --git a/frontend/package.json b/frontend/package.json new file mode 100644 index 0000000..4584477 --- /dev/null +++ b/frontend/package.json @@ -0,0 +1,43 @@ +{ + "name": "ktqueue", + "description": "KTQueue frontend", + "version": "1.0.0", + "author": "Comzyh ", + "private": true, + "scripts": { + "dev": "cross-env NODE_ENV=development webpack-dev-server --open --hot", + "build": "cross-env NODE_ENV=production webpack --progress --hide-modules" + }, + "dependencies": { + "element-ui": "^1.4.0", + "moment": "^2.18.1", + "vue": "^2.4.2", + "vue-resource": "^1.3.4", + "vue-router": "^2.7.0", + "vuex": "^2.3.1" + }, + "devDependencies": { + "babel-core": "^6.25.0", + "babel-loader": "^7.1.1", + "babel-preset-latest": "^6.0.0", + "cross-env": "^5.0.1", + "css-loader": "^0.28.4", + "eslint": "^4.3.0", + "eslint-plugin-html": "^3.1.1", + "eslint-plugin-import": "^2.7.0", + "eslint-plugin-node": "^5.1.1", + "eslint-plugin-promise": "^3.5.0", + "eslint-plugin-standard": "^3.0.1", + "eslint-plugin-vue-libs": "git+https://github.com/vuejs/eslint-plugin-vue-libs.git", + "extract-text-webpack-plugin": "^3.0.0", + "file-loader": "^0.11.2", + "html-webpack-plugin": "^2.29.0", + "node-sass": "^4.5.3", + "sass-loader": "^6.0.6", + "style-loader": "^0.18.2", + "vue-loader": "^12.2.2", + "vue-template-compiler": "^2.4.2", + "webpack": "^3.3.0", + "webpack-dev-server": "^2.6.1" + } +} diff --git a/frontend/src/App.vue b/frontend/src/App.vue new file mode 100644 index 0000000..54d0c34 --- /dev/null +++ b/frontend/src/App.vue @@ -0,0 +1,53 @@ + + + + + diff --git a/frontend/src/JobEditDialog.vue b/frontend/src/JobEditDialog.vue new file mode 100644 index 0000000..3a968df --- /dev/null +++ b/frontend/src/JobEditDialog.vue @@ -0,0 +1,141 @@ + + + diff --git a/frontend/src/JobLog.vue b/frontend/src/JobLog.vue new file mode 100644 index 0000000..377ebf3 --- /dev/null +++ b/frontend/src/JobLog.vue @@ -0,0 +1,166 @@ + + + diff --git a/frontend/src/Jobs.vue b/frontend/src/Jobs.vue new file mode 100644 index 0000000..a46881d --- /dev/null +++ b/frontend/src/Jobs.vue @@ -0,0 +1,501 @@ + + + diff --git a/frontend/src/Login.vue b/frontend/src/Login.vue new file mode 100644 index 0000000..76fa5c7 --- /dev/null +++ b/frontend/src/Login.vue @@ -0,0 +1,31 @@ + + + diff --git a/frontend/src/Repos.vue b/frontend/src/Repos.vue new file mode 100644 index 0000000..0d4d921 --- /dev/null +++ b/frontend/src/Repos.vue @@ -0,0 +1,128 @@ + + + + diff --git a/frontend/src/const.js b/frontend/src/const.js new file mode 100644 index 0000000..3558662 --- /dev/null +++ b/frontend/src/const.js @@ -0,0 +1,18 @@ +const defaultJobData = { + name: '', + node: null, + gpuNum: 0, + command: '', + image: '', + repo: '', + branch: '', + commit: '', + comments: '', + volumeMounts: [], + cpuLimit: '1.5', + memoryLimit: '2Gi', + autoRestart: false +} +export { + defaultJobData +} diff --git a/frontend/src/index.html b/frontend/src/index.html new file mode 100644 index 0000000..d4f42eb --- /dev/null +++ b/frontend/src/index.html @@ -0,0 +1,18 @@ + + + + + + + KTQueue + + + +
+ + diff --git a/frontend/src/main.js b/frontend/src/main.js new file mode 100644 index 0000000..faf7b90 --- /dev/null +++ b/frontend/src/main.js @@ -0,0 +1,69 @@ +import Vue from 'vue' +import Vuex from 'vuex' +import VueRouter from 'vue-router' +import VueResource from 'vue-resource' +import ElementUI from 'element-ui' +import 'element-ui/lib/theme-default/index.css' + +import App from './App.vue' +import Jobs from './Jobs.vue' +import JobLog from './JobLog.vue' +import Repos from './Repos.vue' +import Login from './Login.vue' + +Vue.use(ElementUI) +Vue.use(VueRouter) +Vue.use(VueResource) +Vue.use(Vuex) + +const routes = [ + { path: '/', name: 'jobs', redirect: '/jobs', meta: { requireAuth: true }}, + { path: '/jobs', component: Jobs, meta: { requireAuth: true }}, + { path: '/jobs/:jobName/log', component: JobLog, meta: { requireAuth: true }}, + { path: '/repos', name: 'repos', component: Repos, meta: { requireAuth: true }}, + { path: '/login', name: 'login', component: Login, meta: { requireAuth: false }} +] + +const router = new VueRouter({ routes }) + +const store = new Vuex.Store({ + strict: process.env.NODE_ENV !== 'production', + state: { + userInfo: { + username: window.localStorage['username'] + } + }, + mutations: { + updateUserInfo (state, newUserInfo) { + state.userInfo = newUserInfo + } + } +}) + +// auth +router.beforeEach((to, from, next) => { + if (to.meta.requireAuth && store.state.userInfo.username == null) { + next({ path: '/login' }) + } else { + next() + } +}) + +new Vue({ + router, + store, + el: '#app', + render: h => h(App), + created () { + this.$http.get('/api/current_user').then((response) => { + this.$store.commit('updateUserInfo', { username: response.body.user }) + window.localStorage['username'] = response.body.user + if (this.$route.path === '/login') { + this.$router.push('/') + } + }).catch((response) => { + this.$store.commit('updateUserInfo', { username: null }) + delete window.localStorage['username'] + }) + } +}) diff --git a/frontend/webpack.config.js b/frontend/webpack.config.js new file mode 100644 index 0000000..ee8b32d --- /dev/null +++ b/frontend/webpack.config.js @@ -0,0 +1,116 @@ +var path = require('path') +var webpack = require('webpack') +var ExtractTextPlugin = require('extract-text-webpack-plugin') +var HtmlWebpackPlugin = require('html-webpack-plugin') + +module.exports = { + entry: { + app: './src/main.js', + vendor: ['element-ui', 'vue', 'vue-router', 'vuex', 'vue-resource', 'moment'] + }, + output: { + path: path.resolve(__dirname, './dist'), + publicPath: '/dist/', + filename: 'build.js' + }, + module: { + rules: [ + { + test: /\.vue$/, + loader: 'vue-loader', + options: { + loaders: { + // Since sass-loader (weirdly) has SCSS as its default parse mode, we map + // the "scss" and "sass" values for the lang attribute to the right configs here. + // other preprocessors should work out of the box, no loader config like this necessary. 
+ 'scss': 'vue-style-loader!css-loader!sass-loader', + 'sass': 'vue-style-loader!css-loader!sass-loader?indentedSyntax' + } + // other vue-loader options go here + } + }, + { + test: /\.js$/, + loader: 'babel-loader', + exclude: /node_modules/ + }, + { + test: /\.css$/, + loader: ExtractTextPlugin.extract({ + fallback: 'style-loader', + use: 'css-loader' + }) + }, + { + test: /\.(eot|svg|ttf|woff|woff2)(\?\S*)?$/, + loader: 'file-loader' + }, + { + test: /\.(png|jpe?g|gif|svg)(\?\S*)?$/, + loader: 'file-loader', + query: { + name: '[name].[ext]?[hash]' + } + } + ] + }, + resolve: { + alias: { + 'vue$': 'vue/dist/vue.esm.js' + } + }, + devServer: { + historyApiFallback: { + index: '/dist/index.html' + }, + port: 8081, + proxy: { + '/api': { + target: 'http://localhost:8080' + }, + '/auth': { + target: 'http://localhost:8080' + }, + '/wsapi': { + target: 'http://localhost:8080', + ws: true + }, + secure: false, + changeOrigin: true + } + }, + performance: { + hints: false + }, + plugins: [ + new ExtractTextPlugin('styles.css'), + new webpack.optimize.CommonsChunkPlugin({ name: 'vendor', filename: 'vendor.js' }), + new HtmlWebpackPlugin({ + hash: true, + template: 'src/index.html', + filename: 'index.html' + }) + ], + devtool: '#eval-source-map' +} + +if (process.env.NODE_ENV === 'production') { + module.exports.devtool = '#source-map' + // http://vue-loader.vuejs.org/en/workflow/production.html + module.exports.plugins = (module.exports.plugins || []).concat([ + new webpack.DefinePlugin({ + 'process.env': { + NODE_ENV: '"production"' + } + }), + new webpack.optimize.UglifyJsPlugin({ + sourceMap: true, + compress: { + warnings: false + } + }), + new webpack.LoaderOptionsPlugin({ + minimize: true + }) + ]) +} diff --git a/ktqueue/__init__.py b/ktqueue/__init__.py new file mode 100644 index 0000000..bca5f67 --- /dev/null +++ b/ktqueue/__init__.py @@ -0,0 +1 @@ +# encoding: utf-8 diff --git a/ktqueue/api/__init__.py b/ktqueue/api/__init__.py new file mode 100644 index 0000000..0b1a218 --- /dev/null +++ b/ktqueue/api/__init__.py @@ -0,0 +1,15 @@ +# encoding: utf-8 +from .job import JobsHandler +from .job import JobLogHandler +from .job import JobLogWSHandler +from .job import StopJobHandler +from .job import RestartJobHandler +from .job import TensorBoardHandler +from .job import JobLogVersionHandler +from .repo import ReposHandler +from .repo import RepoHandler +from .node import NodesHandler +from .tensorboard_proxy import TensorBoardProxyHandler +from .oauth import OAuth2Handler +from .user import CurrentUserHandler +from .auth import AuthRequestHandler diff --git a/ktqueue/api/auth.py b/ktqueue/api/auth.py new file mode 100644 index 0000000..c89f66c --- /dev/null +++ b/ktqueue/api/auth.py @@ -0,0 +1,16 @@ +# encoding: utf-8 +from .utils import BaseHandler + + +class AuthRequestHandler(BaseHandler): + """Support Nginx auth_request directive. 
+ http://nginx.org/en/docs/http/ngx_http_auth_request_module.html + """ + def get(self): + if self.get_current_user(): + self.set_status(202) + else: + self.set_status(401) + + def head(self): + self.get() diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py new file mode 100644 index 0000000..abe49b0 --- /dev/null +++ b/ktqueue/api/job.py @@ -0,0 +1,575 @@ +# encoding: utf-8 +import json +import os +import re +import logging +import bson +from collections import defaultdict + +import tornado.web +import tornado.websocket + +from ktqueue.cloner import Cloner +from .utils import convert_asyncio_task +from .utils import BaseHandler +from .utils import apiauthenticated +from ktqueue.utils import k8s_delete_job +from ktqueue.utils import KTQueueDefaultCredentialProvider +from ktqueue import settings + + +def generate_job(name, command, node, gpu_num, image, repo, branch, commit_id, + comments, mounts, load_nvidia_driver=None, cpu_limit=None, memory_limit=None, auto_restart=False): + """Generate a job description in JSON format.""" + + command_kube = 'cd $WORK_DIR && ' + command + + job_dir = os.path.join('/cephfs/ktqueue/jobs/', name) + if not os.path.exists(job_dir): + os.makedirs(job_dir) + + output_dir = os.path.join('/cephfs/ktqueue/output', name) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + volumeMounts = [] + volumes = [] + node_selector = {} + + # add custom volumeMounts + for volume in mounts: + volume_name = 'volume-{}'.format(volume['key']) + volumes.append({ + 'name': volume_name, + 'hostPath': {'path': volume['hostPath']} + }) + volumeMounts.append({ + 'name': volume_name, + 'mountPath': volume['mountPath'] + }) + + if node: + node_selector['kubernetes.io/hostname'] = node + + if gpu_num > 0 or load_nvidia_driver: + # cause kubernetes does not support NVML, use this trick to suit nvidia driver version + command_kube = 'version=$(ls /nvidia-drivers | tail -1); ln -s /nvidia-drivers/$version /usr/local/nvidia &&' + command_kube + volumes.append({ + 'name': 'nvidia-drivers', + 'hostPath': { + 'path': '/var/lib/nvidia-docker/volumes/nvidia_driver' + } + }) + volumeMounts.append({ + 'name': 'nvidia-drivers', + 'mountPath': '/nvidia-drivers', + }) + + # resources + resources = { + 'limits': { + # 'alpha.kubernetes.io/nvidia-gpu': gpu_num, + 'nvidia.com/gpu': gpu_num, + }, + } + + if cpu_limit: + resources['limits']['cpu'] = cpu_limit + + if memory_limit: + resources['limits']['memory'] = memory_limit + + # cephfs + volumes.append(settings.sfs_volume) + volumeMounts.append({ + 'name': 'cephfs', + 'mountPath': '/cephfs', + }) + + job = { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': name, + }, + 'spec': { + 'parallelism': 1, + 'template': { + 'metadata': { + 'name': name, + }, + 'spec': { + 'containers': [ + { + 'name': 'ktqueue-job', + 'image': image, + # 'imagePullPolicy': 'IfNotPresent', + 'command': ['sh', '-c', command_kube], + 'resources': resources, + 'volumeMounts': volumeMounts, + 'env': [ + { + 'name': 'JOB_NAME', + 'value': name + }, + { + 'name': 'OUTPUT_DIR', + 'value': output_dir + }, + { + 'name': 'WORK_DIR', + 'value': os.path.join(job_dir, 'code') + }, + { + 'name': 'LC_ALL', + 'value': 'en_US.UTF-8' + }, + { + 'name': 'LC_CTYPE', + 'value': 'en_US.UTF-8' + }, + ] + } + ], + 'volumes': volumes, + 'restartPolicy': 'OnFailure' if auto_restart else 'Never', + 'nodeSelector': node_selector, + } + } + } + } + return job + + +async def clone_code(name, repo, branch, commit_id, jobs_collection, job_dir, crediential): + # clone code + 
if repo: + try: + cloner = Cloner(repo=repo, dst_directory=os.path.join(job_dir, 'code'), + branch=branch, commit_id=commit_id, crediential=crediential) + await cloner.clone_and_copy() + except Exception as e: + jobs_collection.update_one({'name': name}, {'$set': {'status': 'FetchError'}}) + raise + if not commit_id: + jobs_collection.update_one({'name': name}, {'$set': {'commit': cloner.commit_id}}) + else: + os.makedirs(os.path.join('/cephfs/ktqueue/jobs', name, 'code')) + + +class JobsHandler(BaseHandler): + + __job_name_pattern = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$') + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + @apiauthenticated + async def post(self): + """ + Create a new job. + e.x. request: + { + "name": "test-17", + "command": "echo 'aW1wb3J0IHRlbnNvcmZsb3cgYXMgdGYKaW1wb3J0IHRpbWUKc2Vzc2lvbiA9IHRmLlNlc3Npb24oKQpmb3IgaSBpbiByYW5nZSg2MDApOgogICAgdGltZS5zbGVlcCgxKQogICAgcHJpbnQoaSkK' | base64 -d | python3", + "gpuNum": 1, + "image": "comzyh/tf_image", + "repo": "https://github.com/comzyh/TF_Docker_Images.git", + "commit_id": "3701b94219fb06974f485cabf99ad88019afe618" + } + """ + user = self.get_current_user() + + body_arguments = json.loads(self.request.body.decode('utf-8')) + + name = body_arguments.get('name') + + if len(name) > 58: # kubernetes doesn't accept name longer than 64. 64 - 6 (pod name suffix) = 58 + self.set_status(400) + self.finish({"message": "job name too long(>58)."}) + return + + if not self.__job_name_pattern.match(name): + self.set_status(400) + self.finish( + {"message": "illegal task name, regex used for validation is [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"}) + return + + # job with same name is forbidden + if self.jobs_collection.find_one({'name': name}): + self.set_status(400) + self.finish(json.dumps({'message': 'Job {} already exists'.format(name)})) + return + + command = body_arguments.get('command') + node = body_arguments.get('node', None) + gpu_num = int(body_arguments.get('gpuNum')) + image = body_arguments.get('image') + repo = body_arguments.get('repo', None) + branch = body_arguments.get('branch', None) + commit_id = body_arguments.get('commit', None) + comments = body_arguments.get('comments', None) + mounts = body_arguments.get('volumeMounts', []) + cpu_limit = body_arguments.get('cpuLimit', None) + memory_limit = body_arguments.get('memoryLimit', None) + auto_restart = body_arguments.get('autoRestart', False) + + job_dir = os.path.join('/cephfs/ktqueue/jobs/', name) + + job = generate_job( + name=name, command=command, node=node, gpu_num=gpu_num, image=image, + repo=repo, branch=branch, commit_id=commit_id, comments=comments, + mounts=mounts, cpu_limit=cpu_limit, memory_limit=memory_limit, + auto_restart=auto_restart + ) + + self.jobs_collection.update_one({'name': name}, {'$set': { + 'name': name, + 'node': node, + 'user': user, + 'command': command, + 'gpuNum': gpu_num, + 'repo': repo, + 'branch': branch, + 'commit': commit_id, + 'comments': comments, + 'image': image, + 'status': 'fetching', + 'tensorboard': False, + 'hide': False, + 'volumeMounts': mounts, + 'cpuLimit': cpu_limit, + 'memoryLimit': memory_limit, + }}, upsert=True) + self.finish(json.dumps({'message': 'job {} successful created.'.format(name)})) + + # clone code + await clone_code( + name=name, repo=repo, branch=branch, commit_id=commit_id, + 
jobs_collection=self.jobs_collection, job_dir=job_dir, + crediential=KTQueueDefaultCredentialProvider(repo=repo, user=user, mongo_client=self.mongo_client)) + + ret = await self.k8s_client.call_api( + api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), + method='POST', + data=job + ) + try: + self.jobs_collection.update_one({'name': name}, {'$set': { + 'status': 'pending', + 'creationTimestamp': ret['metadata']['creationTimestamp'], + }}, upsert=True) + except Exception as e: + logging.info(ret) + logging.exception(e) + + async def get(self): + page = int(self.get_argument('page', 1)) + page_size = int(self.get_argument('pageSize', 20)) + hide = self.get_argument('hide', None) + fav = self.get_argument('fav', None) + status = self.get_argument('status', None) + tags = self.get_arguments('tag') + user = self.get_arguments('user[]') + node = self.get_arguments('node[]') + + query = {} + + # hide + if hide is None: # default is False + query['hide'] = False + elif hide != 'all': # 'all' means no filter + query['hide'] = False if hide == '0' else True + + # tags + if tags: + query['tags': {'$all': tags}] + + # fav + if fav: + query['fav'] = True if fav == '1' else False + + # status; Running etc. + if status: + if status == '$RunningExtra': + query.pop('hide', None) + query['status'] = {'$nin': ['Completed', 'ManualStop', 'FetchError']} + else: + query['status'] = status + # user + if user: + query['user'] = {'$in': user} + + # node + if node: + query['node'] = {'$in': node} + + if status == '$RunningExtra': + query = {'$or': [query, {'tensorboard': True}]} + + count = self.jobs_collection.count(query) + jobs = list(self.jobs_collection.find(query).sort("_id", -1).skip(page_size * (page - 1)).limit(page_size)) + for job in jobs: + job['_id'] = str(job['_id']) + self.finish(json.dumps({ + 'page': page, + 'total': count, + 'pageSize': page_size, + 'data': jobs, + })) + + @apiauthenticated + async def put(self): + """modify job. + only part of fields can be modified. 
+ """ + body_arguments = json.loads(self.request.body.decode('utf-8')) + + allowedFields = ['hide', 'comments', 'tags', 'fav'] + job = self.jobs_collection.find_one({'_id': bson.ObjectId(body_arguments['_id'])}) + if job['status'] in ('ManualStop', 'Completed'): + allowedFields += ['node', 'gpuNum', 'image', 'command', 'volumeMounts', 'cpuLimit', 'memoryLimit'] + update_data = {k: v for k, v in body_arguments.items() if k in allowedFields} + self.jobs_collection.update_one({'_id': bson.ObjectId(body_arguments['_id'])}, {'$set': update_data}) + ret = self.jobs_collection.find_one({'_id': bson.ObjectId(body_arguments['_id'])}) + ret['_id'] = str(ret['_id']) + self.finish(ret) + + +class JobLogVersionHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client): + self.k8s_client = k8s_client + + @convert_asyncio_task + async def get(self, job): + from ktqueue.utils import get_log_versions + versions = get_log_versions(job) + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'job-name={job}'.format(job=job)} + ) + if pods.get('items', None): + versions = ['current'] + versions + self.write({ + 'job': job, + 'versions': versions + }) + + +class JobLogHandler(BaseHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + self.closed = False + self.follow = False + + async def get_log_stream(self, job, version): + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'job-name={job}'.format(job=job)} + ) + if pods.get('items', None): + params = {} + timeout = 60 + if self.follow: + params['follow'] = 'true' + tailLines = self.get_argument('tailLines', None) + if tailLines: + params['tailLines'] = tailLines + timeout = 0 # disable timeout checks + pod_name = pods['items'][0]['metadata']['name'] + resp = await self.k8s_client.call_api_raw( + method='GET', + api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name), + params=params, timeout=timeout + ) + return resp + return None + + @convert_asyncio_task + async def get(self, job, version=None): + if version and version != 'current': + with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.{version}.txt'.format(version=version)), 'rb') as f: + self.finish(f.read()) + return + self.follow = self.get_argument('follow', None) == 'true' + resp = await self.get_log_stream(job, version) + if resp and resp.status == 200: + try: + async for chunk in resp.content.iter_any(): + if self.closed: + break + self.write(chunk) + if self.follow: + self.flush() + finally: + resp.close() + + def on_connection_close(self): + self.closed = True + + +class JobLogWSHandler(tornado.websocket.WebSocketHandler, JobLogHandler): + + def initialize(self, *args, **kwargs): + JobLogHandler.initialize(self, *args, **kwargs) + + def check_origin(self, origin): + return True + + @convert_asyncio_task + async def open(self, job): + self.follow = True + resp = await self.get_log_stream(job, 'current') + if resp and resp.status == 200: + try: + async for chunk in resp.content.iter_any(): + if self.closed: + break + self.write_message(chunk) + except Exception as e: + raise + finally: + resp.close() + + def on_close(self): + self.closed = True + + def 
on_message(self): + pass + + +class StopJobHandler(BaseHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + @apiauthenticated + async def post(self, job): + await k8s_delete_job(self.k8s_client, job) + self.jobs_collection.update_one({'name': job}, {'$set': {'status': 'ManualStop'}}) + self.finish({'message': 'Job {} successful deleted.'.format(job)}) + +class RestartJobHandler(BaseHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + @apiauthenticated + async def post(self, job): + job_name = job + await k8s_delete_job(self.k8s_client, job_name) + job = defaultdict(lambda: None) + job.update(self.jobs_collection.find_one({'name': job_name})) + job_dir = os.path.join('/cephfs/ktqueue/jobs/', job['name']) + + job_description = generate_job( + name=job['name'], command=job['command'], node=job['node'], gpu_num=job['gpuNum'], image=job['image'], + repo=job['name'], branch=job['command'], commit_id=job['commit'], comments=job['comments'], + mounts=job['volumeMounts'], cpu_limit=job['cpuLimit'], memory_limit=job['memoryLimit'], + ) + + if job['status'] == 'FetchError': + self.jobs_collection.update_one({'name': job}, {'$set': {'status': 'fetching'}}) + + self.finish({'message': 'job {} successful restarted.'.format(job['name'])}) + + # Refetch + if job['status'] == 'FetchError': + await clone_code( + name=job['name'], repo=job['repo'], branch=job['branch'], commit_id=job['commit'], + jobs_collection=self.jobs_collection, job_dir=job_dir, + crediential=KTQueueDefaultCredentialProvider( + repo=job['repo'], user=self.get_current_user(), mongo_client=self.mongo_client + )) + + await self.k8s_client.call_api( + api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), + method='POST', + data=job_description + ) + + +class TensorBoardHandler(BaseHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + @apiauthenticated + async def post(self, job): + body_arguments = json.loads(self.request.body.decode('utf-8')) + logdir = body_arguments.get('logdir', '/cephfs/ktqueue/logs/{job}/train'.format(job=job)) + command = 'tensorboard --logdir {logdir} --host 0.0.0.0'.format(logdir=logdir) + + job_record = defaultdict(lambda: None) + job_record.update(self.jobs_collection.find_one({'name': job})) + job_description = generate_job( + name=job_record['name'], command=command, node=job_record['node'], gpu_num=0, image=job_record['image'], + repo=None, branch=None, commit_id=None, comments=None, mounts=job_record['volumeMounts'], + cpu_limit=job_record['cpuLimit'], memory_limit=job_record['memoryLimit'], + load_nvidia_driver=True, + ) + pod_spec = dict(job_description['spec']['template']['spec']) + pod_spec['containers'][0]['name'] = 'ktqueue-tensorboard' + print(pod_spec) + + pod = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': '{job}-tensorboard'.format(job=job), + 'labels': { + 'ktqueue-tensorboard-job-name': job + } + }, + 'spec': pod_spec + } + + ret = await self.k8s_client.call_api( + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + method='POST', + data=pod + ) + if 
'metadata' in ret and 'creationTimestamp' in ret['metadata']: + self.jobs_collection.update_one({'name': job}, {'$set': {'tensorboard': True}}) + else: + self.set_status(500) + + self.write(ret) + + @convert_asyncio_task + @apiauthenticated + async def delete(self, job): + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'ktqueue-tensorboard-job-name={job}'.format(job=job)} + ) + if pods.get('items', None): + pod_name = pods['items'][0]['metadata']['name'] + ret = await self.k8s_client.call_api( + api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name), + method='DELETE', + ) + self.write(ret) + else: + self.set_status(404) + self.write({'message': 'tensorboard pod not found.'}) + self.jobs_collection.update_one({'name': job}, {'$set': {'tensorboard': False}}) diff --git a/ktqueue/api/node.py b/ktqueue/api/node.py new file mode 100644 index 0000000..8e35212 --- /dev/null +++ b/ktqueue/api/node.py @@ -0,0 +1,37 @@ +# encoding: utf-8 +from collections import defaultdict + +import tornado.web + +from .utils import convert_asyncio_task + +node_used_gpus = defaultdict(lambda: dict()) + + +class NodesHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + + @convert_asyncio_task + async def get(self): + def used_gpus(node): + result = 0 + for job, num in node_used_gpus[node].items(): + result += num + return result + + ret = await self.k8s_client.call_api( + api='/api/v1/nodes', + method='GET', + ) + + self.write({'items': [{ + 'name': node['metadata']['name'], + 'labels': node['metadata']['labels'], + 'gpu_used': used_gpus(node['metadata']['name']), + 'jobs': node_used_gpus[node['metadata']['name']], + 'gpu_capacity': node['status']['capacity'].get('nvidia.com/gpu', 0), + } for node in ret['items'] + ]}) diff --git a/ktqueue/api/oauth.py b/ktqueue/api/oauth.py new file mode 100644 index 0000000..bf6a8d9 --- /dev/null +++ b/ktqueue/api/oauth.py @@ -0,0 +1,74 @@ +# encoding: utf-8 +import urllib +import json +import logging +import tornado.web +import tornado.auth +import tornado.httpclient + +import ktqueue.settings + + +class GithubOAuth2StartHandler(tornado.web.RequestHandler, tornado.auth.OAuth2Mixin): + _OAUTH_AUTHORIZE_URL = 'https://github.com/login/oauth/authorize' + _OAUTH_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token' + + def initialize(self, mongo_client): + self.mongo_client = mongo_client + + async def get(self): + code = self.get_argument('code', None) + if code: + # get token + client = self.get_auth_http_client() + request = tornado.httpclient.HTTPRequest( + url=self._OAUTH_ACCESS_TOKEN_URL, + method='POST', + body=urllib.parse.urlencode(dict( + client_id=ktqueue.settings.oauth2_clinet_id, + client_secret=ktqueue.settings.oauth2_client_secret, + code=code)), + headers=dict(Accept="application/json") + ) + resp = await client.fetch(request) + resp = json.loads(resp.body.decode('utf-8')) + access_token = resp.get('access_token', None) + if not access_token: + raise Exception('Can not get access_token') + request = tornado.httpclient.HTTPRequest( + url='https://api.github.com/user', + headers={ + 'Authorization': 'token {token}'.format(token=access_token), + 'Accept': "application/json", + 'User-Agent': 'KTQueue API Client' + }) + resp = await client.fetch(request, raise_error=False) + if resp.code != 
200: + logging.warn(resp.body) + raise Exception('Failed to get user info') + resp = json.loads(resp.body.decode('utf-8')) + data = { + 'provider': 'github', + 'id': resp['login'], + 'access_token': access_token, + 'data': resp + } + self.mongo_client.ktqueue.oauth.update_one( + {'provider': 'github', 'id': resp['login']}, + {'$set': data}, + upsert=True + ) + self.set_secure_cookie('user', resp['login']) + self.redirect('/') + else: + await self.authorize_redirect( + redirect_uri=ktqueue.settings.oauth2_callback, + client_id=ktqueue.settings.oauth2_clinet_id, + client_secret=ktqueue.settings.oauth2_client_secret, + scope=['repo'], # reuquest access to private repository + ) + + return + + +OAuth2Handler = GithubOAuth2StartHandler diff --git a/ktqueue/api/repo.py b/ktqueue/api/repo.py new file mode 100644 index 0000000..c8299f1 --- /dev/null +++ b/ktqueue/api/repo.py @@ -0,0 +1,86 @@ +# encoding: utf-8 + +import json +import re + +import bson +import tornado.web + +from .utils import BaseHandler +from .utils import apiauthenticated + + +class ReposHandler(BaseHandler): + + __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/\-+]*.git') + __ssh_pattern = re.compile(r'\w+@[\w.]+:[\w-]+\/[\w\-+]+\.git') + + def initialize(self, mongo_client): + self.mongo_client = mongo_client + self.repos_collection = self.mongo_client.ktqueue.repos + + @apiauthenticated + async def post(self): + """create a credential for repo, support both ssh & https. + ssh e.x.: + { + "repo": "git@github.com:naturali/tensorflow.git", + "ssh_key": "" + } + https e.x.: + { + "repo": "git@github.com:naturali/tensorflow.git", + "username": "", + "password": "" + } + use deployment key with ssh is recommanded + """ + body = json.loads(self.request.body.decode('utf-8')) + repo = body['repo'].strip() + if self.__ssh_pattern.match(repo): + if body.get('ssh_key', None) is None: + self.set_status(400) + self.finish(json.dumps({'message': 'ssh_key must be provided.'})) + return + elif self.__https_pattern.match(repo): + if body.get('username', None) is None or body.get('password', None) is None: + self.set_status(400) + self.finish(json.dumps({'message': 'username and password must be provided.'})) + return + else: + self.set_status(400) + self.finish(json.dumps({'message': 'illigal repo'})) + return + + self.repos_collection.update_one({'repo': repo}, {'$set': body}, upsert=True) + self.finish(json.dumps({'message': 'repo {} successful added.'.format(repo)})) + + async def get(self): + page = int(self.get_argument('page', 1)) + page_size = int(self.get_argument('pageSize', 20)) + count = self.repos_collection.count() + repos = [] + for repo in self.repos_collection.find().sort("_id", -1).skip(page_size * (page - 1)).limit(page_size): + repos.append({ + '_id': str(repo['_id']), + 'repo': repo['repo'] + }) + self.finish(json.dumps({ + 'page': page, + 'total': count, + 'pageSize': page_size, + 'data': repos, + })) + + +class RepoHandler(BaseHandler): + + def initialize(self, mongo_client): + self.mongo_client = mongo_client + self.repos_collection = self.mongo_client.ktqueue.repos + + @apiauthenticated + async def delete(self, id): + print(id) + self.repos_collection.delete_one({'_id': bson.ObjectId(id)}) + self.finish({'message': 'repos successful added.'}) diff --git a/ktqueue/api/tensorboard_proxy.py b/ktqueue/api/tensorboard_proxy.py new file mode 100644 index 0000000..6ccc2bd --- /dev/null +++ b/ktqueue/api/tensorboard_proxy.py @@ -0,0 +1,69 @@ +# encoding: utf- +import tornado.web +import tornado.httpclient +import 
tornado.httputil +import urllib.parse +import re + +from tornado.httpclient import HTTPRequest + +job_tensorboard_map = {} + + +class TensorBoardProxyHandler(tornado.web.RequestHandler): + __job_pattern = re.compile(r'/tensorboard/(?P[\w_\-\.]+)/(.*?)') + + def initialize(self, client): + self.client = client + + async def get(self, **kwargs): + if 'job' not in kwargs: + referer_path = urllib.parse.urlparse(self.request.headers['Referer']).path + job = self.__job_pattern.match(referer_path).group('job') + url = 'data/' + kwargs['url'] + else: + job = kwargs['job'] + url = kwargs['url'] + + host = job_tensorboard_map.get(job, None) + if not host: + self.set_status(404) + self.finish({'message': 'tensorboard not found.'}) + return + + parsed_url = urllib.parse.urlparse(self.request.uri) + if parsed_url.query: + url += '?' + parsed_url.query + + body = self.request.body + if not body: + body = None + + request = HTTPRequest( + url='http://{host}:6006/{url}'.format(host=host, url=url), + method=self.request.method, body=body, + headers=self.request.headers, follow_redirects=False, + allow_nonstandard_methods=True, + request_timeout=180 if 'job' not in kwargs else 4, + ) + response = await self.client.fetch(request) + + if response.error and not isinstance(response.error, tornado.httpclient.HTTPError): + self.set_status(500) + self.finish('Internal server error:\n' + str(response.error)) + return + + self.set_status(response.code) + self._headers = tornado.httputil.HTTPHeaders() + for header, v in response.headers.get_all(): + if header not in ('Content-Length', 'Transfer-Encoding', 'Content-Encoding', 'Connection'): + self.add_header(header, v) + + if response.body: + self.set_header('Content-Length', len(response.body)) + self.write(response.body) + + self.finish() + + async def post(self, **kwargs): + await self.get(**kwargs) diff --git a/ktqueue/api/user.py b/ktqueue/api/user.py new file mode 100644 index 0000000..5e0321e --- /dev/null +++ b/ktqueue/api/user.py @@ -0,0 +1,19 @@ +# encoding: utf-8 +from .utils import BaseHandler +from .utils import apiauthenticated +import ktqueue.settings + + +class CurrentUserHandler(BaseHandler): + @apiauthenticated + def get(self): + self.finish({ + 'user': self.get_current_user(), + 'auth_required': ktqueue.settings.auth_required + }) + + def delete(self): + self.clear_cookie('user') + self.finish({ + 'msg': 'ok' + }) diff --git a/ktqueue/api/utils.py b/ktqueue/api/utils.py new file mode 100644 index 0000000..0167257 --- /dev/null +++ b/ktqueue/api/utils.py @@ -0,0 +1,36 @@ +# encoding: utf-8 +import asyncio +import functools +import tornado.web + +import ktqueue.settings + + +def convert_asyncio_task(method): + """https://github.com/KeepSafe/aiohttp/issues/1176""" + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + coro = method(self, *args, **kwargs) + return await asyncio.get_event_loop().create_task(coro) + return wrapper + + +class BaseHandler(tornado.web.RequestHandler): + def get_current_user(self): + user = self.get_secure_cookie("user") + if user: + return user.decode('utf-8') + if not ktqueue.settings.auth_required: + return 'anonymous' + return None + + +def apiauthenticated(method): + """ Return 401 if user is not authenticated + """ + @functools.wraps(method) + def wrapper(self, *args, **kwargs): + if not self.current_user: + raise tornado.web.HTTPError(401) + return method(self, *args, **kwargs) + return wrapper diff --git a/ktqueue/cloner.py b/ktqueue/cloner.py new file mode 100644 index 0000000..2b7874e --- /dev/null 
+++ b/ktqueue/cloner.py @@ -0,0 +1,206 @@ +# encoding: utf-8 + +import os +import hashlib +import re +import asyncio +import logging +import urllib.parse + + +class GitCredentialProvider: + __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/\-+]*.git') + __ssh_pattern = re.compile(r'\w+@[\w.]+:[\w-]+\/[\w\-+]+\.git') + + @classmethod + def get_repo_type(cls, repo): + if cls.__ssh_pattern.match(repo): + return 'ssh' + elif cls.__https_pattern.match(repo): + return'https' + return None + + def __init__(self, ssh_key=None, https_username=None, https_password=None): + pass + + @property + def ssh_key(self): + raise NotImplementedError + + @property + def https_username(self): + raise NotImplementedError + + @property + def https_password(self): + raise NotImplementedError + + +class Cloner: + """Do the git clone stuff in another thread""" + + __ref_pattern = re.compile(r'(?P\w+)\s(?P[\w/\-]+)') + + def __init__(self, repo, dst_directory, branch=None, commit_id=None, + crediential=None): + """Init + crediential: a credientialProvider instance + """ + self.repo = repo.strip() + self.dst_directory = dst_directory + self.branch = branch or 'master' + self.commit_id = commit_id + self.crediential = crediential + + self.ssh_key_path = None + self.repo_path = None + self.repo_url = None + self.repo_type = GitCredentialProvider.get_repo_type(self.repo) + + if not self.repo_type: + raise Exception('wrong repo type') + + if self.repo_type == 'ssh' and self.crediential.ssh_key is None: + raise Exception('ssh credential for {repo} must be provided.'.format(repo=repo)) + + self.repo_hash = hashlib.sha1(self.repo.encode('utf-8')).hexdigest() + + async def prepare_ssh_key(self, repo): + ssh_key_dir = os.path.join('/tmp/ktqueue/ssh_keys', self.repo_hash) + if not os.path.exists(ssh_key_dir): + os.makedirs(ssh_key_dir) + self.ssh_key_path = os.path.join(ssh_key_dir, 'id') + if not os.path.exists(self.ssh_key_path): + with open(self.ssh_key_path, 'w') as f: + f.write(self.crediential.ssh_key) + os.chmod(self.ssh_key_path, 0o600) # prevent WARNING: UNPROTECTED PRIVATE KEY FILE! 
+ await asyncio.sleep(1.0) # os.chmod may have strange behavior, that ssh still permission 644 after too short time + + @classmethod + async def git_with_ssh_key(cls, ssh_key_path, args, cwd=None): + env = {**os.environ, + 'GIT_SSH_COMMAND': 'ssh -oStrictHostKeyChecking=no -i {ssh_key_path}'.format(ssh_key_path=ssh_key_path) + } + proc = await asyncio.create_subprocess_exec( + *['git'] + args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, cwd=cwd) + lines = [] + async for line in proc.stdout: + logging.debug(line) + lines.append(line.decode()) + recode = await proc.wait() + return recode, lines + + @classmethod + def add_credential_to_https_url(cls, url, username, password): + parsed = urllib.parse.urlparse(url) + if username is not None: + host_and_port = parsed.hostname + if parsed.port: + host_and_port += ':' + parsed.port + if password: + parsed = parsed._replace(netloc='{}:{}@{}'.format(username, password, host_and_port)) + else: + parsed = parsed._replace(netloc='{}@{}'.format(username, host_and_port)) + return parsed.geturl() + + @classmethod + async def git_with_https(cls, args, cwd=None): + proc = await asyncio.create_subprocess_exec( + *['git'] + args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=cwd) + lines = [] + async for line in proc.stdout: + logging.debug(line) + lines.append(line.decode()) + + recode = await proc.wait() + return recode, lines + + async def get_heads(self): + proc = await asyncio.create_subprocess_exec(*['git', 'show-ref'], stdout=asyncio.subprocess.PIPE, cwd=self.repo_path) + heads = {} + async for line in proc.stdout: + logging.debug(line) + group = self.__ref_pattern.search(line.decode()) + if group: + heads[group.group('ref')] = group.group('hash') + return heads + + async def clone(self): + if self.repo_type == 'ssh': + retcode, retlines = await self.git_with_ssh_key( + ssh_key_path=self.ssh_key_path, + cwd='/cephfs/ktqueue/repos', + args=['clone', self.repo, '--recursive', self.repo_hash], + + ) + else: + retcode, retlines = await self.git_with_https( + cwd='/cephfs/ktqueue/repos', + args=['clone', self.repo_url, '--recursive', self.repo_hash], + ) + if retcode != 0: + logging.error('\n'.join(retlines)) + raise Exception('clone repo failed with retcode {}.'.format(retcode)) + + async def fetch(self): + if self.repo_type == 'ssh': + retcode, retlines = await self.git_with_ssh_key( + ssh_key_path=self.ssh_key_path, + cwd=self.repo_path, + args=['fetch'], + ) + else: + retcode, retlines = await self.git_with_https( + cwd=self.repo_path, + args=['fetch', self.repo_url, '+refs/heads/*:refs/remotes/origin/*'], + ) + if retcode != 0: + logging.error('\n'.join(retlines)) + logging.error('fetch repo failed with retcode {}.'.format(retcode)) + + async def clone_and_copy(self, archive_file=None, keep_archive=False): + if not os.path.exists('/cephfs/ktqueue/repos'): + os.makedirs('/cephfs/ktqueue/repos') + self.repo_path = os.path.join('/cephfs/ktqueue/repos', self.repo_hash) + + if self.repo_type == 'ssh': + await self.prepare_ssh_key(self.repo) + elif self.repo_type == 'https': + self.repo_url = self.repo + if self.crediential.https_username: + self.repo_url = self.add_credential_to_https_url( + self.repo, username=self.crediential.https_username, password=self.crediential.https_password) + + if not os.path.exists(self.repo_path): # Then clone it + await self.clone() + else: + await self.fetch() + + # get commit_id + if not self.commit_id and self.branch: + heads = await self.get_heads() + self.commit_id = 
heads.get('refs/remotes/origin/{branch}'.format(branch=self.branch), None) + if not self.commit_id: + raise Exception('Branch {branch} not found for {repo}.'.format(branch=self.branch, repo=self.repo)) + + if not os.path.exists('/cephfs/ktqueue/repo_archive'): + os.makedirs('/cephfs/ktqueue/repo_archive') + if archive_file is None: + archive_file = '/cephfs/ktqueue/repo_archive/{}.tar.gz'.format(self.commit_id) + + # Arcive commit_id + logging.info('Arciving {}:{}'.format(self.repo, self.commit_id)) + with open(archive_file, 'wb') as f: + proc = await asyncio.create_subprocess_exec(*['git', 'archive', self.commit_id, '--format', 'tar.gz'], + stdout=f, cwd=self.repo_path) + retcode = await proc.wait() + if retcode != 0: + logging.error('Arcive repo failed with retcode {}'.format(retcode)) + + if not os.path.exists(self.dst_directory): + os.makedirs(self.dst_directory) + proc = await asyncio.create_subprocess_exec(*['tar', 'xzf', archive_file], cwd=self.dst_directory) + retcode = await proc.wait() + + if not keep_archive: + os.remove(archive_file) diff --git a/ktqueue/event_watcher.py b/ktqueue/event_watcher.py new file mode 100644 index 0000000..6103d2b --- /dev/null +++ b/ktqueue/event_watcher.py @@ -0,0 +1,146 @@ +# encoding: utf-8 +import logging +import asyncio +import json + +import pymongo + +from ktqueue.utils import save_job_log +from ktqueue.utils import k8s_delete_job +from ktqueue import settings + + +class EventWatcher: + + def __init__(self, k8s_client=None): + assert k8s_client is not None + self.k8s_client = k8s_client + self.running = True + + async def poll(self, api, method='GET', callback=None, **kwargs): + """ + This function will never return, await the future carefully. + """ + assert callback is not None + timeout = kwargs.pop('timeout', None) + while self.running: + try: + session = self.k8s_client.new_connector_session() + resp = await self.k8s_client.call_api_raw( + api=api, method=method, timeout=timeout, session=session, **kwargs) + async for line in resp.content: + try: + await callback(json.loads(line.decode('utf-8'))) + except Exception as e: + logging.exception(e) + logging.exception('Event is:') + logging.exception(line.decode('utf-8')) + else: + pass + except Exception as e: + logging.exception(e) + await asyncio.sleep(1) + finally: + session.close() + + +async def watch_pod(k8s_client): + from .api.tensorboard_proxy import job_tensorboard_map + from .api.node import node_used_gpus + + mongo_client = pymongo.MongoClient(settings.mongodb_server) + jobs_collection = mongo_client.ktqueue.jobs + + async def callback(event): + labels = event['object']['metadata']['labels'] + + # TensorBoard Pod + if 'ktqueue-tensorboard-job-name' in labels: + job_name = labels['ktqueue-tensorboard-job-name'] + hostIP = event['object']['status'].get('podIP', None) + if event['type'] == 'DELETED': + job_tensorboard_map.pop(job_name, None) + elif hostIP: + job_tensorboard_map[job_name] = hostIP + return + + # Ignore MODIFY event by add 'ktqueue-watching' label + if labels.get('ktqueue-watching', None) == 'false': + return + + if 'job-name' not in labels: + return + job_name = labels['job-name'] + + job_exist = jobs_collection.find_one({'name': job_name}) + if not job_exist: + return + + state = {} + job_update = {} + status = (None, None) + if 'containerStatuses' in event['object']['status']: + state = event['object']['status']['containerStatuses'][0]['state'] + for k, v in state.items(): + status = (k, v.get('reason', None)) + status_str = '{}: {}'.format(*status) + continue + 
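+ # At this point `state` is keyed by exactly one of 'waiting', 'running' or
+ # 'terminated', so `status` is e.g. ('terminated', 'Completed') or
+ # ('waiting', 'ImagePullBackOff'); these pairs are matched against below.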
job_update['state'] = state + elif event['object']['status']['phase'] == 'Pending': + jobs_collection.update_one({'name': job_name}, {'$set': {'status': 'Pending'}}) + return + else: + status_str = '{}: {}'.format(event['object']['status']['phase'], event['object']['status']['reason']) + + pod_name = event['object']['metadata']['name'] + + # update Running Node & used GPU + if status[0] == 'terminated': + node_used_gpus[event['object']['spec']['nodeName']].pop(pod_name, None) + elif status[0] == 'waiting': # waiting doesn't use GPU + pass + elif event['object']['spec'].get('nodeName', None): + job_update['runningNode'] = event['object']['spec']['nodeName'] + node_used_gpus[event['object']['spec']['nodeName']][pod_name] = int(job_exist['gpuNum']) + + # Job is being terminated should not affect job status + if labels.get('ktqueue-terminating', None) == 'true': + return + + logging.info('Job {} enter state {}'.format(job_name, status_str)) + + # update status + if status == ('terminated', 'Completed'): + job_update['status'] = 'Completed' + elif status == ('running', None): + job_update['status'] = 'Running' + else: + job_update['status'] = status_str + + jobs_collection.update_one({'name': job_name}, {'$set': job_update}) + + # When a job is successful finished, save log and do not watch it any more + if status[0] == 'terminated': + # save log first + await save_job_log(job_name=job_name, pod_name=pod_name, k8s_client=k8s_client) + + # set label 'ktqueue-watching' to 'false' + # refer https://tools.ietf.org/html/rfc6902#appendix-A.1 to know more about json-patch + if status == ('terminated', 'Completed'): + await k8s_client.call_api( + api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name), + method='PATCH', + headers={'Content-Type': 'application/json-patch+json'}, + data=[{"op": "add", "path": "/metadata/labels/ktqueue-watching", "value": "false"}] + ) + await k8s_delete_job(k8s_client=k8s_client, job=job_name, pod_name=pod_name, save_log=False) + + event_watcher = EventWatcher(k8s_client=k8s_client) + + await event_watcher.poll( + api='/api/v1/watch/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + method='GET', + callback=callback, + params={'labelSelector': 'ktqueue-watching!=false'}, + timeout=0, + ) diff --git a/ktqueue/kubernetes_client.py b/ktqueue/kubernetes_client.py new file mode 100644 index 0000000..3b97537 --- /dev/null +++ b/ktqueue/kubernetes_client.py @@ -0,0 +1,68 @@ +# encoding: utf-8 +import os +import aiohttp +import kubernetes +import json +import ssl + + +class kubernetes_client: + + def __init__(self, config=None): + if config is None: + config = self.get_service_account_config() + self.token = config.get('token', None) + self.host = config['host'] + self.token = config.get('token', None) + self.port = config.get('port', None) + self.schema = config['schema'] + self.api_preifx = "{schema}://{host}:{port}".format(schema=self.schema, host=self.host, port=self.port) + self.ca_crt = None + self.session = self.new_connector_session() + + def new_connector_session(self): + """ + 1 TCPConnector can handle 20 connections by default. 
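+ A fresh session is therefore (presumably) created per long-lived watch request
+ (see EventWatcher.poll) so that streaming responses do not tie up the shared
+ session's connection pool; ordinary short API calls reuse self.session.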
+ """ + if self.schema == 'https': + self.ca_crt = kubernetes.config.incluster_config.SERVICE_CERT_FILENAME + sslcontext = ssl.create_default_context(cafile=self.ca_crt) + conn = aiohttp.TCPConnector(ssl_context=sslcontext) + else: + conn = aiohttp.TCPConnector() + return aiohttp.ClientSession(connector=conn) + + @classmethod + def get_service_account_config(cls): + config = { + 'host': os.environ['KUBERNETES_SERVICE_HOST'], + 'port': os.environ['KUBERNETES_SERVICE_PORT'], + } + config['token'] = os.environ.get('KUBERNETES_API_ACCOUNT_TOKEN', None) # Not official, for debug usage + config['schema'] = os.environ.get('KUBERNETES_API_SCHEMA', 'https') # Not official, for debug usage + if config['token'] is None: + with open(kubernetes.config.incluster_config.SERVICE_TOKEN_FILENAME, 'r') as f: + config['token'] = f.read() + return config + + async def call_api(self, api, method='GET', **kwargs): + resp = await self.call_api_raw(api=api, method=method, **kwargs) + text = await resp.text() + try: + result = json.loads(text) + except Exception as e: + print(api, method, kwargs) + print(text) + raise + else: + return result + + async def call_api_raw(self, api, method='GET', **kwargs): + session = kwargs.pop('session', self.session) + url = self.api_preifx + api + headers = kwargs.pop('headers', {}) + headers['Authorization'] = 'Bearer {token}'.format(token=self.token) + headers['Content-Type'] = headers.get('Content-Type', 'application/json') + if 'data' in kwargs and (isinstance(kwargs['data'], dict) or isinstance(kwargs['data'], list)): + kwargs['data'] = json.dumps(kwargs['data']) + return await session.request(method, url, headers=headers, **kwargs) diff --git a/ktqueue/settings.py b/ktqueue/settings.py new file mode 100644 index 0000000..a966f6d --- /dev/null +++ b/ktqueue/settings.py @@ -0,0 +1,37 @@ +import os + +mongodb_server = os.environ.get('KTQ_MONGODB_SERVER', 'ktqueue-mongodb') +job_namespace = os.environ.get('KTQ_JOB_NAMESPACE', 'ktqueue') +auth_required = True if os.environ.get('KTQ_AUTH_REQUIRED', '0') == '1' else False +cookie_secret = os.environ.get('KTQ_COOKIE_SECRET', '') +oauth2_provider = os.environ.get('KTQ_OAUTH2_PROVIDER', 'github') +oauth2_clinet_id = os.environ.get('KTQ_OAUTH2_CLIENT_ID', '') +oauth2_client_secret = os.environ.get('KTQ_OAUTH2_CLIENT_SECRET', '') +oauth2_callback = os.environ.get('KTQ_OAUTH2_CALLBACK', None) +sfs_type = os.environ.get('KTQ_SHAREFS_TYPE', 'hostPath') +sfs_volume = {} +if sfs_type == 'hostPath': + sfs_volume = { + 'name': 'cephfs', + 'hostPath': { + 'path': os.environ.get('KTQ_SHAREFS_HOSTPATH', '/mnt/cephfs') + } + } +elif sfs_type == 'azure_file': + sfs_volume = { + 'name': 'cephfs', + 'azureFile': { + 'secretName': os.environ.get('KTQ_SHAREFS_AZURE_FILE_SECREAT_NAME'), + 'shareName': os.environ.get('KTQ_SHAREFS_AZURE_FILE_SHARE_NAME'), + 'readOnly': False, + } + } +elif sfs_type == 'nfs': + sfs_volume = { + 'name': 'cephfs', + 'nfs': { + 'server': os.environ.get('KTQ_SHAREFS_NFS_SERVER'), + 'path': os.environ.get('KTQ_SHAREFS_NFS_PATH', '/'), + 'readOnly': False, + } + } diff --git a/ktqueue/utils.py b/ktqueue/utils.py new file mode 100644 index 0000000..7f7bd85 --- /dev/null +++ b/ktqueue/utils.py @@ -0,0 +1,146 @@ +# encoding: utf-8 +import os +import re +import logging + +from ktqueue import settings +from .cloner import GitCredentialProvider + + +def get_log_versions(job_name): + log_dir = os.path.join('/cephfs/ktqueue/logs', job_name) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + versions = [] + for filename in 
os.listdir(log_dir):
+        group = re.match(r'log\.(?P<id>\d+)\.txt', filename)
+        if group:
+            versions.append(int(group.group('id')))
+    return sorted(versions)
+
+
+async def save_job_log(job_name, pod_name, k8s_client):
+    log_dir = os.path.join('/cephfs/ktqueue/logs', job_name)
+    if not os.path.exists(log_dir):
+        os.makedirs(log_dir)
+    resp = await k8s_client.call_api_raw(
+        method='GET',
+        api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name)
+    )
+    logging.info('save log for {}, resp.status = {}'.format(job_name, resp.status))
+    if resp.status > 300:
+        resp.close()
+        return
+
+    # logs are versioned as log.1.txt, log.2.txt, ...; write the next version
+    max_version = 0
+    for version in get_log_versions(job_name=job_name):
+        max_version = max(max_version, int(version))
+    log_path = os.path.join(log_dir, 'log.{}.txt'.format(max_version + 1))
+
+    with open(log_path, 'wb') as f:
+        async for chunk in resp.content.iter_any():
+            f.write(chunk)
+    resp.close()
+
+
+async def k8s_delete_job(k8s_client, job, pod_name=None, save_log=True):
+    await k8s_client.call_api(
+        method='DELETE',
+        params={'gracePeriodSeconds': 0},
+        api='/apis/batch/v1/namespaces/{namespace}/jobs/{name}'.format(namespace=settings.job_namespace, name=job)
+    )
+
+    # label the pods as terminating so the event watcher ignores their final state changes
+    await k8s_client.call_api(
+        api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace),
+        method='PATCH',
+        params={'labelSelector': 'job-name={job}'.format(job=job)},
+        headers={'Content-Type': 'application/json-patch+json'},
+        data=[{"op": "add", "path": "/metadata/labels/ktqueue-terminating", "value": "true"}]
+    )
+
+    if pod_name is None:
+        pods = await k8s_client.call_api(
+            method='GET',
+            api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace),
+            params={'labelSelector': 'job-name={job}'.format(job=job)}
+        )
+        if save_log and pods.get('items', None):
+            for pod in pods["items"]:
+                name = pod['metadata']['name']
+                await save_job_log(job_name=job, pod_name=name, k8s_client=k8s_client)
+        await k8s_client.call_api(
+            method='DELETE',
+            params={
+                'labelSelector': 'job-name={job}'.format(job=job),
+                'gracePeriodSeconds': 0,
+            },
+            api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace)
+        )
+    else:
+        if save_log:
+            await save_job_log(job_name=job, pod_name=pod_name, k8s_client=k8s_client)
+        await k8s_client.call_api(
+            method='DELETE',
+            params={'gracePeriodSeconds': 0},
+            api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name)
+        )
+
+
+class KTQueueDefaultCredentialProvider(GitCredentialProvider):
+    """Resolve the authorization method for a (user, repo) combination.
+
+    GitHub OAuth2 is the default (when authentication is required); if an
+    authorization type is stored for the repo, that specific method is used instead.
+    """
+    allowed_method = ['none', 'github_oauth', 'ssh_key', 'https_password']
+
+    def __init__(self, repo, user, mongo_client):
+        self.repo = repo
+        self.user = user
+        self.mongo_client = mongo_client
+
+        self.repos_collection = self.mongo_client.ktqueue.repos
+        self.oauth_collection = self.mongo_client.ktqueue.oauth
+
+        self._auth_type = 'none'
+        if settings.auth_required:
+            self._auth_type = 'github_oauth'
+        self._ssh_key = None
+        self._https_username = None
+        self._https_password = None
+        self.repo_type = None
+
+    def prepare_credential(self):
+        self.repo_type = self.get_repo_type(self.repo)
+        repo = self.repos_collection.find_one({'repo': self.repo})
+        if repo:
+            self._auth_type = repo['authType']
+
+        if self._auth_type == 'none':  # username = password = None
+            pass
+        elif self._auth_type ==
'github_oauth': + crediential = self.oauth_collection.find_one({'provider': 'github', 'id': self.user}) + if crediential: + self._https_username = crediential['access_token'] + elif self._auth_type == 'ssh_key': + self._ssh_key = repo['crediential']['sshKey'] + elif self._auth_type == 'https_password': + self._https_username = repo['crediential']['username'] + self._https_password = repo['crediential']['password'] + + @property + def ssh_key(self): + if not self._ssh_key: + self.prepare_credential() + return self._ssh_key + + @property + def https_username(self): + if not self._https_username: + self.prepare_credential() + return self._https_username + + @property + def https_password(self): + if not self._https_password: + self.prepare_credential() + return self._https_password diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..df33247 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +tornado >= 4.5.1 +kubernetes +aiohttp +pymongo diff --git a/server.py b/server.py new file mode 100644 index 0000000..9b07147 --- /dev/null +++ b/server.py @@ -0,0 +1,122 @@ +# encoding: utf-8 +import asyncio +import os +import sys +import logging + +import pymongo +import tornado +import tornado.web +import tornado.autoreload + +from tornado.simple_httpclient import SimpleAsyncHTTPClient +from tornado.platform.asyncio import AsyncIOMainLoop + +import ktqueue.settings +from ktqueue.kubernetes_client import kubernetes_client +from ktqueue.api import JobsHandler +from ktqueue.api import JobLogHandler +from ktqueue.api import JobLogWSHandler +from ktqueue.api import JobLogVersionHandler +from ktqueue.api import ReposHandler +from ktqueue.api import RepoHandler +from ktqueue.api import NodesHandler +from ktqueue.api import StopJobHandler +from ktqueue.api import RestartJobHandler +from ktqueue.api import TensorBoardProxyHandler +from ktqueue.api import TensorBoardHandler +from ktqueue.api import OAuth2Handler +from ktqueue.api import CurrentUserHandler +from ktqueue.api import AuthRequestHandler + + +from ktqueue.event_watcher import watch_pod + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +__frontend_path = os.path.join(BASE_DIR, 'frontend') +__dist_path = os.path.join(__frontend_path, 'dist') + + +def create_db_index(): + client = pymongo.MongoClient(ktqueue.settings.mongodb_server) + client.ktqueue.jobs.create_index([("name", pymongo.ASCENDING)], unique=True) + client.ktqueue.jobs.create_index([("hide", pymongo.ASCENDING)]) + client.ktqueue.jobs.create_index([("status", pymongo.ASCENDING)]) + client.ktqueue.credentials.create_index([("repo", pymongo.ASCENDING)], unique=True) + client.ktqueue.oauth.create_index([("provider", pymongo.ASCENDING), ("id", pymongo.ASCENDING)], unique=True) + + # compatibility + client.ktqueue.jobs.update_many({'hide': {'$exists': False}}, {'$set': {'hide': False}}) + client.ktqueue.jobs.update_many({'fav': {'$exists': False}}, {'$set': {'fav': False}}) + client.ktqueue.jobs.update_many({'memoryLimit': {'$exists': False}}, {'$set': {'memoryLimit': None}}) + client.ktqueue.jobs.update_many({'cpuLimit': {'$exists': False}}, {'$set': {'cpuLimit': None}}) + client.ktqueue.jobs.update_many({'commit_id': {'$exists': True}}, {'$rename': {'commit_id': 'commit'}}) + client.ktqueue.jobs.update_many({'gpu_num': {'$exists': True}}, {'$rename': {'gpu_num': 'gpuNum'}}) + + +def get_app(): + k8s_client = kubernetes_client() + mongo_client = pymongo.MongoClient(ktqueue.settings.mongodb_server) + + # other args to app + app_kwargs = {} + # debug + 
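+ # Tornado's debug mode enables autoreload and serving tracebacks on errors;
+ # leave KTQUEUE_DEBUG unset (or '0') for production deployments.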
app_kwargs['debug'] = os.environ.get('KTQUEUE_DEBUG', '0') == '1' + # cookie_secret + app_kwargs['cookie_secret'] = ktqueue.settings.cookie_secret + application = tornado.web.Application([ + (r'/()', tornado.web.StaticFileHandler, { + 'path': __dist_path, + 'default_filename': 'index.html' + }), + (r'/dist/(.*)', tornado.web.StaticFileHandler, {'path': __dist_path}), + (r'/tensorboard/(?P[\.\w_-]+)/(?P.*)', TensorBoardProxyHandler, {'client': SimpleAsyncHTTPClient(max_clients=64)}), + (r'/data/(?P.*)', TensorBoardProxyHandler, {'client': SimpleAsyncHTTPClient(max_clients=64)}), # This is a hack for TensorBoard + (r'/auth/oauth2/start', OAuth2Handler, {'mongo_client': mongo_client}), + (r'/auth/oauth2/callback', OAuth2Handler, {'mongo_client': mongo_client}), + (r'/auth/auth', AuthRequestHandler), + # APIS + (r'/api/nodes', NodesHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/jobs', JobsHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/jobs/(?P[\.\w_-]+)/log', JobLogHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/jobs/(?P[\.\w_-]+)/log/(?P\d+|current)', JobLogHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/jobs/(?P[\.\w_-]+)/log/version', JobLogVersionHandler, {'k8s_client': k8s_client}), + (r'/api/job/stop/(?P[\.\w_\-]+)', StopJobHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/job/restart/(?P[\.\w_-]+)', RestartJobHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/job/tensorboard/(?P[\.\w_-]+)', TensorBoardHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/api/repos', ReposHandler, {'mongo_client': mongo_client}), + (r'/api/repos/(?P[0-9a-f]+)', RepoHandler, {'mongo_client': mongo_client}), + (r'/api/current_user', CurrentUserHandler), + (r'/wsapi/jobs/(?P[\.\w_-]+)/log', JobLogWSHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + ], **app_kwargs) + return application + + +async def async_init(): + tasks = [ + watch_pod(kubernetes_client()), + ] + await asyncio.wait(tasks) + + +def start_server(): + create_db_index() + AsyncIOMainLoop().install() + app = get_app() + app.listen(8080) + loop = asyncio.get_event_loop() + if os.environ.get('KTQUEUE_DEBUG', '0') == '1': + print('Reload.') + loop.run_until_complete(async_init()) + loop.run_forever() + + +def main(): + logging.basicConfig(stream=sys.stdout, level=int(os.environ.get('KTQUEUE_DEBUG_LEVEL', logging.INFO)), + format='[%(asctime)s] %(name)s:%(levelname)s: %(message)s') + start_server() + + +if __name__ == '__main__': + main()
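For reference, below is a minimal, illustrative sketch (not part of the diff above) of driving the in-cluster client from `ktqueue/kubernetes_client.py` outside of `server.py`. It assumes the process runs in a pod with a mounted service account, or with `KUBERNETES_API_ACCOUNT_TOKEN` and `KUBERNETES_API_SCHEMA` exported for debugging; the label selector is only an example.

```python
# Illustrative usage sketch -- not part of the KTQueue sources.
import asyncio

from ktqueue import settings
from ktqueue.kubernetes_client import kubernetes_client


async def list_job_pods():
    client = kubernetes_client()  # reads the service-account config from the environment
    pods = await client.call_api(
        method='GET',
        api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace),
        params={'labelSelector': 'job-name'},  # only pods that belong to a Job
    )
    for pod in pods.get('items', []):
        print(pod['metadata']['name'], pod['status'].get('phase'))


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(list_job_pods())
```

This uses the same `call_api` helper that the handlers registered in `get_app()` receive via `k8s_client`.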