[MSA] SpringBoot + Kafka 를 이용하여 RestFul API 구현
해당글에 앞서 백킹서비스 개념을 먼저 알고 가는게 좋다
https://ssmyefrin.tistory.com/56
카프카를 설치했으니, 이제 Spring Boot 를 연동하여 구현해보자.
✔ 실행환경
- MacOS(Intel)
- Kafka + Zookeeper (Docker)
- IntelliJ + Gradle + SpringBoot 3.1.5 + JDK17
- Spring Cloud Eureka 적용
- WebClient 비동기통신
- 카카오모먼트 API
✔ Description
나는 기존에 했던 프로젝트에서 리펙토링을 한다고 생각하고 구성을 했다.
카카오 매체에 내가 원하는 광고를 집행시키려면 캠페인,광고그룹,광고를 등록하는 과정을 거쳐야 한다. (이개념은 그냥이렇다 하고 넘어가자)
그리고 광고그룹을 등록시 캠페인ID 가 필요하고, 광고를 등록시엔 광고그룹ID 가 필요하다.(앞단계의 결과값이 필요함)
카카오모먼트는 API 초당 콜수 제한이 있기에 메시지 큐(Redis)서버를 이용하여 대기열에 쌓아두고 배치를 돌려 상호작용하였다.
✔ 전제 : 카카오모먼트 API 콜수제한이 없다고 가정
기존프로젝트 => 쓰로틀링 개선(큐서버 상호작용)
카카오모먼트 : 캠페인등록 -> 광고그룹등록 -> 광고그룹등록
기존통신모듈 : WebClient.block() 동기모듈
✔ Repactoring
쓰로틀링 => 카프카 메시지브로커
카카오모먼트 : 캠페인 Topic / 광고그룹 Topic
캠페인 등록후 결과(캠페인ID)를 발행 -> 구독하고 있던 그룹서비스가 ID 받아서 광고그룹등록 (...)
통신모듈 : WebClient 비동기
✔ 발행(Pub)서버 구축
1. 의존성추가
implementation 'org.springframework.boot:spring-boot-starter-web'
// for apache kafka
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-webflux'
2. Properties 설정(application.yml)
server:
port: 8000
spring:
kafka:
bootstrap-servers: localhost:9092
서버포트와 카프카서버(9092) 쪽 설정해준다.
3. KafkaConfig 설정
@EnableKafka // @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
// ------------------------ Publish 설정 -------------------------------------
/**
* 캠페인토픽
* @return
*/
@Bean
public NewTopic campaignTopic() {
return new NewTopic("campaign", 1, (short) 1);
}
/**
* 광고그룹토픽
* @return
*/
@Bean
public NewTopic adGroupTopic() {
return new NewTopic("adgroup", 1, (short) 1);
}
/**
* 광고토픽
* @return
*/
@Bean
public NewTopic adTopic() {
return new NewTopic("ad", 1, (short) 1);
}
/**
* 카프카 데이터전송에 필요한 설정
* @return
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
/**
* 카프카 데이터를 전송을 위한 템플릿생성
* @return
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
토픽생성 및 전송을 위한 설정 생성(데이터직렬화를 위한 Factory 생성, 템플릿등)
4. 컨트롤러 생성
@RestController
@RequiredArgsConstructor
@RequestMapping("campaign/kakao")
@Slf4j
public class CampaignProducerController {
private final KafkaTemplate<String, String> kafkaTemplate;
private final NewTopic campaignTopic;
private final KakaoService kakaoService;
private final SendService sendService;
@GetMapping("/search")
public ResponseEntity<?> publishKakaoCampaign(){
log.info("[카카오] 캠페인조회");
Mono<String> monoResult = kakaoService.getCampaignList();
kakaoService.campaignSubscribe(monoResult);
return ResponseEntity.ok().body(null);
}
@GetMapping("/regist")
public ResponseEntity<?> publicKakaoRegist() {
log.info("[카카오] 캠페인등록");
Mono<String> monoResult = kakaoService.campaignRegist();
kakaoService.campaignSubscribe(monoResult);
return ResponseEntity.ok().body(null);
}
}
5. 서비스 생성
@Service
@RequiredArgsConstructor
@Slf4j
public class KakaoService {
private final WebClientConfig webClientConfig;
private final SendService sendService;
/**
* 캠페인조회
* @return
*/
public Mono<String> getCampaignList(){
Mono<String> mono = null;
try{
mono = webClientConfig.getKakaoCampaignList();
} catch (Exception e){
log.error("[카카오 API] 캠페인조회 EXCEPTION:"+e.getMessage(),e);
}
return mono;
}
/**
* 캠페인생성
* @return
*/
public Mono<String> campaignRegist() {
Mono<String> mono = null;
try{
JSONObject param = new JSONObject();
param.put("name","20231127_디스플레이방문");
JSONObject goal = new JSONObject();
goal.put("campaignType","DISPLAY");
goal.put("goal","VISITING");
param.put("campaignTypeGoal", goal);
mono = webClientConfig.createKakaoCampaign(param);
} catch (Exception e){
log.error("[카카오 API] 캠페인등록 EXCEPTION:"+e.getMessage(),e);
}
return mono;
}
/**
* 비동기작업콜백
* @param monoResult
*/
public void campaignSubscribe(Mono<String> monoResult){
monoResult.subscribe(
// 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
result -> {
log.info("[카카오 API] Result => {}",result);
// 카프카로 메시지전송
sendService.kafkaSend(result);
},
// 에러가 발생한 경우 실행되는 코드
error -> {
log.error("[카카오 API] Exception:"+error.getMessage());
},
// 비동기 작업이 완료되었을 때 실행되는 코드
() -> {
log.info("[카카오 API] Successful Completed");
}
);
}
}
비동기는 Try-Catch 로는 예외를 잡을수 없다. 서로 다른스레드에서 작업중이므로 캐치하지못한다.
따라서 따로 Error 함수에서 처리해야한다. (그 외의 다른 함수에서 처리되는것들을 잡으려면 쓰긴해야함)
6. 카프카로 메시지전송
@Slf4j
@Service
@RequiredArgsConstructor
public class SendService {
private final KafkaTemplate<String, String> kafkaTemplate;
private final NewTopic campaignTopic;
/**
* 카프카로 메시지전송
* @param obj
*/
public void kafkaSend(Object obj){
CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(campaignTopic.name(), obj.toString());
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("[카카오] 카프카 Pub Sent message=[" + obj.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
} else {
log.error("[카카오] 카프카 Pub Unable to send message=[" + obj.toString() + "] due to : " + ex.getMessage(), ex);
}
});
}
실행하면, 토픽이 3개 생성되어있다. (캠페인,광고그룹,광고)
✔ Consumer
1. 컨슈머 Config 설정
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value("${spring.kafka.my.push.topic.name}")
private String groupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "todo");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
2. Properties 설정(application.yml)
server:
port: 8002
spring:
application:
name: backend-service
kafka:
bootstrap-servers: localhost:9092
my:
push:
topic:
name: campaign
eureka:
client:
serviceUrl:
defaultZone: http://localhost:8761/eureka
참고로 유레카서버를 이용하여 Gateway 설정을 하였다.
3. Kafka 리스너 설정
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaMessageListener {
private final KakaoGroupService kakaoGroupService;
@KafkaListener(topics = "${spring.kafka.my.push.topic.name}", groupId = "todo")
public void kakaoConsumer(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition){
log.info("[카카오 컨슈머 message]: {} from partition: {}", message, partition);
JSONObject param = ConvertUtils.strToJson(message);
if(!ObjectUtils.isEmpty(param)) {
kakaoGroupService.registAdGroup(param);
}
}
}
4. 광고그룹등록 서비스 등록
@Service
@RequiredArgsConstructor
@Slf4j
public class KakaoGroupService {
private final WebClientConfig webClientConfig;
/**
* 광고그룹등록
* @param obj
*/
public void registAdGroup(JSONObject obj) {
log.info("[카카오 API] 광고그룹등록");
JSONObject param = new JSONObject();
JSONObject campaignObj = new JSONObject();
JSONObject targetObj = new JSONObject();
JSONObject dateObj = new JSONObject();
String[] place = new String[]{"KAKAO_SERVICE", "KAKAO_TALK"};
String[] deviceType = new String[]{"ANDROID", "IOS"};
campaignObj.put("id", obj.getInt("id"));
param.put("campaign",campaignObj);
param.put("name", "20231127_광고그룹");
param.put("placements", place);
param.put("allAvailableDeviceType", false);
param.put("allAvailablePlacement", true);
param.put("deviceTypes", deviceType);
targetObj.put("ageType", "ALL");
targetObj.put("genderType", "ALL");
targetObj.put("locationType", "ALL");
param.put("targeting", targetObj);
param.put("adult", false);
param.put("dailyBudgetAmount", 50000);
param.put("bidStrategy", "AUTOBID");
param.put("pricingType", "CPC");
param.put("bidAmount", 0);
param.put("pacing", "NONE");
param.put("pricingType", "CPC");
dateObj.put("beginDate", "2023-12-01");
dateObj.put("lateNight", true);
dateObj.put("detailTime", false);
param.put("schedule", dateObj);
adGroupSubscribe(webClientConfig.createKakaoAdGroup(param));
}
/**
* WebClient Nonblocking
* @param monoResult
*/
public void adGroupSubscribe(Mono<String> monoResult){
monoResult.subscribe(
// 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
result -> {
log.info("[카카오 API] Result => {}",result);
// 광고 카프카로 메시지전송
},
// 에러가 발생한 경우 실행되는 코드
error -> {
// WebClientResponseException 예외 처리(카카오API)
if (error instanceof WebClientResponseException) {
WebClientResponseException we = (WebClientResponseException) error;
log.error("[카카오 API] WEBCLIENT EXCEPTION:"+we.getRequest().getURI()+"] : "+we.getStatusCode()+"/"+we.getResponseBodyAsString(Charset.forName("UTF-8")),we);
} else
// 다른 예외 처리
log.error("[카카오 API] EXCEPTION:"+error.getMessage());
},
// 비동기 작업이 완료되었을 때 실행되는 코드
() -> {
log.info("[카카오 API] Successful Completed");
}
);
}
}
5. 테스트
Publisher
2023-11-27T16:19:19.812+09:00 INFO 6468 --- [nio-8000-exec-6] c.m.k.c.CampaignProducerController : [카카오] 캠페인등록
2023-11-27T16:19:20.971+09:00 INFO 6468 --- [tor-http-nio-10] com.msa.kafka.service.KakaoService : [카카오 API] Result => {"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false}
2023-11-27T16:19:20.976+09:00 INFO 6468 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition campaign-0 to 0 since the associated topicId changed from null to kt18OAhrTXqCV5MBN7cixw
2023-11-27T16:19:20.979+09:00 INFO 6468 --- [tor-http-nio-10] com.msa.kafka.service.KakaoService : [카카오 API] Successful Completed
2023-11-27T16:19:20.986+09:00 INFO 6468 --- [ad | producer-1] com.msa.kafka.service.SendService : [카카오] 카프카 Pub Sent message=[{"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false}] with offset=[16]
캠페인 ID 를 가지고 카프카로 전송
Consumer
[카카오 컨슈머 message]: {"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":390257,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false} from partition: 0
2023-11-27T16:19:21.005+09:00 INFO 16299 --- [ntainer#0-0-C-1] com.msa.todo.service.KakaoGroupService : [카카오 API] 광고그룹등록
2023-11-27T16:19:21.005+09:00 DEBUG 16299 --- [ntainer#0-0-C-1] com.msa.todo.common.HttpWebClient : [KAKAO_Request_Param]:{"bidStrategy":"AUTOBID","allAvailableDeviceType":false,"dailyBudgetAmount":50000,"allAvailablePlacement":true,"bidAmount":0,"placements":["KAKAO_SERVICE","KAKAO_TALK"],"schedule":{"beginDate":"2023-12-01","detailTime":false,"lateNight":true},"targeting":{"locationType":"ALL","ageType":"ALL","genderType":"ALL"},"deviceTypes":["ANDROID","IOS"],"pacing":"NONE","name":"20231127_광고그룹","campaign":{"id":},"adult":false,"pricingType":"CPC"}
2023-11-27T16:19:23.045+09:00 INFO 16299 --- [ctor-http-nio-2] com.msa.todo.service.KakaoGroupService : [카카오 API] Successful Completed
ID 를 받아 광고그룹을 등록한다.
참고 :
https://backendcode.tistory.com/209
https://velog.io/@_koiil/Spring-Component%EC%99%80-Repository-Service-Controller
https://victorydntmd.tistory.com/343
https://americanopeople.tistory.com/417
https://developer-syubrofo.tistory.com/150