본문 바로가기
실습/리눅스 서버 + 스프링 부트

07. Kafka_Pub&Con

by 이민우 2021. 5. 30.
728x90
반응형

 

Kafka란 이전 포스트에서 작성한 RabbitMQ와 마찬가지로 메시지를 보내는 시스템이다.

 

Kafka는 아파치 소프트웨어 재단이 스칼라로 개발한 오픈소스 메시지 브로커로, 분삭 환경에 특화되어 있으며 빠른 속도를 자랑한다.

Kafka는 다음 요소로 구성되어 있다.

  • Broker : Kafka 서버
  • Topic : 메시지가 생산되고 소비되는 주체
  • Partition : 토픽 내에서 메시지가 분산되어 저장되는 단위
  • Zookeeper : 여러 Broker의 분산 메시지 큐의 정보 관리

 

자세한 내용은 다음 포스트에 작성해놓았다.

 

해당 포스팅에서 해볼 것은 RabbitMQ와 마찬가지로 Kafka를 사용한 생성자, 구독자 만들기이다.

 

RabbitMQ에서 했던 것처럼 별도로 웹 UI에 접속해 설정해야할 부분은 없다.

그냥 application.properties를 설정하고 Consumer, Controller, Publisher 클래스 정도만 생성할 예정이다.

 

 

 

먼저 applications.properties는 다음과 같이 설정한다.

 

다음은 RabbitMQ의 클래스들을 카피하여 Kafka 전용 Publisher, Controller, Subscriber을 제작한다.

 

KafkaPublisher.java

package kr.co.opensource.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Service
public class KafkaPublisher {
	
	@Autowired private KafkaTemplate<String, String> kafkaTemplate;
	private final String topic = "test_topic";
	
	public boolean publish(String msg) {
		
		log.info("KafkaPublisher.publish publish message : {}", msg);
		
		try {
			ListenableFuture<SendResult<String, String>> listenable = kafkaTemplate.send(topic, msg);
		}
		catch (Exception e) {
			log.error("KafkaPublisher.publish can't publsh message : {}", e.getMessage());
			return false;
		}
		
		log.info("KafkaPublisher.publish successfully published message.");
		return true;
	}
	
}

 

KafkaController.java

package kr.co.opensource.kafka;

import java.nio.charset.Charset;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import kr.co.opensource.ResponseMessage;
import kr.co.opensource.ResponseVO;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RestController
public class KafkaController {

	@Autowired KafkaPublisher publisher;
	
	@PostMapping("/kafka")
	public ResponseEntity<ResponseVO> publish(@RequestBody(required=true) String message) {
		log.info("KafkaController.publish init");

		ResponseVO msg = new ResponseVO();
		HttpHeaders headers = new HttpHeaders();
		headers.setContentType(new MediaType("application", "json", Charset.forName("UTF-8")));
		
		boolean result = publisher.publish(message);
		
		if(!result) {
			msg.setStatus(HttpStatus.BAD_REQUEST);
			msg.setMessage(ResponseMessage.INTERNAL_ERROR);
			return new ResponseEntity<ResponseVO>(msg, headers, HttpStatus.METHOD_NOT_ALLOWED);
		}
		else {
			msg.setStatus(HttpStatus.OK);
			msg.setMessage(ResponseMessage.OK);
			msg.setData(message);
			return new ResponseEntity<ResponseVO>(msg, headers, HttpStatus.OK);
		}
		
	}
}

 

KafkaSubscriber.java

package kr.co.opensource.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@Component
public class KafkaSubscriber {
	
	private final String topic = "test_topic";
	
	@KafkaListener(topics = topic)
	public void recieveMessage(String msg) {
		
		log.info("KafkaSubscriber.revieveMessage recieved Kafka message : {}", msg);

	}
}

 

 

코딩 완료 후 POST 메시지를 보내보면 아래와 같이 잘 동작함을 확인할 수 있다.

 

 

728x90
반응형

'실습 > 리눅스 서버 + 스프링 부트' 카테고리의 다른 글

09. MariaDB CRUD_UI  (0) 2021.09.18
08. HAProxy  (0) 2021.05.30
06. RabbitMQ_Con&Pro  (0) 2021.05.30
05. RestfulAPI_Redis CRUD  (0) 2021.05.23
04. RestfulAPI_MongoDB  (0) 2021.05.23