Skip to content

Commit bc94bcc

Browse files
committed
Operations management consistent with ParseFile
1 parent 369feaf commit bc94bcc

File tree

1 file changed

+32
-28
lines changed

1 file changed

+32
-28
lines changed

Parse/src/main/java/com/parse/ParseQuery.java

Lines changed: 32 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -889,9 +889,9 @@ public String toString() {
889889
private final State.Builder<T> builder;
890890
private ParseUser user;
891891

892-
private final Object lock = new Object();
893-
private int isRunning = 0;
894-
private TaskCompletionSource<Void> cts = new TaskCompletionSource<>();
892+
// Just like ParseFile
893+
private Set<TaskCompletionSource<?>> currentTasks = Collections.synchronizedSet(
894+
new HashSet<TaskCompletionSource<?>>());
895895

896896
/**
897897
* Constructs a query for a {@link ParseObject} subclass type. A default query with no further
@@ -961,21 +961,19 @@ public ParseQuery(ParseQuery<T> query) {
961961
}
962962

963963
/**
964-
* Cancels the current network request (if one is running).
964+
* Cancels the current network request(s) (if any is running).
965965
*/
966966
//TODO (grantland): Deprecate and replace with CancellationTokens
967967
public void cancel() {
968-
synchronized (lock) {
969-
cts.trySetCancelled();
970-
cts = new TaskCompletionSource<>();
971-
isRunning = 0;
968+
Set<TaskCompletionSource<?>> tasks = new HashSet<>(currentTasks);
969+
for (TaskCompletionSource<?> tcs : tasks) {
970+
tcs.trySetCancelled();
972971
}
972+
currentTasks.removeAll(tasks);
973973
}
974974

975975
public boolean isRunning() {
976-
synchronized (lock) {
977-
return isRunning > 0;
978-
}
976+
return currentTasks.size() > 0;
979977
}
980978

981979
/**
@@ -1123,10 +1121,13 @@ public long getMaxCacheAge() {
11231121
}
11241122

11251123

1126-
private <TResult> Task<TResult> perform(Callable<Task<TResult>> runnable) {
1127-
synchronized (lock) {
1128-
isRunning++;
1129-
}
1124+
/**
1125+
* Wraps the runnable operation and keeps it in sync with the given tcs, so we know how many
1126+
* operations are running (currentTasks.size()) and can cancel them.
1127+
*/
1128+
private <TResult> Task<TResult> perform(Callable<Task<TResult>> runnable, final TaskCompletionSource<?> tcs) {
1129+
currentTasks.add(tcs);
1130+
11301131
Task<TResult> task;
11311132
try {
11321133
task = runnable.call();
@@ -1136,10 +1137,8 @@ private <TResult> Task<TResult> perform(Callable<Task<TResult>> runnable) {
11361137
return task.continueWithTask(new Continuation<TResult, Task<TResult>>() {
11371138
@Override
11381139
public Task<TResult> then(Task<TResult> task) throws Exception {
1139-
synchronized (lock) {
1140-
// cancel() drops the count to 0 so we have to check
1141-
isRunning = Math.max(isRunning-1, 0);
1142-
}
1140+
tcs.trySetResult(null); // release
1141+
currentTasks.remove(tcs);
11431142
return task;
11441143
}
11451144
});
@@ -1187,18 +1186,19 @@ public Task<List<T>> call(State<T> state, ParseUser user, Task<Void> cancellatio
11871186
}
11881187

11891188
private Task<List<T>> findAsync(final State<T> state) {
1189+
final TaskCompletionSource<Void> tcs = new TaskCompletionSource<>();
11901190
return perform(new Callable<Task<List<T>>>() {
11911191
@Override
11921192
public Task<List<T>> call() throws Exception {
11931193
return getUserAsync(state).onSuccessTask(new Continuation<ParseUser, Task<List<T>>>() {
11941194
@Override
11951195
public Task<List<T>> then(Task<ParseUser> task) throws Exception {
11961196
final ParseUser user = task.getResult();
1197-
return findAsync(state, user, cts.getTask());
1197+
return findAsync(state, user, tcs.getTask());
11981198
}
11991199
});
12001200
}
1201-
});
1201+
}, tcs);
12021202
}
12031203

12041204
/* package */ Task<List<T>> findAsync(State<T> state, ParseUser user, Task<Void> cancellationToken) {
@@ -1254,18 +1254,19 @@ public Task<T> call(State<T> state, ParseUser user, Task<Void> cancellationToken
12541254
}
12551255

12561256
private Task<T> getFirstAsync(final State<T> state) {
1257+
final TaskCompletionSource<Void> tcs = new TaskCompletionSource<>();
12571258
return perform(new Callable<Task<T>>() {
12581259
@Override
12591260
public Task<T> call() throws Exception {
12601261
return getUserAsync(state).onSuccessTask(new Continuation<ParseUser, Task<T>>() {
12611262
@Override
12621263
public Task<T> then(Task<ParseUser> task) throws Exception {
12631264
final ParseUser user = task.getResult();
1264-
return getFirstAsync(state, user, cts.getTask());
1265+
return getFirstAsync(state, user, tcs.getTask());
12651266
}
12661267
});
12671268
}
1268-
});
1269+
}, tcs);
12691270
}
12701271

12711272
private Task<T> getFirstAsync(State<T> state, ParseUser user, Task<Void> cancellationToken) {
@@ -1331,18 +1332,19 @@ public Task<Integer> call(State<T> state, ParseUser user, Task<Void> cancellatio
13311332
}
13321333

13331334
private Task<Integer> countAsync(final State<T> state) {
1335+
final TaskCompletionSource<Void> tcs = new TaskCompletionSource<>();
13341336
return perform(new Callable<Task<Integer>>() {
13351337
@Override
13361338
public Task<Integer> call() throws Exception {
13371339
return getUserAsync(state).onSuccessTask(new Continuation<ParseUser, Task<Integer>>() {
13381340
@Override
13391341
public Task<Integer> then(Task<ParseUser> task) throws Exception {
13401342
final ParseUser user = task.getResult();
1341-
return countAsync(state, user, cts.getTask());
1343+
return countAsync(state, user, tcs.getTask());
13421344
}
13431345
});
13441346
}
1345-
});
1347+
}, tcs);
13461348
}
13471349

13481350
private Task<Integer> countAsync(State<T> state, ParseUser user, Task<Void> cancellationToken) {
@@ -1500,6 +1502,8 @@ private <TResult> Task<TResult> doCacheThenNetwork(
15001502
final ParseQuery.State<T> state,
15011503
final ParseCallback2<TResult, ParseException> callback,
15021504
final CacheThenNetworkCallable<T, Task<TResult>> delegate) {
1505+
1506+
final TaskCompletionSource<Void> tcs = new TaskCompletionSource<>();
15031507
return perform(new Callable<Task<TResult>>() {
15041508
@Override
15051509
public Task<TResult> call() throws Exception {
@@ -1514,21 +1518,21 @@ public Task<TResult> then(Task<ParseUser> task) throws Exception {
15141518
.setCachePolicy(CachePolicy.NETWORK_ONLY)
15151519
.build();
15161520

1517-
Task<TResult> executionTask = delegate.call(cacheState, user, cts.getTask());
1521+
Task<TResult> executionTask = delegate.call(cacheState, user, tcs.getTask());
15181522
executionTask = ParseTaskUtils.callbackOnMainThreadAsync(executionTask, callback);
15191523
return executionTask.continueWithTask(new Continuation<TResult, Task<TResult>>() {
15201524
@Override
15211525
public Task<TResult> then(Task<TResult> task) throws Exception {
15221526
if (task.isCancelled()) {
15231527
return task;
15241528
}
1525-
return delegate.call(networkState, user, cts.getTask());
1529+
return delegate.call(networkState, user, tcs.getTask());
15261530
}
15271531
});
15281532
}
15291533
});
15301534
}
1531-
});
1535+
}, tcs);
15321536
}
15331537

15341538
private interface CacheThenNetworkCallable<T extends ParseObject, TResult> {

0 commit comments

Comments
 (0)