1616 */
1717package org .apache .hadoop .hbase .quotas ;
1818
19+ import static org .junit .Assert .assertEquals ;
20+ import static org .junit .Assert .assertTrue ;
1921import static org .junit .Assert .fail ;
2022
2123import java .io .IOException ;
2931import java .util .concurrent .atomic .AtomicLong ;
3032
3133import org .apache .hadoop .conf .Configuration ;
34+ import org .apache .hadoop .fs .FileSystem ;
35+ import org .apache .hadoop .fs .Path ;
3236import org .apache .hadoop .hbase .HBaseTestingUtility ;
3337import org .apache .hadoop .hbase .HColumnDescriptor ;
38+ import org .apache .hadoop .hbase .HConstants ;
3439import org .apache .hadoop .hbase .HTableDescriptor ;
3540import org .apache .hadoop .hbase .MiniHBaseCluster ;
3641import org .apache .hadoop .hbase .NamespaceDescriptor ;
3742import org .apache .hadoop .hbase .TableName ;
43+ import org .apache .hadoop .hbase .TableNotEnabledException ;
3844import org .apache .hadoop .hbase .Waiter .Predicate ;
3945import org .apache .hadoop .hbase .client .Admin ;
46+ import org .apache .hadoop .hbase .client .Append ;
47+ import org .apache .hadoop .hbase .client .ClientServiceCallable ;
4048import org .apache .hadoop .hbase .client .Connection ;
49+ import org .apache .hadoop .hbase .client .Delete ;
50+ import org .apache .hadoop .hbase .client .Increment ;
51+ import org .apache .hadoop .hbase .client .Mutation ;
4152import org .apache .hadoop .hbase .client .Put ;
53+ import org .apache .hadoop .hbase .client .Result ;
54+ import org .apache .hadoop .hbase .client .ResultScanner ;
55+ import org .apache .hadoop .hbase .client .Scan ;
56+ import org .apache .hadoop .hbase .client .SecureBulkLoadClient ;
4257import org .apache .hadoop .hbase .client .Table ;
58+ import org .apache .hadoop .hbase .ipc .RpcControllerFactory ;
4359import org .apache .hadoop .hbase .regionserver .HRegion ;
4460import org .apache .hadoop .hbase .regionserver .HStore ;
4561import org .apache .hadoop .hbase .regionserver .HStoreFile ;
62+ import org .apache .hadoop .hbase .regionserver .TestHRegionServerBulkLoad ;
4663import org .apache .hadoop .hbase .util .Bytes ;
64+ import org .apache .hadoop .hbase .util .Pair ;
65+ import org .apache .hadoop .util .StringUtils ;
4766import org .apache .yetus .audience .InterfaceAudience ;
4867import org .junit .rules .TestName ;
4968import org .slf4j .Logger ;
@@ -65,6 +84,7 @@ public class SpaceQuotaHelperForTests {
6584 private final HBaseTestingUtility testUtil ;
6685 private final TestName testName ;
6786 private final AtomicLong counter ;
87+ private static final int NUM_RETRIES = 10 ;
6888
6989 public SpaceQuotaHelperForTests (
7090 HBaseTestingUtility testUtil , TestName testName , AtomicLong counter ) {
@@ -110,16 +130,214 @@ long listNumDefinedQuotas(Connection conn) throws IOException {
110130 }
111131 }
112132
133+ /**
134+ * Writes the given mutation into a table until it violates the given policy.
135+ * Verifies that the policy has been violated & then returns the name of
136+ * the table created & written into.
137+ */
138+ TableName writeUntilViolationAndVerifyViolation (
139+ SpaceViolationPolicy policyToViolate , Mutation m ) throws Exception {
140+ final TableName tn = writeUntilViolation (policyToViolate );
141+ verifyViolation (policyToViolate , tn , m );
142+ return tn ;
143+ }
144+
145+ /**
146+ * Writes the given mutation into a table until it violates the given policy.
147+ * Returns the name of the table created & written into.
148+ */
149+ TableName writeUntilViolation (SpaceViolationPolicy policyToViolate ) throws Exception {
150+ TableName tn = createTableWithRegions (10 );
151+ setQuotaLimit (tn , policyToViolate , 2L );
152+ // Write more data than should be allowed and flush it to disk
153+ writeData (tn , 3L * SpaceQuotaHelperForTests .ONE_MEGABYTE );
154+
155+ // This should be sufficient time for the chores to run and see the change.
156+ Thread .sleep (5000 );
157+
158+ return tn ;
159+ }
160+
161+ /**
162+ * Verifies that the given policy on the given table has been violated
163+ */
164+ void verifyViolation (SpaceViolationPolicy policyToViolate , TableName tn , Mutation m )
165+ throws Exception {
166+ // But let's try a few times to get the exception before failing
167+ boolean sawError = false ;
168+ String msg = "" ;
169+ for (int i = 0 ; i < NUM_RETRIES && !sawError ; i ++) {
170+ try (Table table = testUtil .getConnection ().getTable (tn )) {
171+ if (m instanceof Put ) {
172+ table .put ((Put ) m );
173+ } else if (m instanceof Delete ) {
174+ table .delete ((Delete ) m );
175+ } else if (m instanceof Append ) {
176+ table .append ((Append ) m );
177+ } else if (m instanceof Increment ) {
178+ table .increment ((Increment ) m );
179+ } else {
180+ fail (
181+ "Failed to apply " + m .getClass ().getSimpleName () +
182+ " to the table. Programming error" );
183+ }
184+ LOG .info ("Did not reject the " + m .getClass ().getSimpleName () + ", will sleep and retry" );
185+ Thread .sleep (2000 );
186+ } catch (Exception e ) {
187+ msg = StringUtils .stringifyException (e );
188+ if ((policyToViolate .equals (SpaceViolationPolicy .DISABLE )
189+ && e instanceof TableNotEnabledException ) || msg .contains (policyToViolate .name ())) {
190+ LOG .info ("Got the expected exception={}" , msg );
191+ sawError = true ;
192+ break ;
193+ } else {
194+ LOG .warn ("Did not get the expected exception, will sleep and retry" , e );
195+ Thread .sleep (2000 );
196+ }
197+ }
198+ }
199+ if (!sawError ) {
200+ try (Table quotaTable = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
201+ ResultScanner scanner = quotaTable .getScanner (new Scan ());
202+ Result result = null ;
203+ LOG .info ("Dumping contents of hbase:quota table" );
204+ while ((result = scanner .next ()) != null ) {
205+ LOG .info (Bytes .toString (result .getRow ()) + " => " + result .toString ());
206+ }
207+ scanner .close ();
208+ }
209+ } else {
210+ if (policyToViolate .equals (SpaceViolationPolicy .DISABLE )) {
211+ assertTrue (
212+ msg .contains ("TableNotEnabledException" ) || msg .contains (policyToViolate .name ()));
213+ } else {
214+ assertTrue ("Expected exception message to contain the word '" + policyToViolate .name ()
215+ + "', but was " + msg ,
216+ msg .contains (policyToViolate .name ()));
217+ }
218+ }
219+ assertTrue (
220+ "Expected to see an exception writing data to a table exceeding its quota" , sawError );
221+ }
222+
223+ /**
224+ * Verifies that no policy has been violated on the given table
225+ */
226+ void verifyNoViolation (TableName tn , Mutation m ) throws Exception {
227+ // But let's try a few times to write data before failing
228+ boolean sawSuccess = false ;
229+ for (int i = 0 ; i < NUM_RETRIES && !sawSuccess ; i ++) {
230+ try (Table table = testUtil .getConnection ().getTable (tn )) {
231+ if (m instanceof Put ) {
232+ table .put ((Put ) m );
233+ } else if (m instanceof Delete ) {
234+ table .delete ((Delete ) m );
235+ } else if (m instanceof Append ) {
236+ table .append ((Append ) m );
237+ } else if (m instanceof Increment ) {
238+ table .increment ((Increment ) m );
239+ } else {
240+ fail ("Failed to apply " + m .getClass ().getSimpleName () + " to the table."
241+ + " Programming error" );
242+ }
243+ sawSuccess = true ;
244+ } catch (Exception e ) {
245+ LOG .info ("Rejected the " + m .getClass ().getSimpleName () + ", will sleep and retry" );
246+ Thread .sleep (2000 );
247+ }
248+ }
249+ if (!sawSuccess ) {
250+ try (Table quotaTable = testUtil .getConnection ().getTable (QuotaUtil .QUOTA_TABLE_NAME )) {
251+ ResultScanner scanner = quotaTable .getScanner (new Scan ());
252+ Result result = null ;
253+ LOG .info ("Dumping contents of hbase:quota table" );
254+ while ((result = scanner .next ()) != null ) {
255+ LOG .info (Bytes .toString (result .getRow ()) + " => " + result .toString ());
256+ }
257+ scanner .close ();
258+ }
259+ }
260+ assertTrue ("Expected to succeed in writing data to a table not having quota " , sawSuccess );
261+ }
262+
263+ /**
264+ * Sets the given quota (policy & limit) on the passed table.
265+ */
266+ void setQuotaLimit (final TableName tn , SpaceViolationPolicy policy , long sizeInMBs )
267+ throws Exception {
268+ final long sizeLimit = sizeInMBs * SpaceQuotaHelperForTests .ONE_MEGABYTE ;
269+ QuotaSettings settings = QuotaSettingsFactory .limitTableSpace (tn , sizeLimit , policy );
270+ testUtil .getAdmin ().setQuota (settings );
271+ LOG .debug ("Quota limit set for table = {}, limit = {}" , tn , sizeLimit );
272+ }
273+
274+ /**
275+ * Removes the space quota from the given table
276+ */
277+ void removeQuotaFromtable (final TableName tn ) throws Exception {
278+ QuotaSettings removeQuota = QuotaSettingsFactory .removeTableSpaceLimit (tn );
279+ testUtil .getAdmin ().setQuota (removeQuota );
280+ LOG .debug ("Space quota settings removed from the table " , tn );
281+ }
282+
283+ /**
284+ *
285+ * @param tn the tablename
286+ * @param numFiles number of files
287+ * @param numRowsPerFile number of rows per file
288+ * @return a clientServiceCallable which can be used with the Caller factory for bulk load
289+ * @throws Exception when failed to get connection, table or preparation of the bulk load
290+ */
291+ ClientServiceCallable <Void > generateFileToLoad (TableName tn , int numFiles , int numRowsPerFile )
292+ throws Exception {
293+ Connection conn = testUtil .getConnection ();
294+ FileSystem fs = testUtil .getTestFileSystem ();
295+ Configuration conf = testUtil .getConfiguration ();
296+ Path baseDir = new Path (fs .getHomeDirectory (), testName .getMethodName () + "_files" );
297+ fs .mkdirs (baseDir );
298+ final List <Pair <byte [], String >> famPaths = new ArrayList <Pair <byte [], String >>();
299+ for (int i = 1 ; i <= numFiles ; i ++) {
300+ Path hfile = new Path (baseDir , "file" + i );
301+ TestHRegionServerBulkLoad
302+ .createHFile (fs , hfile , Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), Bytes .toBytes ("to" ),
303+ Bytes .toBytes ("reject" ), numRowsPerFile );
304+ famPaths .add (new Pair <>(Bytes .toBytes (SpaceQuotaHelperForTests .F1 ), hfile .toString ()));
305+ }
306+
307+ // bulk load HFiles
308+ Table table = conn .getTable (tn );
309+ final String bulkToken = new SecureBulkLoadClient (conf , table ).prepareBulkLoad (conn );
310+ return new ClientServiceCallable <Void >(conn , tn , Bytes .toBytes ("row" ),
311+ new RpcControllerFactory (conf ).newController (), HConstants .PRIORITY_UNSET ) {
312+ @ Override
313+ public Void rpcCall () throws Exception {
314+ SecureBulkLoadClient secureClient = null ;
315+ byte [] regionName = getLocation ().getRegionInfo ().getRegionName ();
316+ try (Table table = conn .getTable (getTableName ())) {
317+ secureClient = new SecureBulkLoadClient (conf , table );
318+ secureClient .secureBulkLoadHFiles (getStub (), famPaths , regionName , true , null , bulkToken );
319+ }
320+ return null ;
321+ }
322+ };
323+ }
324+
325+ /**
326+ * Removes all quotas defined in the HBase quota table.
327+ */
328+ void removeAllQuotas () throws Exception {
329+ final Connection conn = testUtil .getConnection ();
330+ removeAllQuotas (conn );
331+ assertEquals (0 , listNumDefinedQuotas (conn ));
332+ }
333+
113334 /**
114335 * Removes all quotas defined in the HBase quota table.
115336 */
116- void removeAllQuotas (Connection conn ) throws IOException , InterruptedException {
337+ void removeAllQuotas (Connection conn ) throws IOException {
117338 // Wait for the quota table to be created
118339 if (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME )) {
119- do {
120- LOG .debug ("Quota table does not yet exist" );
121- Thread .sleep (1000 );
122- } while (!conn .getAdmin ().tableExists (QuotaUtil .QUOTA_TABLE_NAME ));
340+ waitForQuotaTable (conn );
123341 } else {
124342 // Or, clean up any quotas from previous test runs.
125343 QuotaRetriever scanner = QuotaRetriever .open (conn .getConfiguration ());
0 commit comments