趣改 gossip(二)


在〈趣改 gossip(一)〉中,已經將基於 Spring MVC 的 gossip 專案,在盡量少的修改下,令其可在 WebFlux 的環境下跑起來,不過,流程基本仍是阻斷式的,接著就是想辦法運用 Reactor 的 API,以 Reactive 資料流、非阻斷的風格的概念,修改應用程式。

首先要處理的是涉及資料庫連線的部份,因為 gossip 是基於 JDBC 來連線 H2,R2DBC 目前還不成熟,因此這邊決定還是先用 defersubscribeOn 的方式來銜接 DAO,也就是運用〈Reactive 與資料庫〉第一個 UserService 的修改成果。

接著來修改控制器,DisplayController 比較簡單,先看看修改後的成果:

package cc.openhome.controller;

...略


@Controller
public class DisplayController {
    ...略

//    @GetMapping("/")
//    public String index(Model model) {
//        model.addAttribute("newest", reactiveUserService.newestMessages(10));
//        return INDEX_PATH;
//    }

    @GetMapping("/")
    public Mono<String> index(Model model) {
        return userService
                    .newestMessages(10)
                    .collectList()
                    .map(messages -> model.addAttribute("newest", messages))
                    .then(Mono.just(INDEX_PATH));

    }

    @GetMapping("user/{username}")
    public Mono<String> user(
            @PathVariable("username") String username,
            Model model) {

        return userService
                   .userExisted(username)
                   .doOnNext(isExisted -> {
                       if(isExisted) {
                           model.addAttribute("messages", userService.messages(username));
                       }
                       else {
                           model.addAttribute("errors", Arrays.asList(String.format("%s 還沒有發表訊息", username)));
                       }
                   })
                   .then(Mono.just(USER_PATH));
    }
}

控制器的處理器方法可以傳回 Mono 或字串,現在問題來了,該傳回哪個?這關乎兩個考量,一是 API 的銜接,二是儘早執行完處理器方法,讓流程控制權儘早回到 DispatcherHandler,從其 handler 方法返回,進而讓 Netty 儘早釋放它分配的執行緒,如果你的操作涉及 IO 等阻斷,可以儘量運用資料流相關 API 來達到此目的。

那麼像上頭被註解掉的部份,就有選擇性了,reactiveUserService.newestMessages(10) 傳回 Flux<Message>,發佈流程還沒有觸發,你要使用註解部份的寫法,或者是沒被註解的 index 方法,兩者是一樣的。

然而,阻斷不一定是因為 IO,也有可能是 CPU 密集性的計算過久,若處理器方法中有這類計算,也可以考慮將整個處理器中的流程,封裝至一個 Mono 中,例如,MemberController 就特意這麼做:

package cc.openhome.controller;

...

@Controller
public class MemberController {
    @Value("${path.view.member}")
    private String MEMBER_PATH;

    @Value( "#{'redirect:' + '${path.url.member}'}")
    private String REDIRECT_MEMBER_PATH;

    @Autowired
    private UserService userService;    

    @GetMapping("member")
    @PostMapping("member")
    public Mono<String> member(
            Principal principal, 
            Model model) {
        return userService
                   .messages(principal.getName())
                   .collectList()
                   .map(messages -> model.addAttribute("messages", messages))
                   .then(Mono.just(MEMBER_PATH));
    }

    @PostMapping("new_message")
    protected Mono<String> newMessage(
            ServerWebExchange webExchange, 
            Principal principal, 
            Model model)  {

        return webExchange
                   .getFormData()
                   .map(valueMap -> valueMap.getFirst("blabla"))
                   .flatMap(blabla -> {
                       if(blabla.length() == 0) {
                           return Mono.just(REDIRECT_MEMBER_PATH);
                       }     

                       String username = principal.getName();
                       if(blabla.length() <= 140) {
                           return userService
                                    .addMessage(username, blabla)
                                    .then(Mono.just(REDIRECT_MEMBER_PATH));
                       }
                       else {
                           return userService.messages(username)
                                    .collectList()
                                    .doOnSuccess(messages -> {
                                        model.addAttribute("messages", messages);
                                        model.addAttribute("blabla", blabla);
                                    })
                                    .then(Mono.just(MEMBER_PATH));
                       }  
                   });
    } 

    @PostMapping("del_message")
    protected Mono<String> delMessage(
            ServerWebExchange webExchange, 
            Principal principal) {

        return webExchange
                   .getFormData()
                   .flatMap(valueMap -> userService.deleteMessage(principal.getName(), valueMap.getFirst("millis")))
                   .then(Mono.just(REDIRECT_MEMBER_PATH));
    }   
}

newMessage 中對 blabla 的計算,是可以不用封裝到 flatMap 中的,不過,假設這類計算很耗成本,對流程儘早回到 DispatcherHandler 的手中,這種寫法應該會有些幫助。

接下來,EmailService 也要是 Reactive 風格:

package cc.openhome.model;

import reactor.core.publisher.Mono;

public interface EmailService {
    public Mono<Void> validationLink(Account acct);
    public Mono<Void> failedRegistration(String acctName, String acctEmail);
    public Mono<Void> passwordResetLink(Account account);
}

GmailService 也做出對應的修改:

package cc.openhome.model;

...略

@Service
public class GmailService implements EmailService {
    @Autowired
    private JavaMailSender mailSender;

    @Autowired
    private Scheduler scheduler;

    @Override
    public Mono<Void> validationLink(Account acct) {
        String link = String.format(
            "http://localhost:8080/verify?email=%s&token=%s", 
             uriEncode(acct.getEmail()), uriEncode(acct.getPassword())
        );

        String anchor = String.format("<a href='%s'>驗證郵件</a>", link);
        String html = String.format("請按 %s 啟用帳戶或複製鏈結至網址列:<br><br> %s", anchor, link);

        return sendMessage(acct.getEmail(), "Gossip 註冊結果", html);
    }

   ...略


    private String uriEncode(String text) {
        try {
            return URLEncoder.encode(text, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException(e);
        } 
    }
}

你會在意呼叫 sendMessage 的成本嗎?同樣地,若在意的話,可以自行將之封裝至 Mono 之中。

接著,要修改 AccountController,其實與先前控制器的修改類似,不過要注意的是,在我嘗試修改的過程中,@SessionAttributes 會失效,雖然有查到 SPR-15887 中曾經提出這問題,看來似乎也有解決,不過就撰寫本文的時間點來說,我還是試不出來。

這倒無妨,改使用 ServerWebExchangegetSession 來處理就可以了:

package cc.openhome.controller;

...略

@Controller
public class AccountController {

    ...略

    @GetMapping("register")
    public String registerForm() {
        return REGISTER_FORM_PATH;
    }

    @PostMapping("register")
    public Mono<String> register(
            @Valid RegisterForm form,
            BindingResult bindingResult,
            Model model) {

        return toList(bindingResult)
                   .flatMap(errors -> {
                       if(errors.isEmpty()) {
                            return userService
                                        .tryCreateUser(form.getEmail(), form.getUsername(), form.getPassword())
                                        .flatMap(acct -> emailService.validationLink(acct))
                                        .switchIfEmpty(emailService.failedRegistration(form.getUsername(), form.getEmail()))
                                        .then(Mono.just(REGISTER_SUCCESS_PATH));
                       } 
                       model.addAttribute("errors", errors);
                       return Mono.just(REGISTER_FORM_PATH);
                   });
    }

    @GetMapping("verify")
    public Mono<String> verify(
            @RequestParam String email,
            @RequestParam String token,
            Model model) {

        return userService.verify(email, token)
                          .doOnNext(acct -> model.addAttribute("acct", acct))
                          .then(Mono.just(VERIFY_PATH));
    }

    @PostMapping("forgot")
    public Mono<String> forgot(
            ServerWebExchange webExchange, 
            Model model) {

        return webExchange
                   .getFormData()
                   .flatMap(formData -> {
                        String name = formData.getFirst("name");
                        String email = formData.getFirst("email");
                        model.addAttribute("email", email);
                        return userService.accountByNameEmail(name, email);
                   })
                   .flatMap(acct -> emailService.passwordResetLink(acct))
                   .then(Mono.just(FORGOT_PATH));
    }

    @GetMapping("reset_password")
    public Mono<String> resetPasswordForm(
            @RequestParam String name,
            @RequestParam String email,
            @RequestParam String token,
            ServerWebExchange webExchange, 
            Model model) {

        return userService
                    .accountByNameEmail(name, email)
                    .filter(acct -> acct.getPassword().equals(token))
                    .flatMap(acct -> {              
                        model.addAttribute("acct", acct);
                        return webExchange.getSession()
                                    .map(webSession -> webSession.getAttributes().put("token", token))
                                    .then(Mono.just(RESET_PASSWORD_FORM_PATH));
                    })
                    .switchIfEmpty(Mono.just(REDIRECT_INDEX_PATH));
    }

    @PostMapping("reset_password")
    public Mono<String> resetPassword( 
            @Valid ResetPasswordForm form,
            BindingResult bindingResult,
            @SessionAttribute(name = "token") String storedToken,
            Model model) {

        if(storedToken == null || !storedToken.equals(form.getToken())) {
            return Mono.just(REDIRECT_INDEX_PATH);
        }

        return toList(bindingResult)
                    .flatMap(errors -> {
                        if(!errors.isEmpty()) {
                            model.addAttribute("errors", errors);
                            model.addAttribute("name", form.getName());
                            model.addAttribute("email", form.getEmail());
                            return Mono.just(RESET_PASSWORD_FORM_PATH);
                        } 
                        return userService
                                     .resetPassword(form.getName(), form.getPassword())
                                     .then(Mono.just(RESET_PASSWORD_SUCCESS_PATH)); 
                    });
    } 

    private Mono<List<String>> toList(BindingResult bindingResult) {
        return Mono.fromCallable(() -> {
            List<String> errors = new ArrayList<>(); 
            if(bindingResult.hasErrors()) {
                bindingResult.getFieldErrors().forEach(err -> {
                    errors.add(err.getDefaultMessage());
                });
            }
            return errors;
        });
    }    
}

在〈趣改 gossip(一)〉中曾經談過 HtmlSanitizer 可以有更簡單的寫法,沒有沒有呼叫 block 進行阻斷的情況下,可以在 getFormData 之後直接進行 map

package cc.openhome.aspect;

...

@Component
@Aspect
public class HtmlSanitizer {
    @Autowired
    private PolicyFactory policy;

    @Around("execution(* cc.openhome.controller.MemberController.newMessage(..))")
    public Object around(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
        Object[] args = proceedingJoinPoint.getArgs();
        ServerWebExchange decorated = new SanitizedServerWebExchange((ServerWebExchange) args[0]);
        args[0] = decorated;
        return proceedingJoinPoint.proceed(args);
    }

    class SanitizedServerWebExchange extends ServerWebExchangeDecorator {
        public SanitizedServerWebExchange(ServerWebExchange origin) {
            super(origin);
        }

        @Override
        public Mono<MultiValueMap<String, String>> getFormData() {
            return super.getFormData()
                         .map(multiValueMap -> {
                                multiValueMap.computeIfPresent("blabla", (param, values) -> {
                                     values.set(0, policy.sanitize(values.get(0)));
                                     return values;
                                });
                                return multiValueMap;
                         });
        }   
    }
}

這樣應該就沒問題了,你可以在 gossip 找到以上的範例專案,當然,如果要再積極一些,或許可以設定之後,繼續修改為 REST 風格,搭配前端非同步處理,以發揮 WebFlux (號稱)的優點,這就自己試了吧!

就撰寫本文的這個時間點來說,我不是很建議真的投入 WebFlux 的使用,主要的問題之一在於文件,官方網站上的文件有點不可靠,特別是 Spring MVC 註解模型的部份,感覺有點像是直接從 Spring MVC 本身的文件抄過來,有些東西號稱可以用,然而也許實際試不出來,另外,你想做的東西,文件上可能沒有,通常就只能憑經驗,翻翻 WebFlux、Security 等的原始碼,想辦法拆解出做法。

另一個問題在於生態系還有待形成,如果 WebFlux 真有其效率上的優點,應用程式其他部件也很配合才能發揮,Reactive 寫出來的程式,在某些地方很有用,然而,全面採行的話,維護性是個爭議點,例如,就程式的可讀性等方面來說,懂 Reactive 與熟練 Functional Programming 風格的開發者,可能覺得寫來很爽,然而,也有其他開發者持反對的態度,認為 Reactive 或 Functional Programming 風格用的過火的話,整個應用程式反而難以理解。

不過,就熟悉 Reactive 風格開發來說,Reactor 與 WebFlux 是個不錯的對象,它用來真的比較直覺,也易於銜接 Java 8 的 API,如果過去你在接觸 Reactive 風格開發有過挫折,從 Reactor 與 WebFlux 入手,或許會有不一樣的體驗。