35 | 35 | import java.net.URISyntaxException; |
36 | 36 | import java.net.URLDecoder; |
37 | 37 | import java.util.*; |
| 38 | +import java.util.concurrent.ConcurrentHashMap; |
38 | 39 |
39 | 40 | import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; |
40 | 41 |
@@ -513,41 +514,22 @@ private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses) |
513 | 514 | if (!parentString.endsWith(Path.SEPARATOR)){ |
514 | 515 | parentString += Path.SEPARATOR; |
515 | 516 | } |
516 | | - Path harPath = new Path(parentString); |
517 | | - int harlen = harPath.depth(); |
518 | | - final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>(); |
519 | | - |
520 | | - for (HarStatus hstatus : metadata.archive.values()) { |
521 | | - String child = hstatus.getName(); |
522 | | - if ((child.startsWith(parentString))) { |
523 | | - Path thisPath = new Path(child); |
524 | | - if (thisPath.depth() == harlen + 1) { |
525 | | - statuses.add(toFileStatus(hstatus, cache)); |
526 | | - } |
527 | | - } |
| 517 | + |
| 518 | + for (String child: parent.children) { |
| 519 | + Path p = new Path(parentString + child); |
| 520 | + statuses.add(toFileStatus(metadata.archive.get(p))); |
528 | 521 | } |
529 | 522 | } |
530 | 523 |
531 | 524 | /** |
532 | 525 | * Combine the status stored in the index and the underlying status. |
533 | 526 | * @param h status stored in the index |
534 | | - * @param cache caching the underlying file statuses |
535 | 527 | * @return the combined file status |
536 | 528 | * @throws IOException |
537 | 529 | */ |
538 | | - private FileStatus toFileStatus(HarStatus h, |
539 | | - Map<String, FileStatus> cache) throws IOException { |
540 | | - FileStatus underlying = null; |
541 | | - if (cache != null) { |
542 | | - underlying = cache.get(h.partName); |
543 | | - } |
544 | | - if (underlying == null) { |
545 | | - final Path p = h.isDir? archivePath: new Path(archivePath, h.partName); |
546 | | - underlying = fs.getFileStatus(p); |
547 | | - if (cache != null) { |
548 | | - cache.put(h.partName, underlying); |
549 | | - } |
550 | | - } |
| 530 | + private FileStatus toFileStatus(HarStatus h) throws IOException { |
| 531 | + final Path p = h.isDir ? archivePath : new Path(archivePath, h.partName); |
| 532 | + FileStatus underlying = metadata.getPartFileStatus(p); |
551 | 533 |
552 | 534 | long modTime = 0; |
553 | 535 | int version = metadata.getVersion(); |
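
The hunk above replaces the full scan of `metadata.archive` (and the per-call `TreeMap` cache) with a direct walk of `parent.children`, so listing a directory touches only its own entries, and `toFileStatus` now relies on the shared part-file status cache instead of a caller-supplied map. A minimal sketch of the new listing shape, with a made-up `archive` map and `children` list standing in for the real HAR index structures:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: "archive" and "children" mimic (but do not reproduce)
// HarFileSystem's metadata.archive map and HarStatus.children list.
public class ChildListingSketch {
  public static void main(String[] args) {
    Map<String, String> archive = new HashMap<>();
    archive.put("/dir/a.txt", "status of a.txt");
    archive.put("/dir/b.txt", "status of b.txt");
    archive.put("/dir/sub", "status of sub");
    archive.put("/dir/sub/c.txt", "status of c.txt"); // not a direct child of /dir

    List<String> children = Arrays.asList("a.txt", "b.txt", "sub");

    // Old shape: scan every archive entry and test its path depth.
    // New shape: visit only the parent's own children.
    List<String> statuses = new ArrayList<>();
    for (String child : children) {
      statuses.add(archive.get("/dir/" + child));
    }
    System.out.println(statuses); // [status of a.txt, status of b.txt, status of sub]
  }
}
```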
@@ -658,7 +640,7 @@ public long getModificationTime() { |
658 | 640 | @Override |
659 | 641 | public FileStatus getFileStatus(Path f) throws IOException { |
660 | 642 | HarStatus hstatus = getFileHarStatus(f); |
661 | | - return toFileStatus(hstatus, null); |
| 643 | + return toFileStatus(hstatus); |
662 | 644 | } |
663 | 645 |
664 | 646 | private HarStatus getFileHarStatus(Path f) throws IOException { |
@@ -815,7 +797,7 @@ public FileStatus[] listStatus(Path f) throws IOException { |
815 | 797 | if (hstatus.isDir()) { |
816 | 798 | fileStatusesInIndex(hstatus, statuses); |
817 | 799 | } else { |
818 | | - statuses.add(toFileStatus(hstatus, null)); |
| 800 | + statuses.add(toFileStatus(hstatus)); |
819 | 801 | } |
820 | 802 |
821 | 803 | return statuses.toArray(new FileStatus[statuses.size()]); |
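
With `fileStatusesInIndex` and `toFileStatus` simplified, `listStatus` keeps its signature; only its cost changes. A usage sketch of the call whose index walk the patch speeds up (the archive path below is hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Usage sketch: list a directory inside a .har archive. The path is invented
// for illustration; listStatus() is the operation whose index walk now scales
// with the directory's own children rather than the whole archive.
public class HarListingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path dir = new Path("har:///user/alice/logs.har/2023-01");
    FileSystem harFs = dir.getFileSystem(conf);
    for (FileStatus st : harFs.listStatus(dir)) {
      System.out.println(st.getPath() + "\t" + st.getLen());
    }
  }
}
```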
@@ -1143,24 +1125,32 @@ private class HarMetaData { |
1143 | 1125 |
1144 | 1126 | List<Store> stores = new ArrayList<Store>(); |
1145 | 1127 | Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>(); |
1146 | | - private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>(); |
| 1128 | + // keys are always the internal har path. |
| 1129 | + private Map<Path, FileStatus> partFileStatuses = new ConcurrentHashMap<>(); |
1147 | 1130 |
1148 | 1131 | public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) { |
1149 | 1132 | this.fs = fs; |
1150 | 1133 | this.masterIndexPath = masterIndexPath; |
1151 | 1134 | this.archiveIndexPath = archiveIndexPath; |
1152 | 1135 | } |
1153 | 1136 |
1154 | | - public FileStatus getPartFileStatus(Path partPath) throws IOException { |
| 1137 | + public FileStatus getPartFileStatus(Path path) throws IOException { |
| 1138 | + Path partPath = getPathInHar(path); |
1155 | 1139 | FileStatus status; |
1156 | 1140 | status = partFileStatuses.get(partPath); |
1157 | 1141 | if (status == null) { |
1158 | | - status = fs.getFileStatus(partPath); |
| 1142 | + status = fs.getFileStatus(path); |
1159 | 1143 | partFileStatuses.put(partPath, status); |
1160 | 1144 | } |
1161 | 1145 | return status; |
1162 | 1146 | } |
1163 | 1147 |
| 1148 | + private void addPartFileStatuses(Path path) throws IOException { |
| 1149 | + for (FileStatus stat : fs.listStatus(path)) { |
| 1150 | + partFileStatuses.put(getPathInHar(stat.getPath()), stat); |
| 1151 | + } |
| 1152 | + } |
| 1153 | + |
1164 | 1154 | public long getMasterIndexTimestamp() { |
1165 | 1155 | return masterIndexTimestamp; |
1166 | 1156 | } |
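
In this hunk, `partFileStatuses` becomes a `ConcurrentHashMap` keyed by the in-archive path, pre-populated by `addPartFileStatuses` and consulted in `getPartFileStatus` with a plain get-then-put. A sketch of that get-or-load shape (class and method names here are illustrative, not HarFileSystem's):

```java
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the cache shape used by getPartFileStatus(): check the map,
// fall back to a loader that may throw a checked IOException, then remember
// the result for later callers.
class StatusCacheSketch {
  interface Loader {
    String load(String key) throws IOException;
  }

  private final Map<String, String> cache = new ConcurrentHashMap<>();

  String getOrLoad(String key, Loader loader) throws IOException {
    String value = cache.get(key);
    if (value == null) {
      value = loader.load(key); // e.g. fs.getFileStatus(path) in the patch
      cache.put(key, value);    // a racing thread may load twice; last write wins
    }
    return value;
  }
}
```

`computeIfAbsent` would be the more common idiom, but its mapping function cannot throw the checked `IOException` that the loader declares, so the explicit get/put keeps the code straightforward; the occasional duplicate load it allows under a race is harmless because a HAR archive's part files do not change.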
@@ -1217,16 +1207,22 @@ private void parseMetaData() throws IOException { |
1217 | 1207 | try { |
1218 | 1208 | FileStatus archiveStat = fs.getFileStatus(archiveIndexPath); |
1219 | 1209 | archiveIndexTimestamp = archiveStat.getModificationTime(); |
1220 | | - LineReader aLin; |
| 1210 | + |
| 1211 | + // pre-populate part cache. |
| 1212 | + addPartFileStatuses(archiveIndexPath.getParent()); |
| 1213 | + LineReader aLin = null; |
1221 | 1214 |
1222 | 1215 | // now start reading the real index file |
| 1216 | + long pos = -1; |
1223 | 1217 | for (Store s: stores) { |
1224 | | - read = 0; |
1225 | | - aIn.seek(s.begin); |
1226 | | - aLin = new LineReader(aIn, getConf()); |
1227 | | - while (read + s.begin < s.end) { |
1228 | | - int tmp = aLin.readLine(line); |
1229 | | - read += tmp; |
| 1218 | + if (pos != s.begin) { |
| 1219 | + pos = s.begin; |
| 1220 | + aIn.seek(s.begin); |
| 1221 | + aLin = new LineReader(aIn, getConf()); |
| 1222 | + } |
| 1223 | + |
| 1224 | + while (pos < s.end) { |
| 1225 | + pos += aLin.readLine(line); |
1230 | 1226 | String lineFeed = line.toString(); |
1231 | 1227 | String[] parsed = lineFeed.split(" "); |
1232 | 1228 | parsed[0] = decodeFileName(parsed[0]); |
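
The index-parsing loop above now tracks its byte offset in `pos` and only re-seeks (and rebuilds the `LineReader`) when a store does not begin where the previous one ended, so contiguous stores are read in one sequential pass. A self-contained sketch of that pattern over an in-memory byte array (store offsets are invented for illustration):

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

// Sketch of the "seek only when necessary" loop: pos tracks how many bytes
// have been consumed, and a seek (plus reader reset in the real code) happens
// only when the next store does not begin where the last one ended.
public class SequentialStoreReadSketch {
  static class Store {
    final long begin, end;
    Store(long begin, long end) { this.begin = begin; this.end = end; }
  }

  public static void main(String[] args) {
    byte[] index = "a 1\nb 2\nc 3\nd 4\n".getBytes(StandardCharsets.UTF_8);
    // Two contiguous stores: the second is read without re-seeking.
    List<Store> stores = Arrays.asList(new Store(0, 8), new Store(8, 16));

    long pos = -1;
    for (Store s : stores) {
      if (pos != s.begin) {
        pos = s.begin;                  // stands in for aIn.seek(s.begin)
      }
      while (pos < s.end) {
        int nl = (int) pos;
        while (index[nl] != '\n') nl++; // read one line of the index
        System.out.println(new String(index, (int) pos, nl - (int) pos,
            StandardCharsets.UTF_8));
        pos = nl + 1;                   // advance by the bytes just consumed
      }
    }
  }
}
```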