Poller

Polling Consumer

๋ฉ”์‹œ์ง€ ๋์ (์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ)์ด ์ฑ„๋„์— ์—ฐ๊ฒฐ๋˜๊ณ  ์ธ์Šคํ„ด์Šคํ™”๋˜๋ฉด ๋‹ค์Œ ์ธ์Šคํ„ด์Šค ์ค‘ ํ•˜๋‚˜๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค

  • PollingConsumer

    • PollableChannel ์ธํ„ฐํŽ˜์ด์Šค(์˜ˆ: QueueChannel)๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์ฑ„๋„์— ์—ฐ๊ฒฐ๋œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๋Š” PollingConsumer์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  • EventDrivenConsumer

    • SubscribableChannel ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์ฑ„๋„์— ์—ฐ๊ฒฐ๋œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๋Š” EventDrivenConsumer ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Pollable Message Source

์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๊ฐ€ ์‚ฌ์šฉ๋˜๋Š” ๊ฒฝ์šฐ ์ด๋Ÿฌํ•œ ์–ด๋Œ‘ํ„ฐ๋Š” ์ข…์ข… SourcePollingChannelAdapter๋กœ ๋ž˜ํ•‘๋ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด, ์›๊ฒฉ FTP ์„œ๋ฒ„ ์œ„์น˜์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฒ€์ƒ‰ํ•  ๋•Œ FTP ์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ์— ์„ค๋ช…๋œ ์–ด๋Œ‘ํ„ฐ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์ฃผ๊ธฐ์ ์œผ๋กœ ๊ฒ€์ƒ‰ํ•˜๋„๋ก ํด๋Ÿฌ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค

๊ตฌ์„ฑ ์š”์†Œ๊ฐ€ ํด๋Ÿฌ๋กœ ๊ตฌ์„ฑ๋œ ๊ฒฝ์šฐ ๊ฒฐ๊ณผ ์ธ์Šคํ„ด์Šค๋Š” ๋‹ค์Œ ์œ ํ˜• ์ค‘ ํ•˜๋‚˜์ž…๋‹ˆ๋‹ค.

  • PollingConsumer

  • SourcePollingChannelAdapter

์ด๋Š” ํด๋Ÿฌ๊ฐ€ ์ธ๋ฐ”์šด๋“œ ๋ฐ ์•„์›ƒ๋ฐ”์šด๋“œ ๋ฉ”์‹œ์ง• ์‹œ๋‚˜๋ฆฌ์˜ค ๋ชจ๋‘์—์„œ ์‚ฌ์šฉ๋จ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ํด๋Ÿฌ๊ฐ€ ์‚ฌ์šฉ๋˜๋Š” ๋ช‡ ๊ฐ€์ง€ ์‚ฌ์šฉ ์‚ฌ๋ก€๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • FTP ์„œ๋ฒ„, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ์›น ์„œ๋น„์Šค์™€ ๊ฐ™์€ ํŠน์ • ์™ธ๋ถ€ ์‹œ์Šคํ…œ ํด๋ง

  • ๋‚ด๋ถ€(ํด๋ง ๊ฐ€๋Šฅ) ๋ฉ”์‹œ์ง€ ์ฑ„๋„ ํด๋ง

  • ๋‚ด๋ถ€ ์„œ๋น„์Šค ํด๋ง(์˜ˆ: Java ํด๋ž˜์Šค์—์„œ ๋ฐ˜๋ณต์ ์œผ๋กœ ์‹คํ–‰๋˜๋Š” ๋ฉ”์„œ๋“œ)

Deferred Acknowledgment Pollable Message Source

๋ฒ„์ „ 5.0.1๋ถ€ํ„ฐ ํŠน์ • ๋ชจ๋“ˆ์€ ๋‹ค์šด์ŠคํŠธ๋ฆผ ํ๋ฆ„์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€(๋˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋กœ ์ „๋‹ฌ) ์—ฐ๊ธฐ ์Šน์ธ์„ ์ง€์›ํ•˜๋Š” MessageSource ๊ตฌํ˜„์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ ํ˜„์žฌ AmqpMessageSource ๋ฐ KafkaMessageSource๋กœ ์ œํ•œ๋ฉ๋‹ˆ๋‹ค

์ด๋Ÿฌํ•œ ๋ฉ”์‹œ์ง€ ์†Œ์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ํ—ค๋”(MessageHeaderAccessor API ์ฐธ์กฐ)๊ฐ€ ๋ฉ”์‹œ์ง€์— ์ถ”๊ฐ€๋ฉ๋‹ˆ๋‹ค.

ํด๋ง ๊ฐ€๋Šฅํ•œ ๋ฉ”์‹œ์ง€ ์†Œ์Šค์™€ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ํ—ค๋” ๊ฐ’์€ ๋‹ค์Œ ์˜ˆ์ œ์™€ ๊ฐ™์ด AcknowledgementCallback์˜ ์ธ์Šคํ„ด์Šค์ž…๋‹ˆ๋‹ค.

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

Conditional Pollers for Message Sources

"Smart" Polling

๋ฒ„์ „ 5.3์€ ReceiveMessageAdvice ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๋„์ž…ํ–ˆ์Šต๋‹ˆ๋‹ค. ์ด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์–ด๋“œ๋ฐ”์ด์Šค ์ฒด์ธ์˜ ๋ชจ๋“  ์–ด๋“œ๋ฐ”์ด์Šค ๊ฐ์ฒด๋Š” receive() ์ž‘์—…์—๋งŒ ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.(MessageSource.receive() ๋ฐ PollableChannel.receive(timeout)) .SourcePollingChannelAdapter ๋˜๋Š” PollingConsumer์—๋งŒ ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ํด๋ž˜์Šค๋Š” ๋‹ค์Œ ๋ฉ”์„œ๋“œ๋ฅผ ๊ตฌํ˜„ํ•ฉ๋‹ˆ๋‹ค.

  • beforeReceive(Object source) ์ด ๋ฉ”์„œ๋“œ๋Š” Object.receive() ๋ฉ”์„œ๋“œ ์ „์— ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ์†Œ์Šค๋ฅผ ๊ฒ€์‚ฌํ•˜๊ณ  ์žฌ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. false๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด ์ด poll๊ฐ€ ์ทจ์†Œ๋ฉ๋‹ˆ๋‹ค(์•ž์„œ ์–ธ๊ธ‰ํ•œ PollSkipAdvice์™€ ์œ ์‚ฌ).

  • Message afterReceive(Message result, Object source) ์ด ๋ฉ”์†Œ๋“œ๋Š” receive() ๋ฉ”์†Œ๋“œ ์ดํ›„์— ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค.

CompoundTriggerAdvice

ํŠธ๋ฆฌ๊ฑฐ์˜ ๊ธฐ๋ณธ ํŠธ๋ฆฌ๊ฑฐ๋Š” CronTrigger์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์–ด๋“œ๋ฐ”์ด์Šค๊ฐ€ ์ˆ˜์‹ ๋œ ๋ฉ”์‹œ์ง€๊ฐ€ ์—†์Œ์„ ๊ฐ์ง€ํ•˜๋ฉด ๋‘ ๋ฒˆ์งธ ํŠธ๋ฆฌ๊ฑฐ๋ฅผ CompoundTrigger์— ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.CompoundTrigger ์ธ์Šคํ„ด์Šค์˜ nextExecutionTime ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด ๋ณด์กฐ ํŠธ๋ฆฌ๊ฑฐ(์žˆ๋Š” ๊ฒฝ์šฐ)์— ์œ„์ž„ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋ ‡์ง€ ์•Š์œผ๋ฉด ๊ธฐ๋ณธ ํŠธ๋ฆฌ๊ฑฐ์— ์œ„์ž„ํ•ฉ๋‹ˆ๋‹ค.

Reference

Last updated

Was this helpful?