001/* 002 * Copyright (c) 2016-2017 Daniel Ennis (Aikar) - MIT License 003 * 004 * Permission is hereby granted, free of charge, to any person obtaining 005 * a copy of this software and associated documentation files (the 006 * "Software"), to deal in the Software without restriction, including 007 * without limitation the rights to use, copy, modify, merge, publish, 008 * distribute, sublicense, and/or sell copies of the Software, and to 009 * permit persons to whom the Software is furnished to do so, subject to 010 * the following conditions: 011 * 012 * The above copyright notice and this permission notice shall be 013 * included in all copies or substantial portions of the Software. 014 * 015 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, 016 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 017 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND 018 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE 019 * LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION 020 * OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION 021 * WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/

/*
 * TaskChain for Minecraft Plugins
 *
 * Written by Aikar <[email protected]>
 * https://aikar.co
 * https://starlis.com
 *
 * @license MIT
 */

package co.aikar.taskchain;

import co.aikar.taskchain.TaskChainTasks.*;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;


/**
 * The main API class of TaskChain. TaskChains are created by a {@link TaskChainFactory}.
 *
 * @param <T> Type of the value the previously added task hands to the next task
 */
@SuppressWarnings({"unused", "FieldAccessedSynchronizedAndUnsynchronized"})
public class TaskChain <T> {
    // Chain associated with the calling thread; exposed through getCurrentChain().
    private static final ThreadLocal<TaskChain<?>> currentChain = new ThreadLocal<>();

    // Game implementation obtained from the factory; delay() schedules its work through it.
    private final GameInterface impl;
    // Factory handed to the constructor.
    private final TaskChainFactory factory;
    // Task Data storage behind hasTaskData / getTaskData / setTaskData / removeTaskData.
    private final Map<String, Object> taskMap = new HashMap<>(0);
    // NOTE(review): presumably the queued tasks of this chain; the code that fills and drains it is outside this view.
    private final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();

    // Value returned by getCurrentActionIndex().
    private int currentActionIndex = 0;
    private int actionIndex = 0;
    // Checked by abortChain() to decide between aborting immediately and queueing an abort step.
    private boolean executed = false;
    private boolean async = false;
    private boolean done = false;

    private Object previous;
    private TaskHolder<?, ?> currentHolder;
    // Set through setDoneCallback().
    private Consumer<Boolean> doneCallback;
    // Set through setErrorHandler(), read through getErrorHandler(); may be null.
    private BiConsumer<Exception, Task<?, ?>> errorHandler;

    /* ======================================================================================== */
    /**
     * Package-private constructor; stores the factory and the implementation it provides.
     *
     * @param factory Factory this chain belongs to; its {@code getImplementation()} result is kept as {@code impl}
     */
    TaskChain(TaskChainFactory factory) {
        this.factory = factory;
        this.impl = factory.getImplementation();
    }
    /*
======================================================================================== */ 082 // <editor-fold desc="// API Methods - Getters & Setters"> 083 /** 084 * Called in an executing task, get the current action index. 085 * For every action that adds a task to the chain, the action index is increased. 086 * 087 * Useful in error or done handlers to know where you are in the chain when it aborted or threw exception. 088 * @return The current index 089 */ 090 public int getCurrentActionIndex() { 091 return currentActionIndex; 092 } 093 094 /** 095 * Changes the done callback handler for this chain 096 * @param doneCallback The handler 097 */ 098 @SuppressWarnings("WeakerAccess") 099 public void setDoneCallback(Consumer<Boolean> doneCallback) { 100 this.doneCallback = doneCallback; 101 } 102 103 /** 104 * @return The current error handler or null 105 */ 106 public BiConsumer<Exception, Task<?, ?>> getErrorHandler() { 107 return errorHandler; 108 } 109 110 /** 111 * Changes the error handler for this chain 112 * @param errorHandler The error handler 113 */ 114 @SuppressWarnings("WeakerAccess") 115 public void setErrorHandler(BiConsumer<Exception, Task<?, ?>> errorHandler) { 116 this.errorHandler = errorHandler; 117 } 118 // </editor-fold> 119 /* ======================================================================================== */ 120 // <editor-fold desc="// API Methods - Data Wrappers"> 121 122 /** 123 * Creates a data wrapper to return multiple objects from a task 124 */ 125 public static <D1, D2> TaskChainDataWrappers.Data2<D1, D2> multi(D1 var1, D2 var2) { 126 return new TaskChainDataWrappers.Data2<>(var1, var2); 127 } 128 129 /** 130 * Creates a data wrapper to return multiple objects from a task 131 */ 132 public static <D1, D2, D3> TaskChainDataWrappers.Data3<D1, D2, D3> multi(D1 var1, D2 var2, D3 var3) { 133 return new TaskChainDataWrappers.Data3<>(var1, var2, var3); 134 } 135 136 /** 137 * Creates a data wrapper to return multiple objects from 
a task 138 */ 139 public static <D1, D2, D3, D4> TaskChainDataWrappers.Data4<D1, D2, D3, D4> multi(D1 var1, D2 var2, D3 var3, D4 var4) { 140 return new TaskChainDataWrappers.Data4<>(var1, var2, var3, var4); 141 } 142 143 /** 144 * Creates a data wrapper to return multiple objects from a task 145 */ 146 public static <D1, D2, D3, D4, D5> TaskChainDataWrappers.Data5<D1, D2, D3, D4, D5> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5) { 147 return new TaskChainDataWrappers.Data5<>(var1, var2, var3, var4, var5); 148 } 149 150 /** 151 * Creates a data wrapper to return multiple objects from a task 152 */ 153 public static <D1, D2, D3, D4, D5, D6> TaskChainDataWrappers.Data6<D1, D2, D3, D4, D5, D6> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5, D6 var6) { 154 return new TaskChainDataWrappers.Data6<>(var1, var2, var3, var4, var5, var6); 155 } 156 // </editor-fold> 157 /* ======================================================================================== */ 158 159 // <editor-fold desc="// API Methods - Base"> 160 /** 161 * Call to abort execution of the chain. Should be called inside of an executing task. 162 */ 163 @SuppressWarnings("WeakerAccess") 164 public static void abort() { 165 TaskChainUtil.sneakyThrows(new AbortChainException()); 166 } 167 168 /** 169 * Usable only inside of an executing Task or Chain Error/Done handlers 170 * 171 * Gets the current chain that is executing this Task or Error/Done handler 172 * This method should only be called on the same thread that is executing the method. 173 * 174 * In an AsyncExecutingTask or a FutureTask, You must call this method BEFORE passing control to another thread. 
175 */ 176 @SuppressWarnings("WeakerAccess") 177 public static TaskChain<?> getCurrentChain() { 178 return currentChain.get(); 179 } 180 181 /* ======================================================================================== */ 182 183 /** 184 * Allows you to call a callback to insert tasks into the chain without having to break the fluent interface 185 * 186 * Example: Plugin.newChain().sync(some::task).configure(chain -> { 187 * chain.async(some::foo); 188 * chain.sync(other::bar); 189 * }).async(other::task).execute(); 190 * 191 * @param configure Instance of the current chain. 192 * @return The same chain 193 */ 194 public TaskChain<T> configure(Consumer<TaskChain<T>> configure) { 195 configure.accept(this); 196 return this; 197 } 198 199 /** 200 * Checks if the chain has a value saved for the specified key. 201 * @param key Key to check if Task Data has a value for 202 */ 203 @SuppressWarnings("WeakerAccess") 204 public boolean hasTaskData(String key) { 205 return taskMap.containsKey(key); 206 } 207 208 /** 209 * Retrieves a value relating to a specific key, saved by a previous task. 210 * 211 * @param key Key to look up Task Data for 212 * @param <R> Type the Task Data value is expected to be 213 */ 214 @SuppressWarnings("WeakerAccess") 215 public <R> R getTaskData(String key) { 216 //noinspection unchecked 217 return (R) taskMap.get(key); 218 } 219 220 /** 221 * Saves a value for this chain so that a task furthur up the chain can access it. 222 * 223 * Useful for passing multiple values to the next (or furthur) tasks. 224 * 225 * @param key Key to store in Task Data 226 * @param val Value to store in Task Data 227 * @param <R> Type the Task Data value is expected to be 228 */ 229 @SuppressWarnings("WeakerAccess") 230 public <R> R setTaskData(String key, Object val) { 231 //noinspection unchecked 232 return (R) taskMap.put(key, val); 233 } 234 235 /** 236 * Removes a saved value on the chain. 
237 * 238 * @param key Key to remove from Task Data 239 * @param <R> Type the Task Data value is expected to be 240 */ 241 @SuppressWarnings("WeakerAccess") 242 public <R> R removeTaskData(String key) { 243 //noinspection unchecked 244 return (R) taskMap.remove(key); 245 } 246 247 /** 248 * Takes the previous tasks return value, stores it to the specified key 249 * as Task Data, and then forwards that value to the next task. 250 * 251 * @param key Key to store the previous return value into Task Data 252 */ 253 @SuppressWarnings("WeakerAccess") 254 public TaskChain<T> storeAsData(String key) { 255 return current((val) -> { 256 setTaskData(key, val); 257 return val; 258 }); 259 } 260 261 /** 262 * Reads the specified key from Task Data, and passes it to the next task. 263 * 264 * Will need to pass expected type such as chain.<Foo>returnData("key") 265 * 266 * @param key Key to retrieve from Task Data and pass to next task 267 * @param <R> Return type that the next parameter can expect as argument type 268 */ 269 @SuppressWarnings("WeakerAccess") 270 public <R> TaskChain<R> returnData(String key) { 271 //noinspection unchecked 272 return currentFirst(() -> (R) getTaskData(key)); 273 } 274 275 /** 276 * Returns the chain itself to the next task. 277 */ 278 @SuppressWarnings("WeakerAccess") 279 public TaskChain<TaskChain<?>> returnChain() { 280 return currentFirst(() -> this); 281 } 282 283 284 /** 285 * IMPLEMENTATION SPECIFIC!! 286 * Consult your application implementation to understand how long 1 unit is. 287 * 288 * For example, in Minecraft it is a tick, which is roughly 50 milliseconds, but not guaranteed. 289 * 290 * Adds a delay to the chain execution. 
291 * 292 * @param gameUnits # of game units to delay before next task 293 */ 294 @SuppressWarnings("WeakerAccess") 295 public TaskChain<T> delay(final int gameUnits) { 296 //noinspection CodeBlock2Expr 297 return currentCallback((input, next) -> { 298 impl.scheduleTask(gameUnits, () -> next.accept(input)); 299 }); 300 } 301 302 /** 303 * Adds a real time delay to the chain execution. 304 * Chain will abort if the delay is interrupted. 305 * 306 * @param duration duration of the delay before next task 307 */ 308 @SuppressWarnings("WeakerAccess") 309 public TaskChain<T> delay(final int duration, TimeUnit unit) { 310 //noinspection CodeBlock2Expr 311 return currentCallback((input, next) -> { 312 impl.scheduleTask(duration, unit, () -> next.accept(input)); 313 }); 314 } 315 316 // </editor-fold> 317 // <editor-fold desc="// API Methods - Abort"> 318 319 /** 320 * Aborts the chain once this step is reached. This is primarily useful when you are 321 * dynamically building the chains steps (such as .configure()) and want to conditionally stop the 322 * chain from proceeding. 323 * 324 * @return Chain 325 */ 326 public TaskChain<?> abortChain() { 327 if (executed) { 328 TaskChain.abort(); 329 return this; 330 } else { 331 return current(TaskChain::abort); 332 } 333 } 334 335 /** 336 * Checks if the previous task return was null. 337 * 338 * If not null, the previous task return will forward to the next task. 
*/
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIfNull() {
        // Four nulls select the (action, arg1, arg2, arg3) overload: no action and no arguments.
        return abortIfNull(null, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNull(TaskChainAbortAction, Object, Object, Object)},
     * passing null for all three action arguments.
     *
     * @param action Abort action forwarded together with the null check
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIfNull(TaskChainAbortAction<?, ?, ?> action) {
        return abortIf(Predicate.isEqual(null), action, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNull(TaskChainAbortAction, Object, Object, Object)},
     * passing null for the second and third action arguments.
     *
     * @param action Abort action forwarded together with the null check
     * @param arg1 First argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
        //noinspection unchecked
        return abortIf(Predicate.isEqual(null), action, arg1, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNull(TaskChainAbortAction, Object, Object, Object)},
     * passing null for the third action argument.
     *
     * @param action Abort action forwarded together with the null check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
        //noinspection unchecked
        return abortIf(Predicate.isEqual(null), action, arg1, arg2, null);
    }

    /**
     * Checks if the previous task return was null, and aborts if it was
     * Then executes supplied action handler
     *
     * If not null, the previous task return will forward to the next task.
     *
     * Delegates to {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)}
     * with {@code Predicate.isEqual(null)} as the predicate.
     *
     * @param action Abort action forwarded together with the null check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     * @param arg3 Third argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2, A3> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
        //noinspection unchecked
        return abortIf(Predicate.isEqual(null), action, arg1, arg2, arg3);
    }

    /**
     * Checks if the previous task return is the supplied value, and aborts if it is.
     *
     * If not, the previous task return will forward to the next task.
*/
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIf(T ifObj) {
        // No action and no arguments: everything after ifObj is null.
        return abortIf(ifObj, null, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)},
     * passing null for all three action arguments.
     *
     * @param ifObj Value the previous task return is compared against
     * @param action Abort action forwarded together with the check
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<?, ?, ?> action) {
        return abortIf(ifObj, action, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the second and third action arguments.
     *
     * @param ifObj Value the previous task return is compared against
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
        return abortIf(ifObj, action, arg1, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the third action argument.
     *
     * @param ifObj Value the previous task return is compared against
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
        return abortIf(ifObj, action, arg1, arg2, null);
    }

    /**
     * Wraps the supplied value in {@code Predicate.isEqual(ifObj)} and delegates to
     * {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)}.
     *
     * @param ifObj Value the previous task return is compared against
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     * @param arg3 Third argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2, A3> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
        return abortIf(Predicate.isEqual(ifObj), action, arg1, arg2, arg3);
    }

    /**
     * Checks if the previous task return matches the supplied predicate, and aborts if it does.
     *
     * If predicate does not match, the previous task return will forward to the next task.
*/
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIf(Predicate<T> predicate) {
        // No action and no arguments: everything after the predicate is null.
        return abortIf(predicate, null, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for all three action arguments.
     *
     * @param predicate Test applied to the previous task return
     * @param action Abort action forwarded together with the check
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<?, ?, ?> action) {
        return abortIf(predicate, action, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the second and third action arguments.
     *
     * @param predicate Test applied to the previous task return
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
        return abortIf(predicate, action, arg1, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the third action argument.
     *
     * @param predicate Test applied to the previous task return
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
        return abortIf(predicate, action, arg1, arg2, null);
    }

    /**
     * Checks if the previous task return matches the supplied predicate, and aborts if it does.
     * Then executes supplied action handler
     *
     * If predicate does not match, the previous task return will forward to the next task.
464 */ 465 public <A1, A2, A3> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) { 466 return current((obj) -> { 467 if (predicate.test(obj)) { 468 handleAbortAction(action, arg1, arg2, arg3); 469 return null; 470 } 471 return obj; 472 }); 473 } 474 475 /** 476 * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)} 477 */ 478 @SuppressWarnings("WeakerAccess") 479 public TaskChain<T> abortIfNot(T ifNotObj) { 480 return abortIfNot(ifNotObj, null, null, null, null); 481 } 482 483 /** 484 * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)} 485 */ 486 @SuppressWarnings("WeakerAccess") 487 public TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<?, ?, ?> action) { 488 return abortIfNot(ifNotObj, action, null, null, null); 489 } 490 491 /** 492 * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)} 493 */ 494 @SuppressWarnings("WeakerAccess") 495 public <A1> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) { 496 return abortIfNot(ifNotObj, action, arg1, null, null); 497 } 498 499 /** 500 * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)} 501 */ 502 @SuppressWarnings("WeakerAccess") 503 public <A1, A2> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) { 504 return abortIfNot(ifNotObj, action, arg1, arg2, null); 505 } 506 507 /** 508 * {@link TaskChain#abortIfNot(Predicate, TaskChainAbortAction, Object, Object, Object)} 509 */ 510 @SuppressWarnings("WeakerAccess") 511 public <A1, A2, A3> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) { 512 return abortIfNot(Predicate.<T>isEqual(ifNotObj), action, arg1, arg2, arg3); 513 } 514 515 /** 516 * Checks if the previous task return does NOT match the supplied predicate, and aborts if it does not match. 
*
     * If predicate matches, the previous task return will forward to the next task.
     *
     * @param ifNotPredicate Test the previous task return has to pass for the chain to continue
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate) {
        // No action and no arguments: everything after the predicate is null.
        return abortIfNot(ifNotPredicate, null, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNot(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for all three action arguments.
     *
     * @param ifNotPredicate Test the previous task return has to pass for the chain to continue
     * @param action Abort action forwarded together with the check
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<?, ?, ?> action) {
        return abortIfNot(ifNotPredicate, action, null, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNot(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the second and third action arguments.
     *
     * @param ifNotPredicate Test the previous task return has to pass for the chain to continue
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
        return abortIfNot(ifNotPredicate, action, arg1, null, null);
    }

    /**
     * Same check as {@link TaskChain#abortIfNot(Predicate, TaskChainAbortAction, Object, Object, Object)},
     * passing null for the third action argument.
     *
     * @param ifNotPredicate Test the previous task return has to pass for the chain to continue
     * @param action Abort action forwarded together with the check
     * @param arg1 First argument forwarded with the action
     * @param arg2 Second argument forwarded with the action
     */
    @SuppressWarnings("WeakerAccess")
    public <A1, A2> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
        return abortIfNot(ifNotPredicate, action, arg1, arg2, null);
    }

    /**
     * Checks if the previous task return does NOT match the supplied predicate, and aborts if it does not match.
     * Then executes supplied action handler
     *
     * If predicate matches, the previous task return will forward to the next task.
554 */ 555 @SuppressWarnings("WeakerAccess") 556 public <A1, A2, A3> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) { 557 return abortIf(ifNotPredicate.negate(), action, arg1, arg2, arg3); 558 } 559 560 // </editor-fold> 561 // <editor-fold desc="// API Methods - Async Executing"> 562 /* ======================================================================================== */ 563 // Async Executing Tasks 564 /* ======================================================================================== */ 565 566 /** 567 * Execute a task on the main thread, with no previous input, and a callback to return the response to. 568 * 569 * It's important you don't perform blocking operations in this method. Only use this if 570 * the task will be scheduling a different sync operation outside of the TaskChains scope. 571 * 572 * Usually you could achieve the same design with a blocking API by switching to an async task 573 * for the next task and running it there. 574 * 575 * This method would primarily be for cases where you need to use an API that ONLY provides 576 * a callback style API. 
*
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
        // Second TaskHolder argument is false here, true in the async variant and null in the current-thread variant.
        //noinspection unchecked
        return add0(new TaskHolder<>(this, false, task));
    }

    /**
     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran off main thread
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, true, task));
    }

    /**
     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran on current thread the Chain was created on
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> currentFirstCallback(AsyncExecutingFirstTask<R> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, null, task));
    }

    /**
     * Execute a task on the main thread, with the last output, and a callback to return the response to.
     *
     * It's important you don't perform blocking operations in this method. Only use this if
     * the task will be scheduling a different sync operation outside of the TaskChains scope.
     *
     * Usually you could achieve the same design with a blocking API by switching to an async task
     * for the next task and running it there.
     *
     * This method would primarily be for cases where you need to use an API that ONLY provides
     * a callback style API.
*
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> syncCallback(AsyncExecutingTask<R, T> task) {
        // Second TaskHolder argument is false here, true in the async variant and null in the current-thread variant.
        //noinspection unchecked
        return add0(new TaskHolder<>(this, false, task));
    }

    /**
     * {@link TaskChain#syncCallback(AsyncExecutingTask)}, ran on main thread but no input or output
     * @param task The task to execute
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<?> syncCallback(AsyncExecutingGenericTask task) {
        return add0(new TaskHolder<>(this, false, task));
    }

    /**
     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran off main thread
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> asyncCallback(AsyncExecutingTask<R, T> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, true, task));
    }

    /**
     * {@link TaskChain#syncCallback(AsyncExecutingGenericTask)} but ran off main thread
     * @param task The task to execute
     */
    @SuppressWarnings("WeakerAccess")
    public TaskChain<?> asyncCallback(AsyncExecutingGenericTask task) {
        return add0(new TaskHolder<>(this, true, task));
    }

    /**
     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran on current thread the Chain was created on
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> currentCallback(AsyncExecutingTask<R, T> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, null, task));
    }

    /**
     * {@link TaskChain#syncCallback(AsyncExecutingGenericTask)} but ran on current thread the Chain was created on
     * @param task The task to execute
     */
674 @SuppressWarnings("WeakerAccess") 675 public TaskChain<?> currentCallback(AsyncExecutingGenericTask task) { 676 return add0(new TaskHolder<>(this, null, task)); 677 } 678 679 // </editor-fold> 680 // <editor-fold desc="// API Methods - Future"> 681 /* ======================================================================================== */ 682 // Future Tasks 683 /* ======================================================================================== */ 684 685 /** 686 * Takes a supplied Future, and holds processing of the chain until the future completes. 687 * The value of the Future will be passed until the next task. 688 * 689 * @param future The Future to wait until it is complete on 690 * @param <R> Return type that the next parameter can expect as argument type 691 */ 692 @SuppressWarnings("WeakerAccess") 693 public <R> TaskChain<R> future(CompletableFuture<R> future) { 694 return currentFuture((input) -> future); 695 } 696 697 /** 698 * Takes multiple supplied Futures, and holds processing of the chain until the futures completes. 699 * The results of the Futures will be passed until the next task. 700 * 701 * @param futures The Futures to wait until it is complete on 702 * @param <R> Return type that the next parameter can expect as argument type 703 */ 704 @SafeVarargs 705 @SuppressWarnings("WeakerAccess") 706 public final <R> TaskChain<List<R>> futures(CompletableFuture<R>... futures) { 707 List<CompletableFuture<R>> futureList = new ArrayList<>(futures.length); 708 Collections.addAll(futureList, futures); 709 return futures(futureList); 710 } 711 712 /** 713 * Takes multiple supplied Futures, and holds processing of the chain until the futures completes. 714 * The results of the Futures will be passed until the next task. 
*
     * @param futures The Futures to wait on until they are complete
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> futures(List<CompletableFuture<R>> futures) {
        // The previous task's output (input) is ignored; getFuture turns the list into the single future handed to currentFuture.
        return currentFuture((input) -> getFuture(futures));
    }

    /**
     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
     *
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> syncFutures(Task<List<CompletableFuture<R>>, T> task) {
        // The provider task receives the previous task's output and runs inside the syncFuture step.
        return syncFuture((input) -> getFuture(task.run(input)));
    }

    /**
     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
     *
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> asyncFutures(Task<List<CompletableFuture<R>>, T> task) {
        return asyncFuture((input) -> getFuture(task.run(input)));
    }

    /**
     * Executes a Task on the current thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
*
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> currentFutures(Task<List<CompletableFuture<R>>, T> task) {
        // The provider task receives the previous task's output and runs inside the currentFuture step.
        return currentFuture((input) -> getFuture(task.run(input)));
    }

    /**
     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete. The task receives no input.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
     *
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> syncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
        // "First" variant: the lambda's input is ignored and the provider is run without arguments.
        return syncFuture((input) -> getFuture(task.run()));
    }

    /**
     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete. The task receives no input.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
     *
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> asyncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
        return asyncFuture((input) -> getFuture(task.run()));
    }

    /**
     * Executes a Task on the current thread that provides a list of Futures, and holds processing
     * of the chain until all of the futures complete. The task receives no input.
     *
     * The response of every future will be passed to the next task as a List, in the order
     * the futures were supplied.
*
     * @param task The Futures Provider Task
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<List<R>> currentFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
        // "First" variant: the lambda's input is ignored and the provider is run without arguments.
        return currentFuture((input) -> getFuture(task.run()));
    }

    /**
     * Execute a task on the main thread, with no previous input, that will return a Future to signal completion
     *
     * It's important you don't perform blocking operations in this method. Only use this if
     * the task will be scheduling a different async operation outside of the TaskChains scope.
     *
     *
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> syncFirstFuture(FutureFirstTask<R> task) {
        // Second TaskHolder argument is false here, true in the async variant and null in the current-thread variant.
        //noinspection unchecked
        return add0(new TaskHolder<>(this, false, task));
    }

    /**
     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran off main thread
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> asyncFirstFuture(FutureFirstTask<R> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, true, task));
    }

    /**
     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran on current thread the Chain was created on
     * @param task The task to execute
     * @param <R> Return type that the next parameter can expect as argument type
     */
    @SuppressWarnings("WeakerAccess")
    public <R> TaskChain<R> currentFirstFuture(FutureFirstTask<R> task) {
        //noinspection unchecked
        return add0(new TaskHolder<>(this, null, task));
    }

    /**
     * Execute a task on the main thread, with the last output as the input to the future provider,
     * that will return a
Future to signal completion. 855 * 856 * It's important you don't perform blocking operations in this method. Only use this if 857 * the task will be scheduling a different async operation outside of the TaskChains scope. 858 * 859 * @param task The task to execute 860 * @param <R> Return type that the next parameter can expect as argument type 861 */ 862 @SuppressWarnings("WeakerAccess") 863 public <R> TaskChain<R> syncFuture(FutureTask<R, T> task) { 864 //noinspection unchecked 865 return add0(new TaskHolder<>(this, false, task)); 866 } 867 868 /** 869 * {@link TaskChain#syncFuture(FutureTask)}, ran on main thread but no input or output 870 * @param task The task to execute 871 */ 872 @SuppressWarnings("WeakerAccess") 873 public TaskChain<?> syncFuture(FutureGenericTask task) { 874 return add0(new TaskHolder<>(this, false, task)); 875 } 876 877 /** 878 * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread 879 * @param task The task to execute 880 * @param <R> Return type that the next parameter can expect as argument type 881 */ 882 @SuppressWarnings("WeakerAccess") 883 public <R> TaskChain<R> asyncFuture(FutureTask<R, T> task) { 884 //noinspection unchecked 885 return add0(new TaskHolder<>(this, true, task)); 886 } 887 888 /** 889 * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread 890 * @param task The task to execute 891 */ 892 @SuppressWarnings("WeakerAccess") 893 public TaskChain<?> asyncFuture(FutureGenericTask task) { 894 return add0(new TaskHolder<>(this, true, task)); 895 } 896 897 /** 898 * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on 899 * @param task The task to execute 900 * @param <R> Return type that the next parameter can expect as argument type 901 */ 902 @SuppressWarnings("WeakerAccess") 903 public <R> TaskChain<R> currentFuture(FutureTask<R, T> task) { 904 //noinspection unchecked 905 return add0(new 
TaskHolder<>(this, null, task)); 906 } 907 908 /** 909 * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on 910 * @param task The task to execute 911 */ 912 @SuppressWarnings("WeakerAccess") 913 public TaskChain<?> currentFuture(FutureGenericTask task) { 914 return add0(new TaskHolder<>(this, null, task)); 915 } 916 917 // </editor-fold> 918 // <editor-fold desc="// API Methods - Normal"> 919 /* ======================================================================================== */ 920 // Normal Tasks 921 /* ======================================================================================== */ 922 923 /** 924 * Execute task on main thread, with no input, returning an output 925 * @param task The task to execute 926 * @param <R> Return type that the next parameter can expect as argument type 927 */ 928 @SuppressWarnings("WeakerAccess") 929 public <R> TaskChain<R> syncFirst(FirstTask<R> task) { 930 //noinspection unchecked 931 return add0(new TaskHolder<>(this, false, task)); 932 } 933 934 /** 935 * {@link TaskChain#syncFirst(FirstTask)} but ran off main thread 936 * @param task The task to execute 937 * @param <R> Return type that the next parameter can expect as argument type 938 */ 939 @SuppressWarnings("WeakerAccess") 940 public <R> TaskChain<R> asyncFirst(FirstTask<R> task) { 941 //noinspection unchecked 942 return add0(new TaskHolder<>(this, true, task)); 943 } 944 945 /** 946 * {@link TaskChain#syncFirst(FirstTask)} but ran on current thread the Chain was created on 947 * @param task The task to execute 948 * @param <R> Return type that the next parameter can expect as argument type 949 */ 950 @SuppressWarnings("WeakerAccess") 951 public <R> TaskChain<R> currentFirst(FirstTask<R> task) { 952 //noinspection unchecked 953 return add0(new TaskHolder<>(this, null, task)); 954 } 955 956 /** 957 * Execute task on main thread, with the last returned input, returning an output 958 * @param task 
The task to execute 959 * @param <R> Return type that the next parameter can expect as argument type 960 */ 961 @SuppressWarnings("WeakerAccess") 962 public <R> TaskChain<R> sync(Task<R, T> task) { 963 //noinspection unchecked 964 return add0(new TaskHolder<>(this, false, task)); 965 } 966 967 /** 968 * Execute task on main thread, with no input or output 969 * @param task The task to execute 970 */ 971 @SuppressWarnings("WeakerAccess") 972 public TaskChain<?> sync(GenericTask task) { 973 return add0(new TaskHolder<>(this, false, task)); 974 } 975 976 /** 977 * {@link TaskChain#sync(Task)} but ran off main thread 978 * @param task The task to execute 979 * @param <R> Return type that the next parameter can expect as argument type 980 */ 981 @SuppressWarnings("WeakerAccess") 982 public <R> TaskChain<R> async(Task<R, T> task) { 983 //noinspection unchecked 984 return add0(new TaskHolder<>(this, true, task)); 985 } 986 987 /** 988 * {@link TaskChain#sync(GenericTask)} but ran off main thread 989 * @param task The task to execute 990 */ 991 @SuppressWarnings("WeakerAccess") 992 public TaskChain<?> async(GenericTask task) { 993 return add0(new TaskHolder<>(this, true, task)); 994 } 995 996 /** 997 * {@link TaskChain#sync(Task)} but ran on current thread the Chain was created on 998 * @param task The task to execute 999 * @param <R> Return type that the next parameter can expect as argument type 1000 */ 1001 @SuppressWarnings("WeakerAccess") 1002 public <R> TaskChain<R> current(Task<R, T> task) { 1003 //noinspection unchecked 1004 return add0(new TaskHolder<>(this, null, task)); 1005 } 1006 1007 /** 1008 * {@link TaskChain#sync(GenericTask)} but ran on current thread the Chain was created on 1009 * @param task The task to execute 1010 */ 1011 @SuppressWarnings("WeakerAccess") 1012 public TaskChain<?> current(GenericTask task) { 1013 return add0(new TaskHolder<>(this, null, task)); 1014 } 1015 1016 1017 /** 1018 * Execute task on main thread, with the last output, and no 
furthur output 1019 * @param task The task to execute 1020 */ 1021 @SuppressWarnings("WeakerAccess") 1022 public TaskChain<?> syncLast(LastTask<T> task) { 1023 return add0(new TaskHolder<>(this, false, task)); 1024 } 1025 1026 /** 1027 * {@link TaskChain#syncLast(LastTask)} but ran off main thread 1028 * @param task The task to execute 1029 */ 1030 @SuppressWarnings("WeakerAccess") 1031 public TaskChain<?> asyncLast(LastTask<T> task) { 1032 return add0(new TaskHolder<>(this, true, task)); 1033 } 1034 1035 /** 1036 * {@link TaskChain#syncLast(LastTask)} but ran on current thread the Chain was created on 1037 * @param task The task to execute 1038 */ 1039 @SuppressWarnings("WeakerAccess") 1040 public TaskChain<?> currentLast(LastTask<T> task) { 1041 return add0(new TaskHolder<>(this, null, task)); 1042 } 1043 1044 /** 1045 * Finished adding tasks, begins executing them. 1046 */ 1047 @SuppressWarnings("WeakerAccess") 1048 public void execute() { 1049 execute((Consumer<Boolean>) null, null); 1050 } 1051 1052 /** 1053 * Finished adding tasks, begins executing them with a done notifier 1054 * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state 1055 */ 1056 @SuppressWarnings("WeakerAccess") 1057 public void execute(Runnable done) { 1058 execute((finished) -> done.run(), null); 1059 } 1060 1061 /** 1062 * Finished adding tasks, begins executing them with a done notifier and error handler 1063 * @param done The Callback to handle when the chain has finished completion. 
Argument to consumer contains finish state 1064 * @param errorHandler The Error handler to handle exceptions 1065 */ 1066 @SuppressWarnings("WeakerAccess") 1067 public void execute(Runnable done, BiConsumer<Exception, Task<?, ?>> errorHandler) { 1068 execute((finished) -> done.run(), errorHandler); 1069 } 1070 1071 /** 1072 * Finished adding tasks, with a done notifier 1073 * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state 1074 */ 1075 @SuppressWarnings("WeakerAccess") 1076 public void execute(Consumer<Boolean> done) { 1077 execute(done, null); 1078 } 1079 1080 /** 1081 * Finished adding tasks, begins executing them, with an error handler 1082 * @param errorHandler The Error handler to handle exceptions 1083 */ 1084 public void execute(BiConsumer<Exception, Task<?, ?>> errorHandler) { 1085 execute((Consumer<Boolean>) null, errorHandler); 1086 } 1087 1088 /** 1089 * Finished adding tasks, begins executing them with a done notifier and error handler 1090 * @param done The Callback to handle when the chain has finished completion. 
Argument to consumer contains finish state 1091 * @param errorHandler The Error handler to handle exceptions 1092 */ 1093 public void execute(Consumer<Boolean> done, BiConsumer<Exception, Task<?, ?>> errorHandler) { 1094 if (errorHandler == null) { 1095 errorHandler = factory.getDefaultErrorHandler(); 1096 } 1097 this.doneCallback = done; 1098 this.errorHandler = errorHandler; 1099 execute0(); 1100 } 1101 1102 // </editor-fold> 1103 /* ======================================================================================== */ 1104 // <editor-fold desc="// Implementation Details"> 1105 private <A1, A2, A3> void handleAbortAction(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) { 1106 if (action != null) { 1107 final TaskChain<?> prev = currentChain.get(); 1108 try { 1109 currentChain.set(this); 1110 action.onAbort(this, arg1, arg2, arg3); 1111 } catch (Exception e) { 1112 TaskChainUtil.logError("TaskChain Exception in Abort Action handler: " + action.getClass().getName()); 1113 TaskChainUtil.logError("Current Action Index was: " + currentActionIndex); 1114 e.printStackTrace(); 1115 } finally { 1116 currentChain.set(prev); 1117 } 1118 } 1119 abort(); 1120 } 1121 1122 void execute0() { 1123 synchronized (this) { 1124 if (this.executed) { 1125 throw new RuntimeException("Already executed"); 1126 } 1127 this.executed = true; 1128 } 1129 async = !impl.isMainThread(); 1130 nextTask(); 1131 } 1132 1133 void done(boolean finished) { 1134 this.done = true; 1135 if (this.doneCallback != null) { 1136 final TaskChain<?> prev = currentChain.get(); 1137 try { 1138 currentChain.set(this); 1139 this.doneCallback.accept(finished); 1140 } catch (Exception e) { 1141 this.handleError(e, null); 1142 } finally { 1143 currentChain.set(prev); 1144 } 1145 } 1146 } 1147 1148 @SuppressWarnings({"rawtypes", "WeakerAccess"}) 1149 protected TaskChain add0(TaskHolder<?,?> task) { 1150 synchronized (this) { 1151 if (this.executed) { 1152 throw new RuntimeException("TaskChain is 
executing"); 1153 } 1154 } 1155 1156 this.chainQueue.add(task); 1157 return this; 1158 } 1159 1160 /** 1161 * Fires off the next task, and switches between Async/Sync as necessary. 1162 */ 1163 private void nextTask() { 1164 synchronized (this) { 1165 this.currentHolder = this.chainQueue.poll(); 1166 if (this.currentHolder == null) { 1167 this.done = true; // to ensure its done while synchronized 1168 } 1169 } 1170 1171 if (this.currentHolder == null) { 1172 this.previous = null; 1173 // All Done! 1174 this.done(true); 1175 return; 1176 } 1177 1178 Boolean isNextAsync = this.currentHolder.async; 1179 if (isNextAsync == null || factory.shutdown) { 1180 this.currentHolder.run(); 1181 } else if (isNextAsync) { 1182 if (this.async) { 1183 this.currentHolder.run(); 1184 } else { 1185 impl.postAsync(() -> { 1186 this.async = true; 1187 this.currentHolder.run(); 1188 }); 1189 } 1190 } else { 1191 if (this.async) { 1192 impl.postToMain(() -> { 1193 this.async = false; 1194 this.currentHolder.run(); 1195 }); 1196 } else { 1197 this.currentHolder.run(); 1198 } 1199 } 1200 } 1201 1202 private void handleError(Throwable throwable, Task<?, ?> task) { 1203 Exception e = throwable instanceof Exception ? (Exception) throwable : new Exception(throwable); 1204 if (errorHandler != null) { 1205 final TaskChain<?> prev = currentChain.get(); 1206 try { 1207 currentChain.set(this); 1208 errorHandler.accept(e, task); 1209 } catch (Exception e2) { 1210 TaskChainUtil.logError("TaskChain Exception in the error handler!" + e2.getMessage()); 1211 TaskChainUtil.logError("Current Action Index was: " + currentActionIndex); 1212 e.printStackTrace(); 1213 } finally { 1214 currentChain.set(prev); 1215 } 1216 } else { 1217 TaskChainUtil.logError("TaskChain Exception on " + (task != null ? 
task.getClass().getName() : "Done Hander") + ": " + e.getMessage()); 1218 TaskChainUtil.logError("Current Action Index was: " + currentActionIndex); 1219 e.printStackTrace(); 1220 } 1221 } 1222 1223 private void abortExecutingChain() { 1224 this.previous = null; 1225 this.chainQueue.clear(); 1226 this.done(false); 1227 } 1228 1229 private <R> CompletableFuture<List<R>> getFuture(List<CompletableFuture<R>> futures) { 1230 CompletableFuture<List<R>> onDone = new CompletableFuture<>(); 1231 CompletableFuture<?>[] futureArray = new CompletableFuture<?>[futures.size()]; 1232 CompletableFuture.allOf((CompletableFuture<?>[]) futures.toArray(futureArray)).whenComplete((aVoid, throwable) -> { 1233 if (throwable != null) { 1234 onDone.completeExceptionally(throwable); 1235 } else { 1236 boolean[] error = {false}; 1237 final List<R> results = futures.stream().map(f -> { 1238 try { 1239 return f.join(); 1240 } catch (Exception e) { 1241 error[0] = true; 1242 TaskChain.this.handleError(e, TaskChain.this.currentHolder.task); 1243 return null; 1244 } 1245 }).collect(Collectors.toList()); 1246 if (error[0]) { 1247 onDone.completeExceptionally(new Exception("Future Dependant had an exception")); 1248 } else { 1249 onDone.complete(results); 1250 } 1251 } 1252 }); 1253 return onDone; 1254 } 1255 1256 // </editor-fold> 1257 /* ======================================================================================== */ 1258 // <editor-fold desc="// TaskHolder"> 1259 /** 1260 * Provides foundation of a task with what the previous task type should return 1261 * to pass to this and what this task will return. 
* @param <R> Return Type
     * @param <A> Argument Type Expected
     */
    @SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
    private class TaskHolder<R, A> {
        /** The chain this holder reports back to. */
        private final TaskChain<?> chain;
        /** The task wrapped by this holder. */
        private final Task<R, A> task;
        /** Requested thread: true = off main thread, false = main thread, null = no switch requested. */
        final Boolean async;

        /** Set once {@link #next(Object)} has accepted a result for this holder. */
        private boolean executed = false;
        /** Set once {@link #abort()} has been called on this holder. */
        private boolean aborted = false;
        /** Position of this holder in the order tasks were added to the chain. */
        private final int actionIndex;

        private TaskHolder(TaskChain<?> chain, Boolean async, Task<R, A> task) {
            // Each new holder takes the next index from the enclosing chain's counter.
            this.actionIndex = TaskChain.this.actionIndex++;
            this.task = task;
            this.chain = chain;
            this.async = async;
        }

        /**
         * Called internally by Task Chain to facilitate executing the task and then the next task.
         *
         * The result of the previous task is taken from the chain as the argument. A task that is a
         * FutureTask continues the chain when its future completes, an AsyncExecutingTask continues
         * it through the callback it is given, and any other task continues it immediately with its
         * return value. Any failure aborts the chain; an AbortChainException does so without being
         * reported to the error handling.
         */
        private void run() {
            // Take the pending result and clear it so the chain does not keep holding on to it.
            final Object arg = this.chain.previous;
            this.chain.previous = null;
            TaskChain.this.currentActionIndex = this.actionIndex;
            final TaskChain<?> prevChain = currentChain.get();
            try {
                currentChain.set(this.chain);
                if (this.task instanceof FutureTask) {
                    //noinspection unchecked
                    final CompletableFuture<R> future = ((FutureTask<R, A>) this.task).runFuture((A) arg);
                    if (future == null) {
                        throw new NullPointerException("Must return a Future");
                    }
                    future.whenComplete((r, throwable) -> {
                        if (throwable != null) {
                            this.chain.handleError(throwable, this.task);
                            this.abort();
                        } else {
                            this.next(r);
                        }
                    });
                } else if (this.task instanceof AsyncExecutingTask) {
                    //noinspection unchecked
                    ((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
                } else {
                    //noinspection unchecked
                    next(this.task.run((A) arg));
                }
            } catch (Throwable e) {
                //noinspection ConstantConditions
                if (e instanceof AbortChainException) {
                    // Deliberate abort requested by the task: stop quietly, nothing to report.
                    this.abort();
                    return;
                }
                this.chain.handleError(e, this.task);
                this.abort();
            } finally {
                // Put the thread's current-chain marker back to what it was before this task ran.
                if (prevChain != null) {
                    currentChain.set(prevChain);
                } else {
                    currentChain.remove();
                }
            }
        }

        /**
         * Abort the chain, and clear tasks for GC.
         */
        private synchronized void abort() {
            this.aborted = true;
            this.chain.abortExecutingChain();
        }

        /**
         * Accepts result of previous task and executes the next
         *
         * @param resp The result to hand to the next task
         * @throws RuntimeException if a result was already accepted for this holder
         */
        private void next(Object resp) {
            synchronized (this) {
                if (this.aborted) {
                    this.chain.done(false);
                    return;
                }
                if (this.executed) {
                    this.chain.done(false);
                    throw new RuntimeException("This task has already been executed.");
                }
                this.executed = true;
            }

            this.chain.async = !TaskChain.this.impl.isMainThread(); // We don't know where the task called this from.
            this.chain.previous = resp;
            this.chain.nextTask();
        }
    }
    // </editor-fold>
}