3838import java .util .HashMap ;
3939import java .util .HashSet ;
4040import java .util .Iterator ;
41+ import java .util .LinkedHashSet ;
4142import java .util .List ;
4243import java .util .Map ;
4344import java .util .Map .Entry ;
6465import java .util .concurrent .atomic .AtomicBoolean ;
6566import java .util .concurrent .atomic .AtomicInteger ;
6667import java .util .concurrent .atomic .AtomicLong ;
68+ import java .util .concurrent .atomic .AtomicReference ;
6769import java .util .concurrent .atomic .LongAdder ;
70+ import java .util .concurrent .locks .Condition ;
6871import java .util .concurrent .locks .Lock ;
6972import java .util .concurrent .locks .ReadWriteLock ;
7073import java .util .concurrent .locks .ReentrantLock ;
@@ -6811,49 +6814,41 @@ public String toString() {
68116814 public static class RowCommitSequencer {
68126815
68136816 final AtomicLong sequence = new AtomicLong (EnvironmentEdgeManager .currentTime ());
6814- final ConcurrentSkipListSet <HashedBytes > sequenceRowSet = new ConcurrentSkipListSet <>();
6815- final ReentrantLock sequenceRowSetLock = new ReentrantLock ();
6817+ final AtomicReference <HashSet <HashedBytes >> sequenceRowSet =
6818+ new AtomicReference <>(new LinkedHashSet <>());
6819+ final ReentrantLock lock = new ReentrantLock ();
6820+ final Condition condition = lock .newCondition ();
68166821 final LongAdder yieldCount = new LongAdder ();
68176822
6818- long updateCurrentTime (final long now ) {
6819- return sequence .updateAndGet (x -> {
6820- // Clock might go backwards, deliberately so in some unit tests.
6821- // We only care that there is a unique CSLS for each discrete tick.
6823+ boolean updateCurrentTime (final long now ) {
6824+ final boolean changed [] = { false };
6825+ sequence .updateAndGet (x -> {
68226826 if (now != x ) {
6823- // When the clock ticks, we begin a new commit sequence.
6824- sequenceRowSet .clear ();
6827+ changed [0 ] = true ;
68256828 }
68266829 return now ;
68276830 });
6831+ return changed [0 ];
68286832 }
68296833
6830- boolean checkForOverlap (Collection <RowLock > rowLocks ) throws IOException {
6834+ boolean checkAndAddRows (Collection <RowLock > rowLocks ) throws IOException {
68316835 boolean overlap = false ;
68326836 // For each row, test if the set already contains the row. If there is no mutation
68336837 // and the current operation will be allowed to go forward, then add all of its rows
68346838 // to the set.
6835- // Unfortunately we need the set key tests and additions to be atomic in the aggregate.
6836- try {
6837- sequenceRowSetLock .lockInterruptibly ();
6838- } catch (InterruptedException e ) {
6839- throw (IOException ) new InterruptedIOException ().initCause (e );
6839+ Set <HashedBytes > rowSet = sequenceRowSet .get ();
6840+ for (RowLock l : rowLocks ) {
6841+ HashedBytes row = ((RowLockImpl )l ).context .row ;
6842+ if (rowSet .contains (row )) {
6843+ overlap = true ;
6844+ break ;
6845+ }
68406846 }
6841- try {
6847+ if (! overlap ) {
68426848 for (RowLock l : rowLocks ) {
68436849 HashedBytes row = ((RowLockImpl )l ).context .row ;
6844- if (sequenceRowSet .contains (row )) {
6845- overlap = true ;
6846- break ;
6847- }
6850+ rowSet .add (row );
68486851 }
6849- if (!overlap ) {
6850- for (RowLock l : rowLocks ) {
6851- HashedBytes row = ((RowLockImpl )l ).context .row ;
6852- sequenceRowSet .add (row );
6853- }
6854- }
6855- } finally {
6856- sequenceRowSetLock .unlock ();
68576852 }
68586853 return overlap ;
68596854 }
@@ -6863,34 +6858,45 @@ boolean checkForOverlap(Collection<RowLock> rowLocks) throws IOException {
68636858 * @return the time to use for the pending batch mutation
68646859 */
68656860 public long getRowSequence (List <RowLock > rowLocks ) throws IOException {
6866- // When the clock ticks, we begin a new commit sequence. Do this housekeeping first.
6867- long last = updateCurrentTime (EnvironmentEdgeManager .currentTime ());
6868- // Now we can check for collisions.
6869- if (!checkForOverlap (rowLocks )) {
6870- // No collision detected, proceed.
6871- return sequence .get ();
6872- }
6873- // Collision detected, now we wait.
6874- boolean overlap = true ;
6875- do {
6876- yieldCount .increment ();
6877- try {
6878- // Yield the thread with a small sleep.
6879- // Actual time suspended will depend on platform. Not less than the resolution of the
6880- // system clock, which is what we want.
6881- Thread .sleep (1 ,0 );
6882- } catch (InterruptedException e ) {
6883- throw (IOException ) new InterruptedIOException ().initCause (e );
6861+ while (true ) {
6862+ boolean locked = false ;
6863+ // When the clock ticks, we begin a new commit sequence. Do this housekeeping first.
6864+ if (updateCurrentTime (EnvironmentEdgeManager .currentTime ())) {
6865+ sequenceRowSet .set (new LinkedHashSet <>());
6866+ try {
6867+ lock .lockInterruptibly ();
6868+ } catch (InterruptedException e ) {
6869+ throw (IOException ) new InterruptedIOException ().initCause (e );
6870+ }
6871+ condition .signalAll ();
6872+ locked = true ;
68846873 }
6885- // Don't check again until the time changes, it would be useless overhead.
6886- long now = updateCurrentTime (EnvironmentEdgeManager .currentTime ());
6887- if (last != now ) {
6888- overlap = checkForOverlap (rowLocks );
6889- last = now ;
6874+ if (!locked ) {
6875+ try {
6876+ lock .lockInterruptibly ();
6877+ } catch (InterruptedException e ) {
6878+ throw (IOException ) new InterruptedIOException ().initCause (e );
6879+ }
68906880 }
6891- } while (overlap );
6892- // No collision detected, proceed
6893- return sequence .get ();
6881+ try {
6882+ // Now we can check for collisions.
6883+ if (!checkAndAddRows (rowLocks )) {
6884+ // No collision detected, proceed.
6885+ return sequence .get ();
6886+ }
6887+ // Collision, get in line.
6888+ yieldCount .increment ();
6889+ try {
6890+ if (!condition .await (1 , TimeUnit .MILLISECONDS )) {
6891+ continue ;
6892+ }
6893+ } catch (InterruptedException e ) {
6894+ throw (IOException ) new InterruptedIOException ().initCause (e );
6895+ }
6896+ } finally {
6897+ lock .unlock ();
6898+ }
6899+ }
68946900 }
68956901
68966902 /**
0 commit comments