|  | 
| 35 | 35 | import java.net.URISyntaxException; | 
| 36 | 36 | import java.net.URLDecoder; | 
| 37 | 37 | import java.util.*; | 
|  | 38 | +import java.util.concurrent.ConcurrentHashMap; | 
| 38 | 39 | 
 | 
| 39 | 40 | import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; | 
| 40 | 41 | 
 | 
| @@ -513,41 +514,22 @@ private void fileStatusesInIndex(HarStatus parent, List<FileStatus> statuses) | 
| 513 | 514 |     if (!parentString.endsWith(Path.SEPARATOR)){ | 
| 514 | 515 |         parentString += Path.SEPARATOR; | 
| 515 | 516 |     } | 
| 516 |  | -    Path harPath = new Path(parentString); | 
| 517 |  | -    int harlen = harPath.depth(); | 
| 518 |  | -    final Map<String, FileStatus> cache = new TreeMap<String, FileStatus>(); | 
| 519 |  | - | 
| 520 |  | -    for (HarStatus hstatus : metadata.archive.values()) { | 
| 521 |  | -      String child = hstatus.getName(); | 
| 522 |  | -      if ((child.startsWith(parentString))) { | 
| 523 |  | -        Path thisPath = new Path(child); | 
| 524 |  | -        if (thisPath.depth() == harlen + 1) { | 
| 525 |  | -          statuses.add(toFileStatus(hstatus, cache)); | 
| 526 |  | -        } | 
| 527 |  | -      } | 
|  | 517 | + | 
|  | 518 | +    for (String child: parent.children) { | 
|  | 519 | +      Path p = new Path(parentString + child); | 
|  | 520 | +      statuses.add(toFileStatus(metadata.archive.get(p))); | 
| 528 | 521 |     } | 
| 529 | 522 |   } | 
| 530 | 523 | 
 | 
| 531 | 524 |   /** | 
| 532 | 525 |    * Combine the status stored in the index and the underlying status.  | 
| 533 | 526 |    * @param h status stored in the index | 
| 534 |  | -   * @param cache caching the underlying file statuses | 
| 535 | 527 |    * @return the combined file status | 
| 536 | 528 |    * @throws IOException | 
| 537 | 529 |    */ | 
| 538 |  | -  private FileStatus toFileStatus(HarStatus h, | 
| 539 |  | -      Map<String, FileStatus> cache) throws IOException { | 
| 540 |  | -    FileStatus underlying = null; | 
| 541 |  | -    if (cache != null) { | 
| 542 |  | -      underlying = cache.get(h.partName); | 
| 543 |  | -    } | 
| 544 |  | -    if (underlying == null) { | 
| 545 |  | -      final Path p = h.isDir? archivePath: new Path(archivePath, h.partName); | 
| 546 |  | -      underlying = fs.getFileStatus(p); | 
| 547 |  | -      if (cache != null) { | 
| 548 |  | -        cache.put(h.partName, underlying); | 
| 549 |  | -      } | 
| 550 |  | -    } | 
|  | 530 | +  private FileStatus toFileStatus(HarStatus h) throws IOException { | 
|  | 531 | +    final Path p = h.isDir ? archivePath : new Path(archivePath, h.partName); | 
|  | 532 | +    FileStatus underlying = metadata.getPartFileStatus(p); | 
| 551 | 533 | 
 | 
| 552 | 534 |     long modTime = 0; | 
| 553 | 535 |     int version = metadata.getVersion(); | 
| @@ -658,7 +640,7 @@ public long getModificationTime() { | 
| 658 | 640 |   @Override | 
| 659 | 641 |   public FileStatus getFileStatus(Path f) throws IOException { | 
| 660 | 642 |     HarStatus hstatus = getFileHarStatus(f); | 
| 661 |  | -    return toFileStatus(hstatus, null); | 
|  | 643 | +    return toFileStatus(hstatus); | 
| 662 | 644 |   } | 
| 663 | 645 | 
 | 
| 664 | 646 |   private HarStatus getFileHarStatus(Path f) throws IOException { | 
| @@ -815,7 +797,7 @@ public FileStatus[] listStatus(Path f) throws IOException { | 
| 815 | 797 |     if (hstatus.isDir()) { | 
| 816 | 798 |       fileStatusesInIndex(hstatus, statuses); | 
| 817 | 799 |     } else { | 
| 818 |  | -      statuses.add(toFileStatus(hstatus, null)); | 
|  | 800 | +      statuses.add(toFileStatus(hstatus)); | 
| 819 | 801 |     } | 
| 820 | 802 | 
 | 
| 821 | 803 |     return statuses.toArray(new FileStatus[statuses.size()]); | 
| @@ -1143,24 +1125,32 @@ private class HarMetaData { | 
| 1143 | 1125 | 
 | 
| 1144 | 1126 |     List<Store> stores = new ArrayList<Store>(); | 
| 1145 | 1127 |     Map<Path, HarStatus> archive = new HashMap<Path, HarStatus>(); | 
| 1146 |  | -    private Map<Path, FileStatus> partFileStatuses = new HashMap<Path, FileStatus>(); | 
|  | 1128 | +    // keys are always the internal har path. | 
|  | 1129 | +    private Map<Path, FileStatus> partFileStatuses = new ConcurrentHashMap<>(); | 
| 1147 | 1130 | 
 | 
    /**
     * Creates a metadata holder for a har archive.
     *
     * @param fs underlying filesystem holding the archive files
     * @param masterIndexPath path of the master index file
     * @param archiveIndexPath path of the archive index file
     */
    public HarMetaData(FileSystem fs, Path masterIndexPath, Path archiveIndexPath) {
      this.fs = fs;
      this.masterIndexPath = masterIndexPath;
      this.archiveIndexPath = archiveIndexPath;
    }
| 1153 | 1136 | 
 | 
| 1154 |  | -    public FileStatus getPartFileStatus(Path partPath) throws IOException { | 
|  | 1137 | +    public FileStatus getPartFileStatus(Path path) throws IOException { | 
|  | 1138 | +      Path partPath = getPathInHar(path); | 
| 1155 | 1139 |       FileStatus status; | 
| 1156 | 1140 |       status = partFileStatuses.get(partPath); | 
| 1157 | 1141 |       if (status == null) { | 
| 1158 |  | -        status = fs.getFileStatus(partPath); | 
|  | 1142 | +        status = fs.getFileStatus(path); | 
| 1159 | 1143 |         partFileStatuses.put(partPath, status); | 
| 1160 | 1144 |       } | 
| 1161 | 1145 |       return status; | 
| 1162 | 1146 |     } | 
| 1163 | 1147 | 
 | 
|  | 1148 | +    private void addPartFileStatuses(Path path) throws IOException { | 
|  | 1149 | +      for (FileStatus stat : fs.listStatus(path)) { | 
|  | 1150 | +        partFileStatuses.put(getPathInHar(stat.getPath()), stat); | 
|  | 1151 | +      } | 
|  | 1152 | +    } | 
|  | 1153 | + | 
    /**
     * Returns the modification time of the master index file, captured
     * when the metadata was parsed.
     *
     * @return master index modification timestamp in milliseconds
     */
    public long getMasterIndexTimestamp() {
      return masterIndexTimestamp;
    }
| @@ -1217,16 +1207,22 @@ private void parseMetaData() throws IOException { | 
| 1217 | 1207 |       try { | 
| 1218 | 1208 |         FileStatus archiveStat = fs.getFileStatus(archiveIndexPath); | 
| 1219 | 1209 |         archiveIndexTimestamp = archiveStat.getModificationTime(); | 
| 1220 |  | -        LineReader aLin; | 
|  | 1210 | + | 
|  | 1211 | +        // pre-populate part cache. | 
|  | 1212 | +        addPartFileStatuses(archiveIndexPath.getParent()); | 
|  | 1213 | +        LineReader aLin = null; | 
| 1221 | 1214 | 
 | 
| 1222 | 1215 |         // now start reading the real index file | 
|  | 1216 | +        long pos = -1; | 
| 1223 | 1217 |         for (Store s: stores) { | 
| 1224 |  | -          read = 0; | 
| 1225 |  | -          aIn.seek(s.begin); | 
| 1226 |  | -          aLin = new LineReader(aIn, getConf()); | 
| 1227 |  | -          while (read + s.begin < s.end) { | 
| 1228 |  | -            int tmp = aLin.readLine(line); | 
| 1229 |  | -            read += tmp; | 
|  | 1218 | +          if (pos != s.begin) { | 
|  | 1219 | +            pos = s.begin; | 
|  | 1220 | +            aIn.seek(s.begin); | 
|  | 1221 | +            aLin = new LineReader(aIn, getConf()); | 
|  | 1222 | +          } | 
|  | 1223 | + | 
|  | 1224 | +          while (pos < s.end) { | 
|  | 1225 | +            pos += aLin.readLine(line); | 
| 1230 | 1226 |             String lineFeed = line.toString(); | 
| 1231 | 1227 |             String[] parsed = lineFeed.split(" "); | 
| 1232 | 1228 |             parsed[0] = decodeFileName(parsed[0]); | 
|  | 
0 commit comments