File tree Expand file tree Collapse file tree 1 file changed +13
-13
lines changed
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming Expand file tree Collapse file tree 1 file changed +13
-13
lines changed Original file line number Diff line number Diff line change @@ -127,19 +127,19 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
127127 }
128128
129129 override def commit (end : Offset ): Unit = synchronized {
130- if ( end. isInstanceOf [ LongOffset ]) {
131- val newOffset = end. asInstanceOf [ LongOffset ]
132- val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
133-
134- if (offsetDiff < 0 ) {
135- sys.error(s " Offsets committed out of order: $lastOffsetCommitted followed by $end" )
136- }
137-
138- batches.trimStart(offsetDiff)
139- lastOffsetCommitted = newOffset
140- } else {
141- sys.error(s " MemoryStream.commit() received an offset ( $end) that did not originate with " +
142- s " an instance of this class " )
130+ end match {
131+ case newOffset : LongOffset =>
132+ val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
133+
134+ if (offsetDiff < 0 ) {
135+ sys.error(s " Offsets committed out of order: $lastOffsetCommitted followed by $end" )
136+ }
137+
138+ batches.trimStart(offsetDiff)
139+ lastOffsetCommitted = newOffset
140+ case _ =>
141+ sys.error(s " MemoryStream.commit() received an offset ( $end) that did not originate with " +
142+ " an instance of this class" )
143143 }
144144 }
145145
You can’t perform that action at this time.
0 commit comments