Skip to content

Commit 551192f

Browse files
rmatharuatoomula
authored andcommitted
Consolidating offset read and write for store-offsets and side-inputs, maintaining backward compatbility
This consolidates the two different kinds of offsets used by stores and side-inputs into one. The code can still read and make sense of store-offset files written in the old format. The new format stores a map of ssp to offset, rather than a singular string. Updated tests. And added tests to ensure the reading of the old-format offset still works. Author: Ray Matharu <[email protected]> Reviewers: mynameborat Closes apache#915 from rmatharu/test-offset and squashes the following commits: c20e94c4 [Ray Matharu] minor 403f72b1 [Ray Matharu] simplifying code b5868066 [Ray Matharu] minor ca648223 [Ray Matharu] gradle.props b50e5fe [Ray Matharu] Applying comments 75905b0c [Ray Matharu] minor a8d1dcdc [Ray Matharu] Addressing review comments 1f7ecdcc [Ray Matharu] minor 666157ad [Ray Matharu] test fix d621a51c [Ray Matharu] Adding test for read of old format b9e58997 [Ray Matharu] build fix 9cb822ca [Ray Matharu] bug fix 5484e430 [Ray Matharu] Consolidating writing of offsets 55f35b7b [Ray Matharu] minor 88963a8c [Ray Matharu] minor f14ee03b [Ray Matharu] Consolidating read of offset file 12ca96bb [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza ee7daac8 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 08006871 [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 916f66ae [Ray Matharu] Merge branch 'master' of https://github.com/apache/samza 2c09b081 [Ray Matharu] Rocksdb bug fix
1 parent 63abbd2 commit 551192f

File tree

7 files changed

+127
-75
lines changed

7 files changed

+127
-75
lines changed

samza-core/src/main/java/org/apache/samza/storage/StorageManagerUtil.java

Lines changed: 64 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,34 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import java.io.File;
24+
import java.io.IOException;
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
import java.util.Set;
28+
29+
import java.util.stream.Collectors;
2430
import org.apache.samza.container.TaskName;
31+
import org.apache.samza.serializers.model.SamzaObjectMapper;
2532
import org.apache.samza.system.SystemAdmin;
2633
import org.apache.samza.system.SystemStreamPartition;
2734
import org.apache.samza.util.FileUtil;
35+
import org.codehaus.jackson.JsonParseException;
36+
import org.codehaus.jackson.map.JsonMappingException;
37+
import org.codehaus.jackson.map.ObjectMapper;
38+
import org.codehaus.jackson.map.ObjectWriter;
39+
import org.codehaus.jackson.type.TypeReference;
2840
import org.slf4j.Logger;
2941
import org.slf4j.LoggerFactory;
3042

3143

3244
public class StorageManagerUtil {
3345
private static final Logger LOG = LoggerFactory.getLogger(StorageManagerUtil.class);
46+
public static final String OFFSET_FILE_NAME = "OFFSET";
47+
private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
48+
private static final TypeReference<Map<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
49+
new TypeReference<Map<SystemStreamPartition, String>>() { };
50+
private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
51+
3452

3553
/**
3654
* Fetch the starting offset for the input {@link SystemStreamPartition}
@@ -70,17 +88,15 @@ public static String getStartingOffset(
7088
* the {@code storeDeleteRetentionInMs}, then the store is considered stale.
7189
*
7290
* @param storeDir the base directory of the store
73-
* @param offsetFileName the offset file name
7491
* @param storeDeleteRetentionInMs store delete retention in millis
7592
* @param currentTimeMs current time in ms
7693
* @return true if the store is stale, false otherwise
7794
*/
78-
public static boolean isStaleStore(
79-
File storeDir, String offsetFileName, long storeDeleteRetentionInMs, long currentTimeMs) {
95+
public static boolean isStaleStore(File storeDir, long storeDeleteRetentionInMs, long currentTimeMs) {
8096
boolean isStaleStore = false;
8197
String storePath = storeDir.toPath().toString();
8298
if (storeDir.exists()) {
83-
File offsetFileRef = new File(storeDir, offsetFileName);
99+
File offsetFileRef = new File(storeDir, OFFSET_FILE_NAME);
84100
long offsetFileLastModifiedTime = offsetFileRef.lastModified();
85101
if ((currentTimeMs - offsetFileLastModifiedTime) >= storeDeleteRetentionInMs) {
86102
LOG.info(
@@ -98,14 +114,14 @@ public static boolean isStaleStore(
98114
* An offset file associated with logged store {@code storeDir} is valid if it exists and is not empty.
99115
*
100116
* @param storeDir the base directory of the store
101-
* @param offsetFileName name of the offset file
117+
* @param storeSSPs storeSSPs (if any) associated with the store
102118
* @return true if the offset file is valid. false otherwise.
103119
*/
104-
public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
120+
public static boolean isOffsetFileValid(File storeDir, Set<SystemStreamPartition> storeSSPs) {
105121
boolean hasValidOffsetFile = false;
106122
if (storeDir.exists()) {
107-
String offsetContents = readOffsetFile(storeDir, offsetFileName);
108-
if (offsetContents != null && !offsetContents.isEmpty()) {
123+
Map<SystemStreamPartition, String> offsetContents = readOffsetFile(storeDir, storeSSPs);
124+
if (offsetContents != null && !offsetContents.isEmpty() && offsetContents.keySet().equals(storeSSPs)) {
109125
hasValidOffsetFile = true;
110126
} else {
111127
LOG.info("Offset file is not valid for store: {}.", storeDir.toPath());
@@ -115,6 +131,34 @@ public static boolean isOffsetFileValid(File storeDir, String offsetFileName) {
115131
return hasValidOffsetFile;
116132
}
117133

134+
/**
135+
* Write the given SSP-Offset map into the offsets file.
136+
* @param storeBaseDir the base directory of the store
137+
* @param storeName the store name to use
138+
* @param taskName the task name which is referencing the store
139+
* @param offsets The SSP-offset to write
140+
* @throws IOException because of deserializing to json
141+
*/
142+
public static void writeOffsetFile(File storeBaseDir, String storeName, TaskName taskName,
143+
Map<SystemStreamPartition, String> offsets) throws IOException {
144+
File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
145+
String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
146+
FileUtil.writeWithChecksum(offsetFile, fileContents);
147+
}
148+
149+
/**
150+
* Delete the offset file for this task and store, if one exists.
151+
* @param storeBaseDir the base directory of the store
152+
* @param storeName the store name to use
153+
* @param taskName the task name which is referencing the store
154+
*/
155+
public static void deleteOffsetFile(File storeBaseDir, String storeName, TaskName taskName) {
156+
File offsetFile = new File(getStorePartitionDir(storeBaseDir, storeName, taskName), OFFSET_FILE_NAME);
157+
if (offsetFile.exists()) {
158+
FileUtil.rm(offsetFile);
159+
}
160+
}
161+
118162
/**
119163
* Check if a store's disk footprint exists.
120164
*
@@ -129,26 +173,32 @@ public static boolean storeExists(File storeDir) {
129173
* Read and return the contents of the offset file.
130174
*
131175
* @param storagePartitionDir the base directory of the store
132-
* @param offsetFileName name of the offset file
176+
* @param storeSSPs SSPs associated with the store (if any)
133177
* @return the content of the offset file if it exists for the store, null otherwise.
134178
*/
135-
public static String readOffsetFile(File storagePartitionDir, String offsetFileName) {
136-
String offset = null;
137-
File offsetFileRef = new File(storagePartitionDir, offsetFileName);
179+
public static Map<SystemStreamPartition, String> readOffsetFile(File storagePartitionDir, Set<SystemStreamPartition> storeSSPs) {
180+
Map<SystemStreamPartition, String> offsets = new HashMap<>();
181+
String fileContents = null;
182+
File offsetFileRef = new File(storagePartitionDir, OFFSET_FILE_NAME);
138183
String storePath = storagePartitionDir.getPath();
139184

140185
if (offsetFileRef.exists()) {
141186
LOG.info("Found offset file in storage partition directory: {}", storePath);
142187
try {
143-
offset = FileUtil.readWithChecksum(offsetFileRef);
188+
fileContents = FileUtil.readWithChecksum(offsetFileRef);
189+
offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
190+
} catch (JsonParseException | JsonMappingException e) {
191+
LOG.info("Exception in json-parsing offset file {} {}, reading as string offset-value", storagePartitionDir.toPath(), OFFSET_FILE_NAME);
192+
final String finalFileContents = fileContents;
193+
offsets = (storeSSPs.size() == 1) ? storeSSPs.stream().collect(Collectors.toMap(ssp -> ssp, offset -> finalFileContents)) : offsets;
144194
} catch (Exception e) {
145195
LOG.warn("Failed to read offset file in storage partition directory: {}", storePath, e);
146196
}
147197
} else {
148198
LOG.info("No offset file found in storage partition directory: {}", storePath);
149199
}
150200

151-
return offset;
201+
return offsets;
152202
}
153203

154204
/**

samza-core/src/main/java/org/apache/samza/storage/TaskSideInputStorageManager.java

Lines changed: 8 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
import org.apache.samza.SamzaException;
4040
import org.apache.samza.config.Config;
4141
import org.apache.samza.container.TaskName;
42-
import org.apache.samza.serializers.model.SamzaObjectMapper;
4342
import org.apache.samza.storage.kv.Entry;
4443
import org.apache.samza.storage.kv.KeyValueStore;
4544
import org.apache.samza.system.IncomingMessageEnvelope;
@@ -51,9 +50,6 @@
5150
import org.apache.samza.util.Clock;
5251
import org.apache.samza.util.FileUtil;
5352

54-
import org.codehaus.jackson.map.ObjectMapper;
55-
import org.codehaus.jackson.map.ObjectWriter;
56-
import org.codehaus.jackson.type.TypeReference;
5753
import org.slf4j.Logger;
5854
import org.slf4j.LoggerFactory;
5955
import scala.collection.JavaConverters;
@@ -65,17 +61,12 @@
6561
*/
6662
public class TaskSideInputStorageManager {
6763
private static final Logger LOG = LoggerFactory.getLogger(TaskSideInputStorageManager.class);
68-
private static final String OFFSET_FILE = "SIDE-INPUT-OFFSETS";
6964
private static final long STORE_DELETE_RETENTION_MS = TimeUnit.DAYS.toMillis(1); // same as changelog delete retention
70-
private static final ObjectMapper OBJECT_MAPPER = SamzaObjectMapper.getObjectMapper();
71-
private static final TypeReference<HashMap<SystemStreamPartition, String>> OFFSETS_TYPE_REFERENCE =
72-
new TypeReference<HashMap<SystemStreamPartition, String>>() { };
73-
private static final ObjectWriter OBJECT_WRITER = OBJECT_MAPPER.writerWithType(OFFSETS_TYPE_REFERENCE);
7465

7566
private final Clock clock;
7667
private final Map<String, SideInputsProcessor> storeToProcessor;
7768
private final Map<String, StorageEngine> stores;
78-
private final String storeBaseDir;
69+
private final File storeBaseDir;
7970
private final Map<String, Set<SystemStreamPartition>> storeToSSps;
8071
private final Map<SystemStreamPartition, Set<String>> sspsToStores;
8172
private final StreamMetadataCache streamMetadataCache;
@@ -97,7 +88,7 @@ public TaskSideInputStorageManager(
9788
Clock clock) {
9889
this.clock = clock;
9990
this.stores = sideInputStores;
100-
this.storeBaseDir = storeBaseDir;
91+
this.storeBaseDir = new File(storeBaseDir);
10192
this.storeToSSps = storesToSSPs;
10293
this.streamMetadataCache = streamMetadataCache;
10394
this.systemAdmins = systemAdmins;
@@ -258,9 +249,7 @@ void writeOffsetFiles() {
258249
.collect(Collectors.toMap(Function.identity(), lastProcessedOffsets::get));
259250

260251
try {
261-
String fileContents = OBJECT_WRITER.writeValueAsString(offsets);
262-
File offsetFile = new File(getStoreLocation(storeName), OFFSET_FILE);
263-
FileUtil.writeWithChecksum(offsetFile, fileContents);
252+
StorageManagerUtil.writeOffsetFile(storeBaseDir, storeName, taskName, offsets);
264253
} catch (Exception e) {
265254
throw new SamzaException("Failed to write offset file for side input store: " + storeName, e);
266255
}
@@ -284,8 +273,8 @@ Map<SystemStreamPartition, String> getFileOffsets() {
284273
File storeLocation = getStoreLocation(storeName);
285274
if (isValidSideInputStore(storeName, storeLocation)) {
286275
try {
287-
String fileContents = StorageManagerUtil.readOffsetFile(storeLocation, OFFSET_FILE);
288-
Map<SystemStreamPartition, String> offsets = OBJECT_MAPPER.readValue(fileContents, OFFSETS_TYPE_REFERENCE);
276+
277+
Map<SystemStreamPartition, String> offsets = StorageManagerUtil.readOffsetFile(storeLocation, storeToSSps.get(storeName));
289278
fileOffsets.putAll(offsets);
290279
} catch (Exception e) {
291280
LOG.warn("Failed to load the offset file for side input store:" + storeName, e);
@@ -298,7 +287,7 @@ Map<SystemStreamPartition, String> getFileOffsets() {
298287

299288
@VisibleForTesting
300289
File getStoreLocation(String storeName) {
301-
return new File(storeBaseDir, (storeName + File.separator + taskName.toString()).replace(' ', '_'));
290+
return StorageManagerUtil.getStorePartitionDir(storeBaseDir, storeName, taskName);
302291
}
303292

304293
/**
@@ -368,8 +357,8 @@ Map<SystemStreamPartition, String> getOldestOffsets() {
368357

369358
private boolean isValidSideInputStore(String storeName, File storeLocation) {
370359
return isPersistedStore(storeName)
371-
&& !StorageManagerUtil.isStaleStore(storeLocation, OFFSET_FILE, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
372-
&& StorageManagerUtil.isOffsetFileValid(storeLocation, OFFSET_FILE);
360+
&& !StorageManagerUtil.isStaleStore(storeLocation, STORE_DELETE_RETENTION_MS, clock.currentTimeMillis())
361+
&& StorageManagerUtil.isOffsetFileValid(storeLocation, storeToSSps.get(storeName));
373362
}
374363

375364
private boolean isPersistedStore(String storeName) {

samza-core/src/main/scala/org/apache/samza/storage/ContainerStorageManager.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.io.File;
2424
import java.nio.file.Path;
2525
import java.util.ArrayList;
26+
import java.util.Collections;
2627
import java.util.HashMap;
2728
import java.util.HashSet;
2829
import java.util.List;
@@ -466,8 +467,6 @@ public Void call() {
466467
* with the respective consumer, restoring stores, and stopping stores.
467468
*/
468469
private class TaskRestoreManager {
469-
470-
private final static String OFFSET_FILE_NAME = "OFFSET";
471470
private final Map<String, StorageEngine> taskStores; // Map of all StorageEngines for this task indexed by store name
472471
private final Set<String> taskStoresToRestore;
473472
// Set of store names which need to be restored by consuming using system-consumers (see registerStartingOffsets)
@@ -532,14 +531,14 @@ private void cleanBaseDirsAndReadOffsetFiles() {
532531
LOG.info("Deleting logged storage partition directory " + loggedStorePartitionDir.toPath().toString());
533532
FileUtil.rm(loggedStorePartitionDir);
534533
} else {
535-
String offset = StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, OFFSET_FILE_NAME);
536-
LOG.info("Read offset " + offset + " for the store " + storeName + " from logged storage partition directory "
537-
+ loggedStorePartitionDir);
538-
539-
if (offset != null) {
540-
fileOffsets.put(
541-
new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition()),
542-
offset);
534+
535+
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
536+
Map<SystemStreamPartition, String> offset =
537+
StorageManagerUtil.readOffsetFile(loggedStorePartitionDir, Collections.singleton(changelogSSP));
538+
LOG.info("Read offset {} for the store {} from logged storage partition directory {}", offset, storeName, loggedStorePartitionDir);
539+
540+
if (offset.containsKey(changelogSSP)) {
541+
fileOffsets.put(changelogSSP, offset.get(changelogSSP));
543542
}
544543
}
545544
});
@@ -562,9 +561,13 @@ private boolean isLoggedStoreValid(String storeName, File loggedStoreDir) {
562561
(long) new StorageConfig(config).getChangeLogDeleteRetentionsInMs().get(storeName).get();
563562
}
564563

565-
return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk()
566-
&& StorageManagerUtil.isOffsetFileValid(loggedStoreDir, OFFSET_FILE_NAME) && !StorageManagerUtil.isStaleStore(
567-
loggedStoreDir, OFFSET_FILE_NAME, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
564+
if (changelogSystemStreams.containsKey(storeName)) {
565+
SystemStreamPartition changelogSSP = new SystemStreamPartition(changelogSystemStreams.get(storeName), taskModel.getChangelogPartition());
566+
return this.taskStores.get(storeName).getStoreProperties().isPersistedToDisk() && StorageManagerUtil.isOffsetFileValid(loggedStoreDir, Collections.singleton(changelogSSP))
567+
&& !StorageManagerUtil.isStaleStore(loggedStoreDir, changeLogDeleteRetentionInMs, clock.currentTimeMillis());
568+
}
569+
570+
return false;
568571
}
569572

570573
/**

samza-core/src/main/scala/org/apache/samza/storage/TaskStorageManager.scala

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ class TaskStorageManager(
4545
case (storeName, storageEngine) => storageEngine.getStoreProperties.isPersistedToDisk
4646
}
4747

48-
val offsetFileName = "OFFSET"
49-
5048
def getStore(storeName: String): Option[StorageEngine] = JavaOptionals.toRichOptional(containerStorageManager.getStore(taskName, storeName)).toOption
5149

5250
def init {
@@ -90,17 +88,13 @@ class TaskStorageManager(
9088
val newestOffset = if (sspMetadata == null) null else sspMetadata.getNewestOffset
9189
debug("Got offset %s for store %s" format(newestOffset, storeName))
9290

93-
val loggedStorePartitionDir = StorageManagerUtil.getStorePartitionDir(loggedStoreBaseDir, storeName, taskName)
94-
val offsetFile = new File(loggedStorePartitionDir, offsetFileName)
9591
if (newestOffset != null) {
9692
debug("Storing offset for store in OFFSET file ")
97-
FileUtil.writeWithChecksum(offsetFile, newestOffset)
93+
StorageManagerUtil.writeOffsetFile(loggedStoreBaseDir, storeName, taskName, Map(ssp -> newestOffset).asJava)
9894
debug("Successfully stored offset %s for store %s in OFFSET file " format(newestOffset, storeName))
9995
} else {
10096
//if newestOffset is null, then it means the store is (or has become) empty. No need to persist the offset file
101-
if (offsetFile.exists()) {
102-
FileUtil.rm(offsetFile)
103-
}
97+
StorageManagerUtil.deleteOffsetFile(loggedStoreBaseDir, storeName, taskName);
10498
debug("Not storing OFFSET file for taskName %s. Store %s backed by changelog topic: %s, partition: %s is empty. " format (taskName, storeName, systemStream.getStream, partition.getPartitionId))
10599
}
106100
} catch {

0 commit comments

Comments
 (0)