Câu hỏi ExecutorService ngắt công việc sau khi hết thời gian chờ


Tôi đang tìm kiếm một ExecutorService thực hiện có thể được cung cấp với thời gian chờ. Các tác vụ được gửi tới ExecutorService bị gián đoạn nếu chúng mất nhiều thời gian hơn thời gian chờ để chạy. Thực hiện một con thú như vậy không phải là một nhiệm vụ khó khăn như vậy, nhưng tôi tự hỏi nếu có ai biết về một thực hiện hiện có.

Dưới đây là những gì tôi đưa ra dựa trên một số cuộc thảo luận dưới đây. Có ý kiến ​​gì không?

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        timeoutExecutor.shutdown();
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }
    }

    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            thread.interrupt();
        }
    }
}

76
2018-05-03 14:10


gốc


Đó có phải là 'thời gian bắt đầu' của thời gian chờ thời gian gửi không? Hoặc thời gian nhiệm vụ bắt đầu thực hiện? - Tim Bender
Câu hỏi hay. Khi nó bắt đầu thực hiện. Có lẽ sử dụng protected void beforeExecute(Thread t, Runnable r) móc. - scompt.com
@ scompt.com là bạn vẫn đang sử dụng giải pháp này hoặc đã được superceded - Paul Taylor
@PaulTaylor Công việc mà tôi đã triển khai giải pháp này đã được superceded. :-) - scompt.com
Tôi cần chính xác điều này, ngoại trừ a) Tôi cần dịch vụ lên lịch chính của tôi là một nhóm luồng với một chuỗi dịch vụ duy nhất vì cần các nhiệm vụ của tôi để thực hiện đồng thời một cách nghiêm ngặt và b) Tôi cần phải xác định thời gian chờ cho mỗi tác vụ tại thời gian nhiệm vụ được gửi. Tôi đã cố gắng sử dụng này như là một điểm khởi đầu nhưng mở rộng ScheduledThreadPoolExecutor, nhưng tôi không thể nhìn thấy một cách để có được thời gian chờ quy định đó là để được xác định tại thời gian nộp nhiệm vụ thông qua phương pháp beforeExecute. Bất cứ lời đề nghị nào cũng biết ơn! - Michael Ellis


Các câu trả lời:


Bạn có thể sử dụng ScheduledExecutorService cho điều này. Trước tiên, bạn sẽ chỉ gửi nó một lần để bắt đầu ngay lập tức và giữ lại tương lai được tạo ra. Sau đó bạn có thể gửi một nhiệm vụ mới sẽ hủy bỏ tương lai được giữ lại sau một khoảng thời gian.

 ScheduledExecutorService executor = Executors.newScheduledThreadPool(2); 
 final Future handler = executor.submit(new Callable(){ ... });
 executor.schedule(new Runnable(){
     public void run(){
         handler.cancel();
     }      
 }, 10000, TimeUnit.MILLISECONDS);

Điều này sẽ thực hiện xử lý của bạn (chức năng chính bị gián đoạn) trong 10 giây, sau đó sẽ hủy (tức là gián đoạn) nhiệm vụ cụ thể đó.


71
2018-05-03 15:12



Ý tưởng thú vị, nhưng điều gì sẽ xảy ra nếu nhiệm vụ kết thúc trước khi hết thời gian chờ (thường là như vậy)? Tôi không muốn có hàng tấn công việc dọn dẹp chờ đợi để chạy chỉ để tìm ra nhiệm vụ được giao của họ đã hoàn thành. Có cần phải là một chủ đề giám sát tương lai khi họ kết thúc để loại bỏ các nhiệm vụ dọn dẹp của họ. - scompt.com
Người thực thi sẽ chỉ lên lịch hủy này một lần. Nếu nhiệm vụ được hoàn thành thì việc hủy bỏ là không có op và công việc tiếp tục không thay đổi. Chỉ cần có thêm một luồng đề tài để hủy các nhiệm vụ và một luồng để chạy chúng. Bạn có thể có hai người thực thi, một để gửi các nhiệm vụ chính của bạn và một để hủy bỏ chúng. - John Vint
Đó là sự thật, nhưng nếu thời gian chờ là 5 giờ và trong thời gian đó 10k nhiệm vụ được thực thi. Tôi muốn tránh có tất cả những người không có ops nằm xung quanh chiếm bộ nhớ và gây ra bối cảnh chuyển mạch. - scompt.com
@Scompt Không nhất thiết. Sẽ có 10k lệnh inv.cations () trong tương lai, tuy nhiên nếu tương lai được hoàn thành thì việc hủy bỏ sẽ nhanh chóng thoát ra và không thực hiện bất kỳ công việc không cần thiết nào. Nếu bạn không muốn 10k thêm hủy bỏ invocations thì điều này có thể không hoạt động, nhưng số lượng công việc được thực hiện khi một nhiệm vụ được hoàn thành là rất nhỏ. - John Vint
@ John W .: Tôi vừa mới nhận ra một vấn đề khác với việc triển khai của bạn. Tôi cần thời gian chờ để bắt đầu khi nhiệm vụ bắt đầu thực hiện, như tôi đã nhận xét trước đó. Tôi nghĩ cách duy nhất để làm điều đó là sử dụng beforeExecute móc. - scompt.com


Thật không may giải pháp là thiếu sót. Có một loại lỗi với ScheduledThreadPoolExecutor, cũng được báo cáo trong câu hỏi này: hủy bỏ tác vụ đã gửi không giải phóng đầy đủ các tài nguyên bộ nhớ liên quan đến tác vụ; các tài nguyên được phát hành chỉ khi nhiệm vụ hết hạn.

Nếu bạn tạo ra một TimeoutThreadPoolExecutor với thời gian hết hạn khá dài (một cách sử dụng điển hình) và gửi các tác vụ đủ nhanh, bạn sẽ làm đầy bộ nhớ - ngay cả khi các tác vụ thực sự đã hoàn tất thành công.

Bạn có thể thấy vấn đề với chương trình thử nghiệm sau (rất thô):

public static void main(String[] args) throws InterruptedException {
    ExecutorService service = new TimeoutThreadPoolExecutor(1, 1, 10, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<Runnable>(), 10, TimeUnit.MINUTES);
    //ExecutorService service = Executors.newFixedThreadPool(1);
    try {
        final AtomicInteger counter = new AtomicInteger();
        for (long i = 0; i < 10000000; i++) {
            service.submit(new Runnable() {
                @Override
                public void run() {
                    counter.incrementAndGet();
                }
            });
            if (i % 10000 == 0) {
                System.out.println(i + "/" + counter.get());
                while (i > counter.get()) {
                    Thread.sleep(10);
                }
            }
        }
    } finally {
        service.shutdown();
    }
}

Chương trình loại bỏ bộ nhớ có sẵn, mặc dù nó chờ đợi cho sinh sản Runnables để hoàn thành.

Tôi mặc dù về điều này một thời gian, nhưng tiếc là tôi không thể đưa ra một giải pháp tốt.

CHỈNH SỬA: Tôi phát hiện ra vấn đề này đã được báo cáo là Lỗi JDK 6602600và dường như đã được sửa chữa rất gần đây.


5
2017-10-11 16:14





Gói nhiệm vụ trong FutureTask và bạn có thể chỉ định thời gian chờ cho FutureTask. Nhìn vào ví dụ trong câu trả lời của tôi cho câu hỏi này,

java native Thời gian chờ xử lý


4
2018-05-03 14:46



Tôi nhận ra có một vài cách để làm điều này bằng cách sử dụng java.util.concurrent các lớp học, nhưng tôi đang tìm kiếm ExecutorService thực hiện. - scompt.com
Nếu bạn đang nói rằng bạn muốn ExecutorService của bạn che giấu thực tế là timeouts đang được thêm từ mã máy khách, bạn có thể triển khai ExecutorService của riêng bạn để bao bọc tất cả các runnable cho nó bằng FutureTask trước khi thực thi chúng. - erikprice


Cách sử dụng ExecutorService.shutDownNow() phương pháp như mô tả trong http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html? Nó có vẻ là giải pháp đơn giản nhất.


1
2018-03-04 19:27



Bởi vì nó sẽ ngừng tất cả các nhiệm vụ theo lịch trình và không phải là một nhiệm vụ cụ thể như được yêu cầu bởi câu hỏi - MikeL


Có vẻ như vấn đề không nằm trong lỗi JDK 6602600 (nó đã được giải quyết tại 2010-05-22), nhưng trong cuộc gọi không chính xác của giấc ngủ (10) trong vòng tròn. Ngoài ra, chủ đề chính phải cung cấp trực tiếp CHANCE đến các chủ đề khác để nhận ra các nhiệm vụ của họ bằng cách gọi SLEEP (0) trong MỌI chi nhánh của vòng tròn bên ngoài. Nó là tốt hơn, tôi nghĩ rằng, để sử dụng Thread.yield () thay vì Thread.sleep (0)

Kết quả sửa chữa một phần của mã vấn đề trước đó là như thế này:

.......................
........................
Thread.yield();         

if (i % 1000== 0) {
System.out.println(i + "/" + counter.get()+ "/"+service.toString());
}

//                
//                while (i > counter.get()) {
//                    Thread.sleep(10);
//                } 

Nó hoạt động chính xác với số lượng truy cập bên ngoài lên đến 150 000 000 vòng tròn thử nghiệm.


1
2017-12-27 08:24





Sau hàng tấn thời gian khảo sát,
Cuối cùng, tôi sử dụng invokeAll phương pháp của ExecutorService để giải quyết vấn đề này.
Điều đó sẽ làm gián đoạn hoàn toàn nhiệm vụ trong khi nhiệm vụ đang chạy.
Đây là ví dụ

ExecutorService executorService = Executors.newCachedThreadPool();

try {
    List<Callable<Object>> callables = new ArrayList<>();
    // Add your long time task (callable)
    callables.add(new VaryLongTimeTask());
    // Assign tasks for specific execution timeout (e.g. 2 sec)
    List<Future<Object>> futures = executorService.invokeAll(callables, 2000, TimeUnit.MILLISECONDS);
    for (Future<Object> future : futures) {
        // Getting result
    }
} catch (InterruptedException e) {
    e.printStackTrace();
}

executorService.shutdown();

Chuyên nghiệp là bạn cũng có thể gửi ListenableFuture ở cùng một ExecutorService.
Chỉ cần thay đổi một chút dòng đầu tiên của mã.

ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

ListeningExecutorService là tính năng nghe của ExecutorService tại dự án google ổi (com.google.guava))


1
2017-08-02 01:56



Cảm ơn bạn đã chỉ ra invokeAll. Điều đó hoạt động rất tốt. Chỉ cần một lời cảnh cáo cho bất cứ ai suy nghĩ về việc sử dụng này: mặc dù invokeAll trả về một danh sách Future các đối tượng, nó thực sự có vẻ là một hoạt động chặn. - mxro


Sử dụng câu trả lời của John W Tôi đã tạo một triển khai thực hiện chính xác thời gian chờ khi nhiệm vụ bắt đầu thực hiện. Tôi thậm chí viết một bài kiểm tra đơn vị cho nó :)

Tuy nhiên, nó không phù hợp với nhu cầu của tôi vì một số hoạt động IO không bị gián đoạn khi Future.cancel() được gọi (tức là khi Thread.interrupted() được gọi là).

Dù sao thì nếu có ai quan tâm, tôi đã tạo ra một ý chính: https://gist.github.com/amanteaux/64c54a913c1ae34ad7b86db109cbc0bf


1
2017-07-16 21:04





Điều gì về ý tưởng thay thế này:

  • hai có hai người thực thi:
    • một cho:
      • gửi tác vụ, mà không cần quan tâm đến thời gian chờ của tác vụ
      • thêm kết quả tương lai và thời điểm kết thúc với cấu trúc bên trong
    • một để thực hiện một công việc nội bộ đang kiểm tra cấu trúc bên trong nếu một số nhiệm vụ hết thời gian chờ và nếu chúng bị hủy bỏ.

Mẫu nhỏ là ở đây:

public class AlternativeExecutorService 
{

private final CopyOnWriteArrayList<ListenableFutureTask> futureQueue       = new CopyOnWriteArrayList();
private final ScheduledThreadPoolExecutor                scheduledExecutor = new ScheduledThreadPoolExecutor(1); // used for internal cleaning job
private final ListeningExecutorService                   threadExecutor    = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(5)); // used for
private ScheduledFuture scheduledFuture;
private static final long INTERNAL_JOB_CLEANUP_FREQUENCY = 1000L;

public AlternativeExecutorService()
{
    scheduledFuture = scheduledExecutor.scheduleAtFixedRate(new TimeoutManagerJob(), 0, INTERNAL_JOB_CLEANUP_FREQUENCY, TimeUnit.MILLISECONDS);
}

public void pushTask(OwnTask task)
{
    ListenableFuture<Void> future = threadExecutor.submit(task);  // -> create your Callable
    futureQueue.add(new ListenableFutureTask(future, task, getCurrentMillisecondsTime())); // -> store the time when the task should end
}

public void shutdownInternalScheduledExecutor()
{
    scheduledFuture.cancel(true);
    scheduledExecutor.shutdownNow();
}

long getCurrentMillisecondsTime()
{
    return Calendar.getInstance().get(Calendar.MILLISECOND);
}

class ListenableFutureTask
{
    private final ListenableFuture<Void> future;
    private final OwnTask                task;
    private final long                   milliSecEndTime;

    private ListenableFutureTask(ListenableFuture<Void> future, OwnTask task, long milliSecStartTime)
    {
        this.future = future;
        this.task = task;
        this.milliSecEndTime = milliSecStartTime + task.getTimeUnit().convert(task.getTimeoutDuration(), TimeUnit.MILLISECONDS);
    }

    ListenableFuture<Void> getFuture()
    {
        return future;
    }

    OwnTask getTask()
    {
        return task;
    }

    long getMilliSecEndTime()
    {
        return milliSecEndTime;
    }
}

class TimeoutManagerJob implements Runnable
{
    CopyOnWriteArrayList<ListenableFutureTask> getCopyOnWriteArrayList()
    {
        return futureQueue;
    }

    @Override
    public void run()
    {
        long currentMileSecValue = getCurrentMillisecondsTime();
        for (ListenableFutureTask futureTask : futureQueue)
        {
            consumeFuture(futureTask, currentMileSecValue);
        }
    }

    private void consumeFuture(ListenableFutureTask futureTask, long currentMileSecValue)
    {
        ListenableFuture<Void> future = futureTask.getFuture();
        boolean isTimeout = futureTask.getMilliSecEndTime() >= currentMileSecValue;
        if (isTimeout)
        {
            if (!future.isDone())
            {
                future.cancel(true);
            }
            futureQueue.remove(futureTask);
        }
    }
}

class OwnTask implements Callable<Void>
{
    private long     timeoutDuration;
    private TimeUnit timeUnit;

    OwnTask(long timeoutDuration, TimeUnit timeUnit)
    {
        this.timeoutDuration = timeoutDuration;
        this.timeUnit = timeUnit;
    }

    @Override
    public Void call() throws Exception
    {
        // do logic
        return null;
    }

    public long getTimeoutDuration()
    {
        return timeoutDuration;
    }

    public TimeUnit getTimeUnit()
    {
        return timeUnit;
    }
}
}

0
2017-08-07 13:20