Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/source/reference/convenience_functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Convenience functions

load
open
list_arrays
save
cmp_arrays
iarray2numpy
Expand Down
7 changes: 7 additions & 0 deletions iarray/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class Reduce(Enum):
numpy2iarray,
_check_access_mode,
remove_urlpath,
list_arrays,
)

# random constructors (follow NumPy convention)
Expand All @@ -279,6 +280,12 @@ class Reduce(Enum):
Attributes,
)

# Global catalog
global_catalog = {}
HTTP_PORT = 28800

from . import http_server

# For some reason this needs to go to the end, else matmul function does not work.
from . import iarray_ext as ext

Expand Down
9 changes: 9 additions & 0 deletions iarray/ciarray_ext.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -654,3 +654,12 @@ cdef extern from "libiarray/iarray.h":
void *buffer,
int64_t *buffer_shape,
int64_t buffer_size);

# Server
ctypedef int32_t (*rhandler_ptr) (char *server_urlpath, char *array_urlpath, int64_t nchunk,
int32_t start, int32_t nitems, int32_t destsize, uint8_t *cblock);
ina_rc_t iarray_add_request_postfilter(iarray_container_t *src, char *server_urlpath, char *urlpath,
rhandler_ptr request_handler);

ina_rc_t iarray_server_job(iarray_context_t *ctx, iarray_container_t *a, int64_t nchunk,
int32_t start, int32_t nitems, int32_t size, uint8_t *dest, int32_t *block_size);
44 changes: 44 additions & 0 deletions iarray/http_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
from pydantic import BaseModel
import iarray as ia
from fastapi import FastAPI, Response, HTTPException
import numpy as np
from iarray import iarray_ext as ext


app = FastAPI()


class Get_block(BaseModel):
    # Request body accepted by the POST /v1/blocks/ endpoint.
    # Every field except `array_id` is forwarded unchanged to `ext._server_job`.

    # Key used to look the array up in `ia.global_catalog`.
    array_id: str
    # presumably the index of the chunk that holds the requested items -- TODO confirm
    nchunk: int
    # presumably the first item to read -- TODO confirm
    start: int
    # presumably how many items to read -- TODO confirm
    nitems: int
    # Forwarded as the `size` argument of `ext._server_job`.
    size: int


@app.get("/v1/catalog/")
async def catalog():
    """Return the identifiers of all arrays registered in the global catalog."""
    registered = ia.global_catalog
    return [array_id for array_id in registered]


@app.get("/v1/meta/")
async def meta(array_id: str):
    """Return the shape, chunks, blocks and dtype of a catalogued array.

    Responds with HTTP 404 when `array_id` is not a key of the global catalog.
    """
    registry = ia.global_catalog
    if array_id not in registry:
        raise HTTPException(status_code=404, detail=f"{array_id} not in catalog")

    array = registry[array_id]
    shape = array.shape
    chunks = array.chunks
    blocks = array.blocks
    # The dtype is sent as its NumPy string form (what `np.dtype(...).str` yields).
    dtype_str = np.dtype(array.dtype).str
    return {"shape": shape, "chunks": chunks, "blocks": blocks, "dtype": dtype_str}


@app.post("/v1/blocks/")
async def blocks(params: Get_block):
    """Return the bytes produced by `ext._server_job` for the requested block.

    Parameters
    ----------
    params : Get_block
        Identifies the array (`array_id`) and carries the `nchunk`, `start`,
        `nitems` and `size` values that are forwarded to `ext._server_job`.

    Returns
    -------
    Response
        A response whose body is the value returned by `ext._server_job`.

    Raises
    ------
    HTTPException
        404 when `params.array_id` is not in the global catalog;
        500 when `ext._server_job` fails.
    """
    cat = ia.global_catalog
    if params.array_id not in cat:
        # Build the message from the requested id: indexing the catalog with a
        # missing key would raise KeyError and turn this 404 into a 500.
        raise HTTPException(status_code=404, detail=params.array_id + " not in catalog")
    iarr = cat[params.array_id]
    try:
        res = ext._server_job(iarr, params.nchunk, params.start, params.nitems, params.size)
    except Exception as err:
        # `Exception` (not a bare except) so KeyboardInterrupt/SystemExit still
        # propagate; chain the cause so it is visible in the server log.
        raise HTTPException(status_code=500, detail="could not get the block") from err

    return Response(content=res)
2 changes: 1 addition & 1 deletion iarray/iarray-c-develop
49 changes: 48 additions & 1 deletion iarray/iarray_ext.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ import zarr
import s3fs
import cython
from cpython.pycapsule cimport PyCapsule_New, PyCapsule_GetPointer
from libc.stdlib cimport malloc, free
from libc.stdlib cimport malloc, free, realloc
from libc.string cimport memcpy
import iarray as ia
from iarray import udf
import requests
import json

from cpython cimport (
PyObject_GetBuffer,
PyBuffer_Release,
PyBUF_SIMPLE,
PyBytes_FromStringAndSize,
)

# dtype conversion tables: udf <-> iarray
Expand Down Expand Up @@ -1927,6 +1930,50 @@ def set_zproxy_postfilter(iarr):
iarray_check(ciarray.iarray_add_zproxy_postfilter(c, urlpath, func))


def set_request_postfilter(iarr, server_urlpath, array_urlpath):
    """Register `server_accessor` as the request postfilter of `iarr`.

    `server_urlpath` and `array_urlpath` may be given as `str` (encoded here
    as UTF-8) or as already-encoded bytes; both are handed to
    `iarray_add_request_postfilter` together with the handler.
    """
    cdef ciarray.rhandler_ptr handler = server_accessor
    cdef ciarray.iarray_container_t *container = <ciarray.iarray_container_t *> PyCapsule_GetPointer(
        iarr.to_capsule(), <char *> "iarray_container_t*")

    if isinstance(server_urlpath, str):
        server_urlpath = server_urlpath.encode("utf-8")
    if isinstance(array_urlpath, str):
        array_urlpath = array_urlpath.encode("utf-8")

    iarray_check(ciarray.iarray_add_request_postfilter(container, server_urlpath, array_urlpath, handler))


cdef ciarray.int32_t server_accessor(char *server_urlpath, char *array_id,
                                     ciarray.int64_t nchunk, ciarray.int32_t start,
                                     ciarray.int32_t nitems, ciarray.int32_t size, ciarray.uint8_t* cblock) with gil:
    # Request handler registered by `set_request_postfilter`.
    #
    # POSTs a JSON body with the keys "array_id", "nchunk", "start", "nitems"
    # and "size" to `<server_urlpath>/v1/blocks/`, copies the response body
    # into `cblock` and returns the number of bytes copied.
    #
    # NOTE(review): the HTTP status is not checked, so an error reply (whose
    # body is JSON text) is copied as if it were block data -- confirm how the
    # C caller expects failures to be reported before adding a check.
    # NOTE(review): the reply length is not compared against the capacity of
    # `cblock` -- confirm the capacity the caller guarantees.
    pyarray_id = array_id.decode()
    pyserver_urlpath = server_urlpath.decode()
    url = pyserver_urlpath + "/v1/blocks/"
    params = {"array_id": pyarray_id, "nchunk": nchunk, "start": start, "nitems": nitems, "size": size}
    r = requests.post(url, json.dumps(params))
    content = r.content

    # The Py_buffer lives on the stack: the previous malloc'd struct was never
    # freed, leaking one Py_buffer per requested block.
    cdef Py_buffer buf
    PyObject_GetBuffer(content, &buf, PyBUF_SIMPLE)
    cdef ciarray.int32_t len_ = buf.len
    memcpy(cblock, buf.buf, len_)
    PyBuffer_Release(&buf)
    return len_


def _server_job(iarr, nchunk, start, nitems, size):
    """Run `iarray_server_job` on `iarr` and return the bytes it produced.

    Parameters
    ----------
    iarr
        Array whose container capsule and `cfg` are used for the job.
    nchunk, start, nitems, size
        Forwarded unchanged to `iarray_server_job`.

    Returns
    -------
    bytes
        The first `csize` bytes of the output buffer, where `csize` is the
        block size reported by `iarray_server_job`.

    Raises
    ------
    Whatever `iarray_check` raises when the C call reports an error.
    """
    cdef ciarray.iarray_container_t *c
    c = <ciarray.iarray_container_t *> PyCapsule_GetPointer(iarr.to_capsule(), <char *> "iarray_container_t*")
    ctx = Context(iarr.cfg)
    cdef ciarray.iarray_context_t *ctx_
    ctx_ = <ciarray.iarray_context_t *> PyCapsule_GetPointer(ctx.to_capsule(), <char *> "iarray_context_t*")

    # A bytearray is a legitimately writable destination; the previous code had
    # the C library write into an immutable `bytes` object.
    res = bytearray(size + 32)  # BLOSC_MAX_OVERHEAD
    cdef ciarray.int32_t csize = 0
    # Stack-allocated view: the previous malloc'd Py_buffer was never freed.
    cdef Py_buffer buf
    PyObject_GetBuffer(res, &buf, PyBUF_SIMPLE)
    try:
        iarray_check(ciarray.iarray_server_job(ctx_, c, nchunk, start, nitems, size, <ciarray.uint8_t *> buf.buf, &csize))
    finally:
        # Release the view even when iarray_check raises.
        PyBuffer_Release(&buf)

    # Callers (the HTTP endpoint) expect `bytes`, as before.
    return bytes(res[:csize])


cdef class UdfLibrary:
"""
Library for scalar UDF functions.
Expand Down
94 changes: 94 additions & 0 deletions iarray/tests/test_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
import pytest
import iarray as ia
import json
import requests
import subprocess
import time
import numpy as np
import numpy.testing
import os


# Module-level setup, executed once when pytest imports this file.
# Write the two arrays the tests query; both url paths are relative.
ia.ones(shape=[12, 13, 3], urlpath="ones.iarr", mode="w")
ia.arange(shape=[10, 10], urlpath="arange.iarr", dtype=np.int32, mode="w")

# Directory of this test module; passed to the server script as its third argument.
dir_path = os.path.dirname(os.path.realpath(__file__))
host = "127.0.0.1"
# Kept as a string because the tests build URLs by string concatenation.
port = str(ia.HTTP_PORT)

# Launch the server script in a child process.
# NOTE(review): the script path and the array url paths above are relative,
# while the server is pointed at `dir_path` -- presumably the tests must be run
# from this directory for the two to coincide; confirm.
server = subprocess.Popen(["python", "../../scripts/iarr_server.py", host, port, dir_path])
# presumably gives the server time to start listening before the first request
time.sleep(5)

@pytest.fixture(scope="module", autouse=True)
def terminate_server():
    """Stop the server process and delete the test arrays after the module's tests.

    The fixture does nothing before the tests (setup happens at import time);
    everything after the ``yield`` is teardown.
    """
    yield
    try:
        server.terminate()
        # Wait for the child to exit so it is reaped before the arrays are
        # removed; fall back to kill() if it ignores the termination request.
        try:
            server.wait(timeout=10)
        except subprocess.TimeoutExpired:
            server.kill()
            server.wait()
    finally:
        # Always clean up the arrays, even if stopping the server failed.
        ia.remove_urlpath("ones.iarr")
        ia.remove_urlpath("arange.iarr")


def test_catalog():
    """The catalog endpoint and `ia.list_arrays` both report the two test arrays."""
    response = requests.get("http://" + host + ":" + port + "/v1/catalog/")
    assert response.status_code == 200
    res = json.loads(response.text)
    # The server fills its catalog while iterating os.listdir(), whose order is
    # arbitrary, so compare the contents independently of their order.
    assert sorted(res) == ["arange.iarr", "ones.iarr"]
    assert ia.list_arrays(host, port) == res


@pytest.mark.parametrize(
    "array_id, res",
    [
        (
            "ones.iarr",
            {"shape": [12, 13, 3], "chunks": [8, 8, 2], "blocks": [4, 8, 2], "dtype": "<f8"},
        ),
        (
            "arange.iarr",
            {"shape": [10, 10], "chunks": [8, 8], "blocks": [8, 8], "dtype": "<i4"},
        ),
    ],
)
def test_meta(array_id, res):
    """The meta endpoint returns the expected metadata for each test array."""
    reply = requests.get(f"http://{host}:{port}/v1/meta/?array_id={array_id}")
    received = json.loads(reply.text)
    assert received == res


@pytest.mark.parametrize("array_id", ["ones.iarr", "arange.iarr"])
def test_copy(array_id):
    """A copy of an array opened through an ``iarr://`` url holds the same data."""
    remote = ia.open(f"iarr://{host}:{port}/{array_id}")

    # Read the data first, then copy, and compare both results.
    expected = remote.data
    duplicate = remote.copy()
    numpy.testing.assert_array_equal(expected, duplicate.data)


@pytest.mark.parametrize(
    "array_id, expression", [
        ("ones.iarr", "x- cos(0.5)"),
        ("arange.iarr", "x**2"),
    ]
)
def test_server_expr(array_id, expression):
    """Evaluate `expression` on an array opened via ``iarr://`` and compare with NumPy."""
    urlpath = "iarr://" + host + ":" + port + "/" + array_id
    a = ia.open(urlpath)
    # Evaluate the expression with ironArray, storing the result on disk.
    # Any result left over from a previous run is removed first.
    ia.remove_urlpath("test_expression_zarray.iarr")
    expr = ia.expr_from_string(
        expression,
        {"x": a},
        urlpath="test_expression_zarray.iarr",
    )
    iout = expr.eval()
    npout = ia.iarray2numpy(iout)

    # Rewrite the expression so it can be evaluated with NumPy: every math
    # function name found in it gets an "np." prefix.
    for ufunc in ia.MATH_FUNC_LIST:
        if ufunc in expression:
            idx = expression.find(ufunc)
            # Skip names that already carry an "np." prefix, so a function is not
            # prefixed twice (not terribly solid, but otherwise the test crashes).
            if "np." not in expression[idx - len("np.arc") : idx]:
                expression = expression.replace(ufunc + "(", "np." + ufunc + "(")
    # The expression comes from the parametrize list above, not from external input.
    npout2 = eval(expression, {"x": a.data, "np": np})
    np.testing.assert_equal(npout, npout2)

    # Remove the on-disk result created by this test.
    ia.remove_urlpath(iout.cfg.urlpath)
66 changes: 65 additions & 1 deletion iarray/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# ("Confidential Information"). You shall not disclose such Confidential Information
# and shall use it only in accordance with the terms of the license agreement.
###########################################################################################
from typing import Union

import numpy as np

Expand All @@ -15,6 +16,8 @@
import os
import shutil

import json
import requests

zarr_to_iarray_dtypes = {
"int8": np.int8,
Expand Down Expand Up @@ -153,7 +156,8 @@ def open(urlpath: str, mode="a") -> ia.IArray:
Parameters
----------
urlpath : str
The url path to read.
The url path to read. If it starts with "iarr://", it opens a view from a remote array. In
this case, the urlpath must be of the form "iarr://host:port/array_urlpath". Default port is 28800.
mode : str
The open mode. This parameter supersedes the mode in the default :class:`Config`.

Expand All @@ -167,12 +171,72 @@ def open(urlpath: str, mode="a") -> ia.IArray:
save : Save an array to disk.
"""
cfg = ia.get_config_defaults()

if urlpath[:7] == "iarr://":
host_port, array_id = urlpath[7:].split("/")
params = host_port.split(":")
if len(params) == 1:
# Default value for port
port = ia.HTTP_PORT
else:
port = params[1]
host = params[0]

url = "http://" + host + ":" + port
with ia.config(cfg=cfg, mode=mode, nthreads=1) as cfg:
return request_view(server_urlpath=url, array_id=array_id, cfg=cfg)

if not os.path.exists(urlpath):
raise IOError("The file does not exist.")
with ia.config(cfg=cfg, mode=mode) as cfg:
return ext.open(cfg, urlpath)


def request_view(server_urlpath: str, array_id: str, cfg: ia.Config = None, **kwargs) -> ia.IArray:
    """Create a local array that is a view of an array served remotely.

    The array's metadata is fetched from ``<server_urlpath>/v1/meta/`` and used
    to create an uninitialized in-memory array of the same shape, chunks,
    blocks and dtype; a request postfilter is then attached to it.

    Parameters
    ----------
    server_urlpath : str
        Base url of the server, e.g. ``"http://host:port"``.
    array_id : str
        Identifier of the array in the server catalog.
    cfg : ia.Config
        Configuration to use; the defaults are used when it is None.
    kwargs : dict
        Extra configuration parameters. ``chunks``, ``blocks`` and ``dtype``
        are always overridden with the values reported by the server.

    Returns
    -------
    ia.IArray
        The view of the remote array.

    Raises
    ------
    requests.HTTPError
        If the server answers with an error status (e.g. 404 when `array_id`
        is not in its catalog).
    """
    if cfg is None:
        cfg = ia.get_config_defaults()
    # Get meta to create empty array. Passing the id through `params` lets
    # requests URL-encode it, so ids containing characters such as "&", "#" or
    # spaces are sent intact.
    r = requests.get(server_urlpath + "/v1/meta/", params={"array_id": array_id})
    # Fail with a clear HTTP error instead of a KeyError on the error payload.
    r.raise_for_status()
    params = json.loads(r.text)
    kwargs["chunks"] = params["chunks"]
    kwargs["blocks"] = params["blocks"]
    dtype = np.dtype(params["dtype"]).type
    kwargs["dtype"] = dtype

    with ia.config(cfg=cfg, urlpath=None, **kwargs) as cfg:
        a = ia.uninit(shape=params["shape"], cfg=cfg)

    # Assign postfilter
    ext.set_request_postfilter(a, server_urlpath, array_id)
    return a


def list_arrays(host: str, port: Union[str, int] = 28800):
    """
    List the arrays registered in a running server.

    Parameters
    ----------
    host: str
        Address of the server.
    port: str or int
        Port on which the server is listening.

    Returns
    -------
    out: list
        The decoded JSON body of the server's ``/v1/catalog/`` endpoint, i.e.
        the identifiers of the arrays the client has access to.

    Notes
    -----
    The server must be running when calling this function.

    """
    authority = host + ":" + str(port)
    reply = requests.get(f"http://{authority}/v1/catalog/")
    return json.loads(reply.text)


# TODO: are cfg and kwargs needed here?
def iarray2numpy(iarr: ia.IArray, cfg: ia.Config = None, **kwargs) -> np.ndarray:
"""Convert an ironArray array into a NumPy array.
Expand Down
3 changes: 3 additions & 0 deletions requirements-runtime.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,6 @@ llvmlite
ndindex
msgpack
ast_decompiler
fastapi
uvicorn
requests
19 changes: 19 additions & 0 deletions scripts/iarr_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import sys
import os
import iarray as ia
import uvicorn


# Usage: python iarr_server.py host port "path to folder where all the arrays to view are"
host, port, path = str(sys.argv[1]), int(sys.argv[2]), str(sys.argv[3])


# Register every ``.iarr`` entry of the folder in the global catalog, keyed by
# its file name.
for entry in os.listdir(path):
    if not entry.endswith('.iarr'):
        continue
    ia.global_catalog[entry] = ia.open(os.path.join(path, entry))

# Serve the catalog over HTTP.
uvicorn.run(ia.http_server.app, host=host, port=port)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ def get_version(rel_path):
python_requires=">=3.8",
extras_require={"doc": doc_deps, "examples": examples_deps},
**package_info,
# scripts=["scripts/iarr_server"],
)