
Commit 4266662
Author: David Roberts
[ML] Change inference cache to store only the inner part of results (#2376)
Previously the inference cache stored complete results, including a request ID and time taken. This was inefficient as it then meant the original response had to be parsed and modified before sending back to the Java side. This PR changes the cache to store just the inner portion of the inference result. Then the outer layer is added per request after retrieving from the cache. Additionally, the result writing functions are moved into a class of their own, which means they can be unit tested. Companion to elastic/elasticsearch#88901
Parent: 5f57234
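The effect of the change is easy to see in miniature. The sketch below is a hypothetical illustration, not the production code (which uses rapidjson writers rather than string concatenation): the cache keeps only the invariant inner fragment of a result, and the per-request fields request_id, cache_hit and time_ms are spliced on when the response is written. The field names match those defined in CResultWriter; the helper wrapInner and the literal fragment are assumptions for illustration.

```cpp
#include <cstdint>
#include <iostream>
#include <string>

// Hypothetical sketch of the new caching scheme: only the invariant inner
// part of a result is cached; request ID, cache-hit flag and time taken are
// added per request at write time.
std::string wrapInner(const std::string& innerFragment,
                      const std::string& requestId,
                      bool cacheHit,
                      std::uint64_t timeMs) {
    return R"({"request_id":")" + requestId + R"(","cache_hit":)" +
           (cacheHit ? "true" : "false") + R"(,"time_ms":)" +
           std::to_string(timeMs) + ',' + innerFragment + '}';
}

int main() {
    // Mirrors the shape produced by createInnerResult(): no outer braces.
    std::string cached{R"("result":{"inference":[[[0.1,0.2]]]})"};
    std::cout << wrapInner(cached, "request_1", false, 42) << '\n';
    std::cout << wrapInner(cached, "request_2", true, 0) << '\n';
}
```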

File tree: 11 files changed (+478, -279 lines)


bin/pytorch_inference/CCommandParser.h (2 additions, 2 deletions)
@@ -59,7 +59,7 @@ class CCommandParser {
     class CRequestCacheInterface {
     public:
         using TComputeResponse = std::function<std::string(SRequest)>;
-        using TReadResponse = std::function<void(const std::string&)>;
+        using TReadResponse = std::function<void(const std::string&, bool)>;
 
     public:
         virtual ~CRequestCacheInterface() = default;
@@ -102,7 +102,7 @@ class CCommandParser {
         bool lookup(SRequest request,
                     const TComputeResponse& computeResponse,
                     const TReadResponse& readResponse) override {
-            readResponse(computeResponse(std::move(request)));
+            readResponse(computeResponse(std::move(request)), false);
            return false;
        }
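The bool added to TReadResponse is the cache-hit indicator that ultimately reaches CResultWriter::wrapAndWriteInnerResponse; the non-caching lookup above always passes false. The standalone sketch below illustrates the contract under simplifying assumptions: the request type is reduced to std::string and the cache to a plain map, and a caching implementation would presumably pass true when replaying a stored response.

```cpp
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

// Simplified stand-ins for the aliases in CRequestCacheInterface.
using TComputeResponse = std::function<std::string(std::string)>;
using TReadResponse = std::function<void(const std::string&, bool)>;

// Sketch of a caching lookup: the second argument to readResponse reports
// whether the response was served from the cache.
bool lookup(std::unordered_map<std::string, std::string>& cache,
            std::string request,
            const TComputeResponse& computeResponse,
            const TReadResponse& readResponse) {
    auto it = cache.find(request);
    if (it != cache.end()) {
        readResponse(it->second, true); // true: served from the cache
        return true;
    }
    std::string response{computeResponse(request)};
    readResponse(response, false); // false: freshly computed
    cache.emplace(std::move(request), std::move(response));
    return false;
}

int main() {
    std::unordered_map<std::string, std::string> cache;
    auto compute = [](std::string) { return std::string{R"("result":{})"}; };
    auto read = [](const std::string& inner, bool isCacheHit) {
        std::cout << inner << " cache_hit=" << std::boolalpha << isCacheHit << '\n';
    };
    lookup(cache, "req", compute, read); // miss: computes and stores
    lookup(cache, "req", compute, read); // hit: replays with the flag set
}
```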

bin/pytorch_inference/CMakeLists.txt (1 addition, 0 deletions)
@@ -25,5 +25,6 @@ ml_add_executable(pytorch_inference
   CBufferedIStreamAdapter.cc
   CCmdLineParser.cc
   CCommandParser.cc
+  CResultWriter.cc
   CThreadSettings.cc
   )
bin/pytorch_inference/CResultWriter.cc (new file, 138 additions)

@@ -0,0 +1,138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the following additional limitation. Functionality enabled by the
+ * files subject to the Elastic License 2.0 may only be used in production when
+ * invoked by an Elasticsearch process with a license key installed that permits
+ * use of machine learning features. You may not use this file except in
+ * compliance with the Elastic License 2.0 and the foregoing additional
+ * limitation.
+ */
+
+#include "CResultWriter.h"
+
+#include <core/CRapidJsonConcurrentLineWriter.h>
+
+#include "CCommandParser.h"
+#include "CThreadSettings.h"
+
+namespace ml {
+namespace torch {
+
+const std::string CResultWriter::RESULT{"result"};
+const std::string CResultWriter::INFERENCE{"inference"};
+const std::string CResultWriter::ERROR{"error"};
+const std::string CResultWriter::TIME_MS{"time_ms"};
+const std::string CResultWriter::CACHE_HIT{"cache_hit"};
+const std::string CResultWriter::THREAD_SETTINGS{"thread_settings"};
+const std::string CResultWriter::ACK{"ack"};
+const std::string CResultWriter::ACKNOWLEDGED{"acknowledged"};
+const std::string CResultWriter::NUM_ALLOCATIONS{"num_allocations"};
+const std::string CResultWriter::NUM_THREADS_PER_ALLOCATION{"num_threads_per_allocation"};
+
+CResultWriter::CResultWriter(std::ostream& strmOut)
+    : m_WrappedOutputStream{strmOut} {
+}
+
+void CResultWriter::writeInnerError(const std::string& message,
+                                    TRapidJsonLineWriter& jsonWriter) {
+    jsonWriter.Key(ERROR);
+    jsonWriter.StartObject();
+    jsonWriter.Key(ERROR);
+    jsonWriter.String(message);
+    jsonWriter.EndObject();
+}
+
+void CResultWriter::writeError(const std::string& requestId, const std::string& message) {
+    core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
+    jsonWriter.StartObject();
+    jsonWriter.Key(CCommandParser::REQUEST_ID);
+    jsonWriter.String(requestId);
+    writeInnerError(message, jsonWriter);
+    jsonWriter.EndObject();
+}
+
+void CResultWriter::wrapAndWriteInnerResponse(const std::string& innerResponse,
+                                              const std::string& requestId,
+                                              bool isCacheHit,
+                                              std::uint64_t timeMs) {
+    core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
+    jsonWriter.StartObject();
+    jsonWriter.Key(CCommandParser::REQUEST_ID);
+    jsonWriter.String(requestId);
+    jsonWriter.Key(CACHE_HIT);
+    jsonWriter.Bool(isCacheHit);
+    jsonWriter.Key(TIME_MS);
+    jsonWriter.Uint64(timeMs);
+    jsonWriter.RawValue(innerResponse.c_str(), innerResponse.length(), rapidjson::kObjectType);
+    jsonWriter.EndObject();
+}
+
+void CResultWriter::writeThreadSettings(const std::string& requestId,
+                                        const CThreadSettings& threadSettings) {
+    core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
+    jsonWriter.StartObject();
+    jsonWriter.Key(CCommandParser::REQUEST_ID);
+    jsonWriter.String(requestId);
+    jsonWriter.Key(THREAD_SETTINGS);
+    jsonWriter.StartObject();
+    jsonWriter.Key(NUM_THREADS_PER_ALLOCATION);
+    jsonWriter.Uint(threadSettings.numThreadsPerAllocation());
+    jsonWriter.Key(NUM_ALLOCATIONS);
+    jsonWriter.Uint(threadSettings.numAllocations());
+    jsonWriter.EndObject();
+    jsonWriter.EndObject();
+}
+
+void CResultWriter::writeSimpleAck(const std::string& requestId) {
+    core::CRapidJsonConcurrentLineWriter jsonWriter{m_WrappedOutputStream};
+    jsonWriter.StartObject();
+    jsonWriter.Key(ml::torch::CCommandParser::REQUEST_ID);
+    jsonWriter.String(requestId);
+    jsonWriter.Key(ACK);
+    jsonWriter.StartObject();
+    jsonWriter.Key(ACKNOWLEDGED);
+    jsonWriter.Bool(true);
+    jsonWriter.EndObject();
+    jsonWriter.EndObject();
+}
+
+std::string CResultWriter::createInnerResult(const ::torch::Tensor& results) {
+    rapidjson::StringBuffer stringBuffer;
+    {
+        TRapidJsonLineWriter jsonWriter{stringBuffer};
+        // Even though we don't really want the outer braces on the
+        // inner result we have to write them or else the JSON
+        // writer will not put commas in the correct places.
+        jsonWriter.StartObject();
+        try {
+            auto sizes = results.sizes();
+
+            switch (sizes.size()) {
+            case 3:
+                this->writePrediction<3>(results, jsonWriter);
+                break;
+            case 2:
+                this->writePrediction<2>(results, jsonWriter);
+                break;
+            default: {
+                std::ostringstream ss;
+                ss << "Cannot convert results tensor of size [" << sizes << ']';
+                writeInnerError(ss.str(), jsonWriter);
+                break;
+            }
+            }
+        } catch (const c10::Error& e) {
+            writeInnerError(e.what(), jsonWriter);
+        } catch (const std::runtime_error& e) {
+            writeInnerError(e.what(), jsonWriter);
+        }
+        jsonWriter.EndObject();
+    }
+    // Return the object without the opening and closing braces and
+    // the trailing newline. The resulting partial document will
+    // later be wrapped, so does not need these.
+    return std::string{stringBuffer.GetString() + 1, stringBuffer.GetLength() - 3};
+}
+}
+}
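The pointer arithmetic on the final line of createInnerResult is worth spelling out: GetString() + 1 skips the opening brace, while GetLength() - 3 also drops the closing brace and the trailing newline that the line writer appends. A minimal check of that arithmetic, with std::string standing in for rapidjson::StringBuffer:

```cpp
#include <cassert>
#include <string>

int main() {
    // The line writer emits a complete document of the form {...}\n.
    std::string full{"{\"result\":{\"inference\":[[0.5]]}}\n"};
    // Skip the opening '{' (+ 1); drop the opening brace, the closing
    // brace and the trailing newline from the length (- 3).
    std::string inner{full.c_str() + 1, full.size() - 3};
    assert(inner == "\"result\":{\"inference\":[[0.5]]}");
    return 0;
}
```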
bin/pytorch_inference/CResultWriter.h (new file, 177 additions)

@@ -0,0 +1,177 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the following additional limitation. Functionality enabled by the
+ * files subject to the Elastic License 2.0 may only be used in production when
+ * invoked by an Elasticsearch process with a license key installed that permits
+ * use of machine learning features. You may not use this file except in
+ * compliance with the Elastic License 2.0 and the foregoing additional
+ * limitation.
+ */
+
+#ifndef INCLUDED_ml_torch_CResultWriter_h
+#define INCLUDED_ml_torch_CResultWriter_h
+
+#include <core/CJsonOutputStreamWrapper.h>
+#include <core/CRapidJsonLineWriter.h>
+
+#include <rapidjson/stringbuffer.h>
+#include <torch/csrc/api/include/torch/types.h>
+
+#include <cstdint>
+#include <iosfwd>
+#include <sstream>
+#include <string>
+
+namespace ml {
+namespace torch {
+class CThreadSettings;
+
+//! \brief
+//! Formats and writes results for PyTorch inference.
+//!
+//! DESCRIPTION:\n
+//! There are four types of result:
+//!
+//! 1. Inference results
+//! 2. Thread settings
+//! 3. Acknowledgements
+//! 4. Errors
+//!
+//! IMPLEMENTATION DECISIONS:\n
+//! We can cache inference results and errors, but when we reply with a
+//! cached value we still need to change the request ID, time taken, and
+//! cache hit indicator. Therefore this class contains functionality for
+//! building the invariant portion of results to be cached and later
+//! spliced into a complete response.
+//!
+class CResultWriter {
+public:
+    using TRapidJsonLineWriter = core::CRapidJsonLineWriter<rapidjson::StringBuffer>;
+
+public:
+    explicit CResultWriter(std::ostream& strmOut);
+
+    //! No copying
+    CResultWriter(const CResultWriter&) = delete;
+    CResultWriter& operator=(const CResultWriter&) = delete;
+
+    //! Write an error directly to the output stream.
+    void writeError(const std::string& requestId, const std::string& message);
+
+    //! Write thread settings to the output stream.
+    void writeThreadSettings(const std::string& requestId, const CThreadSettings& threadSettings);
+
+    //! Write a simple acknowledgement to the output stream.
+    void writeSimpleAck(const std::string& requestId);
+
+    //! Wrap the invariant portion of a cached result with request ID,
+    //! cache hit indicator and time taken. Then write the full document
+    //! to the output stream.
+    void wrapAndWriteInnerResponse(const std::string& innerResponse,
+                                   const std::string& requestId,
+                                   bool isCacheHit,
+                                   std::uint64_t timeMs);
+
+    //! Write the prediction portion of an inference result.
+    template<std::size_t N>
+    void writePrediction(const ::torch::Tensor& prediction, TRapidJsonLineWriter& jsonWriter) {
+
+        // Creating the accessor will throw if the tensor does not have exactly
+        // N dimensions. Do this before writing any output so the error message
+        // isn't mingled with a partial result.
+        if (prediction.dtype() == ::torch::kFloat32) {
+            auto accessor = prediction.accessor<float, N>();
+            this->writeInferenceResults(accessor, jsonWriter);
+        } else if (prediction.dtype() == ::torch::kFloat64) {
+            auto accessor = prediction.accessor<double, N>();
+            this->writeInferenceResults(accessor, jsonWriter);
+        } else {
+            std::ostringstream ss;
+            ss << "Cannot process result tensor of type [" << prediction.dtype() << ']';
+            writeInnerError(ss.str(), jsonWriter);
+        }
+    }
+
+    //! Create the invariant portion of an inference result, suitable for
+    //! caching and later splicing into a full result.
+    std::string createInnerResult(const ::torch::Tensor& results);
+
+private:
+    //! Field names.
+    static const std::string RESULT;
+    static const std::string INFERENCE;
+    static const std::string ERROR;
+    static const std::string TIME_MS;
+    static const std::string CACHE_HIT;
+    static const std::string THREAD_SETTINGS;
+    static const std::string ACK;
+    static const std::string ACKNOWLEDGED;
+    static const std::string NUM_ALLOCATIONS;
+    static const std::string NUM_THREADS_PER_ALLOCATION;
+
+private:
+    //! Create the invariant portion of an error result, suitable for
+    //! caching and later splicing into a full result.
+    static void writeInnerError(const std::string& message, TRapidJsonLineWriter& jsonWriter);
+
+    //! Write a one dimensional tensor.
+    template<typename T>
+    void writeTensor(const ::torch::TensorAccessor<T, 1UL>& accessor,
+                     TRapidJsonLineWriter& jsonWriter) {
+        jsonWriter.StartArray();
+        for (int i = 0; i < accessor.size(0); ++i) {
+            jsonWriter.Double(static_cast<double>(accessor[i]));
+        }
+        jsonWriter.EndArray();
+    }
+
+    //! Write an N dimensional tensor for N > 1.
+    template<typename T, std::size_t N_DIMS>
+    void writeTensor(const ::torch::TensorAccessor<T, N_DIMS>& accessor,
+                     TRapidJsonLineWriter& jsonWriter) {
+        jsonWriter.StartArray();
+        for (int i = 0; i < accessor.size(0); ++i) {
+            this->writeTensor(accessor[i], jsonWriter);
+        }
+        jsonWriter.EndArray();
+    }
+
+    //! Write a 3D inference result
+    template<typename T>
+    void writeInferenceResults(const ::torch::TensorAccessor<T, 3UL>& accessor,
+                               TRapidJsonLineWriter& jsonWriter) {
+
+        jsonWriter.Key(RESULT);
+        jsonWriter.StartObject();
+        jsonWriter.Key(INFERENCE);
+        this->writeTensor(accessor, jsonWriter);
+        jsonWriter.EndObject();
+    }
+
+    //! Write a 2D inference result
+    template<typename T>
+    void writeInferenceResults(const ::torch::TensorAccessor<T, 2UL>& accessor,
+                               TRapidJsonLineWriter& jsonWriter) {
+
+        jsonWriter.Key(RESULT);
+        jsonWriter.StartObject();
+        jsonWriter.Key(INFERENCE);
+        // The Java side requires a 3D array, so wrap the 2D result in an
+        // extra outer array.
+        jsonWriter.StartArray();
+        this->writeTensor(accessor, jsonWriter);
+        jsonWriter.EndArray();
+        jsonWriter.EndObject();
+    }
+
+private:
+    core::CJsonOutputStreamWrapper m_WrappedOutputStream;
+};
+}
+}
+
+#endif // INCLUDED_ml_torch_CResultWriter_h
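The two writeTensor overloads implement a simple recursion: the one-dimensional overload emits a flat JSON array of numbers, and the N-dimensional overload recurses over the leading dimension. The self-contained sketch below mirrors that pattern, with std::vector standing in for torch::TensorAccessor purely for illustration; it also shows the extra outer array that the 2D writeInferenceResults overload adds for the Java side.

```cpp
#include <cstddef>
#include <iostream>
#include <vector>

// Base case: a 1D slice becomes a JSON array of numbers.
void writeTensor(const std::vector<double>& slice, std::ostream& out) {
    out << '[';
    for (std::size_t i = 0; i < slice.size(); ++i) {
        if (i > 0) {
            out << ',';
        }
        out << slice[i];
    }
    out << ']';
}

// Recursive case: each higher dimension wraps its slices in another array.
template<typename T>
void writeTensor(const std::vector<T>& tensor, std::ostream& out) {
    out << '[';
    for (std::size_t i = 0; i < tensor.size(); ++i) {
        if (i > 0) {
            out << ',';
        }
        writeTensor(tensor[i], out);
    }
    out << ']';
}

int main() {
    // A 2D prediction: wrap it in one extra array so the reader always
    // sees three dimensions, as the 2D overload in CResultWriter does.
    std::vector<std::vector<double>> result{{0.1, 0.2}, {0.3, 0.4}};
    std::cout << '[';
    writeTensor(result, std::cout);
    std::cout << "]\n"; // prints [[[0.1,0.2],[0.3,0.4]]]
}
```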
