001/*
002 * Copyright (c) 2016-2017 Daniel Ennis (Aikar) - MIT License
003 *
004 *  Permission is hereby granted, free of charge, to any person obtaining
005 *  a copy of this software and associated documentation files (the
006 *  "Software"), to deal in the Software without restriction, including
007 *  without limitation the rights to use, copy, modify, merge, publish,
008 *  distribute, sublicense, and/or sell copies of the Software, and to
009 *  permit persons to whom the Software is furnished to do so, subject to
010 *  the following conditions:
011 *
012 *  The above copyright notice and this permission notice shall be
013 *  included in all copies or substantial portions of the Software.
014 *
015 *  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
016 *  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
017 *  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
018 *  NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
019 *  LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
020 *  OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
021 *  WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
022 */
023
024/*
025 * TaskChain for Minecraft Plugins
026 *
027 * Written by Aikar <aikar@aikar.co>
028 * https://aikar.co
029 * https://starlis.com
030 *
031 * @license MIT
032 */
033
034package co.aikar.taskchain;
035
036import co.aikar.taskchain.TaskChainTasks.*;
037
038import java.util.ArrayList;
039import java.util.Collections;
040import java.util.HashMap;
041import java.util.List;
042import java.util.Map;
043import java.util.Objects;
044import java.util.concurrent.CompletableFuture;
045import java.util.concurrent.ConcurrentLinkedQueue;
046import java.util.concurrent.TimeUnit;
047import java.util.function.BiConsumer;
048import java.util.function.Consumer;
049import java.util.stream.Collectors;
050
051
052/**
053 * The Main API class of TaskChain. TaskChain's are created by a {@link TaskChainFactory}
054 */
055@SuppressWarnings({"unused", "FieldAccessedSynchronizedAndUnsynchronized"})
056public class TaskChain <T> {
057    private static final ThreadLocal<TaskChain<?>> currentChain = new ThreadLocal<>();
058
059    private final GameInterface impl;
060    private final TaskChainFactory factory;
061    private final Map<String, Object> taskMap = new HashMap<>(0);
062    private final ConcurrentLinkedQueue<TaskHolder<?,?>> chainQueue = new ConcurrentLinkedQueue<>();
063
064    private int currentActionIndex = 0;
065    private int actionIndex = 0;
066    private boolean executed = false;
067    private boolean async = false;
068    private boolean done = false;
069
070    private Object previous;
071    private TaskHolder<?, ?> currentHolder;
072    private Consumer<Boolean> doneCallback;
073    private BiConsumer<Exception, Task<?, ?>> errorHandler;
074
075    /* ======================================================================================== */
076    TaskChain(TaskChainFactory factory) {
077        this.factory = factory;
078        this.impl = factory.getImplementation();
079    }
080    /* ======================================================================================== */
081    // <editor-fold desc="// API Methods - Getters & Setters">
082    /**
083     * Called in an executing task, get the current action index.
084     * For every action that adds a task to the chain, the action index is increased.
085     *
086     * Useful in error or done handlers to know where you are in the chain when it aborted or threw exception.
087     * @return The current index
088     */
089    public int getCurrentActionIndex() {
090        return currentActionIndex;
091    }
092
093    /**
094     * Changes the done callback handler for this chain
095     * @param doneCallback The handler
096     */
097    @SuppressWarnings("WeakerAccess")
098    public void setDoneCallback(Consumer<Boolean> doneCallback) {
099        this.doneCallback = doneCallback;
100    }
101
102    /**
103     * @return The current error handler or null
104     */
105    public BiConsumer<Exception, Task<?, ?>> getErrorHandler() {
106        return errorHandler;
107    }
108
109    /**
110     * Changes the error handler for this chain
111     * @param errorHandler The error handler
112     */
113    @SuppressWarnings("WeakerAccess")
114    public void setErrorHandler(BiConsumer<Exception, Task<?, ?>> errorHandler) {
115        this.errorHandler = errorHandler;
116    }
117    // </editor-fold>
118    /* ======================================================================================== */
119    // <editor-fold desc="// API Methods - Data Wrappers">
120
121    /**
122     * Creates a data wrapper to return multiple objects from a task
123     */
124    public static <D1, D2> TaskChainDataWrappers.Data2<D1, D2> multi(D1 var1, D2 var2) {
125        return new TaskChainDataWrappers.Data2<>(var1, var2);
126    }
127
128    /**
129     * Creates a data wrapper to return multiple objects from a task
130     */
131    public static <D1, D2, D3> TaskChainDataWrappers.Data3<D1, D2, D3> multi(D1 var1, D2 var2, D3 var3) {
132        return new TaskChainDataWrappers.Data3<>(var1, var2, var3);
133    }
134
135    /**
136     * Creates a data wrapper to return multiple objects from a task
137     */
138    public static <D1, D2, D3, D4> TaskChainDataWrappers.Data4<D1, D2, D3, D4> multi(D1 var1, D2 var2, D3 var3, D4 var4) {
139        return new TaskChainDataWrappers.Data4<>(var1, var2, var3, var4);
140    }
141
142    /**
143     * Creates a data wrapper to return multiple objects from a task
144     */
145    public static <D1, D2, D3, D4, D5> TaskChainDataWrappers.Data5<D1, D2, D3, D4, D5> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5) {
146        return new TaskChainDataWrappers.Data5<>(var1, var2, var3, var4, var5);
147    }
148
149    /**
150     * Creates a data wrapper to return multiple objects from a task
151     */
152    public static <D1, D2, D3, D4, D5, D6> TaskChainDataWrappers.Data6<D1, D2, D3, D4, D5, D6> multi(D1 var1, D2 var2, D3 var3, D4 var4, D5 var5, D6 var6) {
153        return new TaskChainDataWrappers.Data6<>(var1, var2, var3, var4, var5, var6);
154    }
155    // </editor-fold>
156    /* ======================================================================================== */
157
158    // <editor-fold desc="// API Methods - Base">
159    /**
160     * Call to abort execution of the chain. Should be called inside of an executing task.
161     */
162    @SuppressWarnings("WeakerAccess")
163    public static void abort() {
164        TaskChainUtil.sneakyThrows(new AbortChainException());
165    }
166
167    /**
168     * Usable only inside of an executing Task or Chain Error/Done handlers
169     *
170     * Gets the current chain that is executing this Task or Error/Done handler
171     * This method should only be called on the same thread that is executing the method.
172     *
173     * In an AsyncExecutingTask or a FutureTask, You must call this method BEFORE passing control to another thread.
174     */
175    @SuppressWarnings("WeakerAccess")
176    public static TaskChain<?> getCurrentChain() {
177        return currentChain.get();
178    }
179
180    /* ======================================================================================== */
181
182    /**
183     * Allows you to call a callback to insert tasks into the chain without having to break the fluent interface
184     *
185     * Example: Plugin.newChain().sync(some::task).configure(chain -> {
186     *     chain.async(some::foo);
187     *     chain.sync(other::bar);
188     * }).async(other::task).execute();
189     *
190     * @param configure Instance of the current chain.
191     * @return The same chain
192     */
193    public TaskChain<T> configure(Consumer<TaskChain<T>> configure) {
194        configure.accept(this);
195        return this;
196    }
197
198    /**
199     * Checks if the chain has a value saved for the specified key.
200     * @param key Key to check if Task Data has a value for
201     */
202    @SuppressWarnings("WeakerAccess")
203    public boolean hasTaskData(String key) {
204        return taskMap.containsKey(key);
205    }
206
207    /**
208     * Retrieves a value relating to a specific key, saved by a previous task.
209     *
210     * @param key Key to look up Task Data for
211     * @param <R> Type the Task Data value is expected to be
212     */
213    @SuppressWarnings("WeakerAccess")
214    public <R> R getTaskData(String key) {
215        //noinspection unchecked
216        return (R) taskMap.get(key);
217    }
218
219    /**
220     * Saves a value for this chain so that a task furthur up the chain can access it.
221     *
222     * Useful for passing multiple values to the next (or furthur) tasks.
223     *
224     * @param key Key to store in Task Data
225     * @param val Value to store in Task Data
226     * @param <R> Type the Task Data value is expected to be
227     */
228    @SuppressWarnings("WeakerAccess")
229    public <R> R setTaskData(String key, Object val) {
230        //noinspection unchecked
231        return (R) taskMap.put(key, val);
232    }
233
234    /**
235     * Removes a saved value on the chain.
236     *
237     * @param key Key to remove from Task Data
238     * @param <R> Type the Task Data value is expected to be
239     */
240    @SuppressWarnings("WeakerAccess")
241    public <R> R removeTaskData(String key) {
242        //noinspection unchecked
243        return (R) taskMap.remove(key);
244    }
245
246    /**
247     * Takes the previous tasks return value, stores it to the specified key
248     * as Task Data, and then forwards that value to the next task.
249     *
250     * @param key Key to store the previous return value into Task Data
251     */
252    @SuppressWarnings("WeakerAccess")
253    public TaskChain<T> storeAsData(String key) {
254        return current((val) -> {
255            setTaskData(key, val);
256            return val;
257        });
258    }
259
260    /**
261     * Reads the specified key from Task Data, and passes it to the next task.
262     *
263     * Will need to pass expected type such as chain.&lt;Foo&gt;returnData("key")
264     *
265     * @param key Key to retrieve from Task Data and pass to next task
266     * @param <R> Return type that the next parameter can expect as argument type
267     */
268    @SuppressWarnings("WeakerAccess")
269    public <R> TaskChain<R> returnData(String key) {
270        //noinspection unchecked
271        return currentFirst(() -> (R) getTaskData(key));
272    }
273
274    /**
275     * Returns the chain itself to the next task.
276     */
277    @SuppressWarnings("WeakerAccess")
278    public TaskChain<TaskChain<?>> returnChain() {
279        return currentFirst(() -> this);
280    }
281
282
283    /**
284     * IMPLEMENTATION SPECIFIC!!
285     * Consult your application implementation to understand how long 1 unit is.
286     *
287     * For example, in Minecraft it is a tick, which is roughly 50 milliseconds, but not guaranteed.
288     *
289     * Adds a delay to the chain execution.
290     *
291     * @param gameUnits # of game units to delay before next task
292     */
293    @SuppressWarnings("WeakerAccess")
294    public TaskChain<T> delay(final int gameUnits) {
295        //noinspection CodeBlock2Expr
296        return currentCallback((input, next) -> {
297            impl.scheduleTask(gameUnits, () -> next.accept(input));
298        });
299    }
300
301    /**
302     * Adds a real time delay to the chain execution.
303     * Chain will abort if the delay is interrupted.
304     *
305     * @param duration duration of the delay before next task
306     */
307    @SuppressWarnings("WeakerAccess")
308    public TaskChain<T> delay(final int duration, TimeUnit unit) {
309        //noinspection CodeBlock2Expr
310        return currentCallback((input, next) -> {
311            impl.scheduleTask(duration, unit, () -> next.accept(input));
312        });
313    }
314
315    // </editor-fold>
316    // <editor-fold desc="// API Methods - Abort">
317
318    /**
319     * Aborts the chain once this step is reached. This is primarily useful when you are
320     * dynamically building the chains steps (such as .configure()) and want to conditionally stop the
321     * chain from proceeding.
322     *
323     * @return Chain
324     */
325    public TaskChain<?> abortChain() {
326        return current(TaskChain::abort);
327    }
328
329    /**
330     * Checks if the previous task return was null.
331     *
332     * If not null, the previous task return will forward to the next task.
333     */
334    @SuppressWarnings("WeakerAccess")
335    public TaskChain<T> abortIfNull() {
336        return abortIfNull(null, null, null, null);
337    }
338
339    /**
340     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
341     */
342    @SuppressWarnings("WeakerAccess")
343    public TaskChain<T> abortIfNull(TaskChainAbortAction<?, ?, ?> action) {
344        return abortIf(null, action, null, null, null);
345    }
346
347    /**
348     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
349     */
350    @SuppressWarnings("WeakerAccess")
351    public <A1> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
352        //noinspection unchecked
353        return abortIf(null, action, arg1, null, null);
354    }
355
356    /**
357     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
358     */
359    @SuppressWarnings("WeakerAccess")
360    public <A1, A2> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
361        //noinspection unchecked
362        return abortIf(null, action, arg1, arg2, null);
363    }
364
365    /**
366     * Checks if the previous task return was null, and aborts if it was
367     * Then executes supplied action handler
368     *
369     * If not null, the previous task return will forward to the next task.
370     */
371    @SuppressWarnings("WeakerAccess")
372    public <A1, A2, A3> TaskChain<T> abortIfNull(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
373        //noinspection unchecked
374        return abortIf(null, action, arg1, arg2, arg3);
375    }
376
377    /**
378     * Checks if the previous task return is the supplied value.
379     *
380     * If not, the previous task return will forward to the next task.
381     */
382    @SuppressWarnings("WeakerAccess")
383    public TaskChain<T> abortIf(T ifObj) {
384        return abortIf(ifObj, null, null, null, null);
385    }
386
387    /**
388     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
389     */
390    @SuppressWarnings("WeakerAccess")
391    public TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<?, ?, ?> action) {
392        return abortIf(ifObj, action, null, null, null);
393    }
394
395    /**
396     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
397     */
398    @SuppressWarnings("WeakerAccess")
399    public <A1> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
400        return abortIf(ifObj, action, arg1, null, null);
401    }
402
403    /**
404     * {@link TaskChain#abortIf(Object, TaskChainAbortAction, Object, Object, Object)}
405     */
406    @SuppressWarnings("WeakerAccess")
407    public <A1, A2> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
408        return abortIf(ifObj, action, arg1, arg2, null);
409    }
410
411    /**
412     * Checks if the previous task return is the supplied value, and aborts if it was.
413     * Then executes supplied action handler
414     *
415     * If not null, the previous task return will forward to the next task.
416     */
417    @SuppressWarnings("WeakerAccess")
418    public <A1, A2, A3> TaskChain<T> abortIf(T ifObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
419        return current((obj) -> {
420            if (Objects.equals(obj, ifObj)) {
421                handleAbortAction(action, arg1, arg2, arg3);
422                return null;
423            }
424            return obj;
425        });
426    }
427
428    /**
429     * Checks if the previous task return is not the supplied value.
430     *
431     * If it is, the previous task return will forward to the next task.
432     */
433    @SuppressWarnings("WeakerAccess")
434    public TaskChain<T> abortIfNot(T ifNotObj) {
435        return abortIfNot(ifNotObj, null, null, null, null);
436    }
437
438    /**
439     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
440     */
441    @SuppressWarnings("WeakerAccess")
442    public TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<?, ?, ?> action) {
443        return abortIfNot(ifNotObj, action, null, null, null);
444    }
445
446    /**
447     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
448     */
449    @SuppressWarnings("WeakerAccess")
450    public <A1> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, ?, ?> action, A1 arg1) {
451        return abortIfNot(ifNotObj, action, arg1, null, null);
452    }
453
454    /**
455     * {@link TaskChain#abortIfNot(Object, TaskChainAbortAction, Object, Object, Object)}
456     */
457    @SuppressWarnings("WeakerAccess")
458    public <A1, A2> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, ?> action, A1 arg1, A2 arg2) {
459        return abortIfNot(ifNotObj, action, arg1, arg2, null);
460    }
461
462    /**
463     * Checks if the previous task return is the supplied value, and aborts if it was.
464     * Then executes supplied action handler
465     *
466     * If not null, the previous task return will forward to the next task.
467     */
468    @SuppressWarnings("WeakerAccess")
469    public <A1, A2, A3> TaskChain<T> abortIfNot(T ifNotObj, TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
470        return current((obj) -> {
471            if (!Objects.equals(obj, ifNotObj)) {
472                handleAbortAction(action, arg1, arg2, arg3);
473                return null;
474            }
475            return obj;
476        });
477    }
478
479    // </editor-fold>
480    // <editor-fold desc="// API Methods - Async Executing">
481    /* ======================================================================================== */
482    // Async Executing Tasks
483    /* ======================================================================================== */
484
485    /**
486     * Execute a task on the main thread, with no previous input, and a callback to return the response to.
487     *
488     * It's important you don't perform blocking operations in this method. Only use this if
489     * the task will be scheduling a different sync operation outside of the TaskChains scope.
490     *
491     * Usually you could achieve the same design with a blocking API by switching to an async task
492     * for the next task and running it there.
493     *
494     * This method would primarily be for cases where you need to use an API that ONLY provides
495     * a callback style API.
496     *
497     * @param task The task to execute
498     * @param <R> Return type that the next parameter can expect as argument type
499     */
500    @SuppressWarnings("WeakerAccess")
501    public <R> TaskChain<R> syncFirstCallback(AsyncExecutingFirstTask<R> task) {
502        //noinspection unchecked
503        return add0(new TaskHolder<>(this, false, task));
504    }
505
506    /**
507     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran off main thread
508     * @param task The task to execute
509     * @param <R> Return type that the next parameter can expect as argument type
510     */
511    @SuppressWarnings("WeakerAccess")
512    public <R> TaskChain<R> asyncFirstCallback(AsyncExecutingFirstTask<R> task) {
513        //noinspection unchecked
514        return add0(new TaskHolder<>(this, true, task));
515    }
516
517    /**
518     * {@link TaskChain#syncFirstCallback(AsyncExecutingFirstTask)} but ran on current thread the Chain was created on
519     * @param task The task to execute
520     * @param <R> Return type that the next parameter can expect as argument type
521     */
522    @SuppressWarnings("WeakerAccess")
523    public <R> TaskChain<R> currentFirstCallback(AsyncExecutingFirstTask<R> task) {
524        //noinspection unchecked
525        return add0(new TaskHolder<>(this, null, task));
526    }
527
528    /**
529     * Execute a task on the main thread, with the last output, and a callback to return the response to.
530     *
531     * It's important you don't perform blocking operations in this method. Only use this if
532     * the task will be scheduling a different sync operation outside of the TaskChains scope.
533     *
534     * Usually you could achieve the same design with a blocking API by switching to an async task
535     * for the next task and running it there.
536     *
537     * This method would primarily be for cases where you need to use an API that ONLY provides
538     * a callback style API.
539     *
540     * @param task The task to execute
541     * @param <R> Return type that the next parameter can expect as argument type
542     */
543    @SuppressWarnings("WeakerAccess")
544    public <R> TaskChain<R> syncCallback(AsyncExecutingTask<R, T> task) {
545        //noinspection unchecked
546        return add0(new TaskHolder<>(this, false, task));
547    }
548
549    /**
550     * {@link TaskChain#syncCallback(AsyncExecutingTask)}, ran on main thread but no input or output
551     * @param task The task to execute
552     */
553    @SuppressWarnings("WeakerAccess")
554    public TaskChain<?> syncCallback(AsyncExecutingGenericTask task) {
555        return add0(new TaskHolder<>(this, false, task));
556    }
557
558    /**
559     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran off main thread
560     * @param task The task to execute
561     * @param <R> Return type that the next parameter can expect as argument type
562     */
563    @SuppressWarnings("WeakerAccess")
564    public <R> TaskChain<R> asyncCallback(AsyncExecutingTask<R, T> task) {
565        //noinspection unchecked
566        return add0(new TaskHolder<>(this, true, task));
567    }
568
569    /**
570     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran off main thread
571     * @param task The task to execute
572     */
573    @SuppressWarnings("WeakerAccess")
574    public TaskChain<?> asyncCallback(AsyncExecutingGenericTask task) {
575        return add0(new TaskHolder<>(this, true, task));
576    }
577
578    /**
579     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran on current thread the Chain was created on
580     * @param task The task to execute
581     * @param <R> Return type that the next parameter can expect as argument type
582     */
583    @SuppressWarnings("WeakerAccess")
584    public <R> TaskChain<R> currentCallback(AsyncExecutingTask<R, T> task) {
585        //noinspection unchecked
586        return add0(new TaskHolder<>(this, null, task));
587    }
588
589    /**
590     * {@link TaskChain#syncCallback(AsyncExecutingTask)} but ran on current thread the Chain was created on
591     * @param task The task to execute
592     */
593    @SuppressWarnings("WeakerAccess")
594    public TaskChain<?> currentCallback(AsyncExecutingGenericTask task) {
595        return add0(new TaskHolder<>(this, null, task));
596    }
597
598    // </editor-fold>
599    // <editor-fold desc="// API Methods - Future">
600    /* ======================================================================================== */
601    // Future Tasks
602    /* ======================================================================================== */
603
604    /**
605     * Takes a supplied Future, and holds processing of the chain until the future completes.
606     * The value of the Future will be passed until the next task.
607     *
608     * @param future The Future to wait until it is complete on
609     * @param <R> Return type that the next parameter can expect as argument type
610     */
611    @SuppressWarnings("WeakerAccess")
612    public <R> TaskChain<R> future(CompletableFuture<R> future) {
613        return currentFuture((input) -> future);
614    }
615
616    /**
617     * Takes multiple supplied Futures, and holds processing of the chain until the futures completes.
618     * The results of the Futures will be passed until the next task.
619     *
620     * @param futures The Futures to wait until it is complete on
621     * @param <R> Return type that the next parameter can expect as argument type
622     */
623    @SafeVarargs
624    @SuppressWarnings("WeakerAccess")
625    public final <R> TaskChain<List<R>> futures(CompletableFuture<R>... futures) {
626        List<CompletableFuture<R>> futureList = new ArrayList<>(futures.length);
627        Collections.addAll(futureList, futures);
628        return futures(futureList);
629    }
630
631    /**
632     * Takes multiple supplied Futures, and holds processing of the chain until the futures completes.
633     * The results of the Futures will be passed until the next task.
634     *
635     * @param futures The Futures to wait until it is complete on
636     * @param <R> Return type that the next parameter can expect as argument type
637     */
638    @SuppressWarnings("WeakerAccess")
639    public <R> TaskChain<List<R>> futures(List<CompletableFuture<R>> futures) {
640        return currentFuture((input) -> getFuture(futures));
641    }
642
643    /**
644     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
645     * of the chain until all of the futures completes.
646     *
647     * The response of every future will be passed to the next task as a List, in the order
648     * the futures were supplied.
649     *
650     * @param task The Futures Provider Task
651     * @param <R> Return type that the next parameter can expect as argument type
652     */
653    @SuppressWarnings("WeakerAccess")
654    public <R> TaskChain<List<R>> syncFutures(Task<List<CompletableFuture<R>>, T> task) {
655        return syncFuture((input) -> getFuture(task.run(input)));
656    }
657
658    /**
659     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
660     * of the chain until all of the futures completes.
661     *
662     * The response of every future will be passed to the next task as a List, in the order
663     * the futures were supplied.
664     *
665     * @param task The Futures Provider Task
666     * @param <R> Return type that the next parameter can expect as argument type
667     */
668    @SuppressWarnings("WeakerAccess")
669    public <R> TaskChain<List<R>> asyncFutures(Task<List<CompletableFuture<R>>, T> task) {
670        return asyncFuture((input) -> getFuture(task.run(input)));
671    }
672
673    /**
674     * Executes a Task on the current thread that provides a list of Futures, and holds processing
675     * of the chain until all of the futures completes.
676     *
677     * The response of every future will be passed to the next task as a List, in the order
678     * the futures were supplied.
679     *
680     * @param task The Futures Provider Task
681     * @param <R> Return type that the next parameter can expect as argument type
682     */
683    @SuppressWarnings("WeakerAccess")
684    public <R> TaskChain<List<R>> currentFutures(Task<List<CompletableFuture<R>>, T> task) {
685        return currentFuture((input) -> getFuture(task.run(input)));
686    }
687
688    /**
689     * Executes a Task on the Main thread that provides a list of Futures, and holds processing
690     * of the chain until all of the futures completes.
691     *
692     * The response of every future will be passed to the next task as a List, in the order
693     * the futures were supplied.
694     *
695     * @param task The Futures Provider Task
696     * @param <R> Return type that the next parameter can expect as argument type
697     */
698    @SuppressWarnings("WeakerAccess")
699    public <R> TaskChain<List<R>> syncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
700        return syncFuture((input) -> getFuture(task.run()));
701    }
702
703    /**
704     * Executes a Task off the Main thread that provides a list of Futures, and holds processing
705     * of the chain until all of the futures completes.
706     *
707     * The response of every future will be passed to the next task as a List, in the order
708     * the futures were supplied.
709     *
710     * @param task The Futures Provider Task
711     * @param <R> Return type that the next parameter can expect as argument type
712     */
713    @SuppressWarnings("WeakerAccess")
714    public <R> TaskChain<List<R>> asyncFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
715        return asyncFuture((input) -> getFuture(task.run()));
716    }
717
718    /**
719     * Executes a Task on the current thread that provides a list of Futures, and holds processing
720     * of the chain until all of the futures completes.
721     *
722     * The response of every future will be passed to the next task as a List, in the order
723     * the futures were supplied.
724     *
725     * @param task The Futures Provider Task
726     * @param <R> Return type that the next parameter can expect as argument type
727     */
728    @SuppressWarnings("WeakerAccess")
729    public <R> TaskChain<List<R>> currentFirstFutures(FirstTask<List<CompletableFuture<R>>> task) {
730        return currentFuture((input) -> getFuture(task.run()));
731    }
732
733    /**
734     * Execute a task on the main thread, with no previous input, that will return a Future to signal completion
735     *
736     * It's important you don't perform blocking operations in this method. Only use this if
737     * the task will be scheduling a different async operation outside of the TaskChains scope.
738     *
739     *
740     * @param task The task to execute
741     * @param <R> Return type that the next parameter can expect as argument type
742     */
743    @SuppressWarnings("WeakerAccess")
744    public <R> TaskChain<R> syncFirstFuture(FutureFirstTask<R> task) {
745        //noinspection unchecked
746        return add0(new TaskHolder<>(this, false, task));
747    }
748
749    /**
750     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran off main thread
751     * @param task The task to execute
752     * @param <R> Return type that the next parameter can expect as argument type
753     */
754    @SuppressWarnings("WeakerAccess")
755    public <R> TaskChain<R> asyncFirstFuture(FutureFirstTask<R> task) {
756        //noinspection unchecked
757        return add0(new TaskHolder<>(this, true, task));
758    }
759
760    /**
761     * {@link TaskChain#syncFirstFuture(FutureFirstTask)} but ran on current thread the Chain was created on
762     * @param task The task to execute
763     * @param <R> Return type that the next parameter can expect as argument type
764     */
765    @SuppressWarnings("WeakerAccess")
766    public <R> TaskChain<R> currentFirstFuture(FutureFirstTask<R> task) {
767        //noinspection unchecked
768        return add0(new TaskHolder<>(this, null, task));
769    }
770
771    /**
772     * Execute a task on the main thread, with the last output as the input to the future provider,
773     * that will return a Future to signal completion.
774     *
775     * It's important you don't perform blocking operations in this method. Only use this if
776     * the task will be scheduling a different async operation outside of the TaskChains scope.
777     *
778     * @param task The task to execute
779     * @param <R> Return type that the next parameter can expect as argument type
780     */
781    @SuppressWarnings("WeakerAccess")
782    public <R> TaskChain<R> syncFuture(FutureTask<R, T> task) {
783        //noinspection unchecked
784        return add0(new TaskHolder<>(this, false, task));
785    }
786
787    /**
788     * {@link TaskChain#syncFuture(FutureTask)}, ran on main thread but no input or output
789     * @param task The task to execute
790     */
791    @SuppressWarnings("WeakerAccess")
792    public TaskChain<?> syncFuture(FutureGenericTask task) {
793        return add0(new TaskHolder<>(this, false, task));
794    }
795
796    /**
797     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread
798     * @param task The task to execute
799     * @param <R> Return type that the next parameter can expect as argument type
800     */
801    @SuppressWarnings("WeakerAccess")
802    public <R> TaskChain<R> asyncFuture(FutureTask<R, T> task) {
803        //noinspection unchecked
804        return add0(new TaskHolder<>(this, true, task));
805    }
806
807    /**
808     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran off main thread
809     * @param task The task to execute
810     */
811    @SuppressWarnings("WeakerAccess")
812    public TaskChain<?> asyncFuture(FutureGenericTask task) {
813        return add0(new TaskHolder<>(this, true, task));
814    }
815
816    /**
817     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on
818     * @param task The task to execute
819     * @param <R> Return type that the next parameter can expect as argument type
820     */
821    @SuppressWarnings("WeakerAccess")
822    public <R> TaskChain<R> currentFuture(FutureTask<R, T> task) {
823        //noinspection unchecked
824        return add0(new TaskHolder<>(this, null, task));
825    }
826
827    /**
828     * {@link TaskChain#syncFuture(FutureTask)} but the future provider is ran on current thread the Chain was created on
829     * @param task The task to execute
830     */
831    @SuppressWarnings("WeakerAccess")
832    public TaskChain<?> currentFuture(FutureGenericTask task) {
833        return add0(new TaskHolder<>(this, null, task));
834    }
835
836    // </editor-fold>
837    // <editor-fold desc="// API Methods - Normal">
838    /* ======================================================================================== */
839    // Normal Tasks
840    /* ======================================================================================== */
841
842    /**
843     * Execute task on main thread, with no input, returning an output
844     * @param task The task to execute
845     * @param <R> Return type that the next parameter can expect as argument type
846     */
847    @SuppressWarnings("WeakerAccess")
848    public <R> TaskChain<R> syncFirst(FirstTask<R> task) {
849        //noinspection unchecked
850        return add0(new TaskHolder<>(this, false, task));
851    }
852
853    /**
854     * {@link TaskChain#syncFirst(FirstTask)} but ran off main thread
855     * @param task The task to execute
856     * @param <R> Return type that the next parameter can expect as argument type
857     */
858    @SuppressWarnings("WeakerAccess")
859    public <R> TaskChain<R> asyncFirst(FirstTask<R> task) {
860        //noinspection unchecked
861        return add0(new TaskHolder<>(this, true, task));
862    }
863
864    /**
865     * {@link TaskChain#syncFirst(FirstTask)} but ran on current thread the Chain was created on
866     * @param task The task to execute
867     * @param <R> Return type that the next parameter can expect as argument type
868     */
869    @SuppressWarnings("WeakerAccess")
870    public <R> TaskChain<R> currentFirst(FirstTask<R> task) {
871        //noinspection unchecked
872        return add0(new TaskHolder<>(this, null, task));
873    }
874
875    /**
876     * Execute task on main thread, with the last returned input, returning an output
877     * @param task The task to execute
878     * @param <R> Return type that the next parameter can expect as argument type
879     */
880    @SuppressWarnings("WeakerAccess")
881    public <R> TaskChain<R> sync(Task<R, T> task) {
882        //noinspection unchecked
883        return add0(new TaskHolder<>(this, false, task));
884    }
885
886    /**
887     * Execute task on main thread, with no input or output
888     * @param task The task to execute
889     */
890    @SuppressWarnings("WeakerAccess")
891    public TaskChain<?> sync(GenericTask task) {
892        return add0(new TaskHolder<>(this, false, task));
893    }
894
895    /**
896     * {@link TaskChain#sync(Task)} but ran off main thread
897     * @param task The task to execute
898     * @param <R> Return type that the next parameter can expect as argument type
899     */
900    @SuppressWarnings("WeakerAccess")
901    public <R> TaskChain<R> async(Task<R, T> task) {
902        //noinspection unchecked
903        return add0(new TaskHolder<>(this, true, task));
904    }
905
906    /**
907     * {@link TaskChain#sync(GenericTask)} but ran off main thread
908     * @param task The task to execute
909     */
910    @SuppressWarnings("WeakerAccess")
911    public TaskChain<?> async(GenericTask task) {
912        return add0(new TaskHolder<>(this, true, task));
913    }
914
915    /**
916     * {@link TaskChain#sync(Task)} but ran on current thread the Chain was created on
917     * @param task The task to execute
918     * @param <R> Return type that the next parameter can expect as argument type
919     */
920    @SuppressWarnings("WeakerAccess")
921    public <R> TaskChain<R> current(Task<R, T> task) {
922        //noinspection unchecked
923        return add0(new TaskHolder<>(this, null, task));
924    }
925
926    /**
927     * {@link TaskChain#sync(GenericTask)} but ran on current thread the Chain was created on
928     * @param task The task to execute
929     */
930    @SuppressWarnings("WeakerAccess")
931    public TaskChain<?> current(GenericTask task) {
932        return add0(new TaskHolder<>(this, null, task));
933    }
934
935
936    /**
937     * Execute task on main thread, with the last output, and no furthur output
938     * @param task The task to execute
939     */
940    @SuppressWarnings("WeakerAccess")
941    public TaskChain<?> syncLast(LastTask<T> task) {
942        return add0(new TaskHolder<>(this, false, task));
943    }
944
945    /**
946     * {@link TaskChain#syncLast(LastTask)} but ran off main thread
947     * @param task The task to execute
948     */
949    @SuppressWarnings("WeakerAccess")
950    public TaskChain<?> asyncLast(LastTask<T> task) {
951        return add0(new TaskHolder<>(this, true, task));
952    }
953
954    /**
955     * {@link TaskChain#syncLast(LastTask)} but ran on current thread the Chain was created on
956     * @param task The task to execute
957     */
958    @SuppressWarnings("WeakerAccess")
959    public TaskChain<?> currentLast(LastTask<T> task) {
960        return add0(new TaskHolder<>(this, null, task));
961    }
962
963    /**
964     * Finished adding tasks, begins executing them.
965     */
966    @SuppressWarnings("WeakerAccess")
967    public void execute() {
968        execute((Consumer<Boolean>) null, null);
969    }
970
971    /**
972     * Finished adding tasks, begins executing them with a done notifier
973     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
974     */
975    @SuppressWarnings("WeakerAccess")
976    public void execute(Runnable done) {
977        execute((finished) -> done.run(), null);
978    }
979
980    /**
981     * Finished adding tasks, begins executing them with a done notifier and error handler
982     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
983     * @param errorHandler The Error handler to handle exceptions
984     */
985    @SuppressWarnings("WeakerAccess")
986    public void execute(Runnable done, BiConsumer<Exception, Task<?, ?>> errorHandler) {
987        execute((finished) -> done.run(), errorHandler);
988    }
989
990    /**
991     * Finished adding tasks, with a done notifier
992     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
993     */
994    @SuppressWarnings("WeakerAccess")
995    public void execute(Consumer<Boolean> done) {
996        execute(done, null);
997    }
998
999    /**
1000     * Finished adding tasks, begins executing them, with an error handler
1001     * @param errorHandler The Error handler to handle exceptions
1002     */
1003    public void execute(BiConsumer<Exception, Task<?, ?>> errorHandler) {
1004        execute((Consumer<Boolean>) null, errorHandler);
1005    }
1006
1007    /**
1008     * Finished adding tasks, begins executing them with a done notifier and error handler
1009     * @param done The Callback to handle when the chain has finished completion. Argument to consumer contains finish state
1010     * @param errorHandler The Error handler to handle exceptions
1011     */
1012    public void execute(Consumer<Boolean> done, BiConsumer<Exception, Task<?, ?>> errorHandler) {
1013        if (errorHandler == null) {
1014            errorHandler = factory.getDefaultErrorHandler();
1015        }
1016        this.doneCallback = done;
1017        this.errorHandler = errorHandler;
1018        execute0();
1019    }
1020
1021    // </editor-fold>
1022    /* ======================================================================================== */
1023    // <editor-fold desc="// Implementation Details">
1024    private <A1, A2, A3> void handleAbortAction(TaskChainAbortAction<A1, A2, A3> action, A1 arg1, A2 arg2, A3 arg3) {
1025        if (action != null) {
1026            final TaskChain<?> prev = currentChain.get();
1027            try {
1028                currentChain.set(this);
1029                action.onAbort(this, arg1, arg2, arg3);
1030            } catch (Exception e) {
1031                TaskChainUtil.logError("TaskChain Exception in Abort Action handler: " + action.getClass().getName());
1032                TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1033                e.printStackTrace();
1034            } finally {
1035                currentChain.set(prev);
1036            }
1037        }
1038        abort();
1039    }
1040
1041    void execute0() {
1042        synchronized (this) {
1043            if (this.executed) {
1044                throw new RuntimeException("Already executed");
1045            }
1046            this.executed = true;
1047        }
1048        async = !impl.isMainThread();
1049        nextTask();
1050    }
1051
1052    void done(boolean finished) {
1053        this.done = true;
1054        if (this.doneCallback != null) {
1055            final TaskChain<?> prev = currentChain.get();
1056            try {
1057                currentChain.set(this);
1058                this.doneCallback.accept(finished);
1059            } catch (Exception e) {
1060                this.handleError(e, null);
1061            } finally {
1062                currentChain.set(prev);
1063            }
1064        }
1065    }
1066
1067    @SuppressWarnings({"rawtypes", "WeakerAccess"})
1068    protected TaskChain add0(TaskHolder<?,?> task) {
1069        synchronized (this) {
1070            if (this.executed) {
1071                throw new RuntimeException("TaskChain is executing");
1072            }
1073        }
1074
1075        this.chainQueue.add(task);
1076        return this;
1077    }
1078
1079    /**
1080     * Fires off the next task, and switches between Async/Sync as necessary.
1081     */
1082    private void nextTask() {
1083        synchronized (this) {
1084            this.currentHolder = this.chainQueue.poll();
1085            if (this.currentHolder == null) {
1086                this.done = true; // to ensure its done while synchronized
1087            }
1088        }
1089
1090        if (this.currentHolder == null) {
1091            this.previous = null;
1092            // All Done!
1093            this.done(true);
1094            return;
1095        }
1096
1097        Boolean isNextAsync = this.currentHolder.async;
1098        if (isNextAsync == null || factory.shutdown) {
1099            this.currentHolder.run();
1100        } else if (isNextAsync) {
1101            if (this.async) {
1102                this.currentHolder.run();
1103            } else {
1104                impl.postAsync(() -> {
1105                    this.async = true;
1106                    this.currentHolder.run();
1107                });
1108            }
1109        } else {
1110            if (this.async) {
1111                impl.postToMain(() -> {
1112                    this.async = false;
1113                    this.currentHolder.run();
1114                });
1115            } else {
1116                this.currentHolder.run();
1117            }
1118        }
1119    }
1120
1121    private void handleError(Throwable throwable, Task<?, ?> task) {
1122        Exception e = throwable instanceof Exception ? (Exception) throwable : new Exception(throwable);
1123        if (errorHandler != null) {
1124            final TaskChain<?> prev = currentChain.get();
1125            try {
1126                currentChain.set(this);
1127                errorHandler.accept(e, task);
1128            } catch (Exception e2) {
1129                TaskChainUtil.logError("TaskChain Exception in the error handler!" + e2.getMessage());
1130                TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1131                e.printStackTrace();
1132            } finally {
1133                currentChain.set(prev);
1134            }
1135        } else {
1136            TaskChainUtil.logError("TaskChain Exception on " + (task != null ? task.getClass().getName() : "Done Hander") + ": " + e.getMessage());
1137            TaskChainUtil.logError("Current Action Index was: " + currentActionIndex);
1138            e.printStackTrace();
1139        }
1140    }
1141
1142    private void abortExecutingChain() {
1143        this.previous = null;
1144        this.chainQueue.clear();
1145        this.done(false);
1146    }
1147
1148    private <R> CompletableFuture<List<R>> getFuture(List<CompletableFuture<R>> futures) {
1149        CompletableFuture<List<R>> onDone = new CompletableFuture<>();
1150        CompletableFuture<?>[] futureArray = new CompletableFuture<?>[futures.size()];
1151        CompletableFuture.allOf((CompletableFuture<?>[]) futures.toArray(futureArray)).whenComplete((aVoid, throwable) -> {
1152            if (throwable != null) {
1153                onDone.completeExceptionally(throwable);
1154            } else {
1155                boolean[] error = {false};
1156                final List<R> results = futures.stream().map(f -> {
1157                    try {
1158                        return f.join();
1159                    } catch (Exception e) {
1160                        error[0] = true;
1161                        TaskChain.this.handleError(e, TaskChain.this.currentHolder.task);
1162                        return null;
1163                    }
1164                }).collect(Collectors.toList());
1165                if (error[0]) {
1166                    onDone.completeExceptionally(new Exception("Future Dependant had an exception"));
1167                } else {
1168                    onDone.complete(results);
1169                }
1170            }
1171        });
1172        return onDone;
1173    }
1174
1175    // </editor-fold>
1176    /* ======================================================================================== */
1177    // <editor-fold desc="// TaskHolder">
1178    /**
1179     * Provides foundation of a task with what the previous task type should return
1180     * to pass to this and what this task will return.
1181     * @param <R> Return Type
1182     * @param <A> Argument Type Expected
1183     */
1184    @SuppressWarnings("AccessingNonPublicFieldOfAnotherObject")
1185    private class TaskHolder<R, A> {
1186        private final TaskChain<?> chain;
1187        private final Task<R, A> task;
1188        final Boolean async;
1189
1190        private boolean executed = false;
1191        private boolean aborted = false;
1192        private final int actionIndex;
1193
1194        private TaskHolder(TaskChain<?> chain, Boolean async, Task<R, A> task) {
1195            this.actionIndex = TaskChain.this.actionIndex++;
1196            this.task = task;
1197            this.chain = chain;
1198            this.async = async;
1199        }
1200
1201        /**
1202         * Called internally by Task Chain to facilitate executing the task and then the next task.
1203         */
1204        private void run() {
1205            final Object arg = this.chain.previous;
1206            this.chain.previous = null;
1207            TaskChain.this.currentActionIndex = this.actionIndex;
1208            final R res;
1209            final TaskChain<?> prevChain = currentChain.get();
1210            try {
1211                currentChain.set(this.chain);
1212                if (this.task instanceof FutureTask) {
1213                    //noinspection unchecked
1214                    final CompletableFuture<R> future = ((FutureTask<R, A>) this.task).runFuture((A) arg);
1215                    if (future == null) {
1216                        throw new NullPointerException("Must return a Future");
1217                    }
1218                    future.whenComplete((r, throwable) -> {
1219                        if (throwable != null) {
1220                            this.chain.handleError(throwable, this.task);
1221                            this.abort();
1222                        } else {
1223                            this.next(r);
1224                        }
1225                    });
1226                } else if (this.task instanceof AsyncExecutingTask) {
1227                    //noinspection unchecked
1228                    ((AsyncExecutingTask<R, A>) this.task).runAsync((A) arg, this::next);
1229                } else {
1230                    //noinspection unchecked
1231                    next(this.task.run((A) arg));
1232                }
1233            } catch (Throwable e) {
1234                //noinspection ConstantConditions
1235                if (e instanceof AbortChainException) {
1236                    this.abort();
1237                    return;
1238                }
1239                this.chain.handleError(e, this.task);
1240                this.abort();
1241            } finally {
1242                if (prevChain != null) {
1243                    currentChain.set(prevChain);
1244                } else {
1245                    currentChain.remove();
1246                }
1247            }
1248        }
1249
1250        /**
1251         * Abort the chain, and clear tasks for GC.
1252         */
1253        private synchronized void abort() {
1254            this.aborted = true;
1255            this.chain.abortExecutingChain();
1256        }
1257
1258        /**
1259         * Accepts result of previous task and executes the next
1260         */
1261        private void next(Object resp) {
1262            synchronized (this) {
1263                if (this.aborted) {
1264                    this.chain.done(false);
1265                    return;
1266                }
1267                if (this.executed) {
1268                    this.chain.done(false);
1269                    throw new RuntimeException("This task has already been executed.");
1270                }
1271                this.executed = true;
1272            }
1273
1274            this.chain.async = !TaskChain.this.impl.isMainThread(); // We don't know where the task called this from.
1275            this.chain.previous = resp;
1276            this.chain.nextTask();
1277        }
1278    }
1279    // </editor-fold>
1280}