Üdvözlünk a modern Java világában, ahol az alkalmazások teljesítménye és reakcióképessége kritikus fontosságú. A felhasználók gyors válaszokat várnak, a rendszereknek pedig hatékonyan kell kezelniük a párhuzamos feladatokat anélkül, hogy blokkolnák a fő szálat. Itt jön képbe az aszinkron programozás, és vele együtt a Java 8 egyik legforradalmibb kiegészítése: a CompletableFuture.
Ebben a cikkben mélyrehatóan megvizsgáljuk, hogyan használhatjuk a CompletableFuture
-t aszinkron és nem-blokkoló kód írására. Felfedezzük annak alapjaitól kezdve a komplex kompozíciós mintákig mindent, ami ahhoz szükséges, hogy a legtöbbet hozd ki ebből a hihetetlenül erős eszközből.
Miért van szükség aszinkron kódra?
Képzeljünk el egy webes alkalmazást, amelynek adatokat kell letöltenie egy külső API-ból, feldolgoznia azokat, majd elmentenie egy adatbázisba. Ha ezeket a műveleteket szinkron módon, egymás után hajtjuk végre, a felhasználóknak hosszú másodperceket kell várniuk, mire a válasz megérkezik. Ez egy rossz felhasználói élményt eredményez, és pazarlás a szerver erőforrásaira nézve, mivel a fő szál tétlenül várakozik az I/O műveletek befejezésére.
Az aszinkron programozás lényege, hogy a potenciálisan időigényes feladatokat háttérben futtatjuk, miközben a fő szál szabadon marad más feladatok elvégzésére vagy további felhasználói kérések kezelésére. Amikor a háttérben futó feladat befejeződik, értesíti a fő szálat az eredményről, amely aztán feldolgozhatja azt.
A Java régebbi verzióiban ez jellemzően explicit szálkezeléssel (Thread
) vagy a Future
interfésszel történt. Bár ezek hasznosak voltak, gyakran vezethettek komplex kódhoz, az úgynevezett „callback hell”-hez, ahol a függő feladatok láncolása nehezen olvashatóvá és karbantarthatóvá tette a kódot. A CompletableFuture pontosan erre a problémára kínál elegáns és hatékony megoldást.
Mi az a CompletableFuture?
A CompletableFuture
a Java 8-ban bevezetett osztály, amely a Future
interfészt valósítja meg, de annál sokkal rugalmasabb és funkcionálisabb. Amíg a hagyományos Future
csak lekérdezni tudta egy aszinkron művelet eredményét (és blokkolta a hívó szálat, amíg az eredmény el nem készült), addig a CompletableFuture
képessé tesz minket arra, hogy:
- Kézzel fejezzünk be egy jövőbeli műveletet, vagy jelezzünk hibát.
- Láncoljunk össze több aszinkron műveletet, anélkül, hogy blokkolnánk a szálat.
- Kombináljunk több jövőbeli eredményt.
- Kezeljük a hibákat elegánsan.
Lényegében egy CompletableFuture
egy olyan ígéretet képvisel, hogy egy érték egy bizonyos időpontban elérhetővé válik – vagy egy hiba történik. Ahelyett, hogy megvárnánk az értéket, megadhatjuk, hogy mit tegyen az alkalmazás, ha az érték elkészült (vagy hiba történt).
Alapok: CompletableFuture létrehozása és indítása
Kezdjük az alapokkal: hogyan hozhatunk létre és indíthatunk el aszinkron feladatokat a CompletableFuture
segítségével.
1. Érték visszaadása: supplyAsync()
Ha egy aszinkron feladatnak értéket kell visszaadnia, a supplyAsync()
metódust használjuk. Ez egy Supplier
interfészt vár paraméterül.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
public class CompletableFutureAlapok {
public static void main(String[] args) {
System.out.println("Fő szál indult: " + Thread.currentThread().getName());
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("Aszinkron feladat fut: " + Thread.currentThread().getName());
Thread.sleep(2000); // Szimulálunk egy időigényes műveletet
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "Hello, CompletableFuture!";
});
// Itt a fő szál folytathatja a munkát, nem blokkol
System.out.println("Fő szál folytatja a munkát...");
// Lekérdezzük az eredményt (ez blokkolja a fő szálat, ha még nem készült el)
try {
String result = future.get(); // Blokkoló hívás, de csak a példa kedvéért
System.out.println("Eredmény: " + result);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Fő szál befejeződött.");
}
}
Fontos megjegyezni, hogy a future.get()
blokkolja a szálat, amíg az eredmény el nem készül. A CompletableFuture igazi ereje azonban nem ebben rejlik, hanem a nem-blokkoló kompozíciós metódusaiban.
2. Nincs visszaadott érték: runAsync()
Ha az aszinkron feladatnak nincs visszaadott értéke (mellékhatást fejt ki), a runAsync()
metódust használjuk, amely egy Runnable
interfészt vár paraméterül.
CompletableFuture<Void> futureAction = CompletableFuture.runAsync(() -> {
System.out.println("Aszinkron művelet, eredmény nélkül: " + Thread.currentThread().getName());
// Például: adatbázisba írás
});
3. Kézi befejezés: new CompletableFuture()
Létrehozhatunk egy „üres” CompletableFuture
-t is, amelyet később manuálisan fejezhetünk be egy értékkel vagy egy kivétellel. Ez akkor hasznos, ha egy régebbi API-t vagy egy teljesen külső rendszert szeretnénk becsomagolni, amely nem ad vissza CompletableFuture
-t.
CompletableFuture<String> manualFuture = new CompletableFuture<>();
// Később, egy másik szálon vagy esemény hatására
new Thread(() -> {
try {
Thread.sleep(1500);
manualFuture.complete("Kézzel befejezett eredmény!"); // Siker
} catch (InterruptedException e) {
manualFuture.completeExceptionally(e); // Hiba
}
}).start();
manualFuture.thenAccept(System.out::println);
Aszinkron műveletek láncolása és kompozíciója
Itt mutatkozik meg igazán a CompletableFuture képessége. Ahelyett, hogy blokkolnánk a fő szálat, különböző metódusokkal láncolhatjuk és kombinálhatjuk az aszinkron feladatokat.
1. Eredmény transzformálása: thenApply()
és thenApplyAsync()
Ha egy aszinkron feladat eredményét szeretnénk egy másik értékre transzformálni, a thenApply()
metódust használjuk, amely egy Function
-t vár. Ha a transzformációt egy másik szálon szeretnénk futtatni, a thenApplyAsync()
metódust hívjuk meg.
CompletableFuture<Integer> futureLength = CompletableFuture.supplyAsync(() -> "Hello Világ")
.thenApply(s -> {
System.out.println("Transzformáció fut: " + Thread.currentThread().getName());
return s.length();
});
futureLength.thenAccept(len -> System.out.println("A string hossza: " + len));
2. Eredmény felhasználása (mellékhatás): thenAccept()
és thenAcceptAsync()
Ha egy aszinkron feladat eredményét fel szeretnénk használni egy mellékhatás kifejtésére (pl. kiírás konzolra, adatbázisba mentés), de nem szeretnénk új értéket visszaadni, a thenAccept()
metódust használjuk, amely egy Consumer
-t vár.
CompletableFuture.supplyAsync(() -> 123)
.thenAccept(number -> {
System.out.println("Feldolgozom a számot: " + number + " szálon: " + Thread.currentThread().getName());
});
3. Független művelet futtatása: thenRun()
és thenRunAsync()
Ha egy előző CompletableFuture
befejezése után egy olyan Runnable
-t szeretnénk futtatni, amely nem függ az előző eredményétől és nem is ad vissza értéket, a thenRun()
metódust használjuk.
CompletableFuture.supplyAsync(() -> "Adat betöltve")
.thenRun(() -> System.out.println("Betöltés befejezve! Szál: " + Thread.currentThread().getName()));
4. Láncolt CompletableFuture-ök: thenCompose()
és thenComposeAsync()
Ez a metódus az egyik legfontosabb a komplex aszinkron folyamatok építésekor. Akkor használjuk, ha az egyik aszinkron lépés eredményéből egy másik CompletableFuture
-t akarunk generálni. Ezzel elkerülhetjük az egymásba ágyazott CompletableFuture
-ket (CompletableFuture<CompletableFuture<T>>
).
CompletableFuture<String> fetchUserId() {
return CompletableFuture.supplyAsync(() -> {
System.out.println("User ID lekérése: " + Thread.currentThread().getName());
return "user123";
});
}
CompletableFuture<String> fetchUserName(String userId) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Felhasználónév lekérése (" + userId + "): " + Thread.currentThread().getName());
return "Kovács János";
});
}
// ...
CompletableFuture<String> userNameFuture = fetchUserId()
.thenCompose(this::fetchUserName);
userNameFuture.thenAccept(name -> System.out.println("Lekért felhasználónév: " + name));
A thenCompose()
hasonló a Stream API flatMap()
metódusához – „kisimítja” a jövőbeli objektumokat.
5. Két független CompletableFuture kombinálása: thenCombine()
és thenCombineAsync()
Ha két független aszinkron feladat eredményét szeretnénk kombinálni, a thenCombine()
metódust használjuk. Ez egy BiFunction
-t vár paraméterül.
CompletableFuture<String> weatherFuture = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "Napos, 25°C";
});
CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1500); } catch (InterruptedException e) {}
return "Sport eredmények";
});
CompletableFuture<String> combinedFuture = weatherFuture
.thenCombine(newsFuture, (weather, news) -> {
System.out.println("Kombinálás fut: " + Thread.currentThread().getName());
return "Időjárás: " + weather + ", Hírek: " + news;
});
combinedFuture.thenAccept(System.out::println);
6. Több CompletableFuture együttes kezelése: allOf()
és anyOf()
Néha szükségünk van arra, hogy megvárjuk több aszinkron feladat befejezését, mielőtt tovább lépnénk. Az allOf()
és anyOf()
statikus metódusok segítenek ebben.
CompletableFuture.allOf(CompletableFuture... futures)
: Akkor fejeződik be, amikor az összes paraméterként megadottCompletableFuture
befejeződött. Az eredménye egyCompletableFuture<Void>
, így az egyedi eredményeket külön kell lekérdezni (pl. ajoin()
metódussal).CompletableFuture.anyOf(CompletableFuture... futures)
: Akkor fejeződik be, amikor az *első* paraméterként megadottCompletableFuture
befejeződött. Az eredménye egyCompletableFuture<Object>
, amely az első befejezettFuture
eredményét tartalmazza.
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Eredmény 1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Eredmény 2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) {}
return "Eredmény 3 (gyorsabb)";
});
// Várjunk meg minden feladatot
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
System.out.println("Minden feladat befejeződött.");
// Itt lekérdezhetjük az egyedi eredményeket
System.out.println(task1.join()); // join() blokkolás nélkül adja vissza az eredményt, ha a future már kész
System.out.println(task2.join());
System.out.println(task3.join());
});
// Várjuk meg az első befejeződő feladatot
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1, task2, task3);
anyTask.thenAccept(result -> System.out.println("Az első befejezett feladat eredménye: " + result));
Hibakezelés CompletableFuture-rel
Az aszinkron műveletek során elkerülhetetlenek a hibák. A CompletableFuture kiváló eszközöket biztosít a hibák elegáns kezelésére.
1. Kivételek elfogása: exceptionally()
Ez a metódus lehetővé teszi, hogy egy CompletableFuture
láncban bekövetkező kivételt elkapjuk, és egy alapértelmezett értéket adjunk vissza, vagy egy alternatív logikát futtassunk.
CompletableFuture<String> faultyFuture = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new RuntimeException("Valami hiba történt!");
}
return "Sikeres eredmény.";
});
faultyFuture
.exceptionally(ex -> {
System.err.println("Hiba történt: " + ex.getMessage());
return "Helyreállított eredmény (alapértelmezett)."; // Hiba esetén visszaadott érték
})
.thenAccept(System.out::println);
2. Siker és hiba kezelése egyaránt: handle()
A handle()
metódus egy BiFunction
-t vár, amelynek két paramétere van: az eredmény és a kivétel. Az egyik mindig null
lesz, a másik tartalmazza az értéket vagy a kivételt. Ez lehetőséget ad az eredmény transzformálására vagy a hiba helyreállítására.
CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
if (Math.random() < 0.5) {
throw new IllegalArgumentException("Érvénytelen bemenet!");
}
return "Sikeres adat.";
}).handle((res, ex) -> {
if (ex != null) {
System.err.println("Kivételel kezelve a handle-ben: " + ex.getMessage());
return "Hiba utáni feldolgozás.";
}
return res + " (feldolgozva)";
});
result.thenAccept(System.out::println);
3. Befejezés utáni művelet: whenComplete()
A whenComplete()
metódus akkor hasznos, ha egy műveletet szeretnénk végrehajtani, akár sikeresen, akár hibával fejeződött be az előző szakasz, de az eredményt vagy a kivételt nem szeretnénk módosítani. Például logolásra vagy erőforrások felszabadítására alkalmas.
CompletableFuture.supplyAsync(() -> {
System.out.println("Művelet fut...");
if (Math.random() < 0.5) {
throw new RuntimeException("Kivétel a whenComplete előtt!");
}
return "Sikeres adat";
})
.whenComplete((res, ex) -> {
if (ex != null) {
System.err.println("Hiba történt: " + ex.getMessage());
} else {
System.out.println("Sikeresen befejeződött, eredmény: " + res);
}
})
.thenAccept(finalResult -> System.out.println("Végső eredmény (nem módosult): " + finalResult))
.exceptionally(e -> { // Ezt csak akkor hívja meg, ha a whenComplete után is Exception van
System.err.println("Végső hibakezelés: " + e.getMessage());
return null; // A lánc lezárása
});
Executorok és testreszabott szálkezelés
A CompletableFuture
metódusai, amelyek `Async` utótaggal rendelkeznek (pl. supplyAsync()
, thenApplyAsync()
), alapértelmezés szerint a ForkJoinPool.commonPool()
-t használják a feladatok futtatására. Ez a legtöbb esetben megfelelő, de néha szükség lehet egy testreszabott ExecutorService
megadására.
Miért?
- Erőforrás-kontroll: Egy dedikált
ExecutorService
segítségével jobban szabályozhatjuk a szálak számát, például ha tudjuk, hogy egy feladat I/O-intenzív (sok várakozás) vagy CPU-intenzív (sok számítás). - Deadlock elkerülése: Ha a
commonPool
-t túlterheljük blokkoló feladatokkal, az más, nem-blokkoló feladatok futását is akadályozhatja. - Kontextus öröklése: Bizonyos esetekben (pl. logolás, biztonsági kontextus) előnyös lehet a szálkontextus átadása, ami testreszabott
Executor
-ral könnyebb.
Egyéni Executor
megadása egyszerű:
ExecutorService customExecutor = Executors.newFixedThreadPool(5); // 5 szálat tartalmazó pool
CompletableFuture.supplyAsync(() -> {
System.out.println("Custom Executoron fut: " + Thread.currentThread().getName());
return "Sikeres!";
}, customExecutor) // Itt adjuk meg az executort
.thenAccept(System.out::println)
.whenComplete((res, ex) -> customExecutor.shutdown()); // Ne felejtsük el leállítani!
Blokkolás elkerülése: get()
és a „sötét oldal”
Bár a get()
metódus lehetővé teszi egy CompletableFuture
eredményének lekérdezését, ez egy blokkoló művelet. Ha a Future
még nem fejeződött be, a hívó szál várni fog. Ez ellentétes az aszinkron programozás alapelvével, miszerint elkerüljük a fő szál blokkolását.
A jó gyakorlat az, hogy a CompletableFuture
láncolási metódusait használjuk (thenApply
, thenAccept
, thenCompose
stb.) ahelyett, hogy a get()
-tel várnánk az eredményre. A get()
használata csak akkor indokolt, ha egy aszinkron folyamat legvégén, vagy egy alkalmazás kilépésekor gyűjtjük össze az utolsó eredményeket, vagy tesztekben, ahol a szinkronizáció egyszerűsége felülírhatja az aszinkronitás előnyeit.
A join()
metódus hasonlóan működik, mint a get()
, de nem deklarálja a checked Exception
-öket, hanem CompletionException
-t dob. Ez is blokkoló, de kényelmesebb lehet láncoláskor.
Gyakori minták és tippek
Most, hogy áttekintettük az alapokat és a kompozíciós metódusokat, nézzünk néhány gyakori mintát és tippet.
Időtúllépés kezelése
A CompletableFuture
rendelkezik beépített metódusokkal az időtúllépés kezelésére:
orTimeout(long timeout, TimeUnit unit)
: Ha aCompletableFuture
nem fejeződik be a megadott időn belül, egyTimeoutException
-nel fog hibát dobni.completeOnTimeout(T value, long timeout, TimeUnit unit)
: Ha aCompletableFuture
nem fejeződik be időben, egy megadott értékkel fog befejeződni ahelyett, hogy hibát dobna.
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (InterruptedException e) {}
return "Lassú feladat vége.";
});
slowTask.orTimeout(2, TimeUnit.SECONDS) // 2 másodperc múlva TimeoutException
.exceptionally(ex -> {
System.err.println("Időtúllépés történt: " + ex.getMessage());
return "Időtúllépés miatt alternatív eredmény.";
})
.thenAccept(System.out::println);
Visszatérési próbálkozások (Retries)
Bár nincs közvetlen beépített mechanizmus, a CompletableFuture
segítségével könnyedén implementálhatunk visszatérési logikát. Ehhez jellemzően rekurziót vagy egy ciklust használunk, amely CompletableFuture
-t ad vissza.
public CompletableFuture<String> unreliableService(int attempt) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("Próbálkozás: " + attempt + " - " + Thread.currentThread().getName());
if (Math.random() < 0.7 && attempt < 3) { // 70% esély a hibára, max 3 próbálkozás
throw new RuntimeException("Ideiglenes hiba a szolgáltatásban!");
}
return "Adat sikeresen letöltve a " + attempt + ". próbálkozásra.";
}).exceptionallyCompose(ex -> { // exceptionalyCompose, ha a hiba egy új CompletableFuture-t igényel
if (attempt < 3) { // Maximum 3 próbálkozás
System.err.println("Hiba a " + attempt + ". próbálkozásnál, újrapróbálkozás...");
try { Thread.sleep(500); } catch (InterruptedException e) {} // Kis szünet
return unreliableService(attempt + 1);
} else {
return CompletableFuture.failedFuture(new RuntimeException("Nem sikerült lekérni az adatot 3 próbálkozás után."));
}
});
}
// ...
unreliableService(1)
.thenAccept(System.out::println)
.exceptionally(ex -> {
System.err.println("Végleges hiba: " + ex.getMessage());
return null;
});
Összefoglalás
A Java CompletableFuture egy erőteljes és elegáns eszköz az aszinkron és párhuzamos programozás kihívásainak kezelésére. Segítségével elkerülhetjük a blokkoló műveleteket, javíthatjuk az alkalmazások reakcióképességét és hatékonyságát, miközben a kódunk tiszta, olvasható és karbantartható marad.
A supplyAsync()
és runAsync()
alapvető műveletektől a komplex láncolási (thenApply
, thenCompose
), kombinálási (thenCombine
, allOf
) és hibakezelési (exceptionally
, handle
) mechanizmusokig, a CompletableFuture egy teljes eszköztárat kínál a modern Java fejlesztéshez. Ha még nem építetted be a mindennapi munkafolyamataidba, itt az ideje elkezdeni – alkalmazásaid és felhasználóid hálásak lesznek érte!
Leave a Reply