Spring Cloud Stream

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ์†Œ๊ฐœ

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ํ”„๋กœ์ ํŠธ๋Š” ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์— ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์ž์™€ ์†Œ๋น„์ž๋ฅผ ์‰ฝ๊ฒŒ ๊ตฌ์ถ•ํ•  ์ˆ˜ ์žˆ๋Š” ์—๋„ˆํ…Œ์ด์…˜ ๊ธฐ๋ฐ˜ ํ”„๋ ˆ์ž„์›Œํฌ.

์‚ฌ์šฉํ•˜๋Š” ์‹œ์ง• ํ”Œ๋žซํผ์˜ ๊ตฌํ˜„ ์„ธ๋ถ€ ์‚ฌํ•ญ์„ ์ถ”์ƒํ™”ํ•œ๋‹ค. ์—ฌ๋Ÿฌ ๋ฉ”์‹œ์ง€ ํ”Œ๋žซํผ์ด ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ๊ณผ ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ์œผ๋ฉฐ, ํŠน์ • ํ”Œ๋žซํผ์„ ์œ„ํ•œ ์„ธ๋ถ€ ๊ตฌํ˜„์€ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ฝ”๋“œ์™€ ๋ถ„๋ฆฌ๋œ๋‹ค. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์—์„œ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰๊ณผ ์†Œ๋น„ ๊ตฌํ˜„์€ ํ”Œ๋žซํผ ์ค‘๋ฆฝ์ ์ธ ์Šคํ”„๋ง ์ธํ„ฐํŽ˜์ด์Šค๋กœ ์ˆ˜ํ–‰๋œ๋‹ค.

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ์•„ํ‚คํ…์ฒ˜

๋ฉ”์‹œ์ง•์„ ์ด์šฉํ•œ ๋‘ ๋น„์Šค์˜ ํ†ต์‹  ๊ด€์ ์—์„œ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ์•„ํ‚คํ…์ฒ˜๋ฅผ ์‚ดํŽด๋ณด๋ฉฐ ๋…ผ์˜๋ฅผ ์‹œ์ž‘ํ•ด ๋ณด์ž. ํ•œ ์„œ๋น„์Šค๊ฐ€ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์ž(publisher)๊ฐ€ ๋˜๊ณ , ๋‹ค๋ฅธ ์„œ๋น„์Šค๋Š” ๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž(consumer)๊ฐ€ ๋œ๋‹ค. ์•„๋ž˜ ๊ทธ๋ฆผ์—์„œ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์ด ์–ด๋–ป๊ฒŒ ๋ฉ”์‹œ์ง€๋ฅผ ์ˆ˜์›”ํ•˜๊ฒŒ ์ „๋‹ฌํ•˜๋Š”์ง€ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ  ์†Œ๋น„ํ•˜๋Š” ๋ฐ ๋‹ค์Œ 4๊ฐœ์˜ ์ปดํฌ๋„ŒํŠธ๊ฐ€ ๊ด€๋ จ๋˜์–ด ์žˆ๋‹ค.

  • ์†Œ์Šค

  • ์ฑ„๋„

  • ๋ฐ”์ธ๋”

  • ์‹ฑํฌ

8-3 ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฐœํ–‰๋˜๊ณ  ์†Œ๋น„๋  ๋•Œ ํ•˜๋ถ€ ๋ฉ”์‹œ์ง• ํ”Œ๋žซํผ์„ ์ถ”์ƒํ™”ํ•˜๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ์ปดํฌ๋„ŒํŠธ๋“ค์„ ํ†ต๊ณผํ•œ๋‹ค.

์†Œ์Šค(SOURCE)

์„œ๋น„์Šค๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•  ์ค€๋น„๊ฐ€ ๋˜๋ฉด ์†Œ์Šค๋ฅผ ์‚ฌ์šฉํ•ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค. ์†Œ์Šค๋Š” ๋ฐœํ–‰๋  ๋ฉ”์‹œ์ง€๋ฅผ ํ‘œํ˜„ํ•˜๋Š” POJO๋ฅผ ์ „๋‹ฌ๋ฐ›๋Š” ์Šคํ”„๋ง์˜ ์• ๋„ˆํ…Œ์ด์…˜ ์ธํ„ฐํŽ˜์ด์Šค๋‹ค. ์†Œ์Šค๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์ง๋ ฌํ™”(๊ธฐ๋ณธ ์ง๋ ฌํ™” ์„ค์ •์€ JSON)ํ•˜๊ณ  ๋ฉ”์‹œ์ง€๋ฅผ ์ฑ„๋„๋กœ ๋ฐœํ–‰ํ•œ๋‹ค.

์ฑ„๋„(CHANNEL)

๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ์ž์™€ ์†Œ๋น„์ž๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ฑฐ๋‚˜ ์†Œ๋น„ํ•œ ํ›„ ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๊ด€ํ•  ํ๋ฅผ ์ถ”์ƒํ™”ํ•œ ๊ฒƒ. ์ฑ„๋„ ์ด๋ฆ„์€ ํ•ญ์ƒ ๋Œ€์ƒ ํ์˜ ์ด๋ฆ„๊ณผ ๊ด€๋ จ์ด ์žˆ์ง€๋งŒ ์ฝ”๋“œ์—์„œ๋Š” ํ ์ด๋ฆ„์„ ์ง์ ‘ ์‚ฌ์šฉํ•˜์ง€ ์•Š๊ณ , ์ฑ„๋„ ์ด๋ฆ„์„ ์‚ฌ์šฉํ•œ๋‹ค. ๋”ฐ๋ผ์„œ ์ฑ„๋„์ด ์ฝ๊ฑฐ๋‚˜ ์“ฐ๋Š” ํ๋ฅผ ์ „ํ™˜ํ•˜๋ ค๋ฉด ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์ฝ”๋“œ๊ฐ€ ์•„๋‹Œ ๊ตฌ์„ฑ ์ •๋ณด๋ฅผ ๋ณ€๊ฒฝํ•œ๋‹ค.

๋ฐ”์ธ๋”(BINDER)

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ํ”„๋ ˆ์ž„์›Œํฌ์˜ ์ผ๋ถ€์ธ ์Šคํ”„๋ง ์ฝ”๋“œ๋กœ ํŠน์ • ๋ฉ”์‹œ์ง€ ํ”Œ๋žซํผ๊ณผ ํ†ต์‹ ํ•œ๋‹ค. ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์˜ ๋ฐ”์ธ๋”๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๊ณ  ์†Œ๋น„ํ•˜๊ธฐ ์œ„ํ•ด ํ”Œ๋žซํผ๋งˆ๋‹ค ๋ณ„๋„์˜ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์™€ API๋ฅผ ์ œ๊ณตํ•˜์ง€ ์•Š๊ณ ๋„ ๋ฉ”์‹œ์ง•์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค.

์‹ฑํฌ(SINK)

์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์—์„œ ์„œ๋น„์Šค๋Š” ์‹ฑํฌ๋ฅผ ์‚ฌ์šฉํ•ด ํ์—์„œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›๋Š”๋‹ค. ์‹ฑํฌ๋Š” ๋“ค์–ด์˜ค๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ์œ„ํ•ด ์ฑ„๋„์„ ์ˆ˜์‹  ๋Œ€๊ธฐํ•˜๊ณ , ๋ฉ”์‹œ์ง€๋ฅผ ๋‹ค์‹œ POJO๋กœ ์—ญ์ง๋ ฌํ™”ํ•œ๋‹ค. ์ด ๊ณผ์ •์—์„œ ์Šคํ”„๋ง ์„œ๋น„์Šค์˜ ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง์ด ๋ฉ”์‹œ์ง€๋ฅผ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค.

๊ฐ„๋‹จํ•œ ๋ฉ”์‹œ์ง€ Publisher์™€ Consumer ์ž‘์„ฑ

๋‹ค์Œ ์˜ˆ์ œ์—์„  ์กฐ์ง ์„œ๋น„์Šค๊ฐ€ ๋ผ์ด์„ ์‹ฑ ์„œ๋น„์Šค๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ „๋‹ฌํ•œ๋‹ค. ๋ผ์ด์„ ์‹ฑ ์„œ๋น„์Šค๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์„ ๋•Œ ํ•˜๋Š” ์ผ์€ ๋กœ๊ทธ ๋ฉ”์‹œ์ง€๋ฅผ ์ฝ˜์†”์— ์ถœ๋ ฅํ•˜๋Š” ๊ฒƒ๋ฟ์ด๋‹ค.

์ด ์ œ์—์„  ํ•˜๋‚˜์˜ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์˜ ์†Œ์Šค(๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ์ž)์™€ ์‹ฑํฌ(๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž)๋งŒ ์‚ฌ์šฉํ•˜๋ฏ€๋กœ, ์กฐ์ง ์„œ๋น„์Šค์˜ ์†Œ์Šค์™€ ๋ผ์ด์„ ์‹ฑ ์„œ๋น„์Šค์˜ ์‹ฑํฌ ์„ค์ •์„ ์†์‰ฝ๊ฒŒ ํ•ด ์ฃผ๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์˜ˆ์ œ๋ฅผ ์‹œ์ž‘ํ•  ๊ฒƒ์ด๋‹ค.

8-4 ์กฐ์ง ์ดํ„ฐ๊ฐ€ ๋ณ€๊ฒฝ๋˜๋ฉด ์นดํ”„์นด์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•œ๋‹ค

1. ์กฐ์ง ์„œ๋น„์Šค์˜ ๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ์ž ์ž‘์„ฑ

์กฐ์ง ๋ฐ์ดํ„ฐ๊ฐ€ ์ถ”๊ฐ€/์ˆ˜์ •/์‚ญ์ œ๋  ๋•Œ๋งˆ๋‹ค ์กฐ์ง ์„œ๋น„์Šค๊ฐ€ ์นดํ”„์นด ํ† ํ”ฝ(topic)์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•ด์„œ ํ† ํ”ฝ์œผ๋กœ ์กฐ์ง ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ–ˆ์Œ์„ ์•Œ๋ ค ์ฃผ๋„๋ก ์กฐ์ง ์„œ๋น„์Šค๋ฅผ ๊ณ ์ณ ๋ณด์ž. ์œ„ ๊ทธ๋ฆผ์—์„œ ๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ์ž๋ฅผ ์„ค๋ช…ํ•˜์—ฌ, ๊ทธ๋ฆผ 8-3์˜ ์ผ๋ฐ˜์ ์ธ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ์•„ํ‚คํ…์ฒ˜๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ•œ๋‹ค.

๋ฐœํ–‰๋œ ๋ฉ”์‹œ์ง€ ์•ˆ์— ๋ณ€๊ฒฝ ์ด๋ฒคํŠธ์™€ ๊ด€๋ จ๋œ ์กฐ์ง ID์™€ ๋ฐœ์ƒํ•œ ์•ก์…˜ ์ •๋ณด (Add, Update, Delete)๊ฐ€ ํฌํ•จ๋œ๋‹ค.

๋จผ์ € ํ•  ์ผ์€ ์กฐ์ง ์„œ๋น„์Šค์˜ Maven, Gradle์— ์˜์กด์„ฑ์„ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.

Maven

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

Gradle

implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'

๋ฉ”์ด๋ธ ์˜์กด์„ฑ์„ ์ •์˜ํ–ˆ๋‹ค๋ฉด, ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์™€ ๋ฐ”์ธ๋”ฉํ•˜๋„๋ก ์ง€์ •ํ•ด์•ผ ํ•œ๋‹ค. ์ด ์ž‘์—…์„ ์œ„ํ•ด ์กฐ์ง ์„œ๋น„์Šค์˜ ๋ถ€ํŠธ์ŠคํŠธ๋žฉ ํด๋ž˜์Šค์ธ Application.java์— @EnableBinding ์—๋„ˆํ…Œ์ด์…˜์„ ์ถ”๊ฐ€ํ•œ๋‹ค.

package com.thoughtmechanix.organization;
import com.thoughtmechanix.organization.utils.UserContextFilter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import javax.servlet.Filter;
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class) ----@EnableBinding ์• ๋„ˆํ…Œ์ด์…˜์€ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์— ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋กœ ๋ฐ”์ธ๋”ฉํ•˜๋ผ๊ณ  ์•Œ๋ฆฐ๋‹ค.
public class Application {
    @Bean
    public Filter userContextFilter() {
        UserContextFilter userContextFilter = new UserContextFilter();
            return userContextFilter;
    }
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
 

@EnableBinding ์• ๋„ˆํ…Œ์ด์…˜์€ ํ•ด๋‹น ์„œ๋น„์Šค๋ฅผ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์— ๋ฐ”์ธ๋”ฉํ•˜๋„๋ก ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์„ ์„ค์ •ํ•œ๋‹ค. @EnableBinding ์—๋„ˆํ…Œ์ด์…˜์—์„œ Source.class๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ํ•ด๋‹น ์„œ๋น„์Šค๊ฐ€ Source ํด๋ž˜์Šค์— ์ •์˜๋œ ์ฑ„๋„๋“ค์„ ์ด์šฉํ•ด ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์™€ ํ†ต์‹ ํ•˜๊ฒŒ ๋œ๋‹ค. ์ฑ„๋„์€ ๋ฉ”์‹œ์ง€ ํ ์œ„์— ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์ƒ๊ธฐํ•˜์ž. ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์€ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์™€ ํ†ต์‹ ํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋ณธ ์ฑ„๋„์ด ์žˆ๋‹ค.

์•„์ง ์กฐ์ง ์„œ๋น„์Šค๊ฐ€ ์–ด๋–ค ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋ฅผ ๋ฐ”์ธ๋”ฉํ• ์ง€ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์— ์ง€์ •ํ•˜์ง€ ์•Š์•˜๋‹ค. ์ด์ œ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•  ์ฝ”๋“œ๋ฅผ ๊ตฌํ˜„ํ•  ์ค€๋น„๊ฐ€ ๋˜์—ˆ๋‹ค.

package com.thoughtmechanix.organization.events.source;

// ๊ฐ„๊ฒฐํ•œ ์ฝ”๋“œ๋ฅผ ์œ„ํ•ด import ์‚ญ์ œ

@Component
public class SimpleSourceBean {
    private Source source;

    private static final Logger logger =
      LoggerFactory.getLogger(SimpleSourceBean.class);

    @Autowired
    public SimpleSourceBean(Source source) { ----์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์€ ์„œ๋น„์Šค๊ฐ€ ์‚ฌ์šฉํ•  ์†Œ์Šค ์ธํ„ฐํŽ˜์ด์Šค ๊ตฌํ˜„์„ ์ฃผ์ž…ํ•œ๋‹ค.
        this.source = source;
    }

    public void publishOrgChange(String action, String orgId) {
        logger.debug("Sending Kafka message {}
            for Organization Id: {}",
            action, orgId);
        OrganizationChangeModel change = new OrganizationChangeModel(
            OrganizationChangeModel.class.getTypeName(),
            action,
            orgId,
            UserContext.getCorrelationId()); ----๋ฐœํ–‰๋  ๋ฉ”์‹œ์ง€๋Š” ์ž๋ฐ” POJO๋‹ค.

        source
           .output()
           .send(
              MessageBuilder
                .withPayload(change)          .build()); ----๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ผ ์ค€๋น„๊ฐ€ ๋˜๋ฉด Source ํด๋ž˜์Šค์—์„œ ์ •์˜๋œ ์ฑ„๋„์—์„œ send() ๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.
    }
}

๋‹ค์Œ ์ฝ”๋“œ์—์„œ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ์˜ Source ํด๋ž˜์Šค๋ฅผ ์ฝ”๋“œ์— ์ฃผ์ž…ํ•œ๋‹ค. ํŠน์ • ๋ฉ”์‹œ์ง€ ํ† ํ”ฝ์— ๋Œ€ํ•œ ๋ชจ๋“  ํ†ต์‹ ์€ ์ฑ„๋„์ด๋ผ๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ ๊ตฌ์กฐ๋กœ ๋ฐœ์ƒํ•œ๋‹ค๋Š” ๊ฒƒ์„ ๊ธฐ์–ตํ•˜์ž. ์ฑ„๋„์€ ์ž๋ฐ” ์ธํ„ฐํŽ˜์ด์Šค๋กœ ํ‘œํ˜„๋˜๋ฉฐ, ์ด ์ฝ”๋“œ์—์„œ Source ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. Source ์ธํ„ฐํŽ˜์ด์Šค๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ์—์„œ ์ •์˜ํ•œ ์ธํ„ฐํŽ˜์ด์Šค๋กœ output()์ด๋ผ๋Š” ๋‹จ์ผ ๋ฉ”์„œ๋“œ๋ฅผ ๋…ธ์ถœํ•œ๋‹ค. Source ์ธํ„ฐํŽ˜์ด์Šค๋Š” ์„œ๋น„์Šค๊ฐ€ ๋‹จ์ผ ์ฑ„๋„์—๋งŒ ๋ฐœํ–‰ํ•ด์•ผ ํ•  ๋•Œ ์‚ฌ์šฉํ•˜๊ธฐ์—ํŽธ๋ฆฌํ•œ ์ธํ„ฐํŽ˜์ด์Šค์ด๋ฉฐ, output()๋ฉ”์„œ๋“œ๋Š” MessageChannelํด๋ž˜์Šค ํƒ€์ž…์„ ๋ฐ˜ํ™˜ํ•œ๋‹ค. MessageChannel์€ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์— ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ๋ฐฉ๋ฒ•์„ ์ •์˜ํ•˜๋ฉฐ, ์ด ์žฅ ๋’ท๋ถ€๋ถ„์—์„œ ์‚ฌ์šฉ์ž ์ •์˜ ์ธํ„ฐํŽ˜์ด์Šค๋ฅผ ์‚ฌ์šฉํ•ด ์—ฌ๋Ÿฌ ๋ฉ”์‹œ์ง• ์ฑ„๋„์„๋…ธ์ถœํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋ณด์—ฌ ์ค„ ๊ฒƒ์ด๋‹ค.

์‹ค์ œ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰์€ publishOrgChange() ๋ฉ”์„œ๋“œ์—์„œ ์ด๋ฃจ์–ด์ง„๋‹ค. ์ด ๋ฉ”์„œ๋“œ๋Š” OrganizationChange Model์ด๋ผ๋Š” ์ž๋ฐ” POJO๋ฅผ ๋งŒ๋“ ๋‹ค. OrganizationChangeModel ํด๋ž˜์Šค๋Š” ๋‹ค์Œ ์„ธ ๊ฐ€์ง€ ๋ฐ์ดํ„ฐ ์š”์†Œ๋ฅผ ๋‹ด๋Š” POJO ํด๋ž˜์Šค์ด๋ฉฐ, ์ด ์žฅ์—์„œ ๋ณ„๋„๋กœ ์ฝ”๋“œ๋Š” ๋ณด์—ฌ ์ฃผ์ง€ ์•Š๋Š”๋‹ค.

  • ์•ก์…˜(action) : ์ด๋ฒคํŠธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚จ ์•ก์…˜์ด๋‹ค. ๋ฉ”์‹œ์ง€์— ์•ก์…˜์„ ํฌํ•จ์‹œํ‚ค๋ฉด ๋ฉ”์‹œ์ง€ ์†Œ๋น„์ž๊ฐ€ ์ด๋ฒคํŠธ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š” ๋ฐ ๋” ๋งŽ์€ ์ปจํ…์ŠคํŠธ๋ฅผ ์ œ๊ณตํ•  ์ˆ˜ ์žˆ๋‹ค.

  • ์กฐ์ง ID(organization ID) : ์ด๋ฒคํŠธ์™€ ์—ฐ๊ด€๋œ ์กฐ์งID๋‹ค.

  • ์ƒ๊ด€๊ด€๊ณ„ ID(correlation ID) : ์ด๋ฒคํŠธ๋ฅผ ๋ฐœ์ƒ์‹œํ‚จ ์„œ๋น„์Šค ํ˜ธ์ถœ์— ๋Œ€ํ•œ ์ƒ๊ด€๊ด€๊ณ„ ID๋‹ค. ์ƒ๊ด€๊ด€๊ณ„ ID๋Š” ์„œ๋น„์Šค๋“ค์„ ๊ฒฝ์œ ํ•˜๋Š” ๋ฉ”์‹œ์ง€ ํ๋ฆ„์„ ์ถ”์ ํ•˜๊ณ  ๋””๋ฒ„๊น…ํ•˜๋Š” ๋ฐ ๋„์›€์ด ๋งŽ์ด ๋˜๋ฏ€๋กœ ํ•ญ์ƒ ์ด๋ฒคํŠธ์— ํฌํ•จ์‹œ์ผœ์•ผ ํ•œ๋‹ค.

๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•  ์ค€๋น„๊ฐ€ ๋˜๋ฉด source.output() ๋ฉ”์„œ๋“œ์—์„œ ๋ฐ˜ํ™˜๋˜๋Š” MessageChannel ํด๋ž˜์Šค์˜ send()๋ฉ”์„œ๋“œ๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

source.output().send(MessageBuilder.withPayload(change).build());

send() ๋ฉ”์„œ๋“œ๋Š” ์Šคํ”„๋ง Messag ํด๋ž˜์Šค๋ฅผ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ๋ฐ›๋Š”๋‹ค. MessageBuilder๋ผ๋Š” ์Šคํ”„๋ง ํ—ฌํผ ํด๋ž˜์Šค๋ฅผ ์‚ฌ์šฉํ•ด OrganizationChangeModel ํด๋ž˜์Šค ์ •๋ณด๋ฅผ ์ „๋‹ฌ๋ฐ›์•„ ์Šคํ”„๋ง Message ํด๋ž˜์Šค๋กœ ๋ณ€ํ™˜ํ•œ๋‹ค.

์ด๊ฒƒ์ด ๋ฉ”์‹œ์ง€๋ฅผ ๋ณด๋‚ด๋Š” ๋ฐ ํ•„์š”ํ•œ ์ฝ”๋“œ ์ „๋ถ€๋‹ค. ํ•˜์ง€๋งŒ ์‹ค์ œ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋ฟ๋งŒ ์•„๋‹ˆ๋ผ ํŠน์ • ๋ฉ”์‹œ์ง€ ํ์—์„œ๋„ ์กฐ์ง ์„œ๋น„์Šค๋ฅผ ๋ฐ”์ธ๋”ฉํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๋‹ค๋ฃจ์ง€ ์•Š์•„ ์•„์ง๊นŒ์ง€๋Š” ์ด ๋ชจ๋“  ๊ฒƒ์ด ๋‹ค์†Œ ๋งˆ์ˆ ์ฒ˜๋Ÿผ ๋А๊ปด์งˆ ๊ฒƒ์ด๋‹ค. ์‹ค์ œ๋กœ ์ด ๋ชจ๋“  ์ผ์€ ๊ตฌ์„ฑ ์„ค์ •์œผ๋กœ ์ด๋ฃจ์–ด์ง„๋‹ค. ์ด์ „ ์ฝ”๋“œ์—์„œ ์„œ๋น„์Šค์˜ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์˜ Source๊ฐ€ ์นดํ”„์นด ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์™€ ์นดํ”„์นด์˜ ๋ฉ”์‹œ์ง€ ํ† ํ”ฝ์— ๋งคํ•‘๋˜๋Š” ๊ตฌ์„ฑ์„ ๋ณด์—ฌ ์ค€๋‹ค. ์ด ๊ตฌ์„ฑ ์ •๋ณด๋Š” ์„œ๋น„์Šค์˜ application.yml ํŒŒ์ผ์ด๋‚˜ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ปจํ”ผ๊ทธ์—์„œ ํ•ด๋‹น ํ™˜๊ฒฝ์— ๋งž๊ฒŒ ์„ค์ •ํ•  ์ˆ˜ ์žˆ๋‹ค.

spring:
   application:
      name: organizationservice

   // ๊ฐ„๊ฒฐํ•œ ์ฝ”๋“œ๋ฅผ ์œ„ํ•ด ์‚ญ์ œ

      stream: ----stream.bindings๋Š” ์„œ๋น„์Šค๊ฐ€ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์˜ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์— ๋ฐœํ–‰ํ•˜๋ ค๋Š” ๊ตฌ์„ฑ์˜ ์‹œ์ž‘์ ์ด๋‹ค.
         bindings:  
            output: ----output์€ ์ฑ„๋„ ์ด๋ฆ„์ด๋ฉฐ, ์ฝ”๋“œ 8-2์—์„œ ๋ณธ Source.output() ์ฑ„๋„์— ๋งคํ•‘๋œ๋‹ค.
               destination: orgChangeTopic ----orgChangeTopic์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋„ฃ์„ ๋ฉ”์‹œ์ง€ ํ(๋˜๋Š” ํ† ํ”ฝ) ์ด๋ฆ„์ด๋‹ค.
               content-type: application/json ----content-type์€ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์— ์†ก์ˆ˜์‹ ํ•  ๋ฉ”์‹œ์ง€ ํƒ€์ž…์˜ ์ •๋ณด๋ฅผ ์ œ๊ณตํ•œ๋‹ค(์ด ๊ฒฝ์šฐ๋Š” JSON์ด๋‹ค).
            kafka: ----stream.bindings.kafka ํ”„๋กœํผํ‹ฐ๋Š” ํ•ด๋‹น ์„œ๋น„์Šค๊ฐ€ ๋ฉ”์‹œ์ง€ ๋ฒ„์Šค๋กœ ์นดํ”„์นด๋ฅผ ์‚ฌ์šฉํ•  ๊ฒƒ์ด๋ผ๊ณ  ์Šคํ”„๋ง์— ์ „๋‹ฌํ•œ๋‹ค(๋Œ€์•ˆ์œผ๋กœ RabbitMQ๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค).
               binder:                  
                  zkNodes: localhost ---- zkNodes์™€ brokers ํ”„๋กœํผํ‹ฐ๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์— ์นดํ”„์นด์™€ ์ฃผํ‚คํผ์˜ ๋„คํŠธ์›Œํฌ ์œ„์น˜๋ฅผ ์ „๋‹ฌํ•œ๋‹ค.
                  brokers: localhost

spring.stream.bindings.output ๊ตฌ์„ฑ ํ”„๋กœํผํ‹ฐ๋Š” source.output()์ฑ„๋„์„ ํ†ต์‹ ํ•˜๋ ค๋Š” ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์˜ orgChangeTopic์— ๋งคํ•‘ํ•œ๋‹ค. ์ด ํ”„๋กœํผํ‹ฐ๋„ topic์— ์ „๋‹ฌํ•˜๋Š” ๋ฉ”์‹œ์ง€๋ฅผ json์œผ๋กœ ์ง๋ ฌํ™”ํ•˜๋„๋ก ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์„ ์„ค์ •ํ•œ๋‹ค. ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์€ JSON ๋ฐ XML, ์•„ํŒŒ์น˜ ์žฌ๋‹จ์˜ ์•„๋ธŒ๋กœ ํฌ๋งท์„ ํฌํ•จํ•œ ๋‹ค์–‘ํ•œ ํฌ๋งท์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ์ง๋ ฌํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

spring.stream.bindings.kafka ๊ตฌ์„ฑ ํ”„๋กœํผํ‹ฐ๋Š” ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์ด ์„œ๋น„์Šค๋ฅผ ์นดํ”„์นด์— ๋ฐ”์ธ๋”ฉํ•˜๋„๋ก ์„ค์ •ํ•œ๋‹ค. ํ•˜์œ„ ํ”„๋กœํผํ‹ฐ๋Š” ์นดํ”„์นด ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค ๋ฐ ์นดํ”„์นด์™€ ํ•จ๊ป˜ ์‹คํ–‰๋˜๋Š” ์•„ํŒŒ์น˜ ์ฃผํ‚คํผ ์„œ๋ฒ„์˜ ๋„คํŠธ์›Œํฌ ์ฃผ์†Œ๋ฅผ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์— ์„ค์ •ํ•œ๋‹ค.

์ด์ œ ์Šคํ”„๋ง ํด๋ผ์šฐ๋“œ ์ŠคํŠธ๋ฆผ์œผ๋กœ ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐœํ–‰ํ•˜๋Š” ์ฝ”๋“œ์™€ ์นดํ”„์นด๋ฅผ ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค๋กœ ์‚ฌ์šฉํ•˜๋„๋ก ๊ตฌ์„ฑํ–ˆ์œผ๋‹ˆ ์กฐ์ง ์„œ๋น„์Šค์—์„œ๋Š” ์‹ค์ œ๋กœ ์–ด๋””์„œ ๋ฉ”์‹œ์ง€๊ฐ€ ๋ฐœํ–‰๋˜๋Š”์ง€ ์‚ดํŽด๋ณด์ž. ์ด ์ž‘์—…์€ OrganizationService.java ํด๋ž˜์Šค์—์„œ ์ด๋ฃจ์–ด์ง„๋‹ค.

package com.thoughtmechanix.organization.services;

// ๊ฐ„๊ฒฐํ•œ ์ฝ”๋“œ๋ฅผ ์œ„ํ•ด import ์‚ญ์ œ

@Service
public class OrganizationService {
    @Autowired    private OrganizationRepository orgRepository;

    @Autowired ----SimpleSourceBean์„ OrganizationService์— ์ฃผ์ž…ํ•˜๊ธฐ ์œ„ํ•ด ์Šคํ”„๋ง ์ž๋™ ์—ฐ๊ฒฐ์„ ์‚ฌ์šฉํ•œ๋‹ค.
    SimpleSourceBean simpleSourceBean;

    // ๊ฐ„๊ฒฐํ•œ ์ฝ”๋“œ๋ฅผ ์œ„ํ•ด ๋‚˜๋จธ์ง€ ์‚ญ์ œ

    public void saveOrg(Organization org) {
        org.setId(UUID.randomUUID().toString());

        orgRepository.save(org);
        simpleSourceBean.publishOrgChange("SAVE", org.getId()); ----์กฐ์ง ๋ฐ์ดํ„ฐ๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š” ๋ฉ”์„œ๋“œ ๋ชจ๋‘ simpleSourceBean.publishOrgChange()๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค.
    }
}

Note ์–ด๋–ค ๋ฐ์ดํ„ฐ๋ฅผ ๋ฉ”์‹œ์ง€์— ์ถ”๊ฐ€ํ•ด์•ผ ํ• ๊นŒ?

๋ฉ”์‹œ์ง• ๊ฐœ๋ฐœ์„ ์‹œ์ž‘ํ•œ ํŒ€์—์„œ ๋งŽ์ด ๋ฐ›๋Š” ์งˆ๋ฌธ ์ค‘ ํ•˜๋‚˜๋Š” โ€œ์ •ํ™•ํ•˜๊ฒŒ ์–ผ๋งˆ๋‚˜ ๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฉ”์‹œ์ง€์— ๋„ฃ์–ด์•ผ ํ•˜๋Š”๊ฐ€โ€ ํ•˜๋Š” ๊ฒƒ์ด๋‹ค. ์ด์— ๋Œ€ํ•ด ํ•„์ž๋Š” โ€œ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜๋งˆ๋‹ค ๋‹ค๋ฅด๋‹ค.โ€๋ผ๊ณ  ๋‹ตํ•œ๋‹ค. ์•Œ๊ฒ ์ง€๋งŒ ์ด ์žฅ ๋ชจ๋“  ์˜ˆ์ œ์—์„œ๋Š” ๋ณ€๊ฒฝ๋œ ์กฐ์ง ๋ ˆ์ฝ”๋“œ์˜ ์กฐ์ง ID๋งŒ ๋ฐ˜ํ™˜ํ•œ๋‹ค. ํ•„์ž๋Š” ๋ฉ”์‹œ์ง€์— ๋ณ€๊ฒฝ๋œ ๋ฐ์ดํ„ฐ ๋ณต์‚ฌ๋ณธ์€ ์ ˆ๋Œ€ ๋„ฃ์ง€ ์•Š๋Š”๋‹ค. ์˜ˆ์ œ(์™€ ์ „ํ™” ํ†ต์‹  ๋ถ„์•ผ์—์„œ ํ•„์ž๊ฐ€ ๊ฒช์€ ๋งŽ์€ ๋ฌธ์ œ)์—์„œ ์‹คํ–‰๋˜๋Š” ๋น„์ฆˆ๋‹ˆ์Šค ๋กœ์ง์€ ๋ฐ์ดํ„ฐ ๋ณ€ํ™”์— ๋ฏผ๊ฐํ•˜๋‹ค. ํ•„์ž๋Š” ์‹œ์Šคํ…œ ์ด๋ฒคํŠธ์— ๊ธฐ๋ฐ˜์„ ๋‘” ๋ฉ”์‹œ์ง€๋ฅผ ์‚ฌ์šฉํ•ด ๋‹ค๋ฅธ ์„œ๋น„์Šค์—๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ๋ณ€๊ฒฝ๋œ ์‚ฌ์‹ค๋งŒ ์ „ํŒŒํ•˜๊ณ , ๊ทธ ์„œ๋น„์Šค๋“ค์ด ํ•ญ์ƒ ๋งˆ์Šคํ„ฐ(๋ฐ์ดํ„ฐ๋ฅผ ์†Œ์œ ํ•œ ์„œ๋น„์Šค)๋กœ ๋‹ค์‹œ ๊ฐ€์„œ ์ƒˆ๋กœ์šด ๋ฐ์ดํ„ฐ ์‚ฌ๋ณธ์„ ๊ฐ€์ ธ์˜ค๊ฒŒ ํ–ˆ๋‹ค. ์ด ๋ฐฉ์‹์€ ์‹คํ–‰ ์‹œ๊ฐ„ ๋ฉด์—์„œ ๋” ๋งŽ์€ ๋น„์šฉ์ด ๋“ค์ง€๋งŒ ํ•ญ์ƒ ์ตœ์‹  ๋ฐ์ดํ„ฐ ๋ณต์‚ฌ๋ณธ์œผ๋กœ ์ž‘์—…ํ•˜๋Š” ๊ฒƒ์„ ๋ณด์žฅํ•œ๋‹ค. ์ž‘์—… ์ค‘์ธ ๋ฐ์ดํ„ฐ๋ฅผ ์›๋ณธ ์‹œ์Šคํ…œ์—์„œ ์ฝ์–ด ์˜จ ์งํ›„์—๋„ ๋ณ€๊ฒฝ๋  ๊ฐ€๋Šฅ์„ฑ์€ ์—ฌ์ „ํžˆ ์กด์žฌํ•˜์ง€๋งŒ, ํ์—์„œ ๋ฐ›์€ ์ •๋ณด๋ฅผ ๋ฌดํ„ฑ๋Œ€๊ณ  ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ๋ณด๋‹ค ๊ฐ€๋Šฅ์„ฑ์ด ํ›จ์”ฌ ๋‚ฎ๋‹ค.

์–ผ๋งˆ๋‚˜ ๋งŽ์€ ๋ฐ์ดํ„ฐ๋ฅผ ์ „๋‹ฌํ• ์ง€ ์‹ ์ค‘ํžˆ ์ƒ๊ฐํ•ด์•ผ ํ•œ๋‹ค. ์—ฌ๋Ÿฌ๋ถ„์€ ์กฐ๋งŒ๊ฐ„ ์ „๋‹ฌํ•œ ๋ฐ์ดํ„ฐ๊ฐ€ ์˜ˆ์ „(stale) ๋ฐ์ดํ„ฐ์ธ ์ƒํ™ฉ์— ์ฒ˜ํ•  ๊ฒƒ์ด๋‹ค. ์–ด๋–ค ๋ฌธ์ œ๋กœ ๋ฉ”์‹œ์ง€ ํ์— ๋„ˆ๋ฌด ์˜ค๋ž˜ ๋ณด๊ด€๋˜๊ฑฐ๋‚˜ ๋ฐ์ดํ„ฐ๋ฅผ ํฌํ•จํ•œ ์ด์ „ ๋ฉ”์‹œ์ง€๊ฐ€ ์‹คํŒจํ•ด์„œ ์ผ๊ด€์„ฑ ์—†๋Š” ๋ฉ”์‹œ์ง€๊ฐ€ ์ „๋‹ฌ๋˜๊ธฐ ๋•Œ๋ฌธ์—(์• ํ”Œ๋ฆฌ์ผ€์ด์…˜์ด ์›๋ž˜ ๋ฐ์ดํ„ฐ ์Šคํ† ์–ด์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ์˜ ์‹ค์ œ ์ƒํƒœ๋ณด๋‹ค ๋ฉ”์‹œ์ง€ ์ƒํƒœ์— ๋” ์˜์กดํ•˜๊ธฐ ๋•Œ๋ฌธ์—) ์ „๋‹ฌ๋œ ๋ฐ์ดํ„ฐ๋Š” ์ตœ์‹ ์ด ์•„๋‹ ์ˆ˜ ์žˆ๋‹ค. ๋ฉ”์‹œ์ง€์˜ ์ƒํƒœ๋ฅผ ์ „๋‹ฌํ•˜๋ ค๋ฉด ๋ฉ”์‹œ์ง€์— ๋‚ ์งœ-์‹œ๊ฐ„ ์Šคํƒฌํ”„ ๋˜๋Š” ๋ฒ„์ „ ๋ฒˆํ˜ธ๋„ ํฌํ•จ์‹œ์ผœ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•˜๋Š” ์„œ๋น„์Šค๊ฐ€ ์ „๋‹ฌ๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฒ€์‚ฌํ•˜๊ณ  ์ด๋ฏธ ๊ฐ€์ง„ ๋ฐ์ดํ„ฐ ๋ณต์‚ฌ๋ณธ๋ณด๋‹ค ์ด์ „ ๊ฒƒ์ด ์•„๋‹Œ์ง€ ํ™•์ธํ•  ์ˆ˜ ์žˆ์–ด์•ผ ํ•œ๋‹ค(๋ฐ์ดํ„ฐ๋Š” ๋น„์ˆœ์ฐจ์ ์œผ๋กœ ๊ฒ€์ƒ‰๋  ์ˆ˜ ์žˆ์Œ์„ ๊ธฐ์–ตํ•˜์ž).

Last updated

Was this helpful?