Reactive 與資料庫


在逐步應用 Reactive 風格之後,自然地,就會面對如何銜接至資料庫處理上的問題,有些資料庫提供非同步的驅動程式,例如 MongoDB,在銜接至 Reactor 或者是 WebFlux 上,也能有實質的益處。

當然,像 MongoDB 提供的非同步驅動程式,並不是 JDBC 標準規範,畢竟 JDBC 本身並沒有非同步的概念,如果想基於 JDBC 來處理資料庫,該怎麼做呢?一個簡單的想法是,透過 FluxMono 的一些方法,像是 defer 來轉接,並使用 subscribeOn 令資料流在訂閱時,會是在另一執行緒上。

問題是該在哪一層轉接?這就看怎麼處理比較方便,或者比較能增加點效益了,以〈套用 Spring Data JDBC〉有關資料庫處理的部份來說,或許在服務層,也就是 UserService 上處理會比較方便,修改一下大概就會像是:

package cc.openhome;

...略

@Service
public class UserService {
    @Autowired
    private AccountDAO accountDAO;

    @Autowired
    private MessageDAO messageDAO;

    @Autowired
    private PasswordEncoder passwordEncoder;

    @Autowired
    private Scheduler scheduler;


    public Mono<Account> tryCreateUser(String email, String username, String password)  {
        return Mono.defer(() -> {
            if(accountDAO.accountByEmail(email).isPresent() || accountDAO.accountByUsername(username).isPresent()) {
                return Mono.empty();
            }
            return createUser(username, email, password);
        }).subscribeOn(scheduler);
    }

    private Mono<Account> createUser(String username, String email, String password) {
        return Mono.defer(
            () -> Mono.just(
                accountDAO.save(
                    new Account(
                        username, 
                        email, 
                        passwordEncoder.encode(password), 
                        0, 
                        "ROLE_MEMBER"
                    )
               )
           )
        );
    }

    public Flux<Message> messages(String username) {
        return Flux.defer(() -> {
            List<Message> messages = messageDAO.messagesBy(username);
            messages.sort(Comparator.comparing(Message::getMillis).reversed());
            return Flux.fromIterable(messages);
        }).subscribeOn(scheduler);
    }   

    public Mono<Message> addMessage(String username, String blabla) {
        return Mono.defer(() -> Mono.just(
                messageDAO.save(
                    new Message(username, Instant.now().toEpochMilli(), blabla)
                )
            )
        ).subscribeOn(scheduler);

    }    

    public Mono<Void> deleteMessage(String username, String millis) {
        return Mono.defer(() -> {
            messageDAO.deleteMessageBy(username, millis);
            return Mono.<Void>empty();
        }).subscribeOn(scheduler);
    }

    public Mono<Boolean> userExisted(String username) {
        return Mono.defer(() -> {
            return Mono.just(accountDAO.accountByUsername(username).isPresent());
        }).subscribeOn(scheduler);
    }

    public Flux<Message> newestMessages(int n) {
        return Flux.defer(() -> Flux.fromIterable(messageDAO.newestMessages(n)))
                   .subscribeOn(scheduler);
    }

    public Mono<Boolean> emailExisted(String email) {
        return Mono.defer(() -> {
            return Mono.just(accountDAO.accountByEmail(email).isPresent());
        }).subscribeOn(scheduler);

    }

    public Mono<Account> verify(String email, String token) {
        return Mono.defer(() -> {
            Optional<Account> optionalAcct= accountDAO.accountByEmail(email);
            if(optionalAcct.isPresent()) {
                Account acct = optionalAcct.get();
                if(acct.getPassword().equals(token)) {
                    accountDAO.activateAccount(acct.getName());
                    return Mono.just(acct);
                }
            }
            return Mono.empty();
        }).subscribeOn(scheduler);
    }

    public Mono<Account> accountByNameEmail(String name, String email) {
        return Mono.defer(() -> {
            Optional<Account> optionalAcct = accountDAO.accountByUsername(name);
            if(optionalAcct.isPresent() && optionalAcct.get().getEmail().equals(email)) {
                return Mono.just(optionalAcct.get());
            }
            return Mono.empty();
        }).subscribeOn(scheduler);
    }

    public Mono<Void> resetPassword(String name, String password) {
        return Mono.defer(() -> {
            accountDAO.updatePassword(name, passwordEncoder.encode(password));
            return Mono.<Void>empty();
        }).subscribeOn(scheduler);
    }
}

你可以在 ReactiveJDBC 找到以上的專案,基本上,透過這類的方法,也可以將其他同步的資源轉為非同步。

當然,就上例來說,也可以進一步,在 AccountDAOMessageDAO 的層次,修改其介面,令相關方法傳回 FluxMono,由 AccountDAOMessageDAO 的實作類別來進行轉接的動作,只不過相對來說,就會麻煩一些,因為等於要封裝 JDBC 的相關細節,有的程式庫做了這類的動作,例如 rxjava2-jdbc,有興趣瞭解的話,可以看看〈Spring WebFlux and rxjava2-jdbc〉。

rxjava2-jdbc 這類程式庫,底層還使用 JDBC,不過透過其實現的非阻斷式連線池,以及一些資料流封裝,在實現自己的 DAO(Repository)時會比較輕鬆一些。

在 Java 商標的擁有者 Oracle 這方面,正在推行 Asynchronous Database Access API,簡稱 ADBA,也就是非同步的資料庫驅動程式,希望可以成為未來的標準,只是目前來說,可用性還未知,就看到的示範程式碼來看,是基於 CompletableFuture 之類來實現。

只是 Spring 看來不怎麼認同 ADBA,他們希望能基於 Reactive 風格,而不是單純的非同步,CompletableFuture 到底算不算是 Reactive 是個爭議,後來 Spring 在 2018 的 SpringOne 大會上,提出了 R2DBC,當然,基於自家的 Reactor,在〈Reactive Programming and Relational Databases〉中,可以略為看到 ADBA 與 R2DBC 的 API 比較。

Spring 官方直言,R2DBC 目的之一,就是要影響 ADBA 規範,在〈Experimental Reactive Relational Database Connectivity Driver, R2DBC, Announced at SpringOne〉報導中可以看到,Ben Hale 還蠻嗆地提到:

Spring doesn’t generate specs. We aren’t spec leads and we don’t host specifications. This project's entire goal is a way to influence the ADBA spec, and that is the best possible scenario. But make no mistake, I am not the sort of person who is going to tolerate the ADBA spec being bad. If they don’t take our advice, if they don’t see that Reactive is different from being async, then this is something that the Spring team will do.

當然,Spring 本身有 Spring Data,如果 Spring Data 的設計模型,能適時封裝底層,讓 API 運用上更一致一些就好了,事實上,對於一些本身有提供非同步驅動程式的資料庫,Spring Data 提供了 Reactive 的 Repository 版本,例如,MongoDB 就有 ReactiveMongoRepository,它實現了 ReactiveCrudRepository,從 API 文件上可以看到,都是傳回 FluxMono

在 R2DBC 這部份,也有 Spring Data R2DBC,不過就撰寫本身的這個時候,R2DBC 還在開發,未正式釋出的階段,Spring Data R2DBC 也就還不完整,不過我試出了查詢方面的操作,例如:

package cc.openhome;

import org.springframework.data.jdbc.repository.query.Query;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import reactor.core.publisher.Mono;

public interface AccountDAO extends ReactiveCrudRepository<Account, Integer> {
    @Query("SELECT * FROM account WHERE name = $1")
    Mono<Account> accountByUsername(String name);

    @Query("SELECT * FROM account WHERE email = $1")
    Mono<Account> accountByEmail(String email);
}

有了這類 DAO,UserService 寫來就會輕鬆一些:

package cc.openhome;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class UserService {
    @Autowired
    private AccountDAO accountDAO;

    @Autowired
    private MessageDAO messageDAO;

    public Flux<Message> messages(String username) {
        return messageDAO.messagesBy(username);
    }   

    public Mono<Boolean> userExisted(String username) {
        return accountDAO.accountByUsername(username)
                         .map(acct -> true)
                         .switchIfEmpty(Mono.just(false));
    }

    public Flux<Message> newestMessages(int n) {
        return messageDAO.newestMessages(n);
    }

    public Mono<Boolean> emailExisted(String email) {
        return accountDAO.accountByEmail(email)
                .map(acct -> true)
                .switchIfEmpty(Mono.just(false));
    }

    public Mono<Account> accountByNameEmail(String name, String email) {
        return accountDAO.accountByUsername(name)
                         .flatMap(acct -> acct.getEmail().equals(email) ? Mono.just(acct) : Mono.empty());
    }
}

就撰寫本文的這時間點來說,儲存的部份還有些問題,如果你想看看實際的專案怎麼設定,可以參考 ReactiveR2DBC,當然,這日後還會再變動,而且未來應該會與 Spring Boot 整合的更好,相關設定應會省事許多。