Posts Tagged: multi-threaded


28
Mar 11

Ruthless but Fair Task Executor

Facebook executionGoal: To implement fair mechanism for execution of tasks in parallel with fair usage of resources by each task.
Proposed implementation is going to be ruthless so as fairly long running tasks will be simply killed, so they don’t block execution of other tasks in the queue.






Ingridients: WatchedFutureTask – a monitored task, which should rather be nimble and swift.

public class WatchedFutureTask extends FutureTask<Object> {
    private long startTime;

    public WatchedFutureTask(Runnable runnable) {
        super(runnable, null);
    }
   
    public void run() {
        startTime = System.currentTimeMillis();
        super.run();
    }

    public boolean isTimedOut(int timeout) {
        return !isCancelled() && !isDone() && System.currentTimeMillis() > (startTime + timeout);
    }
}

TaskMonitor – the ruthless and thread-safe killer.

public class TaskMonitor extends Thread {
    private final Queue<WatchedFutureTask> runningTasks = new ConcurrentLinkedQueue<WatchedFutureTask>();
    private final long timeout;

    public TaskMonitor(long timeout) {
        super();
        this.timeout = timeout;
    }

    public void watch(WatchedFutureTask task) {
        runningTasks.add(task);
    }

    public void run() {
        while (!isInterrupted()) {
            for (Iterator<WatchedFutureTask> it = runningTasks.iterator(); it.hasNext();) {
                WatchedFutureTask task = it.next();
                if (task.isTimedOut(timeout)) task.cancel(true);
                if (task.isDone() || task.isCancelled()) it.remove();
            }
            try {
                Thread.sleep(5000); // run every 5 second
            } (InterruptedException e) {
                // should exit naturally
            }
        }
    }
}

ExecutorService provided by java.util.concurrent package.
And now putting it all together we have got TaskExecutor:

public class TaskExecutor {
    private TaskMonitor monitor;
    private ExecutorService executor;

    public TaskExecutor(int threadPoolSize, long taskTimeout) {
        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
        monitor = new TaskMonitor(taskTimeout);
        monitor.start();
    }

    public void submit(Runnable task) {
        WatchedFutureTask wtask = new WatchedFutureTask(task);
        executor.execute(wtask);
        monitor.watch(wtask);
    }
}

Leveraging capability of Future to hold task execution result (of type T), we could eloborate and arrange collection of the tasks execution results. To achieve this, you would replace usages of Runnable with usages of Callable and implement appropriate logic to store the outcomes.