1616 */ 
1717package  org .apache .hadoop .hbase .quotas ;
1818
19+ import  static  org .junit .Assert .assertEquals ;
20+ import  static  org .junit .Assert .assertTrue ;
1921import  static  org .junit .Assert .fail ;
2022
2123import  java .io .IOException ;
2931import  java .util .concurrent .atomic .AtomicLong ;
3032
3133import  org .apache .hadoop .conf .Configuration ;
34+ import  org .apache .hadoop .fs .FileSystem ;
35+ import  org .apache .hadoop .fs .Path ;
3236import  org .apache .hadoop .hbase .HBaseTestingUtility ;
3337import  org .apache .hadoop .hbase .HColumnDescriptor ;
38+ import  org .apache .hadoop .hbase .HConstants ;
3439import  org .apache .hadoop .hbase .HTableDescriptor ;
3540import  org .apache .hadoop .hbase .MiniHBaseCluster ;
3641import  org .apache .hadoop .hbase .NamespaceDescriptor ;
3742import  org .apache .hadoop .hbase .TableName ;
43+ import  org .apache .hadoop .hbase .TableNotEnabledException ;
3844import  org .apache .hadoop .hbase .Waiter .Predicate ;
3945import  org .apache .hadoop .hbase .client .Admin ;
46+ import  org .apache .hadoop .hbase .client .Append ;
47+ import  org .apache .hadoop .hbase .client .ClientServiceCallable ;
4048import  org .apache .hadoop .hbase .client .Connection ;
49+ import  org .apache .hadoop .hbase .client .Delete ;
50+ import  org .apache .hadoop .hbase .client .Increment ;
51+ import  org .apache .hadoop .hbase .client .Mutation ;
4152import  org .apache .hadoop .hbase .client .Put ;
53+ import  org .apache .hadoop .hbase .client .Result ;
54+ import  org .apache .hadoop .hbase .client .ResultScanner ;
55+ import  org .apache .hadoop .hbase .client .Scan ;
56+ import  org .apache .hadoop .hbase .client .SecureBulkLoadClient ;
4257import  org .apache .hadoop .hbase .client .Table ;
58+ import  org .apache .hadoop .hbase .ipc .RpcControllerFactory ;
4359import  org .apache .hadoop .hbase .regionserver .HRegion ;
4460import  org .apache .hadoop .hbase .regionserver .HStore ;
4561import  org .apache .hadoop .hbase .regionserver .HStoreFile ;
62+ import  org .apache .hadoop .hbase .regionserver .TestHRegionServerBulkLoad ;
4663import  org .apache .hadoop .hbase .util .Bytes ;
64+ import  org .apache .hadoop .hbase .util .Pair ;
65+ import  org .apache .hadoop .util .StringUtils ;
4766import  org .apache .yetus .audience .InterfaceAudience ;
4867import  org .junit .rules .TestName ;
4968import  org .slf4j .Logger ;
@@ -65,6 +84,7 @@ public class SpaceQuotaHelperForTests {
6584  private  final  HBaseTestingUtility  testUtil ;
6685  private  final  TestName  testName ;
6786  private  final  AtomicLong  counter ;
87+   private  static  final  int  NUM_RETRIES  = 10 ;
6888
6989  public  SpaceQuotaHelperForTests (
7090      HBaseTestingUtility  testUtil , TestName  testName , AtomicLong  counter ) {
@@ -110,16 +130,214 @@ long listNumDefinedQuotas(Connection conn) throws IOException {
110130    }
111131  }
112132
133+   /** 
134+    * Writes the given mutation into a table until it violates the given policy. 
135+    * Verifies that the policy has been violated & then returns the name of 
136+    * the table created & written into. 
137+    */ 
138+   TableName  writeUntilViolationAndVerifyViolation (
139+       SpaceViolationPolicy  policyToViolate , Mutation  m ) throws  Exception  {
140+     final  TableName  tn  = writeUntilViolation (policyToViolate );
141+     verifyViolation (policyToViolate , tn , m );
142+     return  tn ;
143+   }
144+ 
145+   /** 
146+    * Writes the given mutation into a table until it violates the given policy. 
147+    * Returns the name of the table created & written into. 
148+    */ 
149+   TableName  writeUntilViolation (SpaceViolationPolicy  policyToViolate ) throws  Exception  {
150+     TableName  tn  = createTableWithRegions (10 );
151+     setQuotaLimit (tn , policyToViolate , 2L );
152+     // Write more data than should be allowed and flush it to disk 
153+     writeData (tn , 3L  * SpaceQuotaHelperForTests .ONE_MEGABYTE );
154+ 
155+     // This should be sufficient time for the chores to run and see the change. 
156+     Thread .sleep (5000 );
157+ 
158+     return  tn ;
159+   }
160+ 
161+   /** 
162+    * Verifies that the given policy on the given table has been violated 
163+    */ 
164+   void  verifyViolation (SpaceViolationPolicy  policyToViolate , TableName  tn , Mutation  m )
165+       throws  Exception  {
166+     // But let's try a few times to get the exception before failing 
167+     boolean  sawError  = false ;
168+     String  msg  = "" ;
169+     for  (int  i  = 0 ; i  < NUM_RETRIES  && !sawError ; i ++) {
170+       try  (Table  table  = testUtil .getConnection ().getTable (tn )) {
171+         if  (m  instanceof  Put ) {
172+           table .put ((Put ) m );
173+         } else  if  (m  instanceof  Delete ) {
174+           table .delete ((Delete ) m );
175+         } else  if  (m  instanceof  Append ) {
176+           table .append ((Append ) m );
177+         } else  if  (m  instanceof  Increment ) {
178+           table .increment ((Increment ) m );
179+         } else  {
180+           fail (
181+               "Failed to apply "  + m .getClass ().getSimpleName () +
182+                   " to the table. Programming error" );
183+         }
184+         LOG .info ("Did not reject the "  + m .getClass ().getSimpleName () + ", will sleep and retry" );
185+         Thread .sleep (2000 );
186+       } catch  (Exception  e ) {
187+         msg  = StringUtils .stringifyException (e );
188+         if  ((policyToViolate .equals (SpaceViolationPolicy .DISABLE )
189+             && e  instanceof  TableNotEnabledException ) || msg .contains (policyToViolate .name ())) {
190+           LOG .info ("Got the expected exception={}" , msg );
191+           sawError  = true ;
192+           break ;
193+         } else  {
194+           LOG .warn ("Did not get the expected exception, will sleep and retry" , e );
195+           Thread .sleep (2000 );
196+         }
197+       }
198+     }
199+     if  (!sawError ) {
200+       try  (Table  quotaTable  = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
201+         ResultScanner  scanner  = quotaTable .getScanner (new  Scan ());
202+         Result  result  = null ;
203+         LOG .info ("Dumping contents of hbase:quota table" );
204+         while  ((result  = scanner .next ()) != null ) {
205+           LOG .info (Bytes .toString (result .getRow ()) + " => "  + result .toString ());
206+         }
207+         scanner .close ();
208+       }
209+     } else  {
210+       if  (policyToViolate .equals (SpaceViolationPolicy .DISABLE )) {
211+         assertTrue (
212+             msg .contains ("TableNotEnabledException" ) || msg .contains (policyToViolate .name ()));
213+       } else  {
214+         assertTrue ("Expected exception message to contain the word '"  + policyToViolate .name ()
215+                 + "', but was "  + msg ,
216+             msg .contains (policyToViolate .name ()));
217+       }
218+     }
219+     assertTrue (
220+         "Expected to see an exception writing data to a table exceeding its quota" , sawError );
221+   }
222+ 
223+   /** 
224+    * Verifies that no policy has been violated on the given table 
225+    */ 
226+   void  verifyNoViolation (TableName  tn , Mutation  m ) throws  Exception  {
227+     // But let's try a few times to write data before failing 
228+     boolean  sawSuccess  = false ;
229+     for  (int  i  = 0 ; i  < NUM_RETRIES  && !sawSuccess ; i ++) {
230+       try  (Table  table  = testUtil .getConnection ().getTable (tn )) {
231+         if  (m  instanceof  Put ) {
232+           table .put ((Put ) m );
233+         } else  if  (m  instanceof  Delete ) {
234+           table .delete ((Delete ) m );
235+         } else  if  (m  instanceof  Append ) {
236+           table .append ((Append ) m );
237+         } else  if  (m  instanceof  Increment ) {
238+           table .increment ((Increment ) m );
239+         } else  {
240+           fail ("Failed to apply "  + m .getClass ().getSimpleName () + " to the table." 
241+               + " Programming error" );
242+         }
243+         sawSuccess  = true ;
244+       } catch  (Exception  e ) {
245+         LOG .info ("Rejected the "  + m .getClass ().getSimpleName () + ", will sleep and retry" );
246+         Thread .sleep (2000 );
247+       }
248+     }
249+     if  (!sawSuccess ) {
250+       try  (Table  quotaTable  = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
251+         ResultScanner  scanner  = quotaTable .getScanner (new  Scan ());
252+         Result  result  = null ;
253+         LOG .info ("Dumping contents of hbase:quota table" );
254+         while  ((result  = scanner .next ()) != null ) {
255+           LOG .info (Bytes .toString (result .getRow ()) + " => "  + result .toString ());
256+         }
257+         scanner .close ();
258+       }
259+     }
260+     assertTrue ("Expected to succeed in writing data to a table not having quota " , sawSuccess );
261+   }
262+ 
263+   /** 
264+    * Sets the given quota (policy & limit) on the passed table. 
265+    */ 
266+   void  setQuotaLimit (final  TableName  tn , SpaceViolationPolicy  policy , long  sizeInMBs )
267+       throws  Exception  {
268+     final  long  sizeLimit  = sizeInMBs  * SpaceQuotaHelperForTests .ONE_MEGABYTE ;
269+     QuotaSettings  settings  = QuotaSettingsFactory .limitTableSpace (tn , sizeLimit , policy );
270+     testUtil .getAdmin ().setQuota (settings );
271+     LOG .debug ("Quota limit set for table = {}, limit = {}" , tn , sizeLimit );
272+   }
273+ 
274+   /** 
275+    * Removes the space quota from the given table 
276+    */ 
277+   void  removeQuotaFromtable (final  TableName  tn ) throws  Exception  {
278+     QuotaSettings  removeQuota  = QuotaSettingsFactory .removeTableSpaceLimit (tn );
279+     testUtil .getAdmin ().setQuota (removeQuota );
280+     LOG .debug ("Space quota settings removed from the table " , tn );
281+   }
282+ 
283+   /** 
284+    * 
285+    * @param tn the tablename 
286+    * @param numFiles number of files 
287+    * @param numRowsPerFile number of rows per file 
288+    * @return a clientServiceCallable which can be used with the Caller factory for bulk load 
289+    * @throws Exception when failed to get connection, table or preparation of the bulk load 
290+    */ 
291+   ClientServiceCallable <Void > generateFileToLoad (TableName  tn , int  numFiles , int  numRowsPerFile )
292+       throws  Exception  {
293+     Connection  conn  = testUtil .getConnection ();
294+     FileSystem  fs  = testUtil .getTestFileSystem ();
295+     Configuration  conf  = testUtil .getConfiguration ();
296+     Path  baseDir  = new  Path (fs .getHomeDirectory (), testName .getMethodName () + "_files" );
297+     fs .mkdirs (baseDir );
298+     final  List <Pair <byte [], String >> famPaths  = new  ArrayList <Pair <byte [], String >>();
299+     for  (int  i  = 1 ; i  <= numFiles ; i ++) {
300+       Path  hfile  = new  Path (baseDir , "file"  + i );
301+       TestHRegionServerBulkLoad 
302+           .createHFile (fs , hfile , Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), Bytes .toBytes ("to" ),
303+               Bytes .toBytes ("reject" ), numRowsPerFile );
304+       famPaths .add (new  Pair <>(Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), hfile .toString ()));
305+     }
306+ 
307+     // bulk load HFiles 
308+     Table  table  = conn .getTable (tn );
309+     final  String  bulkToken  = new  SecureBulkLoadClient (conf , table ).prepareBulkLoad (conn );
310+     return  new  ClientServiceCallable <Void >(conn , tn , Bytes .toBytes ("row" ),
311+         new  RpcControllerFactory (conf ).newController (), HConstants .PRIORITY_UNSET ) {
312+       @ Override 
313+       public  Void  rpcCall () throws  Exception  {
314+         SecureBulkLoadClient  secureClient  = null ;
315+         byte [] regionName  = getLocation ().getRegionInfo ().getRegionName ();
316+         try  (Table  table  = conn .getTable (getTableName ())) {
317+           secureClient  = new  SecureBulkLoadClient (conf , table );
318+           secureClient .secureBulkLoadHFiles (getStub (), famPaths , regionName , true , null , bulkToken );
319+         }
320+         return  null ;
321+       }
322+     };
323+   }
324+ 
325+   /** 
326+    * Removes all quotas defined in the HBase quota table. 
327+    */ 
328+   void  removeAllQuotas () throws  Exception  {
329+     final  Connection  conn  = testUtil .getConnection ();
330+     removeAllQuotas (conn );
331+     assertEquals (0 , listNumDefinedQuotas (conn ));
332+   }
333+ 
113334  /** 
114335   * Removes all quotas defined in the HBase quota table. 
115336   */ 
116-   void  removeAllQuotas (Connection  conn ) throws  IOException ,  InterruptedException  {
337+   void  removeAllQuotas (Connection  conn ) throws  IOException  {
117338    // Wait for the quota table to be created 
118339    if  (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME )) {
119-       do  {
120-         LOG .debug ("Quota table does not yet exist" );
121-         Thread .sleep (1000 );
122-       } while  (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME ));
340+       waitForQuotaTable (conn );
123341    } else  {
124342      // Or, clean up any quotas from previous test runs. 
125343      QuotaRetriever  scanner  = QuotaRetriever .open (conn .getConfiguration ());
0 commit comments