Co warto wiedzie膰 o pulach w膮tk贸w w Javie? – Najlepszy przewodnik jakiego potrzebujesz 馃帰

Jednym z najwa偶niejszych czynnik贸w zwi臋kszaj膮c膮 wydajno艣膰 aplikacji w Javie jest mo偶liwo艣膰 korzystania z w膮tk贸w. Dzi臋ki zr贸wnolegleniu niekt贸rych dzia艂a艅 efektywno艣膰 programu mo偶e znacz膮co wzrosn膮膰. Przy pracy z w膮tkami warto jednak pami臋ta膰 o dobrych praktykach, mi臋dzy innymi o korzystaniu z pul w膮tk贸w.

Ale jak to dobrze robi膰? O tym w niniejszym artykule.

Obs艂ug臋 cyklicznych, wyst臋puj膮cych jednocze艣nie zdarze艅 warto cz臋sto zr贸wnolegli膰. Na przyk艂ad obs艂uga 偶膮da艅 HTTP, zapyta艅 do bazy danych, odpytywania zewn臋trznych serwis贸w, czy wykonywania wymagaj膮cych oblicze艅 matematycznych. Mo偶emy to uzyska膰 poprzez korzystanie z w膮tk贸w.

Ale warto pami臋ta膰, 偶e tworzenie nowych w膮tk贸w per ka偶de takie dzia艂anie jest bardzo kosztowne. Wymaga stworzenia prawdziwego w膮tku w systemie operacyjnym, zaalokowania pami臋ci, stworzenia deskryptora w膮tku. Wszystko to kosztuje.

W sytuacji, gdy dane zadanie b臋dzie 偶y膰 stosunkowo kr贸tko i dany w膮tek zaraz b臋dziemy ubija膰 jest to marnowanie zasob贸w.

Zamiast tego du偶o lepiej jest skorzysta膰 z puli w膮tk贸w, za pomoc膮 kt贸rej b臋dziemy tylko wskazywa膰 jakie zadanie ma by膰 wykonane, a pula w膮tk贸w samodzielnie zadba o to by przypisa膰 wolny w膮tek do wykonania tego zadania.

W Javie pule w膮tk贸w s膮 opisywane przez dwa interfejsy.

  1. Executor
  2. ExecutorService

Pierwszy jest bardzo prosty i zawiera jedn膮 metod臋.

public interface Executor {
    void execute(Runnable command);
}

Dzi臋ki niemu mo偶emy przekaza膰 nasze zadanie (poprzez implementacj臋 interfejsu Runnable) do wykonania na puli w膮tk贸w.

Drugi z nich za艣 s艂u偶y do wykonywania zada艅 i zarz膮dzania ca艂膮 pul膮 w膮tk贸w. Posiada zdecydowanie bogatszy zakres metod.

public interface ExecutorService extends Executor {
    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;

    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

Wi臋kszo艣膰 z metod do艣膰 jasno m贸wi do czego s艂u偶y, wi臋c zostawiam to uwa偶nemu czytelnikowi do samodzielnego przeanalizowania.

Opr贸cz powy偶szych dw贸ch interfejs贸w warto wspomnie膰 o jeszcze jednym: ScheduledExecutorService.

public interface ScheduledExecutorService extends ExecutorService {

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);

    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
}

Jak wida膰 na powy偶szym listingu za pomoc膮 tego typu mo偶emy:

  1. uruchomi膰 pojedyncze zadanie w przysz艂o艣ci: schedule(Runnable ...) i schedule(Callable<V> ...)
  2. uruchomi膰 zadanie cykliczne w sta艂ym odst臋pie czasowym: scheduleAtFixedRate
  3. uruchomi膰 zadanie cykliczne ze sta艂ym odst臋pem mi臋dzy zako艅czeniem poprzedniego uruchomienia scheduledWithFixedDelay.

Uruchamianie zada艅 ze sta艂ym op贸藕nieniem (scheduledAtFixedRate) sprawia, 偶e zadanie wykonuje si臋 zawsze co okre艣lony interwa艂, na przyk艂ad co 10 sekund. D艂ugo艣膰 trwania jednego zadania nie ma wp艂ywu na moment uruchomienia si臋 drugiego. W praktyce mo偶e to wygl膮da膰 tak:

  1. zadanie 1 – start: 00:00:00, czas trwania: 3s
  2. zadanie 2 – start: 00:00:10, czas trwania: 1s
  3. zadanie 3 – start: 00:00:20, czas trwania: 5s
  4. zadanie 4 – start: 00:00:30, czas trwania: 7s

Z drugiej strony uruchamianie zada艅 ze sta艂ym odst臋pem czasowym (scheduledWithFixedDelay) sprawia, 偶e kolejne uruchomienie b臋dzie mia艂o miejsce gdy up艂ynie okre艣lony czas od sko艅czenia si臋 poprzedniego uruchomienia. W tym wypadku b臋dzie to wygl膮da膰 nast臋puj膮co:

  1. zadanie 1 – start: 00:00:00, czas trwania: 3s
  2. zadanie 2 – start: 00:00:13, czas trwania: 1s
  3. zadanie 3 – start: 00:00:24, czas trwania: 5s
  4. zadanie 4 – start: 00:00:39, czas trwania: 7s

Sk膮d wzi膮膰 pule w膮tk贸w?

Wszystko fajnie, ale sk膮d wzi膮膰 instancje tych pul w膮tk贸w?

Najpro艣ciej skorzysta膰 z klasy Exeutors z pakietu java.util.concurrent, kt贸ra jest fabryk膮 pul w膮tk贸w.

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) { ... }

    public static ExecutorService newSingleThreadExecutor() { ... }

    public static ExecutorService newCachedThreadPool() { ... }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() { ... }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { ... }... }

    // ...

}

Do czego s艂u偶y ThreadFactory?

Opr贸cz standardowych metod do tworzenia pul w膮tk贸w w klasie Executors znajdziemy te偶 analogicznie metody, kt贸re przyjmuj膮 dodatkowy argument ThreadFactory.

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { ... }

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { ... }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { ... }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { ... }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { ... }

    // ...


}

Jak sama nazwa wskazuje ThreadFactory jest fabryk膮 w膮tk贸w dla puli w膮tk贸w. Jest interfejsem i posiada jedn膮 metod臋 newThread. Poprzez przekazanie w艂asnej implementacji ThreadFactory mo偶emy wyspecyfikowa膰 szczeg贸艂y nowo tworzonych w膮tk贸w. Po co nam to potrzebne? Po to by na przyk艂ad ustali膰 nazwy nowych w膮tk贸w, okre艣li膰 czy powinny by膰 w膮tkami typu daemon, czy chocia偶by zaj膮膰 si臋 obs艂ug膮 niez艂apanych wyj膮tk贸w (thread.setUncaughtExceptionHandler).

ThreadFactory factory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName("...");
        thread.setDaemon(true);
        thread.setUncaughtExceptionHandler(...);
        return thread;
    }
};

Co z obs艂ug膮 wyj膮tk贸w?

Do 艂apania wyj膮tk贸w w ramach pul w膮tk贸w, kt贸rych nie obs艂ugujemy w spos贸b jawny nale偶y skorzysta膰 z metody thread.setUncaughtExceptionHandler, kt贸r膮 mo偶emy wyspecyfikowa膰 w ramach ThreadFactory. Poprzez implementacj臋 handlera niez艂apanych wyj膮tk贸w okre艣lamy co powinno si臋 wtedy sta膰 z w膮tkiem, z informacj膮 o b艂臋dzie, i tak dalej.

thread.setUncaughtExceptionHandler(
    new Thread.UncaughtExceptionHandler() {
        @Override
        public void uncaughtException(Thread t, Throwable e) {
            // ...
        }
    }
);

Czy pula mo偶e przyjmowa膰 zadania w niesko艅czono艣膰?

Nie.

Aby dowiedzie膰 si臋 dlaczego, sp贸jrzmy dok艂adniej jak wygl膮da kod z fabryki Executors do utworzenia nowej puli w膮tk贸w na przyk艂adzie newFixedThreadPool.

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
      nThreads,   // (1)
      nThreads,   // (2)
      0L,         // (3)
      TimeUnit.MILLISECONDS,  // (4)
      new LinkedBlockingQueue<Runnable>() // (5)
    );
}

Pod spodem tworzona jest instancja klasy ThreadPoolExecutor. Parametry 1 i 2 okre艣laj膮 minimaln膮 i maksymaln膮 liczb臋 w膮tk贸w w tej puli (jako 偶e jest to pula „fixed”, te liczby s膮 takie same).

Parametry 3 i 4 okre艣laj膮 jak d艂ugo maj膮 偶y膰 w膮tki nadmiarowe, zanim zostan膮 ubite (w tym wypadku jest to 0, bo metoda dotyczy puli o sta艂ej liczbie w膮tk贸w).

I parametr 5, kt贸ry nas najbardziej interesuje – opisuje kolejk臋, na kt贸r膮 trafiaj膮 kolejne zadania do wykonania – w tym wypadku jest to LinkedBlockingQueue.

I teraz, gdy spojrzymy w kod tej kolejki zobaczymy tam taki konstruktor.

/**
 * Creates a {@code LinkedBlockingQueue} with a capacity of
 * {@link Integer#MAX_VALUE}.
 */
public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

Oznacza to, 偶e kolejka przyjmie maksymalnie 2.147.483.647 zada艅. Do艣膰 du偶o, prawda?

Warto pami臋ta膰 jednak o tym, 偶e prawdopodobnie wcze艣niej sko艅czy nam si臋 pami臋膰 na stercie (heap) i program zako艅czy swoje dzia艂anie z powodu OutOfMemoryError.

Jak zatrzyma膰 pul臋 w膮tk贸w?

W typie ExecutorsService znajdziemy 5 metod, kt贸re pomog膮 nam ten cel osi膮gn膮膰.

public interface ExecutorService extends Executor {
    void shutdown();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
    List<Runnable> shutdownNow();

  boolean isShutdown();
  boolean isTerminated();
    // ...
}

Sprawd藕my, do czego s艂u偶膮.

  1. shutdown() – wysy艂a sygna艂 do puli w膮tk贸w, by ta przesta艂a przyjmowa膰 kolejne zadania do wykonania. Sprawia jednak, 偶e wszystkie zadania, kt贸re by艂y wys艂ane do tej pory b臋d膮 normalnie czeka膰 w kolejce na wykonanie si臋. Metoda nie blokuje wykonania kolejnych instrukcji kodu.
  2. awaitTermination() – z kolei ta metoda sprawia, 偶e wykonanie kodu blokuje si臋 w oczekiwaniu na zako艅czenie wykonania wszystkich zada艅. Metoda przyjmuje parametry timeout, po kt贸rych przerywa swoje dzia艂anie, je艣li do tego czasu wszystkie zadania si臋 nie wykonaj膮. W tym wypadku ko艅czy si臋 to rzuceniem wyj膮tku InterruptedException.
  3. shutdownNow() – w przeciwie艅stwie do metody shutdown() ko艅czy dzia艂anie puli w膮tk贸w natychmiast. Kolejne zadania nie b臋d膮 przyjmowane na kolejk臋, zadania oczekuj膮ce w kolejce zostan膮 zwr贸cone z metody (List<Runnable>), a zadania b臋d膮ce w trakcie wykonywania b臋d膮 przerwane za pomoc膮 Thread.interrupt(). Uwaga! Je艣li nie maj膮 odpowiednio zaimplementowanej obs艂ugi przerywania (interrupt), w贸wczas nadal mog膮 si臋 wykonywa膰. Metoda shutdownNow() nie zatrzymuje wykonywania kolejnych instrukcji kodu, wi臋c je艣li chcemy poczeka膰 na zako艅czenie dzia艂ania puli w膮tk贸w ponownie musimy skorzysta膰 z metody awaitTermination().

Opr贸cz powy偶szych metod mamy do dyspozycji jeszcze 2 do sprawdzania stanu zamkni臋cia puli w膮tk贸w:

  1. isShutdown() – zwraca flag臋 true/false z informacj膮 czy pula w膮tk贸w zosta艂a zamkni臋ta,
  2. isTerminated() – zwraca flag臋 true je艣li pula zosta艂a zamkni臋ta i wszystkie zadania ju偶 zako艅czy艂y swoj膮 prac臋, false w przeciwnym wypadku. Zwr贸膰 uwag臋, 偶e je艣li nie zawo艂ano wcze艣niej shutdown lub shutdownNow, to ta metoda zawsze zwr贸ci false.

Jak anulowa膰 zadanie wys艂ane do puli w膮tk贸w?

Nie jest to takie proste.

Na przyk艂ad poni偶szy fragment nie sprawi, 偶e zadanie przestanie si臋 wykonywa膰.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit((Runnable) () -> {
    while(true) {
        System.out.println("Catch me if you can 馃懟");
        try {
            Thread.sleep(2_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Thread.sleep(5_000);
future.cancel(true);

Zadanie co 2 sekundy wypisuje napis na konsol臋, a po 5 sekundach od startu pr贸bujemy je zatrzyma膰 metod膮 Future#cancel.

Nic z tego.

Aby zatrzyma膰 zadanie z puli nale偶y w warunku while sprawdza膰 flag臋 Thread.currentThread.isInterrupted() oraz w bloku catch w momencie wyst膮pienia wyj膮tku InterruptedException ustawi膰 odpowiednio t臋 flag臋.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<?> future = executorService.submit(() -> {
    while(!Thread.currentThread().isInterrupted()) {
        System.out.println("Catch me if you can 馃懟");
        try {
            Thread.sleep(2_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
});
Thread.sleep(5_000);
future.cancel(true);

W przypadku zada艅, kt贸re nie zacz臋艂y jeszcze swojej pracy na puli w膮tk贸w musimy zatrzyma膰 ca艂膮 pul臋 – shutdownNow().

Inne rozwi膮zanie polega na w艂asnej implementacji interfejsu Runnable, w kt贸rym eksponujemy samodzielnie zdefiniowan膮 metod臋 cancel, kt贸ra sprawi, 偶e zadanie w momencie trafienia do wykonania od razu zako艅czy swoj膮 prac臋 bez podejmowania jakichkolwiek dzia艂a艅.


Podsumowanie

Praca z wielow膮tkowo艣ci膮 nie nale偶y do najprostszych. Warto jednak wiedzie膰, 偶e jak wszystko jest kwesti膮 zag艂臋bienia si臋 w dany temat. To co warto zapami臋ta膰 z tego wpisu to:

  1. Tworzenie nowych w膮tk贸w jest kosztowne. Lepiej korzysta膰 z g贸ry ustalonej puli w膮tk贸w, kt贸ra lepiej zarz膮dza u偶ywanymi zasobami.
  2. Java dostarcza nam 2 interfejsy: Executor i ExecutorService do pracy z pulami w膮tk贸w oraz fabryk臋 Executors do wygodnego tworzenia nowych instancji pul w膮tk贸w.
  3. Do wykonywania zada艅 cyklicznie, lub z op贸藕nieniem czasowym s艂u偶y inny interfejs: ScheduledExecutorService.
  4. Za pomoc膮 ThreadFactory mo偶emy lepiej okre艣li膰 jak maj膮 by膰 tworzone nowe w膮tki w ramach Executor贸w.
  5. Do 艂apania niez艂apanych wyj膮tk贸w s艂u偶y metoda thread.setUncaughtExceptionHandler.
  6. Pule w膮tk贸w nie mog膮 przyjmowa膰 zada艅 w niesko艅czono艣膰. Domy艣lna implementacja przyjmuje maksymalnie ponad 2 miliardy zada艅, co wcze艣niej zapewne sko艅czy si臋 problemami z pami臋ci膮.
  7. Mo偶emy te偶 sami zdefiniowa膰 ile zada艅 maksymalnie mo偶e trafi膰 na pul臋 samodzielnie tworz膮c jej instancj臋 i przekazuj膮c w艂asn膮 instancj臋 kolejki na zadania.
  8. Do zatrzymywania pul w膮tk贸w s艂u偶膮 metody shutdown, shutdownNow() i metoda pomocnicza awaitTermination.
  9. Zatrzymanie dzia艂aj膮cych zada艅 nie jest takie proste i wymaga poprawnego korzystania z flagi interrupted.

Author: Dariusz Mydlarz

Cze艣膰, nazywam si臋 Dariusz Mydlarz i od 2012 pracuj臋 jako programista. Tworz臋 w ekosystemie Javy i uwielbiam systemy backendowe. Chc臋 w tym miejscu pomaga膰 Ci stawa膰 si臋 lepszym programista. Je艣li mog臋 Ci jako艣 pomoc, po prostu napisz do mnie.

2 Replies to “Co warto wiedzie膰 o pulach w膮tk贸w w Javie? – Najlepszy przewodnik jakiego potrzebujesz 馃帰

  1. Ma艂a uwaga, cz臋sto pope艂niany przez pocz膮tkuj膮cych b艂膮d to ustawienie sobie setUncaughtExceptionHandler handlera a potem zdziwienie 偶e si臋 nie uruchomi艂 gdy zadanie by艂o wys艂ane przez submit i zosta艂 zg艂oszony wyj膮tek.
    No oczywi艣cie 偶e si臋 nie uruchomi艂 bo wyj膮tki rzucane przez kod zosta艂y przekazane do wynikowego Future. Warto by o tym wspomnie膰 bo mi ju偶 r臋ce opadaj膮 jak musze to po raz Nty t艂umaczy膰….

    1. Dzi臋ki Marcin za dope艂nienie wpisu 馃槈 mo偶e to znak, ze warto napisa膰 cos wi臋cej o klasie Future 馃槈

Dodaj komentarz

Tw贸j adres email nie zostanie opublikowany. Pola, kt贸rych wype艂nienie jest wymagane, s膮 oznaczone symbolem *