Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions include/api/CForecastRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ struct testValidateNoExpiry;
struct testValidateInvalidExpiry;
struct testValidateBrokenMessage;
struct testValidateMissingId;
struct testValidateProvidedMinDiskSpace;
struct testValidateProvidedMaxMemoryLimit;
struct testValidateProvidedTooLargeMaxMemoryLimit;
struct testSufficientDiskSpace;
}

namespace ml {
Expand Down Expand Up @@ -93,11 +95,16 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
//! The purpose of this value is to guard the rest of the system against
//! running out of disk space.
//! minimum disk space required for disk persistence
static const std::size_t MIN_FORECAST_AVAILABLE_DISK_SPACE = 4294967296ull; // 4GB
//! (not defined inline because we need its address)
static const std::size_t DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE;

//! minimum time between stat updates to prevent to many updates in a short time
static const std::uint64_t MINIMUM_TIME_ELAPSED_FOR_STATS_UPDATE = 3000ul; // 3s

//! default bounds percentile
//! (not defined inline because we need its address)
static const double DEFAULT_BOUNDS_PERCENTILE;

private:
static const std::string ERROR_FORECAST_REQUEST_FAILED_TO_PARSE;
static const std::string ERROR_NO_FORECAST_ID;
Expand Down Expand Up @@ -167,7 +174,7 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {

private:
struct API_EXPORT SForecast {
SForecast();
SForecast() {}

SForecast(SForecast&& other) noexcept;
SForecast& operator=(SForecast&& other) noexcept;
Expand All @@ -191,31 +198,34 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
TForecastResultSeriesVec s_ForecastSeries;

//! Forecast create time
core_t::TTime s_CreateTime;
core_t::TTime s_CreateTime{0};

//! Forecast start time
core_t::TTime s_StartTime;
core_t::TTime s_StartTime{0};

//! Forecast duration
core_t::TTime s_Duration;
core_t::TTime s_Duration{0};

//! Expiration of the forecast (for automatic deletion)
core_t::TTime s_ExpiryTime;
core_t::TTime s_ExpiryTime{0};

//! Forecast bounds
double s_BoundsPercentile;
double s_BoundsPercentile{DEFAULT_BOUNDS_PERCENTILE};

//! total number of models
std::size_t s_NumberOfModels;
std::size_t s_NumberOfModels{0};

//! total number of models able to forecast
std::size_t s_NumberOfForecastableModels;
std::size_t s_NumberOfForecastableModels{0};

//! total memory required for this forecasting job (only the models)
std::size_t s_MemoryUsage;
std::size_t s_MemoryUsage{0};

//! maximum allowed memory (in bytes) that this forecast can use
std::size_t s_MaxForecastModelMemory;
std::size_t s_MaxForecastModelMemory{DEFAULT_MAX_FORECAST_MODEL_MEMORY};

//! minimum free disk space (in bytes) for a forecast to use disk
std::size_t s_MinForecastAvailableDiskSpace{DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE};

//! A collection storing important messages from forecasting
TStrUSet s_Messages;
Expand All @@ -236,7 +246,8 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
bool tryGetJob(SForecast& forecastJob);

//! check for sufficient disk space
bool sufficientAvailableDiskSpace(const boost::filesystem::path& path);
static bool sufficientAvailableDiskSpace(std::size_t minForecastAvailableDiskSpace,
const boost::filesystem::path& path);

//! pushes new jobs into the internal 'queue' (thread boundary)
bool push(SForecast& forecastJob);
Expand Down Expand Up @@ -299,8 +310,10 @@ class API_EXPORT CForecastRunner final : private core::CNonCopyable {
friend struct CForecastRunnerTest::testValidateInvalidExpiry;
friend struct CForecastRunnerTest::testValidateBrokenMessage;
friend struct CForecastRunnerTest::testValidateMissingId;
friend struct CForecastRunnerTest::testValidateProvidedMinDiskSpace;
friend struct CForecastRunnerTest::testValidateProvidedMaxMemoryLimit;
friend struct CForecastRunnerTest::testValidateProvidedTooLargeMaxMemoryLimit;
friend struct CForecastRunnerTest::testSufficientDiskSpace;
};
}
}
Expand Down
32 changes: 16 additions & 16 deletions lib/api/CForecastRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const std::string EMPTY_STRING;
}

const std::size_t CForecastRunner::DEFAULT_MAX_FORECAST_MODEL_MEMORY{20971520}; // 20MB
const std::size_t CForecastRunner::DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE{4294967296ull}; // 4GB
const double CForecastRunner::DEFAULT_BOUNDS_PERCENTILE{0.95};

const std::string CForecastRunner::ERROR_FORECAST_REQUEST_FAILED_TO_PARSE("Failed to parse forecast request: ");
const std::string CForecastRunner::ERROR_NO_FORECAST_ID("forecast ID must be specified and non empty");
Expand All @@ -49,13 +51,6 @@ const std::string CForecastRunner::INFO_DEFAULT_DURATION("Forecast duration not
const std::string CForecastRunner::INFO_DEFAULT_EXPIRY("Forecast expires_in not specified, setting to 14 days");
const std::string CForecastRunner::INFO_NO_MODELS_CAN_CURRENTLY_BE_FORECAST("Insufficient history to forecast for all models");

CForecastRunner::SForecast::SForecast()
: s_ForecastId(), s_ForecastAlias(), s_ForecastSeries(), s_CreateTime(0),
s_StartTime(0), s_Duration(0), s_ExpiryTime(0), s_BoundsPercentile(0),
s_NumberOfModels(0), s_NumberOfForecastableModels(0), s_MemoryUsage(0),
s_Messages(), s_TemporaryFolder() {
}

CForecastRunner::SForecast::SForecast(SForecast&& other) noexcept
: s_ForecastId(std::move(other.s_ForecastId)),
s_ForecastAlias(std::move(other.s_ForecastAlias)),
Expand Down Expand Up @@ -345,7 +340,8 @@ bool CForecastRunner::pushForecastJob(const std::string& controlMessage,
if (totalMemoryUsage >= forecastJob.s_MaxForecastModelMemory) {
boost::filesystem::path temporaryFolder(forecastJob.s_TemporaryFolder);

if (this->sufficientAvailableDiskSpace(temporaryFolder) == false) {
if (this->sufficientAvailableDiskSpace(forecastJob.s_MinForecastAvailableDiskSpace,
temporaryFolder) == false) {
this->sendErrorMessage(forecastJob, ERROR_MEMORY_LIMIT_DISKSPACE);
return false;
}
Expand Down Expand Up @@ -415,7 +411,7 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control
std::istringstream stringStream(controlMessage.substr(1));
forecastJob.s_StartTime = lastResultsTime;

core_t::TTime expiresIn = 0l;
core_t::TTime expiresIn = 0;
boost::property_tree::ptree properties;
try {
boost::property_tree::read_json(stringStream, properties);
Expand All @@ -427,14 +423,17 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control
forecastJob.s_CreateTime = properties.get<core_t::TTime>("create_time", 0);
forecastJob.s_MaxForecastModelMemory = properties.get<std::size_t>(
"max_model_memory", DEFAULT_MAX_FORECAST_MODEL_MEMORY);
forecastJob.s_MinForecastAvailableDiskSpace = properties.get<std::size_t>(
"min_available_disk_space", DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE);

// tmp storage if available
forecastJob.s_TemporaryFolder = properties.get<std::string>("tmp_storage", EMPTY_STRING);
// use -1 as default to allow 0 as 'never expires'
expiresIn = properties.get<core_t::TTime>("expires_in", -1l);

// note: this is not exposed on the Java side
forecastJob.s_BoundsPercentile = properties.get<double>("boundspercentile", 95.0);
forecastJob.s_BoundsPercentile =
properties.get<double>("boundspercentile", DEFAULT_BOUNDS_PERCENTILE);
} catch (const std::exception& e) {
LOG_ERROR(<< ERROR_FORECAST_REQUEST_FAILED_TO_PARSE << e.what());
return false;
Expand All @@ -454,7 +453,7 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control
return false;
}

if (lastResultsTime == 0l) {
if (lastResultsTime == 0) {
errorFunction(forecastJob, ERROR_NO_DATA_PROCESSED);
return false;
}
Expand All @@ -470,11 +469,11 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control
LOG_INFO(<< INFO_DEFAULT_DURATION);
}

if (expiresIn < -1l) {
if (expiresIn < -1) {
// only log
expiresIn = DEFAULT_EXPIRY_TIME;
LOG_INFO(<< WARNING_INVALID_EXPIRY);
} else if (expiresIn == -1l) {
} else if (expiresIn == -1) {
// only log
expiresIn = DEFAULT_EXPIRY_TIME;
LOG_DEBUG(<< INFO_DEFAULT_EXPIRY);
Expand All @@ -485,7 +484,8 @@ bool CForecastRunner::parseAndValidateForecastRequest(const std::string& control
return true;
}

bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path& path) {
bool CForecastRunner::sufficientAvailableDiskSpace(std::size_t minForecastAvailableDiskSpace,
const boost::filesystem::path& path) {
boost::system::error_code errorCode;
auto spaceInfo = boost::filesystem::space(path, errorCode);

Expand All @@ -495,8 +495,8 @@ bool CForecastRunner::sufficientAvailableDiskSpace(const boost::filesystem::path
return false;
}

if (spaceInfo.available < MIN_FORECAST_AVAILABLE_DISK_SPACE) {
LOG_WARN(<< "Checked disk space for " << path << " - required: " << MIN_FORECAST_AVAILABLE_DISK_SPACE
if (spaceInfo.available < minForecastAvailableDiskSpace) {
LOG_WARN(<< "Checked disk space for " << path << " - required: " << minForecastAvailableDiskSpace
<< ", available: " << spaceInfo.available);
return false;
}
Expand Down
44 changes: 38 additions & 6 deletions lib/api/unittest/CForecastRunnerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -333,23 +333,44 @@ BOOST_AUTO_TEST_CASE(testValidateMissingId) {
message, forecastJob, 1400000000) == false);
}

BOOST_AUTO_TEST_CASE(testValidateProvidedMinDiskSpace) {
ml::api::CForecastRunner::SForecast forecastJob;

std::string message{
"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"min_available_disk_space\": 100000}"};

BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(100000, forecastJob.s_MinForecastAvailableDiskSpace);

std::string message2{"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\"}"};

BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(ml::api::CForecastRunner::DEFAULT_MIN_FORECAST_AVAILABLE_DISK_SPACE,
forecastJob.s_MinForecastAvailableDiskSpace);
}

BOOST_AUTO_TEST_CASE(testValidateProvidedMaxMemoryLimit) {
ml::api::CForecastRunner::SForecast forecastJob;

std::string message(
std::string message{
"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"max_model_memory\": 10000000}");
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\",\"max_model_memory\": 10000000}"};

BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(forecastJob.s_MaxForecastModelMemory, static_cast<size_t>(10000000));
BOOST_REQUIRE_EQUAL(10000000, forecastJob.s_MaxForecastModelMemory);

std::string message2("p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\"}");
std::string message2{"p{\"duration\":" + std::to_string(3 * ml::core::constants::WEEK) +
",\"forecast_id\": \"42\",\"create_time\": \"1511370819\"}"};

BOOST_TEST_REQUIRE(ml::api::CForecastRunner::parseAndValidateForecastRequest(
message2, forecastJob, 1400000000));
BOOST_REQUIRE_EQUAL(forecastJob.s_MaxForecastModelMemory, 20971520ull);
BOOST_REQUIRE_EQUAL(ml::api::CForecastRunner::DEFAULT_MAX_FORECAST_MODEL_MEMORY,
forecastJob.s_MaxForecastModelMemory);
}

BOOST_AUTO_TEST_CASE(testValidateProvidedTooLargeMaxMemoryLimit) {
Expand Down Expand Up @@ -384,4 +405,15 @@ BOOST_AUTO_TEST_CASE(testValidateProvidedTooLargeMaxMemoryLimit) {
}));
}

BOOST_AUTO_TEST_CASE(testSufficientDiskSpace) {

// These tests could theoretically fail based on environmental factors, but
// it's unlikely - they are saying the current directory must have at least
// 1 byte free disk space and less than 16 exabytes free
BOOST_REQUIRE_EQUAL(
true, ml::api::CForecastRunner::sufficientAvailableDiskSpace(1, "."));
BOOST_REQUIRE_EQUAL(false, ml::api::CForecastRunner::sufficientAvailableDiskSpace(
std::numeric_limits<std::size_t>::max(), "."));
}

BOOST_AUTO_TEST_SUITE_END()