From c63a8dbd7df970db9f3689f7b68418e1ec5e739f Mon Sep 17 00:00:00 2001 From: Comzyh Date: Wed, 22 Feb 2017 15:22:59 +0800 Subject: [PATCH 001/107] Opensource --- .gitignore | 92 +++++++ Dockerfile | 21 ++ deploy/.gitignore | 1 + deploy/README.md | 50 ++++ deploy/ceph-secret.yaml | 6 + deploy/mongodb-dev.yaml | 27 ++ deploy/mongodb-production.yaml | 34 +++ deploy/mongodb-service.yaml | 12 + frontend/index.html | 451 +++++++++++++++++++++++++++++++ frontend/static/ktqueue.css | 23 ++ ktqueue.yaml | 70 +++++ ktqueue/__init__.py | 1 + ktqueue/api/__init__.py | 8 + ktqueue/api/job.py | 349 ++++++++++++++++++++++++ ktqueue/api/node.py | 24 ++ ktqueue/api/repo.py | 75 +++++ ktqueue/api/tensorboard_proxy.py | 69 +++++ ktqueue/api/utils.py | 12 + ktqueue/cloner.py | 148 ++++++++++ ktqueue/event_watcher.py | 107 ++++++++ ktqueue/kubernetes_client.py | 60 ++++ ktqueue/settings.py | 3 + ktqueue/utils.py | 29 ++ requirements.txt | 4 + server.py | 87 ++++++ 25 files changed, 1763 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 deploy/.gitignore create mode 100644 deploy/README.md create mode 100644 deploy/ceph-secret.yaml create mode 100644 deploy/mongodb-dev.yaml create mode 100644 deploy/mongodb-production.yaml create mode 100644 deploy/mongodb-service.yaml create mode 100644 frontend/index.html create mode 100644 frontend/static/ktqueue.css create mode 100644 ktqueue.yaml create mode 100644 ktqueue/__init__.py create mode 100644 ktqueue/api/__init__.py create mode 100644 ktqueue/api/job.py create mode 100644 ktqueue/api/node.py create mode 100644 ktqueue/api/repo.py create mode 100644 ktqueue/api/tensorboard_proxy.py create mode 100644 ktqueue/api/utils.py create mode 100644 ktqueue/cloner.py create mode 100644 ktqueue/event_watcher.py create mode 100644 ktqueue/kubernetes_client.py create mode 100644 ktqueue/settings.py create mode 100644 ktqueue/utils.py create mode 100644 requirements.txt create mode 100644 server.py diff --git 
a/.gitignore b/.gitignore new file mode 100644 index 0000000..0020c7e --- /dev/null +++ b/.gitignore @@ -0,0 +1,92 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# C extensions +*.so + +# Distribution / packaging +.Python +env/ +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# PyInstaller +# Usually these files are written by a python script from a template +# before PyInstaller builds the exe, so as to inject date/other infos into it. +*.manifest +*.spec + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +htmlcov/ +.tox/ +.coverage +.coverage.* +.cache +nosetests.xml +coverage.xml +*,cover +.hypothesis/ + +# Translations +*.mo +*.pot + +# Django stuff: +*.log +local_settings.py + +# Flask stuff: +instance/ +.webassets-cache + +# Scrapy stuff: +.scrapy + +# Sphinx documentation +docs/_build/ + +# PyBuilder +target/ + +# IPython Notebook +.ipynb_checkpoints + +# pyenv +.python-version + +# celery beat schedule file +celerybeat-schedule + +# dotenv +.env + +# virtualenv +venv/ +ENV/ + +# Spyder project settings +.spyderproject + +# Rope project settings +.ropeproject + +.remote-sync.json +dev_environment.sh diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..5a01426 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,21 @@ +FROM ubuntu:16.04 + +RUN apt-get update && apt-get install -y apt-transport-https ca-certificates + +RUN echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial main restricted universe multiverse" > /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-updates main restricted universe multiverse" >> /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ xenial-backports main restricted universe multiverse" >> /etc/apt/sources.list && \ + echo "deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ 
xenial-security main restricted universe multiverse" >> /etc/apt/sources.list + +RUN apt-get update +RUN apt-get install -y wget python3.5 python3-pip git + +RUN python3.5 -m pip install kubernetes tornado aiohttp pymongo --ignore-installed -i https://pypi.tuna.tsinghua.edu.cn/simple + +ADD . /ktqueue +WORKDIR /ktqueue + +RUN python3.5 -m pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple +CMD python3 /ktqueue/server.py + +EXPOSE 8080 diff --git a/deploy/.gitignore b/deploy/.gitignore new file mode 100644 index 0000000..cb5b539 --- /dev/null +++ b/deploy/.gitignore @@ -0,0 +1 @@ +dep*.yaml diff --git a/deploy/README.md b/deploy/README.md new file mode 100644 index 0000000..c0182f2 --- /dev/null +++ b/deploy/README.md @@ -0,0 +1,50 @@ +# Deployment + +- mongodb-service.yaml +- mongodb-dev.yaml +- mongodb-production.yaml + +# modify configuration + +> cp mongodb-production.yaml dep-mongodb-production.yaml + +update it with your own ceph configuration + +if you want to know the mons in your ceph cluster, just type + +> ceph mon stat + +get your ceph-secret to kubernetes + +> sudo ceph auth get-key client.admin | base64 + +Note: although the output of `ceph auth get-key` is already base64-encoded, you should encode it again. + +then, update your ceph-secret.yaml, and import `ceph-secret.yaml` + +> cp ceph-secret.yaml dep-ceph-secret.yaml && vi dep-ceph-secret.yaml +> kubectl create -f dep-ceph-secret.yaml + +# create rbd + +> rbd create rbd/ktqueue-mongodb -s 10240 + +> rbd info rbd/ktqueue-mongodb + +try: + +> rbd map ktqueue-mongodb + +the current linux kernel doesn't support all the features. 
if you get an error, refer to [this](http://tonybai.com/2016/11/07/integrate-kubernetes-with-ceph-rbd/) +> rbd feature disable ktqueue-mongodb exclusive-lock, object-map, fast-diff, deep-flatten + +# create services + + +create mongodb service + +> kubectl create -f mongodb-service.yaml + +create mongodb server + +> kubectl create -f dep-mongodb-production.yaml diff --git a/deploy/ceph-secret.yaml b/deploy/ceph-secret.yaml new file mode 100644 index 0000000..9c36bde --- /dev/null +++ b/deploy/ceph-secret.yaml @@ -0,0 +1,6 @@ +Version: v1 +kind: Secret +metadata: + name: ceph-secret +data: + key: diff --git a/deploy/mongodb-dev.yaml b/deploy/mongodb-dev.yaml new file mode 100644 index 0000000..2e05296 --- /dev/null +++ b/deploy/mongodb-dev.yaml @@ -0,0 +1,27 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + labels: + name: ktqueue-mongodb + name: ktqueue-mongodb-controller +spec: + replicas: 1 + template: + metadata: + labels: + name: ktqueue-mongodb + spec: + containers: + - image: mongo + name: mongo + ports: + - name: mongo + containerPort: 27017 + hostPort: 27017 + volumeMounts: + - name: mongo-persistent-storage + mountPath: /data/db + volumes: + - name: mongo-persistent-storage + hostPath: + path: /tmp/tq-mongo diff --git a/deploy/mongodb-production.yaml b/deploy/mongodb-production.yaml new file mode 100644 index 0000000..c5ba116 --- /dev/null +++ b/deploy/mongodb-production.yaml @@ -0,0 +1,34 @@ +apiVersion: v1 +kind: ReplicationController +metadata: + labels: + name: ktqueue-mongodb + name: ktqueue-mongodb-controller +spec: + replicas: 1 + template: + metadata: + labels: + name: ktqueue-mongodb + spec: + containers: + - image: mongo + name: mongo + ports: + - name: mongo + containerPort: 27017 + hostPort: 27017 + volumeMounts: + - name: mongo-persistent-storage + mountPath: /data/db + volumes: + - name: mongo-persistent-storage + rbd: + monitors: + - localhost:6789 # your mons here + pool: rbd + image: ktqueue-mongodb + secretRef: + name: ceph-secret + 
fsType: ext4 + readOnly: false diff --git a/deploy/mongodb-service.yaml b/deploy/mongodb-service.yaml new file mode 100644 index 0000000..9ef4fa3 --- /dev/null +++ b/deploy/mongodb-service.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + name: ktqueue-mongodb + name: ktqueue-mongodb +spec: + ports: + - port: 27017 + targetPort: 27017 + selector: + name: ktqueue-mongodb diff --git a/frontend/index.html b/frontend/index.html new file mode 100644 index 0000000..3eb56f9 --- /dev/null +++ b/frontend/index.html @@ -0,0 +1,451 @@ + + + + + + + KTQueue + + + + + + +
+
+
+ +

KTQueue

+ Jobs + Repos +
+ +
+ + + + + + + + + + + + diff --git a/frontend/static/ktqueue.css b/frontend/static/ktqueue.css new file mode 100644 index 0000000..44c2711 --- /dev/null +++ b/frontend/static/ktqueue.css @@ -0,0 +1,23 @@ +body { + padding: 0 1em ; + margin: 0; +} +#tq-header { +} +#tq-title { + vertical-align: middle; + line-height: 60px; + float: left; + margin: 0 2em 0 1em; +} +.table-header { + padding: .5em 0; +} +.table-header .el-pagination { + display: inline-block; +} +.inline-form-span { + width: 1em; + height: 1em; + float: left; +} diff --git a/ktqueue.yaml b/ktqueue.yaml new file mode 100644 index 0000000..0ebcb5a --- /dev/null +++ b/ktqueue.yaml @@ -0,0 +1,70 @@ +--- +apiVersion: v1 +kind: ServiceAccount +metadata: + name: ktqueue +--- +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: ktqueue + labels: + app: ktqueue +spec: + replicas: 1 + selector: + matchLabels: + app: ktqueue + template: + metadata: + labels: + app: ktqueue + spec: + nodeSelector: + beta.kubernetes.io/arch: amd64 + serviceAccountName: ktqueue + containers: + - name: ktqueue + image: ktqueue + imagePullPolicy: IfNotPresent + command: [ "python3", "/ktqueue/server.py" ] + volumeMounts: + - name: cephfs + mountPath: /cephfs + ports: + - containerPort: 8080 + protocol: TCP + securityContext: + privileged: true + volumes: + - name: 'cephfs' + hostPath: + path: /mnt/cephfs + # cephfs: + # monitors: + # - localhost:6789 + # user: admin + # secretRef: + # name: ceph-secret + # readOnly: false + +--- +kind: Service +apiVersion: v1 +metadata: + labels: + app: ktqueue + name: ktqueue + namespace: default +spec: + type: NodePort + ports: + - port: 80 + targetPort: 8080 + selector: + app: ktqueue +--- +apiVersion: v1 +kind: Namespace +metadata: + name: ktqueue diff --git a/ktqueue/__init__.py b/ktqueue/__init__.py new file mode 100644 index 0000000..bca5f67 --- /dev/null +++ b/ktqueue/__init__.py @@ -0,0 +1 @@ +# encoding: utf-8 diff --git a/ktqueue/api/__init__.py b/ktqueue/api/__init__.py 
new file mode 100644 index 0000000..444107f --- /dev/null +++ b/ktqueue/api/__init__.py @@ -0,0 +1,8 @@ +# encoding: utf-8 +from .job import JobsHandler +from .job import JobLogHandler +from .job import StopJobHandler +from .job import TensorBoardHandler +from .repo import ReposHandler +from .node import NodesHandler +from .tensorboard_proxy import TensorBoardProxyHandler diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py new file mode 100644 index 0000000..5916dd8 --- /dev/null +++ b/ktqueue/api/job.py @@ -0,0 +1,349 @@ +# encoding: utf-8 +import json +import os +import logging + +import tornado.web + +from ktqueue.cloner import Cloner +from .utils import convert_asyncio_task +from ktqueue.utils import save_job_log +from ktqueue import settings + + +class JobsHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + async def post(self): + """ + Create a new job. + e.x. 
request: + { + "name": "test-17", + "command": "echo 'aW1wb3J0IHRlbnNvcmZsb3cgYXMgdGYKaW1wb3J0IHRpbWUKc2Vzc2lvbiA9IHRmLlNlc3Npb24oKQpmb3IgaSBpbiByYW5nZSg2MDApOgogICAgdGltZS5zbGVlcCgxKQogICAgcHJpbnQoaSkK' | base64 -d | python3", + "gpu_num": 1, + "image": "comzyh/tf_image", + "repo": "https://github.com/comzyh/TF_Docker_Images.git", + "commit_id": "3701b94219fb06974f485cabf99ad88019afe618" + } + """ + body_arguments = json.loads(self.request.body.decode('utf-8')) + + name = body_arguments.get('name') + + # job with same name is forbidden + if self.jobs_collection.find_one({'name': name}): + self.set_status(400) + self.finish(json.dumps({'message': 'Job {} already exists'.format(name)})) + return + + command = body_arguments.get('command') + node = body_arguments.get('node', None) + gpu_num = int(body_arguments.get('gpu_num')) + image = body_arguments.get('image') + repo = body_arguments.get('repo', None) + commit_id = body_arguments.get('commit_id', None) + + command_kube = 'cd $WORK_DIR && ' + command + + job_dir = os.path.join('/cephfs/ktqueue/jobs/', name) + if not os.path.exists(job_dir): + os.makedirs(job_dir) + + output_dir = os.path.join('/cephfs/ktqueue/output', name) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + volumeMounts = [] + volumes = [] + node_selector = {} + + # add custom volumeMounts + for volume in body_arguments.get('volumeMounts', []): + volume_name = 'volume-{}'.format(volume['key']) + volumes.append({ + 'name': volume_name, + 'hostPath': {'path': volume['hostPath']} + }) + volumeMounts.append({ + 'name': volume_name, + 'mountPath': volume['mountPath'] + }) + + if node: + node_selector['kubernetes.io/hostname'] = node + + if gpu_num > 0: + # cause kubernetes does not support NVML, use this trick to suit nvidia driver version + command_kube = 'version=$(ls /nvidia-drivers | tail -1); ln -s /nvidia-drivers/$version /usr/local/nvidia &&' + command_kube + volumes.append({ + 'name': 'nvidia-drivers', + 'hostPath': { + 'path': 
'/var/lib/nvidia-docker/volumes/nvidia_driver' + } + }) + volumeMounts.append({ + 'name': 'nvidia-drivers', + 'mountPath': '/nvidia-drivers', + }) + + # cephfs + volumes.append({ + 'name': 'cephfs', + 'hostPath': { + 'path': '/mnt/cephfs' + } + }) + volumeMounts.append({ + 'name': 'cephfs', + 'mountPath': '/cephfs', + }) + + job = { + 'apiVersion': 'batch/v1', + 'kind': 'Job', + 'metadata': { + 'name': name, + }, + 'spec': { + 'parallelism': 1, + 'template': { + 'metadata': { + 'name': name, + }, + 'spec': { + 'containers': [ + { + 'name': name + 'container', + 'image': image, + 'imagePullPolicy': 'IfNotPresent', + 'command': ['sh', '-c', command_kube], + 'resources': { + 'limits': { + 'alpha.kubernetes.io/nvidia-gpu': gpu_num, + }, + }, + 'volumeMounts': volumeMounts, + 'env': [ + { + 'name': 'JOB_NAME', + 'value': name + }, + { + 'name': 'OUTPUT_DIR', + 'value': output_dir + }, + { + 'name': 'WORK_DIR', + 'value': os.path.join(job_dir, 'code'), + }, + ] + } + ], + 'volumes': volumes, + 'restartPolicy': 'OnFailure', + 'nodeSelector': node_selector, + } + } + } + } + self.jobs_collection.update_one({'name': name}, {'$set': { + 'name': name, + 'node': node, + 'command': command, + 'gpu_num': gpu_num, + 'repo': repo, + 'commit_id': commit_id, + 'image': image, + 'status': 'fetching', + 'tensorboard': False, + 'volumeMounts': body_arguments.get('volumeMounts', []), + }}, upsert=True) + self.finish(json.dumps({'message': 'job {} successful created.'.format(name)})) + # clone code + if repo and commit_id: + cloner = Cloner(repo=repo, commit_id=commit_id, dst_directory=os.path.join(job_dir, 'code')) + await cloner.clone_and_copy() + + ret = await self.k8s_client.call_api( + api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), + method='POST', + data=job + ) + try: + self.jobs_collection.update_one({'name': name}, {'$set': { + 'status': 'pending', + 'creationTimestamp': ret['metadata']['creationTimestamp'], + }}, upsert=True) + except 
Exception as e: + logging.info(ret) + logging.exception(e) + + async def get(self): + page = int(self.get_argument('page', 1)) + page_size = int(self.get_argument('page_size', 20)) + count = self.jobs_collection.count() + jobs = list(self.jobs_collection.find().sort("_id", -1).skip(page_size * (page - 1)).limit(page_size)) + for job in jobs: + job['_id'] = str(job['_id']) + self.finish(json.dumps({ + 'page': page, + 'total': count, + 'page_size': page_size, + 'data': jobs, + })) + + +class JobLogHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + async def get(self, job): + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'job-name={job}'.format(job=job)} + ) + if len(pods['items']): + pod_name = pods['items'][0]['metadata']['name'] + resp = await self.k8s_client.call_api_raw( + method='GET', + api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name) + ) + async for chunk in resp.content.iter_any(): + self.write(chunk) + resp.close() + else: + with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.txt'), 'r') as f: + self.finish(f.read()) + + +class StopJobHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + async def post(self, job): + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'job-name={job}'.format(job=job)} + ) + if len(pods['items']): + pod_name = pods['items'][0]['metadata']['name'] + await 
save_job_log(job_name=job, pod_name=pod_name, k8s_client=self.k8s_client) + await self.k8s_client.call_api( + method='DELETE', + api='/apis/batch/v1/namespaces/{namespace}/jobs/{name}'.format(namespace=settings.job_namespace, name=job) + ) + await self.k8s_client.call_api( + method='DELETE', + api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name) + ) + self.jobs_collection.update_one({'name': job}, {'$set': {'status': 'ManualStop'}}) + self.finish({'message': 'Job {} successful deleted.'.format(job)}) + + +class TensorBoardHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + self.jobs_collection = mongo_client.ktqueue.jobs + + @convert_asyncio_task + async def post(self, job): + job_image = self.jobs_collection.find_one({'name': job})['image'] + body_arguments = json.loads(self.request.body.decode('utf-8')) + logdir = body_arguments.get('logdir', '/cephfs/ktqueue/logs/{job}/train'.format(job=job)) + job_dir = os.path.join('/cephfs/ktqueue/jobs/', job) + output_dir = os.path.join('/cephfs/ktqueue/output', job) + command = 'tensorboard --logdir {logdir} --host 0.0.0.0'.format(logdir=logdir) + + pod = { + 'apiVersion': 'v1', + 'kind': 'Pod', + 'metadata': { + 'name': '{job}-tensorboard'.format(job=job), + 'labels': { + 'ktqueue-tensorboard-job-name': job + } + }, + 'spec': { + 'containers': [ + { + 'name': 'ktqueue-tensorboard', + 'image': job_image, + 'command': ['sh', '-c', command], + 'volumeMounts': [ + { + 'name': 'cephfs', + 'mountPath': '/cephfs', + } + ], + 'env': [ + { + 'name': 'JOB_NAME', + 'value': job + }, + { + 'name': 'OUTPUT_DIR', + 'value': output_dir + }, + { + 'name': 'WORK_DIR', + 'value': os.path.join(job_dir, 'code'), + }, + ] + } + ], + 'volumes': [ + { + 'name': 'cephfs', + 'hostPath': { + 'path': '/mnt/cephfs' + } + } + ] + } + } + + ret = await self.k8s_client.call_api( + 
api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + method='POST', + data=pod + ) + self.jobs_collection.update_one({'name': job}, {'$set': {'tensorboard': True}}) + + self.write(ret) + + @convert_asyncio_task + async def delete(self, job): + pods = await self.k8s_client.call_api( + method='GET', + api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + params={'labelSelector': 'ktqueue-tensorboard-job-name={job}'.format(job=job)} + ) + if len(pods['items']): + pod_name = pods['items'][0]['metadata']['name'] + ret = await self.k8s_client.call_api( + api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name), + method='DELETE', + ) + self.write(ret) + else: + self.set_status(404) + self.write({'message': 'tensorboard pod not found.'}) + self.jobs_collection.update_one({'name': job}, {'$set': {'tensorboard': False}}) diff --git a/ktqueue/api/node.py b/ktqueue/api/node.py new file mode 100644 index 0000000..63705b7 --- /dev/null +++ b/ktqueue/api/node.py @@ -0,0 +1,24 @@ +# encoding: utf-8 +import tornado.web + +from .utils import convert_asyncio_task + + +class NodesHandler(tornado.web.RequestHandler): + + def initialize(self, k8s_client, mongo_client): + self.k8s_client = k8s_client + self.mongo_client = mongo_client + + @convert_asyncio_task + async def get(self): + ret = await self.k8s_client.call_api( + api='/api/v1/nodes', + method='GET', + ) + + self.write({'items': [{ + 'name': node['metadata']['name'], + 'labels': node['metadata']['labels'] + } for node in ret['items'] + ]}) diff --git a/ktqueue/api/repo.py b/ktqueue/api/repo.py new file mode 100644 index 0000000..160582b --- /dev/null +++ b/ktqueue/api/repo.py @@ -0,0 +1,75 @@ +# encoding: utf-8 + +import json +import re + +import tornado.web + + +class ReposHandler(tornado.web.RequestHandler): + + __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/]*.git') + __ssh_pattern = 
re.compile(r'\w+@[\w.]+:\w+\/\w+\.git') + + def initialize(self, mongo_client): + self.mongo_client = mongo_client + self.repos_collection = self.mongo_client.ktqueue.credentials + + async def post(self): + """create a credential for repo, support both ssh & https. + ssh e.x.: + { + "repo": "git@github.com:naturali/tensorflow.git", + "ssh_key": "" + } + https e.x.: + { + "repo": "git@github.com:naturali/tensorflow.git", + "username": "", + "password": "" + } + use deployment key with ssh is recommanded + """ + body = json.loads(self.request.body.decode('utf-8')) + repo = body['repo'].strip() + if self.__ssh_pattern.match(repo): + if body.get('ssh_key', None) is None: + self.set_status(400) + self.finish(json.dumps({'message': 'ssh_key must be provided.'})) + return + elif self.__https_pattern.match(repo): + if body.get('username', None) is None or body.get('password', None) is None: + self.set_status(400) + self.finish(json.dumps({'message': 'username and password must be provided.'})) + return + else: + self.set_status(400) + self.finish(json.dumps({'message': 'illigal repo'})) + return + + self.repos_collection.update_one({'repo': repo}, {'$set': body}, upsert=True) + self.finish(json.dumps({'message': 'repo {} successful added.'.format(repo)})) + + async def delete(self): + body = json.loads(self.request.body.decode('utf-8')) + repo = body['repo'].strip() + + self.mongo_client.ktqueue.credentials.delete_one({'repo': repo}) + self.finish(json.dumps({'message': 'repo {} successful added.'.format(repo)})) + + async def get(self): + page = int(self.get_argument('page', 1)) + page_size = int(self.get_argument('page_size', 20)) + count = self.repos_collection.count() + repos = [] + for repo in self.repos_collection.find().sort("_id", -1).skip(page_size * (page - 1)).limit(page_size): + repos.append({ + '_id': str(repo['_id']), + 'repo': repo['repo'] + }) + self.finish(json.dumps({ + 'page': page, + 'total': count, + 'page_size': page_size, + 'data': repos, + })) diff 
--git a/ktqueue/api/tensorboard_proxy.py b/ktqueue/api/tensorboard_proxy.py new file mode 100644 index 0000000..6ccc2bd --- /dev/null +++ b/ktqueue/api/tensorboard_proxy.py @@ -0,0 +1,69 @@ +# encoding: utf- +import tornado.web +import tornado.httpclient +import tornado.httputil +import urllib.parse +import re + +from tornado.httpclient import HTTPRequest + +job_tensorboard_map = {} + + +class TensorBoardProxyHandler(tornado.web.RequestHandler): + __job_pattern = re.compile(r'/tensorboard/(?P[\w_\-\.]+)/(.*?)') + + def initialize(self, client): + self.client = client + + async def get(self, **kwargs): + if 'job' not in kwargs: + referer_path = urllib.parse.urlparse(self.request.headers['Referer']).path + job = self.__job_pattern.match(referer_path).group('job') + url = 'data/' + kwargs['url'] + else: + job = kwargs['job'] + url = kwargs['url'] + + host = job_tensorboard_map.get(job, None) + if not host: + self.set_status(404) + self.finish({'message': 'tensorboard not found.'}) + return + + parsed_url = urllib.parse.urlparse(self.request.uri) + if parsed_url.query: + url += '?' 
+ parsed_url.query + + body = self.request.body + if not body: + body = None + + request = HTTPRequest( + url='http://{host}:6006/{url}'.format(host=host, url=url), + method=self.request.method, body=body, + headers=self.request.headers, follow_redirects=False, + allow_nonstandard_methods=True, + request_timeout=180 if 'job' not in kwargs else 4, + ) + response = await self.client.fetch(request) + + if response.error and not isinstance(response.error, tornado.httpclient.HTTPError): + self.set_status(500) + self.finish('Internal server error:\n' + str(response.error)) + return + + self.set_status(response.code) + self._headers = tornado.httputil.HTTPHeaders() + for header, v in response.headers.get_all(): + if header not in ('Content-Length', 'Transfer-Encoding', 'Content-Encoding', 'Connection'): + self.add_header(header, v) + + if response.body: + self.set_header('Content-Length', len(response.body)) + self.write(response.body) + + self.finish() + + async def post(self, **kwargs): + await self.get(**kwargs) diff --git a/ktqueue/api/utils.py b/ktqueue/api/utils.py new file mode 100644 index 0000000..9eb53f6 --- /dev/null +++ b/ktqueue/api/utils.py @@ -0,0 +1,12 @@ +# encoding: utf-8 +import asyncio +import functools + + +def convert_asyncio_task(method): + """https://github.com/KeepSafe/aiohttp/issues/1176""" + @functools.wraps(method) + async def wrapper(self, *args, **kwargs): + coro = method(self, *args, **kwargs) + return await asyncio.get_event_loop().create_task(coro) + return wrapper diff --git a/ktqueue/cloner.py b/ktqueue/cloner.py new file mode 100644 index 0000000..ae4aa58 --- /dev/null +++ b/ktqueue/cloner.py @@ -0,0 +1,148 @@ +# encoding: utf-8 + +import os +import hashlib +import re +import asyncio +import logging +import urllib.parse + +import pymongo + + +class Cloner: + """Do the git clone stuff in another thread""" + + __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/]*.git') + __ssh_pattern = re.compile(r'\w+@[\w.]+:\w+\/\w+\.git') + + 
def __init__(self, repo, commit_id, dst_directory): + self.repo = repo.strip() + self.commit_id = commit_id + self.dst_directory = dst_directory + + self.mongo_client = pymongo.MongoClient('ktqueue-mongodb') + self.ssh_key_path = None + + if self.__ssh_pattern.match(repo): + self.repo_type = 'ssh' + elif self.__https_pattern.match(repo): + self.repo_type = 'https' + else: + raise Exception('wrong repo type') + + if self.repo_type == 'ssh' and self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) is None: + raise Exception('ssh credential for {repo} must be provided.'.format(repo=repo)) + + self.repo_hash = hashlib.sha1(self.repo.encode('utf-8')).hexdigest() + + async def prepare_ssh_key(self, repo): + ssh_key_dir = os.path.join('/tmp/ktqueue/ssh_keys', self.repo_hash) + if not os.path.exists(ssh_key_dir): + os.makedirs(ssh_key_dir) + self.ssh_key_path = os.path.join(ssh_key_dir, 'id') + if not os.path.exists(self.ssh_key_path): + with open(self.ssh_key_path, 'w') as f: + credential = self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) + f.write(credential['ssh_key']) + os.chmod(self.ssh_key_path, 0o600) # prevent WARNING: UNPROTECTED PRIVATE KEY FILE! 
+ await asyncio.sleep(1.0) # os.chmod may have strange behavior, that ssh still permission 644 after too short time + + @classmethod + async def git_with_ssh_key(cls, ssh_key_path, args, cwd=None): + env = {**os.environ, + 'GIT_SSH_COMMAND': 'ssh -oStrictHostKeyChecking=no -i {ssh_key_path}'.format(ssh_key_path=ssh_key_path) + } + proc = await asyncio.create_subprocess_exec(*['git'] + args, stdout=asyncio.subprocess.PIPE, env=env, cwd=cwd) + lines = [] + async for line in proc.stdout: + logging.debug(line) + lines.append(line) + recode = await proc.wait() + return recode, lines + + @classmethod + def add_credential_to_https_url(cls, url, username, password): + parsed = urllib.parse.urlparse(url) + if username is not None and password is not None: + host_and_port = parsed.hostname + if parsed.port: + host_and_port += ':' + parsed.port + parsed._replace(netloc='{}:{}@{}'.format(username, password, host_and_port)) + return parsed.geturl() + + @classmethod + async def git_with_https(cls, args, cwd=None): + proc = await asyncio.create_subprocess_exec(*['git'] + args, stdout=asyncio.subprocess.PIPE, cwd=cwd) + lines = [] + async for line in proc.stdout: + logging.debug(line) + lines.append(line) + recode = await proc.wait() + return recode, lines + + async def clone_and_copy(self, archive_file=None, keep_archive=False): + if not os.path.exists('/cephfs/ktqueue/repos'): + os.makedirs('/cephfs/ktqueue/repos') + repo_path = os.path.join('/cephfs/ktqueue/repos', self.repo_hash) + + if self.repo_type == 'ssh': + await self.prepare_ssh_key(self.repo) + elif self.repo_type == 'https': + credential = self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) + repo_url = self.repo + if credential: + repo_url = self.add_credential_to_https_url( + self.repo, username=credential['username'], password=credential['password']) + + if not os.path.exists(repo_path): # Then clone it + if self.repo_type == 'ssh': + retcode, retlines = await self.git_with_ssh_key( + 
ssh_key_path=self.ssh_key_path, + cwd='/cephfs/ktqueue/repos', + args=['clone', self.repo, '--recursive', self.repo_hash], + + ) + else: + retcode, retlines = await self.git_with_https( + cwd='/cephfs/ktqueue/repos', + args=['clone', repo_url, '--recursive', self.repo_hash], + ) + if retcode != 0: + logging.error('clone repo failed with retcode {}.'.format(retcode)) + else: + if self.repo_type == 'ssh': + retcode, retlines = await self.git_with_ssh_key( + ssh_key_path=self.ssh_key_path, + cwd=repo_path, + args=['fetch'], + ) + else: + retcode, retlines = await self.git_with_https( + cwd=repo_path, + args=['fetch'], + ) + if retcode != 0: + logging.error('fetch repo failed with retcode {}.'.format(retcode)) + + if not os.path.exists('/cephfs/ktqueue/repo_archive'): + os.makedirs('/cephfs/ktqueue/repo_archive') + if archive_file is None: + archive_file = '/cephfs/ktqueue/repo_archive/{}.tar.gz'.format(self.commit_id) + + # Arcive commit_id + logging.info('Arciving {}:{}'.format(self.repo, self.commit_id)) + with open(archive_file, 'wb') as f: + proc = await asyncio.create_subprocess_exec(*['git', 'archive', self.commit_id, '--forma', 'tar.gz'], + stdout=f, cwd=repo_path) + retcode = await proc.wait() + if retcode != 0: + logging.error('Arcive repo failed with retcode {}'.format(retcode)) + + if not os.path.exists(self.dst_directory): + os.makedirs(self.dst_directory) + proc = await asyncio.create_subprocess_exec(*['tar', 'xzf', archive_file], cwd=self.dst_directory) + retcode = await proc.wait() + + if not keep_archive: + os.remove(archive_file) diff --git a/ktqueue/event_watcher.py b/ktqueue/event_watcher.py new file mode 100644 index 0000000..95489bf --- /dev/null +++ b/ktqueue/event_watcher.py @@ -0,0 +1,107 @@ +# encoding: utf-8 +import logging +import asyncio +import json + +import pymongo + +from ktqueue.utils import save_job_log +from ktqueue import settings + + +class EventWatcher: + + def __init__(self, k8s_client=None): + assert k8s_client is not None + 
self.k8s_client = k8s_client + self.running = True + + async def poll(self, api, method='GET', callback=None, **kwargs): + """ + This function will never return, await the future carefully. + """ + assert callback is not None + timeout = kwargs.pop('timeout', None) + while self.running: + try: + session = self.k8s_client.new_connector_session() + resp = await self.k8s_client.call_api_raw( + api=api, method=method, timeout=timeout, session=session, **kwargs) + async for line in resp.content: + try: + await callback(json.loads(line.decode('utf-8'))) + except Exception as e: + logging.exception(e) + else: + pass + except Exception as e: + logging.exception(e) + await asyncio.sleep(1) + finally: + session.close() + + +async def watch_pod(k8s_client): + from .api.tensorboard_proxy import job_tensorboard_map + + mongo_client = pymongo.MongoClient('ktqueue-mongodb') + jobs_collection = mongo_client.ktqueue.jobs + + async def callback(event): + labels = event['object']['metadata']['labels'] + if 'ktqueue-tensorboard-job-name' in labels: + job_name = labels['ktqueue-tensorboard-job-name'] + hostIP = event['object']['status'].get('podIP', None) + if event['type'] == 'DELETED': + job_tensorboard_map.pop(job_name, None) + elif hostIP: + job_tensorboard_map[job_name] = hostIP + return + + if 'job-name' not in labels: + return + job_name = labels['job-name'] + if not jobs_collection.find_one({'name': job_name}): + return + + if event['object']['status']['phase'] == 'Pending': + jobs_collection.update_one({'name': job_name}, {'$set': {'status': 'Pending'}}) + return + + state = event['object']['status']['containerStatuses'][0]['state'] + for k, v in state.items(): + status = (k, v.get('reason', None)) + status_str = '{}: {}'.format(*status) + continue + logging.info('Job {} enter state {}'.format(job_name, status_str)) + if jobs_collection.find_one({'name': job_name})['status'] != 'ManualStop': + jobs_collection.update_one({'name': job_name}, {'$set': { + 'status': status_str, + 
'state': state, + }}) + + # When a job is successful finished, save log and do not watch it any more + if status[0] == 'terminated': + pod_name = event['object']['metadata']['name'] + + # save log first + await save_job_log(job_name=job_name, pod_name=pod_name, k8s_client=k8s_client) + + # set label 'ktqueue-watching' to 'false' + # refer https://tools.ietf.org/html/rfc6902#appendix-A.1 to know more about json-patch + if status == ('terminated', 'Completed'): + await k8s_client.call_api( + api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name), + method='PATCH', + headers={'Content-Type': 'application/json-patch+json'}, + data=[{"op": "add", "path": "/metadata/labels/ktqueue-watching", "value": "false"}] + ) + + event_watcher = EventWatcher(k8s_client=k8s_client) + + await event_watcher.poll( + api='/api/v1/watch/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), + method='GET', + callback=callback, + params={'labelSelector': 'ktqueue-watching!=false'} + ) diff --git a/ktqueue/kubernetes_client.py b/ktqueue/kubernetes_client.py new file mode 100644 index 0000000..bd4c82a --- /dev/null +++ b/ktqueue/kubernetes_client.py @@ -0,0 +1,60 @@ +# encoding: utf-8 +import os +import aiohttp +import kubernetes +import json +import ssl + + +class kubernetes_client: + + def __init__(self, config=None): + if config is None: + config = self.get_service_account_config() + self.token = config.get('token', None) + self.host = config['host'] + self.token = config.get('token', None) + self.port = config.get('port', None) + self.schema = config['schema'] + self.api_preifx = "{schema}://{host}:{port}".format(schema=self.schema, host=self.host, port=self.port) + self.ca_crt = None + self.session = self.new_connector_session() + + def new_connector_session(self): + """ + 1 TCPConnector can handle 20 connections by default. 
+ """ + if self.schema == 'https': + self.ca_crt = kubernetes.config.incluster_config.SERVICE_CERT_FILENAME + sslcontext = ssl.create_default_context(cafile=self.ca_crt) + conn = aiohttp.TCPConnector(ssl_context=sslcontext) + else: + conn = aiohttp.TCPConnector() + return aiohttp.ClientSession(connector=conn) + + @classmethod + def get_service_account_config(cls): + config = { + 'host': os.environ['KUBERNETES_SERVICE_HOST'], + 'port': os.environ['KUBERNETES_SERVICE_PORT'], + } + config['token'] = os.environ.get('KUBERNETES_API_ACCOUNT_TOKEN', None) # Not official, for debug usage + config['schema'] = os.environ.get('KUBERNETES_API_SCHEMA', 'https') # Not official, for debug usage + if config['token'] is None: + with open(kubernetes.config.incluster_config.SERVICE_TOKEN_FILENAME, 'r') as f: + config['token'] = f.read() + return config + + async def call_api(self, api, method='GET', **kwargs): + resp = await self.call_api_raw(api=api, method=method, **kwargs) + return json.loads(await resp.text()) + + async def call_api_raw(self, api, method='GET', **kwargs): + session = kwargs.pop('session', self.session) + url = self.api_preifx + api + headers = kwargs.pop('headers', {}) + headers['Authorization'] = 'Bearer {token}'.format(token=self.token) + headers['Content-Type'] = headers.get('Content-Type', 'application/json') + if 'data' in kwargs and (isinstance(kwargs['data'], dict) or isinstance(kwargs['data'], list)): + kwargs['data'] = json.dumps(kwargs['data']) + return await session.request(method, url, headers=headers, **kwargs) diff --git a/ktqueue/settings.py b/ktqueue/settings.py new file mode 100644 index 0000000..9bceb1f --- /dev/null +++ b/ktqueue/settings.py @@ -0,0 +1,3 @@ +import os + +job_namespace = os.environ.get('JOB_NAMESPACE', 'ktqueue') diff --git a/ktqueue/utils.py b/ktqueue/utils.py new file mode 100644 index 0000000..0567772 --- /dev/null +++ b/ktqueue/utils.py @@ -0,0 +1,29 @@ +# encoding: utf-8 +import os +import re +from ktqueue import settings + 
+ +async def save_job_log(job_name, pod_name, k8s_client): + log_dir = os.path.join('/cephfs/ktqueue/logs', job_name) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + resp = await k8s_client.call_api_raw( + method='GET', + api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name) + ) + log_path = os.path.join(log_dir, 'log.txt') + + # Rolling log + if os.path.exists(log_path): + max_id = 0 + for filename in os.listdir(log_dir): + group = re.match(r'log\.(?P\d+)\.log', filename) + if group: + max_id = max(max_id, int(group.group('id'))) + os.rename(log_path, os.path.join(log_dir, 'log.{}.txt'.format(max_id + 1))) + + with open(os.path.join(log_dir, 'log.txt'), 'wb') as f: + async for chunk in resp.content.iter_any(): + f.write(chunk) + resp.close() diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bd7acab --- /dev/null +++ b/requirements.txt @@ -0,0 +1,4 @@ +tornado +kubernetes +aiohttp +pymongo diff --git a/server.py b/server.py new file mode 100644 index 0000000..e796805 --- /dev/null +++ b/server.py @@ -0,0 +1,87 @@ +# encoding: utf-8 +import asyncio +import os +import sys +import logging + +import pymongo +import tornado +import tornado.web +import tornado.autoreload + +from tornado.simple_httpclient import SimpleAsyncHTTPClient +from tornado.platform.asyncio import AsyncIOMainLoop + +from ktqueue.kubernetes_client import kubernetes_client +from ktqueue.api import JobsHandler +from ktqueue.api import JobLogHandler +from ktqueue.api import ReposHandler +from ktqueue.api import NodesHandler +from ktqueue.api import StopJobHandler +from ktqueue.api import TensorBoardProxyHandler +from ktqueue.api import TensorBoardHandler + + +from ktqueue.event_watcher import watch_pod + +BASE_DIR = os.path.dirname(os.path.abspath(__file__)) + +__frontend_path = os.path.join(BASE_DIR, 'frontend') +__static_path = os.path.join(__frontend_path, 'static') + + +def 
create_db_index(): + client = pymongo.MongoClient('ktqueue-mongodb') + client.ktqueue.jobs.create_index([("name", pymongo.ASCENDING)], unique=True) + client.ktqueue.credentials.create_index([("repo", pymongo.ASCENDING)], unique=True) + + +def get_app(): + k8s_client = kubernetes_client() + mongo_client = pymongo.MongoClient('ktqueue-mongodb') + application = tornado.web.Application([ + (r'/()', tornado.web.StaticFileHandler, { + 'path': __frontend_path, + 'default_filename': 'index.html' + }), + (r'/nodes', NodesHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/jobs', JobsHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/jobs/(?P[\w_-]+)/log', JobLogHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/job/stop/(?P[\w_\-\.]+)', StopJobHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/job/tensorboard/(?P[\w_\-\.]+)', TensorBoardHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/repos', ReposHandler, {'mongo_client': mongo_client}), + (r'/static/(.*)', tornado.web.StaticFileHandler, {'path': __static_path}), + (r'/tensorboard/(?P[\w_\-\.]+)/(?P.*)', TensorBoardProxyHandler, {'client': SimpleAsyncHTTPClient(max_clients=64)}), + (r'/data/(?P.*)', TensorBoardProxyHandler, {'client': SimpleAsyncHTTPClient(max_clients=64)}), # This is a hack for TensorBoard + + ], debug=os.environ.get('KTQUEUE_DEBUG', '0') == '1') + return application + + +async def async_init(): + tasks = [ + watch_pod(kubernetes_client()), + ] + await asyncio.wait(tasks) + + +def start_server(): + create_db_index() + AsyncIOMainLoop().install() + app = get_app() + app.listen(8080) + loop = asyncio.get_event_loop() + if os.environ.get('KTQUEUE_DEBUG', '0') == '1': + print('Reload.') + loop.run_until_complete(async_init()) + loop.run_forever() + + +def main(): + logging.basicConfig(stream=sys.stdout, level=int(os.environ.get('KTQUEUE_DEBUG_LEVEL', logging.INFO)), + format='[%(asctime)s] 
%(name)s:%(levelname)s: %(message)s') + start_server() + + +if __name__ == '__main__': + main() From 55f1a02cf9c0443e8d14cf9ee818951377eb8679 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Wed, 22 Feb 2017 15:30:50 +0800 Subject: [PATCH 002/107] update README.md --- README.md | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 72be5ec..9cdfdbf 100644 --- a/README.md +++ b/README.md @@ -1 +1,12 @@ -# ktq \ No newline at end of file +# ktqueue + +kubernetes task queue + +# feature + +- support GPU tasks +- support assigning task to node manually +- view logs +- mount host-path to Pod +- tensorboard manage & proxy +- git clone repository with ssh-key or username & password From e15c42fd446fbf7ae7bdd140e0306bd2e8db968a Mon Sep 17 00:00:00 2001 From: Comzyh Date: Thu, 23 Feb 2017 10:46:43 +0800 Subject: [PATCH 003/107] fix encoding issue --- ktqueue/api/job.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 5916dd8..497416d 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -140,7 +140,15 @@ async def post(self): }, { 'name': 'WORK_DIR', - 'value': os.path.join(job_dir, 'code'), + 'value': os.path.join(job_dir, 'code') + }, + { + 'name': 'LC_ALL', + 'value': 'en_US.UTF-8' + }, + { + 'name': 'LC_CTYPE', + 'value': 'en_US.UTF-8' }, ] } From f37e9d17ee062e43c1f317450d5e4f10b8e3eef1 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 10:00:47 +0800 Subject: [PATCH 004/107] move deployment yaml --- ktqueue.yaml => deploy/ktqueue.yaml | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename ktqueue.yaml => deploy/ktqueue.yaml (100%) diff --git a/ktqueue.yaml b/deploy/ktqueue.yaml similarity index 100% rename from ktqueue.yaml rename to deploy/ktqueue.yaml From a29526da050ab20a37d9c5eb356162793b2d1a8f Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 10:50:00 +0800 Subject: [PATCH 005/107] update document --- README.md | 
4 +++ deploy/README.md | 67 ++++++++++++++++++++++++++++++++++++++------- deploy/ktqueue.yaml | 3 ++ 3 files changed, 64 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 9cdfdbf..1010fbc 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,7 @@ kubernetes task queue - mount host-path to Pod - tensorboard manage & proxy - git clone repository with ssh-key or username & password + +# deploy + +deploy guide under [deploy](./deploy) directory diff --git a/deploy/README.md b/deploy/README.md index c0182f2..a3a793b 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -1,31 +1,51 @@ # Deployment +# Brief + +1. setup kubernetes and ceph cluster +2. add ceph-secret to kubernetes +3. deploy mongodb based on kubernetes and ceph +4. build ktqueue docker image +5. deploy ktqueue + +# File list + +- ktqueue.yaml - mongodb-service.yaml - mongodb-dev.yaml - mongodb-production.yaml -# modify configure +# dependancy -> cp mongodb-production.yaml dep-mongodb-production.yaml +ktqueue requires kubernetes and cephfs, make sure you already have them. + +# prepare ceph -update your own ceph configure +kubernetes and other components needs permission (aka, a secret) to access ceph. -if you want to know mon in your ceph cluster, just type +if you want to know mons in your ceph cluster, just type: > ceph mon stat -get your ceph-secret to kubernetes +then, get your ceph-secret to kubernetes > sudo ceph auth get-key client.admin | base64 -Note: though ceph get-key is encoded by base64, you should encode it again. +Note: though `ceph get-key`'s response is encoded by base64, you should encode it again. 
+ +then, update your ceph-secret.yaml, add your secret after `key:` -then, update your ceph-secret.yaml, and import `ceph-secret.yaml` +> cp ceph-secret.yaml dep-ceph-secret.yaml && vi dep-ceph-secret.yaml + +and import `ceph-secret.yaml` -> cp ceph-secret.yaml dep-ceph-secret.yaml && vi dep-ceph-secret.yaml > kubectl create -f dep-ceph-secret.yaml -# create rbd +# deploy mongodb + +## create rbd + +mongodb needs a `persistent volume`(just like a disk) to store data. so you should create one. > rbd create rbd/ktqueue-mongodb -s 10240 @@ -36,10 +56,12 @@ try: > rbd map ktqueue-mongodb current linux kernal doesn't support all the features. if you get error, refer [this](http://tonybai.com/2016/11/07/integrate-kubernetes-with-ceph-rbd/) + > rbd feature disable ktqueue-mongodb exclusive-lock, object-map, fast-diff, deep-flatten -# create services +## create mongodb services +> cp mongodb-production.yaml dep-mongodb-production.yaml create mongodb service @@ -48,3 +70,28 @@ create mongodb service create mongodb server > kubectl create -f dep-mongodb-production.yaml + +# deploy ktqueue + +## mount cephfs +ktqueue dameon needs to access ceph to clone code, store log, etc. and ktqueue job needs to access ceph to store output. + +so you should ensure that cephfs has been mounted at `/mnt/cephfs` on ensure every single node you want to run ktqueue jobs or ktqueue dameon. + +you should modify `fstab` and add cephfs mount. + +## deploy + +> cp ktqueue.yaml dep-ktqueue.yaml + +if you want to change IP/port of ktqueue or assign ktqueue to a specific node, you should modify `dep-ktqueue.yaml` + +to select node you want to run ktqueue, change `host_name_you_want` after `kubernetes.io/hostname` and uncomment this line. + +to select IP/port, change `ip_you_want_to_access_form_outside` under `externalIPs` + +finish it: + +> kubectl create -f dep-ktqueue.yaml + +enjoy! 
diff --git a/deploy/ktqueue.yaml b/deploy/ktqueue.yaml index 0ebcb5a..4e39e91 100644 --- a/deploy/ktqueue.yaml +++ b/deploy/ktqueue.yaml @@ -22,6 +22,7 @@ spec: spec: nodeSelector: beta.kubernetes.io/arch: amd64 + #kubernetes.io/hostname: host_name_you_want serviceAccountName: ktqueue containers: - name: ktqueue @@ -61,6 +62,8 @@ spec: ports: - port: 80 targetPort: 8080 + # externalIPs: + # - ip_you_want_to_access_form_outside selector: app: ktqueue --- From 8dae9baa32f875d9b6a0d88e0d317989c707d652 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 11:01:10 +0800 Subject: [PATCH 006/107] update document --- deploy/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/deploy/README.md b/deploy/README.md index a3a793b..c88e080 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -80,6 +80,13 @@ so you should ensure that cephfs has been mounted at `/mnt/cephfs` on ensure eve you should modify `fstab` and add cephfs mount. +## build image + +> docker build -t ktqueue . + +you can modify `dep-ktqueue.yaml`(see next step)and change image name if you want. 
+ + ## deploy > cp ktqueue.yaml dep-ktqueue.yaml From 964055ea7e9c18c122298f03d421af67da552e49 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 16:30:09 +0800 Subject: [PATCH 007/107] support rbac --- deploy/ktqueue-rbac.yaml | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 deploy/ktqueue-rbac.yaml diff --git a/deploy/ktqueue-rbac.yaml b/deploy/ktqueue-rbac.yaml new file mode 100644 index 0000000..7d5bec8 --- /dev/null +++ b/deploy/ktqueue-rbac.yaml @@ -0,0 +1,26 @@ +# Create the clusterrole: +# $ kubectl create -f ktqueue-rbac.yaml +# Bind the ktqueue serviceaccount to the ktqueue clusterrole: +# $ kubectl create clusterrolebinding ktqueue --clusterrole=ktqueue --serviceaccount=default:ktqueue +--- +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1beta1 +metadata: + name: ktqueue +rules: + - apiGroups: + - "" + resources: + - pods + verbs: ["*"] + - apiGroups: + - "" + resources: + - nodes + verbs: + - list + - apiGroups: + - "" + resources: + - jobs + verbs: ["*"] From 2b80985e22efccf01a7d48baaab4c2872b803dc1 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 22:19:00 +0900 Subject: [PATCH 008/107] fix rbac --- deploy/ktqueue-rbac.yaml | 2 +- ktqueue/kubernetes_client.py | 11 ++++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/deploy/ktqueue-rbac.yaml b/deploy/ktqueue-rbac.yaml index 7d5bec8..174c690 100644 --- a/deploy/ktqueue-rbac.yaml +++ b/deploy/ktqueue-rbac.yaml @@ -20,7 +20,7 @@ rules: verbs: - list - apiGroups: - - "" + - "batch" resources: - jobs verbs: ["*"] diff --git a/ktqueue/kubernetes_client.py b/ktqueue/kubernetes_client.py index bd4c82a..78d6443 100644 --- a/ktqueue/kubernetes_client.py +++ b/ktqueue/kubernetes_client.py @@ -1,5 +1,6 @@ # encoding: utf-8 import os +import logging import aiohttp import kubernetes import json @@ -47,7 +48,15 @@ def get_service_account_config(cls): async def call_api(self, api, method='GET', **kwargs): resp = await 
self.call_api_raw(api=api, method=method, **kwargs) - return json.loads(await resp.text()) + text = await resp.text() + try: + result = json.loads(text) + except Exception as e: + print(api, method, kwargs) + print(text) + raise + else: + return result async def call_api_raw(self, api, method='GET', **kwargs): session = kwargs.pop('session', self.session) From 3ea2dfeadd02ceaed5292383136188f9cea5af83 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 27 Feb 2017 22:25:58 +0900 Subject: [PATCH 009/107] fix pods/log permission --- deploy/ktqueue-rbac.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/deploy/ktqueue-rbac.yaml b/deploy/ktqueue-rbac.yaml index 174c690..6f5fccf 100644 --- a/deploy/ktqueue-rbac.yaml +++ b/deploy/ktqueue-rbac.yaml @@ -12,6 +12,7 @@ rules: - "" resources: - pods + - pods/log verbs: ["*"] - apiGroups: - "" From ad24e5eaf0cb6ccc058914a003a44b5394ba1b58 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 28 Feb 2017 15:26:34 +0800 Subject: [PATCH 010/107] requested feature --- frontend/index.html | 47 ++++++++++++++++++++----------------- frontend/static/ktqueue.css | 19 +++++++++++++++ ktqueue/api/job.py | 13 +++++----- ktqueue/event_watcher.py | 23 ++++++++++++++---- 4 files changed, 70 insertions(+), 32 deletions(-) diff --git a/frontend/index.html b/frontend/index.html index 3eb56f9..cf38d41 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -43,10 +43,10 @@

KTQueue

v-loading="jobsData.loading"> - - - + label="Node" + width="100"> + + KTQueue align="center"> + label="creation" + width="200"> @@ -337,7 +337,7 @@

KTQueue

} resource.body.loading=false; this.jobsData = resource.body; - }) + }); }, createJob: function() { this.$http.post("/jobs", this.createJobDialog.data).then(function(resource){ @@ -431,7 +431,10 @@

KTQueue

loadJobLog: function(job_name){ this.$http.get('/jobs/' + job_name + '/log').then(function(resource){ this.log_text = resource.body; - }) + }).catch(function(response) { + this.$message.error('Unable to load Log!\n'); + console.error(response.body); + }); } } }); diff --git a/frontend/static/ktqueue.css b/frontend/static/ktqueue.css index 44c2711..27c6b97 100644 --- a/frontend/static/ktqueue.css +++ b/frontend/static/ktqueue.css @@ -21,3 +21,22 @@ body { height: 1em; float: left; } +.job-expand-item { + margin-bottom: 10px; + overflow: auto; + zoom: 1; +} +.job-expand-item > label { + width: 100px; + float: left; + text-align: right; + padding-right: 1em; + box-sizing: border-box; +} +.job-expand-item > div { + margin-left: 100px; +} +.status-stop-wrap { + display: inline-block; + width: 32px; +} diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 497416d..284ce1b 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -227,12 +227,13 @@ async def get(self, job): method='GET', api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name) ) - async for chunk in resp.content.iter_any(): - self.write(chunk) - resp.close() - else: - with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.txt'), 'r') as f: - self.finish(f.read()) + if resp.status == 200: + async for chunk in resp.content.iter_any(): + self.write(chunk) + resp.close() + return + with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.txt'), 'r') as f: + self.finish(f.read()) class StopJobHandler(tornado.web.RequestHandler): diff --git a/ktqueue/event_watcher.py b/ktqueue/event_watcher.py index 95489bf..1bf6f86 100644 --- a/ktqueue/event_watcher.py +++ b/ktqueue/event_watcher.py @@ -73,12 +73,27 @@ async def callback(event): status = (k, v.get('reason', None)) status_str = '{}: {}'.format(*status) continue + logging.info('Job {} enter state {}'.format(job_name, status_str)) + + job_update = { + 'state': state + } + + # update status + if 
status == ('terminated', 'Completed'): + job_update['status'] = 'Completed' + else: + job_update['status'] = status_str + + # update Running Node + if status[0] == 'terminated': + job_update['runningNode'] = None + elif event['object']['spec'].get('nodeName', None): + job_update['runningNode'] = event['object']['spec']['nodeName'] + if jobs_collection.find_one({'name': job_name})['status'] != 'ManualStop': - jobs_collection.update_one({'name': job_name}, {'$set': { - 'status': status_str, - 'state': state, - }}) + jobs_collection.update_one({'name': job_name}, {'$set': job_update}) # When a job is successful finished, save log and do not watch it any more if status[0] == 'terminated': From 92872708e4ff8623c8ec84f87ddc22f3a3d619c6 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 28 Feb 2017 17:18:04 +0800 Subject: [PATCH 011/107] support multi version log --- frontend/index.html | 29 +++++++++++++++++++++++++---- ktqueue/api/__init__.py | 1 + ktqueue/api/job.py | 17 ++++++++++++++++- ktqueue/utils.py | 20 ++++++++++++++------ server.py | 3 +++ 5 files changed, 59 insertions(+), 11 deletions(-) diff --git a/frontend/index.html b/frontend/index.html index cf38d41..caf683b 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -50,7 +50,6 @@

KTQueue

TensorBoard

-
KTQueue @@ -422,19 +429,33 @@

KTQueue

data: function(){ return { log_text: '', + selected_version: 'current', + versions: [], } }, mounted:function() { - this.loadJobLog(this.$route.params.job_name); + this.loadJobLogVersions(this.$route.params.job_name); + this.loadJobLog(this.$route.params.job_name, 'current'); }, methods: { - loadJobLog: function(job_name){ - this.$http.get('/jobs/' + job_name + '/log').then(function(resource){ + loadJobLog: function(job_name, version){ + this.$http.get('/jobs/' + job_name + '/log/' + version ).then(function(resource){ this.log_text = resource.body; }).catch(function(response) { this.$message.error('Unable to load Log!\n'); console.error(response.body); }); + }, + loadJobLogVersions: function(job_name) { + this.$http.get('/jobs/' + job_name + '/log/version').then(function(resource){ + this.versions = resource.body.versions; + }).catch(function(response) { + this.$message.error('Unable to load Log versions!\n'); + console.error(response.body); + }); + }, + versionChange: function(version) { + this.loadJobLog(this.$route.params.job_name, version); } } }); diff --git a/ktqueue/api/__init__.py b/ktqueue/api/__init__.py index 444107f..3a478ef 100644 --- a/ktqueue/api/__init__.py +++ b/ktqueue/api/__init__.py @@ -3,6 +3,7 @@ from .job import JobLogHandler from .job import StopJobHandler from .job import TensorBoardHandler +from .job import JobLogVersionHandler from .repo import ReposHandler from .node import NodesHandler from .tensorboard_proxy import TensorBoardProxyHandler diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 284ce1b..3dc5422 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -207,6 +207,17 @@ async def get(self): })) +class JobLogVersionHandler(tornado.web.RequestHandler): + @convert_asyncio_task + async def get(self, job): + from ktqueue.utils import get_log_versions + versions = get_log_versions(job) + self.write({ + 'job': job, + 'versions': versions + }) + + class JobLogHandler(tornado.web.RequestHandler): def initialize(self, 
k8s_client, mongo_client): @@ -215,7 +226,11 @@ def initialize(self, k8s_client, mongo_client): self.jobs_collection = mongo_client.ktqueue.jobs @convert_asyncio_task - async def get(self, job): + async def get(self, job, version=None): + if version and int(version): + with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.{version}.txt'.format(version=version)), 'r') as f: + self.finish(f.read()) + return pods = await self.k8s_client.call_api( method='GET', api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), diff --git a/ktqueue/utils.py b/ktqueue/utils.py index 0567772..e5af93a 100644 --- a/ktqueue/utils.py +++ b/ktqueue/utils.py @@ -4,6 +4,16 @@ from ktqueue import settings +def get_log_versions(job_name): + log_dir = os.path.join('/cephfs/ktqueue/logs', job_name) + versions = [] + for filename in os.listdir(log_dir): + group = re.match(r'log\.(?P\d+)\.txt', filename) + if group: + versions.append(int(group.group('id'))) + return versions + + async def save_job_log(job_name, pod_name, k8s_client): log_dir = os.path.join('/cephfs/ktqueue/logs', job_name) if not os.path.exists(log_dir): @@ -16,12 +26,10 @@ async def save_job_log(job_name, pod_name, k8s_client): # Rolling log if os.path.exists(log_path): - max_id = 0 - for filename in os.listdir(log_dir): - group = re.match(r'log\.(?P\d+)\.log', filename) - if group: - max_id = max(max_id, int(group.group('id'))) - os.rename(log_path, os.path.join(log_dir, 'log.{}.txt'.format(max_id + 1))) + max_version = 0 + for version in get_log_versions(job_name=job_name): + max_version = max(max_version, int(version)) + os.rename(log_path, os.path.join(log_dir, 'log.{}.txt'.format(max_version + 1))) with open(os.path.join(log_dir, 'log.txt'), 'wb') as f: async for chunk in resp.content.iter_any(): diff --git a/server.py b/server.py index e796805..e04d14f 100644 --- a/server.py +++ b/server.py @@ -15,6 +15,7 @@ from ktqueue.kubernetes_client import kubernetes_client from ktqueue.api import 
JobsHandler from ktqueue.api import JobLogHandler +from ktqueue.api import JobLogVersionHandler from ktqueue.api import ReposHandler from ktqueue.api import NodesHandler from ktqueue.api import StopJobHandler @@ -47,6 +48,8 @@ def get_app(): (r'/nodes', NodesHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), (r'/jobs', JobsHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), (r'/jobs/(?P[\w_-]+)/log', JobLogHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/jobs/(?P[\w_-]+)/log/(?P\d+|current)', JobLogHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + (r'/jobs/(?P[\w_-]+)/log/version', JobLogVersionHandler), (r'/job/stop/(?P[\w_\-\.]+)', StopJobHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), (r'/job/tensorboard/(?P[\w_\-\.]+)', TensorBoardHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), (r'/repos', ReposHandler, {'mongo_client': mongo_client}), From 4bb4d5d8af71179a90de32141bc014acce0f7b05 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 28 Feb 2017 17:50:37 +0800 Subject: [PATCH 012/107] finish log --- frontend/index.html | 17 +++++++++++++---- ktqueue/api/job.py | 2 +- ktqueue/utils.py | 2 +- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/frontend/index.html b/frontend/index.html index caf683b..ecc6e96 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -123,7 +123,8 @@

KTQueue

+ :value="item.labels['kubernetes.io/hostname']" + :key="item.labels['kubernetes.io/hostname']"> @@ -221,14 +222,15 @@

KTQueue

@@ -236,6 +238,7 @@

KTQueue

+ - diff --git a/ktqueue/api/__init__.py b/ktqueue/api/__init__.py index 60d3a00..acddd49 100644 --- a/ktqueue/api/__init__.py +++ b/ktqueue/api/__init__.py @@ -8,3 +8,4 @@ from .node import NodesHandler from .tensorboard_proxy import TensorBoardProxyHandler from .oauth import OAuth2Handler +from .user import CurrentUserHandler diff --git a/ktqueue/api/oauth.py b/ktqueue/api/oauth.py index 5b90c08..79a0f8b 100644 --- a/ktqueue/api/oauth.py +++ b/ktqueue/api/oauth.py @@ -6,7 +6,6 @@ import tornado.auth import tornado.httpclient - import ktqueue.settings @@ -59,6 +58,7 @@ async def get(self): {'$set': data}, upsert=True ) + self.set_secure_cookie('user', resp['login']) self.redirect('/') else: await self.authorize_redirect( diff --git a/ktqueue/api/user.py b/ktqueue/api/user.py new file mode 100644 index 0000000..ddea98b --- /dev/null +++ b/ktqueue/api/user.py @@ -0,0 +1,8 @@ +# encoding: utf-8 +from .utils import BaseHandler + + +class CurrentUserHandler(BaseHandler): + + def get(self): + self.finish({'user': self.get_current_user()}) diff --git a/ktqueue/api/utils.py b/ktqueue/api/utils.py index 9eb53f6..d5b7cb5 100644 --- a/ktqueue/api/utils.py +++ b/ktqueue/api/utils.py @@ -1,6 +1,7 @@ # encoding: utf-8 import asyncio import functools +import tornado.web def convert_asyncio_task(method): @@ -10,3 +11,11 @@ async def wrapper(self, *args, **kwargs): coro = method(self, *args, **kwargs) return await asyncio.get_event_loop().create_task(coro) return wrapper + + +class BaseHandler(tornado.web.RequestHandler): + def get_current_user(self): + user = self.get_secure_cookie("user") + if user: + return user.decode('utf-8') + return None diff --git a/server.py b/server.py index df98f36..330c151 100644 --- a/server.py +++ b/server.py @@ -23,6 +23,7 @@ from ktqueue.api import TensorBoardProxyHandler from ktqueue.api import TensorBoardHandler from ktqueue.api import OAuth2Handler +from ktqueue.api import CurrentUserHandler from ktqueue.event_watcher import watch_pod @@ -70,6 
+71,7 @@ def get_app(): (r'/data/(?P.*)', TensorBoardProxyHandler, {'client': SimpleAsyncHTTPClient(max_clients=64)}), # This is a hack for TensorBoard (r'/oauth2/start', OAuth2Handler, {'mongo_client': mongo_client}), (r'/oauth2/callback', OAuth2Handler, {'mongo_client': mongo_client}), + (r'/current_user', CurrentUserHandler), ], **app_kwargs) return application From 7f827055e0b0c990d33523589aee85da6098f833 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Thu, 2 Mar 2017 19:58:21 +0800 Subject: [PATCH 016/107] support comments --- frontend/index.html | 39 +++++++++++++++++++++++++++++++++---- frontend/static/ktqueue.css | 3 +++ ktqueue/api/job.py | 12 +++++++++++- ktqueue/api/repo.py | 1 + ktqueue/api/user.py | 6 +++++- ktqueue/api/utils.py | 3 +++ 6 files changed, 58 insertions(+), 6 deletions(-) diff --git a/frontend/index.html b/frontend/index.html index 3bf3bbe..ca6e12c 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -21,17 +21,17 @@

KTQueue

Jobs - Repos + Repos {{current_user}} Login
- + diff --git a/frontend/static/ktqueue.css b/frontend/static/ktqueue.css index 27c6b97..3a2adf4 100644 --- a/frontend/static/ktqueue.css +++ b/frontend/static/ktqueue.css @@ -36,6 +36,9 @@ body { .job-expand-item > div { margin-left: 100px; } +.job-expand-item > div > pre{ + margin: 0; +} .status-stop-wrap { display: inline-block; width: 32px; diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index e82f1a2..82727e8 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -9,9 +9,10 @@ from .utils import convert_asyncio_task from ktqueue.utils import save_job_log from ktqueue import settings +from .utils import BaseHandler -class JobsHandler(tornado.web.RequestHandler): +class JobsHandler(BaseHandler): def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client @@ -19,6 +20,7 @@ def initialize(self, k8s_client, mongo_client): self.jobs_collection = mongo_client.ktqueue.jobs @convert_asyncio_task + @tornado.web.authenticated async def post(self): """ Create a new job. 
@@ -32,6 +34,8 @@ async def post(self): "commit_id": "3701b94219fb06974f485cabf99ad88019afe618" } """ + user = self.get_current_user() + body_arguments = json.loads(self.request.body.decode('utf-8')) name = body_arguments.get('name') @@ -49,6 +53,7 @@ async def post(self): repo = body_arguments.get('repo', None) branch = body_arguments.get('branch', None) commit_id = body_arguments.get('commit_id', None) + comments = body_arguments.get('comments', None) command_kube = 'cd $WORK_DIR && ' + command @@ -164,11 +169,13 @@ async def post(self): self.jobs_collection.update_one({'name': name}, {'$set': { 'name': name, 'node': node, + 'user': user, 'command': command, 'gpu_num': gpu_num, 'repo': repo, 'branch': branch, 'commit_id': commit_id, + 'comments': comments, 'image': image, 'status': 'fetching', 'tensorboard': False, @@ -266,6 +273,7 @@ def initialize(self, k8s_client, mongo_client): self.jobs_collection = mongo_client.ktqueue.jobs @convert_asyncio_task + @tornado.web.authenticated async def post(self, job): pods = await self.k8s_client.call_api( method='GET', @@ -295,6 +303,7 @@ def initialize(self, k8s_client, mongo_client): self.jobs_collection = mongo_client.ktqueue.jobs @convert_asyncio_task + @tornado.web.authenticated async def post(self, job): job_image = self.jobs_collection.find_one({'name': job})['image'] body_arguments = json.loads(self.request.body.decode('utf-8')) @@ -361,6 +370,7 @@ async def post(self, job): self.write(ret) @convert_asyncio_task + @tornado.web.authenticated async def delete(self, job): pods = await self.k8s_client.call_api( method='GET', diff --git a/ktqueue/api/repo.py b/ktqueue/api/repo.py index 160582b..a64198e 100644 --- a/ktqueue/api/repo.py +++ b/ktqueue/api/repo.py @@ -15,6 +15,7 @@ def initialize(self, mongo_client): self.mongo_client = mongo_client self.repos_collection = self.mongo_client.ktqueue.credentials + @tornado.web.authenticated async def post(self): """create a credential for repo, support both ssh & https. 
ssh e.x.: diff --git a/ktqueue/api/user.py b/ktqueue/api/user.py index ddea98b..c0d6b96 100644 --- a/ktqueue/api/user.py +++ b/ktqueue/api/user.py @@ -1,8 +1,12 @@ # encoding: utf-8 from .utils import BaseHandler +import ktqueue.settings class CurrentUserHandler(BaseHandler): def get(self): - self.finish({'user': self.get_current_user()}) + self.finish({ + 'user': self.get_current_user(), + 'auth_required': ktqueue.settings.auth_required == '1' + }) diff --git a/ktqueue/api/utils.py b/ktqueue/api/utils.py index d5b7cb5..9ab8593 100644 --- a/ktqueue/api/utils.py +++ b/ktqueue/api/utils.py @@ -2,6 +2,7 @@ import asyncio import functools import tornado.web +import ktqueue.settings def convert_asyncio_task(method): @@ -18,4 +19,6 @@ def get_current_user(self): user = self.get_secure_cookie("user") if user: return user.decode('utf-8') + if not ktqueue.settings.auth_required: + return 'anonymous' return None From 0409f7fe3499b84a9791ae1b66eddf1b20ef09cf Mon Sep 17 00:00:00 2001 From: Comzyh Date: Thu, 2 Mar 2017 21:02:14 +0800 Subject: [PATCH 017/107] deployment yaml --- deploy/ktqueue.yaml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/deploy/ktqueue.yaml b/deploy/ktqueue.yaml index 4e39e91..9ab6078 100644 --- a/deploy/ktqueue.yaml +++ b/deploy/ktqueue.yaml @@ -29,6 +29,19 @@ spec: image: ktqueue imagePullPolicy: IfNotPresent command: [ "python3", "/ktqueue/server.py" ] + env: + - name: KTQ_AUTH_REQUIRED + value: '0' + - name: KTQ_COOKIE_SECRET + value: '' + - name: KTQ_OAUTH2_PROVIDER + value: github + - name: KTQ_OAUTH2_CLIENT_ID + value: '' + - name: KTQ_OAUTH2_CLIENT_SECRET + value: '' + - name: KTQ_OAUTH2_CALLBACK + value: '' volumeMounts: - name: cephfs mountPath: /cephfs From f64e8dde7f04e7205987483aa3a896ac2f5dc71a Mon Sep 17 00:00:00 2001 From: Comzyh Date: Fri, 3 Mar 2017 17:09:57 +0800 Subject: [PATCH 018/107] support hide & favorite --- frontend/index.html | 64 +++++++++++++++++++++++++++++++++++-- frontend/static/ktqueue.css | 8 +++++ 
ktqueue/api/job.py | 42 +++++++++++++++++++++--- ktqueue/api/repo.py | 4 ++- ktqueue/settings.py | 2 +- server.py | 5 +-- 6 files changed, 115 insertions(+), 10 deletions(-) diff --git a/frontend/index.html b/frontend/index.html index ca6e12c..9632acd 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -32,6 +32,11 @@

KTQueue

Create Job + + + + + KTQueue
{{ scope.row.commit_id }}
{{ scope.row.command }}
{{ scope.row.comments }}
- +
+
+ + +
+

TensorBoard

@@ -65,6 +80,15 @@

KTQueue

Clone + + + KTQueue }, data: function(){ return { + jobsFilter: 'All', jobsData: { count: 0, page_size: 20, @@ -364,7 +389,14 @@

KTQueue

}, loadJobs: function(page) { this.jobsData.loading=true; - this.$http.get('/jobs?page=' + page).then(function(resource){ + var params = {}; + params['page'] = page; + if (this.jobsFilter === 'Hidden') { + params['hide'] = '1' + } else if (this.jobsFilter === 'Fav') { + params['fav'] = '1' + } + this.$http.get('/jobs', {params: params}).then(function(resource){ for (var i = 0; i < resource.body.data.length; i++){ var line = resource.body.data[i]; if (line['creationTimestamp']) { @@ -411,6 +443,34 @@

KTQueue

if (index !== -1) { this.createJobDialog.data.volumeMounts.splice(index, 1) } + }, + jobHideChange: function(line, event) { + this.$http.put('/jobs', { + '_id': line._id, + 'hide': event + }).then(function(){ + line.hide = event; + }).catch(function(){ + line.hide = !event; + }) + return !event; + }, + toggleFavorite: function(line, event) { + var icon = event.target; + if (icon.tagName !== 'I') { + icon=icon.querySelector("i") + } + icon.className='el-icon-loading'; + this.$http.put('/jobs', { + '_id': line._id, + 'fav': !line.fav + }).then(function(){ + line.fav = !line.fav; + }).catch(function(){ + }) + }, + jobsFilterChange: function() { + this.loadJobs(1); } } }); diff --git a/frontend/static/ktqueue.css b/frontend/static/ktqueue.css index 3a2adf4..441e995 100644 --- a/frontend/static/ktqueue.css +++ b/frontend/static/ktqueue.css @@ -16,6 +16,11 @@ body { .table-header .el-pagination { display: inline-block; } +.table-header .el-pagination, +.table-header .el-button, +.table-header .el-radio-group { + vertical-align: middle;; +} .inline-form-span { width: 1em; height: 1em; @@ -33,6 +38,9 @@ body { padding-right: 1em; box-sizing: border-box; } +.job-expand-item .el-tag+.el-tag { + margin-left: 1em; +} .job-expand-item > div { margin-left: 100px; } diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 82727e8..34742ba 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -2,6 +2,7 @@ import json import os import logging +import bson import tornado.web @@ -208,8 +209,29 @@ async def post(self): async def get(self): page = int(self.get_argument('page', 1)) page_size = int(self.get_argument('page_size', 20)) - count = self.jobs_collection.count() - jobs = list(self.jobs_collection.find().sort("_id", -1).skip(page_size * (page - 1)).limit(page_size)) + hide = self.get_argument('hide', None) + fav = self.get_argument('fav', None) + tags = self.get_arguments('tag') + + query = {} + + # hide + if hide is None: # default is False + query['hide'] = False + 
elif hide != 'all': # 'all' means no filter + query['hide'] = False if hide == '0' else True + + # tags + if tags: + query['tags': {'$all': tags}] + + # fav + if fav: + query['fav'] = True if fav == '1' else False + + print(query) + count = self.jobs_collection.count(query) + jobs = list(self.jobs_collection.find(query).sort("_id", -1).skip(page_size * (page - 1)).limit(page_size)) for job in jobs: job['_id'] = str(job['_id']) self.finish(json.dumps({ @@ -219,6 +241,18 @@ async def get(self): 'data': jobs, })) + @tornado.web.authenticated + async def put(self): + """modify job. + only part of fields can be modified. + """ + body_arguments = json.loads(self.request.body.decode('utf-8')) + update_data = {k: v for k, v in body_arguments.items() if k in ('hide', 'comments', 'tags', 'fav')} + self.jobs_collection.update_one({'_id': bson.ObjectId(body_arguments['_id'])}, {'$set': update_data}) + ret = self.jobs_collection.find_one({'_id': bson.ObjectId(body_arguments['_id'])}) + ret['_id'] = str(ret['_id']) + self.finish(ret) + class JobLogVersionHandler(tornado.web.RequestHandler): @@ -232,7 +266,7 @@ async def get(self, job): }) -class JobLogHandler(tornado.web.RequestHandler): +class JobLogHandler(BaseHandler): def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client @@ -265,7 +299,7 @@ async def get(self, job, version=None): self.finish(f.read()) -class StopJobHandler(tornado.web.RequestHandler): +class StopJobHandler(BaseHandler): def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client diff --git a/ktqueue/api/repo.py b/ktqueue/api/repo.py index a64198e..a14b0af 100644 --- a/ktqueue/api/repo.py +++ b/ktqueue/api/repo.py @@ -5,8 +5,10 @@ import tornado.web +from .utils import BaseHandler -class ReposHandler(tornado.web.RequestHandler): + +class ReposHandler(BaseHandler): __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/]*.git') __ssh_pattern = re.compile(r'\w+@[\w.]+:\w+\/\w+\.git') diff --git a/ktqueue/settings.py 
b/ktqueue/settings.py index 0d49cdb..244eb70 100644 --- a/ktqueue/settings.py +++ b/ktqueue/settings.py @@ -1,7 +1,7 @@ import os job_namespace = os.environ.get('KTQ_JOB_NAMESPACE', 'ktqueue') -auth_required = os.environ.get('KTQ_AUTH_REQUIRED', False) +auth_required = True if os.environ.get('KTQ_AUTH_REQUIRED', '0') == '1' else False cookie_secret = os.environ.get('KTQ_COOKIE_SECRET', '') oauth2_provider = os.environ.get('KTQ_OAUTH2_PROVIDER', 'github') oauth2_clinet_id = os.environ.get('KTQ_OAUTH2_CLIENT_ID', '') diff --git a/server.py b/server.py index 330c151..ee72100 100644 --- a/server.py +++ b/server.py @@ -40,6 +40,8 @@ def create_db_index(): client.ktqueue.jobs.create_index([("hide", pymongo.ASCENDING)]) client.ktqueue.credentials.create_index([("repo", pymongo.ASCENDING)], unique=True) client.ktqueue.oauth.create_index([("provider", pymongo.ASCENDING), ("id", pymongo.ASCENDING)], unique=True) + client.ktqueue.jobs.update({'hide': {'$exists': False}}, {'$set': {'hide': False}}) + client.ktqueue.jobs.update({'fav': {'$exists': False}}, {'$set': {'fav': False}}) def get_app(): @@ -51,8 +53,7 @@ def get_app(): # debug app_kwargs['debug'] = os.environ.get('KTQUEUE_DEBUG', '0') == '1' # cookie_secret - if ktqueue.settings.auth_required: - app_kwargs['cookie_secret'] = ktqueue.settings.cookie_secret + app_kwargs['cookie_secret'] = ktqueue.settings.cookie_secret application = tornado.web.Application([ (r'/()', tornado.web.StaticFileHandler, { 'path': __frontend_path, From cd00b4ddc38ddbae834df8151c24058813083a5c Mon Sep 17 00:00:00 2001 From: Comzyh Date: Fri, 3 Mar 2017 17:24:16 +0800 Subject: [PATCH 019/107] use update_many rather than update --- server.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server.py b/server.py index ee72100..58b031b 100644 --- a/server.py +++ b/server.py @@ -40,8 +40,8 @@ def create_db_index(): client.ktqueue.jobs.create_index([("hide", pymongo.ASCENDING)]) client.ktqueue.credentials.create_index([("repo", 
pymongo.ASCENDING)], unique=True) client.ktqueue.oauth.create_index([("provider", pymongo.ASCENDING), ("id", pymongo.ASCENDING)], unique=True) - client.ktqueue.jobs.update({'hide': {'$exists': False}}, {'$set': {'hide': False}}) - client.ktqueue.jobs.update({'fav': {'$exists': False}}, {'$set': {'fav': False}}) + client.ktqueue.jobs.update_many({'hide': {'$exists': False}}, {'$set': {'hide': False}}) + client.ktqueue.jobs.update_many({'fav': {'$exists': False}}, {'$set': {'fav': False}}) def get_app(): From 0cce9aa7ba84fb38ee0dc01896f50f9cd4f927d6 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Fri, 3 Mar 2017 17:35:21 +0800 Subject: [PATCH 020/107] fix permission --- ktqueue/api/job.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 34742ba..574eaff 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -329,7 +329,7 @@ async def post(self, job): self.finish({'message': 'Job {} successful deleted.'.format(job)}) -class TensorBoardHandler(tornado.web.RequestHandler): +class TensorBoardHandler(BaseHandler): def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client From 87bdd83c4641342a83e6d9dc530bb6c2a8471437 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Sat, 4 Mar 2017 10:46:18 +0800 Subject: [PATCH 021/107] log wrap-line & job-name validation --- frontend/index.html | 9 ++++++++- frontend/static/ktqueue.css | 3 +++ ktqueue/api/job.py | 11 +++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/frontend/index.html b/frontend/index.html index 9632acd..9c1408f 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -272,7 +272,13 @@

KTQueue

:key="version"> -
{{log_text}}
+ + +
{{log_text}}
@@ -526,6 +532,7 @@

KTQueue

log_text: '', selected_version: 'current', versions: [], + line_wrap: false } }, mounted:function() { diff --git a/frontend/static/ktqueue.css b/frontend/static/ktqueue.css index 441e995..ada1c76 100644 --- a/frontend/static/ktqueue.css +++ b/frontend/static/ktqueue.css @@ -51,3 +51,6 @@ body { display: inline-block; width: 32px; } +.ktq-log-wrap { + white-space: pre-wrap; +} diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 574eaff..e413d48 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -1,6 +1,7 @@ # encoding: utf-8 import json import os +import re import logging import bson @@ -15,6 +16,8 @@ class JobsHandler(BaseHandler): + __job_name_pattern = re.compile(r'^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$') + def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client self.mongo_client = mongo_client @@ -41,6 +44,11 @@ async def post(self): name = body_arguments.get('name') + if not self.__job_name_pattern.match(name): + self.set_status(400) + self.finish({"message": "illegal task name, regex used for validation is [a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"}) + return + # job with same name is forbidden if self.jobs_collection.find_one({'name': name}): self.set_status(400) @@ -180,6 +188,7 @@ async def post(self): 'image': image, 'status': 'fetching', 'tensorboard': False, + 'hide': False, 'volumeMounts': body_arguments.get('volumeMounts', []), }}, upsert=True) self.finish(json.dumps({'message': 'job {} successful created.'.format(name)})) @@ -191,6 +200,8 @@ async def post(self): await cloner.clone_and_copy() if not commit_id: self.jobs_collection.update_one({'name': name}, {'$set': {'commit_id': cloner.commit_id}}) + else: + os.makedirs(os.path.join('/cephfs/ktqueue/jobs', name, 'code')) ret = await self.k8s_client.call_api( api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), From 57650bc1e32e7782cf2faf75720b1b1c19334279 Mon Sep 17 00:00:00 
2001 From: Comzyh Date: Sun, 19 Mar 2017 10:55:50 +0800 Subject: [PATCH 022/107] update document --- deploy/NODE.md | 38 ++++++++++++++++++++++++++++++++++++++ deploy/README.md | 13 ++++++++----- 2 files changed, 46 insertions(+), 5 deletions(-) create mode 100644 deploy/NODE.md diff --git a/deploy/NODE.md b/deploy/NODE.md new file mode 100644 index 0000000..f767342 --- /dev/null +++ b/deploy/NODE.md @@ -0,0 +1,38 @@ +# How to setup a KTQueue node + +## Brief + +1. join the kubernetes cluster, and ensure `kubelet`'s GPU support is enabled. +2. mount the cephfs on `/mnt/cephfs` +3. install `nvidia-docker` and run it once + +## GPU support of kubelet + +Kubernetes (> v1.6.0-beta.1) is supporting multi-GPU now, please follow the newest guide to enable GPU support. + +## Nvidia-Docker + +[nvidia-docker](https://github.com/NVIDIA/nvidia-docker) homepage + +CUDA in container need NVIDIA drivers such as `libcuda.so.1` to work. And KTQueue assume that drivers located at `/var/lib/nvidia-docker/volumes/nvidia_driver/`. So you need to install nvidia-docker and use nvidia-docker to run a cuda image to ensure that the drivers are located in the right place. + +1. follow the installation instruction to install nvidia-docker +2. run the test nvidia-smi with nvidia-docker +3. make sure that there is a driver like `367.57` are located at `/var/lib/nvidia-docker/volumes/nvidia_driver/` + +## Trouble shoot + +- nvidia-smi print the right output, but there are no drivers found at `/var/lib/nvidia-docker/volumes/nvidia_driver/` + +try to update the nvidia-docker(>1.0.0), and check docker volumes + +> docker volume ls + +you may get a line like + +``` +DRIVER VOLUME NAME +nvidia-docker nvidia_driver_367.57 +``` + +try to remove that volume and run `nvidia-docker run --rm nvidia/cuda nvidia-smi` again. diff --git a/deploy/README.md b/deploy/README.md index c88e080..ec40d2d 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -8,6 +8,9 @@ 4. build ktqueue docker image 5. 
deploy ktqueue +To setup a KTQueue node, please read [How to setup a KTQueue node](./NODE.md) + + # File list - ktqueue.yaml @@ -15,11 +18,11 @@ - mongodb-dev.yaml - mongodb-production.yaml -# dependancy +# Dependancy ktqueue requires kubernetes and cephfs, make sure you already have them. -# prepare ceph +# Prepare ceph kubernetes and other components needs permission (aka, a secret) to access ceph. @@ -41,7 +44,7 @@ and import `ceph-secret.yaml` > kubectl create -f dep-ceph-secret.yaml -# deploy mongodb +# Deploy mongodb ## create rbd @@ -59,7 +62,7 @@ current linux kernal doesn't support all the features. if you get error, refer [ > rbd feature disable ktqueue-mongodb exclusive-lock, object-map, fast-diff, deep-flatten -## create mongodb services +## Create mongodb services > cp mongodb-production.yaml dep-mongodb-production.yaml @@ -71,7 +74,7 @@ create mongodb server > kubectl create -f dep-mongodb-production.yaml -# deploy ktqueue +# Deploy ktqueue ## mount cephfs ktqueue dameon needs to access ceph to clone code, store log, etc. and ktqueue job needs to access ceph to store output. From 39cb25ef47b182f886b6b57c713394618abf1e9f Mon Sep 17 00:00:00 2001 From: Comzyh Date: Sun, 19 Mar 2017 15:19:05 +0800 Subject: [PATCH 023/107] add status filter, to support Running etc. --- frontend/index.html | 4 ++++ ktqueue/api/job.py | 5 +++++ ktqueue/event_watcher.py | 2 ++ server.py | 1 + 4 files changed, 12 insertions(+) diff --git a/frontend/index.html b/frontend/index.html index 9c1408f..943b9a4 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -36,6 +36,7 @@

KTQueue

+ KTQueue params['hide'] = '1' } else if (this.jobsFilter === 'Fav') { params['fav'] = '1' + } else if (this.jobsFilter === 'Running') { + params['status'] = 'Running' } + this.$http.get('/jobs', {params: params}).then(function(resource){ for (var i = 0; i < resource.body.data.length; i++){ var line = resource.body.data[i]; diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index e413d48..b0dbd73 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -222,6 +222,7 @@ async def get(self): page_size = int(self.get_argument('page_size', 20)) hide = self.get_argument('hide', None) fav = self.get_argument('fav', None) + status = self.get_argument('status', None) tags = self.get_arguments('tag') query = {} @@ -240,6 +241,10 @@ async def get(self): if fav: query['fav'] = True if fav == '1' else False + # status; Running etc. + if status: + query['status'] = status + print(query) count = self.jobs_collection.count(query) jobs = list(self.jobs_collection.find(query).sort("_id", -1).skip(page_size * (page - 1)).limit(page_size)) diff --git a/ktqueue/event_watcher.py b/ktqueue/event_watcher.py index 1bf6f86..79fc515 100644 --- a/ktqueue/event_watcher.py +++ b/ktqueue/event_watcher.py @@ -83,6 +83,8 @@ async def callback(event): # update status if status == ('terminated', 'Completed'): job_update['status'] = 'Completed' + elif status == ('running', None): + job_update['status'] = 'Running' else: job_update['status'] = status_str diff --git a/server.py b/server.py index 58b031b..5fa6875 100644 --- a/server.py +++ b/server.py @@ -38,6 +38,7 @@ def create_db_index(): client = pymongo.MongoClient('ktqueue-mongodb') client.ktqueue.jobs.create_index([("name", pymongo.ASCENDING)], unique=True) client.ktqueue.jobs.create_index([("hide", pymongo.ASCENDING)]) + client.ktqueue.jobs.create_index([("status", pymongo.ASCENDING)]) client.ktqueue.credentials.create_index([("repo", pymongo.ASCENDING)], unique=True) client.ktqueue.oauth.create_index([("provider", pymongo.ASCENDING), 
("id", pymongo.ASCENDING)], unique=True) client.ktqueue.jobs.update_many({'hide': {'$exists': False}}, {'$set': {'hide': False}}) From 7511af2250999309781aaf220130a3b649f018e9 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 20 Mar 2017 10:44:41 +0800 Subject: [PATCH 024/107] Collapse some line --- deploy/NODE.md | 4 ++++ frontend/index.html | 22 ++++++---------------- frontend/static/ktqueue.css | 3 +++ ktqueue/api/node.py | 3 ++- ktqueue/event_watcher.py | 6 ++++++ 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/deploy/NODE.md b/deploy/NODE.md index f767342..b08c4c7 100644 --- a/deploy/NODE.md +++ b/deploy/NODE.md @@ -36,3 +36,7 @@ nvidia-docker nvidia_driver_367.57 ``` try to remove that volume and run `nvidia-docker run --rm nvidia/cuda nvidia-smi` again. + +- docker: Error response from daemon: create nvidia_driver_367.57: VolumeDriver.Create: internal error, check logs for details. + +> sudo chown nvidia-docker:nvidia-docker /var/lib/nvidia-docker/volumes/nvidia_driver diff --git a/frontend/index.html b/frontend/index.html index 943b9a4..e3c566e 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -58,6 +58,11 @@

KTQueue

{{ scope.row.commit_id }}
{{ scope.row.command }}
{{ scope.row.comments }}
+
Clone
+
+ Stop + Start +
KTQueue

- - - diff --git a/frontend/src/Jobs.vue b/frontend/src/Jobs.vue index 541a221..a2ffa70 100644 --- a/frontend/src/Jobs.vue +++ b/frontend/src/Jobs.vue @@ -32,8 +32,8 @@
{{ scope.row.commit_id }}
{{ scope.row.command }}
{{ scope.row.comments }}
-
Edit
-
Clone
+
Edit
+
Clone
Restart @@ -118,77 +118,24 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 删除 - - - Add volume - - - - 取 消 - 确 定 - - +
+ diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index bc8b399..60c3e25 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -9,9 +9,10 @@ from ktqueue.cloner import Cloner from .utils import convert_asyncio_task +from .utils import BaseHandler from ktqueue.utils import k8s_delete_job +from ktqueue.utils import KTQueueDefaultCredentialProvider from ktqueue import settings -from .utils import BaseHandler def generate_job(name, command, node, gpu_num, image, repo, branch, commit_id, comments, mounts, load_nvidia_driver=None): @@ -131,12 +132,12 @@ def generate_job(name, command, node, gpu_num, image, repo, branch, commit_id, c return job -async def clone_code(name, repo, branch, commit_id, jobs_collection, job_dir): +async def clone_code(name, repo, branch, commit_id, jobs_collection, job_dir, crediential): # clone code if repo: try: cloner = Cloner(repo=repo, dst_directory=os.path.join(job_dir, 'code'), - branch=branch, commit_id=commit_id) + branch=branch, commit_id=commit_id, crediential=crediential) await cloner.clone_and_copy() except Exception as e: jobs_collection.update_one({'name': name}, {'$set': {'status': 'FetchError'}}) @@ -232,7 +233,8 @@ async def post(self): # clone code await clone_code( name=name, repo=repo, branch=branch, commit_id=commit_id, - jobs_collection=self.jobs_collection, job_dir=job_dir) + jobs_collection=self.jobs_collection, job_dir=job_dir, + crediential=KTQueueDefaultCredentialProvider(repo=repo, user=user, mongo_client=self.mongo_client)) ret = await self.k8s_client.call_api( api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), @@ -402,7 +404,10 @@ async def post(self, job): if job['status'] == 'FetchError': await clone_code( name=job['name'], repo=job['repo'], branch=job['branch'], commit_id=job['commit_id'], - jobs_collection=self.jobs_collection, job_dir=job_dir) + jobs_collection=self.jobs_collection, job_dir=job_dir, + crediential=KTQueueDefaultCredentialProvider( + 
repo=job['repo'], user=self.get_current_user(), mongo_client=self.mongo_client + )) await self.k8s_client.call_api( api='/apis/batch/v1/namespaces/{namespace}/jobs'.format(namespace=settings.job_namespace), diff --git a/ktqueue/api/utils.py b/ktqueue/api/utils.py index 9ab8593..0abba25 100644 --- a/ktqueue/api/utils.py +++ b/ktqueue/api/utils.py @@ -2,6 +2,7 @@ import asyncio import functools import tornado.web + import ktqueue.settings diff --git a/ktqueue/cloner.py b/ktqueue/cloner.py index a2b3a1f..2b7874e 100644 --- a/ktqueue/cloner.py +++ b/ktqueue/cloner.py @@ -7,35 +7,60 @@ import logging import urllib.parse -import pymongo + +class GitCredentialProvider: + __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/\-+]*.git') + __ssh_pattern = re.compile(r'\w+@[\w.]+:[\w-]+\/[\w\-+]+\.git') + + @classmethod + def get_repo_type(cls, repo): + if cls.__ssh_pattern.match(repo): + return 'ssh' + elif cls.__https_pattern.match(repo): + return'https' + return None + + def __init__(self, ssh_key=None, https_username=None, https_password=None): + pass + + @property + def ssh_key(self): + raise NotImplementedError + + @property + def https_username(self): + raise NotImplementedError + + @property + def https_password(self): + raise NotImplementedError class Cloner: """Do the git clone stuff in another thread""" - __https_pattern = re.compile(r'https:\/\/(\w+@\w+)?[\w.\/\-+]*.git') - __ssh_pattern = re.compile(r'\w+@[\w.]+:[\w-]+\/[\w\-+]+\.git') __ref_pattern = re.compile(r'(?P\w+)\s(?P[\w/\-]+)') - def __init__(self, repo, dst_directory, branch=None, commit_id=None): + def __init__(self, repo, dst_directory, branch=None, commit_id=None, + crediential=None): + """Init + crediential: a credientialProvider instance + """ self.repo = repo.strip() self.dst_directory = dst_directory self.branch = branch or 'master' self.commit_id = commit_id + self.crediential = crediential - self.mongo_client = pymongo.MongoClient('ktqueue-mongodb') self.ssh_key_path = None self.repo_path 
= None self.repo_url = None + self.repo_type = GitCredentialProvider.get_repo_type(self.repo) - if self.__ssh_pattern.match(repo): - self.repo_type = 'ssh' - elif self.__https_pattern.match(repo): - self.repo_type = 'https' - else: + if not self.repo_type: raise Exception('wrong repo type') - if self.repo_type == 'ssh' and self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) is None: + if self.repo_type == 'ssh' and self.crediential.ssh_key is None: raise Exception('ssh credential for {repo} must be provided.'.format(repo=repo)) self.repo_hash = hashlib.sha1(self.repo.encode('utf-8')).hexdigest() @@ -47,8 +72,7 @@ async def prepare_ssh_key(self, repo): self.ssh_key_path = os.path.join(ssh_key_dir, 'id') if not os.path.exists(self.ssh_key_path): with open(self.ssh_key_path, 'w') as f: - credential = self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) - f.write(credential['ssh_key']) + f.write(self.crediential.ssh_key) os.chmod(self.ssh_key_path, 0o600) # prevent WARNING: UNPROTECTED PRIVATE KEY FILE! 
await asyncio.sleep(1.0) # os.chmod may have strange behavior, that ssh still permission 644 after too short time @@ -57,31 +81,37 @@ async def git_with_ssh_key(cls, ssh_key_path, args, cwd=None): env = {**os.environ, 'GIT_SSH_COMMAND': 'ssh -oStrictHostKeyChecking=no -i {ssh_key_path}'.format(ssh_key_path=ssh_key_path) } - proc = await asyncio.create_subprocess_exec(*['git'] + args, stdout=asyncio.subprocess.PIPE, env=env, cwd=cwd) + proc = await asyncio.create_subprocess_exec( + *['git'] + args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, env=env, cwd=cwd) lines = [] async for line in proc.stdout: logging.debug(line) - lines.append(line) + lines.append(line.decode()) recode = await proc.wait() return recode, lines @classmethod def add_credential_to_https_url(cls, url, username, password): parsed = urllib.parse.urlparse(url) - if username is not None and password is not None: + if username is not None: host_and_port = parsed.hostname if parsed.port: host_and_port += ':' + parsed.port - parsed._replace(netloc='{}:{}@{}'.format(username, password, host_and_port)) + if password: + parsed = parsed._replace(netloc='{}:{}@{}'.format(username, password, host_and_port)) + else: + parsed = parsed._replace(netloc='{}@{}'.format(username, host_and_port)) return parsed.geturl() @classmethod async def git_with_https(cls, args, cwd=None): - proc = await asyncio.create_subprocess_exec(*['git'] + args, stdout=asyncio.subprocess.PIPE, cwd=cwd) + proc = await asyncio.create_subprocess_exec( + *['git'] + args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=cwd) lines = [] async for line in proc.stdout: logging.debug(line) - lines.append(line) + lines.append(line.decode()) + recode = await proc.wait() return recode, lines @@ -109,6 +139,7 @@ async def clone(self): args=['clone', self.repo_url, '--recursive', self.repo_hash], ) if retcode != 0: + logging.error('\n'.join(retlines)) raise Exception('clone repo failed with retcode 
{}.'.format(retcode)) async def fetch(self): @@ -121,9 +152,10 @@ async def fetch(self): else: retcode, retlines = await self.git_with_https( cwd=self.repo_path, - args=['fetch'], + args=['fetch', self.repo_url, '+refs/heads/*:refs/remotes/origin/*'], ) if retcode != 0: + logging.error('\n'.join(retlines)) logging.error('fetch repo failed with retcode {}.'.format(retcode)) async def clone_and_copy(self, archive_file=None, keep_archive=False): @@ -134,11 +166,10 @@ async def clone_and_copy(self, archive_file=None, keep_archive=False): if self.repo_type == 'ssh': await self.prepare_ssh_key(self.repo) elif self.repo_type == 'https': - credential = self.mongo_client.ktqueue.credentials.find_one({'repo': self.repo}) self.repo_url = self.repo - if credential: + if self.crediential.https_username: self.repo_url = self.add_credential_to_https_url( - self.repo, username=credential['username'], password=credential['password']) + self.repo, username=self.crediential.https_username, password=self.crediential.https_password) if not os.path.exists(self.repo_path): # Then clone it await self.clone() diff --git a/ktqueue/utils.py b/ktqueue/utils.py index 4a41b20..f9bb09d 100644 --- a/ktqueue/utils.py +++ b/ktqueue/utils.py @@ -2,7 +2,9 @@ import os import re import logging + from ktqueue import settings +from .cloner import GitCredentialProvider def get_log_versions(job_name): @@ -64,3 +66,64 @@ async def k8s_delete_job(k8s_client, job, pod_name=None, save_log=True): method='DELETE', api='/api/v1/namespaces/{namespace}/pods/{name}'.format(namespace=settings.job_namespace, name=pod_name) ) + + +class KTQueueDefaultCredentialProvider(GitCredentialProvider): + """Give the authorization method for a (user, repo) combination + + default is use Github Oauth2, + if repo authorization type is provided, use the spcific metho + """ + allowed_method = ['none', 'github_oauth', 'ssh_key', 'https_password'] + + def __init__(self, repo, user, mongo_client): + self.repo = repo + self.user = user 
+ self.mongo_client = mongo_client + + self.repos_collection = self.mongo_client.ktqueue.repos + self.oauth_collection = self.mongo_client.ktqueue.oauth + + self._auth_type = 'none' + if settings.auth_required: + self._auth_type = 'github_oauth' + self._ssh_key = None + self._https_username = None + self._https_password = None + self.repo_type = None + + def prepare_credential(self): + self.repo_type = self.get_repo_type(self.repo) + repo = self.repos_collection.find_one({'repo': self.repo}) + if repo: + self._auth_type = repo['authType'] + + if self._auth_type == 'none': # username = password = None + pass + elif self._auth_type == 'github_oauth': + crediential = self.oauth_collection.find_one({'provider': 'github', 'id': self.user}) + if crediential: + self._https_username = crediential['access_token'] + elif self._auth_type == 'ssh_key': + self._ssh_key = repo['crediential']['sshKey'] + elif self._auth_type == 'https_password': + self._https_username = repo['crediential']['username'] + self._https_password = repo['crediential']['password'] + + @property + def ssh_key(self): + if not self._ssh_key: + self.prepare_credential() + return self._ssh_key + + @property + def https_username(self): + if not self._https_username: + self.prepare_credential() + return self._https_username + + @property + def https_password(self): + if not self._https_password: + self.prepare_credential() + return self._https_password From 9e039f38ecaa3d54953f86d80444e106a4d404cf Mon Sep 17 00:00:00 2001 From: Comzyh Date: Mon, 8 May 2017 18:31:45 +0800 Subject: [PATCH 063/107] log long polling --- ktqueue/api/job.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 60c3e25..98066b4 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -340,6 +340,7 @@ def initialize(self, k8s_client, mongo_client): self.k8s_client = k8s_client self.mongo_client = mongo_client self.jobs_collection = 
mongo_client.ktqueue.jobs + self.closed = False @convert_asyncio_task async def get(self, job, version=None): @@ -352,17 +353,32 @@ async def get(self, job, version=None): api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), params={'labelSelector': 'job-name={job}'.format(job=job)} ) + if len(pods['items']): + params = {} + follow = self.get_argument('follow', None) == 'true' + if follow: + params['follow'] = 'true' + params['tailLines'] = self.get_argument('tailLines', '10') pod_name = pods['items'][0]['metadata']['name'] resp = await self.k8s_client.call_api_raw( method='GET', - api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name) + api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name), + params=params, ) if resp.status == 200: - async for chunk in resp.content.iter_any(): - self.write(chunk) - resp.close() - return + try: + async for chunk in resp.content.iter_any(): + if self.closed: + break + self.write(chunk) + if follow: + self.flush() + finally: + resp.close() + + def on_connection_close(self): + self.closed = True class StopJobHandler(BaseHandler): From 02604481d878c047fbc99334f00017ffcbc147a2 Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 9 May 2017 15:00:45 +0800 Subject: [PATCH 064/107] proper connection timeout --- ktqueue/api/job.py | 4 +++- ktqueue/event_watcher.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index 98066b4..ca8cef1 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -357,14 +357,16 @@ async def get(self, job, version=None): if len(pods['items']): params = {} follow = self.get_argument('follow', None) == 'true' + timeout = 60 if follow: params['follow'] = 'true' params['tailLines'] = self.get_argument('tailLines', '10') + timeout = 0 # disable timeout checks pod_name = pods['items'][0]['metadata']['name'] 
resp = await self.k8s_client.call_api_raw( method='GET', api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name), - params=params, + params=params, timeout=timeout ) if resp.status == 200: try: diff --git a/ktqueue/event_watcher.py b/ktqueue/event_watcher.py index 4156ae4..ab65cb1 100644 --- a/ktqueue/event_watcher.py +++ b/ktqueue/event_watcher.py @@ -138,5 +138,6 @@ async def callback(event): api='/api/v1/watch/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), method='GET', callback=callback, - params={'labelSelector': 'ktqueue-watching!=false'} + params={'labelSelector': 'ktqueue-watching!=false'}, + timeout=0, ) From ffa233335fa8cee31d00b7193828f218efb3ddef Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 9 May 2017 17:09:20 +0800 Subject: [PATCH 065/107] websocket API --- ktqueue/api/__init__.py | 1 + ktqueue/api/job.py | 80 ++++++++++++++++++++++++++++++----------- server.py | 3 ++ 3 files changed, 64 insertions(+), 20 deletions(-) diff --git a/ktqueue/api/__init__.py b/ktqueue/api/__init__.py index 34a8792..a8fa7c7 100644 --- a/ktqueue/api/__init__.py +++ b/ktqueue/api/__init__.py @@ -1,6 +1,7 @@ # encoding: utf-8 from .job import JobsHandler from .job import JobLogHandler +from .job import JobLogWSHandler from .job import StopJobHandler from .job import RestartJobHandler from .job import TensorBoardHandler diff --git a/ktqueue/api/job.py b/ktqueue/api/job.py index ca8cef1..f8c89e1 100644 --- a/ktqueue/api/job.py +++ b/ktqueue/api/job.py @@ -6,6 +6,7 @@ import bson import tornado.web +import tornado.websocket from ktqueue.cloner import Cloner from .utils import convert_asyncio_task @@ -341,26 +342,22 @@ def initialize(self, k8s_client, mongo_client): self.mongo_client = mongo_client self.jobs_collection = mongo_client.ktqueue.jobs self.closed = False + self.follow = False - @convert_asyncio_task - async def get(self, job, version=None): - if version and version != 'current': - 
with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.{version}.txt'.format(version=version)), 'r') as f: - self.finish(f.read()) - return + async def get_log_stream(self, job, version): pods = await self.k8s_client.call_api( method='GET', api='/api/v1/namespaces/{namespace}/pods'.format(namespace=settings.job_namespace), params={'labelSelector': 'job-name={job}'.format(job=job)} ) - if len(pods['items']): params = {} - follow = self.get_argument('follow', None) == 'true' timeout = 60 - if follow: + if self.follow: params['follow'] = 'true' - params['tailLines'] = self.get_argument('tailLines', '10') + tailLines = self.get_argument('tailLines', None) + if tailLines: + params['tailLines'] = tailLines timeout = 0 # disable timeout checks pod_name = pods['items'][0]['metadata']['name'] resp = await self.k8s_client.call_api_raw( @@ -368,21 +365,64 @@ async def get(self, job, version=None): api='/api/v1/namespaces/{namespace}/pods/{pod_name}/log'.format(namespace=settings.job_namespace, pod_name=pod_name), params=params, timeout=timeout ) - if resp.status == 200: - try: - async for chunk in resp.content.iter_any(): - if self.closed: - break - self.write(chunk) - if follow: - self.flush() - finally: - resp.close() + return resp + return None + + @convert_asyncio_task + async def get(self, job, version=None): + if version and version != 'current': + with open(os.path.join('/cephfs/ktqueue/logs', job, 'log.{version}.txt'.format(version=version)), 'r') as f: + self.finish(f.read()) + return + self.follow = self.get_argument('follow', None) == 'true' + resp = await self.get_log_stream(job, version) + if resp and resp.status == 200: + try: + async for chunk in resp.content.iter_any(): + if self.closed: + break + self.write(chunk) + if self.follow: + self.flush() + finally: + resp.close() def on_connection_close(self): + print('on_connection_close') self.closed = True +class JobLogWSHandler(tornado.websocket.WebSocketHandler, JobLogHandler): + + def initialize(self, *args, 
**kwargs): + JobLogHandler.initialize(self, *args, **kwargs) + print('initialize') + + def check_origin(self, origin): + return True + + @convert_asyncio_task + async def open(self, job): + self.follow = True + resp = await self.get_log_stream(job, 'current') + if resp and resp.status == 200: + try: + async for chunk in resp.content.iter_any(): + if self.closed: + break + self.write_message(chunk) + except Exception as e: + raise + finally: + resp.close() + + def on_close(self): + self.closed = True + + def on_message(self): + pass + + class StopJobHandler(BaseHandler): def initialize(self, k8s_client, mongo_client): diff --git a/server.py b/server.py index bb87ac8..664e799 100644 --- a/server.py +++ b/server.py @@ -16,6 +16,7 @@ from ktqueue.kubernetes_client import kubernetes_client from ktqueue.api import JobsHandler from ktqueue.api import JobLogHandler +from ktqueue.api import JobLogWSHandler from ktqueue.api import JobLogVersionHandler from ktqueue.api import ReposHandler from ktqueue.api import RepoHandler @@ -79,6 +80,8 @@ def get_app(): (r'/api/repos', ReposHandler, {'mongo_client': mongo_client}), (r'/api/repos/(?P[0-9a-f]+)', RepoHandler, {'mongo_client': mongo_client}), (r'/api/current_user', CurrentUserHandler), + (r'/wsapi/jobs/(?P[\w_-]+)/log', JobLogWSHandler, {'k8s_client': k8s_client, 'mongo_client': mongo_client}), + ], **app_kwargs) From d538403f63958cc01c6b1fc8ab69b43f41e6567f Mon Sep 17 00:00:00 2001 From: Comzyh Date: Tue, 9 May 2017 18:58:00 +0800 Subject: [PATCH 066/107] auto refresh log --- frontend/src/JobLog.vue | 82 ++++++++++++++++++++++++++++++++++++-- frontend/webpack.config.js | 4 ++ requirements.txt | 2 +- 3 files changed, 83 insertions(+), 5 deletions(-) diff --git a/frontend/src/JobLog.vue b/frontend/src/JobLog.vue index a697978..8458ee8 100644 --- a/frontend/src/JobLog.vue +++ b/frontend/src/JobLog.vue @@ -14,10 +14,20 @@ on-text="Wrap" off-text="No wrap">
- + + Download -
{{logText}}
+
{{logText}}{{newLogText}}
diff --git a/frontend/src/Repos.vue b/frontend/src/Repos.vue index 8bb5c2a..0d4d921 100644 --- a/frontend/src/Repos.vue +++ b/frontend/src/Repos.vue @@ -1,7 +1,7 @@