Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/libextism/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ runs:
- uses: ./.extism-cli/.github/actions/extism-cli
- name: Install
shell: bash
run: sudo extism lib install --version git
run: sudo extism lib install --version git --github-token "$GITHUB_TOKEN"
env:
GITHUB_TOKEN: ${{ github.token }}
4 changes: 2 additions & 2 deletions examples/dune
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
(executables
(names runner kv typed)
(names runner kv typed pool)
(libraries extism))

(alias
(name examples)
(deps runner.exe kv.exe typed.exe))
(deps runner.exe kv.exe typed.exe pool.exe))

(alias
(name runtest)
Expand Down
32 changes: 32 additions & 0 deletions examples/pool.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
open Extism

let url =
"https://github.com/extism/plugins/releases/latest/download/count_vowels.wasm"

module Typed_example = struct
include Plugin.Typed.Init ()

let count_vowels = exn @@ fn "count_vowels" Type.string Type.json
end

let run_thread pool ~sleep ~key ~input =
Thread.create
(fun () ->
Thread.delay sleep;
let plugin = Pool.get pool key in
Pool.Instance.use plugin @@ fun p ->
let plugin = Typed_example.of_plugin_exn p in
let res = Typed_example.count_vowels plugin input in
print_endline (Yojson.Safe.to_string res))
()

let () =
let wasm = Manifest.Wasm.url url in
let manifest = Manifest.create [ wasm ] in
let pool = Pool.create 3 in
Pool.add pool "a" (fun () -> Plugin.of_manifest_exn manifest);
let threads =
List.init 10 (fun i ->
run_thread pool ~sleep:(float_of_int i *. 0.25) ~key:"a" ~input:"a")
in
List.iter Thread.join threads
139 changes: 139 additions & 0 deletions src/extism.ml
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,142 @@ let%test _ =
Gc.minor ();
Gc.full_major ();
!count > 0

module Pool = struct
exception Timeout

type queue = {
mutable count : int Atomic.t;
q : Plugin.t Queue.t;
lock : Mutex.t;
}

type 'a t = {
lock : Mutex.t;
max_instances : int;
plugins : ('a, unit -> Plugin.t) Hashtbl.t;
instances : ('a, queue) Hashtbl.t;
}

module Instance = struct
type t = { mutable plugin : Plugin.t option; q : queue }

let plugin { plugin; _ } =
match plugin with
| Some p -> p
| None -> invalid_arg "plugin instance has already been freed"

let free x =
match x.plugin with
| Some plugin ->
x.plugin <- None;
ignore (Plugin.reset plugin);
Mutex.protect x.q.lock @@ fun () -> Queue.push plugin x.q.q
| None -> ()

let use t f =
Fun.protect ~finally:(fun () -> free t) (fun () -> f (plugin t))
end

let create n =
{
lock = Mutex.create ();
max_instances = n;
plugins = Hashtbl.create 8;
instances = Hashtbl.create 8;
}

let add t name f =
Mutex.protect t.lock @@ fun () ->
Hashtbl.replace t.plugins name f;
Hashtbl.replace t.instances name
@@ { q = Queue.create (); count = Atomic.make 0; lock = Mutex.create () }

let count t name =
let instance = Hashtbl.find t.instances name in
Atomic.get instance.count

let get_opt t name =
let q = Hashtbl.find t.instances name in
Mutex.protect q.lock @@ fun () ->
let mk plugin =
let x = { Instance.plugin = Some plugin; q } in
Gc.finalise
(fun x ->
match x.Instance.plugin with
| Some plugin ->
x.plugin <- None;
Mutex.protect q.lock @@ fun () -> Queue.push plugin q.q
| None -> ())
x;
x
in
match Queue.take_opt q.q with
| Some x -> Some (mk x)
| None ->
let f = Hashtbl.find t.plugins name in
if Atomic.get q.count < t.max_instances then
let () = Atomic.incr q.count in
Some (mk @@ f ())
else None

let get ?timeout t name =
let rec aux start =
match get_opt t name with
| Some x -> x
| None when Option.is_some timeout ->
let curr = Unix.gettimeofday () in
if curr -. start >= Option.get timeout then raise Timeout
else aux start
| None -> aux start
in
aux @@ Unix.gettimeofday ()

let%test "pool timeout" =
let pool = create 3 in
add pool "test" (fun () ->
let manifest = Manifest.(create [ Wasm.file "test/code.wasm" ]) in
Plugin.of_manifest manifest |> Error.unwrap);
let _a = get pool "test" in
let _b = get pool "test" in
let _c = get pool "test" in
try
let _ = get ~timeout:1.0 pool "test" in
false
with Timeout -> count pool "test" = 3

let%test "pool timeout" =
let pool = create 2 in
add pool "test" (fun () ->
let manifest = Manifest.(create [ Wasm.file "test/code.wasm" ]) in
Plugin.of_manifest manifest |> Error.unwrap);
let a = get pool "test" in
let () = Instance.free a in
let b = get pool "test" in
let () = Instance.free b in
let _c = get pool "test" in
try
let _ = get ~timeout:0.0 pool "test" in
count pool "test" = 2
with Timeout -> false

let%test "pool threads" =
let pool = create 2 in
add pool "test" (fun () ->
let manifest = Manifest.(create [ Wasm.file "test/code.wasm" ]) in
Plugin.of_manifest manifest |> Error.unwrap);
let total = Atomic.make 0 in
let run n =
Thread.create
(fun () ->
Thread.delay n;
let a = get pool "test" in
Instance.use a @@ fun plugin ->
ignore (Plugin.call_string_exn plugin ~name:"count_vowels" "input");
Atomic.incr total)
()
in
let all = [ run 1.0; run 1.0; run 0.5; run 0.5; run 0.0 ] in
let () = List.iter Thread.join all in
Atomic.get total = List.length all && count pool "test" <= 2
end
44 changes: 44 additions & 0 deletions src/extism.mli
Original file line number Diff line number Diff line change
Expand Up @@ -515,3 +515,47 @@ val with_plugin : (Plugin.t -> 'a) -> Plugin.t -> 'a

val extism_version : unit -> string
(** Returns the libextism version, not the version of the OCaml library *)

(** The [Pool] module defines an interface for pooling a fixed number of
multiple plugins *)
module Pool : sig
type 'a t
(** ['a Pool.t] is a pool with keys of type ['a] *)

exception Timeout
(** Raised when [get] encounters a timeout *)

(** [Instance] is used to store active instances of a plugin in a pool *)
module Instance : sig
type t
(** The instance type *)

val plugin: t -> Plugin.t
(** Get the inner plugin, this should not be stored anywhere separate from the [Instance.t] *)

val free: t -> unit
(** Return the instance back to the pool *)

val use: t -> (Plugin.t -> 'a) -> 'a
(** [use instance f] calls the function [f] with the instance's plugin, then calls [free] to return
the instance back to the pool *)
end

val create: int -> 'a t
(** [create n] makes a new [Pool.t] that allows at most [n] instances of each plugin *)

val add: 'a t -> 'a -> (unit -> Plugin.t) -> unit
(** [add pool key f] associates a plugin initialization function, [f], with the given [key] *)

val count: 'a t -> 'a -> int
(** [count pool key] get the current active instance count for [key] *)

val get_opt: 'a t -> 'a -> Instance.t option
(** [get_opt pool key] attempts to get an active instance for [key], returning [None] if they are
all in use *)

val get : ?timeout:float -> 'a t -> 'a -> Instance.t
(** [get ?timeout pool key] blocks until an active instance is available, or if the timeout is reached.
This will raise `Timeout` if [timeout] seconds have passed.
*)
end