001/*
002 * Copyright (c) 2016-2017 Daniel Ennis (Aikar) - MIT License
003 *
004 *  Permission is hereby granted, free of charge, to any person obtaining
005 *  a copy of this software and associated documentation files (the
006 *  "Software"), to deal in the Software without restriction, including
007 *  without limitation the rights to use, copy, modify, merge, publish,
008 *  distribute, sublicense, and/or sell copies of the Software, and to
009 *  permit persons to whom the Software is furnished to do so, subject to
010 *  the following conditions:
011 *
012 *  The above copyright notice and this permission notice shall be
013 *  included in all copies or substantial portions of the Software.
014 *
015 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
016 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
017 *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
018 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
019 *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
020 *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
021 *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
022 */
023
024/*
025 * TaskChain for Minecraft Plugins
026 *
027 * Written by Aikar <aikar@aikar.co>
028 * https://aikar.co
029 * https://starlis.com
030 *
031 * @license MIT
032 */
033
034package co.aikar.taskchain;
035
036import co.aikar.taskchain.TaskChainTasks.*;
037
038import java.util.ArrayList;
039import java.util.Collections;
040import java.util.HashMap;
041import java.util.List;
042import java.util.Map;
043import java.util.Objects;
044import java.util.concurrent.CompletableFuture;
045import java.util.concurrent.ConcurrentLinkedQueue;
046import java.util.concurrent.TimeUnit;
047import java.util.function.BiConsumer;
048import java.util.function.Consumer;
049import java.util.function.Predicate;
050import java.util.stream.Collectors;
051
052
053/**
054 * The Main API class of TaskChain. TaskChain's are created by a {@link TaskChainFactory}
055 */
056@SuppressWarnings({"unused", "FieldAccessedSynchronizedAndUnsynchronized"})
057public class TaskChain <T> {
058    private static final ThreadLocal<TaskChain<?>> currentChain = new ThreadLocal<>();
059
060    private final GameInterface impl;
061    private final TaskChainFactory factory;
062    private final Map<String, Object> taskMap = new HashMap<>(0);
063    private final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();
064
065    private int currentActionIndex = 0;
066    private int actionIndex = 0;
067    private boolean executed = false;
068    private boolean async = false;
069    private boolean done = false;
070
071    private Object previous;
072    private TaskHolder<?, ?> currentHolder;
073    private Consumer<Boolean> doneCallback;
074    private BiConsumer<Exception, Task<?, ?>> errorHandler;
075
076    /* ======================================================================================== */
077    TaskChain(TaskChainFactory factory) {
078        this.factory = factory;
079        this.impl = factory.getImplementation();
080    }
081    /* ======================================================================================== */
082    // <editor-fold desc="// API Methods - Getters & Setters">
083    /**
084     * Called in an executing task, get the current action index.
085     * For every action that adds a task to the chain, the action index is increased.
086     *
087     * Useful in error or done handlers to know where you are in the chain when it aborted or threw exception.
088     * @return The current index
089     */
090    public int getCurrentActionIndex() {
091        return currentActionIndex;
092    }
093
094    /**
095     * Changes the done callback handler for this chain
096     * @param doneCallback The handler
097     */
098    @SuppressWarnings("WeakerAccess")
099    public void setDoneCallback(Consumer<Boolean> doneCallback) {
100        this.doneCallback = doneCallback;
101    }
102
103    /**
104     * @return The current error handler or null
105     */
106    public BiConsumer<Exception, Task<?, ?>> getErrorHandler() {
107        return errorHandler;
108    }
109
110    /**
111     * Changes the error handler for this chain
112     * @param errorHandler The error handler
113     */
114    @SuppressWarnings("WeakerAccess")
115    public void setErrorHandler(BiConsumer<Exception, Task<?, ?>> errorHandler) {
116        this.errorHandler = errorHandler;
117    }
118    // </editor-fold>
119    /* ======================================================================================== */
120    // <editor-fold desc="// API Methods - Data Wrappers">
121
122    /**
123     * Creates a data wrapper to return multiple objects from a task
124     */
125    public static <D1, D2> TaskChainDataWrappers.Data2<D1, D2> multi(D1 var1, D2 var2) {
126        return new TaskChainDataWrappers.Data2<>(var1, var2);
127    }
128
129    /**
130     * Creates a data wrapper to return multiple objects from a task
131     */
132    public static <D1, D2, D3> TaskChainDataWrappers.Data3<D1, D2, D3> multi(D1 var1, D2 var2, D3 var3) {
133        return new TaskChainDataWrappers.Data3<>(var1, var2, var3);
134    }
135
136    /**
137     * Creates a data wrapper to return multiple objects from a task
138     */
139    public static <D1, D2, D3, D4> TaskChainDataWrappers.Data4<D1, D2, D3, D4> multi(D1 var1, D2 var2, D3 var3, D4 var4) {
140        return new TaskChainDataWrappers.Data4<>(var1, var2, var3, var4);
141    }
142
143    /**
144     * Creates a data wrapper to return multiple objects from a task
145     */
146    public static <D1, D2, D3, D4, D5> TaskChainDataWrappers.Data5<D1, D2, D3, D4, D5> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5) {
147        return new TaskChainDataWrappers.Data5<>(var1, var2, var3, var4, var5);
148    }
149
150    /**
151     * Creates a data wrapper to return multiple objects from a task
152     */
153    public static <D1, D2, D3, D4, D5, D6> TaskChainDataWrappers.Data6<D1, D2, D3, D4, D5, D6> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5, D6 var6) {
154        return new TaskChainDataWrappers.Data6<>(var1, var2, var3, var4, var5, var6);
155    }
156    // </editor-fold>
157    /* ======================================================================================== */
158
159    // <editor-fold desc="// API Methods - Base">
160    /**
161     * Call to abort execution of the chain. Should be called inside of an executing task.
162     */
163    @SuppressWarnings("WeakerAccess")
164    public static void abort() {
165        TaskChainUtil.sneakyThrows(new AbortChainException());
166    }
167
168    /**
169     * Usable only inside of an executing Task or Chain Error/Done handlers
170     *
171     * Gets the current chain that is executing this Task or Error/Done handler
172     * This method should only be called on the same thread that is executing the method.
173     *
174     * In an AsyncExecutingTask or a FutureTask, You must call this method BEFORE passing control to another thread.
175     */
176    @SuppressWarnings("WeakerAccess")
177    public static TaskChain<?> getCurrentChain() {
178        return currentChain.get();
179    }
180
181    /* ======================================================================================== */
182
183    /**
184     * Allows you to call a callback to insert tasks into the chain without having to break the fluent interface
185     *
186     * Example: Plugin.newChain().sync(some::task).configure(chain -> {
187     *     chain.async(some::foo);
188     *     chain.sync(other::bar);
189     * }).async(other::task).execute();
190     *
191     * @param configure Instance of the current chain.
192     * @return The same chain
193     */
194    public TaskChain<T> configure(Consumer<TaskChain<T>> configure) {
195        configure.accept(this);
196        return this;
197    }
198
199    /**
200     * Checks if the chain has a value saved for the specified key.
201     * @param key Key to check if Task Data has a value for
202     */
203    @SuppressWarnings("WeakerAccess")
204    public boolean hasTaskData(String key) {
205        return taskMap.containsKey(key);
206    }
207
208    /**
209     * Retrieves a value relating to a specific key, saved by a previous task.
210     *
211     * @param key Key to look up Task Data for
212     * @param <R> Type the Task Data value is expected to be
213     */
214    @SuppressWarnings("WeakerAccess")
215    public <R> R getTaskData(String key) {
216        //noinspection unchecked
217        return (R) taskMap.get(key);
218    }
219
220    /**
221     * Saves a value for this chain so that a task furthur up the chain can access it.
222     *
223     * Useful for passing multiple values to the next (or furthur) tasks.
224     *
225     * @param key Key to store in Task Data
226     * @param val Value to store in Task Data
227     * @param <R> Type the Task Data value is expected to be
228     */
229    @SuppressWarnings("WeakerAccess")
230    public <R> R setTaskData(String key, Object val) {
231        //noinspection unchecked
232        return (R) taskMap.put(key, val);
233    }
234
235    /**
236     * Removes a saved value on the chain.
237     *
238     * @param key Key to remove from Task Data
239     * @param <R> Type the Task Data value is expected to be
240     */
241    @SuppressWarnings("WeakerAccess")
242    public <R> R removeTaskData(String key) {
243        //noinspection unchecked
244        return (R) taskMap.remove(key);
245    }
246
247    /**
248     * Takes the previous tasks return value, stores it to the specified key
249     * as Task Data, and then forwards that value to the next task.
250     *
251     * @param key Key to store the previous return value into Task Data
252     */
253    @SuppressWarnings("WeakerAccess")
254    public TaskChain<T> storeAsData(String key) {
255        return current((val) -> {
256            setTaskData(key, val);
257            return val;
258        });
259    }
260
261    /**
262     * Reads the specified key from Task Data, and passes it to the next task.
263     *
264     * Will need to pass expected type such as chain.&lt;Foo&gt;returnData("key")
265     *
266     * @param key Key to retrieve from Task Data and pass to next task
267     * @param <R> Return type that the next parameter can expect as argument type
268     */
269    @SuppressWarnings("WeakerAccess")
270    public <R> TaskChain<R> returnData(String key) {
271        //noinspection unchecked
272        return currentFirst(() -> (R) getTaskData(key));
273    }
274
275    /**
276     * Returns the chain itself to the next task.
277     */
278    @SuppressWarnings("WeakerAccess")
279    public TaskChain<TaskChain<?>> returnChain() {
280        return currentFirst(() -> this);
281    }
282
283
284    /**
285     * IMPLEMENTATION SPECIFIC!!
286     * Consult your application implementation to understand how long 1 unit is.
287     *
288     * For example, in Minecraft it is a tick, which is roughly 50 milliseconds, but not guaranteed.
289     *
290     * Adds a delay to the chain execution.
291     *
292     * @param gameUnits # of game units to delay before next task
293     */
294    @SuppressWarnings("WeakerAccess")
295    public TaskChain<T> delay(final int gameUnits) {
296        //noinspection CodeBlock2Expr
297        return currentCallback((input, next) -> {
298            impl.scheduleTask(gameUnits, () -> next.accept(input));
299        });
300    }
301
302    /**
303     * Adds a real time delay to the chain execution.
304     * Chain will abort if the delay is interrupted.
305     *
306     * @param duration duration of the delay before next task
307     */
308    @SuppressWarnings("WeakerAccess")
309    public TaskChain<T> delay(final int duration, TimeUnit unit) {
310        //noinspection CodeBlock2Expr
311        return currentCallback((input, next) -> {
312            impl.scheduleTask(duration, unit, () -> next.accept(input));
313        });
314    }
315
316    // </editor-fold>
317    // <editor-fold desc="// API Methods - Abort">
318
319    /**
320     * Aborts the chain once this step is reached. This is primarily useful when you are
321     * dynamically building the chains steps (such as .configure()) and want to conditionally stop the
322     * chain from proceeding.
323     *
324     * @return Chain
325     */
326    public TaskChain<?> abortChain() {
327        if (executed) {
328            TaskChain.abort();
329            return this;
330        } else {
331            return current(TaskChain::abort);
332        }
333    }
334
335    /**
336     * Checks if the previous task return was null.
337     *
338     * If not null, the previous task return will forward to the next task.
339     */
340    @SuppressWarnings("WeakerAccess")
341    public TaskChain<T> abortIfNull() {
342        return abortIfNull(null, null, null, null);
343    }
344
345    /**
346     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
347     */
348    @SuppressWarnings("WeakerAccess")
349    public TaskChain<T> abortIfNull(TaskChainAbortAction<?, ?, ?> action) {
350        return abortIf(Predicate.isEqual(null), action, null, null, null);
351    }
352
353    /**
354     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
355     */
356    @SuppressWarnings("WeakerAccess")
357    public <A1> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
358        //noinspection unchecked
359        return abortIf(Predicate.isEqual(null), action, arg1, null, null);
360    }
361
362    /**
363     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
364     */
365    @SuppressWarnings("WeakerAccess")
366    public <A1, A2> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
367        //noinspection unchecked
368        return abortIf(Predicate.isEqual(null), action, arg1, arg2, null);
369    }
370
371    /**
372     * Checks if the previous task return was null, and aborts if it was
373     * Then executes supplied action handler
374     *
375     * If not null, the previous task return will forward to the next task.
376     */
377    @SuppressWarnings("WeakerAccess")
378    public <A1, A2, A3> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
379        //noinspection unchecked
380        return abortIf(Predicate.isEqual(null), action, arg1, arg2, arg3);
381    }
382
383    /**
384     * Checks if the previous task return is the supplied value.
385     *
386     * If not, the previous task return will forward to the next task.
387     */
388    @SuppressWarnings("WeakerAccess")
389    public TaskChain<T> abortIf(T ifObj) {
390        return abortIf(ifObj, null, null, null, null);
391    }
392
393    /**
394     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
395     */
396    @SuppressWarnings("WeakerAccess")
397    public TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<?, ?, ?> action) {
398        return abortIf(ifObj, action, null, null, null);
399    }
400
401    /**
402     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
403     */
404    @SuppressWarnings("WeakerAccess")
405    public <A1> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
406        return abortIf(ifObj, action, arg1, null, null);
407    }
408
409    /**
410     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
411     */
412    @SuppressWarnings("WeakerAccess")
413    public <A1, A2> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
414        return abortIf(ifObj, action, arg1, arg2, null);
415    }
416
417    /**
418     * {@link TaskChain#abortIf(Predicate, TaskChainAbortAction, Object, Object, Object)}
419     */
420    @SuppressWarnings("WeakerAccess")
421    public <A1, A2, A3> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
422        return abortIf(Predicate.isEqual(ifObj), action, arg1, arg2, arg3);
423    }
424    
425    /**
426     * Checks if the previous task return matches the supplied predicate, and aborts if it was.
427     * 
428     * If predicate does not match, the previous task return will forward to the next task.
429     */
430    @SuppressWarnings("WeakerAccess")
431    public TaskChain<T> abortIf(Predicate<T> predicate) {
432        return abortIf(predicate, null, null, null, null);
433    }
434
435    /**
436     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
437     */
438    @SuppressWarnings("WeakerAccess")
439    public TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<?, ?, ?> action) {
440        return abortIf(predicate, action, null, null, null);
441    }
442
443    /**
444     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
445     */
446    @SuppressWarnings("WeakerAccess")
447    public <A1> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
448        return abortIf(predicate, action, arg1, null, null);
449    }
450
451    /**
452     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
453     */
454    @SuppressWarnings("WeakerAccess")
455    public <A1, A2> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
456        return abortIf(predicate, action, arg1, arg2, null);
457    }
458
459    /**
460     * Checks if the previous task return matches the supplied predicate, and aborts if it was.
461     * Then executes supplied action handler
462     * 
463     * If predicate does not match, the previous task return will forward to the next task.
464     */
465    public <A1, A2, A3> TaskChain<T> abortIf(Predicate<T> predicate, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
466        return current((obj) -> {
467           if (predicate.test(obj)) {
468               handleAbortAction(action, arg1, arg2, arg3);
469               return null;
470           }
471           return obj;
472        });
473    }
474
475    /**
476     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
477     */
478    @SuppressWarnings("WeakerAccess")
479    public TaskChain<T> abortIfNot(T ifNotObj) {
480        return abortIfNot(ifNotObj, null, null, null, null);
481    }
482
483    /**
484     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
485     */
486    @SuppressWarnings("WeakerAccess")
487    public TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<?, ?, ?> action) {
488        return abortIfNot(ifNotObj, action, null, null, null);
489    }
490
491    /**
492     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
493     */
494    @SuppressWarnings("WeakerAccess")
495    public <A1> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
496        return abortIfNot(ifNotObj, action, arg1, null, null);
497    }
498
499    /**
500     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
501     */
502    @SuppressWarnings("WeakerAccess")
503    public <A1, A2> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
504        return abortIfNot(ifNotObj, action, arg1, arg2, null);
505    }
506
507    /**
508     * {@link TaskChain#abortIfNot(Predicate, TaskChainAbortAction, Object, Object, Object)}
509     */
510    @SuppressWarnings("WeakerAccess")
511    public <A1, A2, A3> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
512        return abortIfNot(Predicate.<T>isEqual(ifNotObj), action, arg1, arg2, arg3);
513    }
514    
515    /**
516     * Checks if the previous task return does NOT match the supplied predicate, and aborts if it does not match.
517     * 
518     * If predicate matches, the previous task return will forward to the next task.
519     */
520    @SuppressWarnings("WeakerAccess")
521    public TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate) {
522        return abortIfNot(ifNotPredicate, null, null, null, null);
523    }
524
525    /**
526     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
527     */
528    @SuppressWarnings("WeakerAccess")
529    public TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<?, ?, ?> action) {
530        return abortIfNot(ifNotPredicate, action, null, null, null);
531    }
532
533    /**
534     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
535     */
536    @SuppressWarnings("WeakerAccess")
537    public <A1> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
538        return abortIfNot(ifNotPredicate, action, arg1, null, null);
539    }
540
541    /**
542     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
543     */
544    @SuppressWarnings("WeakerAccess")
545    public <A1, A2> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
546        return abortIfNot(ifNotPredicate, action, arg1, arg2, null);
547    }
548
549    /**
550     * Checks if the previous task return does NOT match the supplied predicate, and aborts if it does not match.
551     * Then executes supplied action handler
552     * 
553     * If predicate matches, the previous task return will forward to the next task.
554     */
555    @SuppressWarnings("WeakerAccess")
556    public <A1, A2, A3> TaskChain<T> abortIfNot(Predicate<T> ifNotPredicate, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
557        return abortIf(ifNotPredicate.negate(), action, arg1, arg2, arg3);
558    }
559
560    // </editor-fold>
561    // <editor-fold desc="// API Methods - Async Executing">
562    /* ======================================================================================== */
563    // Async Executing Tasks
564    /* ======================================================================================== */
565
566    /**
567     * Execute a task on the main thread, with no previous input, and a callback to return the response to.
568     *
569     * It's important you don't perform blocking operations in this method. Only use this if
570     * the task will be scheduling a different sync operation outside of the TaskChains scope.
571     *
572     * Usually you could achieve the same design with a blocking API by switching to an async task
573     * for the next task and running it there.
574     *
575     * This method would primarily be for cases where you need to use an API that ONLY provides
576     * a callback style API.
577     *
578     * @param task The task to execute
579     * @param <R> Return type that the next parameter can expect as argument type
580     */
581    @SuppressWarnings("WeakerAccess")
582    public <R> TaskChain<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
583        //noinspection unchecked
584        return add0(new TaskHolder<>(this, false, task));
585    }
586
587    /**
588     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran off main thread
589     * @param task The task to execute
590     * @param <R> Return type that the next parameter can expect as argument type
591     */
592    @SuppressWarnings("WeakerAccess")
593    public <R> TaskChain<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
594        //noinspection unchecked
595        return add0(new TaskHolder<>(this, true, task));
596    }
597
598    /**
599     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran on current thread the Chain was created on
600     * @param task The task to execute
601     * @param <R> Return type that the next parameter can expect as argument type
602     */
603    @SuppressWarnings("WeakerAccess")
604    public <R> TaskChain<R> currentFirstCallback(AsyncExecutingFirstTask<R> task) {
605        //noinspection unchecked
606        return add0(new TaskHolder<>(this, null, task));
607    }
608
609    /**
610     * Execute a task on the main thread, with the last output, and a callback to return the response to.
611     *
612     * It's important you don't perform blocking operations in this method. Only use this if
613     * the task will be scheduling a different sync operation outside of the TaskChains scope.
614     *
615     * Usually you could achieve the same design with a blocking API by switching to an async task
616     * for the next task and running it there.
617     *
618     * This method would primarily be for cases where you need to use an API that ONLY provides
619     * a callback style API.
620     *
621     * @param task The task to execute
622     * @param <R> Return type that the next parameter can expect as argument type
623     */
624    @SuppressWarnings("WeakerAccess")
625    public <R> TaskChain<R> syncCallback(AsyncExecutingTask<R, T> task) {
626        //noinspection unchecked
627        return add0(new TaskHolder<>(this, false, task));
628    }
629
630    /**
631     * {@link TaskChain#syncCallback(AsyncExecutingTask)}, ran on main thread but no input or output
632     * @param task The task to execute
633     */
634    @SuppressWarnings("WeakerAccess")
635    public TaskChain<?> syncCallback(AsyncExecutingGenericTask task) {
636        return add0(new TaskHolder<>(this, false, task));
637    }
638
639    /**
640     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran off main thread
641     * @param task The task to execute
642     * @param <R> Return type that the next parameter can expect as argument type
643     */
644    @SuppressWarnings("WeakerAccess")
645    public <R> TaskChain<R> asyncCallback(AsyncExecutingTask<R, T> task) {
646        //noinspection unchecked
647        return add0(new TaskHolder<>(this, true, task));
648    }
649
650    /**
651     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran off main thread
652     * @param task The task to execute
653     */
654    @SuppressWarnings("WeakerAccess")
655    public TaskChain<?> asyncCallback(AsyncExecutingGenericTask task) {
656        return add0(new TaskHolder<>(this, true, task));
657    }
658
659    /**
660     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran on current thread the Chain was created on
661     * @param task The task to execute
662     * @param <R> Return type that the next parameter can expect as argument type
663     */
664    @SuppressWarnings("WeakerAccess")
665    public <R> TaskChain<R> currentCallback(AsyncExecutingTask<R, T> task) {
666        //noinspection unchecked
667        return add0(new TaskHolder<>(this, null, task));
668    }
669
670    /**
671     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran on current thread the Chain was created on
672     * @param task The task to execute
673     */
674    @SuppressWarnings("WeakerAccess")
675    public TaskChain<?> currentCallback(AsyncExecutingGenericTask task) {
676        return add0(new TaskHolder<>(this, null, task));
677    }
678
679    // </editor-fold>
680    // <editor-fold desc="// API Methods - Future">
681    /* ======================================================================================== */
682    // Future Tasks
683    /* ======================================================================================== */
684
685    /**
686     * Takes a supplied Future, and holds processing of the chain until the future completes.
687     * The value of the Future will be passed until the next task.
688     *
689     * @param future The Future to wait until it is complete on
690     * @param <R> Return type that the next parameter can expect as argument type
691     */
692    @SuppressWarnings("WeakerAccess")
693    public <R> TaskChain<R> future(CompletableFuture<R> future) {
694        return currentFuture((input) -> future);
695    }
696
697    /**
698     * Takes multiple supplied Futures, and holds processing of the chain until the futures completes.
699     * The results of the Futures will be passed until the next task.
700     *
701     * @param futures The Futures to wait until it is complete on
702     * @param <R> Return type that the next parameter can expect as argument type
703     */
704    @SafeVarargs
705    @SuppressWarnings("WeakerAccess")
706    public final <R> TaskChain<List<R>> futures(CompletableFuture<R>... futures) {
707        List<CompletableFuture<R>> futureList = new ArrayList<>(futures.length);
708        Collections.addAll(futureList, futures);
709        return futures(futureList);
710    }
711
712    /**
713     * Takes multiple supplied Futures, and holds processing of the chain until the futures completes.
714     * The results of the Futures will be passed until the next task.
715     *
716     * @param futures The Futures to wait until it is complete on
717     * @param <R> Return type that the next parameter can expect as argument type
718     */
719    @SuppressWarnings("WeakerAccess")
720    public <R> TaskChain<List<R>> futures(List<CompletableFuture<R>> futures) {
721        return currentFuture((input) -> getFuture(futures));
722    }
723
724    /**
725     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
726     * of the chain until all of the futures completes.
727     *
728     * The response of every future will be passed to the next task as a List, in the order
729     * the futures were supplied.
730     *
731     * @param task The Futures Provider Task
732     * @param <R> Return type that the next parameter can expect as argument type
733     */
734    @SuppressWarnings("WeakerAccess")
735    public <R> TaskChain<List<R>> syncFutures(Task<List<CompletableFuture<R>>, T> task) {
736        return syncFuture((input) -> getFuture(task.run(input)));
737    }
738
739    /**
740     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
741     * of the chain until all of the futures completes.
742     *
743     * The response of every future will be passed to the next task as a List, in the order
744     * the futures were supplied.
745     *
746     * @param task The Futures Provider Task
747     * @param <R> Return type that the next parameter can expect as argument type
748     */
749    @SuppressWarnings("WeakerAccess")
750    public <R> TaskChain<List<R>> asyncFutures(Task<List<CompletableFuture<R>>, T> task) {
751        return asyncFuture((input) -> getFuture(task.run(input)));
752    }
753
754    /**
755     * Executes a Task on the current thread that provides a list of Futures, and holds processing
756     * of the chain until all of the futures completes.
757     *
758     * The response of every future will be passed to the next task as a List, in the order
759     * the futures were supplied.
760     *
761     * @param task The Futures Provider Task
762     * @param <R> Return type that the next parameter can expect as argument type
763     */
764    @SuppressWarnings("WeakerAccess")
765    public <R> TaskChain<List<R>> currentFutures(Task<List<CompletableFuture<R>>, T> task) {
766        return currentFuture((input) -> getFuture(task.run(input)));
767    }
768
769    /**
770     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
771     * of the chain until all of the futures completes.
772     *
773     * The response of every future will be passed to the next task as a List, in the order
774     * the futures were supplied.
775     *
776     * @param task The Futures Provider Task
777     * @param <R> Return type that the next parameter can expect as argument type
778     */
779    @SuppressWarnings("WeakerAccess")
780    public <R> TaskChain<List<R>> syncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
781        return syncFuture((input) -> getFuture(task.run()));
782    }
783
784    /**
785     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
786     * of the chain until all of the futures completes.
787     *
788     * The response of every future will be passed to the next task as a List, in the order
789     * the futures were supplied.
790     *
791     * @param task The Futures Provider Task
792     * @param <R> Return type that the next parameter can expect as argument type
793     */
794    @SuppressWarnings("WeakerAccess")
795    public <R> TaskChain<List<R>> asyncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
796        return asyncFuture((input) -> getFuture(task.run()));
797    }
798
799    /**
800     * Executes a Task on the current thread that provides a list of Futures, and holds processing
801     * of the chain until all of the futures completes.
802     *
803     * The response of every future will be passed to the next task as a List, in the order
804     * the futures were supplied.
805     *
806     * @param task The Futures Provider Task
807     * @param <R> Return type that the next parameter can expect as argument type
808     */
809    @SuppressWarnings("WeakerAccess")
810    public <R> TaskChain<List<R>> currentFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
811        return currentFuture((input) -> getFuture(task.run()));
812    }
813
814    /**
815     * Execute a task on the main thread, with no previous input, that will return a Future to signal completion
816     *
817     * It's important you don't perform blocking operations in this method. Only use this if
818     * the task will be scheduling a different async operation outside of the TaskChains scope.
819     *
820     *
821     * @param task The task to execute
822     * @param <R> Return type that the next parameter can expect as argument type
823     */
824    @SuppressWarnings("WeakerAccess")
825    public <R> TaskChain<R> syncFirstFuture(FutureFirstTask<R> task) {
826        //noinspection unchecked
827        return add0(new TaskHolder<>(this, false, task));
828    }
829
830    /**
831     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran off main thread
832     * @param task The task to execute
833     * @param <R> Return type that the next parameter can expect as argument type
834     */
835    @SuppressWarnings("WeakerAccess")
836    public <R> TaskChain<R> asyncFirstFuture(FutureFirstTask<R> task) {
837        //noinspection unchecked
838        return add0(new TaskHolder<>(this, true, task));
839    }
840
841    /**
842     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran on current thread the Chain was created on
843     * @param task The task to execute
844     * @param <R> Return type that the next parameter can expect as argument type
845     */
846    @SuppressWarnings("WeakerAccess")
847    public <R> TaskChain<R> currentFirstFuture(FutureFirstTask<R> task) {
848        //noinspection unchecked
849        return add0(new TaskHolder<>(this, null, task));
850    }
851
852    /**
853     * Execute a task on the main thread, with the last output as the input to the future provider,
854     * that will return a Future to signal completion.
855     *
856     * It's important you don't perform blocking operations in this method. Only use this if
857     * the task will be scheduling a different async operation outside of the TaskChains scope.
858     *
859     * @param task The task to execute
860     * @param <R> Return type that the next parameter can expect as argument type
861     */
862    @SuppressWarnings("WeakerAccess")
863    public <R> TaskChain<R> syncFuture(FutureTask<R, T> task) {
864        //noinspection unchecked
865        return add0(new TaskHolder<>(this, false, task));
866    }
867
868    /**
869     * {@link TaskChain#syncFuture(FutureTask)}, ran on main thread but no input or output
870     * @param task The task to execute
871     */
872    @SuppressWarnings("WeakerAccess")
873    public TaskChain<?> syncFuture(FutureGenericTask task) {
874        return add0(new TaskHolder<>(this, false, task));
875    }
876
877    /**
878     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread
879     * @param task The task to execute
880     * @param <R> Return type that the next parameter can expect as argument type
881     */
882    @SuppressWarnings("WeakerAccess")
883    public <R> TaskChain<R> asyncFuture(FutureTask<R, T> task) {
884        //noinspection unchecked
885        return add0(new TaskHolder<>(this, true, task));
886    }
887
888    /**
889     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread
890     * @param task The task to execute
891     */
892    @SuppressWarnings("WeakerAccess")
893    public TaskChain<?> asyncFuture(FutureGenericTask task) {
894        return add0(new TaskHolder<>(this, true, task));
895    }
896
897    /**
898     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on
899     * @param task The task to execute
900     * @param <R> Return type that the next parameter can expect as argument type
901     */
902    @SuppressWarnings("WeakerAccess")
903    public <R> TaskChain<R> currentFuture(FutureTask<R, T> task) {
904        //noinspection unchecked
905        return add0(new TaskHolder<>(this, null, task));
906    }
907
908    /**
909     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on
910     * @param task The task to execute
911     */
912    @SuppressWarnings("WeakerAccess")
913    public TaskChain<?> currentFuture(FutureGenericTask task) {
914        return add0(new TaskHolder<>(this, null, task));
915    }
916
917    // </editor-fold>
918    // <editor-fold desc="// API Methods - Normal">
919    /* ======================================================================================== */
920    // Normal Tasks
921    /* ======================================================================================== */
922
923    /**
924     * Execute task on main thread, with no input, returning an output
925     * @param task The task to execute
926     * @param <R> Return type that the next parameter can expect as argument type
927     */
928    @SuppressWarnings("WeakerAccess")
929    public <R> TaskChain<R> syncFirst(FirstTask<R> task) {
930        //noinspection unchecked
931        return add0(new TaskHolder<>(this, false, task));
932    }
933
934    /**
935     * {@link TaskChain#syncFirst(FirstTask)} but ran off main thread
936     * @param task The task to execute
937     * @param <R> Return type that the next parameter can expect as argument type
938     */
939    @SuppressWarnings("WeakerAccess")
940    public <R> TaskChain<R> asyncFirst(FirstTask<R> task) {
941        //noinspection unchecked
942        return add0(new TaskHolder<>(this, true, task));
943    }
944
945    /**
946     * {@link TaskChain#syncFirst(FirstTask)} but ran on current thread the Chain was created on
947     * @param task The task to execute
948     * @param <R> Return type that the next parameter can expect as argument type
949     */
950    @SuppressWarnings("WeakerAccess")
951    public <R> TaskChain<R> currentFirst(FirstTask<R> task) {
952        //noinspection unchecked
953        return add0(new TaskHolder<>(this, null, task));
954    }
955
956    /**
957     * Execute task on main thread, with the last returned input, returning an output
958     * @param task The task to execute
959     * @param <R> Return type that the next parameter can expect as argument type
960     */
961    @SuppressWarnings("WeakerAccess")
962    public <R> TaskChain<R> sync(Task<R, T> task) {
963        //noinspection unchecked
964        return add0(new TaskHolder<>(this, false, task));
965    }
966
967    /**
968     * Execute task on main thread, with no input or output
969     * @param task The task to execute
970     */
971    @SuppressWarnings("WeakerAccess")
972    public TaskChain<?> sync(GenericTask task) {
973        return add0(new TaskHolder<>(this, false, task));
974    }
975
976    /**
977     * {@link TaskChain#sync(Task)} but ran off main thread
978     * @param task The task to execute
979     * @param <R> Return type that the next parameter can expect as argument type
980     */
981    @SuppressWarnings("WeakerAccess")
982    public <R> TaskChain<R> async(Task<R, T> task) {
983        //noinspection unchecked
984        return add0(new TaskHolder<>(this, true, task));
985    }
986
987    /**
988     * {@link TaskChain#sync(GenericTask)} but ran off main thread
989     * @param task The task to execute
990     */
991    @SuppressWarnings("WeakerAccess")
992    public TaskChain<?> async(GenericTask task) {
993        return add0(new TaskHolder<>(this, true, task));
994    }
995
996    /**
997     * {@link TaskChain#sync(Task)} but ran on current thread the Chain was created on
998     * @param task The task to execute
999     * @param <R> Return type that the next parameter can expect as argument type
1000     */
1001    @SuppressWarnings("WeakerAccess")
1002    public <R> TaskChain<R> current(Task<R, T> task) {
1003        //noinspection unchecked
1004        return add0(new TaskHolder<>(this, null, task));
1005    }
1006
1007    /**
1008     * {@link TaskChain#sync(GenericTask)} but ran on current thread the Chain was created on
1009     * @param task The task to execute
1010     */
1011    @SuppressWarnings("WeakerAccess")
1012    public TaskChain<?> current(GenericTask task) {
1013        return add0(new TaskHolder<>(this, null, task));
1014    }
1015
1016
1017    /**
1018     * Execute task on main thread, with the last output, and no furthur output
1019     * @param task The task to execute
1020     */
1021    @SuppressWarnings("WeakerAccess")
1022    public TaskChain<?> syncLast(LastTask<T> task) {
1023        return add0(new TaskHolder<>(this, false, task));
1024    }
1025
1026    /**
1027     * {@link TaskChain#syncLast(LastTask)} but ran off main thread
1028     * @param task The task to execute
1029     */
1030    @SuppressWarnings("WeakerAccess")
1031    public TaskChain<?> asyncLast(LastTask<T> task) {
1032        return add0(new TaskHolder<>(this, true, task));
1033    }
1034
1035    /**
1036     * {@link TaskChain#syncLast(LastTask)} but ran on current thread the Chain was created on
1037     * @param task The task to execute
1038     */
1039    @SuppressWarnings("WeakerAccess")
1040    public TaskChain<?> currentLast(LastTask<T> task) {
1041        return add0(new TaskHolder<>(this, null, task));
1042    }
1043
1044    /**
1045     * Finished adding tasks, begins executing them.
1046     */
1047    @SuppressWarnings("WeakerAccess")
1048    public void execute() {
1049        execute((Consumer<Boolean>) null, null);
1050    }
1051
1052    /**
1053     * Finished adding tasks, begins executing them with a done notifier
1054     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
1055     */
1056    @SuppressWarnings("WeakerAccess")
1057    public void execute(Runnable done) {
1058        execute((finished) -> done.run(), null);
1059    }
1060
1061    /**
1062     * Finished adding tasks, begins executing them with a done notifier and error handler
1063     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
1064     * @param errorHandler The Error handler to handle exceptions
1065     */
1066    @SuppressWarnings("WeakerAccess")
1067    public void execute(Runnable done, BiConsumer<Exception, Task<?, ?>> errorHandler) {
1068        execute((finished) -> done.run(), errorHandler);
1069    }
1070
1071    /**
1072     * Finished adding tasks, with a done notifier
1073     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
1074     */
1075    @SuppressWarnings("WeakerAccess")
1076    public void execute(Consumer<Boolean> done) {
1077        execute(done, null);
1078    }
1079
1080    /**
1081     * Finished adding tasks, begins executing them, with an error handler
1082     * @param errorHandler The Error handler to handle exceptions
1083     */
1084    public void execute(BiConsumer<Exception, Task<?, ?>> errorHandler) {
1085        execute((Consumer<Boolean>) null, errorHandler);
1086    }
1087
1088    /**
1089     * Finished adding tasks, begins executing them with a done notifier and error handler
1090     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
1091     * @param errorHandler The Error handler to handle exceptions
1092     */
1093    public void execute(Consumer<Boolean> done, BiConsumer<Exception, Task<?, ?>> errorHandler) {
1094        if (errorHandler == null) {
1095            errorHandler = factory.getDefaultErrorHandler();
1096        }
1097        this.doneCallback = done;
1098        this.errorHandler = errorHandler;
1099        execute0();
1100    }
1101
1102    // </editor-fold>
1103    /* ======================================================================================== */
1104    // <editor-fold desc="// Implementation Details">
1105    private <A1, A2, A3> void handleAbortAction(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
1106        if (action != null) {
1107            final TaskChain<?> prev = currentChain.get();
1108            try {
1109                currentChain.set(this);
1110                action.onAbort(this, arg1, arg2, arg3);
1111            } catch (Exception e) {
1112                TaskChainUtil.logError("TaskChain Exception in Abort Action handler: " + action.getClass().getName());
1113                TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1114                e.printStackTrace();
1115            } finally {
1116                currentChain.set(prev);
1117            }
1118        }
1119        abort();
1120    }
1121
1122    void execute0() {
1123        synchronized (this) {
1124            if (this.executed) {
1125                throw new RuntimeException("Already executed");
1126            }
1127            this.executed = true;
1128        }
1129        async = !impl.isMainThread();
1130        nextTask();
1131    }
1132
1133    void done(boolean finished) {
1134        this.done = true;
1135        if (this.doneCallback != null) {
1136            final TaskChain<?> prev = currentChain.get();
1137            try {
1138                currentChain.set(this);
1139                this.doneCallback.accept(finished);
1140            } catch (Exception e) {
1141                this.handleError(e, null);
1142            } finally {
1143                currentChain.set(prev);
1144            }
1145        }
1146    }
1147
1148    @SuppressWarnings({"rawtypes", "WeakerAccess"})
1149    protected TaskChain add0(TaskHolder<?,?> task) {
1150        synchronized (this) {
1151            if (this.executed) {
1152                throw new RuntimeException("TaskChain is executing");
1153            }
1154        }
1155
1156        this.chainQueue.add(task);
1157        return this;
1158    }
1159
1160    /**
1161     * Fires off the next task, and switches between Async/Sync as necessary.
1162     */
1163    private void nextTask() {
1164        synchronized (this) {
1165            this.currentHolder = this.chainQueue.poll();
1166            if (this.currentHolder == null) {
1167                this.done = true; // to ensure its done while synchronized
1168            }
1169        }
1170
1171        if (this.currentHolder == null) {
1172            this.previous = null;
1173            // All Done!
1174            this.done(true);
1175            return;
1176        }
1177
1178        Boolean isNextAsync = this.currentHolder.async;
1179        if (isNextAsync == null || factory.shutdown) {
1180            this.currentHolder.run();
1181        } else if (isNextAsync) {
1182            if (this.async) {
1183                this.currentHolder.run();
1184            } else {
1185                impl.postAsync(() -> {
1186                    this.async = true;
1187                    this.currentHolder.run();
1188                });
1189            }
1190        } else {
1191            if (this.async) {
1192                impl.postToMain(() -> {
1193                    this.async = false;
1194                    this.currentHolder.run();
1195                });
1196            } else {
1197                this.currentHolder.run();
1198            }
1199        }
1200    }
1201
1202    private void handleError(Throwable throwable, Task<?, ?> task) {
1203        Exception e = throwable instanceof Exception ? (Exception) throwable : new Exception(throwable);
1204        if (errorHandler != null) {
1205            final TaskChain<?> prev = currentChain.get();
1206            try {
1207                currentChain.set(this);
1208                errorHandler.accept(e, task);
1209            } catch (Exception e2) {
1210                TaskChainUtil.logError("TaskChain Exception in the error handler!" + e2.getMessage());
1211                TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1212                e.printStackTrace();
1213            } finally {
1214                currentChain.set(prev);
1215            }
1216        } else {
1217            TaskChainUtil.logError("TaskChain Exception on " + (task != null ? task.getClass().getName() : "Done Hander") + ": " + e.getMessage());
1218            TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1219            e.printStackTrace();
1220        }
1221    }
1222
1223    private void abortExecutingChain() {
1224        this.previous = null;
1225        this.chainQueue.clear();
1226        this.done(false);
1227    }
1228
1229    private <R> CompletableFuture<List<R>> getFuture(List<CompletableFuture<R>> futures) {
1230        CompletableFuture<List<R>> onDone = new CompletableFuture<>();
1231        CompletableFuture<?>[] futureArray = new CompletableFuture<?>[futures.size()];
1232        CompletableFuture.allOf((CompletableFuture<?>[]) futures.toArray(futureArray)).whenComplete((aVoid, throwable) -> {
1233            if (throwable != null) {
1234                onDone.completeExceptionally(throwable);
1235            } else {
1236                boolean[] error = {false};
1237                final List<R> results = futures.stream().map(f -> {
1238                    try {
1239                        return f.join();
1240                    } catch (Exception e) {
1241                        error[0] = true;
1242                        TaskChain.this.handleError(e, TaskChain.this.currentHolder.task);
1243                        return null;
1244                    }
1245                }).collect(Collectors.toList());
1246                if (error[0]) {
1247                    onDone.completeExceptionally(new Exception("Future Dependant had an exception"));
1248                } else {
1249                    onDone.complete(results);
1250                }
1251            }
1252        });
1253        return onDone;
1254    }
1255
1256    // </editor-fold>
1257    /* ======================================================================================== */
1258    // <editor-fold desc="// TaskHolder">
1259    /**
1260     * Provides foundation of a task with what the previous task type should return
1261     * to pass to this and what this task will return.
1262     * @param <R> Return Type
1263     * @param <A> Argument Type Expected
1264     */
1265    @SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
1266    private class TaskHolder<R, A> {
1267        private final TaskChain<?> chain;
1268        private final Task<R, A> task;
1269        final Boolean async;
1270
1271        private boolean executed = false;
1272        private boolean aborted = false;
1273        private final int actionIndex;
1274
1275        private TaskHolder(TaskChain<?> chain, Boolean async, Task<R, A> task) {
1276            this.actionIndex = TaskChain.this.actionIndex++;
1277            this.task = task;
1278            this.chain = chain;
1279            this.async = async;
1280        }
1281
1282        /**
1283         * Called internally by Task Chain to facilitate executing the task and then the next task.
1284         */
1285        private void run() {
1286            final Object arg = this.chain.previous;
1287            this.chain.previous = null;
1288            TaskChain.this.currentActionIndex = this.actionIndex;
1289            final R res;
1290            final TaskChain<?> prevChain = currentChain.get();
1291            try {
1292                currentChain.set(this.chain);
1293                if (this.task instanceof FutureTask) {
1294                    //noinspection unchecked
1295                    final CompletableFuture<R> future = ((FutureTask<R, A>) this.task).runFuture((A) arg);
1296                    if (future == null) {
1297                        throw new NullPointerException("Must return a Future");
1298                    }
1299                    future.whenComplete((r, throwable) -> {
1300                        if (throwable != null) {
1301                            this.chain.handleError(throwable, this.task);
1302                            this.abort();
1303                        } else {
1304                            this.next(r);
1305                        }
1306                    });
1307                } else if (this.task instanceof AsyncExecutingTask) {
1308                    //noinspection unchecked
1309                    ((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
1310                } else {
1311                    //noinspection unchecked
1312                    next(this.task.run((A) arg));
1313                }
1314            } catch (Throwable e) {
1315                //noinspection ConstantConditions
1316                if (e instanceof AbortChainException) {
1317                    this.abort();
1318                    return;
1319                }
1320                this.chain.handleError(e, this.task);
1321                this.abort();
1322            } finally {
1323                if (prevChain != null) {
1324                    currentChain.set(prevChain);
1325                } else {
1326                    currentChain.remove();
1327                }
1328            }
1329        }
1330
1331        /**
1332         * Abort the chain, and clear tasks for GC.
1333         */
1334        private synchronized void abort() {
1335            this.aborted = true;
1336            this.chain.abortExecutingChain();
1337        }
1338
1339        /**
1340         * Accepts result of previous task and executes the next
1341         */
1342        private void next(Object resp) {
1343            synchronized (this) {
1344                if (this.aborted) {
1345                    this.chain.done(false);
1346                    return;
1347                }
1348                if (this.executed) {
1349                    this.chain.done(false);
1350                    throw new RuntimeException("This task has already been executed.");
1351                }
1352                this.executed = true;
1353            }
1354
1355            this.chain.async = !TaskChain.this.impl.isMainThread(); // We don't know where the task called this from.
1356            this.chain.previous = resp;
1357            this.chain.nextTask();
1358        }
1359    }
1360    // </editor-fold>
1361}