From 36bc624335bd2e605171a5ae2a0ad5c3a8d4b1f4 Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 12:39:12 +0200 Subject: [PATCH 1/5] Update submodule to branch --- iarray/iarray-c-develop | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iarray/iarray-c-develop b/iarray/iarray-c-develop index 8357abc3..36f81453 160000 --- a/iarray/iarray-c-develop +++ b/iarray/iarray-c-develop @@ -1 +1 @@ -Subproject commit 8357abc3f5c0115dc16706ecf11df0f897ce61dd +Subproject commit 36f81453055c9d1b7772ae6598cb4235ab5176df From c73646339a878e63b82b0d1804c6e60ef961090e Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 12:47:37 +0200 Subject: [PATCH 2/5] Preliminary commit. Support remote access to iarrays --- .../reference/convenience_functions.rst | 1 + iarray/__init__.py | 6 ++ iarray/ciarray_ext.pxd | 9 ++ iarray/http_server.py | 44 +++++++++ iarray/iarray_ext.pyx | 49 +++++++++- iarray/tests/test_server.py | 94 +++++++++++++++++++ iarray/utils.py | 66 ++++++++++++- requirements-runtime.txt | 4 + scripts/iarr_server.py | 19 ++++ setup.py | 1 + 10 files changed, 291 insertions(+), 2 deletions(-) create mode 100644 iarray/http_server.py create mode 100644 iarray/tests/test_server.py create mode 100644 scripts/iarr_server.py diff --git a/doc/source/reference/convenience_functions.rst b/doc/source/reference/convenience_functions.rst index 90f85c7b..52850d0d 100644 --- a/doc/source/reference/convenience_functions.rst +++ b/doc/source/reference/convenience_functions.rst @@ -12,6 +12,7 @@ Convenience functions load open + list_arrays save cmp_arrays iarray2numpy diff --git a/iarray/__init__.py b/iarray/__init__.py index f439176f..7625244b 100644 --- a/iarray/__init__.py +++ b/iarray/__init__.py @@ -256,6 +256,7 @@ class Reduce(Enum): numpy2iarray, _check_access_mode, remove_urlpath, + list_arrays, ) # random constructors (follow NumPy convention) @@ -279,6 +280,11 @@ class Reduce(Enum): Attributes, ) +# Global catalog +global_catalog = {} + +from . import http_server + # For some reason this needs to go to the end, else matmul function does not work. from . import iarray_ext as ext diff --git a/iarray/ciarray_ext.pxd b/iarray/ciarray_ext.pxd index 29798571..c5a49fe4 100644 --- a/iarray/ciarray_ext.pxd +++ b/iarray/ciarray_ext.pxd @@ -654,3 +654,12 @@ cdef extern from "libiarray/iarray.h": void *buffer, int64_t *buffer_shape, int64_t buffer_size); + + # Server + ctypedef int32_t (*rhandler_ptr) (char *server_urlpath, char *array_urlpath, int64_t nchunk, + int32_t start, int32_t nitems, int32_t destsize, uint8_t *cblock); + ina_rc_t iarray_add_request_postfilter(iarray_container_t *src, char *server_urlpath, char *urlpath, + rhandler_ptr request_handler); + + ina_rc_t iarray_server_job(iarray_context_t *ctx, iarray_container_t *a, int64_t nchunk, + int32_t start, int32_t nitems, int32_t size, uint8_t *dest, int32_t *block_size); diff --git a/iarray/http_server.py b/iarray/http_server.py new file mode 100644 index 00000000..55639272 --- /dev/null +++ b/iarray/http_server.py @@ -0,0 +1,44 @@ +from pydantic import BaseModel +import iarray as ia +from fastapi import FastAPI, Response, HTTPException +import numpy as np +from iarray import iarray_ext as ext + + +app = FastAPI() + + +class Get_block(BaseModel): + array_id: str + nchunk: int + start: int + nitems: int + size: int + + +@app.get("/v1/catalog/") +async def catalog(): + return list(ia.global_catalog.keys()) + + +@app.get("/v1/meta/") +async def meta(array_id: str): + cat = ia.global_catalog + if array_id not in cat.keys(): + raise HTTPException(status_code=404, detail=array_id + " not in catalog") + arr = cat[array_id] + return {"shape": arr.shape, "chunks": arr.chunks, "blocks": arr.blocks, "dtype": np.dtype(arr.dtype).str} + + +@app.post("/v1/blocks/") +async def blocks(params: Get_block): + cat = ia.global_catalog + if params.array_id not in cat.keys(): + raise HTTPException(status_code=404, detail=cat[params.array_id] + " not in catalog") + iarr = cat[params.array_id] + try: + res = ext._server_job(iarr, params.nchunk, params.start, params.nitems, params.size) + except: + raise HTTPException(status_code=500, detail="could not get the block") + + return Response(content=res) diff --git a/iarray/iarray_ext.pyx b/iarray/iarray_ext.pyx index a5bb3483..56e9f8d4 100644 --- a/iarray/iarray_ext.pyx +++ b/iarray/iarray_ext.pyx @@ -22,15 +22,18 @@ import zarr import s3fs import cython from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer -from libc.stdlib cimport malloc, free +from libc.stdlib cimport malloc, free, realloc from libc.string cimport memcpy import iarray as ia from iarray import udf +import requests +import json from cpython cimport ( PyObject_GetBuffer, PyBuffer_Release, PyBUF_SIMPLE, + PyBytes_FromStringAndSize, ) # dtype conversion tables: udf <-> iarray @@ -1927,6 +1930,50 @@ def set_zproxy_postfilter(iarr): iarray_check(ciarray.iarray_add_zproxy_postfilter(c, urlpath, func)) +def set_request_postfilter(iarr, server_urlpath, array_urlpath): + cdef ciarray.iarray_container_t *c + c = PyCapsule_GetPointer(iarr.to_capsule(), "iarray_container_t*") + cdef ciarray.rhandler_ptr func = server_accessor + server_urlpath = server_urlpath.encode("utf-8") if isinstance(server_urlpath, str) else server_urlpath + array_urlpath = array_urlpath.encode("utf-8") if isinstance(array_urlpath, str) else array_urlpath + + iarray_check(ciarray.iarray_add_request_postfilter(c, server_urlpath, array_urlpath, func)) + + +cdef ciarray.int32_t server_accessor(char *server_urlpath, char *array_id, + ciarray.int64_t nchunk, ciarray.int32_t start, + ciarray.int32_t nitems, ciarray.int32_t size, ciarray.uint8_t* cblock) with gil: + pyarray_id = array_id.decode() + pyserver_urlpath = server_urlpath.decode() + url = pyserver_urlpath + "/v1/blocks/" + params = {"array_id": pyarray_id, "nchunk": nchunk, "start": start, "nitems": nitems, "size": size} + r = requests.post(url, json.dumps(params)) + + cdef Py_buffer *buf = malloc(sizeof(Py_buffer)) + PyObject_GetBuffer(r.content, buf, PyBUF_SIMPLE) + cdef ciarray.int32_t len_ = buf.len + memcpy(cblock, buf.buf, len_) + PyBuffer_Release(buf) + return len_ + + +def _server_job(iarr, nchunk, start, nitems, size): + cdef ciarray.iarray_container_t *c + c = PyCapsule_GetPointer(iarr.to_capsule(), "iarray_container_t*") + ctx = Context(iarr.cfg) + cdef ciarray.iarray_context_t *ctx_ + ctx_ = PyCapsule_GetPointer(ctx.to_capsule(), "iarray_context_t*") + + res = b"0" * (size + 32) # BLOSC_MAX_OVERHEAD + cdef ciarray.int32_t csize + cdef Py_buffer *buf = malloc(sizeof(Py_buffer)) + PyObject_GetBuffer(res, buf, PyBUF_SIMPLE) + iarray_check(ciarray.iarray_server_job(ctx_, c, nchunk, start, nitems, size, buf.buf, &csize)) + PyBuffer_Release(buf) + + return res[:csize] + + cdef class UdfLibrary: """ Library for scalar UDF functions. diff --git a/iarray/tests/test_server.py b/iarray/tests/test_server.py new file mode 100644 index 00000000..a3509d31 --- /dev/null +++ b/iarray/tests/test_server.py @@ -0,0 +1,94 @@ +import pytest +import iarray as ia +import json +import requests +import subprocess +import time +import numpy as np +import numpy.testing +import os + + +ia.ones(shape=[12, 13, 3], urlpath="ones.iarr", mode="w") +ia.arange(shape=[10, 10], urlpath="arange.iarr", dtype=np.int32, mode="w") + +dir_path = os.path.dirname(os.path.realpath(__file__)) +host = "127.0.0.1" +port = "28800" + +server = subprocess.Popen(["python", "../../scripts/iarr_server.py", host, port, dir_path]) +time.sleep(5) +print(server.stdout) + +@pytest.fixture(scope="module", autouse=True) +def terminate_server(): + yield + server.terminate() + ia.remove_urlpath("ones.iarr") + ia.remove_urlpath("arange.iarr") + + +def test_catalog(): + response = requests.get("http://" + host + ":" + port + "/v1/catalog/") + res = json.loads(response.text) + assert res == ["ones.iarr", "arange.iarr"] + assert ia.list_arrays(host, port) == res + + +@pytest.mark.parametrize( + "array_id, res", [ + ("ones.iarr", {"shape": [12, 13, 3], "chunks": [8, 8, 2], "blocks": [4, 8, 2], "dtype": " ia.IArray: Parameters ---------- urlpath : str - The url path to read. + The url path to read. If it starts with "iarr://", it opens a view from a remote array. In + this case, the urlpath must be of the form "iarr://host:port/array_urlpath". Default port is 28800. mode : str The open mode. This parameter supersedes the mode in the default :class:`Config`. @@ -167,12 +171,72 @@ def open(urlpath: str, mode="a") -> ia.IArray: save : Save an array to disk. """ cfg = ia.get_config_defaults() + + if urlpath[:7] == "iarr://": + host_port, array_id = urlpath[7:].split("/") + params = host_port.split(":") + if len(params) == 1: + # Default value for port + port = 28800 + else: + port = params[1] + host = params[0] + + url = "http://" + host + ":" + port + with ia.config(cfg=cfg, mode=mode, nthreads=1) as cfg: + return request_view(server_urlpath=url, array_id=array_id, cfg=cfg) + if not os.path.exists(urlpath): raise IOError("The file does not exist.") with ia.config(cfg=cfg, mode=mode) as cfg: return ext.open(cfg, urlpath) +def request_view(server_urlpath: str, array_id: str, cfg: ia.Config = None, **kwargs) -> ia.IArray: + if cfg is None: + cfg = ia.get_config_defaults() + # Get meta to create empty array + url = server_urlpath + "/v1/meta/" + "?array_id=" + array_id + r = requests.get(url) + params = json.loads(r.text) + kwargs["chunks"] = params["chunks"] + kwargs["blocks"] = params["blocks"] + dtype = np.dtype(params["dtype"]).type + kwargs["dtype"] = dtype + + with ia.config(cfg=cfg, urlpath=None, **kwargs) as cfg: + a = ia.uninit(shape=params["shape"], cfg=cfg) + + # Assign postfilter + ext.set_request_postfilter(a, server_urlpath, array_id) + return a + + +def list_arrays(host: str, port: Union[str, int] = 28800): + """ + Return a list of the registered arrays in the server. + + Parameters + ---------- + host: str + server ip + port: str or int + The port to use to connect to the host. + + Returns + ------- + out: list + List of the arrays that the client has access to. + + Notes + ----- + The server must be running when calling this function. + + """ + response = requests.get("http://" + host + ":" + str(port) + "/v1/catalog/") + return json.loads(response.text) + + # TODO: are cfg and kwargs needed here? def iarray2numpy(iarr: ia.IArray, cfg: ia.Config = None, **kwargs) -> np.ndarray: """Convert an ironArray array into a NumPy array. diff --git a/requirements-runtime.txt b/requirements-runtime.txt index 4cf74983..2d75b0df 100644 --- a/requirements-runtime.txt +++ b/requirements-runtime.txt @@ -3,3 +3,7 @@ llvmlite ndindex msgpack ast_decompiler +fastapi +uvicorn +json +requests diff --git a/scripts/iarr_server.py b/scripts/iarr_server.py new file mode 100644 index 00000000..58ffdbf7 --- /dev/null +++ b/scripts/iarr_server.py @@ -0,0 +1,19 @@ +import sys +import os +import iarray as ia +import uvicorn + + +# python iarr_server.py host port "path to folder where all the arrays to view are" +host = str(sys.argv[1]) +port = int(sys.argv[2]) +path = str(sys.argv[3]) + + +# Add arrays to catalog +for file in os.listdir(path): + array_id = os.path.join(path, file) + if file.endswith('.iarr'): + ia.global_catalog[file] = ia.open(array_id) + +uvicorn.run(ia.http_server.app, host=host, port=port) diff --git a/setup.py b/setup.py index 7e3db830..82ba2ed6 100644 --- a/setup.py +++ b/setup.py @@ -85,4 +85,5 @@ def get_version(rel_path): python_requires=">=3.8", extras_require={"doc": doc_deps, "examples": examples_deps}, **package_info, + scripts=["scripts/iarr_server", "iarray/http_server"], ) From 04421c35938258963470b630318e376c587b8709 Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 13:18:28 +0200 Subject: [PATCH 3/5] Restore setup.py --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 82ba2ed6..7e3f7614 100644 --- a/setup.py +++ b/setup.py @@ -85,5 +85,5 @@ def get_version(rel_path): python_requires=">=3.8", extras_require={"doc": doc_deps, "examples": examples_deps}, **package_info, - scripts=["scripts/iarr_server", "iarray/http_server"], + # scripts=["scripts/iarr_server"], ) From 25ccf6ff1234b96d3ea41e2d823ba7344dd00a7e Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 13:31:22 +0200 Subject: [PATCH 4/5] remove json from requirements --- requirements-runtime.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements-runtime.txt b/requirements-runtime.txt index 2d75b0df..6eee6ba8 100644 --- a/requirements-runtime.txt +++ b/requirements-runtime.txt @@ -5,5 +5,4 @@ msgpack ast_decompiler fastapi uvicorn -json requests From a5625f1a70f676d7f4b9ef2b228947c309c08188 Mon Sep 17 00:00:00 2001 From: Marta Iborra Date: Fri, 29 Jul 2022 14:03:54 +0200 Subject: [PATCH 5/5] Add global variable HTTP_PORT --- iarray/__init__.py | 1 + iarray/tests/test_server.py | 4 ++-- iarray/utils.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/iarray/__init__.py b/iarray/__init__.py index 7625244b..26eb8621 100644 --- a/iarray/__init__.py +++ b/iarray/__init__.py @@ -282,6 +282,7 @@ class Reduce(Enum): # Global catalog global_catalog = {} +HTTP_PORT = 28800 from . import http_server diff --git a/iarray/tests/test_server.py b/iarray/tests/test_server.py index a3509d31..a0eb07b6 100644 --- a/iarray/tests/test_server.py +++ b/iarray/tests/test_server.py @@ -14,11 +14,11 @@ dir_path = os.path.dirname(os.path.realpath(__file__)) host = "127.0.0.1" -port = "28800" +port = str(ia.HTTP_PORT) server = subprocess.Popen(["python", "../../scripts/iarr_server.py", host, port, dir_path]) time.sleep(5) -print(server.stdout) + @pytest.fixture(scope="module", autouse=True) def terminate_server(): diff --git a/iarray/utils.py b/iarray/utils.py index fe1a2e3e..ab852524 100644 --- a/iarray/utils.py +++ b/iarray/utils.py @@ -177,7 +177,7 @@ def open(urlpath: str, mode="a") -> ia.IArray: params = host_port.split(":") if len(params) == 1: # Default value for port - port = 28800 + port = ia.HTTP_PORT else: port = params[1] host = params[0]