@@ -14,6 +14,7 @@ limitations under the License.
1414==============================================================================*/
1515
1616#include " tensorflow/core/framework/op_kernel.h"
17+ #include " tensorflow_io/core/kernels/io_interface.h"
1718#include " tensorflow_io/core/prometheus_go.h"
1819
1920namespace tensorflow {
@@ -74,5 +75,138 @@ REGISTER_KERNEL_BUILDER(Name("ReadPrometheus").Device(DEVICE_CPU),
7475
7576
7677} // namespace
78+
79+
80+ class PrometheusIndexable : public IOIndexableInterface {
81+ public:
82+ PrometheusIndexable (Env* env)
83+ : env_(env) {}
84+
85+ ~PrometheusIndexable () {}
86+ Status Init (const std::vector<string>& input, const std::vector<string>& metadata, const void * memory_data, const int64 memory_size) override {
87+ if (input.size () > 1 ) {
88+ return errors::InvalidArgument (" more than 1 query is not supported" );
89+ }
90+ const string& query = input[0 ];
91+
92+ string endpoint = " http://localhost:9090" ;
93+ for (size_t i = 0 ; i < metadata.size (); i++) {
94+ if (metadata[i].find_first_of (" endpoint: " ) == 0 ) {
95+ endpoint = metadata[i].substr (8 );
96+ }
97+ }
98+
99+ int64 ts = time (NULL );
100+
101+ GoString endpoint_go = {endpoint.c_str (), static_cast <int64>(endpoint.size ())};
102+ GoString query_go = {query.c_str (), static_cast <int64>(query.size ())};
103+
104+ GoSlice timestamp_go = {0 , 0 , 0 };
105+ GoSlice value_go = {0 , 0 , 0 };
106+
107+ GoInt returned = Query (endpoint_go, query_go, ts, timestamp_go, value_go);
108+ if (returned < 0 ) {
109+ return errors::InvalidArgument (" unable to query prometheus" );
110+ }
111+
112+ timestamp_.resize (returned);
113+ value_.resize (returned);
114+
115+ if (returned > 0 ) {
116+ timestamp_go.data = ×tamp_[0 ];
117+ timestamp_go.len = returned;
118+ timestamp_go.cap = returned;
119+ value_go.data = &value_[0 ];
120+ value_go.len = returned;
121+ value_go.cap = returned;
122+
123+ returned = Query (endpoint_go, query_go, ts, timestamp_go, value_go);
124+ if (returned < 0 ) {
125+ return errors::InvalidArgument (" unable to query prometheus to get the value" );
126+ }
127+ }
128+
129+ for (size_t i = 0 ; i < metadata.size (); i++) {
130+ if (metadata[i].find_first_of (" column: " ) == 0 ) {
131+ columns_.emplace_back (metadata[i].substr (8 ));
132+ }
133+ }
134+ if (columns_.size () == 0 ) {
135+ columns_.emplace_back (" timestamp" );
136+ columns_.emplace_back (" value" );
137+ }
138+
139+ for (size_t i = 0 ; i < columns_.size (); i++) {
140+ if (columns_[i] == " timestamp" ) {
141+ dtypes_.emplace_back (DT_INT64);
142+ shapes_.emplace_back (TensorShape ({static_cast <int64>(returned)}));
143+ } else if (columns_[i] == " value" ) {
144+ dtypes_.emplace_back (DT_DOUBLE);
145+ shapes_.emplace_back (TensorShape ({static_cast <int64>(returned)}));
146+ } else {
147+ return errors::InvalidArgument (" column name other than `timestamp` or `value` is not supported: " , columns_[i]);
148+ }
149+ }
150+
151+ return Status::OK ();
152+ }
153+ Status Spec (std::vector<DataType>& dtypes, std::vector<PartialTensorShape>& shapes) override {
154+ dtypes.clear ();
155+ for (size_t i = 0 ; i < dtypes_.size (); i++) {
156+ dtypes.push_back (dtypes_[i]);
157+ }
158+ shapes.clear ();
159+ for (size_t i = 0 ; i < shapes_.size (); i++) {
160+ shapes.push_back (shapes_[i]);
161+ }
162+ return Status::OK ();
163+ }
164+
165+ Status Extra (std::vector<Tensor>* extra) override {
166+ // Expose columns
167+ Tensor columns (DT_STRING, TensorShape ({static_cast <int64>(columns_.size ())}));
168+ for (size_t i = 0 ; i < columns_.size (); i++) {
169+ columns.flat <string>()(i) = columns_[i];
170+ }
171+ extra->push_back (columns);
172+ return Status::OK ();
173+ }
174+
175+ Status GetItem (const int64 start, const int64 stop, const int64 step, std::vector<Tensor>& tensors) override {
176+ if (step != 1 ) {
177+ return errors::InvalidArgument (" step " , step, " is not supported" );
178+ }
179+ for (size_t i = 0 ; i < columns_.size (); i++) {
180+ if (columns_[i] == " timestamp" ) {
181+ memcpy (&tensors[i].flat <int64>().data ()[start], ×tamp_[0 ], sizeof (int64) * (stop - start));
182+ } else {
183+ memcpy (&tensors[i].flat <double >().data ()[start], &value_[0 ], sizeof (double ) * (stop - start));
184+ }
185+ }
186+
187+ return Status::OK ();
188+ }
189+
190+ string DebugString () const override {
191+ mutex_lock l (mu_);
192+ return strings::StrCat (" PrometheusIndexable" );
193+ }
194+ private:
195+ mutable mutex mu_;
196+ Env* env_ GUARDED_BY (mu_);
197+
198+ std::vector<DataType> dtypes_;
199+ std::vector<TensorShape> shapes_;
200+ std::vector<string> columns_;
201+
202+ std::vector<int64> timestamp_;
203+ std::vector<double > value_;
204+ };
205+
206+ REGISTER_KERNEL_BUILDER (Name(" PrometheusIndexableInit" ).Device(DEVICE_CPU),
207+ IOInterfaceInitOp<PrometheusIndexable>);
208+ REGISTER_KERNEL_BUILDER (Name(" PrometheusIndexableGetItem" ).Device(DEVICE_CPU),
209+ IOIndexableGetItemOp<PrometheusIndexable>);
210+
77211} // namespace data
78212} // namespace tensorflow
0 commit comments