Poller
Polling Consumer
๋ฉ์์ง ๋์ (์ฑ๋ ์ด๋ํฐ)์ด ์ฑ๋์ ์ฐ๊ฒฐ๋๊ณ ์ธ์คํด์คํ๋๋ฉด ๋ค์ ์ธ์คํด์ค ์ค ํ๋๋ฅผ ์์ฑํฉ๋๋ค
PollingConsumerPollableChannel์ธํฐํ์ด์ค(์: QueueChannel)๋ฅผ ๊ตฌํํ๋ ์ฑ๋์ ์ฐ๊ฒฐ๋ ์ฑ๋ ์ด๋ํฐ๋ PollingConsumer์ ์ธ์คํด์ค๋ฅผ ์์ฑํฉ๋๋ค.
EventDrivenConsumerSubscribableChannel์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ์ฑ๋์ ์ฐ๊ฒฐ๋ ์ฑ๋ ์ด๋ํฐ๋ EventDrivenConsumer ์ธ์คํด์ค๋ฅผ ์์ฑํฉ๋๋ค.
Pollable Message Source
์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ๊ฐ ์ฌ์ฉ๋๋ ๊ฒฝ์ฐ ์ด๋ฌํ ์ด๋ํฐ๋ ์ข ์ข SourcePollingChannelAdapter๋ก ๋ํ๋ฉ๋๋ค.
์๋ฅผ ๋ค์ด, ์๊ฒฉ FTP ์๋ฒ ์์น์์ ๋ฉ์์ง๋ฅผ ๊ฒ์ํ ๋ FTP ์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ์ ์ค๋ช ๋ ์ด๋ํฐ๋ ๋ฉ์์ง๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ๊ฒ์ํ๋๋ก ํด๋ฌ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค
๊ตฌ์ฑ ์์๊ฐ ํด๋ฌ๋ก ๊ตฌ์ฑ๋ ๊ฒฝ์ฐ ๊ฒฐ๊ณผ ์ธ์คํด์ค๋ ๋ค์ ์ ํ ์ค ํ๋์ ๋๋ค.
PollingConsumerSourcePollingChannelAdapter
์ด๋ ํด๋ฌ๊ฐ ์ธ๋ฐ์ด๋ ๋ฐ ์์๋ฐ์ด๋ ๋ฉ์์ง ์๋๋ฆฌ์ค ๋ชจ๋์์ ์ฌ์ฉ๋จ์ ์๋ฏธํฉ๋๋ค. ํด๋ฌ๊ฐ ์ฌ์ฉ๋๋ ๋ช ๊ฐ์ง ์ฌ์ฉ ์ฌ๋ก๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
FTP ์๋ฒ, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์น ์๋น์ค์ ๊ฐ์ ํน์ ์ธ๋ถ ์์คํ ํด๋ง
๋ด๋ถ(ํด๋ง ๊ฐ๋ฅ) ๋ฉ์์ง ์ฑ๋ ํด๋ง
๋ด๋ถ ์๋น์ค ํด๋ง(์: Java ํด๋์ค์์ ๋ฐ๋ณต์ ์ผ๋ก ์คํ๋๋ ๋ฉ์๋)
Deferred Acknowledgment Pollable Message Source
๋ฒ์ 5.0.1๋ถํฐ ํน์ ๋ชจ๋์ ๋ค์ด์คํธ๋ฆผ ํ๋ฆ์ด ์๋ฃ๋ ๋๊น์ง(๋๋ ๋ฉ์์ง๋ฅผ ๋ค๋ฅธ ์ค๋ ๋๋ก ์ ๋ฌ) ์ฐ๊ธฐ ์น์ธ์ ์ง์ํ๋ MessageSource ๊ตฌํ์ ์ ๊ณตํฉ๋๋ค. ์ด๊ฒ์ ํ์ฌ AmqpMessageSource ๋ฐ KafkaMessageSource๋ก ์ ํ๋ฉ๋๋ค
์ด๋ฌํ ๋ฉ์์ง ์์ค๋ฅผ ์ฌ์ฉํ๋ฉด IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ํค๋(MessageHeaderAccessor API ์ฐธ์กฐ)๊ฐ ๋ฉ์์ง์ ์ถ๊ฐ๋ฉ๋๋ค.
ํด๋ง ๊ฐ๋ฅํ ๋ฉ์์ง ์์ค์ ํจ๊ป ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ํค๋ ๊ฐ์ ๋ค์ ์์ ์ ๊ฐ์ด AcknowledgementCallback์ ์ธ์คํด์ค์ ๋๋ค.
Conditional Pollers for Message Sources
"Smart" Polling
๋ฒ์ 5.3์ ReceiveMessageAdvice ์ธํฐํ์ด์ค๋ฅผ ๋์
ํ์ต๋๋ค. ์ด ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ์ด๋๋ฐ์ด์ค ์ฒด์ธ์ ๋ชจ๋ ์ด๋๋ฐ์ด์ค ๊ฐ์ฒด๋ receive() ์์
์๋ง ์ ์ฉ๋ฉ๋๋ค.(MessageSource.receive() ๋ฐ PollableChannel.receive(timeout)) .SourcePollingChannelAdapter ๋๋ PollingConsumer์๋ง ์ ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฌํ ํด๋์ค๋ ๋ค์ ๋ฉ์๋๋ฅผ ๊ตฌํํฉ๋๋ค.
beforeReceive(Object source) ์ด ๋ฉ์๋๋ Object.receive() ๋ฉ์๋ ์ ์ ํธ์ถ๋ฉ๋๋ค. ์ด๋ฅผ ํตํด ์์ค๋ฅผ ๊ฒ์ฌํ๊ณ ์ฌ๊ตฌ์ฑํ ์ ์์ต๋๋ค. false๋ฅผ ๋ฐํํ๋ฉด ์ด poll๊ฐ ์ทจ์๋ฉ๋๋ค(์์ ์ธ๊ธํ PollSkipAdvice์ ์ ์ฌ).
Message afterReceive(Message result, Object source)์ด ๋ฉ์๋๋ receive() ๋ฉ์๋ ์ดํ์ ํธ์ถ๋ฉ๋๋ค.
Thread safety
Advice๊ฐ ์์ค๋ฅผ ๋ณ๊ฒฝํ๋ ๊ฒฝ์ฐ TaskExecutor๋ก ํด๋ฌ๋ฅผ ๊ตฌ์ฑํ๋ฉด ์ ๋ฉ๋๋ค. Advice๊ฐ ์์ค๋ฅผ ๋ณ๊ฒฝํ๋ ๊ฒฝ์ฐ ์ด๋ฌํ ๋ณ๊ฒฝ์ ์ค๋ ๋๋ก๋ถํฐ ์์ ํ์ง ์์ผ๋ฉฐ ํนํ ๊ณ ์ฃผํ ํด๋ฌ์์ ์๊ธฐ์น ์์ ๊ฒฐ๊ณผ๋ฅผ ์ด๋ํ ์ ์์ต๋๋ค. ํด๋ง ๊ฒฐ๊ณผ๋ฅผ ๋์์ ์ฒ๋ฆฌํด์ผ ํ๋ ๊ฒฝ์ฐ ํด๋ฌ์ ์คํ๊ธฐ๋ฅผ ์ถ๊ฐํ๋ ๋์ ๋ค์ด์คํธ๋ฆผ ExecutorChannel์ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
Advice Chain Ordering
์ด๊ธฐํ ์ค์ ์ด๋๋ฐ์ด์ค ์ฒด์ธ์ด ์ฒ๋ฆฌ๋๋ ๋ฐฉ์์ ์ดํดํด์ผ ํฉ๋๋ค. ReceiveMessageAdvice๋ฅผ ๊ตฌํํ์ง ์๋ ์ด๋๋ฐ์ด์ค ๊ฐ์ฒด๋ ์ ์ฒด ํด๋ง ํ๋ก์ธ์ค์ ์ ์ฉ๋๋ฉฐ ๋ชจ๋ ์์๋๋ก ReceiveMessageAdvice๋ณด๋ค ๋จผ์ ํธ์ถ๋ฉ๋๋ค. ๊ทธ๋ฌ๊ณ ReceiveMessageAdvice ๊ฐ์ฒด๊ฐ ์์ค receive() ๋ฉ์๋ ์ฃผ๋ณ์์ ์์๋๋ก ํธ์ถ๋ฉ๋๋ค.
CompoundTriggerAdvice
ํธ๋ฆฌ๊ฑฐ์ ๊ธฐ๋ณธ ํธ๋ฆฌ๊ฑฐ๋ CronTrigger์ผ ์ ์์ต๋๋ค. ์ด๋๋ฐ์ด์ค๊ฐ ์์ ๋ ๋ฉ์์ง๊ฐ ์์์ ๊ฐ์งํ๋ฉด ๋ ๋ฒ์งธ ํธ๋ฆฌ๊ฑฐ๋ฅผ CompoundTrigger์ ์ถ๊ฐํฉ๋๋ค.CompoundTrigger ์ธ์คํด์ค์ nextExecutionTime ๋ฉ์๋๊ฐ ํธ์ถ๋๋ฉด ๋ณด์กฐ ํธ๋ฆฌ๊ฑฐ(์๋ ๊ฒฝ์ฐ)์ ์์ํฉ๋๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด ๊ธฐ๋ณธ ํธ๋ฆฌ๊ฑฐ์ ์์ํฉ๋๋ค.
Reference
Last updated