|
76 | 76 | import java.util.Arrays; |
77 | 77 | import java.util.Collection; |
78 | 78 | import java.util.Collections; |
| 79 | +import java.util.Comparator; |
79 | 80 | import java.util.EnumSet; |
80 | 81 | import java.util.HashMap; |
81 | 82 | import java.util.HashSet; |
| 83 | +import java.util.Iterator; |
82 | 84 | import java.util.LinkedHashMap; |
83 | 85 | import java.util.LinkedList; |
84 | 86 | import java.util.List; |
@@ -141,6 +143,7 @@ public class EntityGroupFSTimelineStore extends CompositeService |
141 | 143 | private long unknownActiveMillis; |
142 | 144 | private int appCacheMaxSize = 0; |
143 | 145 | private boolean recoveryEnabled; |
| 146 | + private boolean isAppendSupported; |
144 | 147 | private Path checkpointFile; |
145 | 148 | private ConcurrentMap<String, Pair<Long, Long>> recoveredLogs = |
146 | 149 | new ConcurrentHashMap<String, Pair<Long, Long>>(); |
@@ -223,6 +226,10 @@ protected boolean removeEldestEntry( |
223 | 226 | YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED, |
224 | 227 | YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT); |
225 | 228 |
|
| 229 | + isAppendSupported = conf.getBoolean( |
| 230 | + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND, |
| 231 | + YarnConfiguration.TIMELINE_SERVICE_ENTITYFILE_FS_SUPPORT_APPEND_DEFAULT); |
| 232 | + |
226 | 233 | aclsEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, |
227 | 234 | YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); |
228 | 235 | CallerContext.setCurrent( |
@@ -842,11 +849,28 @@ long scanForLogs() throws IOException { |
842 | 849 | } |
843 | 850 | String attemptDirName = statAttempt.getPath().getName(); |
844 | 851 | RemoteIterator<FileStatus> iterCache = list(statAttempt.getPath()); |
| 852 | + List<FileStatus> files = new ArrayList<>(); |
845 | 853 | while (iterCache.hasNext()) { |
846 | 854 | FileStatus statCache = iterCache.next(); |
847 | 855 | if (!statCache.isFile()) { |
848 | 856 | continue; |
849 | 857 | } |
| 858 | + files.add(statCache); |
| 859 | + } |
| 860 | + if (isAppendSupported) { |
| 861 | + Collections.sort(files, new Comparator<FileStatus>() { |
| 862 | + @Override |
| 863 | + public int compare(FileStatus o1, FileStatus o2) { |
| 864 | + String[] ts1 = o1.getPath().getName().split("_"); |
| 865 | + String[] ts2 = o2.getPath().getName().split("_"); |
| 866 | + return (Integer.parseInt(ts1[ts1.length - 1]) - Integer.parseInt(ts2[ts2.length - 1])); |
| 867 | + } |
| 868 | + }); |
| 869 | + } |
| 870 | + Iterator<FileStatus> fileIterator = files.iterator(); |
| 871 | + |
| 872 | + while (fileIterator.hasNext()) { |
| 873 | + FileStatus statCache = fileIterator.next(); |
850 | 874 | String filename = statCache.getPath().getName(); |
851 | 875 | String owner = statCache.getOwner(); |
852 | 876 | //YARN-10884:Owner of File is set to Null on WASB Append Operation.ATS fails to read such |
|
0 commit comments