2626import java .util .ArrayList ;
2727import java .util .Date ;
2828import java .util .HashMap ;
29- import java .util .HashSet ;
3029import java .util .List ;
31- import java .util .Set ;
30+ import java .util .Map ;
31+ import java .util .Optional ;
3232import java .util .function .Consumer ;
3333import org .apache .hadoop .conf .Configuration ;
3434import org .apache .hadoop .fs .FileStatus ;
3737import org .apache .hadoop .hbase .Cell ;
3838import org .apache .hadoop .hbase .KeyValue ;
3939import org .apache .hadoop .hbase .PrivateCellUtil ;
40+ import org .apache .hadoop .hbase .TableName ;
4041import org .apache .hadoop .hbase .regionserver .CellSink ;
4142import org .apache .hadoop .hbase .regionserver .HMobStore ;
4243import org .apache .hadoop .hbase .regionserver .HStore ;
44+ import org .apache .hadoop .hbase .regionserver .HStoreFile ;
4345import org .apache .hadoop .hbase .regionserver .InternalScanner ;
4446import org .apache .hadoop .hbase .regionserver .KeyValueScanner ;
4547import org .apache .hadoop .hbase .regionserver .ScanInfo ;
6264import org .slf4j .Logger ;
6365import org .slf4j .LoggerFactory ;
6466
67+ import org .apache .hbase .thirdparty .com .google .common .collect .HashMultimap ;
68+ import org .apache .hbase .thirdparty .com .google .common .collect .ImmutableSetMultimap ;
6569import org .apache .hbase .thirdparty .com .google .common .collect .Lists ;
70+ import org .apache .hbase .thirdparty .com .google .common .collect .SetMultimap ;
6671
6772/**
6873 * Compact passed set of files in the mob-enabled column family.
@@ -82,12 +87,8 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
8287 * compaction process.
8388 */
8489
85- static ThreadLocal <Set <String >> mobRefSet = new ThreadLocal <Set <String >>() {
86- @ Override
87- protected Set <String > initialValue () {
88- return new HashSet <String >();
89- }
90- };
90+ static ThreadLocal <SetMultimap <TableName , String >> mobRefSet =
91+ ThreadLocal .withInitial (HashMultimap ::create );
9192
9293 /*
9394 * Is it user or system-originated request.
@@ -192,34 +193,72 @@ public List<Path> compact(CompactionRequestImpl request,
192193 // Check if I/O optimized MOB compaction
193194 if (ioOptimizedMode ) {
194195 if (request .isMajor () && request .getPriority () == HStore .PRIORITY_USER ) {
195- Path mobDir =
196- MobUtils .getMobFamilyPath (conf , store .getTableName (), store .getColumnFamilyName ());
197- List <Path > mobFiles = MobUtils .getReferencedMobFiles (request .getFiles (), mobDir );
198- // reset disableIO
199- disableIO .set (Boolean .FALSE );
200- if (mobFiles .size () > 0 ) {
201- calculateMobLengthMap (mobFiles );
196+ try {
197+ final SetMultimap <TableName , String > mobRefs = request .getFiles ().stream ().map (file -> {
198+ byte [] value = file .getMetadataValue (HStoreFile .MOB_FILE_REFS );
199+ ImmutableSetMultimap .Builder <TableName , String > builder ;
200+ if (value == null ) {
201+ builder = ImmutableSetMultimap .builder ();
202+ } else {
203+ try {
204+ builder = MobUtils .deserializeMobFileRefs (value );
205+ } catch (RuntimeException exception ) {
206+ throw new RuntimeException ("failure getting mob references for hfile " + file ,
207+ exception );
208+ }
209+ }
210+ return builder ;
211+ }).reduce ((a , b ) -> a .putAll (b .build ())).orElseGet (ImmutableSetMultimap ::builder ).build ();
212+ // reset disableIO
213+ disableIO .set (Boolean .FALSE );
214+ if (!mobRefs .isEmpty ()) {
215+ calculateMobLengthMap (mobRefs );
216+ }
217+ LOG .info (
218+ "Table={} cf={} region={}. I/O optimized MOB compaction. "
219+ + "Total referenced MOB files: {}" ,
220+ tableName , familyName , regionName , mobRefs .size ());
221+ } catch (RuntimeException exception ) {
222+ throw new IOException ("Failed to get list of referenced hfiles for request " + request ,
223+ exception );
202224 }
203- LOG .info ("Table={} cf={} region={}. I/O optimized MOB compaction. "
204- + "Total referenced MOB files: {}" , tableName , familyName , regionName , mobFiles .size ());
205225 }
206226 }
207227
208228 return compact (request , scannerFactory , writerFactory , throughputController , user );
209229 }
210230
211- private void calculateMobLengthMap (List <Path > mobFiles ) throws IOException {
231+ /**
232+ * @param mobRefs multimap of original table name -> mob hfile
233+ */
234+ private void calculateMobLengthMap (SetMultimap <TableName , String > mobRefs ) throws IOException {
212235 FileSystem fs = store .getFileSystem ();
213236 HashMap <String , Long > map = mobLengthMap .get ();
214237 map .clear ();
215- for (Path p : mobFiles ) {
216- if (MobFileName .isOldMobFileName (p .getName ())) {
238+ for (Map .Entry <TableName , String > reference : mobRefs .entries ()) {
239+ final TableName table = reference .getKey ();
240+ final String mobfile = reference .getValue ();
241+ if (MobFileName .isOldMobFileName (mobfile )) {
217242 disableIO .set (Boolean .TRUE );
218243 }
219- FileStatus st = fs .getFileStatus (p );
220- long size = st .getLen ();
221- LOG .debug ("Referenced MOB file={} size={}" , p , size );
222- map .put (p .getName (), fs .getFileStatus (p ).getLen ());
244+ List <Path > locations = mobStore .getLocations (table );
245+ for (Path p : locations ) {
246+ try {
247+ FileStatus st = fs .getFileStatus (new Path (p , mobfile ));
248+ long size = st .getLen ();
249+ LOG .debug ("Referenced MOB file={} size={}" , mobfile , size );
250+ map .put (mobfile , size );
251+ break ;
252+ } catch (FileNotFoundException exception ) {
253+ LOG .debug ("Mob file {} was not in location {}. May have other locations to try." , mobfile ,
254+ p );
255+ }
256+ }
257+ if (!map .containsKey (mobfile )) {
258+ throw new FileNotFoundException ("Could not find mob file " + mobfile + " in the list of "
259+ + "expected locations: " + locations );
260+ }
261+
223262 }
224263 }
225264
@@ -395,8 +434,15 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
395434 // We leave large MOB file as is (is not compacted),
396435 // then we update set of MOB file references
397436 // and append mob cell directly to the store's writer
398- mobRefSet .get ().add (fName );
399- writer .append (mobCell );
437+ Optional <TableName > refTable = MobUtils .getTableName (c );
438+ if (refTable .isPresent ()) {
439+ mobRefSet .get ().put (refTable .get (), fName );
440+ writer .append (c );
441+ } else {
442+ throw new IOException ("MOB cell did not contain a tablename "
443+ + "tag. should not be possible. see ref guide on mob troubleshooting. "
444+ + "store=" + getStoreInfo () + " cell=" + c );
445+ }
400446 }
401447 }
402448 } else {
@@ -444,9 +490,15 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
444490 if (MobUtils .hasValidMobRefCellValue (c )) {
445491 // We do not check mobSizeThreshold during normal compaction,
446492 // leaving it to a MOB compaction run
447- writer .append (c );
448- // Add MOB reference to a MOB reference set
449- mobRefSet .get ().add (MobUtils .getMobFileName (c ));
493+ Optional <TableName > refTable = MobUtils .getTableName (c );
494+ if (refTable .isPresent ()) {
495+ mobRefSet .get ().put (refTable .get (), MobUtils .getMobFileName (c ));
496+ writer .append (c );
497+ } else {
498+ throw new IOException ("MOB cell did not contain a tablename "
499+ + "tag. should not be possible. see ref guide on mob troubleshooting. " + "store="
500+ + getStoreInfo () + " cell=" + c );
501+ }
450502 } else {
451503 String errMsg = String .format ("Corrupted MOB reference: %s" , c .toString ());
452504 throw new IOException (errMsg );
@@ -525,7 +577,7 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
525577 throughputController .finish (compactionName );
526578 if (!finished && mobFileWriter != null ) {
527579 // Remove all MOB references because compaction failed
528- mobRefSet . get (). clear ();
580+ clearThreadLocals ();
529581 // Abort writer
530582 LOG .debug ("Aborting writer for {} because of a compaction failure, Store {}" ,
531583 mobFileWriter .getPath (), getStoreInfo ());
@@ -543,16 +595,13 @@ protected boolean performCompaction(FileDetails fd, InternalScanner scanner, Cel
543595 return true ;
544596 }
545597
546- private String getStoreInfo () {
598+ protected String getStoreInfo () {
547599 return String .format ("[table=%s family=%s region=%s]" , store .getTableName ().getNameAsString (),
548600 store .getColumnFamilyName (), store .getRegionInfo ().getEncodedName ());
549601 }
550602
551603 private void clearThreadLocals () {
552- Set <String > set = mobRefSet .get ();
553- if (set != null ) {
554- set .clear ();
555- }
604+ mobRefSet .get ().clear ();
556605 HashMap <String , Long > map = mobLengthMap .get ();
557606 if (map != null ) {
558607 map .clear ();
@@ -567,7 +616,7 @@ private StoreFileWriter newMobWriter(FileDetails fd, boolean major) throws IOExc
567616 LOG .debug ("New MOB writer created={} store={}" , mobFileWriter .getPath ().getName (),
568617 getStoreInfo ());
569618 // Add reference we get for compact MOB
570- mobRefSet .get ().add ( mobFileWriter .getPath ().getName ());
619+ mobRefSet .get ().put ( store . getTableName (), mobFileWriter .getPath ().getName ());
571620 return mobFileWriter ;
572621 } catch (IOException e ) {
573622 // Bailing out
@@ -599,7 +648,7 @@ private void commitOrAbortMobWriter(StoreFileWriter mobFileWriter, long maxSeqId
599648 LOG .debug ("Aborting writer for {} because there are no MOB cells, store={}" ,
600649 mobFileWriter .getPath (), getStoreInfo ());
601650 // Remove MOB file from reference set
602- mobRefSet .get ().remove (mobFileWriter .getPath ().getName ());
651+ mobRefSet .get ().remove (store . getTableName (), mobFileWriter .getPath ().getName ());
603652 abortWriter (mobFileWriter );
604653 }
605654 } else {
@@ -612,9 +661,7 @@ protected List<Path> commitWriter(StoreFileWriter writer, FileDetails fd,
612661 CompactionRequestImpl request ) throws IOException {
613662 List <Path > newFiles = Lists .newArrayList (writer .getPath ());
614663 writer .appendMetadata (fd .maxSeqId , request .isAllFiles (), request .getFiles ());
615- // Append MOB references
616- Set <String > refSet = mobRefSet .get ();
617- writer .appendMobMetadata (refSet );
664+ writer .appendMobMetadata (mobRefSet .get ());
618665 writer .close ();
619666 clearThreadLocals ();
620667 return newFiles ;
0 commit comments