web/SpringBoot

[MSA] SpringBoot + Kafka 를 이용하여 RestFul API 구현

뽀리님 2023. 11. 27. 18:20

해당글에 앞서 백킹서비스 개념을 먼저 알고 가는게 좋다

https://ssmyefrin.tistory.com/56

 

[MSA] Backing service - MOM

✔ Backing service Backing Service란, 어플리케이션이 실행되는 가운데 네트워크를 통해서 사용할 수 있는 모든 서비스를 말하며 My SQL과 같은 데이터베이스, 캐쉬 시스템, SMTP 서비스 등 어플리케이션

ssmyefrin.tistory.com

 

카프카를 설치했으니, 이제 Spring Boot 를 연동하여 구현해보자.

 

 실행환경

- MacOS(Intel)
- Kafka + Zookeeper (Docker)
- IntelliJ + Gradle + SpringBoot 3.1.5 + JDK17
- Spring Cloud Eureka 적용
- WebClient 비동기통신
- 카카오모먼트 API

 

 

 Description

나는 기존에 했던 프로젝트에서 리펙토링을 한다고 생각하고 구성을 했다.

카카오 매체에 내가 원하는 광고를 집행시키려면 캠페인,광고그룹,광고를 등록하는 과정을 거쳐야 한다. (이개념은 그냥이렇다 하고 넘어가자)

그리고 광고그룹을 등록시 캠페인ID 가 필요하고, 광고를 등록시엔 광고그룹ID 가 필요하다.(앞단계의 결과값이 필요함)

 

카카오모먼트는 API 초당 콜수 제한이 있기에 메시지 큐(Redis)서버를 이용하여 대기열에 쌓아두고 배치를 돌려 상호작용하였다.

발그림ㅠ ㅠ

 

 

 전제 :  카카오모먼트 API 콜수제한이 없다고 가정

기존프로젝트 => 쓰로틀링 개선(큐서버 상호작용)

카카오모먼트 : 캠페인등록 -> 광고그룹등록 ->  광고그룹등록

기존통신모듈 : WebClient.block() 동기모듈

 

 

 

 Repactoring

쓰로틀링 => 카프카 메시지브로커

카카오모먼트 : 캠페인 Topic / 광고그룹 Topic 

캠페인 등록후 결과(캠페인ID)를 발행 -> 구독하고 있던 그룹서비스가 ID 받아서 광고그룹등록 (...)

통신모듈 : WebClient 비동기

 

 ✔  발행(Pub)서버 구축

1. 의존성추가

implementation 'org.springframework.boot:spring-boot-starter-web'
// for apache kafka
implementation 'org.springframework.kafka:spring-kafka'
implementation 'org.springframework.boot:spring-boot-starter-webflux'

 

2. Properties 설정(application.yml)

server:
  port: 8000
spring:
  kafka:
    bootstrap-servers: localhost:9092

 서버포트와 카프카서버(9092) 쪽 설정해준다.

 

3. KafkaConfig 설정

@EnableKafka  // @KafkaListener 사용을 위한 설정
@Configuration
public class KafkaConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    // ------------------------ Publish 설정 -------------------------------------
    /**
     * 캠페인토픽
     * @return
     */
    @Bean
    public NewTopic campaignTopic() {
        return new NewTopic("campaign", 1, (short) 1);
    }

    /**
     * 광고그룹토픽
     * @return
     */
    @Bean
    public NewTopic adGroupTopic() {
        return new NewTopic("adgroup", 1, (short) 1);
    }

    /**
     * 광고토픽
     * @return
     */
    @Bean
    public NewTopic adTopic() {
        return new NewTopic("ad", 1, (short) 1);
    }

    /**
     * 카프카 데이터전송에 필요한 설정
     * @return
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * 카프카 데이터를 전송을 위한 템플릿생성
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

토픽생성 및 전송을 위한 설정 생성(데이터직렬화를 위한 Factory 생성, 템플릿등)

 

4. 컨트롤러 생성

@RestController
@RequiredArgsConstructor
@RequestMapping("campaign/kakao")
@Slf4j
public class CampaignProducerController {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final NewTopic campaignTopic;
    private final KakaoService kakaoService;
    private final SendService sendService;

    @GetMapping("/search")
    public ResponseEntity<?> publishKakaoCampaign(){
        log.info("[카카오] 캠페인조회");
        Mono<String> monoResult = kakaoService.getCampaignList();
        kakaoService.campaignSubscribe(monoResult);
        return ResponseEntity.ok().body(null);
    }

    @GetMapping("/regist")
    public ResponseEntity<?> publicKakaoRegist() {
        log.info("[카카오] 캠페인등록");
        Mono<String> monoResult = kakaoService.campaignRegist();
        kakaoService.campaignSubscribe(monoResult);
        return ResponseEntity.ok().body(null);
    }
}

 

5. 서비스 생성

@Service
@RequiredArgsConstructor
@Slf4j
public class KakaoService {
    private final WebClientConfig webClientConfig;
    private final SendService sendService;

    /**
     * 캠페인조회
     * @return
     */
    public Mono<String> getCampaignList(){
        Mono<String> mono = null;
        try{
            mono = webClientConfig.getKakaoCampaignList();
        }  catch (Exception e){
            log.error("[카카오 API] 캠페인조회 EXCEPTION:"+e.getMessage(),e);
        }
        return mono;
    }

    /**
     * 캠페인생성
     * @return
     */
    public Mono<String> campaignRegist() {
        Mono<String> mono = null;
        try{
            JSONObject param = new JSONObject();
            param.put("name","20231127_디스플레이방문");
            JSONObject goal = new JSONObject();
            goal.put("campaignType","DISPLAY");
            goal.put("goal","VISITING");
            param.put("campaignTypeGoal", goal);
            mono = webClientConfig.createKakaoCampaign(param);
        }  catch (Exception e){
            log.error("[카카오 API] 캠페인등록 EXCEPTION:"+e.getMessage(),e);
        }
        return mono;
    }

    /**
     * 비동기작업콜백
     * @param monoResult
     */
    public void campaignSubscribe(Mono<String> monoResult){
        monoResult.subscribe(
                // 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
                result -> {
                    log.info("[카카오 API] Result => {}",result);
                    // 카프카로 메시지전송
                    sendService.kafkaSend(result);
                },
                // 에러가 발생한 경우 실행되는 코드
                error -> {
                    log.error("[카카오 API] Exception:"+error.getMessage());
                },
                // 비동기 작업이 완료되었을 때 실행되는 코드
                () -> {
                    log.info("[카카오 API] Successful Completed");
                }
        );
    }
}

비동기는 Try-Catch 로는 예외를 잡을수 없다.  서로 다른스레드에서 작업중이므로 캐치하지못한다.

따라서 따로 Error 함수에서 처리해야한다. (그 외의 다른 함수에서 처리되는것들을 잡으려면 쓰긴해야함)

 

6. 카프카로 메시지전송

@Slf4j
@Service
@RequiredArgsConstructor
public class SendService {
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final NewTopic campaignTopic;

    /**
     * 카프카로 메시지전송
     * @param obj
     */
    public void kafkaSend(Object obj){
        CompletableFuture<SendResult<String, String>> future = kafkaTemplate.send(campaignTopic.name(), obj.toString());
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("[카카오] 카프카 Pub Sent message=[" + obj.toString() + "] with offset=[" + result.getRecordMetadata().offset() + "]");
            } else {
                log.error("[카카오] 카프카 Pub Unable to send message=[" +  obj.toString() + "] due to : " + ex.getMessage(), ex);
            }
        });
    }

 

 

실행하면, 토픽이 3개 생성되어있다. (캠페인,광고그룹,광고)

 

 

✔  Consumer

 

1. 컨슈머 Config 설정

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.my.push.topic.name}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "todo");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

 

 

2. Properties 설정(application.yml)

server:
  port: 8002

spring:
  application:
    name: backend-service
  kafka:
    bootstrap-servers: localhost:9092
    my:
      push:
        topic:
          name: campaign
eureka:
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka

참고로 유레카서버를 이용하여 Gateway 설정을 하였다.

 

3. Kafka 리스너 설정

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaMessageListener {
    private final KakaoGroupService kakaoGroupService;

    @KafkaListener(topics = "${spring.kafka.my.push.topic.name}", groupId = "todo")
    public void kakaoConsumer(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION) int partition){
        log.info("[카카오 컨슈머 message]: {} from partition: {}", message, partition);
        JSONObject param = ConvertUtils.strToJson(message);
        if(!ObjectUtils.isEmpty(param)) {
            kakaoGroupService.registAdGroup(param);
        }
    }
}

 

 

4. 광고그룹등록 서비스 등록

@Service
@RequiredArgsConstructor
@Slf4j
public class KakaoGroupService {
    private final WebClientConfig webClientConfig;

    /**
     * 광고그룹등록
     * @param obj
     */
    public void registAdGroup(JSONObject obj) {
        log.info("[카카오 API] 광고그룹등록");
        JSONObject param = new JSONObject();
        JSONObject campaignObj = new JSONObject();
        JSONObject targetObj = new JSONObject();
        JSONObject dateObj = new JSONObject();
        String[] place = new String[]{"KAKAO_SERVICE", "KAKAO_TALK"};
        String[] deviceType = new String[]{"ANDROID", "IOS"};
        campaignObj.put("id", obj.getInt("id"));
        param.put("campaign",campaignObj);
        param.put("name", "20231127_광고그룹");
        param.put("placements", place);
        param.put("allAvailableDeviceType", false);
        param.put("allAvailablePlacement", true);
        param.put("deviceTypes", deviceType);
        targetObj.put("ageType", "ALL");
        targetObj.put("genderType", "ALL");
        targetObj.put("locationType", "ALL");
        param.put("targeting", targetObj);
        param.put("adult", false);
        param.put("dailyBudgetAmount", 50000);
        param.put("bidStrategy", "AUTOBID");
        param.put("pricingType", "CPC");
        param.put("bidAmount", 0);
        param.put("pacing", "NONE");
        param.put("pricingType", "CPC");
        dateObj.put("beginDate", "2023-12-01");
        dateObj.put("lateNight", true);
        dateObj.put("detailTime", false);
        param.put("schedule", dateObj);
        adGroupSubscribe(webClientConfig.createKakaoAdGroup(param));
    }

    /**
     * WebClient Nonblocking
     * @param monoResult
     */
    public void adGroupSubscribe(Mono<String> monoResult){
        monoResult.subscribe(
                // 비동기 작업이 성공적으로 완료된 경우 실행되는 코드
                result -> {
                    log.info("[카카오 API] Result => {}",result);
                    // 광고 카프카로 메시지전송
                },
                // 에러가 발생한 경우 실행되는 코드
                error -> {
                    // WebClientResponseException 예외 처리(카카오API)
                    if (error instanceof WebClientResponseException) {
                        WebClientResponseException we = (WebClientResponseException) error;
                        log.error("[카카오 API] WEBCLIENT EXCEPTION:"+we.getRequest().getURI()+"] : "+we.getStatusCode()+"/"+we.getResponseBodyAsString(Charset.forName("UTF-8")),we);
                    } else
                        // 다른 예외 처리
                        log.error("[카카오 API] EXCEPTION:"+error.getMessage());
                },
                // 비동기 작업이 완료되었을 때 실행되는 코드
                () -> {
                    log.info("[카카오 API] Successful Completed");
                }
        );
    }
}

 

 

5. 테스트

 

Publisher

2023-11-27T16:19:19.812+09:00  INFO 6468 --- [nio-8000-exec-6] c.m.k.c.CampaignProducerController       : [카카오] 캠페인등록
2023-11-27T16:19:20.971+09:00  INFO 6468 --- [tor-http-nio-10] com.msa.kafka.service.KakaoService       : [카카오 API] Result => {"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false}
2023-11-27T16:19:20.976+09:00  INFO 6468 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition campaign-0 to 0 since the associated topicId changed from null to kt18OAhrTXqCV5MBN7cixw
2023-11-27T16:19:20.979+09:00  INFO 6468 --- [tor-http-nio-10] com.msa.kafka.service.KakaoService       : [카카오 API] Successful Completed
2023-11-27T16:19:20.986+09:00  INFO 6468 --- [ad | producer-1] com.msa.kafka.service.SendService        : [카카오] 카프카 Pub Sent message=[{"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false}] with offset=[16]

캠페인 ID 를 가지고 카프카로 전송

 

 

Consumer

[카카오 컨슈머 message]: {"id":,"name":"20231127_디스플레이방문","campaignTypeGoal":{"campaignType":"DISPLAY","goal":"VISITING"},"objective":null,"dailyBudgetAmount":null,"config":"ON","statusDescription":"운영중","trackId":null,"adAccountId":390257,"status":["READY"],"systemConfig":"ON","isDailyBudgetAmountOver":false} from partition: 0
2023-11-27T16:19:21.005+09:00  INFO 16299 --- [ntainer#0-0-C-1] com.msa.todo.service.KakaoGroupService   : [카카오 API] 광고그룹등록
2023-11-27T16:19:21.005+09:00 DEBUG 16299 --- [ntainer#0-0-C-1] com.msa.todo.common.HttpWebClient        : [KAKAO_Request_Param]:{"bidStrategy":"AUTOBID","allAvailableDeviceType":false,"dailyBudgetAmount":50000,"allAvailablePlacement":true,"bidAmount":0,"placements":["KAKAO_SERVICE","KAKAO_TALK"],"schedule":{"beginDate":"2023-12-01","detailTime":false,"lateNight":true},"targeting":{"locationType":"ALL","ageType":"ALL","genderType":"ALL"},"deviceTypes":["ANDROID","IOS"],"pacing":"NONE","name":"20231127_광고그룹","campaign":{"id":},"adult":false,"pricingType":"CPC"}
2023-11-27T16:19:23.045+09:00  INFO 16299 --- [ctor-http-nio-2] com.msa.todo.service.KakaoGroupService   : [카카오 API] Successful Completed

ID 를 받아 광고그룹을 등록한다.

 

 

 

 

참고 :

https://backendcode.tistory.com/209

https://velog.io/@_koiil/Spring-Component%EC%99%80-Repository-Service-Controller

 

 

https://victorydntmd.tistory.com/343

https://tech.kakaopay.com/post/bff_webflux_coroutine/#%EB%8B%A4%EC%96%91%ED%95%9C-%ED%99%98%EA%B2%BD%EC%97%90%EC%84%9C-%EA%B5%AC%EB%8F%99%EB%90%9C%EB%8B%A4%EB%A9%B4-graphql-%EA%B3%A0%EB%A0%A4%ED%95%98%EA%B8%B0

https://americanopeople.tistory.com/417

 

https://developer-syubrofo.tistory.com/150

https://erjuer.tistory.com/89

https://velog.io/@18k7102dy/%EC%9C%84%EB%93%9C%EB%A7%88%EC%BC%93-%EA%B0%9C%EB%B0%9C%EA%B8%B0-%EB%A6%AC%EB%B7%B0-%EC%9E%91%EC%84%B1%EC%9D%84-%EC%B2%98%EB%A6%AC%ED%95%98%EB%8A%94%EA%B2%8C-%EC%9D%B4%EB%A0%87%EA%B2%8C-%EC%96%B4%EB%A0%B5%EB%8B%A4%EA%B3%A0