Message Channels
The MessageChannel Interface
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
๋ฉ์์ง๋ฅผ ๋ณด๋ผ ๋ ๋ฉ์์ง๊ฐ ์ฑ๊ณต์ ์ผ๋ก ์ ์ก๋๋ฉด ๋ฐํ ๊ฐ์ true์ ๋๋ค. ๋ณด๋ด๊ธฐ ํธ์ถ์ด ์๊ฐ ์ด๊ณผ๋๊ฑฐ๋ ์ค๋จ๋๋ฉด false๋ฅผ ๋ฐํํฉ๋๋ค.
PollableChannel
๋ฉ์์ง ์ฑ๋์ ๋ฉ์์ง๋ฅผ ๋ฒํผ๋งํ๊ฑฐ๋ ๋ฒํผ๋งํ์ง ์์ ์ ์์ผ๋ฏ๋ก ๋ ๊ฐ์ ํ์ ์ธํฐํ์ด์ค๊ฐ ๋ฒํผ๋ง(ํด๋ง ๊ฐ๋ฅ) ๋ฐ ๋น๋ฒํผ๋ง(๊ตฌ๋ ๊ฐ๋ฅ) ์ฑ๋ ๋์์ ์ ์ํฉ๋๋ค
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
SubscribableChannel
SubscribableChannel ๊ธฐ๋ณธ ์ธํฐํ์ด์ค๋ ๊ตฌ๋ ๋ MessageHandler ์ธ์คํด์ค์ ๋ฉ์์ง๋ฅผ ์ง์ ๋ณด๋ด๋ ์ฑ๋์ ์ํด ๊ตฌํ๋ฉ๋๋ค. ๋ฐ๋ผ์ ํด๋ง์ ์ํ ์์ ๋ฐฉ๋ฒ์ ์ ๊ณตํ์ง ์์ต๋๋ค. ๋์ ์ด๋ฌํ ๊ตฌ๋ ์๋ฅผ ๊ด๋ฆฌํ๋ ๋ฐฉ๋ฒ์ ์ ์ํฉ๋๋ค.
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
Message Channel Implementations
PublishSubscribeChannel
PublishSubscribeChannel ๊ตฌํ์ ์ ์ก๋ ๋ชจ๋ ๋ฉ์์ง๋ฅผ ๊ตฌ๋ ํ ๋ชจ๋ ํธ๋ค๋ฌ๋ก ๋ธ๋ก๋์บ์คํธํฉ๋๋ค.
QueueChannel
QueueChannel ๊ตฌํ์ queue๋ก ๋ํํฉ๋๋ค. PublishSubscribeChannel๊ณผ ๋ฌ๋ฆฌ QueueChannel์๋ point-to-point semantics๊ฐ ์์ต๋๋ค. ์ฆ, ์ฑ๋์ ์ฌ๋ฌ ์๋น์๊ฐ ์๋๋ผ๋ ๊ทธ ์ค ํ๋๋ง ํด๋น ์ฑ๋๋ก ์ ์ก๋ ๋ฉ์์ง๋ฅผ ์์ ํด์ผ ํฉ๋๋ค.
์ธ์๊ฐ ์๋ ๊ธฐ๋ณธ ์์ฑ์(๋ณธ์ง์ ์ผ๋ก ์ ํ๋์ง ์์ Integer.MAX_VALUE ์ฉ๋ ์ ๊ณต)์ ๋ค์ ๋ชฉ๋ก๊ณผ ๊ฐ์ด ๋๊ธฐ์ด ์ฉ๋์ ํ์ฉํ๋ ์์ฑ์๋ฅผ ์ ๊ณตํฉ๋๋ค.
public QueueChannel(int capacity)
PriorityChannel
QueueChannel์ ์ ์ ์ ์ถ(FIFO) ์์๋ฅผ ์ ์ฉํ๋ ๋ฐ๋ฉด, PriorityChannel์ ์ฐ์ ์์์ ๋ฐ๋ผ ์ฑ๋ ๋ด์์ ๋ฉ์์ง๋ฅผ ์ ๋ ฌํ ์ ์๋ ๋์ฒด ๊ตฌํ์ ๋๋ค
๊ธฐ๋ณธ์ ์ผ๋ก ์ฐ์ ์์๋ ๊ฐ ๋ฉ์์ง ๋ด์ ์ฐ์ ์์ ํค๋์ ์ํด ๊ฒฐ์ ๋ฉ๋๋ค.
RendezvousChannel
RendezvousChannel์ ๋ค๋ฅธ ๋น์ฌ์๊ฐ ์ฑ๋์ receive() ๋ฉ์๋๋ฅผ ํธ์ถํ ๋๊น์ง ๋ฐ์ ์๊ฐ ์ฐจ๋จํ๋ "์ง์ ํธ๋์คํ" ์๋๋ฆฌ์ค๋ฅผ ๊ฐ๋ฅํ๊ฒ ํฉ๋๋ค
DirectChannel
DirectChannel์๋ ์ง์ ๊ฐ ์๋ฏธ ์ฒด๊ณ๊ฐ ์์ง๋ง ๊ทธ ์ธ์๋ ์์์ ์ค๋ช ํ ๋๊ธฐ์ด ๊ธฐ๋ฐ ์ฑ๋ ๊ตฌํ๋ณด๋ค PublishSubscribeChannel๊ณผ ๋ ์ ์ฌํฉ๋๋ค
PollableChannel ์ธํฐํ์ด์ค ๋์ SubscribableChannel ์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ๋ฏ๋ก ๋ฉ์์ง๋ฅผ ๊ตฌ๋ ์์๊ฒ ์ง์ ๋ฐ์กํฉ๋๋ค. ๊ทธ๋ฌ๋ ์ ๋์ ์ฑ๋๋ก์ ๊ฐ ๋ฉ์์ง๋ฅผ ๋จ์ผ ๊ฐ์ MessageHandler๋ก ๋ณด๋ธ๋ค๋ ์ ์์ PublishSubscribeChannel๊ณผ ๋ค๋ฆ ๋๋ค.
ExecutorChannel
ExecutorChannel์ DirectChannel๊ณผ ๋์ผํ ๋์คํจ์ฒ ๊ตฌ์ฑ(๋ถํ ๊ท ํ ์ ๋ต ๋ฐ ์ฅ์ ์กฐ์น ๋ถ์ธ ์์ฑ)์ ์ง์ํ๋ ์ง์ ๊ฐ ์ฑ๋์ ๋๋ค.
์ด ๋ ๋์คํจ์น ์ฑ๋ ์ ํ์ ์ฃผ์ ์ฐจ์ด์ ์ ExecutorChannel์ด ๋์คํจ์น๋ฅผ โโ์ํํ๊ธฐ ์ํด TaskExecutor์ ์ธ์คํด์ค์ ์์ํ๋ค๋ ๊ฒ์ ๋๋ค.
์ฆ, send ๋ฉ์๋๋ ์ผ๋ฐ์ ์ผ๋ก ์ฐจ๋จํ์ง ์์ง๋ง ๋ณด๋ธ ์ฌ๋์ ์ค๋ ๋์์ ์ฒ๋ฆฌ๊ธฐ ํธ์ถ์ด ๋ฐ์ํ์ง ์์ ์๋ ์์์ ์๋ฏธํฉ๋๋ค.
PartitionedChannel
๋ฒ์ 6.1๋ถํฐ PartitionedChannel ๊ตฌํ์ด ์ ๊ณต๋ฉ๋๋ค. ์ด๊ฒ์ AbstractExecutorChannel
์ ํ์ฅ์ด๋ฉฐ ์ด ์ฑ๋๋ก ์ ์ก๋ ๋ฉ์์ง์์ ํ๊ฐ๋ ํํฐ์
ํค์ ์ํด ๊ฒฐ์ ๋๋ ํน์ ์ค๋ ๋
์์ ์ค์ ์๋น๊ฐ ์ฒ๋ฆฌ๋๋ ์ง์ ๊ฐ ๋์คํจ์นญ ๋
ผ๋ฆฌ๋ฅผ ๋ํ๋
๋๋ค.
์ด ์ฑ๋์ ์์์ ์ธ๊ธํ ExecutorChannel๊ณผ ๋น์ทํ์ง๋ง ํํฐ์ ํค๊ฐ ๊ฐ์ ๋ฉ์์ง๋ ํญ์ ์์๋ฅผ ์ ์งํ๋ฉด์ ๋์ผํ ์ค๋ ๋์์ ์ฒ๋ฆฌ๋๋ค๋ ์ฐจ์ด์ ์ด ์์ต๋๋ค.
์ธ๋ถ TaskExecutor๊ฐ ํ์ํ์ง ์์ง๋ง ์ฌ์ฉ์ ์ง์ ThreadFactory๋ก ๊ตฌ์ฑํ ์ ์์ต๋๋ค.
์ด ํฉํ ๋ฆฌ๋ ๋จ์ผ ์ค๋ ๋ ์คํ๊ธฐ๋ฅผ ํํฐ์
๋ณ๋ก MessageDispatcher ๋๋ฆฌ์๋ก ์ฑ์ฐ๋ ๋ฐ ์ฌ์ฉ๋ฉ๋๋ค. ๊ธฐ๋ณธ์ ์ผ๋ก IntegrationMessageHeaderAccessor.CORRELATION_ID
๋ฉ์์ง ํค๋๊ฐ ํํฐ์
ํค๋ก ์ฌ์ฉ๋ฉ๋๋ค.
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
// ์ฑ๋์๋ ์ ์ฉ ์ค๋ ๋์ธ 3๊ฐ์ ํํฐ์
์ด ์์ต๋๋ค.
// ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ํํฐ์
์ ๊ฒฐ์ ํ๊ธฐ ์ํด partitionKey ํค๋๋ฅผ ์ฌ์ฉํฉ๋๋ค.
FluxMessageChannel
FluxMessageChannel์ ๋ณด๋ธ ๋ฉ์์ง๋ฅผ ๋ด๋ถ react.core.publisher.Flux
๋ก "์ฑํฌ"ํ๊ธฐ ์ํ org.reactivestreams.Publisher ๊ตฌํ์
๋๋ค.
Channel Interceptors
๋ฉ์์ง ์ํคํ ์ฒ์ ์ฅ์ ์ค ํ๋๋ ์ผ๋ฐ์ ์ธ ๋์์ ์ ๊ณตํ๊ณ ์์คํ ์ ํต๊ณผํ๋ ๋ฉ์์ง์ ๋ํ ์๋ฏธ ์๋ ์ ๋ณด๋ฅผ ๋น์นจํฌ์ ์ธ ๋ฐฉ์์ผ๋ก ์บก์ฒํ๋ ๊ธฐ๋ฅ์ ๋๋ค.
Message ์ธ์คํด์ค๋ MessageChannel ์ธ์คํด์ค์์ ์ก์์ ๋๋ฏ๋ก ์ด๋ฌํ ์ฑ๋์ ์ก์์ ์์ ์ ๊ฐ๋ก์ฑ ์ ์๋ ๊ธฐํ๋ฅผ ์ ๊ณตํฉ๋๋ค.
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
์ธํฐํ์ด์ค๋ฅผ ๊ตฌํํ ํ ์ธํฐ์ ํฐ๋ฅผ ์ฑ๋์ ๋ฑ๋กํ๋ ๊ฒ์ ๋ค์ ํธ์ถ์ ์ํํ๊ธฐ๋ง ํ๋ฉด ๋ฉ๋๋ค.
channel.addInterceptor(someChannelInterceptor);
MessagingTemplate
Spring Integration์ ์์ฒญ ๋ฐ ์๋ต ์๋๋ฆฌ์ค๋ฅผ ํฌํจํ์ฌ ๋ฉ์์ง ์ฑ๋์์ ๋ค์ํ ์์ ์ ์ง์ํ๋ MessagingTemplate์ ์ ๊ณตํฉ๋๋ค.
Example
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
Signatures
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
Special Channels
ErrorChannel
NullChannel
Reference
Last updated
Was this helpful?