๐Ÿ’ป
Albert's Til
GitHub
  • ๋งค์ผ๋งค์ผ ์กฐ๊ธˆ์”ฉ ์„ฑ์žฅํ•˜๊ธฐ
    • README
    • CS
      • Network
      • HTTP
        • NO-CACHE
      • ์˜ค๋ฅ˜ ์ฝ”๋“œ
      • ORM ๋„๊ตฌ
      • Design Pattern
        • CQRS Pattern
          • Event Sourcing and CQRS pattern
        • Builder Pattern
    • DB
      • MySQL
        • Timeline
        • Pagination
        • Index
        • Database Performance Optimization Strategies
        • B+ tree
        • MySQL Connectors VS MySQL Shell(Scripting) VS MySQL Workbench
        • MySQL Storage Engine Architecture
      • Normalization & Denormalization
      • JPA
        • @Transactional
        • Why JPA?
        • About JPA
        • N+1 Issue
        • Index
        • ElementCollection&CollectionTable
        • orphanRemoval
        • CascadeType
        • Use Subselect
        • Dynamic Instance Creation
        • Paging
        • Order
        • Spefication
        • mappedBy
      • MongoDB
        • ObjectId
      • Why MySQL?
      • ACID properties of transactions
      • Between JPA and JDBC
      • Identifiers in Hibernate/JPA
    • Java
      • Jackson de/serialize
      • Collections.singletonList() vs List.of()
      • Manage dependencies in Gradle
      • Logging Level
      • Bean Validation
      • JVM Internals
        • Threads
          • Frame
        • Shared Between Threads
          • Classloader
            • Class Loader Hierarchy
            • Loading Linking Initialization
      • Java Collection Framework
      • Annotation
      • Generic
      • ๋””๋ฏธํ„ฐ ๋ฒ•์น™
    • Spring
      • Caching
      • Spring Integration Overview
        • ThreadPollTaskExecutor
        • Messaging Bridge
        • Channel Adapter
        • Poller
        • Configuration and @EnableIntegration
        • Message Endpoints
        • Message Channels
      • HATEOAS
      • @Autowired vs Constructor Dependency Injection
      • Spring Security
        • JWT ํ† ํฐ ์‚ฌ์šฉํ•œ ์ธ๊ฐ€
        • OAuth 2 Login
        • OAuth 2 ์ธ์ฆ
        • ์ธ๊ฐ€
        • ์ธ์ฆ
        • PasswordEncoder
      • IoC Container
      • Filter,Interceptor,AOP,Argument Resolver
      • Spring Annotation
      • About Spring
    • Kafka
      • Error Channel
    • Infra
      • Scale Up || Scale Out
      • Docker
        • Dockerfile
        • Docker Hub Deploy
        • Command
      • Cloud ์œ ํ˜•
        • Infrastructure as a Service
        • Platform as a Service
        • Software as a Service
      • ๋ฌด์ค‘๋‹จ ๋ฐฐํฌ
        • ์—”์ง„์—‘์Šค(Nginx)
      • ์ฝ”๋“œ ์ž๋™ ๋ฐฐํฌ
        • Technical
      • AWS EC2
        • PEM(Privacy Enhanced Mail) ํ‚ค
      • AWS RDS
      • AWS S3
    • CodeSquad
      • Spring Boot Project 1์ฃผ์ฐจ ํšŒ๊ณ 
      • Spring Boot Project 2์ฃผ์ฐจ ํšŒ๊ณ 
      • Spirng Boot Project 3์ฃผ์ฐจ ํšŒ๊ณ 
      • Spring Boot Project 4์ฃผ์ฐจ ํšŒ๊ณ 
    • Foody Moody ํ”„๋กœ์ ํŠธ
      • Query Performance Comparison
      • HeartCount Asynchronous Issue
      • DeferredResult
      • ResponseBodyEmitter
      • SseEmitter (Spring)
      • Server-Sent Events (SSE)
      • ๊ธฐ์ˆ  ์Šคํƒ ์ ์šฉ ์ด์œ 
      • NO-CACHE(HTTP)
      • Transactional
    • DDD
      • AggregateId
    • Test
      • RestAssured
    • Coding and Algorithmic Problems
      • 819. Most Common Word
      • 344. Reverse String
      • 125. Valid Palindrome
      • 937. Reorder Data in Log Files
    • Node
      • Async... Await...
      • Custom Transactional Decorator Challenger
    • Python
      • Python Basic Grammar
        • Comments and Input/Output
        • Variable
        • Data type
        • Operations and syntax
        • List,Tuple,Dictionary,Set
        • Function
        • Conditional statement
        • Loop
    • HTML
      • HTML Basic
      • HTML Basic Tags
      • HTML Form Tags
      • HTML Table Tags
    • CSS
      • CSS Basic
      • CSS Practice
Powered by GitBook
On this page
  • Polling Consumer
  • Pollable Message Source
  • Deferred Acknowledgment Pollable Message Source
  • Conditional Pollers for Message Sources
  • "Smart" Polling
  • CompoundTriggerAdvice
  • Reference

Was this helpful?

  1. ๋งค์ผ๋งค์ผ ์กฐ๊ธˆ์”ฉ ์„ฑ์žฅํ•˜๊ธฐ
  2. Spring
  3. Spring Integration Overview

Poller

Polling Consumer

๋ฉ”์‹œ์ง€ ๋์ (์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ)์ด ์ฑ„๋„์— ์—ฐ๊ฒฐ๋˜๊ณ  ์ธ์Šคํ„ด์Šคํ™”๋˜๋ฉด ๋‹ค์Œ ์ธ์Šคํ„ด์Šค ์ค‘ ํ•˜๋‚˜๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค

  • PollingConsumer

    • PollableChannel ์ธํ„ฐํŽ˜์ด์Šค(์˜ˆ: QueueChannel)๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์ฑ„๋„์— ์—ฐ๊ฒฐ๋œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๋Š” PollingConsumer์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

  • EventDrivenConsumer

    • SubscribableChannel ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์ฑ„๋„์— ์—ฐ๊ฒฐ๋œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๋Š” EventDrivenConsumer ์ธ์Šคํ„ด์Šค๋ฅผ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.

Pollable Message Source

์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ๊ฐ€ ์‚ฌ์šฉ๋˜๋Š” ๊ฒฝ์šฐ ์ด๋Ÿฌํ•œ ์–ด๋Œ‘ํ„ฐ๋Š” ์ข…์ข… SourcePollingChannelAdapter๋กœ ๋ž˜ํ•‘๋ฉ๋‹ˆ๋‹ค.

์˜ˆ๋ฅผ ๋“ค์–ด, ์›๊ฒฉ FTP ์„œ๋ฒ„ ์œ„์น˜์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๊ฒ€์ƒ‰ํ•  ๋•Œ FTP ์ธ๋ฐ”์šด๋“œ ์ฑ„๋„ ์–ด๋Œ‘ํ„ฐ์— ์„ค๋ช…๋œ ์–ด๋Œ‘ํ„ฐ๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์ฃผ๊ธฐ์ ์œผ๋กœ ๊ฒ€์ƒ‰ํ•˜๋„๋ก ํด๋Ÿฌ๋กœ ๊ตฌ์„ฑ๋ฉ๋‹ˆ๋‹ค

๊ตฌ์„ฑ ์š”์†Œ๊ฐ€ ํด๋Ÿฌ๋กœ ๊ตฌ์„ฑ๋œ ๊ฒฝ์šฐ ๊ฒฐ๊ณผ ์ธ์Šคํ„ด์Šค๋Š” ๋‹ค์Œ ์œ ํ˜• ์ค‘ ํ•˜๋‚˜์ž…๋‹ˆ๋‹ค.

  • PollingConsumer

  • SourcePollingChannelAdapter

์ด๋Š” ํด๋Ÿฌ๊ฐ€ ์ธ๋ฐ”์šด๋“œ ๋ฐ ์•„์›ƒ๋ฐ”์šด๋“œ ๋ฉ”์‹œ์ง• ์‹œ๋‚˜๋ฆฌ์˜ค ๋ชจ๋‘์—์„œ ์‚ฌ์šฉ๋จ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ํด๋Ÿฌ๊ฐ€ ์‚ฌ์šฉ๋˜๋Š” ๋ช‡ ๊ฐ€์ง€ ์‚ฌ์šฉ ์‚ฌ๋ก€๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์Šต๋‹ˆ๋‹ค.

  • FTP ์„œ๋ฒ„, ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค ๋ฐ ์›น ์„œ๋น„์Šค์™€ ๊ฐ™์€ ํŠน์ • ์™ธ๋ถ€ ์‹œ์Šคํ…œ ํด๋ง

  • ๋‚ด๋ถ€(ํด๋ง ๊ฐ€๋Šฅ) ๋ฉ”์‹œ์ง€ ์ฑ„๋„ ํด๋ง

  • ๋‚ด๋ถ€ ์„œ๋น„์Šค ํด๋ง(์˜ˆ: Java ํด๋ž˜์Šค์—์„œ ๋ฐ˜๋ณต์ ์œผ๋กœ ์‹คํ–‰๋˜๋Š” ๋ฉ”์„œ๋“œ)

Deferred Acknowledgment Pollable Message Source

๋ฒ„์ „ 5.0.1๋ถ€ํ„ฐ ํŠน์ • ๋ชจ๋“ˆ์€ ๋‹ค์šด์ŠคํŠธ๋ฆผ ํ๋ฆ„์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€(๋˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค๋ฅธ ์Šค๋ ˆ๋“œ๋กœ ์ „๋‹ฌ) ์—ฐ๊ธฐ ์Šน์ธ์„ ์ง€์›ํ•˜๋Š” MessageSource ๊ตฌํ˜„์„ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ ํ˜„์žฌ AmqpMessageSource ๋ฐ KafkaMessageSource๋กœ ์ œํ•œ๋ฉ๋‹ˆ๋‹ค

์ด๋Ÿฌํ•œ ๋ฉ”์‹œ์ง€ ์†Œ์Šค๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ํ—ค๋”(MessageHeaderAccessor API ์ฐธ์กฐ)๊ฐ€ ๋ฉ”์‹œ์ง€์— ์ถ”๊ฐ€๋ฉ๋‹ˆ๋‹ค.

ํด๋ง ๊ฐ€๋Šฅํ•œ ๋ฉ”์‹œ์ง€ ์†Œ์Šค์™€ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ ํ—ค๋” ๊ฐ’์€ ๋‹ค์Œ ์˜ˆ์ œ์™€ ๊ฐ™์ด AcknowledgementCallback์˜ ์ธ์Šคํ„ด์Šค์ž…๋‹ˆ๋‹ค.

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

Conditional Pollers for Message Sources

"Smart" Polling

๋ฒ„์ „ 5.3์€ ReceiveMessageAdvice ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๋„์ž…ํ–ˆ์Šต๋‹ˆ๋‹ค. ์ด ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ๊ตฌํ˜„ํ•˜๋Š” ์–ด๋“œ๋ฐ”์ด์Šค ์ฒด์ธ์˜ ๋ชจ๋“  ์–ด๋“œ๋ฐ”์ด์Šค ๊ฐ์ฒด๋Š” receive() ์ž‘์—…์—๋งŒ ์ ์šฉ๋ฉ๋‹ˆ๋‹ค.(MessageSource.receive() ๋ฐ PollableChannel.receive(timeout)) .SourcePollingChannelAdapter ๋˜๋Š” PollingConsumer์—๋งŒ ์ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ํด๋ž˜์Šค๋Š” ๋‹ค์Œ ๋ฉ”์„œ๋“œ๋ฅผ ๊ตฌํ˜„ํ•ฉ๋‹ˆ๋‹ค.

  • beforeReceive(Object source) ์ด ๋ฉ”์„œ๋“œ๋Š” Object.receive() ๋ฉ”์„œ๋“œ ์ „์— ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค. ์ด๋ฅผ ํ†ตํ•ด ์†Œ์Šค๋ฅผ ๊ฒ€์‚ฌํ•˜๊ณ  ์žฌ๊ตฌ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. false๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋ฉด ์ด poll๊ฐ€ ์ทจ์†Œ๋ฉ๋‹ˆ๋‹ค(์•ž์„œ ์–ธ๊ธ‰ํ•œ PollSkipAdvice์™€ ์œ ์‚ฌ).

  • Message afterReceive(Message result, Object source) ์ด ๋ฉ”์†Œ๋“œ๋Š” receive() ๋ฉ”์†Œ๋“œ ์ดํ›„์— ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค.

Thread safety

Advice๊ฐ€ ์†Œ์Šค๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š” ๊ฒฝ์šฐ TaskExecutor๋กœ ํด๋Ÿฌ๋ฅผ ๊ตฌ์„ฑํ•˜๋ฉด ์•ˆ ๋ฉ๋‹ˆ๋‹ค. Advice๊ฐ€ ์†Œ์Šค๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š” ๊ฒฝ์šฐ ์ด๋Ÿฌํ•œ ๋ณ€๊ฒฝ์€ ์Šค๋ ˆ๋“œ๋กœ๋ถ€ํ„ฐ ์•ˆ์ „ํ•˜์ง€ ์•Š์œผ๋ฉฐ ํŠนํžˆ ๊ณ ์ฃผํŒŒ ํด๋Ÿฌ์—์„œ ์˜ˆ๊ธฐ์น˜ ์•Š์€ ๊ฒฐ๊ณผ๋ฅผ ์ดˆ๋ž˜ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํด๋ง ๊ฒฐ๊ณผ๋ฅผ ๋™์‹œ์— ์ฒ˜๋ฆฌํ•ด์•ผ ํ•˜๋Š” ๊ฒฝ์šฐ ํด๋Ÿฌ์— ์‹คํ–‰๊ธฐ๋ฅผ ์ถ”๊ฐ€ํ•˜๋Š” ๋Œ€์‹  ๋‹ค์šด์ŠคํŠธ๋ฆผ ExecutorChannel์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค.

Advice Chain Ordering

์ดˆ๊ธฐํ™” ์ค‘์— ์–ด๋“œ๋ฐ”์ด์Šค ์ฒด์ธ์ด ์ฒ˜๋ฆฌ๋˜๋Š” ๋ฐฉ์‹์„ ์ดํ•ดํ•ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ReceiveMessageAdvice๋ฅผ ๊ตฌํ˜„ํ•˜์ง€ ์•Š๋Š” ์–ด๋“œ๋ฐ”์ด์Šค ๊ฐœ์ฒด๋Š” ์ „์ฒด ํด๋ง ํ”„๋กœ์„ธ์Šค์— ์ ์šฉ๋˜๋ฉฐ ๋ชจ๋‘ ์ˆœ์„œ๋Œ€๋กœ ReceiveMessageAdvice๋ณด๋‹ค ๋จผ์ € ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค. ๊ทธ๋Ÿฌ๊ณ  ReceiveMessageAdvice ๊ฐ์ฒด๊ฐ€ ์†Œ์Šค receive() ๋ฉ”์„œ๋“œ ์ฃผ๋ณ€์—์„œ ์ˆœ์„œ๋Œ€๋กœ ํ˜ธ์ถœ๋ฉ๋‹ˆ๋‹ค.

CompoundTriggerAdvice

ํŠธ๋ฆฌ๊ฑฐ์˜ ๊ธฐ๋ณธ ํŠธ๋ฆฌ๊ฑฐ๋Š” CronTrigger์ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์–ด๋“œ๋ฐ”์ด์Šค๊ฐ€ ์ˆ˜์‹ ๋œ ๋ฉ”์‹œ์ง€๊ฐ€ ์—†์Œ์„ ๊ฐ์ง€ํ•˜๋ฉด ๋‘ ๋ฒˆ์งธ ํŠธ๋ฆฌ๊ฑฐ๋ฅผ CompoundTrigger์— ์ถ”๊ฐ€ํ•ฉ๋‹ˆ๋‹ค.CompoundTrigger ์ธ์Šคํ„ด์Šค์˜ nextExecutionTime ๋ฉ”์„œ๋“œ๊ฐ€ ํ˜ธ์ถœ๋˜๋ฉด ๋ณด์กฐ ํŠธ๋ฆฌ๊ฑฐ(์žˆ๋Š” ๊ฒฝ์šฐ)์— ์œ„์ž„ํ•ฉ๋‹ˆ๋‹ค. ๊ทธ๋ ‡์ง€ ์•Š์œผ๋ฉด ๊ธฐ๋ณธ ํŠธ๋ฆฌ๊ฑฐ์— ์œ„์ž„ํ•ฉ๋‹ˆ๋‹ค.

Reference

Last updated 1 year ago

Was this helpful?

Core Messaging
Logo