Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ public void testContinousEvents() throws Exception {

// start all transforms, wait until the processed all data and stop them
startTransforms();
waitUntilTransformsReachedUpperBound(runDate.getEpochSecond() * 1000 + 1);

// at random we added between 0 and 999_999ns == (1ms - 1ns) to every data point, so we add 1ms, so every data point is before
// the checkpoint
waitUntilTransformsReachedUpperBound(runDate.toEpochMilli() + 1, run);
stopTransforms();

// TODO: the transform dest index requires a refresh, see gh#51154
Expand Down Expand Up @@ -387,11 +390,12 @@ private GetTransformStatsResponse getTransformStats(String id) throws IOExceptio
}
}

private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis) throws Exception {
private void waitUntilTransformsReachedUpperBound(long timeStampUpperBoundMillis, int iteration) throws Exception {
logger.info(
"wait until transform reaches timestamp_millis: {}",
"wait until transform reaches timestamp_millis: {} iteration: {}",
ContinuousTestCase.STRICT_DATE_OPTIONAL_TIME_PRINTER_NANOS.withZone(ZoneId.of("UTC"))
.format(Instant.ofEpochMilli(timeStampUpperBoundMillis))
.format(Instant.ofEpochMilli(timeStampUpperBoundMillis)),
iteration
);
for (ContinuousTestCase testCase : transformTestCases) {
assertBusy(() -> {
Expand Down