2323import  java .io .DataOutput ;
2424import  java .io .IOException ;
2525import  java .io .InputStreamReader ;
26+ import  java .net .URI ;
27+ import  java .net .URISyntaxException ;
2628import  java .nio .charset .StandardCharsets ;
2729import  java .util .HashSet ;
2830import  java .util .LinkedList ;
3638import  org .apache .hadoop .fs .FileSystem ;
3739import  org .apache .hadoop .fs .Path ;
3840import  org .apache .hadoop .hbase .Cell ;
39- import  org .apache .hadoop .hbase .HBaseCommonTestingUtility ;
4041import  org .apache .hadoop .hbase .HBaseConfiguration ;
4142import  org .apache .hadoop .hbase .HBaseTestingUtility ;
4243import  org .apache .hadoop .hbase .IntegrationTestBase ;
5253import  org .apache .hadoop .hbase .client .Table ;
5354import  org .apache .hadoop .hbase .client .TableDescriptor ;
5455import  org .apache .hadoop .hbase .client .TableDescriptorBuilder ;
55- import  org .apache .hadoop .hbase .io .compress .Compression ;
5656import  org .apache .hadoop .hbase .io .encoding .DataBlockEncoding ;
5757import  org .apache .hadoop .hbase .mapreduce .TableMapReduceUtil ;
5858import  org .apache .hadoop .hbase .regionserver .BloomType ;
6464import  org .apache .hadoop .hbase .test .util .warc .WARCRecord ;
6565import  org .apache .hadoop .hbase .test .util .warc .WARCWritable ;
6666import  org .apache .hadoop .hbase .util .Bytes ;
67- import  org .apache .hadoop .hbase .util .CompressionTest ;
6867import  org .apache .hadoop .hbase .util .RegionSplitter ;
6968import  org .apache .hadoop .io .BytesWritable ;
70- import  org .apache .hadoop .io .IOUtils ;
7169import  org .apache .hadoop .io .LongWritable ;
7270import  org .apache .hadoop .io .NullWritable ;
7371import  org .apache .hadoop .io .SequenceFile .CompressionType ;
7472import  org .apache .hadoop .io .Writable ;
7573import  org .apache .hadoop .mapreduce .Counters ;
76- import  org .apache .hadoop .mapreduce .InputFormat ;
77- import  org .apache .hadoop .mapreduce .InputSplit ;
7874import  org .apache .hadoop .mapreduce .Job ;
7975import  org .apache .hadoop .mapreduce .JobContext ;
8076import  org .apache .hadoop .mapreduce .Mapper ;
81- import  org .apache .hadoop .mapreduce .RecordReader ;
82- import  org .apache .hadoop .mapreduce .Reducer ;
83- import  org .apache .hadoop .mapreduce .TaskAttemptContext ;
8477import  org .apache .hadoop .mapreduce .lib .input .FileInputFormat ;
8578import  org .apache .hadoop .mapreduce .lib .input .SequenceFileInputFormat ;
86- import  org .apache .hadoop .mapreduce .lib .output .FileOutputFormat ;
8779import  org .apache .hadoop .mapreduce .lib .output .NullOutputFormat ;
8880import  org .apache .hadoop .mapreduce .lib .output .SequenceFileOutputFormat ;
8981import  org .apache .hadoop .util .Tool ;
@@ -560,12 +552,27 @@ protected void map(LongWritable key, WARCWritable value, Context output)
560552        if  (warcHeader .getRecordType ().equals ("response" ) && warcHeader .getTargetURI () != null ) {
561553          String  contentType  = warcHeader .getField ("WARC-Identified-Payload-Type" );
562554          if  (contentType  != null ) {
563-             byte [] rowKey  = Bytes .toBytes (warcHeader .getTargetURI ());
555+ 
556+             // Make row key 
557+ 
558+             byte [] rowKey ;
559+             try  {
560+               rowKey  = rowKeyFromTargetURI (warcHeader .getTargetURI ());
561+             } catch  (URISyntaxException  e ) {
562+               LOG .warn ("Could not parse URI \" "  + warcHeader .getTargetURI () + "\"  for record "  +
563+                 warcHeader .getRecordID ());
564+               return ;
565+             }
566+ 
567+             // Get the content and calculate the CRC64 
568+ 
564569            byte [] content  = value .getRecord ().getContent ();
565570            CRC64  crc  = new  CRC64 ();
566571            crc .update (content );
567572            long  crc64  = crc .getValue ();
568573
574+             // Store to HBase 
575+ 
569576            Put  put  = new  Put (rowKey );
570577
571578            put .addColumn (CONTENT_FAMILY_NAME , CONTENT_QUALIFIER , content );
@@ -585,6 +592,8 @@ protected void map(LongWritable key, WARCWritable value, Context output)
585592
586593            table .put (put );
587594
595+             // If we succeeded in storing to HBase, write records for later verification 
596+ 
588597            output .write (new  HBaseKeyWritable (rowKey , INFO_FAMILY_NAME , CRC_QUALIFIER ),
589598              new  BytesWritable (Bytes .toBytes (crc64 )));
590599            output .write (new  HBaseKeyWritable (rowKey , INFO_FAMILY_NAME , CONTENT_LENGTH_QUALIFIER ),
@@ -602,6 +611,43 @@ protected void map(LongWritable key, WARCWritable value, Context output)
602611          }
603612        }
604613      }
614+ 
615+       private  byte [] rowKeyFromTargetURI (String  targetURI ) throws  URISyntaxException  {
616+         URI  uri  = new  URI (targetURI );
617+         StringBuffer  sb  = new  StringBuffer ();
618+         // Ignore the scheme 
619+         // Reverse the components of the hostname 
620+         String [] hostComponents  = uri .getHost ().split ("\\ ." );
621+         for  (int  i  = hostComponents .length  - 1 ; i  >= 0 ; i --) {
622+           sb .append (hostComponents [i ]);
623+           if  (i  != 0 ) {
624+             sb .append ('.' );
625+           }
626+         }
627+         // Port 
628+         if  (uri .getPort () != -1 ) {
629+           sb .append (":" );
630+           sb .append (uri .getPort ());
631+         }
632+         // Ignore the rest of the authority 
633+         sb .append ("/" );
634+         // Path, if present 
635+         if  (uri .getRawPath () != null ) {
636+           sb .append (uri .getRawPath ());
637+         }
638+         // Query, if present 
639+         if  (uri .getRawQuery () != null ) {
640+           sb .append ("?" );
641+           sb .append (uri .getRawQuery ());
642+         }
643+         // Fragment, if present 
644+         if  (uri .getRawFragment () != null ) {
645+           sb .append ("#" );
646+           sb .append (uri .getRawFragment ());
647+         }
648+         return  Bytes .toBytes (sb .toString ());
649+       }
650+ 
605651    }
606652  }
607653
0 commit comments