Message Channels

The MessageChannel Interface

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ „์†ก๋˜๋ฉด ๋ฐ˜ํ™˜ ๊ฐ’์€ true์ž…๋‹ˆ๋‹ค. ๋ณด๋‚ด๊ธฐ ํ˜ธ์ถœ์ด ์‹œ๊ฐ„ ์ดˆ๊ณผ๋˜๊ฑฐ๋‚˜ ์ค‘๋‹จ๋˜๋ฉด false๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

PollableChannel

๋ฉ”์‹œ์ง€ ์ฑ„๋„์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฒ„ํผ๋งํ•˜๊ฑฐ๋‚˜ ๋ฒ„ํผ๋งํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ๋‘ ๊ฐœ์˜ ํ•˜์œ„ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ๋ฒ„ํผ๋ง(ํด๋ง ๊ฐ€๋Šฅ) ๋ฐ ๋น„๋ฒ„ํผ๋ง(๊ตฌ๋… ๊ฐ€๋Šฅ) ์ฑ„๋„ ๋™์ž‘์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

SubscribableChannel

SubscribableChannel ๊ธฐ๋ณธ ์ธํ„ฐํŽ˜์ด์Šค๋Š” ๊ตฌ๋…๋œ MessageHandler ์ธ์Šคํ„ด์Šค์— ๋ฉ”์‹œ์ง€๋ฅผ ์ง์ ‘ ๋ณด๋‚ด๋Š” ์ฑ„๋„์— ์˜ํ•ด ๊ตฌํ˜„๋ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ํด๋ง์„ ์œ„ํ•œ ์ˆ˜์‹  ๋ฐฉ๋ฒ•์„ ์ œ๊ณตํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋Œ€์‹  ์ด๋Ÿฌํ•œ ๊ตฌ๋…์ž๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

Message Channel Implementations

PublishSubscribeChannel

PublishSubscribeChannel ๊ตฌํ˜„์€ ์ „์†ก๋œ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…ํ•œ ๋ชจ๋“  ํ•ธ๋“ค๋Ÿฌ๋กœ ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธํ•ฉ๋‹ˆ๋‹ค.

QueueChannel

QueueChannel ๊ตฌํ˜„์€ queue๋กœ ๋ž˜ํ•‘ํ•ฉ๋‹ˆ๋‹ค. PublishSubscribeChannel๊ณผ ๋‹ฌ๋ฆฌ QueueChannel์—๋Š” point-to-point semantics๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ฆ‰, ์ฑ„๋„์— ์—ฌ๋Ÿฌ ์†Œ๋น„์ž๊ฐ€ ์žˆ๋”๋ผ๋„ ๊ทธ ์ค‘ ํ•˜๋‚˜๋งŒ ํ•ด๋‹น ์ฑ„๋„๋กœ ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ธ์ˆ˜๊ฐ€ ์—†๋Š” ๊ธฐ๋ณธ ์ƒ์„ฑ์ž(๋ณธ์งˆ์ ์œผ๋กœ ์ œํ•œ๋˜์ง€ ์•Š์€ Integer.MAX_VALUE ์šฉ๋Ÿ‰ ์ œ๊ณต)์™€ ๋‹ค์Œ ๋ชฉ๋ก๊ณผ ๊ฐ™์ด ๋Œ€๊ธฐ์—ด ์šฉ๋Ÿ‰์„ ํ—ˆ์šฉํ•˜๋Š” ์ƒ์„ฑ์ž๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

public QueueChannel(int capacity)

PriorityChannel

QueueChannel์€ ์„ ์ž…์„ ์ถœ(FIFO) ์ˆœ์„œ๋ฅผ ์ ์šฉํ•˜๋Š” ๋ฐ˜๋ฉด, PriorityChannel์€ ์šฐ์„ ์ˆœ์œ„์— ๋”ฐ๋ผ ์ฑ„๋„ ๋‚ด์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ •๋ ฌํ•  ์ˆ˜ ์žˆ๋Š” ๋Œ€์ฒด ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค

๊ธฐ๋ณธ์ ์œผ๋กœ ์šฐ์„  ์ˆœ์œ„๋Š” ๊ฐ ๋ฉ”์‹œ์ง€ ๋‚ด์˜ ์šฐ์„  ์ˆœ์œ„ ํ—ค๋”์— ์˜ํ•ด ๊ฒฐ์ •๋ฉ๋‹ˆ๋‹ค.

RendezvousChannel

RendezvousChannel์€ ๋‹ค๋ฅธ ๋‹น์‚ฌ์ž๊ฐ€ ์ฑ„๋„์˜ receive() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•  ๋•Œ๊นŒ์ง€ ๋ฐœ์‹ ์ž๊ฐ€ ์ฐจ๋‹จํ•˜๋Š” "์ง์ ‘ ํ•ธ๋“œ์˜คํ”„" ์‹œ๋‚˜๋ฆฌ์˜ค๋ฅผ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค

DirectChannel

DirectChannel์—๋Š” ์ง€์  ๊ฐ„ ์˜๋ฏธ ์ฒด๊ณ„๊ฐ€ ์žˆ์ง€๋งŒ ๊ทธ ์™ธ์—๋Š” ์•ž์—์„œ ์„ค๋ช…ํ•œ ๋Œ€๊ธฐ์—ด ๊ธฐ๋ฐ˜ ์ฑ„๋„ ๊ตฌํ˜„๋ณด๋‹ค PublishSubscribeChannel๊ณผ ๋” ์œ ์‚ฌํ•ฉ๋‹ˆ๋‹ค

PollableChannel ์ธํ„ฐํŽ˜์ด์Šค ๋Œ€์‹  SubscribableChannel ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋ฏ€๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…์ž์—๊ฒŒ ์ง์ ‘ ๋ฐœ์†กํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ ๋Œ€์  ์ฑ„๋„๋กœ์„œ ๊ฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋‹จ์ผ ๊ฐ€์ž… MessageHandler๋กœ ๋ณด๋‚ธ๋‹ค๋Š” ์ ์—์„œ PublishSubscribeChannel๊ณผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค.

ExecutorChannel

ExecutorChannel์€ DirectChannel๊ณผ ๋™์ผํ•œ ๋””์ŠคํŒจ์ฒ˜ ๊ตฌ์„ฑ(๋ถ€ํ•˜ ๊ท ํ˜• ์ „๋žต ๋ฐ ์žฅ์•  ์กฐ์น˜ ๋ถ€์šธ ์†์„ฑ)์„ ์ง€์›ํ•˜๋Š” ์ง€์ ๊ฐ„ ์ฑ„๋„์ž…๋‹ˆ๋‹ค.

์ด ๋‘ ๋””์ŠคํŒจ์น˜ ์ฑ„๋„ ์œ ํ˜•์˜ ์ฃผ์š” ์ฐจ์ด์ ์€ ExecutorChannel์ด ๋””์ŠคํŒจ์น˜๋ฅผ โ€‹โ€‹์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด TaskExecutor์˜ ์ธ์Šคํ„ด์Šค์— ์œ„์ž„ํ•œ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์ฆ‰, send ๋ฉ”์„œ๋“œ๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ์ฐจ๋‹จํ•˜์ง€ ์•Š์ง€๋งŒ ๋ณด๋‚ธ ์‚ฌ๋žŒ์˜ ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌ๊ธฐ ํ˜ธ์ถœ์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š์„ ์ˆ˜๋„ ์žˆ์Œ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค.

PartitionedChannel

๋ฒ„์ „ 6.1๋ถ€ํ„ฐ PartitionedChannel ๊ตฌํ˜„์ด ์ œ๊ณต๋ฉ๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ AbstractExecutorChannel์˜ ํ™•์žฅ์ด๋ฉฐ ์ด ์ฑ„๋„๋กœ ์ „์†ก๋œ ๋ฉ”์‹œ์ง€์—์„œ ํ‰๊ฐ€๋œ ํŒŒํ‹ฐ์…˜ ํ‚ค์— ์˜ํ•ด ๊ฒฐ์ •๋˜๋Š” ํŠน์ • ์Šค๋ ˆ๋“œ์—์„œ ์‹ค์ œ ์†Œ๋น„๊ฐ€ ์ฒ˜๋ฆฌ๋˜๋Š” ์ง€์ ๊ฐ„ ๋””์ŠคํŒจ์นญ ๋…ผ๋ฆฌ๋ฅผ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค.

์ด ์ฑ„๋„์€ ์œ„์—์„œ ์–ธ๊ธ‰ํ•œ ExecutorChannel๊ณผ ๋น„์Šทํ•˜์ง€๋งŒ ํŒŒํ‹ฐ์…˜ ํ‚ค๊ฐ€ ๊ฐ™์€ ๋ฉ”์‹œ์ง€๋Š” ํ•ญ์ƒ ์ˆœ์„œ๋ฅผ ์œ ์ง€ํ•˜๋ฉด์„œ ๋™์ผํ•œ ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌ๋œ๋‹ค๋Š” ์ฐจ์ด์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

์™ธ๋ถ€ TaskExecutor๊ฐ€ ํ•„์š”ํ•˜์ง€ ์•Š์ง€๋งŒ ์‚ฌ์šฉ์ž ์ง€์ • ThreadFactory๋กœ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ํŒฉํ† ๋ฆฌ๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ ์‹คํ–‰๊ธฐ๋ฅผ ํŒŒํ‹ฐ์…˜๋ณ„๋กœ MessageDispatcher ๋Œ€๋ฆฌ์ž๋กœ ์ฑ„์šฐ๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ IntegrationMessageHeaderAccessor.CORRELATION_ID ๋ฉ”์‹œ์ง€ ํ—ค๋”๊ฐ€ ํŒŒํ‹ฐ์…˜ ํ‚ค๋กœ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
// ์ฑ„๋„์—๋Š” ์ „์šฉ ์Šค๋ ˆ๋“œ์ธ 3๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์ด ์žˆ์Šต๋‹ˆ๋‹ค. 
// ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  ํŒŒํ‹ฐ์…˜์„ ๊ฒฐ์ •ํ•˜๊ธฐ ์œ„ํ•ด partitionKey ํ—ค๋”๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. 

FluxMessageChannel

FluxMessageChannel์€ ๋ณด๋‚ธ ๋ฉ”์‹œ์ง€๋ฅผ ๋‚ด๋ถ€ react.core.publisher.Flux๋กœ "์‹ฑํฌ"ํ•˜๊ธฐ ์œ„ํ•œ org.reactivestreams.Publisher ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค.

Channel Interceptors

๋ฉ”์‹œ์ง• ์•„ํ‚คํ…์ฒ˜์˜ ์žฅ์  ์ค‘ ํ•˜๋‚˜๋Š” ์ผ๋ฐ˜์ ์ธ ๋™์ž‘์„ ์ œ๊ณตํ•˜๊ณ  ์‹œ์Šคํ…œ์„ ํ†ต๊ณผํ•˜๋Š” ๋ฉ”์‹œ์ง€์— ๋Œ€ํ•œ ์˜๋ฏธ ์žˆ๋Š” ์ •๋ณด๋ฅผ ๋น„์นจํˆฌ์ ์ธ ๋ฐฉ์‹์œผ๋กœ ์บก์ฒ˜ํ•˜๋Š” ๊ธฐ๋Šฅ์ž…๋‹ˆ๋‹ค.

Message ์ธ์Šคํ„ด์Šค๋Š” MessageChannel ์ธ์Šคํ„ด์Šค์—์„œ ์†ก์ˆ˜์‹ ๋˜๋ฏ€๋กœ ์ด๋Ÿฌํ•œ ์ฑ„๋„์€ ์†ก์ˆ˜์‹  ์ž‘์—…์„ ๊ฐ€๋กœ์ฑŒ ์ˆ˜ ์žˆ๋Š” ๊ธฐํšŒ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ํ›„ ์ธํ„ฐ์…‰ํ„ฐ๋ฅผ ์ฑ„๋„์— ๋“ฑ๋กํ•˜๋Š” ๊ฒƒ์€ ๋‹ค์Œ ํ˜ธ์ถœ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ๋งŒ ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

channel.addInterceptor(someChannelInterceptor);

MessagingTemplate

Spring Integration์€ ์š”์ฒญ ๋ฐ ์‘๋‹ต ์‹œ๋‚˜๋ฆฌ์˜ค๋ฅผ ํฌํ•จํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ์ฑ„๋„์—์„œ ๋‹ค์–‘ํ•œ ์ž‘์—…์„ ์ง€์›ํ•˜๋Š” MessagingTemplate์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

Example

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

Signatures

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}

Special Channels

ErrorChannel

NullChannel

Reference

Last updated

Was this helpful?