๐Ÿ’ป
Albert's Til
GitHub
  • ๋งค์ผ๋งค์ผ ์กฐ๊ธˆ์”ฉ ์„ฑ์žฅํ•˜๊ธฐ
    • README
    • CS
      • Network
      • HTTP
        • NO-CACHE
      • ์˜ค๋ฅ˜ ์ฝ”๋“œ
      • ORM ๋„๊ตฌ
      • Design Pattern
        • CQRS Pattern
          • Event Sourcing and CQRS pattern
        • Builder Pattern
    • DB
      • MySQL
        • Timeline
        • Pagination
        • Index
        • Database Performance Optimization Strategies
        • B+ tree
        • MySQL Connectors VS MySQL Shell(Scripting) VS MySQL Workbench
        • MySQL Storage Engine Architecture
      • Normalization & Denormalization
      • JPA
        • @Transactional
        • Why JPA?
        • About JPA
        • N+1 Issue
        • Index
        • ElementCollection&CollectionTable
        • orphanRemoval
        • CascadeType
        • Use Subselect
        • Dynamic Instance Creation
        • Paging
        • Order
        • Spefication
        • mappedBy
      • MongoDB
        • ObjectId
      • Why MySQL?
      • ACID properties of transactions
      • Between JPA and JDBC
      • Identifiers in Hibernate/JPA
    • Java
      • Jackson de/serialize
      • Collections.singletonList() vs List.of()
      • Manage dependencies in Gradle
      • Logging Level
      • Bean Validation
      • JVM Internals
        • Threads
          • Frame
        • Shared Between Threads
          • Classloader
            • Class Loader Hierarchy
            • Loading Linking Initialization
      • Java Collection Framework
      • Annotation
      • Generic
      • ๋””๋ฏธํ„ฐ ๋ฒ•์น™
    • Spring
      • Caching
      • Spring Integration Overview
        • ThreadPollTaskExecutor
        • Messaging Bridge
        • Channel Adapter
        • Poller
        • Configuration and @EnableIntegration
        • Message Endpoints
        • Message Channels
      • HATEOAS
      • @Autowired vs Constructor Dependency Injection
      • Spring Security
        • JWT ํ† ํฐ ์‚ฌ์šฉํ•œ ์ธ๊ฐ€
        • OAuth 2 Login
        • OAuth 2 ์ธ์ฆ
        • ์ธ๊ฐ€
        • ์ธ์ฆ
        • PasswordEncoder
      • IoC Container
      • Filter,Interceptor,AOP,Argument Resolver
      • Spring Annotation
      • About Spring
    • Kafka
      • Error Channel
    • Infra
      • Scale Up || Scale Out
      • Docker
        • Dockerfile
        • Docker Hub Deploy
        • Command
      • Cloud ์œ ํ˜•
        • Infrastructure as a Service
        • Platform as a Service
        • Software as a Service
      • ๋ฌด์ค‘๋‹จ ๋ฐฐํฌ
        • ์—”์ง„์—‘์Šค(Nginx)
      • ์ฝ”๋“œ ์ž๋™ ๋ฐฐํฌ
        • Technical
      • AWS EC2
        • PEM(Privacy Enhanced Mail) ํ‚ค
      • AWS RDS
      • AWS S3
    • CodeSquad
      • Spring Boot Project 1์ฃผ์ฐจ ํšŒ๊ณ 
      • Spring Boot Project 2์ฃผ์ฐจ ํšŒ๊ณ 
      • Spirng Boot Project 3์ฃผ์ฐจ ํšŒ๊ณ 
      • Spring Boot Project 4์ฃผ์ฐจ ํšŒ๊ณ 
    • Foody Moody ํ”„๋กœ์ ํŠธ
      • Query Performance Comparison
      • HeartCount Asynchronous Issue
      • DeferredResult
      • ResponseBodyEmitter
      • SseEmitter (Spring)
      • Server-Sent Events (SSE)
      • ๊ธฐ์ˆ  ์Šคํƒ ์ ์šฉ ์ด์œ 
      • NO-CACHE(HTTP)
      • Transactional
    • DDD
      • AggregateId
    • Test
      • RestAssured
    • Coding and Algorithmic Problems
      • 819. Most Common Word
      • 344. Reverse String
      • 125. Valid Palindrome
      • 937. Reorder Data in Log Files
    • Node
      • Async... Await...
      • Custom Transactional Decorator Challenger
    • Python
      • Python Basic Grammar
        • Comments and Input/Output
        • Variable
        • Data type
        • Operations and syntax
        • List,Tuple,Dictionary,Set
        • Function
        • Conditional statement
        • Loop
    • HTML
      • HTML Basic
      • HTML Basic Tags
      • HTML Form Tags
      • HTML Table Tags
    • CSS
      • CSS Basic
      • CSS Practice
Powered by GitBook
On this page
  • The MessageChannel Interface
  • PollableChannel
  • SubscribableChannel
  • Message Channel Implementations
  • PublishSubscribeChannel
  • QueueChannel
  • PriorityChannel
  • RendezvousChannel
  • DirectChannel
  • ExecutorChannel
  • PartitionedChannel
  • FluxMessageChannel
  • Channel Interceptors
  • MessagingTemplate
  • Signatures
  • Special Channels
  • ErrorChannel
  • NullChannel
  • Reference

Was this helpful?

  1. ๋งค์ผ๋งค์ผ ์กฐ๊ธˆ์”ฉ ์„ฑ์žฅํ•˜๊ธฐ
  2. Spring
  3. Spring Integration Overview

Message Channels

The MessageChannel Interface

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ๋•Œ ๋ฉ”์‹œ์ง€๊ฐ€ ์„ฑ๊ณต์ ์œผ๋กœ ์ „์†ก๋˜๋ฉด ๋ฐ˜ํ™˜ ๊ฐ’์€ true์ž…๋‹ˆ๋‹ค. ๋ณด๋‚ด๊ธฐ ํ˜ธ์ถœ์ด ์‹œ๊ฐ„ ์ดˆ๊ณผ๋˜๊ฑฐ๋‚˜ ์ค‘๋‹จ๋˜๋ฉด false๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

PollableChannel

๋ฉ”์‹œ์ง€ ์ฑ„๋„์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฒ„ํผ๋งํ•˜๊ฑฐ๋‚˜ ๋ฒ„ํผ๋งํ•˜์ง€ ์•Š์„ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ ๋‘ ๊ฐœ์˜ ํ•˜์œ„ ์ธํ„ฐํŽ˜์ด์Šค๊ฐ€ ๋ฒ„ํผ๋ง(ํด๋ง ๊ฐ€๋Šฅ) ๋ฐ ๋น„๋ฒ„ํผ๋ง(๊ตฌ๋… ๊ฐ€๋Šฅ) ์ฑ„๋„ ๋™์ž‘์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

SubscribableChannel

SubscribableChannel ๊ธฐ๋ณธ ์ธํ„ฐํŽ˜์ด์Šค๋Š” ๊ตฌ๋…๋œ MessageHandler ์ธ์Šคํ„ด์Šค์— ๋ฉ”์‹œ์ง€๋ฅผ ์ง์ ‘ ๋ณด๋‚ด๋Š” ์ฑ„๋„์— ์˜ํ•ด ๊ตฌํ˜„๋ฉ๋‹ˆ๋‹ค. ๋”ฐ๋ผ์„œ ํด๋ง์„ ์œ„ํ•œ ์ˆ˜์‹  ๋ฐฉ๋ฒ•์„ ์ œ๊ณตํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋Œ€์‹  ์ด๋Ÿฌํ•œ ๊ตฌ๋…์ž๋ฅผ ๊ด€๋ฆฌํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

Message Channel Implementations

PublishSubscribeChannel

PublishSubscribeChannel ๊ตฌํ˜„์€ ์ „์†ก๋œ ๋ชจ๋“  ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…ํ•œ ๋ชจ๋“  ํ•ธ๋“ค๋Ÿฌ๋กœ ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธํ•ฉ๋‹ˆ๋‹ค.

QueueChannel

QueueChannel ๊ตฌํ˜„์€ queue๋กœ ๋ž˜ํ•‘ํ•ฉ๋‹ˆ๋‹ค. PublishSubscribeChannel๊ณผ ๋‹ฌ๋ฆฌ QueueChannel์—๋Š” point-to-point semantics๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. ์ฆ‰, ์ฑ„๋„์— ์—ฌ๋Ÿฌ ์†Œ๋น„์ž๊ฐ€ ์žˆ๋”๋ผ๋„ ๊ทธ ์ค‘ ํ•˜๋‚˜๋งŒ ํ•ด๋‹น ์ฑ„๋„๋กœ ์ „์†ก๋œ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์‹ ํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ธ์ˆ˜๊ฐ€ ์—†๋Š” ๊ธฐ๋ณธ ์ƒ์„ฑ์ž(๋ณธ์งˆ์ ์œผ๋กœ ์ œํ•œ๋˜์ง€ ์•Š์€ Integer.MAX_VALUE ์šฉ๋Ÿ‰ ์ œ๊ณต)์™€ ๋‹ค์Œ ๋ชฉ๋ก๊ณผ ๊ฐ™์ด ๋Œ€๊ธฐ์—ด ์šฉ๋Ÿ‰์„ ํ—ˆ์šฉํ•˜๋Š” ์ƒ์„ฑ์ž๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

public QueueChannel(int capacity)

PriorityChannel

QueueChannel์€ ์„ ์ž…์„ ์ถœ(FIFO) ์ˆœ์„œ๋ฅผ ์ ์šฉํ•˜๋Š” ๋ฐ˜๋ฉด, PriorityChannel์€ ์šฐ์„ ์ˆœ์œ„์— ๋”ฐ๋ผ ์ฑ„๋„ ๋‚ด์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ์ •๋ ฌํ•  ์ˆ˜ ์žˆ๋Š” ๋Œ€์ฒด ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค

๊ธฐ๋ณธ์ ์œผ๋กœ ์šฐ์„  ์ˆœ์œ„๋Š” ๊ฐ ๋ฉ”์‹œ์ง€ ๋‚ด์˜ ์šฐ์„  ์ˆœ์œ„ ํ—ค๋”์— ์˜ํ•ด ๊ฒฐ์ •๋ฉ๋‹ˆ๋‹ค.

RendezvousChannel

RendezvousChannel์€ ๋‹ค๋ฅธ ๋‹น์‚ฌ์ž๊ฐ€ ์ฑ„๋„์˜ receive() ๋ฉ”์„œ๋“œ๋ฅผ ํ˜ธ์ถœํ•  ๋•Œ๊นŒ์ง€ ๋ฐœ์‹ ์ž๊ฐ€ ์ฐจ๋‹จํ•˜๋Š” "์ง์ ‘ ํ•ธ๋“œ์˜คํ”„" ์‹œ๋‚˜๋ฆฌ์˜ค๋ฅผ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•ฉ๋‹ˆ๋‹ค

DirectChannel

DirectChannel์—๋Š” ์ง€์  ๊ฐ„ ์˜๋ฏธ ์ฒด๊ณ„๊ฐ€ ์žˆ์ง€๋งŒ ๊ทธ ์™ธ์—๋Š” ์•ž์—์„œ ์„ค๋ช…ํ•œ ๋Œ€๊ธฐ์—ด ๊ธฐ๋ฐ˜ ์ฑ„๋„ ๊ตฌํ˜„๋ณด๋‹ค PublishSubscribeChannel๊ณผ ๋” ์œ ์‚ฌํ•ฉ๋‹ˆ๋‹ค

PollableChannel ์ธํ„ฐํŽ˜์ด์Šค ๋Œ€์‹  SubscribableChannel ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋ฏ€๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๊ตฌ๋…์ž์—๊ฒŒ ์ง์ ‘ ๋ฐœ์†กํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๋‚˜ ์ ๋Œ€์  ์ฑ„๋„๋กœ์„œ ๊ฐ ๋ฉ”์‹œ์ง€๋ฅผ ๋‹จ์ผ ๊ฐ€์ž… MessageHandler๋กœ ๋ณด๋‚ธ๋‹ค๋Š” ์ ์—์„œ PublishSubscribeChannel๊ณผ ๋‹ค๋ฆ…๋‹ˆ๋‹ค.

ExecutorChannel

ExecutorChannel์€ DirectChannel๊ณผ ๋™์ผํ•œ ๋””์ŠคํŒจ์ฒ˜ ๊ตฌ์„ฑ(๋ถ€ํ•˜ ๊ท ํ˜• ์ „๋žต ๋ฐ ์žฅ์•  ์กฐ์น˜ ๋ถ€์šธ ์†์„ฑ)์„ ์ง€์›ํ•˜๋Š” ์ง€์ ๊ฐ„ ์ฑ„๋„์ž…๋‹ˆ๋‹ค.

์ด ๋‘ ๋””์ŠคํŒจ์น˜ ์ฑ„๋„ ์œ ํ˜•์˜ ์ฃผ์š” ์ฐจ์ด์ ์€ ExecutorChannel์ด ๋””์ŠคํŒจ์น˜๋ฅผ โ€‹โ€‹์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด TaskExecutor์˜ ์ธ์Šคํ„ด์Šค์— ์œ„์ž„ํ•œ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

์ฆ‰, send ๋ฉ”์„œ๋“œ๋Š” ์ผ๋ฐ˜์ ์œผ๋กœ ์ฐจ๋‹จํ•˜์ง€ ์•Š์ง€๋งŒ ๋ณด๋‚ธ ์‚ฌ๋žŒ์˜ ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌ๊ธฐ ํ˜ธ์ถœ์ด ๋ฐœ์ƒํ•˜์ง€ ์•Š์„ ์ˆ˜๋„ ์žˆ์Œ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค.

PartitionedChannel

๋ฒ„์ „ 6.1๋ถ€ํ„ฐ PartitionedChannel ๊ตฌํ˜„์ด ์ œ๊ณต๋ฉ๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ AbstractExecutorChannel์˜ ํ™•์žฅ์ด๋ฉฐ ์ด ์ฑ„๋„๋กœ ์ „์†ก๋œ ๋ฉ”์‹œ์ง€์—์„œ ํ‰๊ฐ€๋œ ํŒŒํ‹ฐ์…˜ ํ‚ค์— ์˜ํ•ด ๊ฒฐ์ •๋˜๋Š” ํŠน์ • ์Šค๋ ˆ๋“œ์—์„œ ์‹ค์ œ ์†Œ๋น„๊ฐ€ ์ฒ˜๋ฆฌ๋˜๋Š” ์ง€์ ๊ฐ„ ๋””์ŠคํŒจ์นญ ๋…ผ๋ฆฌ๋ฅผ ๋‚˜ํƒ€๋ƒ…๋‹ˆ๋‹ค.

์ด ์ฑ„๋„์€ ์œ„์—์„œ ์–ธ๊ธ‰ํ•œ ExecutorChannel๊ณผ ๋น„์Šทํ•˜์ง€๋งŒ ํŒŒํ‹ฐ์…˜ ํ‚ค๊ฐ€ ๊ฐ™์€ ๋ฉ”์‹œ์ง€๋Š” ํ•ญ์ƒ ์ˆœ์„œ๋ฅผ ์œ ์ง€ํ•˜๋ฉด์„œ ๋™์ผํ•œ ์Šค๋ ˆ๋“œ์—์„œ ์ฒ˜๋ฆฌ๋œ๋‹ค๋Š” ์ฐจ์ด์ ์ด ์žˆ์Šต๋‹ˆ๋‹ค.

์™ธ๋ถ€ TaskExecutor๊ฐ€ ํ•„์š”ํ•˜์ง€ ์•Š์ง€๋งŒ ์‚ฌ์šฉ์ž ์ง€์ • ThreadFactory๋กœ ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

์ด ํŒฉํ† ๋ฆฌ๋Š” ๋‹จ์ผ ์Šค๋ ˆ๋“œ ์‹คํ–‰๊ธฐ๋ฅผ ํŒŒํ‹ฐ์…˜๋ณ„๋กœ MessageDispatcher ๋Œ€๋ฆฌ์ž๋กœ ์ฑ„์šฐ๋Š” ๋ฐ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค. ๊ธฐ๋ณธ์ ์œผ๋กœ IntegrationMessageHeaderAccessor.CORRELATION_ID ๋ฉ”์‹œ์ง€ ํ—ค๋”๊ฐ€ ํŒŒํ‹ฐ์…˜ ํ‚ค๋กœ ์‚ฌ์šฉ๋ฉ๋‹ˆ๋‹ค.

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
// ์ฑ„๋„์—๋Š” ์ „์šฉ ์Šค๋ ˆ๋“œ์ธ 3๊ฐœ์˜ ํŒŒํ‹ฐ์…˜์ด ์žˆ์Šต๋‹ˆ๋‹ค. 
// ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  ํŒŒํ‹ฐ์…˜์„ ๊ฒฐ์ •ํ•˜๊ธฐ ์œ„ํ•ด partitionKey ํ—ค๋”๋ฅผ ์‚ฌ์šฉํ•ฉ๋‹ˆ๋‹ค. 

FluxMessageChannel

FluxMessageChannel์€ ๋ณด๋‚ธ ๋ฉ”์‹œ์ง€๋ฅผ ๋‚ด๋ถ€ react.core.publisher.Flux๋กœ "์‹ฑํฌ"ํ•˜๊ธฐ ์œ„ํ•œ org.reactivestreams.Publisher ๊ตฌํ˜„์ž…๋‹ˆ๋‹ค.

Channel Interceptors

๋ฉ”์‹œ์ง• ์•„ํ‚คํ…์ฒ˜์˜ ์žฅ์  ์ค‘ ํ•˜๋‚˜๋Š” ์ผ๋ฐ˜์ ์ธ ๋™์ž‘์„ ์ œ๊ณตํ•˜๊ณ  ์‹œ์Šคํ…œ์„ ํ†ต๊ณผํ•˜๋Š” ๋ฉ”์‹œ์ง€์— ๋Œ€ํ•œ ์˜๋ฏธ ์žˆ๋Š” ์ •๋ณด๋ฅผ ๋น„์นจํˆฌ์ ์ธ ๋ฐฉ์‹์œผ๋กœ ์บก์ฒ˜ํ•˜๋Š” ๊ธฐ๋Šฅ์ž…๋‹ˆ๋‹ค.

Message ์ธ์Šคํ„ด์Šค๋Š” MessageChannel ์ธ์Šคํ„ด์Šค์—์„œ ์†ก์ˆ˜์‹ ๋˜๋ฏ€๋กœ ์ด๋Ÿฌํ•œ ์ฑ„๋„์€ ์†ก์ˆ˜์‹  ์ž‘์—…์„ ๊ฐ€๋กœ์ฑŒ ์ˆ˜ ์žˆ๋Š” ๊ธฐํšŒ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•œ ํ›„ ์ธํ„ฐ์…‰ํ„ฐ๋ฅผ ์ฑ„๋„์— ๋“ฑ๋กํ•˜๋Š” ๊ฒƒ์€ ๋‹ค์Œ ํ˜ธ์ถœ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ๋งŒ ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.

channel.addInterceptor(someChannelInterceptor);

MessagingTemplate

Spring Integration์€ ์š”์ฒญ ๋ฐ ์‘๋‹ต ์‹œ๋‚˜๋ฆฌ์˜ค๋ฅผ ํฌํ•จํ•˜์—ฌ ๋ฉ”์‹œ์ง€ ์ฑ„๋„์—์„œ ๋‹ค์–‘ํ•œ ์ž‘์—…์„ ์ง€์›ํ•˜๋Š” MessagingTemplate์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.

Example

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

Signatures

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}

Special Channels

ErrorChannel

NullChannel

Reference

Last updated 1 year ago

Was this helpful?

Core Messaging
Logo