Spring Cloud Stream
์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ์๊ฐ
์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ํ๋ก์ ํธ๋ ์ ํ๋ฆฌ์ผ์ด์ ์ ๋ฉ์์ง ๋ฐํ์์ ์๋น์๋ฅผ ์ฝ๊ฒ ๊ตฌ์ถํ ์ ์๋ ์๋ํ ์ด์ ๊ธฐ๋ฐ ํ๋ ์์ํฌ.
์ฌ์ฉํ๋ ์์ง ํ๋ซํผ์ ๊ตฌํ ์ธ๋ถ ์ฌํญ์ ์ถ์ํํ๋ค. ์ฌ๋ฌ ๋ฉ์์ง ํ๋ซํผ์ด ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ๊ณผ ์ฌ์ฉ๋ ์ ์์ผ๋ฉฐ, ํน์ ํ๋ซํผ์ ์ํ ์ธ๋ถ ๊ตฌํ์ ์ ํ๋ฆฌ์ผ์ด์ ์ฝ๋์ ๋ถ๋ฆฌ๋๋ค. ์ ํ๋ฆฌ์ผ์ด์ ์์ ๋ฉ์์ง ๋ฐํ๊ณผ ์๋น ๊ตฌํ์ ํ๋ซํผ ์ค๋ฆฝ์ ์ธ ์คํ๋ง ์ธํฐํ์ด์ค๋ก ์ํ๋๋ค.
์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ์ํคํ
์ฒ
๋ฉ์์ง์ ์ด์ฉํ ๋ ๋น์ค์ ํต์ ๊ด์ ์์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ์ํคํ ์ฒ๋ฅผ ์ดํด๋ณด๋ฉฐ ๋ ผ์๋ฅผ ์์ํด ๋ณด์. ํ ์๋น์ค๊ฐ ๋ฉ์์ง ๋ฐํ์(publisher)๊ฐ ๋๊ณ , ๋ค๋ฅธ ์๋น์ค๋ ๋ฉ์์ง ์๋น์(consumer)๊ฐ ๋๋ค. ์๋ ๊ทธ๋ฆผ์์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ด ์ด๋ป๊ฒ ๋ฉ์์ง๋ฅผ ์์ํ๊ฒ ์ ๋ฌํ๋์ง ํ์ธํ ์ ์๋ค.
์คํ๋ง ํด๋ผ์ฐ๋์์ ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ณ ์๋นํ๋ ๋ฐ ๋ค์ 4๊ฐ์ ์ปดํฌ๋ํธ๊ฐ ๊ด๋ จ๋์ด ์๋ค.
์์ค
์ฑ๋
๋ฐ์ธ๋
์ฑํฌ

์์ค(SOURCE)
์๋น์ค๊ฐ ๋ฉ์์ง๋ฅผ ๋ฐํํ ์ค๋น๊ฐ ๋๋ฉด ์์ค๋ฅผ ์ฌ์ฉํด ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ค. ์์ค๋ ๋ฐํ๋ ๋ฉ์์ง๋ฅผ ํํํ๋ POJO๋ฅผ ์ ๋ฌ๋ฐ๋ ์คํ๋ง์ ์ ๋ํ ์ด์ ์ธํฐํ์ด์ค๋ค. ์์ค๋ ๋ฉ์์ง๋ฅผ ๋ฐ์ ์ง๋ ฌํ(๊ธฐ๋ณธ ์ง๋ ฌํ ์ค์ ์ JSON)ํ๊ณ ๋ฉ์์ง๋ฅผ ์ฑ๋๋ก ๋ฐํํ๋ค.
์ฑ๋(CHANNEL)
๋ฉ์์ง ์์ฐ์์ ์๋น์๊ฐ ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ฑฐ๋ ์๋นํ ํ ๋ฉ์์ง๋ฅผ ๋ณด๊ดํ ํ๋ฅผ ์ถ์ํํ ๊ฒ. ์ฑ๋ ์ด๋ฆ์ ํญ์ ๋์ ํ์ ์ด๋ฆ๊ณผ ๊ด๋ จ์ด ์์ง๋ง ์ฝ๋์์๋ ํ ์ด๋ฆ์ ์ง์ ์ฌ์ฉํ์ง ์๊ณ , ์ฑ๋ ์ด๋ฆ์ ์ฌ์ฉํ๋ค. ๋ฐ๋ผ์ ์ฑ๋์ด ์ฝ๊ฑฐ๋ ์ฐ๋ ํ๋ฅผ ์ ํํ๋ ค๋ฉด ์ ํ๋ฆฌ์ผ์ด์ ์ฝ๋๊ฐ ์๋ ๊ตฌ์ฑ ์ ๋ณด๋ฅผ ๋ณ๊ฒฝํ๋ค.
๋ฐ์ธ๋(BINDER)
์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ํ๋ ์์ํฌ์ ์ผ๋ถ์ธ ์คํ๋ง ์ฝ๋๋ก ํน์ ๋ฉ์์ง ํ๋ซํผ๊ณผ ํต์ ํ๋ค. ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ๋ฐ์ธ๋๋ฅผ ์ฌ์ฉํ๋ฉด ๋ฉ์์ง๋ฅผ ๋ฐํํ๊ณ ์๋นํ๊ธฐ ์ํด ํ๋ซํผ๋ง๋ค ๋ณ๋์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ API๋ฅผ ์ ๊ณตํ์ง ์๊ณ ๋ ๋ฉ์์ง์ ์ฌ์ฉํ ์ ์๋ค.
์ฑํฌ(SINK)
์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์์ ์๋น์ค๋ ์ฑํฌ๋ฅผ ์ฌ์ฉํด ํ์์ ๋ฉ์์ง๋ฅผ ๋ฐ๋๋ค. ์ฑํฌ๋ ๋ค์ด์ค๋ ๋ฉ์์ง๋ฅผ ์ํด ์ฑ๋์ ์์ ๋๊ธฐํ๊ณ , ๋ฉ์์ง๋ฅผ ๋ค์ POJO๋ก ์ญ์ง๋ ฌํํ๋ค. ์ด ๊ณผ์ ์์ ์คํ๋ง ์๋น์ค์ ๋น์ฆ๋์ค ๋ก์ง์ด ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค.
๊ฐ๋จํ ๋ฉ์์ง Publisher์ Consumer ์์ฑ
๋ค์ ์์ ์์ ์กฐ์ง ์๋น์ค๊ฐ ๋ผ์ด์ ์ฑ ์๋น์ค๋ก ๋ฉ์์ง๋ฅผ ์ ๋ฌํ๋ค. ๋ผ์ด์ ์ฑ ์๋น์ค๊ฐ ๋ฉ์์ง๋ฅผ ๋ฐ์ ๋ ํ๋ ์ผ์ ๋ก๊ทธ ๋ฉ์์ง๋ฅผ ์ฝ์์ ์ถ๋ ฅํ๋ ๊ฒ๋ฟ์ด๋ค.
์ด ์ ์์ ํ๋์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์์ค(๋ฉ์์ง ์์ฐ์)์ ์ฑํฌ(๋ฉ์์ง ์๋น์)๋ง ์ฌ์ฉํ๋ฏ๋ก, ์กฐ์ง ์๋น์ค์ ์์ค์ ๋ผ์ด์ ์ฑ ์๋น์ค์ ์ฑํฌ ์ค์ ์ ์์ฝ๊ฒ ํด ์ฃผ๋ ์คํ๋ง ํด๋ผ์ฐ๋ ์์ ๋ฅผ ์์ํ ๊ฒ์ด๋ค.

1. ์กฐ์ง ์๋น์ค์ ๋ฉ์์ง ์์ฐ์ ์์ฑ
์กฐ์ง ๋ฐ์ดํฐ๊ฐ ์ถ๊ฐ/์์ /์ญ์ ๋ ๋๋ง๋ค ์กฐ์ง ์๋น์ค๊ฐ ์นดํ์นด ํ ํฝ(topic)์ ๋ฉ์์ง๋ฅผ ๋ฐํํด์ ํ ํฝ์ผ๋ก ์กฐ์ง ๋ณ๊ฒฝ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ์์ ์๋ ค ์ฃผ๋๋ก ์กฐ์ง ์๋น์ค๋ฅผ ๊ณ ์ณ ๋ณด์. ์ ๊ทธ๋ฆผ์์ ๋ฉ์์ง ์์ฐ์๋ฅผ ์ค๋ช ํ์ฌ, ๊ทธ๋ฆผ 8-3์ ์ผ๋ฐ์ ์ธ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ์ํคํ ์ฒ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํ๋ค.
๋ฐํ๋ ๋ฉ์์ง ์์ ๋ณ๊ฒฝ ์ด๋ฒคํธ์ ๊ด๋ จ๋ ์กฐ์ง ID์ ๋ฐ์ํ ์ก์ ์ ๋ณด (Add, Update, Delete)๊ฐ ํฌํจ๋๋ค.
๋จผ์ ํ ์ผ์ ์กฐ์ง ์๋น์ค์ Maven, Gradle์ ์์กด์ฑ์ ์ค์ ํด์ผ ํ๋ค.
Maven
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
Gradle
implementation 'org.springframework.cloud:spring-cloud-stream'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
๋ฉ์ด๋ธ ์์กด์ฑ์ ์ ์ํ๋ค๋ฉด, ๋ฉ์์ง ๋ธ๋ก์ปค์ ๋ฐ์ธ๋ฉํ๋๋ก ์ง์ ํด์ผ ํ๋ค. ์ด ์์ ์ ์ํด ์กฐ์ง ์๋น์ค์ ๋ถํธ์คํธ๋ฉ ํด๋์ค์ธ Application.java์ @EnableBinding ์๋ํ ์ด์ ์ ์ถ๊ฐํ๋ค.
package com.thoughtmechanix.organization;
import com.thoughtmechanix.organization.utils.UserContextFilter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import javax.servlet.Filter;
@SpringBootApplication
@EnableEurekaClient
@EnableCircuitBreaker
@EnableBinding(Source.class) ----@EnableBinding ์ ๋ํ
์ด์
์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ ํ๋ฆฌ์ผ์ด์
์ ๋ฉ์์ง ๋ธ๋ก์ปค๋ก ๋ฐ์ธ๋ฉํ๋ผ๊ณ ์๋ฆฐ๋ค.
public class Application {
@Bean
public Filter userContextFilter() {
UserContextFilter userContextFilter = new UserContextFilter();
return userContextFilter;
}
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
@EnableBinding ์ ๋ํ ์ด์ ์ ํด๋น ์๋น์ค๋ฅผ ๋ฉ์์ง ๋ธ๋ก์ปค์ ๋ฐ์ธ๋ฉํ๋๋ก ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ค์ ํ๋ค. @EnableBinding ์๋ํ ์ด์ ์์ Source.class๋ฅผ ์ฌ์ฉํ๋ฉด ํด๋น ์๋น์ค๊ฐ Source ํด๋์ค์ ์ ์๋ ์ฑ๋๋ค์ ์ด์ฉํด ๋ฉ์์ง ๋ธ๋ก์ปค์ ํต์ ํ๊ฒ ๋๋ค. ์ฑ๋์ ๋ฉ์์ง ํ ์์ ์๋ค๋ ๊ฒ์ ์๊ธฐํ์. ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ๋ฉ์์ง ๋ธ๋ก์ปค์ ํต์ ํ ์ ์๋ ๊ธฐ๋ณธ ์ฑ๋์ด ์๋ค.
์์ง ์กฐ์ง ์๋น์ค๊ฐ ์ด๋ค ๋ฉ์์ง ๋ธ๋ก์ปค๋ฅผ ๋ฐ์ธ๋ฉํ ์ง ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ง์ ํ์ง ์์๋ค. ์ด์ ๋ฉ์์ง๋ฅผ ๋ฐํํ ์ฝ๋๋ฅผ ๊ตฌํํ ์ค๋น๊ฐ ๋์๋ค.
package com.thoughtmechanix.organization.events.source;
// ๊ฐ๊ฒฐํ ์ฝ๋๋ฅผ ์ํด import ์ญ์
@Component
public class SimpleSourceBean {
private Source source;
private static final Logger logger =
LoggerFactory.getLogger(SimpleSourceBean.class);
@Autowired
public SimpleSourceBean(Source source) { ----์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์๋น์ค๊ฐ ์ฌ์ฉํ ์์ค ์ธํฐํ์ด์ค ๊ตฌํ์ ์ฃผ์
ํ๋ค.
this.source = source;
}
public void publishOrgChange(String action, String orgId) {
logger.debug("Sending Kafka message {}
for Organization Id: {}",
action, orgId);
OrganizationChangeModel change = new OrganizationChangeModel(
OrganizationChangeModel.class.getTypeName(),
action,
orgId,
UserContext.getCorrelationId()); ----๋ฐํ๋ ๋ฉ์์ง๋ ์๋ฐ POJO๋ค.
source
.output()
.send(
MessageBuilder
.withPayload(change) .build()); ----๋ฉ์์ง๋ฅผ ๋ณด๋ผ ์ค๋น๊ฐ ๋๋ฉด Source ํด๋์ค์์ ์ ์๋ ์ฑ๋์์ send() ๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ค.
}
}
๋ค์ ์ฝ๋์์ ์คํ๋ง ํด๋ผ์ฐ๋์ Source ํด๋์ค๋ฅผ ์ฝ๋์ ์ฃผ์ ํ๋ค. ํน์ ๋ฉ์์ง ํ ํฝ์ ๋ํ ๋ชจ๋ ํต์ ์ ์ฑ๋์ด๋ผ๋ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ ๊ตฌ์กฐ๋ก ๋ฐ์ํ๋ค๋ ๊ฒ์ ๊ธฐ์ตํ์. ์ฑ๋์ ์๋ฐ ์ธํฐํ์ด์ค๋ก ํํ๋๋ฉฐ, ์ด ์ฝ๋์์ Source ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํ๋ค. Source ์ธํฐํ์ด์ค๋ ์คํ๋ง ํด๋ผ์ฐ๋์์ ์ ์ํ ์ธํฐํ์ด์ค๋ก output()์ด๋ผ๋ ๋จ์ผ ๋ฉ์๋๋ฅผ ๋ ธ์ถํ๋ค. Source ์ธํฐํ์ด์ค๋ ์๋น์ค๊ฐ ๋จ์ผ ์ฑ๋์๋ง ๋ฐํํด์ผ ํ ๋ ์ฌ์ฉํ๊ธฐ์ํธ๋ฆฌํ ์ธํฐํ์ด์ค์ด๋ฉฐ, output()๋ฉ์๋๋ MessageChannelํด๋์ค ํ์ ์ ๋ฐํํ๋ค. MessageChannel์ ๋ฉ์์ง ๋ธ๋ก์ปค์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ๋ฐฉ๋ฒ์ ์ ์ํ๋ฉฐ, ์ด ์ฅ ๋ท๋ถ๋ถ์์ ์ฌ์ฉ์ ์ ์ ์ธํฐํ์ด์ค๋ฅผ ์ฌ์ฉํด ์ฌ๋ฌ ๋ฉ์์ง ์ฑ๋์๋ ธ์ถํ๋ ๋ฐฉ๋ฒ์ ๋ณด์ฌ ์ค ๊ฒ์ด๋ค.
์ค์ ๋ฉ์์ง ๋ฐํ์ publishOrgChange() ๋ฉ์๋์์ ์ด๋ฃจ์ด์ง๋ค. ์ด ๋ฉ์๋๋ OrganizationChange Model์ด๋ผ๋ ์๋ฐ POJO๋ฅผ ๋ง๋ ๋ค. OrganizationChangeModel ํด๋์ค๋ ๋ค์ ์ธ ๊ฐ์ง ๋ฐ์ดํฐ ์์๋ฅผ ๋ด๋ POJO ํด๋์ค์ด๋ฉฐ, ์ด ์ฅ์์ ๋ณ๋๋ก ์ฝ๋๋ ๋ณด์ฌ ์ฃผ์ง ์๋๋ค.
์ก์ (action) : ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ํจ ์ก์ ์ด๋ค. ๋ฉ์์ง์ ์ก์ ์ ํฌํจ์ํค๋ฉด ๋ฉ์์ง ์๋น์๊ฐ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌํ๋ ๋ฐ ๋ ๋ง์ ์ปจํ ์คํธ๋ฅผ ์ ๊ณตํ ์ ์๋ค.
์กฐ์ง ID(organization ID) : ์ด๋ฒคํธ์ ์ฐ๊ด๋ ์กฐ์งID๋ค.
์๊ด๊ด๊ณ ID(correlation ID) : ์ด๋ฒคํธ๋ฅผ ๋ฐ์์ํจ ์๋น์ค ํธ์ถ์ ๋ํ ์๊ด๊ด๊ณ ID๋ค. ์๊ด๊ด๊ณ ID๋ ์๋น์ค๋ค์ ๊ฒฝ์ ํ๋ ๋ฉ์์ง ํ๋ฆ์ ์ถ์ ํ๊ณ ๋๋ฒ๊น ํ๋ ๋ฐ ๋์์ด ๋ง์ด ๋๋ฏ๋ก ํญ์ ์ด๋ฒคํธ์ ํฌํจ์์ผ์ผ ํ๋ค.
๋ฉ์์ง๋ฅผ ๋ฐํํ ์ค๋น๊ฐ ๋๋ฉด source.output() ๋ฉ์๋์์ ๋ฐํ๋๋ MessageChannel ํด๋์ค์ send()๋ฉ์๋๋ฅผ ์ฌ์ฉํ๋ค.
source.output().send(MessageBuilder.withPayload(change).build());
send() ๋ฉ์๋๋ ์คํ๋ง Messag ํด๋์ค๋ฅผ ๋งค๊ฐ๋ณ์๋ก ๋ฐ๋๋ค. MessageBuilder๋ผ๋ ์คํ๋ง ํฌํผ ํด๋์ค๋ฅผ ์ฌ์ฉํด OrganizationChangeModel ํด๋์ค ์ ๋ณด๋ฅผ ์ ๋ฌ๋ฐ์ ์คํ๋ง Message ํด๋์ค๋ก ๋ณํํ๋ค.
์ด๊ฒ์ด ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ ๋ฐ ํ์ํ ์ฝ๋ ์ ๋ถ๋ค. ํ์ง๋ง ์ค์ ๋ฉ์์ง ๋ธ๋ก์ปค๋ฟ๋ง ์๋๋ผ ํน์ ๋ฉ์์ง ํ์์๋ ์กฐ์ง ์๋น์ค๋ฅผ ๋ฐ์ธ๋ฉํ๋ ๋ฐฉ๋ฒ์ ๋ค๋ฃจ์ง ์์ ์์ง๊น์ง๋ ์ด ๋ชจ๋ ๊ฒ์ด ๋ค์ ๋ง์ ์ฒ๋ผ ๋๊ปด์ง ๊ฒ์ด๋ค. ์ค์ ๋ก ์ด ๋ชจ๋ ์ผ์ ๊ตฌ์ฑ ์ค์ ์ผ๋ก ์ด๋ฃจ์ด์ง๋ค. ์ด์ ์ฝ๋์์ ์๋น์ค์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ Source๊ฐ ์นดํ์นด ๋ฉ์์ง ๋ธ๋ก์ปค์ ์นดํ์นด์ ๋ฉ์์ง ํ ํฝ์ ๋งคํ๋๋ ๊ตฌ์ฑ์ ๋ณด์ฌ ์ค๋ค. ์ด ๊ตฌ์ฑ ์ ๋ณด๋ ์๋น์ค์ application.yml ํ์ผ์ด๋ ์คํ๋ง ํด๋ผ์ฐ๋ ์ปจํผ๊ทธ์์ ํด๋น ํ๊ฒฝ์ ๋ง๊ฒ ์ค์ ํ ์ ์๋ค.
spring:
application:
name: organizationservice
// ๊ฐ๊ฒฐํ ์ฝ๋๋ฅผ ์ํด ์ญ์
stream: ----stream.bindings๋ ์๋น์ค๊ฐ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ๋ฉ์์ง ๋ธ๋ก์ปค์ ๋ฐํํ๋ ค๋ ๊ตฌ์ฑ์ ์์์ ์ด๋ค.
bindings:
output: ----output์ ์ฑ๋ ์ด๋ฆ์ด๋ฉฐ, ์ฝ๋ 8-2์์ ๋ณธ Source.output() ์ฑ๋์ ๋งคํ๋๋ค.
destination: orgChangeTopic ----orgChangeTopic์ ๋ฉ์์ง๋ฅผ ๋ฃ์ ๋ฉ์์ง ํ(๋๋ ํ ํฝ) ์ด๋ฆ์ด๋ค.
content-type: application/json ----content-type์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ก์์ ํ ๋ฉ์์ง ํ์
์ ์ ๋ณด๋ฅผ ์ ๊ณตํ๋ค(์ด ๊ฒฝ์ฐ๋ JSON์ด๋ค).
kafka: ----stream.bindings.kafka ํ๋กํผํฐ๋ ํด๋น ์๋น์ค๊ฐ ๋ฉ์์ง ๋ฒ์ค๋ก ์นดํ์นด๋ฅผ ์ฌ์ฉํ ๊ฒ์ด๋ผ๊ณ ์คํ๋ง์ ์ ๋ฌํ๋ค(๋์์ผ๋ก RabbitMQ๋ฅผ ์ฌ์ฉํ ์ ์๋ค).
binder:
zkNodes: localhost ---- zkNodes์ brokers ํ๋กํผํฐ๋ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์นดํ์นด์ ์ฃผํคํผ์ ๋คํธ์ํฌ ์์น๋ฅผ ์ ๋ฌํ๋ค.
brokers: localhost
spring.stream.bindings.output ๊ตฌ์ฑ ํ๋กํผํฐ๋ source.output()์ฑ๋์ ํต์ ํ๋ ค๋ ๋ฉ์์ง ๋ธ๋ก์ปค์ orgChangeTopic์ ๋งคํํ๋ค. ์ด ํ๋กํผํฐ๋ topic์ ์ ๋ฌํ๋ ๋ฉ์์ง๋ฅผ json์ผ๋ก ์ง๋ ฌํํ๋๋ก ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ค์ ํ๋ค. ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ JSON ๋ฐ XML, ์ํ์น ์ฌ๋จ์ ์๋ธ๋ก ํฌ๋งท์ ํฌํจํ ๋ค์ํ ํฌ๋งท์ผ๋ก ๋ฉ์์ง๋ฅผ ์ง๋ ฌํํ ์ ์๋ค.
spring.stream.bindings.kafka ๊ตฌ์ฑ ํ๋กํผํฐ๋ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ด ์๋น์ค๋ฅผ ์นดํ์นด์ ๋ฐ์ธ๋ฉํ๋๋ก ์ค์ ํ๋ค. ํ์ ํ๋กํผํฐ๋ ์นดํ์นด ๋ฉ์์ง ๋ธ๋ก์ปค ๋ฐ ์นดํ์นด์ ํจ๊ป ์คํ๋๋ ์ํ์น ์ฃผํคํผ ์๋ฒ์ ๋คํธ์ํฌ ์ฃผ์๋ฅผ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ ์ค์ ํ๋ค.
์ด์ ์คํ๋ง ํด๋ผ์ฐ๋ ์คํธ๋ฆผ์ผ๋ก ๋ฉ์์ง๋ฅผ ๋ฐํํ๋ ์ฝ๋์ ์นดํ์นด๋ฅผ ๋ฉ์์ง ๋ธ๋ก์ปค๋ก ์ฌ์ฉํ๋๋ก ๊ตฌ์ฑํ์ผ๋ ์กฐ์ง ์๋น์ค์์๋ ์ค์ ๋ก ์ด๋์ ๋ฉ์์ง๊ฐ ๋ฐํ๋๋์ง ์ดํด๋ณด์. ์ด ์์ ์ OrganizationService.java ํด๋์ค์์ ์ด๋ฃจ์ด์ง๋ค.
package com.thoughtmechanix.organization.services;
// ๊ฐ๊ฒฐํ ์ฝ๋๋ฅผ ์ํด import ์ญ์
@Service
public class OrganizationService {
@Autowired private OrganizationRepository orgRepository;
@Autowired ----SimpleSourceBean์ OrganizationService์ ์ฃผ์
ํ๊ธฐ ์ํด ์คํ๋ง ์๋ ์ฐ๊ฒฐ์ ์ฌ์ฉํ๋ค.
SimpleSourceBean simpleSourceBean;
// ๊ฐ๊ฒฐํ ์ฝ๋๋ฅผ ์ํด ๋๋จธ์ง ์ญ์
public void saveOrg(Organization org) {
org.setId(UUID.randomUUID().toString());
orgRepository.save(org);
simpleSourceBean.publishOrgChange("SAVE", org.getId()); ----์กฐ์ง ๋ฐ์ดํฐ๋ฅผ ๋ณ๊ฒฝํ๋ ๋ฉ์๋ ๋ชจ๋ simpleSourceBean.publishOrgChange()๋ฅผ ํธ์ถํ๋ค.
}
}
Last updated
Was this helpful?