Hogyan írj aszinkron kódot Java CompletableFuture segítségével?

Üdvözlünk a modern Java világában, ahol az alkalmazások teljesítménye és reakcióképessége kritikus fontosságú. A felhasználók gyors válaszokat várnak, a rendszereknek pedig hatékonyan kell kezelniük a párhuzamos feladatokat anélkül, hogy blokkolnák a fő szálat. Itt jön képbe az aszinkron programozás, és vele együtt a Java 8 egyik legforradalmibb kiegészítése: a CompletableFuture.

Ebben a cikkben mélyrehatóan megvizsgáljuk, hogyan használhatjuk a CompletableFuture-t aszinkron és nem-blokkoló kód írására. Felfedezzük annak alapjaitól kezdve a komplex kompozíciós mintákig mindent, ami ahhoz szükséges, hogy a legtöbbet hozd ki ebből a hihetetlenül erős eszközből.

Miért van szükség aszinkron kódra?

Képzeljünk el egy webes alkalmazást, amelynek adatokat kell letöltenie egy külső API-ból, feldolgoznia azokat, majd elmentenie egy adatbázisba. Ha ezeket a műveleteket szinkron módon, egymás után hajtjuk végre, a felhasználóknak hosszú másodperceket kell várniuk, mire a válasz megérkezik. Ez egy rossz felhasználói élményt eredményez, és pazarlás a szerver erőforrásaira nézve, mivel a fő szál tétlenül várakozik az I/O műveletek befejezésére.

Az aszinkron programozás lényege, hogy a potenciálisan időigényes feladatokat háttérben futtatjuk, miközben a fő szál szabadon marad más feladatok elvégzésére vagy további felhasználói kérések kezelésére. Amikor a háttérben futó feladat befejeződik, értesíti a fő szálat az eredményről, amely aztán feldolgozhatja azt.

A Java régebbi verzióiban ez jellemzően explicit szálkezeléssel (Thread) vagy a Future interfésszel történt. Bár ezek hasznosak voltak, gyakran vezethettek komplex kódhoz, az úgynevezett „callback hell”-hez, ahol a függő feladatok láncolása nehezen olvashatóvá és karbantarthatóvá tette a kódot. A CompletableFuture pontosan erre a problémára kínál elegáns és hatékony megoldást.

Mi az a CompletableFuture?

A CompletableFuture a Java 8-ban bevezetett osztály, amely a Future interfészt valósítja meg, de annál sokkal rugalmasabb és funkcionálisabb. Amíg a hagyományos Future csak lekérdezni tudta egy aszinkron művelet eredményét (és blokkolta a hívó szálat, amíg az eredmény el nem készült), addig a CompletableFuture képessé tesz minket arra, hogy:

  • Kézzel fejezzünk be egy jövőbeli műveletet, vagy jelezzünk hibát.
  • Láncoljunk össze több aszinkron műveletet, anélkül, hogy blokkolnánk a szálat.
  • Kombináljunk több jövőbeli eredményt.
  • Kezeljük a hibákat elegánsan.

Lényegében egy CompletableFuture egy olyan ígéretet képvisel, hogy egy érték egy bizonyos időpontban elérhetővé válik – vagy egy hiba történik. Ahelyett, hogy megvárnánk az értéket, megadhatjuk, hogy mit tegyen az alkalmazás, ha az érték elkészült (vagy hiba történt).

Alapok: CompletableFuture létrehozása és indítása

Kezdjük az alapokkal: hogyan hozhatunk létre és indíthatunk el aszinkron feladatokat a CompletableFuture segítségével.

1. Érték visszaadása: supplyAsync()

Ha egy aszinkron feladatnak értéket kell visszaadnia, a supplyAsync() metódust használjuk. Ez egy Supplier interfészt vár paraméterül.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;

public class CompletableFutureAlapok {
    public static void main(String[] args) {
        System.out.println("Fő szál indult: " + Thread.currentThread().getName());

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("Aszinkron feladat fut: " + Thread.currentThread().getName());
                Thread.sleep(2000); // Szimulálunk egy időigényes műveletet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "Hello, CompletableFuture!";
        });

        // Itt a fő szál folytathatja a munkát, nem blokkol
        System.out.println("Fő szál folytatja a munkát...");

        // Lekérdezzük az eredményt (ez blokkolja a fő szálat, ha még nem készült el)
        try {
            String result = future.get(); // Blokkoló hívás, de csak a példa kedvéért
            System.out.println("Eredmény: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("Fő szál befejeződött.");
    }
}

Fontos megjegyezni, hogy a future.get() blokkolja a szálat, amíg az eredmény el nem készül. A CompletableFuture igazi ereje azonban nem ebben rejlik, hanem a nem-blokkoló kompozíciós metódusaiban.

2. Nincs visszaadott érték: runAsync()

Ha az aszinkron feladatnak nincs visszaadott értéke (mellékhatást fejt ki), a runAsync() metódust használjuk, amely egy Runnable interfészt vár paraméterül.

CompletableFuture<Void> futureAction = CompletableFuture.runAsync(() -> {
    System.out.println("Aszinkron művelet, eredmény nélkül: " + Thread.currentThread().getName());
    // Például: adatbázisba írás
});

3. Kézi befejezés: new CompletableFuture()

Létrehozhatunk egy „üres” CompletableFuture-t is, amelyet később manuálisan fejezhetünk be egy értékkel vagy egy kivétellel. Ez akkor hasznos, ha egy régebbi API-t vagy egy teljesen külső rendszert szeretnénk becsomagolni, amely nem ad vissza CompletableFuture-t.

CompletableFuture<String> manualFuture = new CompletableFuture<>();

// Később, egy másik szálon vagy esemény hatására
new Thread(() -> {
    try {
        Thread.sleep(1500);
        manualFuture.complete("Kézzel befejezett eredmény!"); // Siker
    } catch (InterruptedException e) {
        manualFuture.completeExceptionally(e); // Hiba
    }
}).start();

manualFuture.thenAccept(System.out::println);

Aszinkron műveletek láncolása és kompozíciója

Itt mutatkozik meg igazán a CompletableFuture képessége. Ahelyett, hogy blokkolnánk a fő szálat, különböző metódusokkal láncolhatjuk és kombinálhatjuk az aszinkron feladatokat.

1. Eredmény transzformálása: thenApply() és thenApplyAsync()

Ha egy aszinkron feladat eredményét szeretnénk egy másik értékre transzformálni, a thenApply() metódust használjuk, amely egy Function-t vár. Ha a transzformációt egy másik szálon szeretnénk futtatni, a thenApplyAsync() metódust hívjuk meg.

CompletableFuture<Integer> futureLength = CompletableFuture.supplyAsync(() -> "Hello Világ")
    .thenApply(s -> {
        System.out.println("Transzformáció fut: " + Thread.currentThread().getName());
        return s.length();
    });

futureLength.thenAccept(len -> System.out.println("A string hossza: " + len));

2. Eredmény felhasználása (mellékhatás): thenAccept() és thenAcceptAsync()

Ha egy aszinkron feladat eredményét fel szeretnénk használni egy mellékhatás kifejtésére (pl. kiírás konzolra, adatbázisba mentés), de nem szeretnénk új értéket visszaadni, a thenAccept() metódust használjuk, amely egy Consumer-t vár.

CompletableFuture.supplyAsync(() -> 123)
    .thenAccept(number -> {
        System.out.println("Feldolgozom a számot: " + number + " szálon: " + Thread.currentThread().getName());
    });

3. Független művelet futtatása: thenRun() és thenRunAsync()

Ha egy előző CompletableFuture befejezése után egy olyan Runnable-t szeretnénk futtatni, amely nem függ az előző eredményétől és nem is ad vissza értéket, a thenRun() metódust használjuk.

CompletableFuture.supplyAsync(() -> "Adat betöltve")
    .thenRun(() -> System.out.println("Betöltés befejezve! Szál: " + Thread.currentThread().getName()));

4. Láncolt CompletableFuture-ök: thenCompose() és thenComposeAsync()

Ez a metódus az egyik legfontosabb a komplex aszinkron folyamatok építésekor. Akkor használjuk, ha az egyik aszinkron lépés eredményéből egy másik CompletableFuture-t akarunk generálni. Ezzel elkerülhetjük az egymásba ágyazott CompletableFuture-ket (CompletableFuture<CompletableFuture<T>>).

CompletableFuture<String> fetchUserId() {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("User ID lekérése: " + Thread.currentThread().getName());
        return "user123";
    });
}

CompletableFuture<String> fetchUserName(String userId) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Felhasználónév lekérése (" + userId + "): " + Thread.currentThread().getName());
        return "Kovács János";
    });
}

// ...
CompletableFuture<String> userNameFuture = fetchUserId()
    .thenCompose(this::fetchUserName);

userNameFuture.thenAccept(name -> System.out.println("Lekért felhasználónév: " + name));

A thenCompose() hasonló a Stream API flatMap() metódusához – „kisimítja” a jövőbeli objektumokat.

5. Két független CompletableFuture kombinálása: thenCombine() és thenCombineAsync()

Ha két független aszinkron feladat eredményét szeretnénk kombinálni, a thenCombine() metódust használjuk. Ez egy BiFunction-t vár paraméterül.

CompletableFuture<String> weatherFuture = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1000); } catch (InterruptedException e) {}
    return "Napos, 25°C";
});

CompletableFuture<String> newsFuture = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(1500); } catch (InterruptedException e) {}
    return "Sport eredmények";
});

CompletableFuture<String> combinedFuture = weatherFuture
    .thenCombine(newsFuture, (weather, news) -> {
        System.out.println("Kombinálás fut: " + Thread.currentThread().getName());
        return "Időjárás: " + weather + ", Hírek: " + news;
    });

combinedFuture.thenAccept(System.out::println);

6. Több CompletableFuture együttes kezelése: allOf() és anyOf()

Néha szükségünk van arra, hogy megvárjuk több aszinkron feladat befejezését, mielőtt tovább lépnénk. Az allOf() és anyOf() statikus metódusok segítenek ebben.

  • CompletableFuture.allOf(CompletableFuture... futures): Akkor fejeződik be, amikor az összes paraméterként megadott CompletableFuture befejeződött. Az eredménye egy CompletableFuture<Void>, így az egyedi eredményeket külön kell lekérdezni (pl. a join() metódussal).
  • CompletableFuture.anyOf(CompletableFuture... futures): Akkor fejeződik be, amikor az *első* paraméterként megadott CompletableFuture befejeződött. Az eredménye egy CompletableFuture<Object>, amely az első befejezett Future eredményét tartalmazza.
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> "Eredmény 1");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> "Eredmény 2");
CompletableFuture<String> task3 = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(500); } catch (InterruptedException e) {}
    return "Eredmény 3 (gyorsabb)";
});

// Várjunk meg minden feladatot
CompletableFuture<Void> allTasks = CompletableFuture.allOf(task1, task2, task3);
allTasks.thenRun(() -> {
    System.out.println("Minden feladat befejeződött.");
    // Itt lekérdezhetjük az egyedi eredményeket
    System.out.println(task1.join()); // join() blokkolás nélkül adja vissza az eredményt, ha a future már kész
    System.out.println(task2.join());
    System.out.println(task3.join());
});

// Várjuk meg az első befejeződő feladatot
CompletableFuture<Object> anyTask = CompletableFuture.anyOf(task1, task2, task3);
anyTask.thenAccept(result -> System.out.println("Az első befejezett feladat eredménye: " + result));

Hibakezelés CompletableFuture-rel

Az aszinkron műveletek során elkerülhetetlenek a hibák. A CompletableFuture kiváló eszközöket biztosít a hibák elegáns kezelésére.

1. Kivételek elfogása: exceptionally()

Ez a metódus lehetővé teszi, hogy egy CompletableFuture láncban bekövetkező kivételt elkapjuk, és egy alapértelmezett értéket adjunk vissza, vagy egy alternatív logikát futtassunk.

CompletableFuture<String> faultyFuture = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new RuntimeException("Valami hiba történt!");
    }
    return "Sikeres eredmény.";
});

faultyFuture
    .exceptionally(ex -> {
        System.err.println("Hiba történt: " + ex.getMessage());
        return "Helyreállított eredmény (alapértelmezett)."; // Hiba esetén visszaadott érték
    })
    .thenAccept(System.out::println);

2. Siker és hiba kezelése egyaránt: handle()

A handle() metódus egy BiFunction-t vár, amelynek két paramétere van: az eredmény és a kivétel. Az egyik mindig null lesz, a másik tartalmazza az értéket vagy a kivételt. Ez lehetőséget ad az eredmény transzformálására vagy a hiba helyreállítására.

CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> {
    if (Math.random() < 0.5) {
        throw new IllegalArgumentException("Érvénytelen bemenet!");
    }
    return "Sikeres adat.";
}).handle((res, ex) -> {
    if (ex != null) {
        System.err.println("Kivételel kezelve a handle-ben: " + ex.getMessage());
        return "Hiba utáni feldolgozás.";
    }
    return res + " (feldolgozva)";
});

result.thenAccept(System.out::println);

3. Befejezés utáni művelet: whenComplete()

A whenComplete() metódus akkor hasznos, ha egy műveletet szeretnénk végrehajtani, akár sikeresen, akár hibával fejeződött be az előző szakasz, de az eredményt vagy a kivételt nem szeretnénk módosítani. Például logolásra vagy erőforrások felszabadítására alkalmas.

CompletableFuture.supplyAsync(() -> {
    System.out.println("Művelet fut...");
    if (Math.random() < 0.5) {
        throw new RuntimeException("Kivétel a whenComplete előtt!");
    }
    return "Sikeres adat";
})
.whenComplete((res, ex) -> {
    if (ex != null) {
        System.err.println("Hiba történt: " + ex.getMessage());
    } else {
        System.out.println("Sikeresen befejeződött, eredmény: " + res);
    }
})
.thenAccept(finalResult -> System.out.println("Végső eredmény (nem módosult): " + finalResult))
.exceptionally(e -> { // Ezt csak akkor hívja meg, ha a whenComplete után is Exception van
    System.err.println("Végső hibakezelés: " + e.getMessage());
    return null; // A lánc lezárása
});

Executorok és testreszabott szálkezelés

A CompletableFuture metódusai, amelyek `Async` utótaggal rendelkeznek (pl. supplyAsync(), thenApplyAsync()), alapértelmezés szerint a ForkJoinPool.commonPool()-t használják a feladatok futtatására. Ez a legtöbb esetben megfelelő, de néha szükség lehet egy testreszabott ExecutorService megadására.

Miért?

  • Erőforrás-kontroll: Egy dedikált ExecutorService segítségével jobban szabályozhatjuk a szálak számát, például ha tudjuk, hogy egy feladat I/O-intenzív (sok várakozás) vagy CPU-intenzív (sok számítás).
  • Deadlock elkerülése: Ha a commonPool-t túlterheljük blokkoló feladatokkal, az más, nem-blokkoló feladatok futását is akadályozhatja.
  • Kontextus öröklése: Bizonyos esetekben (pl. logolás, biztonsági kontextus) előnyös lehet a szálkontextus átadása, ami testreszabott Executor-ral könnyebb.

Egyéni Executor megadása egyszerű:

ExecutorService customExecutor = Executors.newFixedThreadPool(5); // 5 szálat tartalmazó pool

CompletableFuture.supplyAsync(() -> {
    System.out.println("Custom Executoron fut: " + Thread.currentThread().getName());
    return "Sikeres!";
}, customExecutor) // Itt adjuk meg az executort
.thenAccept(System.out::println)
.whenComplete((res, ex) -> customExecutor.shutdown()); // Ne felejtsük el leállítani!

Blokkolás elkerülése: get() és a „sötét oldal”

Bár a get() metódus lehetővé teszi egy CompletableFuture eredményének lekérdezését, ez egy blokkoló művelet. Ha a Future még nem fejeződött be, a hívó szál várni fog. Ez ellentétes az aszinkron programozás alapelvével, miszerint elkerüljük a fő szál blokkolását.

A jó gyakorlat az, hogy a CompletableFuture láncolási metódusait használjuk (thenApply, thenAccept, thenCompose stb.) ahelyett, hogy a get()-tel várnánk az eredményre. A get() használata csak akkor indokolt, ha egy aszinkron folyamat legvégén, vagy egy alkalmazás kilépésekor gyűjtjük össze az utolsó eredményeket, vagy tesztekben, ahol a szinkronizáció egyszerűsége felülírhatja az aszinkronitás előnyeit.

A join() metódus hasonlóan működik, mint a get(), de nem deklarálja a checked Exception-öket, hanem CompletionException-t dob. Ez is blokkoló, de kényelmesebb lehet láncoláskor.

Gyakori minták és tippek

Most, hogy áttekintettük az alapokat és a kompozíciós metódusokat, nézzünk néhány gyakori mintát és tippet.

Időtúllépés kezelése

A CompletableFuture rendelkezik beépített metódusokkal az időtúllépés kezelésére:

  • orTimeout(long timeout, TimeUnit unit): Ha a CompletableFuture nem fejeződik be a megadott időn belül, egy TimeoutException-nel fog hibát dobni.
  • completeOnTimeout(T value, long timeout, TimeUnit unit): Ha a CompletableFuture nem fejeződik be időben, egy megadott értékkel fog befejeződni ahelyett, hogy hibát dobna.
CompletableFuture<String> slowTask = CompletableFuture.supplyAsync(() -> {
    try { Thread.sleep(3000); } catch (InterruptedException e) {}
    return "Lassú feladat vége.";
});

slowTask.orTimeout(2, TimeUnit.SECONDS) // 2 másodperc múlva TimeoutException
    .exceptionally(ex -> {
        System.err.println("Időtúllépés történt: " + ex.getMessage());
        return "Időtúllépés miatt alternatív eredmény.";
    })
    .thenAccept(System.out::println);

Visszatérési próbálkozások (Retries)

Bár nincs közvetlen beépített mechanizmus, a CompletableFuture segítségével könnyedén implementálhatunk visszatérési logikát. Ehhez jellemzően rekurziót vagy egy ciklust használunk, amely CompletableFuture-t ad vissza.

public CompletableFuture<String> unreliableService(int attempt) {
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("Próbálkozás: " + attempt + " - " + Thread.currentThread().getName());
        if (Math.random() < 0.7 && attempt < 3) { // 70% esély a hibára, max 3 próbálkozás
            throw new RuntimeException("Ideiglenes hiba a szolgáltatásban!");
        }
        return "Adat sikeresen letöltve a " + attempt + ". próbálkozásra.";
    }).exceptionallyCompose(ex -> { // exceptionalyCompose, ha a hiba egy új CompletableFuture-t igényel
        if (attempt < 3) { // Maximum 3 próbálkozás
            System.err.println("Hiba a " + attempt + ". próbálkozásnál, újrapróbálkozás...");
            try { Thread.sleep(500); } catch (InterruptedException e) {} // Kis szünet
            return unreliableService(attempt + 1);
        } else {
            return CompletableFuture.failedFuture(new RuntimeException("Nem sikerült lekérni az adatot 3 próbálkozás után."));
        }
    });
}

// ...
unreliableService(1)
    .thenAccept(System.out::println)
    .exceptionally(ex -> {
        System.err.println("Végleges hiba: " + ex.getMessage());
        return null;
    });

Összefoglalás

A Java CompletableFuture egy erőteljes és elegáns eszköz az aszinkron és párhuzamos programozás kihívásainak kezelésére. Segítségével elkerülhetjük a blokkoló műveleteket, javíthatjuk az alkalmazások reakcióképességét és hatékonyságát, miközben a kódunk tiszta, olvasható és karbantartható marad.

A supplyAsync() és runAsync() alapvető műveletektől a komplex láncolási (thenApply, thenCompose), kombinálási (thenCombine, allOf) és hibakezelési (exceptionally, handle) mechanizmusokig, a CompletableFuture egy teljes eszköztárat kínál a modern Java fejlesztéshez. Ha még nem építetted be a mindennapi munkafolyamataidba, itt az ideje elkezdeni – alkalmazásaid és felhasználóid hálásak lesznek érte!

Leave a Reply

Az e-mail címet nem tesszük közzé. A kötelező mezőket * karakterrel jelöltük