Poller
Polling Consumer
๋ฉ์์ง ๋์ (์ฑ๋ ์ด๋ํฐ)์ด ์ฑ๋์ ์ฐ๊ฒฐ๋๊ณ ์ธ์คํด์คํ๋๋ฉด ๋ค์ ์ธ์คํด์ค ์ค ํ๋๋ฅผ ์์ฑํฉ๋๋ค
PollingConsumer
PollableChannel
์ธํฐํ์ด์ค(์: QueueChannel)๋ฅผ ๊ตฌํํ๋ ์ฑ๋์ ์ฐ๊ฒฐ๋ ์ฑ๋ ์ด๋ํฐ๋ PollingConsumer์ ์ธ์คํด์ค๋ฅผ ์์ฑํฉ๋๋ค.
EventDrivenConsumer
SubscribableChannel
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ์ฑ๋์ ์ฐ๊ฒฐ๋ ์ฑ๋ ์ด๋ํฐ๋ EventDrivenConsumer ์ธ์คํด์ค๋ฅผ ์์ฑํฉ๋๋ค.
Pollable Message Source
์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ๊ฐ ์ฌ์ฉ๋๋ ๊ฒฝ์ฐ ์ด๋ฌํ ์ด๋ํฐ๋ ์ข ์ข SourcePollingChannelAdapter๋ก ๋ํ๋ฉ๋๋ค.
์๋ฅผ ๋ค์ด, ์๊ฒฉ FTP ์๋ฒ ์์น์์ ๋ฉ์์ง๋ฅผ ๊ฒ์ํ ๋ FTP ์ธ๋ฐ์ด๋ ์ฑ๋ ์ด๋ํฐ์ ์ค๋ช ๋ ์ด๋ํฐ๋ ๋ฉ์์ง๋ฅผ ์ฃผ๊ธฐ์ ์ผ๋ก ๊ฒ์ํ๋๋ก ํด๋ฌ๋ก ๊ตฌ์ฑ๋ฉ๋๋ค
๊ตฌ์ฑ ์์๊ฐ ํด๋ฌ๋ก ๊ตฌ์ฑ๋ ๊ฒฝ์ฐ ๊ฒฐ๊ณผ ์ธ์คํด์ค๋ ๋ค์ ์ ํ ์ค ํ๋์ ๋๋ค.
PollingConsumer
SourcePollingChannelAdapter
์ด๋ ํด๋ฌ๊ฐ ์ธ๋ฐ์ด๋ ๋ฐ ์์๋ฐ์ด๋ ๋ฉ์์ง ์๋๋ฆฌ์ค ๋ชจ๋์์ ์ฌ์ฉ๋จ์ ์๋ฏธํฉ๋๋ค. ํด๋ฌ๊ฐ ์ฌ์ฉ๋๋ ๋ช ๊ฐ์ง ์ฌ์ฉ ์ฌ๋ก๋ ๋ค์๊ณผ ๊ฐ์ต๋๋ค.
FTP ์๋ฒ, ๋ฐ์ดํฐ๋ฒ ์ด์ค ๋ฐ ์น ์๋น์ค์ ๊ฐ์ ํน์ ์ธ๋ถ ์์คํ ํด๋ง
๋ด๋ถ(ํด๋ง ๊ฐ๋ฅ) ๋ฉ์์ง ์ฑ๋ ํด๋ง
๋ด๋ถ ์๋น์ค ํด๋ง(์: Java ํด๋์ค์์ ๋ฐ๋ณต์ ์ผ๋ก ์คํ๋๋ ๋ฉ์๋)
Deferred Acknowledgment Pollable Message Source
๋ฒ์ 5.0.1๋ถํฐ ํน์ ๋ชจ๋์ ๋ค์ด์คํธ๋ฆผ ํ๋ฆ์ด ์๋ฃ๋ ๋๊น์ง(๋๋ ๋ฉ์์ง๋ฅผ ๋ค๋ฅธ ์ค๋ ๋๋ก ์ ๋ฌ) ์ฐ๊ธฐ ์น์ธ์ ์ง์ํ๋ MessageSource ๊ตฌํ์ ์ ๊ณตํฉ๋๋ค. ์ด๊ฒ์ ํ์ฌ AmqpMessageSource ๋ฐ KafkaMessageSource๋ก ์ ํ๋ฉ๋๋ค
์ด๋ฌํ ๋ฉ์์ง ์์ค๋ฅผ ์ฌ์ฉํ๋ฉด IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ํค๋(MessageHeaderAccessor API ์ฐธ์กฐ)๊ฐ ๋ฉ์์ง์ ์ถ๊ฐ๋ฉ๋๋ค.
ํด๋ง ๊ฐ๋ฅํ ๋ฉ์์ง ์์ค์ ํจ๊ป ์ฌ์ฉํ๋ ๊ฒฝ์ฐ ํค๋ ๊ฐ์ ๋ค์ ์์ ์ ๊ฐ์ด AcknowledgementCallback์ ์ธ์คํด์ค์ ๋๋ค.
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
Conditional Pollers for Message Sources
"Smart" Polling
๋ฒ์ 5.3์ ReceiveMessageAdvice ์ธํฐํ์ด์ค๋ฅผ ๋์
ํ์ต๋๋ค. ์ด ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ ์ด๋๋ฐ์ด์ค ์ฒด์ธ์ ๋ชจ๋ ์ด๋๋ฐ์ด์ค ๊ฐ์ฒด๋ receive() ์์
์๋ง ์ ์ฉ๋ฉ๋๋ค.(MessageSource.receive() ๋ฐ PollableChannel.receive(timeout))
.SourcePollingChannelAdapter
๋๋ PollingConsumer
์๋ง ์ ์ฉํ ์ ์์ต๋๋ค. ์ด๋ฌํ ํด๋์ค๋ ๋ค์ ๋ฉ์๋๋ฅผ ๊ตฌํํฉ๋๋ค.
beforeReceive(Object source) ์ด ๋ฉ์๋๋ Object.receive() ๋ฉ์๋ ์ ์ ํธ์ถ๋ฉ๋๋ค. ์ด๋ฅผ ํตํด ์์ค๋ฅผ ๊ฒ์ฌํ๊ณ ์ฌ๊ตฌ์ฑํ ์ ์์ต๋๋ค. false๋ฅผ ๋ฐํํ๋ฉด ์ด poll๊ฐ ์ทจ์๋ฉ๋๋ค(์์ ์ธ๊ธํ PollSkipAdvice์ ์ ์ฌ).
Message afterReceive(Message result, Object source)
์ด ๋ฉ์๋๋ receive() ๋ฉ์๋ ์ดํ์ ํธ์ถ๋ฉ๋๋ค.
Thread safety
Advice๊ฐ ์์ค๋ฅผ ๋ณ๊ฒฝํ๋ ๊ฒฝ์ฐ TaskExecutor๋ก ํด๋ฌ๋ฅผ ๊ตฌ์ฑํ๋ฉด ์ ๋ฉ๋๋ค. Advice๊ฐ ์์ค๋ฅผ ๋ณ๊ฒฝํ๋ ๊ฒฝ์ฐ ์ด๋ฌํ ๋ณ๊ฒฝ์ ์ค๋ ๋๋ก๋ถํฐ ์์ ํ์ง ์์ผ๋ฉฐ ํนํ ๊ณ ์ฃผํ ํด๋ฌ์์ ์๊ธฐ์น ์์ ๊ฒฐ๊ณผ๋ฅผ ์ด๋ํ ์ ์์ต๋๋ค. ํด๋ง ๊ฒฐ๊ณผ๋ฅผ ๋์์ ์ฒ๋ฆฌํด์ผ ํ๋ ๊ฒฝ์ฐ ํด๋ฌ์ ์คํ๊ธฐ๋ฅผ ์ถ๊ฐํ๋ ๋์ ๋ค์ด์คํธ๋ฆผ ExecutorChannel์ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
Advice Chain Ordering
์ด๊ธฐํ ์ค์ ์ด๋๋ฐ์ด์ค ์ฒด์ธ์ด ์ฒ๋ฆฌ๋๋ ๋ฐฉ์์ ์ดํดํด์ผ ํฉ๋๋ค. ReceiveMessageAdvice๋ฅผ ๊ตฌํํ์ง ์๋ ์ด๋๋ฐ์ด์ค ๊ฐ์ฒด๋ ์ ์ฒด ํด๋ง ํ๋ก์ธ์ค์ ์ ์ฉ๋๋ฉฐ ๋ชจ๋ ์์๋๋ก ReceiveMessageAdvice๋ณด๋ค ๋จผ์ ํธ์ถ๋ฉ๋๋ค. ๊ทธ๋ฌ๊ณ ReceiveMessageAdvice ๊ฐ์ฒด๊ฐ ์์ค receive() ๋ฉ์๋ ์ฃผ๋ณ์์ ์์๋๋ก ํธ์ถ๋ฉ๋๋ค.
CompoundTriggerAdvice
ํธ๋ฆฌ๊ฑฐ์ ๊ธฐ๋ณธ ํธ๋ฆฌ๊ฑฐ๋ CronTrigger์ผ ์ ์์ต๋๋ค. ์ด๋๋ฐ์ด์ค๊ฐ ์์ ๋ ๋ฉ์์ง๊ฐ ์์์ ๊ฐ์งํ๋ฉด ๋ ๋ฒ์งธ ํธ๋ฆฌ๊ฑฐ๋ฅผ CompoundTrigger
์ ์ถ๊ฐํฉ๋๋ค.CompoundTrigger
์ธ์คํด์ค์ nextExecutionTime
๋ฉ์๋๊ฐ ํธ์ถ๋๋ฉด ๋ณด์กฐ ํธ๋ฆฌ๊ฑฐ(์๋ ๊ฒฝ์ฐ)์ ์์ํฉ๋๋ค. ๊ทธ๋ ์ง ์์ผ๋ฉด ๊ธฐ๋ณธ ํธ๋ฆฌ๊ฑฐ์ ์์ํฉ๋๋ค.
Reference
Last updated
Was this helpful?