Câu hỏi Làm thế nào để có được ThreadPoolExecutor để tăng chủ đề để tối đa trước khi xếp hàng?


Tôi đã thất vọng một thời gian với hành vi mặc định của ThreadPoolExecutor mà ủng hộ ExecutorService thread-pool mà rất nhiều người trong chúng ta sử dụng. Để trích dẫn từ Javadocs:

Nếu có nhiều hơn corePoolSize nhưng ít hơn tối đaPoolSize chủ đề chạy, một chủ đề mới sẽ được tạo ra chỉ khi hàng đợi đầy.

Điều này có nghĩa là nếu bạn định nghĩa một nhóm luồng với đoạn mã sau, nó sẽ không bao giờ bắt đầu chuỗi thứ hai vì LinkedBlockingQueue không bị chặn.

ExecutorService threadPool =
    new ThreadPoolExecutor(1 /*core*/, 50 /*max*/, 60 /*timeout*/,
        TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(/* unlimited queue */));

Chỉ khi bạn có một bị chặn hàng đợi và hàng đợi là đầy là bất kỳ chủ đề nào trên số lõi bắt đầu. Tôi nghi ngờ một số lượng lớn các lập trình viên đa luồng Java cơ sở không biết về hành vi này của ThreadPoolExecutor.

Bây giờ tôi có trường hợp sử dụng cụ thể, điều này không tối ưu. Tôi đang tìm cách, mà không cần viết lớp TPE của riêng tôi, để làm việc xung quanh nó.

Yêu cầu của tôi là dành cho dịch vụ web đang thực hiện gọi lại cho bên thứ ba có thể không đáng tin cậy.

  • Tôi không muốn thực hiện cuộc gọi lại đồng bộ với yêu cầu web, vì vậy tôi muốn sử dụng một nhóm chủ đề.
  • Tôi thường nhận được một vài trong số này một phút vì vậy tôi không muốn có một newFixedThreadPool(...) với một số lượng lớn các chủ đề mà chủ yếu là không hoạt động.
  • Mỗi khi tôi thường xuyên nhận được một burst của lưu lượng truy cập này và tôi muốn mở rộng số lượng các chủ đề đến một số giá trị tối đa (chúng ta hãy nói 50).
  • Tôi cần phải thực hiện một tốt cố gắng thực hiện tất cả các cuộc gọi lại vì vậy tôi muốn xếp hàng bất kỳ phần bổ sung nào trên 50. Tôi không muốn áp đảo phần còn lại của máy chủ web của mình bằng cách sử dụng newCachedThreadPool().

Làm thế nào tôi có thể làm việc xung quanh giới hạn này trong ThreadPoolExecutor nơi hàng đợi cần được giới hạn và đầy trước nhiều chủ đề hơn sẽ được bắt đầu? Làm thế nào tôi có thể làm cho nó bắt đầu nhiều chủ đề hơn trước công việc xếp hàng?

Chỉnh sửa:

@ Flavio làm cho một điểm tốt về việc sử dụng ThreadPoolExecutor.allowCoreThreadTimeOut(true) để có các chủ đề chính hết thời gian chờ và thoát. Tôi đã xem xét điều đó nhưng tôi vẫn muốn tính năng chủ đề cốt lõi. Tôi không muốn số lượng các chủ đề trong hồ bơi giảm xuống dưới kích thước lõi nếu có thể.


76
2017-10-22 21:02


gốc


Giả sử ví dụ của bạn tạo tối đa 10 luồng, có bất kỳ khoản tiết kiệm thực nào trong việc sử dụng thứ gì đó phát triển / co lại trên một hồ bơi có kích thước cố định không? - bstempi
Điểm tốt @bstempi. Con số này có phần tùy ý. Tôi đã tăng nó trong câu hỏi lên 50. Không chính xác chắc chắn bao nhiêu chủ đề đồng thời tôi thực sự muốn làm việc nhưng bây giờ tôi có giải pháp này. - Gray
Ôi chết thật! 10 upvotes nếu tôi có thể ở đây, chính xác cùng một vị trí tôi vào. - Eugene


Các câu trả lời:


Làm thế nào tôi có thể làm việc xung quanh giới hạn này trong ThreadPoolExecutor nơi hàng đợi cần được giới hạn và đầy đủ trước khi các luồng khác được bắt đầu.

Tôi tin rằng cuối cùng tôi đã tìm thấy một giải pháp hơi thanh lịch (có thể là một chút) cho giới hạn này ThreadPoolExecutor. Nó liên quan đến việc mở rộng LinkedBlockingQueue để có nó trở lại false cho queue.offer(...) khi đã có một số nhiệm vụ được xếp hàng đợi. Nếu các luồng hiện tại không theo kịp các nhiệm vụ được xếp hàng đợi, TPE sẽ thêm các luồng bổ sung. Nếu hồ bơi đã ở các chủ đề tối đa, thì RejectedExecutionHandler sẽ được gọi. Nó là trình xử lý mà sau đó thực hiện put(...) vào hàng đợi.

Nó chắc chắn là lạ để viết một hàng đợi nơi offer(...) có thể trở lại false và put() không bao giờ chặn nên đó là phần hack. Nhưng điều này làm việc tốt với việc sử dụng hàng đợi của TPE vì vậy tôi không thấy có vấn đề gì với việc này.

Đây là mã:

// extend LinkedBlockingQueue to force offer() to return false conditionally
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    private static final long serialVersionUID = -6903933921423432194L;
    @Override
    public boolean offer(Runnable e) {
        /*
         * Offer it to the queue if there is 0 items already queued, else
         * return false so the TPE will add another thread. If we return false
         * and max threads have been reached then the RejectedExecutionHandler
         * will be called which will do the put into the queue.
         */
        if (size() == 0) {
            return super.offer(e);
        } else {
            return false;
        }
    }
};
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1 /*core*/, 50 /*max*/,
        60 /*secs*/, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            /*
             * This does the actual put into the queue. Once the max threads
             * have been reached, the tasks will then queue up.
             */
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
    }
});

Với cơ chế này, khi tôi gửi nhiệm vụ cho hàng đợi, ThreadPoolExecutor sẽ:

  1. Quy mô số lượng các chủ đề lên đến kích thước lõi ban đầu (ở đây 1).
  2. Cung cấp nó vào hàng đợi. Nếu hàng đợi trống, nó sẽ được xếp hàng đợi để được xử lý bởi các luồng hiện có.
  3. Nếu hàng đợi có 1 hoặc nhiều phần tử, offer(...) sẽ trả về false.
  4. Nếu false được trả về, hãy tăng quy mô số lượng các chủ đề trong nhóm cho đến khi chúng đạt đến số tối đa (ở đây là 50).
  5. Nếu ở mức tối đa thì nó gọi RejectedExecutionHandler
  6. Các RejectedExecutionHandler sau đó đặt nhiệm vụ vào hàng đợi để được xử lý bởi chuỗi đầu tiên có sẵn trong thứ tự FIFO.

Mặc dù trong mã ví dụ của tôi ở trên, hàng đợi không bị chặn, bạn cũng có thể xác định nó như là một hàng đợi bị chặn. Ví dụ: nếu bạn thêm dung lượng 1000 vào LinkedBlockingQueue sau đó nó sẽ:

  1. quy mô các chủ đề tối đa
  2. sau đó xếp hàng cho đến khi nó đầy 1000 nhiệm vụ
  3. sau đó chặn người gọi cho đến khi không gian có sẵn cho hàng đợi.

Ngoài ra, nếu bạn thực sự cần sử dụng offer(...) bên trong RejectedExecutionHandler thì bạn có thể sử dụng offer(E, long, TimeUnit) phương pháp thay thế bằng Long.MAX_VALUE như thời gian chờ.

Chỉnh sửa:

Tôi đã chỉnh sửa offer(...) phương pháp ghi đè lên mỗi phản hồi của @ Ralf. Điều này sẽ chỉ mở rộng số lượng các chủ đề trong nhóm nếu chúng không theo kịp tải.

Chỉnh sửa:

Một tinh chỉnh khác cho câu trả lời này có thể là thực sự yêu cầu TPE nếu có chủ đề nhàn rỗi và chỉ enqueue mục nếu có. Bạn sẽ phải thực hiện một lớp học thực sự cho điều này và thêm một ourQueue.setThreadPoolExecutor(tpe); phương pháp trên đó.

Sau đó của bạn offer(...) phương pháp có thể trông giống như:

  1. Kiểm tra xem liệu tpe.getPoolSize() == tpe.getMaximumPoolSize() trong trường hợp đó chỉ cần gọi super.offer(...).
  2. Khác nếu tpe.getPoolSize() > tpe.getActiveCount() sau đó gọi super.offer(...) vì dường như các chủ đề nhàn rỗi.
  3. Ngược lại false để ngã ba một sợi khác.

Có lẽ điều này:

int poolSize = tpe.getPoolSize();
int maximumPoolSize = tpe.getMaximumPoolSize();
if (poolSize >= maximumPoolSize || poolSize > tpe.getActiveCount()) {
    return super.offer(e);
} else {
    return false;
}

Lưu ý rằng các phương thức nhận được trên TPE rất đắt vì chúng truy cập volatile trường hoặc (trong trường hợp getActiveCount()) khóa TPE và đi bộ danh sách chỉ. Ngoài ra, có điều kiện chủng tộc ở đây có thể gây ra một nhiệm vụ để được enqueued không đúng cách hoặc sợi khác chia rẽ khi có một thread nhàn rỗi.


35
2017-10-22 21:22



Tôi cũng vật lộn với cùng một vấn đề, kết thúc bằng phương pháp thực thi quan trọng. Nhưng đây thực sự là một giải pháp tốt đẹp. :) - Batty
Nhiều như tôi không thích ý tưởng phá vỡ hợp đồng Queue để thực hiện điều này, bạn chắc chắn không đơn độc trong ý tưởng của mình: groovy-programming.com/post/26923146865 - bstempi
Hấp dẫn. FYI. Tôi đã thực sự chỉ cải thiện phương pháp của tôi @bstempi cho mỗi phản hồi của Ralf. - Gray
Bạn không nhận được một sự kỳ quặc ở đây trong đó vài nhiệm vụ đầu tiên sẽ được xếp hàng đợi, và chỉ sau khi các chủ đề mới xuất hiện? Ví dụ: nếu một chuỗi chủ đề của bạn bận rộn với một tác vụ dài, và bạn gọi execute(runnable), sau đó runnable chỉ được thêm vào hàng đợi. Nếu bạn gọi execute(secondRunnable), sau đó secondRunnable được thêm vào hàng đợi. Nhưng bây giờ nếu bạn gọi execute(thirdRunnable), sau đó thirdRunnable sẽ được chạy trong một chủ đề mới. Các runnable và secondRunnable chỉ chạy một lần thirdRunnable (hoặc tác vụ chạy dài ban đầu) đã hoàn tất. - Robert Tupelo-Schneck
Đúng, Robert là đúng, trong một môi trường đa luồng, hàng đợi đôi khi phát triển trong khi có các chủ đề miễn phí để sử dụng. Giải pháp dưới đây mở rộng TPE - hoạt động tốt hơn nhiều. Tôi nghĩ rằng lời đề nghị của Robert nên được đánh dấu là câu trả lời, mặc dù hack ở trên là thú vị - Wanna Know All


Đặt kích thước lõi và kích thước tối đa thành cùng một giá trị và cho phép xóa các chủ đề chính khỏi hồ bơi allowCoreThreadTimeOut(true).


21
2018-06-30 15:37



1 Yeah Tôi nghĩ về điều đó nhưng tôi vẫn muốn có tính năng cốt lõi-chủ đề. Tôi không muốn các hồ bơi thread để đi đến 0 chủ đề trong thời gian không hoạt động. Tôi sẽ chỉnh sửa câu hỏi của tôi để chỉ ra điều đó. Nhưng điểm tuyệt vời. - Gray
Đến nay giải pháp tốt nhất cho vấn đề thiết kế ngu ngốc. - sergej shafarenka
Đây là những gì nó làm việc cho tôi. Cảm ơn! - rocotocloc


Tôi đã có hai câu trả lời khác về câu hỏi này, nhưng tôi nghi ngờ câu trả lời này là tốt nhất.

Nó dựa trên kỹ thuật câu trả lời hiện được chấp nhận, cụ thể là:

  1. Ghi đè hàng đợi offer() phương thức để (đôi khi) trả về false,
  2. gây ra ThreadPoolExecutor để tạo ra một chuỗi mới hoặc từ chối tác vụ và
  3. đặt RejectedExecutionHandler đến thực ra xếp hàng nhiệm vụ từ chối.

Vấn đề là khi nào offer() phải trả về false. Câu trả lời hiện được chấp nhận trả về false khi hàng đợi có một vài nhiệm vụ trên nó, nhưng như tôi đã chỉ ra trong bình luận của tôi ở đó, điều này gây ra các hiệu ứng không mong muốn. Cách khác, nếu bạn luôn trả về false, bạn sẽ tiếp tục sinh ra các luồng mới ngay cả khi bạn có chủ đề chờ đợi trên hàng đợi.

Giải pháp là sử dụng Java 7 LinkedTransferQueue và có offer() gọi điện tryTransfer(). Khi có một chuỗi người tiêu dùng chờ đợi, nhiệm vụ sẽ được chuyển đến chủ đề đó. Nếu không thì, offer() sẽ trả về false và ThreadPoolExecutor sẽ sinh ra một chủ đề mới.

    BlockingQueue<Runnable> queue = new LinkedTransferQueue<Runnable>() {
        @Override
        public boolean offer(Runnable e) {
            return tryTransfer(e);
        }
    };
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue);
    threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });

14
2017-11-22 19:43



Tôi phải đồng ý, điều này có vẻ sạch nhất đối với tôi. Nhược điểm duy nhất với giải pháp là LinkedTransferQueue không bị chặn nên bạn không nhận được hàng đợi tác vụ bị giới hạn dung lượng mà không cần thêm công việc. - Yeroc
Có một vấn đề khi hồ bơi phát triển đến kích thước tối đa. Giả sử pool được tăng kích thước tối đa và mỗi thread hiện đang thực hiện một nhiệm vụ, khi runnable submit impl này sẽ trả về false và ThreadPoolExecutor cố gắng addWorker thread, nhưng pool đã đạt đến mức tối đa để runnable sẽ bị từ chối. Theo từ chốiExceHandler bạn đã viết nó sẽ được cung cấp trong hàng đợi một lần nữa kết quả này nhảy khỉ xảy ra từ đầu một lần nữa. - Sudheera
@Sudheera Tôi tin rằng bạn đang nhầm lẫn. queue.offer()bởi vì nó thực sự đang gọi LinkedTransferQueue.tryTransfer(), sẽ trả về false và không enqueue nhiệm vụ. Tuy nhiên RejectedExecutionHandler cuộc gọi queue.put(), mà không thất bại và không enqueue nhiệm vụ. - Robert Tupelo-Schneck
Vâng tôi nghĩ bạn đúng - Sudheera
@ RobertTupelo-Schneck cực kỳ hữu ích và tốt đẹp! - Eugene


Lưu ý: Bây giờ tôi thích và đề xuất câu trả lời khác của tôi.

Đây là một phiên bản mà tôi cảm thấy đơn giản hơn nhiều: Tăng corePoolSize (tối đa giới hạn tối đaPoolSize) bất cứ khi nào một tác vụ mới được thực hiện, sau đó giảm corePoolSize (xuống đến giới hạn của người dùng đã chỉ định "kích thước nhóm lõi") bất cứ khi nào nhiệm vụ hoàn thành.

Để đặt nó theo một cách khác, hãy theo dõi số lượng tác vụ đang chạy hoặc enqueued, và đảm bảo rằng corePoolSize bằng số lượng nhiệm vụ miễn là giữa người dùng đã chỉ định "kích thước nhóm lõi" và maximumPoolSize.

public class GrowBeforeQueueThreadPoolExecutor extends ThreadPoolExecutor {
    private int userSpecifiedCorePoolSize;
    private int taskCount;

    public GrowBeforeQueueThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        userSpecifiedCorePoolSize = corePoolSize;
    }

    @Override
    public void execute(Runnable runnable) {
        synchronized (this) {
            taskCount++;
            setCorePoolSizeToTaskCountWithinBounds();
        }
        super.execute(runnable);
    }

    @Override
    protected void afterExecute(Runnable runnable, Throwable throwable) {
        super.afterExecute(runnable, throwable);
        synchronized (this) {
            taskCount--;
            setCorePoolSizeToTaskCountWithinBounds();
        }
    }

    private void setCorePoolSizeToTaskCountWithinBounds() {
        int threads = taskCount;
        if (threads < userSpecifiedCorePoolSize) threads = userSpecifiedCorePoolSize;
        if (threads > getMaximumPoolSize()) threads = getMaximumPoolSize();
        setCorePoolSize(threads);
    }
}

Như đã viết, lớp không hỗ trợ thay đổi người dùng đã chỉ định corePoolSize hoặc maximumPoolSize sau khi xây dựng và không hỗ trợ thao tác hàng đợi công việc trực tiếp hoặc thông qua remove() hoặc là purge().


7
2017-10-23 10:15



Tôi thích nó ngoại trừ synchronized khối. Bạn có thể gọi đến hàng đợi để nhận được số nhiệm vụ không. Hoặc có thể sử dụng AtomicInteger? - Gray
Tôi muốn tránh chúng, nhưng vấn đề là vậy. Nếu có một số execute() cuộc gọi trong các chủ đề riêng biệt, mỗi chủ đề sẽ (1) tìm ra số lượng chủ đề cần thiết, (2) setCorePoolSize với số đó, và (3) gọi super.execute(). Nếu các bước (1) và (2) không được đồng bộ hóa, tôi không chắc chắn cách ngăn chặn thứ tự không may khi bạn đặt kích thước nhóm lõi thành số thấp hơn sau một số lớn hơn. Với quyền truy cập trực tiếp vào trường siêu lớp, điều này có thể được thực hiện bằng cách sử dụng so sánh và đặt thay thế, nhưng tôi không thấy một cách rõ ràng để làm điều đó trong một lớp con mà không đồng bộ hóa. - Robert Tupelo-Schneck
Tôi nghĩ rằng hình phạt cho điều kiện chủng tộc đó tương đối thấp miễn là taskCount trường hợp lệ (ví dụ: a AtomicInteger). Nếu hai luồng lặp lại kích thước hồ bơi ngay lập tức sau mỗi luồng, nó sẽ nhận được các giá trị thích hợp. Nếu người thứ hai co lại các chủ đề cốt lõi thì nó hẳn đã thấy một sự sụt giảm trong hàng đợi hay cái gì đó. - Gray
Đáng buồn là tôi nghĩ nó còn tệ hơn thế. Giả sử nhiệm vụ 10 và 11 gọi execute(). Mỗi người sẽ gọi atomicTaskCount.incrementAndGet() và họ sẽ nhận được 10 và 11 tương ứng. Nhưng nếu không đồng bộ hóa (vượt quá số nhiệm vụ và thiết lập kích thước nhóm lõi), bạn có thể nhận được (1) nhiệm vụ 11 đặt kích thước nhóm lõi là 11, (2) nhiệm vụ 10 đặt kích thước nhóm lõi là 10, (3) nhiệm vụ 10 cuộc gọi super.execute(), (4) nhiệm vụ 11 cuộc gọi super.execute() và được enqueued. - Robert Tupelo-Schneck
Tôi đã đưa ra giải pháp này một số thử nghiệm nghiêm trọng và rõ ràng là tốt nhất. Trong môi trường đa luồng, nó vẫn sẽ đôi khi enqueue khi có chủ đề miễn phí (do tự do threaded TPE.execute tự nhiên), nhưng nó hiếm khi xảy ra, trái ngược với giải pháp đánh dấu-như-trả lời, nơi mà điều kiện chủng tộc có nhiều cơ hội để xảy ra, do đó, điều này xảy ra khá nhiều trên mỗi chạy đa luồng. - Wanna Know All


Chúng tôi có một phân lớp của ThreadPoolExecutor phải mất thêm creationThreshold và ghi đè execute.

public void execute(Runnable command) {
    super.execute(command);
    final int poolSize = getPoolSize();
    if (poolSize < getMaximumPoolSize()) {
        if (getQueue().size() > creationThreshold) {
            synchronized (this) {
                setCorePoolSize(poolSize + 1);
                setCorePoolSize(poolSize);
            }
        }
    }
}

có lẽ điều đó cũng sẽ giúp ích cho bạn, nhưng bạn trông có vẻ nghệ thuật hơn ...


5
2018-06-20 14:02



Hấp dẫn. Cảm ơn vì điều này. Tôi thực sự không biết rằng kích thước lõi là có thể thay đổi được. - Gray
Bây giờ tôi nghĩ về nó một số chi tiết, giải pháp này là tốt hơn so với tôi về kiểm tra kích thước của hàng đợi. Tôi đã chỉnh sửa câu trả lời của mình để có offer(...) phương thức chỉ trả lại false có điều kiện. Cảm ơn! - Gray


Câu trả lời được đề nghị chỉ giải quyết một (1) vấn đề với nhóm chủ đề JDK:

  1. Các nhóm luồng JDK được thiên về hàng đợi. Vì vậy, thay vì sinh ra một luồng mới, chúng sẽ xếp hàng nhiệm vụ. Chỉ khi hàng đợi đạt đến giới hạn của nó thì hồ bơi thread sẽ sinh ra một luồng mới.

  2. Rút lui chủ đề không xảy ra khi tải sáng. Ví dụ nếu chúng tôi có một loạt các công việc đánh vào hồ bơi gây ra các hồ bơi để đi đến tối đa, tiếp theo là ánh sáng tải tối đa 2 nhiệm vụ tại một thời điểm, hồ bơi sẽ sử dụng tất cả các chủ đề để phục vụ tải ánh sáng ngăn chặn thread nghỉ hưu. (chỉ cần 2 chủ đề ...)

Không hài lòng với hành vi trên, tôi đã đi trước và triển khai một hồ bơi để khắc phục những thiếu sót ở trên.

Để giải quyết 2) Sử dụng tính năng lập lịch trình Lifo giải quyết vấn đề. Ý tưởng này được trình bày bởi Ben Maurer tại hội thảo ACM 2015 Hệ thống @ Facebook quy mô 

Vì vậy, một triển khai mới được sinh ra:

LifoThreadPoolExecutorSQP

Cho đến nay, triển khai này cải thiện hiệu suất thực thi không đồng bộ cho ZEL.

Việc thực hiện có khả năng xoay để giảm chi phí chuyển đổi ngữ cảnh, mang lại hiệu năng vượt trội cho các trường hợp sử dụng nhất định.

Hy vọng nó giúp...

PS: JDK Fork Tham gia Pool thực hiện ExecutorService và hoạt động như một hồ bơi thread "bình thường", thực hiện là performant, Nó sử dụng LIFO Thread scheduling, tuy nhiên không có quyền kiểm soát kích thước hàng đợi nội bộ, thời gian chờ nghỉ hưu ...


3
2017-11-27 19:15



Quá tệ là việc triển khai này có quá nhiều phụ thuộc bên ngoài. Làm cho nó vô dụng đối với tôi: - / - Martin L.


Lưu ý: Bây giờ tôi thích và đề xuất câu trả lời khác của tôi.

Tôi có một đề xuất khác, theo ý tưởng ban đầu về việc thay đổi hàng đợi để trả về false. Trong một nhiệm vụ này, tất cả các nhiệm vụ có thể nhập vào hàng đợi, nhưng bất cứ khi nào một nhiệm vụ được enqueued sau execute(), chúng ta làm theo nó với một nhiệm vụ không có nhiệm vụ gửi đi mà hàng đợi từ chối, làm cho một luồng mới xuất hiện, sẽ thực hiện lệnh no-op ngay lập tức theo sau một thứ gì đó từ hàng đợi.

Bởi vì các chuỗi công nhân có thể bỏ phiếu LinkedBlockingQueue cho một nhiệm vụ mới, nó có thể cho một nhiệm vụ để có được enqueued ngay cả khi có một chủ đề có sẵn. Để tránh tạo ra các luồng mới ngay cả khi có các luồng, chúng ta cần phải theo dõi xem có bao nhiêu luồng đang đợi các nhiệm vụ mới trên hàng đợi, và chỉ sinh ra một luồng mới khi có nhiều nhiệm vụ trên hàng đợi hơn là các chuỗi chờ đợi.

final Runnable SENTINEL_NO_OP = new Runnable() { public void run() { } };

final AtomicInteger waitingThreads = new AtomicInteger(0);

BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>() {
    @Override
    public boolean offer(Runnable e) {
        // offer returning false will cause the executor to spawn a new thread
        if (e == SENTINEL_NO_OP) return size() <= waitingThreads.get();
        else return super.offer(e);
    }

    @Override
    public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.poll(timeout, unit);
        } finally {
            waitingThreads.decrementAndGet();
        }
    }

    @Override
    public Runnable take() throws InterruptedException {
        try {
            waitingThreads.incrementAndGet();
            return super.take();
        } finally {
            waitingThreads.decrementAndGet();
        }
    }
};

ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 50, 60, TimeUnit.SECONDS, queue) {
    @Override
    public void execute(Runnable command) {
        super.execute(command);
        if (getQueue().size() > waitingThreads.get()) super.execute(SENTINEL_NO_OP);
    }
};
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r == SENTINEL_NO_OP) return;
        else throw new RejectedExecutionException();            
    }
});

1
2017-10-22 21:52





Giải pháp tốt nhất mà tôi có thể nghĩ đến là mở rộng.

ThreadPoolExecutor cung cấp một vài phương pháp móc: beforeExecute và afterExecute. Trong phần mở rộng của bạn, bạn có thể duy trì sử dụng một hàng đợi có giới hạn để nạp vào các nhiệm vụ và một hàng đợi không bị ràng buộc thứ hai để xử lý tràn. Khi ai đó gọi submit, bạn có thể tìm cách đặt yêu cầu vào hàng đợi bị chặn. Nếu bạn gặp một ngoại lệ, bạn chỉ cần gắn tác vụ vào hàng đợi tràn của bạn. Sau đó, bạn có thể sử dụng afterExecute móc để xem nếu có bất cứ điều gì trong hàng đợi tràn sau khi hoàn thành một nhiệm vụ. Bằng cách này, người thực hiện sẽ chăm sóc những thứ trong hàng đợi bị chặn trước tiên, và tự động kéo từ hàng đợi không bị chặn này theo thời gian cho phép.

Nó có vẻ như công việc nhiều hơn giải pháp của bạn, nhưng ít nhất nó không liên quan đến việc đưa ra những hành vi bất ngờ hàng đợi. Tôi cũng tưởng tượng rằng có một cách tốt hơn để kiểm tra trạng thái của hàng đợi và chủ đề thay vì dựa vào các ngoại lệ, điều này khá chậm để ném.


0



Tôi không thích giải pháp này. Tôi khá chắc chắn rằng ThreadPoolExecutor đã không được thiết kế cho kế thừa. - scottb
Có thực sự là một ví dụ về một phần mở rộng ngay trong JavaDoc. Họ nói rằng hầu hết có lẽ sẽ chỉ thực hiện các phương pháp móc, nhưng họ cho bạn biết những gì khác bạn cần phải tìm cho ra khi mở rộng. - bstempi