This repository was archived by the owner on Nov 16, 2023. It is now read-only.

Commit afe4038

pieths authored and ganik committed
Initial implementation of DateTime input and output column support. (#290)
* Add support for DateTime output.
* Add support for DateTime input columns.
* Add unit test for DateTime column input and output.
* Fix DateTime.Kind == Unspecified output from dprep.
* Update the csproj files to point to the latest nuget packages.
* Update the Tensorflow.NET library version.
* Fix azureml dprep not available for Python 2.7.
* Fix missing sys import.
* Fix broken assertEqual on Python 3.5.
1 parent 115b992 commit afe4038

File tree (9 files changed: +216 −3 lines)

  src/DotNetBridge/NativeDataInterop.cs
  src/DotNetBridge/NativeDataView.cs
  src/NativeBridge/DataViewInterop.cpp
  src/NativeBridge/DataViewInterop.h
  src/NativeBridge/ManagedInterop.cpp
  src/NativeBridge/PythonInterop.cpp
  src/python/nimbusml.pyproj
  src/python/nimbusml/internal/utils/dataframes.py
  src/python/nimbusml/tests/data_type/test_datetime.py

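The common thread across the diffs below: datetime columns cross the native boundary as int64 milliseconds since the Unix epoch, in both directions. A minimal sketch of that round trip (not code from this commit), using only pandas and numpy:

    import numpy as np
    import pandas as pd

    # Python -> native: datetime64[ns] values are floor-divided down to epoch ms.
    serie = pd.Series(pd.to_datetime(['2018-01-02', '2018-02-01 10:00:00']))
    ms = serie.values.astype(np.int64) // 1000000  # ns -> ms

    # native -> Python: epoch ms are decoded back to datetime64[ns].
    assert (pd.to_datetime(ms, unit='ms') == pd.DatetimeIndex(serie)).all()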

src/DotNetBridge/NativeDataInterop.cs

Lines changed: 12 additions & 0 deletions

@@ -182,6 +182,7 @@ private static unsafe void SendViewToNativeAsDataFrame(IChannel ch, EnvironmentB
                 case InternalDataKind.R8:
                 case InternalDataKind.BL:
                 case InternalDataKind.TX:
+                case InternalDataKind.DT:
                     break;
             }
             keyCard = -1;
@@ -624,6 +625,17 @@ public static BufferFillerBase Create(EnvironmentBlock* penv, DataViewRow input,
                     ValuePoker<ulong> pokeU8 =
                         (ulong value, int col, long m, long n) => fnU8(penv, col, m, n, value);
                     return new Impl<ulong>(input, pyCol, idvCol, type, pokeU8);
+                case InternalDataKind.DT:
+                    var fnDT = MarshalDelegate<I8Setter>(setter);
+                    ValuePoker<DateTime> pokeDT =
+                        (DateTime value, int col, long m, long n) =>
+                        {
+                            DateTimeOffset dto = (value.Kind == DateTimeKind.Unspecified) ?
+                                new DateTimeOffset(value, TimeSpan.Zero) :
+                                new DateTimeOffset(value);
+                            fnDT(penv, col, m, n, dto.ToUnixTimeMilliseconds());
+                        };
+                    return new Impl<DateTime>(input, pyCol, idvCol, type, pokeDT);
                 case InternalDataKind.TX:
                     var fnTX = MarshalDelegate<TXSetter>(setter);
                     ValuePoker<ReadOnlyMemory<char>> pokeTX =
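The DT setter above resolves DateTime.Kind == Unspecified by pinning the value to UTC (TimeSpan.Zero offset) before taking Unix milliseconds, so naive timestamps convert unambiguously. A sketch of the same convention in Python (not code from this commit):

    from datetime import datetime, timezone

    def to_unix_ms(value):
        # Naive values (the analogue of DateTimeKind.Unspecified) are
        # interpreted as UTC so the epoch conversion is unambiguous.
        if value.tzinfo is None:
            value = value.replace(tzinfo=timezone.utc)
        return int(value.timestamp() * 1000)

    assert to_unix_ms(datetime(1970, 1, 1)) == 0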

src/DotNetBridge/NativeDataView.cs

Lines changed: 29 additions & 0 deletions

@@ -142,6 +142,10 @@ public NativeDataView(IHostEnvironment env, DataSourceBlock* pdata)
                 case InternalDataKind.Text:
                     columns.Add(new TextColumn(pdata, pdata->getters[c], c, name));
                     break;
+                case InternalDataKind.DT:
+                    if (pdata->vecCards[c] == -1)
+                        columns.Add(new DateTimeColumn(pdata, pdata->getters[c], c, name));
+                    break;
             }
         }

@@ -866,6 +870,31 @@ public override void Dispose()
             }
         }

+        private sealed class DateTimeColumn : Column<DateTime>
+        {
+            private I8Getter _getter;
+
+            public DateTimeColumn(DataSourceBlock* data, void* getter, int colIndex, string name)
+                : base(data, colIndex, name, DateTimeDataViewType.Instance)
+            {
+                _getter = MarshalDelegate<I8Getter>(getter);
+            }
+
+            public override void CopyOut(long index, Batch batch, ref DateTime value)
+            {
+                Contracts.Check(Data != null, AlreadyDisposed);
+                Contracts.Assert(0 <= index);
+                _getter(Data, ColIndex, index, out var val);
+                value = DateTimeOffset.FromUnixTimeMilliseconds(val).UtcDateTime;
+            }
+
+            public override void Dispose()
+            {
+                _getter = null;
+                base.Dispose();
+            }
+        }
+
         private sealed class TextColumn : Column<ReadOnlyMemory<char>>
         {
             private TXGetter _getter;
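On the way in, CopyOut decodes the int64 it reads as Unix milliseconds and surfaces a UTC wall-clock DateTime via DateTimeOffset.FromUnixTimeMilliseconds(val).UtcDateTime. A Python sketch of the same decoding (not code from this commit), written with timedelta arithmetic so pre-epoch (negative) values also behave:

    from datetime import datetime, timedelta

    def from_unix_ms(ms):
        # Epoch milliseconds -> UTC wall-clock time.
        return datetime(1970, 1, 1) + timedelta(milliseconds=ms)

    assert from_unix_ms(0) == datetime(1970, 1, 1)
    assert from_unix_ms(-86400000) == datetime(1969, 12, 31)  # negatives work too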

src/NativeBridge/DataViewInterop.cpp

Lines changed: 4 additions & 0 deletions

@@ -98,6 +98,10 @@ DataSourceBlock::DataSourceBlock(bp::dict& data)
             kind = R8;
             pgetter = (void*)&GetR8;
             break;
+        case (ML_PY_DATETIME):
+            kind = DT;
+            pgetter = (void*)&GetI8;
+            break;
         default:
             throw std::invalid_argument("column " + colName + " has unsupported type");
         }

src/NativeBridge/DataViewInterop.h

Lines changed: 1 addition & 0 deletions

@@ -545,5 +545,6 @@ enum ML_PY_TYPE_MAP_ENUM {
     ML_PY_CAT = 'c',
     ML_PY_TEXT = 't',
     ML_PY_UNICODE = 'u',
+    ML_PY_DATETIME = 'z',
     ML_PY_UNSUPPORTED = 'x'
 };

src/NativeBridge/ManagedInterop.cpp

Lines changed: 4 additions & 2 deletions

@@ -81,6 +81,7 @@ void EnvironmentBlock::DataSinkCore(const DataViewBlock * pdata)
     case I4:
         _vset.push_back((void*)&SetI4);
         break;
+    case DT:
     case I8:
         _vset.push_back((void*)&SetI8);
         break;
@@ -106,7 +107,6 @@ void EnvironmentBlock::DataSinkCore(const DataViewBlock * pdata)
         _vset.push_back((void*)&SetTX);
         break;
     case TS: // tbd
-    case DT: // tbd
     case DZ: // tbd
     default:
         throw std::invalid_argument("data type is not supported " + std::to_string(kind));
@@ -259,8 +259,10 @@ bp::dict EnvironmentBlock::GetData()
         AddToDict(std::string);
         delete column;
         break;
-    case TS:
     case DT:
+        AddToDict(CxInt64);
+        break;
+    case TS:
     case DZ:
     default:
         throw std::invalid_argument("data type is not supported " + std::to_string(kind));
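Routing DT through the existing I8 setter works because the payload is just a signed 64-bit millisecond count, and the entire pandas Timestamp range exercised by the new test (roughly years 1677 to 2262) fits comfortably in int64. A quick Python check (not code from this commit):

    import pandas as pd

    assert pd.Timestamp.min.year == 1677 and pd.Timestamp.max.year == 2262
    # The extreme ns value, reduced to ms, is far inside the int64 range.
    assert abs(pd.Timestamp.max.value // 1000000) < 2**63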

src/NativeBridge/PythonInterop.cpp

Lines changed: 14 additions & 0 deletions

@@ -40,6 +40,7 @@ PyColumnBase::creation_map* PyColumnBase::CreateSingleMap()
     map->insert(creation_map_entry(R4, CreateSingle<float>));
     map->insert(creation_map_entry(R8, CreateSingle<double>));
     map->insert(creation_map_entry(TX, CreateSingle<std::string>));
+    map->insert(creation_map_entry(DT, CreateSingle<CxInt64>));
     return map;
 }

@@ -161,6 +162,19 @@ void PyColumnSingle<T>::AddToDict(bp::dict& dict,
             bp::make_tuple(sizeof(T)), bp::object(h));
         }
         break;
+    case DataKind::DT:
+        {
+            bp::handle<> h(::PyCapsule_New((void*)this, NULL, (PyCapsule_Destructor)&destroyManagerCObject));
+            np::ndarray npdata = np::from_data(
+                data,
+                np::dtype::get_builtin<T>(),
+                bp::make_tuple(_pData->size()),
+                bp::make_tuple(sizeof(T)), bp::object(h));
+
+            dict[name] = bp::dict();
+            dict[name]["..DateTime"] = npdata;
+        }
+        break;
     }
 }
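The native side hands a datetime column back to Python as a one-entry dict whose "..DateTime" key holds the int64 millisecond array; resolve_output_as_dataframe (next file) keys off that marker to rebuild a datetime64[ns] column. A sketch of the shape being produced and consumed (not code from this commit):

    import numpy as np
    import pandas as pd

    ret = {'c1': {"..DateTime": np.array([1514851200000], dtype=np.int64)}}
    for key in ret:
        if isinstance(ret[key], dict) and "..DateTime" in ret[key]:
            col = pd.to_datetime(ret[key]["..DateTime"], unit='ms')
            assert col[0] == pd.Timestamp('2018-01-02')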

src/python/nimbusml.pyproj

Lines changed: 1 addition & 0 deletions

@@ -661,6 +661,7 @@
     <Compile Include="nimbusml\preprocessing\__init__.py" />
     <Compile Include="nimbusml\tests\cluster\test_kmeansplusplus.py" />
     <Compile Include="nimbusml\tests\cluster\__init__.py" />
+    <Compile Include="nimbusml\tests\data_type\test_datetime.py" />
     <Compile Include="nimbusml\tests\data_type\test_numeric.py" />
     <Compile Include="nimbusml\tests\data_type\test_text.py" />
     <Compile Include="nimbusml\tests\data_type\test_text_label.py" />

src/python/nimbusml/internal/utils/dataframes.py

Lines changed: 11 additions & 1 deletion

@@ -6,7 +6,7 @@

 import numpy as np
 import six
-from pandas import DataFrame, Series, concat, Categorical
+from pandas import DataFrame, Series, concat, Categorical, to_datetime
 from pandas.api.types import infer_dtype
 from scipy.sparse import csr_matrix

@@ -47,6 +47,13 @@ def resolve_dataframe(dataframe):
             # Workaround, empty dataframe needs to be sent as an array
             # to convey type information
             ret[name_i] = serie.values.reshape((len(serie), 1))
+
+        elif serie.dtype == np.dtype('datetime64[ns]'):
+            values = serie.values.astype(np.int64, copy=False)
+            values = values // 1000000  # convert from nanoseconds to milliseconds
+            ret[str(i)] = values
+            types.append(_global_dtype_to_char_dict[np.dtype('datetime64[ns]')])
+
         elif serie.dtype == np.object or str(serie.dtype) == '<U1':
             # This column might still be numeric, so we do another
             # check.
@@ -208,6 +215,8 @@ def resolve_output_as_dataframe(ret):
     for key in ret.keys():
         if not isinstance(ret[key], dict):
             data[key] = ret[key]
+        elif "..DateTime" in ret[key]:
+            data[key] = to_datetime(ret[key]["..DateTime"], unit='ms')
         else:
             data[key] = Categorical.from_codes(
                 ret[key]["..Data"], ret[key]["..KeyValues"])
@@ -241,5 +250,6 @@ def resolve_output_as_csrmatrix(ret):
     np.dtype(np.float64): 'd',
     np.dtype(np.string_): 't',
     np.dtype(np.unicode): 'u',
+    np.dtype('datetime64[ns]'): 'z',
     'unsupported': 'x'
 }
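Only columns whose dtype is exactly datetime64[ns] take the new input branch above; they are tagged with the 'z' type character (ML_PY_DATETIME) and sent as epoch milliseconds, so sub-millisecond precision is dropped by the integer division. A sketch of that branch in isolation (not code from this commit):

    import numpy as np
    import pandas as pd

    df = pd.DataFrame({'c1': pd.to_datetime(['2018-01-02']), 'c2': [3]})
    for name in df.columns:
        serie = df[name]
        if serie.dtype == np.dtype('datetime64[ns]'):
            values = serie.values.astype(np.int64, copy=False) // 1000000
            print(name, values, 'z')  # c1 [1514851200000] z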
src/python/nimbusml/tests/data_type/test_datetime.py (new file)

Lines changed: 140 additions & 0 deletions

# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
# --------------------------------------------------------------------------------------------
import os
import sys
import unittest
import tempfile

import numpy as np
import pandas as pd
from nimbusml import Pipeline, DprepDataStream
from nimbusml.preprocessing.missing_values import Handler


def get_temp_file(suffix=None):
    fd, file_name = tempfile.mkstemp(suffix=suffix)
    fl = os.fdopen(fd, 'w')
    fl.close()
    return file_name


class TestDateTimeDataType(unittest.TestCase):
    def test_negative_values(self):
        milliseconds_in_year = 365 * 24 * 60 * 60 * 1000
        data = [i * milliseconds_in_year for i in [-1, -2, -3, -3.3]]

        df = pd.DataFrame({'c1': data, 'c2': [3, 4, 5, 6]})
        df = df.astype({'c1': np.dtype('datetime64[ms]')})

        pipeline = Pipeline(steps=[Handler(columns={'c2': 'c2'})])
        result = pipeline.fit_transform(df)

        self.assertTrue(result.loc[:, 'c1'].equals(df.loc[:, 'c1']))
        self.assertEqual(result.loc[:, 'c1'].dtype, np.dtype('datetime64[ns]'))

        self.assertEqual(result.loc[0, 'c1'].year, 1969)
        self.assertEqual(result.loc[0, 'c1'].hour, 0)
        self.assertEqual(result.loc[0, 'c1'].minute, 0)
        self.assertEqual(result.loc[0, 'c1'].second, 0)

        self.assertEqual(result.loc[3, 'c1'].year, 1966)

    def test_timestamp_boundaries(self):
        # Here are the current min and max for a Pandas Timestamp
        # 1677-09-21 00:12:43.145225
        # 2262-04-11 23:47:16.854775807

        data = [pd.Timestamp(1677, 9, 22, 1), pd.Timestamp.max]
        df = pd.DataFrame({'c1': data, 'c2': [3, 4]})
        df = df.astype({'c1': np.dtype('datetime64[ms]')})

        pipeline = Pipeline(steps=[Handler(columns={'c2': 'c2'})])
        result = pipeline.fit_transform(df)

        self.assertTrue(result.loc[:, 'c1'].equals(df.loc[:, 'c1']))
        self.assertEqual(result.dtypes[0], np.dtype('datetime64[ns]'))

        self.assertEqual(result.loc[0, 'c1'].year, 1677)
        self.assertEqual(result.loc[0, 'c1'].month, 9)
        self.assertEqual(result.loc[0, 'c1'].day, 22)

        self.assertEqual(result.loc[1, 'c1'].year, 2262)
        self.assertEqual(result.loc[1, 'c1'].month, 4)
        self.assertEqual(result.loc[1, 'c1'].day, 11)

    def test_datetime_column_parsed_from_string(self):
        dates = ["2018-01-02", "2018-02-01"]
        df = pd.DataFrame({'c1': dates, 'c2': [3, 4]})

        file_name = get_temp_file('.csv')
        df.to_csv(file_name)
        df = pd.read_csv(file_name, parse_dates=['c1'], index_col=0)

        self.assertEqual(df.dtypes[0], np.dtype('datetime64[ns]'))

        pipeline = Pipeline(steps=[Handler(columns={'c2': 'c2'})])
        result = pipeline.fit_transform(df)

        self.assertEqual(result.loc[0, 'c1'].year, 2018)
        self.assertEqual(result.loc[0, 'c1'].month, 1)
        self.assertEqual(result.loc[0, 'c1'].day, 2)
        self.assertEqual(result.loc[0, 'c1'].hour, 0)
        self.assertEqual(result.loc[0, 'c1'].minute, 0)
        self.assertEqual(result.loc[0, 'c1'].second, 0)

        self.assertEqual(result.loc[1, 'c1'].year, 2018)
        self.assertEqual(result.loc[1, 'c1'].month, 2)
        self.assertEqual(result.loc[1, 'c1'].day, 1)
        self.assertEqual(result.loc[1, 'c1'].hour, 0)
        self.assertEqual(result.loc[1, 'c1'].minute, 0)
        self.assertEqual(result.loc[1, 'c1'].second, 0)

        self.assertEqual(len(result), 2)
        self.assertEqual(result.dtypes[0], np.dtype('datetime64[ns]'))

        os.remove(file_name)

    @unittest.skipIf(sys.version_info[:2] == (2, 7), "azureml-dataprep is not installed.")
    def test_dprep_datastream(self):
        import azureml.dataprep as dprep

        dates = ["2018-01-02 00:00:00", "2018-02-01 10:00:00"]
        col2 = ['0', '1']
        label_array = np.repeat([0], 2)
        train_df = pd.DataFrame({'col1': dates, 'col2': col2, 'label': label_array})

        pipeline = Pipeline(steps=[
            Handler(columns={'2': 'col2'}, concat=False, impute_by_slot=True, replace_with='Mean')
        ])

        file_name = get_temp_file('.csv')
        train_df.to_csv(file_name)

        dataflow = dprep.read_csv(file_name, infer_column_types=True)
        dprepDataStream = DprepDataStream(dataflow)

        result = pipeline.fit_transform(dprepDataStream)

        self.assertEqual(result.loc[:, 'col1'].dtype, np.dtype('datetime64[ns]'))

        self.assertEqual(result.loc[0, 'col1'].year, 2018)
        self.assertEqual(result.loc[0, 'col1'].month, 1)
        self.assertEqual(result.loc[0, 'col1'].day, 2)
        self.assertEqual(result.loc[0, 'col1'].hour, 0)
        self.assertEqual(result.loc[0, 'col1'].minute, 0)
        self.assertEqual(result.loc[0, 'col1'].second, 0)

        self.assertEqual(result.loc[1, 'col1'].year, 2018)
        self.assertEqual(result.loc[1, 'col1'].month, 2)
        self.assertEqual(result.loc[1, 'col1'].day, 1)
        self.assertEqual(result.loc[1, 'col1'].hour, 10)
        self.assertEqual(result.loc[1, 'col1'].minute, 0)
        self.assertEqual(result.loc[1, 'col1'].second, 0)

        os.remove(file_name)


if __name__ == '__main__':
    unittest.main()
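Assuming the package is installed in the active environment, these tests can be run directly with the standard unittest runner (a usage sketch, not part of the commit):

    python -m unittest nimbusml.tests.data_type.test_datetime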
