Skip to content

feat: add k8s runtime ttl #139

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions api/src/resolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import {
spawnRuntime as spawnRuntime_k8s,
killRuntime as killRuntime_k8s,
infoRuntime as infoRuntime_k8s,
loopKillInactiveRoutes as loopKillInactiveRoutes_k8s,
initRoutes as initRoutes_k8s,
} from "./spawner-k8s";

export const resolvers = {
Expand Down Expand Up @@ -83,4 +85,7 @@ export const resolvers = {
if (process.env.RUNTIME_SPAWNER !== "k8s") {
initRoutes_docker();
loopKillInactiveRoutes_docker();
} else {
initRoutes_k8s();
loopKillInactiveRoutes_k8s();
}
18 changes: 11 additions & 7 deletions api/src/spawner-docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,14 @@ export async function infoRuntime(_, { sessionId }) {
startedAt: startedAt ? new Date(startedAt).getTime() : null,
};
}
/**
 * Parse a millisecond duration taken from an environment variable.
 *
 * @param raw the raw environment value (may be undefined or empty)
 * @param fallback value used when `raw` is unset, empty, not a number, or
 *   not strictly positive
 * @returns the parsed duration in milliseconds, or `fallback`
 */
function parseDurationMs(raw: string | undefined, fallback: number): number {
  if (!raw) return fallback;
  const parsed = parseInt(raw, 10);
  // A NaN TTL would make every `activeTime > kernel_ttl` comparison false,
  // and a NaN or non-positive timer delay is not a usable interval, so
  // unusable values fall back to the default instead of being passed through.
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

// debug: 3 min: 1000 * 60 * 3;
// prod: 12 hours: 1000 * 60 * 60 * 12;
// Idle time (ms) after which a route is considered inactive.
let kernel_ttl: number = parseDurationMs(
  process.env.KERNEL_TTL,
  1000 * 60 * 60 * 12
);
// Delay (ms) between two sweeps for inactive routes; defaults to 1 minute.
let loop_interval = parseDurationMs(process.env.LOOP_INTERVAL, 1000 * 60 * 1);

async function killInactiveRoutes() {
let { data } = await apollo_client.query({
Expand All @@ -320,14 +328,10 @@ async function killInactiveRoutes() {
const now = new Date();
let inactiveRoutes = data.getUrls
.filter(({ url, lastActive }) => {
if (!lastActive) return false;
if (!lastActive) return true;
let d2 = new Date(parseInt(lastActive));
// Prod: 12 hours TTL
let ttl = 1000 * 60 * 60 * 12;
// DEBUG: 1 minute TTL
// let ttl = 1000 * 60 * 3;
let activeTime = now.getTime() - d2.getTime();
return activeTime > ttl;
return activeTime > kernel_ttl;
})
.map(({ url }) => url);
console.log("Inactive routes", inactiveRoutes);
Expand Down Expand Up @@ -356,7 +360,7 @@ async function killInactiveRoutes() {
export function loopKillInactiveRoutes() {
  // Sweep for inactive routes every `loop_interval` milliseconds.
  setInterval(() => {
    // The timer ignores the promise returned by its callback, so a rejected
    // sweep would surface as an unhandled rejection. Log it and let the next
    // tick try again.
    killInactiveRoutes().catch((err) => {
      console.error("killInactiveRoutes failed", err);
    });
  }, loop_interval);
}

/**
Expand Down
171 changes: 137 additions & 34 deletions api/src/spawner-k8s.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,40 +118,20 @@ function getServiceSpec(name) {
};
}

export async function spawnRuntime(_, { sessionId }) {
let url = `/${sessionId}`;
sessionId = sessionId.replaceAll("_", "-").toLowerCase();
// sessionId = "runtime-k8s-user-UGY6YAk7TM-repo-34prxrgkKG-kernel";
// sessionId = "k8s-user-UGY6YAk7TM";
let k8s_name = `k8s-${sessionId}`;
console.log("spawnRuntime", url, k8s_name);
// check if exist?
// 1. create a jupyter kernel pod
// 2. create a ws pod
console.log("Creating namespaced pod ..");

let ns =
process.env.RUNTIME_NS ||
fs
.readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
.toString();
console.log("Using k8s ns:", ns);
async function createDeployment(ns, deploy_spec) {
try {
// TODO if exists, skip
// await k8sApi.createNamespacedPod(ns, getPodSpec(k8s_name));
await k8sAppsApi.createNamespacedDeployment(
ns,
getDeploymentSpec(k8s_name)
);
await k8sAppsApi.createNamespacedDeployment(ns, deploy_spec);
// FIXME would this also do creation?
} catch (e: any) {
if (e.body.reason === "AlreadyExists") {
console.log("Already exists, patching ..");
try {
await k8sAppsApi.patchNamespacedDeployment(
getDeploymentSpec(k8s_name).metadata.name,
deploy_spec.metadata.name,
ns,
getDeploymentSpec(k8s_name),
deploy_spec,
undefined,
undefined,
undefined,
Expand All @@ -172,19 +152,21 @@ export async function spawnRuntime(_, { sessionId }) {
return false;
}
}
console.log("Creating service ..");
}

async function createService(ns: string, service_spec) {
try {
await k8sApi.createNamespacedService(ns, getServiceSpec(k8s_name));
await k8sApi.createNamespacedService(ns, service_spec);

// The DNS name of the service is
} catch (e: any) {
if (e.body.reason === "AlreadyExists") {
console.log("Already exists, patching ..");
try {
await k8sApi.patchNamespacedService(
getServiceSpec(k8s_name).metadata.name,
service_spec.metadata.name,
ns,
getServiceSpec(k8s_name),
service_spec,
undefined,
undefined,
undefined,
Expand All @@ -210,9 +192,10 @@ export async function spawnRuntime(_, { sessionId }) {
return false;
}
}
let dnsname = `${
getServiceSpec(k8s_name).metadata.name
}.${ns}.svc.cluster.local`;
}

async function addRoute(ns: string, service_spec, url: string) {
let dnsname = `${service_spec.metadata.name}.${ns}.svc.cluster.local`;
console.log("Created, dns:", dnsname);
// let ws_host = `${k8s_name}-deployment`;
let ws_host = dnsname;
Expand Down Expand Up @@ -241,6 +224,26 @@ export async function spawnRuntime(_, { sessionId }) {
},
],
});
}

/**
 * Spawn the k8s runtime for a session: create (or patch) its deployment and
 * service, then register a route pointing at the service.
 *
 * @param _ unused first argument
 * @param sessionId the session id; the route URL uses it verbatim, while the
 *   k8s resource names use a sanitized form ("_" -> "-", lower-cased)
 * @returns true when everything was created, false when the deployment or
 *   the service could not be created
 */
export async function spawnRuntime(_, { sessionId }) {
  // The route keeps the original session id; only the k8s names are sanitized.
  let url = `/${sessionId}`;
  sessionId = sessionId.replaceAll("_", "-").toLowerCase();
  let k8s_name = `k8s-${sessionId}`;
  console.log("spawnRuntime", url, k8s_name);
  console.log("Creating namespaced pod ..");
  // Use the configured runtime namespace, else the one from the mounted
  // service-account namespace file.
  let ns =
    process.env.RUNTIME_NS ||
    fs
      .readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
      .toString();
  console.log("Using k8s ns:", ns);
  let deploy_spec = getDeploymentSpec(k8s_name);
  let service_spec = getServiceSpec(k8s_name);
  // createDeployment/createService return false on failure. Stop there
  // instead of registering a route for a runtime that was never created.
  if ((await createDeployment(ns, deploy_spec)) === false) return false;
  console.log("Creating service ..");
  if ((await createService(ns, service_spec)) === false) return false;
  await addRoute(ns, service_spec, url);
  return true;
}

Expand Down Expand Up @@ -292,9 +295,109 @@ export async function killRuntime(_, { sessionId }) {
* @returns {startedAt} the time when the runtime is started.
*/
export async function infoRuntime(_, { sessionId }) {
  // Derive the k8s name the same way spawnRuntime does.
  sessionId = sessionId.replaceAll("_", "-").toLowerCase();
  let k8s_name = `k8s-${sessionId}`;
  // Use the configured runtime namespace, else the one from the mounted
  // service-account namespace file.
  let ns =
    process.env.RUNTIME_NS ||
    fs
      .readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
      .toString();
  let deploy_spec = getDeploymentSpec(k8s_name);
  // The deployment's creation timestamp is reported as the start time.
  let status = await k8sAppsApi.readNamespacedDeployment(
    deploy_spec.metadata.name,
    ns
  );
  let startedAt = status.body.metadata?.creationTimestamp;
  return {
    // Epoch milliseconds, or null when the deployment has no timestamp.
    startedAt: startedAt ? new Date(startedAt).getTime() : null,
  };
}

/**
 * Parse a millisecond duration taken from an environment variable.
 *
 * @param raw the raw environment value (may be undefined or empty)
 * @param fallback value used when `raw` is unset, empty, not a number, or
 *   not strictly positive
 * @returns the parsed duration in milliseconds, or `fallback`
 */
function parseDurationMs(raw: string | undefined, fallback: number): number {
  if (!raw) return fallback;
  const parsed = parseInt(raw, 10);
  // A NaN TTL would make every `activeTime > kernel_ttl` comparison false,
  // and a NaN or non-positive timer delay is not a usable interval, so
  // unusable values fall back to the default instead of being passed through.
  return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
}

// debug: 3 min: 1000 * 60 * 3;
// prod: 12 hours: 1000 * 60 * 60 * 12;
// Idle time (ms) after which a route is considered inactive.
let kernel_ttl: number = parseDurationMs(
  process.env.KERNEL_TTL,
  1000 * 60 * 60 * 12
);
// Delay (ms) between two sweeps for inactive routes; defaults to 1 minute.
let loop_interval = parseDurationMs(process.env.LOOP_INTERVAL, 1000 * 60 * 1);

/**
 * Fetch all routes with their last-activity time and kill the runtime behind
 * every route that has been idle longer than `kernel_ttl`, or that has no
 * recorded activity at all.
 *
 * A failure to kill one runtime is logged and does not stop the sweep, so one
 * stuck runtime cannot keep the routes after it alive forever.
 */
async function killInactiveRoutes() {
  // "network-only" bypasses the Apollo cache so each sweep sees fresh data.
  let { data } = await apollo_client.query({
    query: gql`
      query GetUrls {
        getUrls {
          url
          lastActive
        }
      }
    `,
    fetchPolicy: "network-only",
  });
  const now = Date.now();
  const inactiveRoutes: string[] = data.getUrls
    .filter(({ lastActive }) => {
      // If the lastActive is null, it means the route is not active.
      if (!lastActive) return true;
      // lastActive is parsed as an epoch-milliseconds string. An unparsable
      // value yields NaN, which compares false, so that route is kept.
      const idleMs = now - parseInt(lastActive, 10);
      return idleMs > kernel_ttl;
    })
    .map(({ url }) => url);
  console.log("Inactive routes", inactiveRoutes);
  for (const url of inactiveRoutes) {
    // Routes are "/<sessionId>"; drop the leading slash.
    const sessionId = url.substring(1);
    try {
      await killRuntime(null, { sessionId });
    } catch (err) {
      console.error("Failed to kill inactive runtime", sessionId, err);
    }
  }
}

/**
 * Periodically kill inactive routes, once every `loop_interval` milliseconds
 * (one minute unless LOOP_INTERVAL overrides it).
 */
export function loopKillInactiveRoutes() {
  setInterval(() => {
    // The timer ignores the promise returned by its callback, so a rejected
    // sweep would surface as an unhandled rejection. Log it and let the next
    // tick try again.
    killInactiveRoutes().catch((err) => {
      console.error("killInactiveRoutes failed", err);
    });
  }, loop_interval);
}

/**
 * Extract the session id embedded in a runtime deployment name of the form
 * `runtime-k8s-<id>-deployment`.
 *
 * NOTE: the result is the sanitized id, i.e.
 * sessionId.replaceAll("_", "-").toLowerCase(), not the original session id.
 *
 * @param deploymentName name of a k8s deployment
 * @returns the sanitized session id contained in the name
 * @throws TypeError when the name does not follow the runtime naming pattern
 */
function deploymentName2sessionId(deploymentName: string) {
  const match = deploymentName.match(/runtime-k8s-(.*)-deployment/);
  if (!match) {
    // Name the offending deployment instead of failing on a null dereference.
    throw new TypeError(`Not a runtime deployment name: ${deploymentName}`);
  }
  return match[1];
}

/**
 * List the deployments in the runtime namespace and return the (sanitized)
 * session ids of those that are runtime deployments.
 *
 * Deployments without a name, or whose name does not follow the runtime
 * naming pattern, are skipped rather than aborting the whole scan.
 *
 * @returns the session ids extracted from the deployment names
 */
async function scanRunningSessions(): Promise<string[]> {
  // Use the configured runtime namespace, else the one from the mounted
  // service-account namespace file.
  let ns =
    process.env.RUNTIME_NS ||
    fs
      .readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
      .toString();
  const deployments = await k8sAppsApi.listNamespacedDeployment(ns);
  const sessionIds: string[] = [];
  for (const deployment of deployments.body.items) {
    const name = deployment.metadata?.name;
    if (!name) continue;
    try {
      sessionIds.push(deploymentName2sessionId(name));
    } catch (e) {
      // The namespace may hold deployments that are not runtimes, which the
      // name parser rejects by throwing.
      console.log("Skipping non-runtime deployment", name);
    }
  }
  return sessionIds;
}

/**
 * Startup hook: find the runtime sessions that already have a deployment and
 * register a route for each one, one after another.
 *
 * NOTE(review): the ids come from deployment names, so they are the sanitized
 * form of the session id. Confirm that the resulting "/<id>" URLs match what
 * clients request.
 */
export async function initRoutes() {
  const namespace =
    process.env.RUNTIME_NS ||
    fs
      .readFileSync("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
      .toString();
  const runningIds = await scanRunningSessions();
  console.log("initRoutes sessionIds", runningIds);
  for (const sessionId of runningIds) {
    // Route "/<id>" points at the service generated for "k8s-<id>".
    const serviceSpec = getServiceSpec(`k8s-${sessionId}`);
    await addRoute(namespace, serviceSpec, `/${sessionId}`);
  }
}
8 changes: 8 additions & 0 deletions compose/dev/compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ services:
PROXY_API_URL: "http://proxy:4011/graphql"
ZMQ_KERNEL_IMAGE: "lihebi/codepod-kernel-python:latest"
WS_RUNTIME_IMAGE: "lihebi/codepod-runtime:latest"
# 1000 * 60 * 3: 3 minutes
# KERNEL_TTL: "180000"
# 1000 * 60 * 60 * 12: 12 hours
KERNEL_TTL: "43200000"
# 1000 * 5: 5 seconds
# LOOP_INTERVAL: "5000"
# 1000 * 60 * 1: 1 minute
LOOP_INTERVAL: "60000"

ui:
image: node:18
Expand Down
10 changes: 10 additions & 0 deletions k8s/helm-chart/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ spec:
value: k8s
- name: RUNTIME_NS
value: {{ .Release.Namespace }}-runtime
- name: KERNEL_TTL
# 1000 * 60 * 60 * 12: 12 hours
# value: "43200000"
# 1000 * 60 * 3: 3 minutes
value: "180000"
- name: LOOP_INTERVAL
# 1000 * 60 * 1: 1 minute
# value: "60000"
# 1000 * 5: 5 seconds
value: "5000"
resources:
limits:
memory: 512Mi
Expand Down
13 changes: 11 additions & 2 deletions proxy/src/node-proxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,16 @@ function startProxyServer() {
return;
}
console.log("proxy ws req", req.url);
// FIXME why there're two leading slashes? "//user_xxx_repo_xxx"
if (req.url.startsWith("//")) {
// FIXME why there're two leading slashes? "//user_xxx_repo_xxx"
// UPDATE: for docker runtime, there's double slashes
// console.log("active docker connection", req.url.substring(1));
activeTable[req.url.substring(1)] = new Date();
} else {
// For k8s runtime, there's only one slash
// console.log("active k8s connection", req.url);
activeTable[req.url] = new Date();
}
activeTable[req.url.substring(1)] = new Date();
let match = await getRouteTarget(req);
if (!match) {
Expand Down Expand Up @@ -342,7 +351,7 @@ function startWebServer() {
//
const WEB_PORT = process.env.WEB_PORT || 4012;
// _routes.add("/", { target: "http://127.0.0.1:9000" });
_routes.add("/test", `http://localhost:${WEB_PORT}`);
// _routes.add("/test", `http://localhost:${WEB_PORT}`);
// Now http://localhost:4010/test should redirect to localhost:4012 and show some response.
console.log(`Demo web server listenning on http://localhost:${WEB_PORT}`);
http
Expand Down