Skip to content
This repository was archived by the owner on Nov 16, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/DotNetBridge/NativeDataInterop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ private static unsafe void SendViewToNativeAsDataFrame(IChannel ch, EnvironmentB
case InternalDataKind.R8:
case InternalDataKind.BL:
case InternalDataKind.TX:
case InternalDataKind.DT:
break;
}
keyCard = -1;
Expand Down Expand Up @@ -624,6 +625,17 @@ public static BufferFillerBase Create(EnvironmentBlock* penv, DataViewRow input,
ValuePoker<ulong> pokeU8 =
(ulong value, int col, long m, long n) => fnU8(penv, col, m, n, value);
return new Impl<ulong>(input, pyCol, idvCol, type, pokeU8);
case InternalDataKind.DT:
var fnDT = MarshalDelegate<I8Setter>(setter);
ValuePoker<DateTime> pokeDT =
(DateTime value, int col, long m, long n) =>
{
DateTimeOffset dto = (value.Kind == DateTimeKind.Unspecified) ?
new DateTimeOffset(value, TimeSpan.Zero) :
new DateTimeOffset(value);
fnDT(penv, col, m, n, dto.ToUnixTimeMilliseconds());
};
return new Impl<DateTime>(input, pyCol, idvCol, type, pokeDT);
case InternalDataKind.TX:
var fnTX = MarshalDelegate<TXSetter>(setter);
ValuePoker<ReadOnlyMemory<char>> pokeTX =
Expand Down
29 changes: 29 additions & 0 deletions src/DotNetBridge/NativeDataView.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ public NativeDataView(IHostEnvironment env, DataSourceBlock* pdata)
case InternalDataKind.Text:
columns.Add(new TextColumn(pdata, pdata->getters[c], c, name));
break;
case InternalDataKind.DT:
if (pdata->vecCards[c] == -1)
columns.Add(new DateTimeColumn(pdata, pdata->getters[c], c, name));
break;
}
}

Expand Down Expand Up @@ -866,6 +870,31 @@ public override void Dispose()
}
}

/// <summary>
/// Exposes a native date-time column as <see cref="DateTime"/> values. The native
/// side hands back each value as a 64-bit count of Unix-epoch milliseconds, so the
/// column marshals an <c>I8Getter</c> and converts the raw value on every read.
/// </summary>
private sealed class DateTimeColumn : Column<DateTime>
{
    // Native getter producing the raw millisecond timestamp; released on dispose.
    private I8Getter _getter;

    public DateTimeColumn(DataSourceBlock* data, void* getter, int colIndex, string name)
        : base(data, colIndex, name, DateTimeDataViewType.Instance)
    {
        _getter = MarshalDelegate<I8Getter>(getter);
    }

    public override void CopyOut(long index, Batch batch, ref DateTime value)
    {
        Contracts.Check(Data != null, AlreadyDisposed);
        Contracts.Assert(index >= 0);
        long rawMilliseconds;
        _getter(Data, ColIndex, index, out rawMilliseconds);
        // Wire format is Unix epoch milliseconds; surface it as a UTC DateTime.
        value = DateTimeOffset.FromUnixTimeMilliseconds(rawMilliseconds).UtcDateTime;
    }

    public override void Dispose()
    {
        _getter = null;
        base.Dispose();
    }
}

private sealed class TextColumn : Column<ReadOnlyMemory<char>>
{
private TXGetter _getter;
Expand Down
4 changes: 4 additions & 0 deletions src/NativeBridge/DataViewInterop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ DataSourceBlock::DataSourceBlock(bp::dict& data)
kind = R8;
pgetter = (void*)&GetR8;
break;
case (ML_PY_DATETIME):
kind = DT;
pgetter = (void*)&GetI8;
break;
default:
throw std::invalid_argument("column " + colName + " has unsupported type");
}
Expand Down
1 change: 1 addition & 0 deletions src/NativeBridge/DataViewInterop.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,5 +545,6 @@ enum ML_PY_TYPE_MAP_ENUM {
ML_PY_CAT = 'c',
ML_PY_TEXT = 't',
ML_PY_UNICODE = 'u',
ML_PY_DATETIME = 'z',
ML_PY_UNSUPPORTED = 'x'
};
6 changes: 4 additions & 2 deletions src/NativeBridge/ManagedInterop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ void EnvironmentBlock::DataSinkCore(const DataViewBlock * pdata)
case I4:
_vset.push_back((void*)&SetI4);
break;
case DT:
case I8:
_vset.push_back((void*)&SetI8);
break;
Expand All @@ -106,7 +107,6 @@ void EnvironmentBlock::DataSinkCore(const DataViewBlock * pdata)
_vset.push_back((void*)&SetTX);
break;
case TS: // tbd
case DT: // tbd
case DZ: // tbd
default:
throw std::invalid_argument("data type is not supported " + std::to_string(kind));
Expand Down Expand Up @@ -259,8 +259,10 @@ bp::dict EnvironmentBlock::GetData()
AddToDict(std::string);
delete column;
break;
case TS:
case DT:
AddToDict(CxInt64);
break;
case TS:
case DZ:
default:
throw std::invalid_argument("data type is not supported " + std::to_string(kind));
Expand Down
14 changes: 14 additions & 0 deletions src/NativeBridge/PythonInterop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ PyColumnBase::creation_map* PyColumnBase::CreateSingleMap()
map->insert(creation_map_entry(R4, CreateSingle<float>));
map->insert(creation_map_entry(R8, CreateSingle<double>));
map->insert(creation_map_entry(TX, CreateSingle<std::string>));
map->insert(creation_map_entry(DT, CreateSingle<CxInt64>));
return map;
}

Expand Down Expand Up @@ -161,6 +162,19 @@ void PyColumnSingle<T>::AddToDict(bp::dict& dict,
bp::make_tuple(sizeof(T)), bp::object(h));
}
break;
case DataKind::DT:
{
bp::handle<> h(::PyCapsule_New((void*)this, NULL, (PyCapsule_Destructor)&destroyManagerCObject));
np::ndarray npdata = np::from_data(
data,
np::dtype::get_builtin<T>(),
bp::make_tuple(_pData->size()),
bp::make_tuple(sizeof(T)), bp::object(h));

dict[name] = bp::dict();
dict[name]["..DateTime"] = npdata;
}
break;
}
}

Expand Down
1 change: 1 addition & 0 deletions src/python/nimbusml.pyproj
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,7 @@
<Compile Include="nimbusml\preprocessing\__init__.py" />
<Compile Include="nimbusml\tests\cluster\test_kmeansplusplus.py" />
<Compile Include="nimbusml\tests\cluster\__init__.py" />
<Compile Include="nimbusml\tests\data_type\test_datetime.py" />
<Compile Include="nimbusml\tests\data_type\test_numeric.py" />
<Compile Include="nimbusml\tests\data_type\test_text.py" />
<Compile Include="nimbusml\tests\data_type\test_text_label.py" />
Expand Down
12 changes: 11 additions & 1 deletion src/python/nimbusml/internal/utils/dataframes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import numpy as np
import six
from pandas import DataFrame, Series, concat, Categorical
from pandas import DataFrame, Series, concat, Categorical, to_datetime
from pandas.api.types import infer_dtype
from scipy.sparse import csr_matrix

Expand Down Expand Up @@ -47,6 +47,13 @@ def resolve_dataframe(dataframe):
# Workaround, empty dataframe needs to be sent as an array
# to convey type information
ret[name_i] = serie.values.reshape((len(serie), 1))

elif serie.dtype == np.dtype('datetime64[ns]'):
values = serie.values.astype(np.int64, copy=False)
values = values // 1000000 # convert from nanoseconds to milliseconds
ret[str(i)] = values
types.append(_global_dtype_to_char_dict[np.dtype('datetime64[ns]')])

elif serie.dtype == np.object or str(serie.dtype) == '<U1':
# This column might still be numeric, so we do another
# check.
Expand Down Expand Up @@ -208,6 +215,8 @@ def resolve_output_as_dataframe(ret):
for key in ret.keys():
if not isinstance(ret[key], dict):
data[key] = ret[key]
elif "..DateTime" in ret[key]:
data[key] = to_datetime(ret[key]["..DateTime"], unit='ms')
else:
data[key] = Categorical.from_codes(
ret[key]["..Data"], ret[key]["..KeyValues"])
Expand Down Expand Up @@ -241,5 +250,6 @@ def resolve_output_as_csrmatrix(ret):
np.dtype(np.float64): 'd',
np.dtype(np.string_): 't',
np.dtype(np.unicode): 'u',
np.dtype('datetime64[ns]'): 'z',
'unsupported': 'x'
}
140 changes: 140 additions & 0 deletions src/python/nimbusml/tests/data_type/test_datetime.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------------------------
import os
import sys
import unittest
import tempfile

import numpy as np
import pandas as pd
from nimbusml import Pipeline, DprepDataStream
from nimbusml.preprocessing.missing_values import Handler


def get_temp_file(suffix=None):
    """Create an empty temporary file on disk and return its path.

    The descriptor returned by mkstemp is closed immediately, so the caller
    owns only the path (and is responsible for deleting the file).
    """
    handle, path = tempfile.mkstemp(suffix=suffix)
    os.close(handle)
    return path


class TestDateTimeDataType(unittest.TestCase):
    """Round-trip tests for datetime64 columns passed through a nimbusml Pipeline."""

    def _transform(self, frame):
        # Common fixture: run a trivial Handler over 'c2' so 'c1' (the datetime
        # column) merely passes through the native bridge and back.
        pipeline = Pipeline(steps=[Handler(columns={'c2': 'c2'})])
        return pipeline.fit_transform(frame)

    def test_negative_values(self):
        # Timestamps before the Unix epoch must survive the millisecond encoding.
        ms_per_year = 365 * 24 * 60 * 60 * 1000
        timestamps = [years * ms_per_year for years in [-1, -2, -3, -3.3]]

        frame = pd.DataFrame({'c1': timestamps, 'c2': [3, 4, 5, 6]})
        frame = frame.astype({'c1': np.dtype('datetime64[ms]')})

        result = self._transform(frame)

        self.assertTrue(result.loc[:, 'c1'].equals(frame.loc[:, 'c1']))
        self.assertEqual(result.loc[:, 'c1'].dtype, np.dtype('datetime64[ns]'))

        first = result.loc[0, 'c1']
        self.assertEqual(first.year, 1969)
        for component in ('hour', 'minute', 'second'):
            self.assertEqual(getattr(first, component), 0)

        self.assertEqual(result.loc[3, 'c1'].year, 1966)

    def test_timestamp_boundaries(self):
        # Current representable range of a pandas Timestamp:
        #   min ~ 1677-09-21 00:12:43.145225
        #   max ~ 2262-04-11 23:47:16.854775807
        boundary = [pd.Timestamp(1677, 9, 22, 1), pd.Timestamp.max]
        frame = pd.DataFrame({'c1': boundary, 'c2': [3, 4]})
        frame = frame.astype({'c1': np.dtype('datetime64[ms]')})

        result = self._transform(frame)

        self.assertTrue(result.loc[:, 'c1'].equals(frame.loc[:, 'c1']))
        self.assertEqual(result.dtypes[0], np.dtype('datetime64[ns]'))

        for row, (year, month, day) in enumerate([(1677, 9, 22), (2262, 4, 11)]):
            value = result.loc[row, 'c1']
            self.assertEqual(value.year, year)
            self.assertEqual(value.month, month)
            self.assertEqual(value.day, day)

    def test_datetime_column_parsed_from_string(self):
        frame = pd.DataFrame({'c1': ["2018-01-02", "2018-02-01"], 'c2': [3, 4]})

        # Round-trip through CSV so pandas parses 'c1' from strings.
        file_name = get_temp_file('.csv')
        frame.to_csv(file_name)
        frame = pd.read_csv(file_name, parse_dates=['c1'], index_col=0)

        self.assertEqual(frame.dtypes[0], np.dtype('datetime64[ns]'))

        result = self._transform(frame)

        for row, (year, month, day) in enumerate([(2018, 1, 2), (2018, 2, 1)]):
            value = result.loc[row, 'c1']
            self.assertEqual(value.year, year)
            self.assertEqual(value.month, month)
            self.assertEqual(value.day, day)
            self.assertEqual(value.hour, 0)
            self.assertEqual(value.minute, 0)
            self.assertEqual(value.second, 0)

        self.assertEqual(len(result), 2)
        self.assertEqual(result.dtypes[0], np.dtype('datetime64[ns]'))

        os.remove(file_name)

    @unittest.skipIf(sys.version_info[:2] == (2, 7), "azureml-dataprep is not installed.")
    def test_dprep_datastream(self):
        import azureml.dataprep as dprep

        train_df = pd.DataFrame({
            'col1': ["2018-01-02 00:00:00", "2018-02-01 10:00:00"],
            'col2': ['0', '1'],
            'label': np.repeat([0], 2),
        })

        pipeline = Pipeline(steps=[
            Handler(columns={'2': 'col2'}, concat=False, impute_by_slot=True, replace_with='Mean')
        ])

        file_name = get_temp_file('.csv')
        train_df.to_csv(file_name)

        dataflow = dprep.read_csv(file_name, infer_column_types=True)
        result = pipeline.fit_transform(DprepDataStream(dataflow))

        self.assertEqual(result.loc[:, 'col1'].dtype, np.dtype('datetime64[ns]'))

        for row, (year, month, day, hour) in enumerate([(2018, 1, 2, 0), (2018, 2, 1, 10)]):
            value = result.loc[row, 'col1']
            self.assertEqual(value.year, year)
            self.assertEqual(value.month, month)
            self.assertEqual(value.day, day)
            self.assertEqual(value.hour, hour)
            self.assertEqual(value.minute, 0)
            self.assertEqual(value.second, 0)

        os.remove(file_name)


if __name__ == '__main__':
    unittest.main()