@@ -65,13 +65,11 @@ public final class SimpleRowBatch extends MemoryConsumer{
6565 private int vlen ;
6666 private int recordLength ;
6767
68- private MemoryBlock currentPage = null ;
68+ private MemoryBlock currentAndOnlyPage = null ;
6969 private Object currentAndOnlyBase = null ;
7070 private long recordStartOffset ;
7171 private long pageCursor = 0 ;
7272
73- private final LinkedList <MemoryBlock > dataPages = new LinkedList <>();
74-
7573 public static SimpleRowBatch allocate (StructType keySchema , StructType valueSchema ,
7674 TaskMemoryManager manager ) {
7775 return new SimpleRowBatch (keySchema , valueSchema , DEFAULT_CAPACITY , manager );
@@ -85,26 +83,23 @@ public static SimpleRowBatch allocate(StructType keySchema, StructType valueSche
8583 public int numRows () { return numRows ; }
8684
8785 public void close () {
88- if (dataPages .size () != 0 ) {
89- currentPage = dataPages .remove ();
90- freePage (currentPage );
91- assert (dataPages .size () == 0 );
86+ if (currentAndOnlyPage != null ) {
87+ freePage (currentAndOnlyPage );
88+ currentAndOnlyPage = null ;
9289 }
9390 }
9491
9592 private boolean acquireNewPage (long required ) {
9693 try {
97- currentPage = allocatePage (required );
94+ currentAndOnlyPage = allocatePage (required );
9895 } catch (OutOfMemoryError e ) {
9996 return false ;
10097 }
101- Platform .putInt (currentPage .getBaseObject (), currentPage .getBaseOffset (), 0 );
98+ currentAndOnlyBase = currentAndOnlyPage .getBaseObject ();
99+ Platform .putInt (currentAndOnlyBase , currentAndOnlyPage .getBaseOffset (), 0 );
102100 pageCursor = 4 ;
103- recordStartOffset = pageCursor + currentPage .getBaseOffset ();
101+ recordStartOffset = pageCursor + currentAndOnlyPage .getBaseOffset ();
104102
105- //System.out.println("acquired a new page");
106- dataPages .add (currentPage );
107- //TODO: add code to recycle the pages when we destroy this map
108103 return true ;
109104 }
110105
@@ -116,21 +111,13 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
116111 Object vbase , long voff , int vlen ) {
117112 final long recordLength = 8 + klen + vlen + 8 ;
118113 // if run out of max supported rows or page size, return null
119- if (numRows >= capacity || currentPage == null
120- || currentPage .size () - pageCursor < recordLength ) {
114+ if (numRows >= capacity || currentAndOnlyPage == null
115+ || currentAndOnlyPage .size () - pageCursor < recordLength ) {
121116 return null ;
122117 }
123118
124- // We now keeps only one big 64MB page, so no need to do the following page acquisition:
125- // if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
126- // // acquire new page
127- // if (!acquireNewPage(recordLength + 4L)) {
128- // return null;
129- // }
130- //}
131-
132- final Object base = currentPage .getBaseObject ();
133- long offset = currentPage .getBaseOffset () + pageCursor ;
119+ final Object base = currentAndOnlyBase ;
120+ long offset = currentAndOnlyPage .getBaseOffset () + pageCursor ;
134121 final long recordOffset = offset ;
135122 if (!allFixedLength ) { // we only put lengths info for variable length
136123 Platform .putInt (base , offset , klen + vlen + 4 );
@@ -143,7 +130,7 @@ public UnsafeRow appendRow(Object kbase, long koff, int klen,
143130 offset += vlen ;
144131 Platform .putLong (base , offset , 0 );
145132
146- offset = currentPage .getBaseOffset ();
133+ offset = currentAndOnlyPage .getBaseOffset ();
147134 Platform .putInt (base , offset , Platform .getInt (base , offset ) + 1 );
148135 pageCursor += recordLength ;
149136
@@ -225,8 +212,6 @@ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
225212 private final UnsafeRow key = new UnsafeRow (keySchema .length ());
226213 private final UnsafeRow value = new UnsafeRow (valueSchema .length ());
227214
228- private MemoryBlock currentPage = null ;
229- private Object pageBaseObject = null ;
230215 private long offsetInPage = 0 ;
231216 private int recordsInPage = 0 ;
232217
@@ -237,11 +222,9 @@ public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
237222 private boolean inited = false ;
238223
239224 private void init () {
240- if (dataPages .size () > 0 ) {
241- currentPage = dataPages .remove ();
242- pageBaseObject = currentPage .getBaseObject ();
243- offsetInPage = currentPage .getBaseOffset ();
244- recordsInPage = Platform .getInt (pageBaseObject , offsetInPage );
225+ if (currentAndOnlyPage != null ) {
226+ offsetInPage = currentAndOnlyPage .getBaseOffset ();
227+ recordsInPage = Platform .getInt (currentAndOnlyBase , offsetInPage );
245228 offsetInPage += 4 ;
246229 }
247230 inited = true ;
@@ -251,22 +234,23 @@ private void init() {
251234 public boolean next () {
252235 if (!inited ) init ();
253236 //searching for the next non empty page is records is now zero
254- while (recordsInPage == 0 ) {
255- if (!advanceToNextPage ()) return false ;
237+ if (recordsInPage == 0 ) {
238+ freeCurrentPage ();
239+ return false ;
256240 }
257241
258242 if (allFixedLength ) {
259243 totalLength = klen + vlen + 4 ;
260244 currentklen = klen ;
261245 currentvlen = vlen ;
262246 } else {
263- totalLength = Platform .getInt (pageBaseObject , offsetInPage );
264- currentklen = Platform .getInt (pageBaseObject , offsetInPage + 4 );
247+ totalLength = Platform .getInt (currentAndOnlyBase , offsetInPage );
248+ currentklen = Platform .getInt (currentAndOnlyBase , offsetInPage + 4 );
265249 currentvlen = totalLength - currentklen - 4 ;
266250 }
267251
268- key .pointTo (pageBaseObject , offsetInPage + 8 , currentklen );
269- value .pointTo (pageBaseObject , offsetInPage + 8 + currentklen , currentvlen + 4 );
252+ key .pointTo (currentAndOnlyBase , offsetInPage + 8 , currentklen );
253+ value .pointTo (currentAndOnlyBase , offsetInPage + 8 + currentklen , currentvlen + 4 );
270254
271255 offsetInPage += 4 + totalLength + 8 ;
272256 recordsInPage -= 1 ;
@@ -288,17 +272,10 @@ public void close() {
288272 // do nothing
289273 }
290274
291- private boolean advanceToNextPage () {
292- if (currentPage != null ) freePage (currentPage ); //free before advance
293- if (dataPages .size () > 0 ) {
294- currentPage = dataPages .remove ();
295- pageBaseObject = currentPage .getBaseObject ();
296- offsetInPage = currentPage .getBaseOffset ();
297- recordsInPage = Platform .getInt (pageBaseObject , offsetInPage );
298- offsetInPage += 4 ;
299- return true ;
300- } else {
301- return false ;
275+ private void freeCurrentPage () {
276+ if (currentAndOnlyPage != null ) {
277+ freePage (currentAndOnlyPage );
278+ currentAndOnlyPage = null ;
302279 }
303280 }
304281 };
@@ -316,7 +293,7 @@ private SimpleRowBatch(StructType keySchema, StructType valueSchema, int maxRows
316293 this .valueRow = new UnsafeRow (valueSchema .length ());
317294
318295 // checking if there is any variable length fields
319- // there is probably a more succint impl of this
296+ // there is probably a more succinct impl of this
320297 allFixedLength = true ;
321298 for (String name : keySchema .fieldNames ()) {
322299 allFixedLength = allFixedLength
@@ -338,9 +315,9 @@ private SimpleRowBatch(StructType keySchema, StructType valueSchema, int maxRows
338315 }
339316
340317 if (!acquireNewPage (DEFAULT_PAGE_SIZE )) {
341- currentPage = null ;
318+ currentAndOnlyPage = null ;
342319 } else {
343- currentAndOnlyBase = currentPage .getBaseObject ();
320+ currentAndOnlyBase = currentAndOnlyPage .getBaseObject ();
344321 }
345322 }
346323}
0 commit comments