ThreadPoolExecutor i ustawienie czasu wywolania kolejnego taska

0

Witam

Ostatnio ćwiczę trochę bardziej zaawansowaną wielowątkowość.
Chcę osiągnąć:
Są zadania (obiekty typu Callable) oraz może być jakaś maksymalna ilość tych obiektów które mają być brane do wykonania. Jeżeli zadanie już się niemieści to następuje oczekiwanie na skończenie tych poprzednich i dopiero dodanie. Ma być jakiś czas po którym nastąpi ponowna próba dodania zadania.

Dla przykładu daje maksymalnie jedno zadanie naraz. Daje też TimeOut na 10 sec ale nie bardzo to działa.

Implementacja Callable

 
public class CallableImpl implements Callable<String> {

    private int myName;
    private double temp;

    CallableImpl(int i) {
        myName = i;
    }

    public String call() {
	// Tutaj operacje by było widać ze coś zadanie robi
        System.out.println("GO " + myName);
        for (int i = 0; i < 100000000; i++) {
            int asd = 12781721 + i;
            temp = asd + Math.sqrt(asd) - Math.log10(asd);
        }
        System.out.println("RETURN " + myName);
        return "TEST " + myName;

    }

    public int getMyName() {
        return myName;
    }

    public void setMyName(int myName) {
        this.myName = myName;
    }
}


Klasa testowa:

 

public static void main(String[] args) {
        BlockingQueue queue = new SynchronousQueue();
        ExecutorService executor = new ThreadPoolExecutor(1, 1, 10000, TimeUnit.MILLISECONDS, queue);       
        
        
        int numerPodejsciaZadania = 1;
        while (true) {
            try {
                executor.submit(new CallableImpl(numerPodejsciaZadania ));
                numerPodejsciaZadania ++;
            } catch (Exception e) {
		e.printStackTrace();
            }
        }
        
    }

Zadanie nr 1 wchodzi natomiast 2 i każde następne powoduje java.util.concurrent.RejectedExecutionException i w sumie bardzo dobrze tylko od razu po tym znowu próbuje robic submit - nie ma żadnego timeout'a. Nie rozumiem jak działa ten parametr w ThreadPoolExecutor. Jak to zrealizować?

0

Ale ten parametr to nie żaden TimeOut tylko KeepAliveTime. Wątpię, aby jakakolwiek klasa z JRE udupiała samoczynnie jakiś wątek poprzez metody typu Thread.stop(). Jedyną rekomendowaną metodą udupiania wątku jest zmiana jakiegoś stanu dostępnego dla wątku udupianego i udupiającego (np Lock, flaga etc).

0

Ale dlaczego piszesz o upupianiu wątku? to nie o to mi chodzi - bardziej o to by ten wątek w ogóle nie był powołany do życia dopóki inny sam z siebie się nie skońćzy.

Normalnie bez finezji zrobiłbym to tak:

 
while (true)
    if (currentThreadCount < MAX_THREADS_COUNT)
        try
        {
             new Thread(...start());
        	currentThreadCount ++;	
        }
        catch (IOException ex) { ex.printStackTrace(); }
    else
        try
        {
            Thread.sleep(10000);
        }
        catch (InterruptedException ex) { ex.printStackTrace(); }
}

Oczywiście po zakończeniu pojedyńczego wąteku currentThreadCount --;

MAX_THREADS_COUNT to przecież wprost parametr maximumPollSize w klasie ThreadPoolExecutor. A parametr keppAlive faktcznie nie od tego jest-doczytałem ale z moim ang mam problemy: jak wątek jest bezczynny to po takim czasie go uwala tak?

Jak mój problem rozwiązać egzekutorami i jednocześnie nie dawać znowu Thread.sleep w pętli?

EDIT: Reasumując: po wyczerpaniu puli gdy jeden się zwalania to jeden wchodzi na jego miejsce. Gdy pula pełna i żaden się zamoistnie nie skończył to czeka okreslony czas przed ponowną próbą wsadzenia nowego wątku do puli. Jak znowu żaden nie zwolnił się to znowu czeka przed następną próba itd.

0
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


class MyCallable implements Callable<Void> {
    
    private int id;

    public MyCallable(int id) {
        this.id = id;
    }

    public Void call() throws Exception {
        System.out.println("Callable id: " + id + " started.");
        long value = 5l;
        for (int i = 0; i < 123456789; i++) {
            value = (long) (value * value + i * value + Math.sqrt(value) + Math.log10(i));
        }
        System.out.println("Callable id: " + id + " finished. Control value: " + value);
        return null;
    }
    
}

public class Main {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        System.out.println("Start.");
        executorService.submit(new MyCallable(1));
        System.out.println("Next.");
        executorService.submit(new MyCallable(2));
        System.out.println("Next.");
        executorService.submit(new MyCallable(3));
        System.out.println("Stop.");
        executorService.shutdown();
        executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
    }
}

Spełnia wymagania?

1 użytkowników online, w tym zalogowanych: 0, gości: 1