Search

RabbitMQ vs Kafka

생성일
2025/04/22 14:59
태그
관련 개념
상태
완료

RabbitMQ

RabbitMQ란 Erlang으로 AMQP를 구현한 오픈소스 메세지 브로커 시스템이다.

AMQP란?

AMQP(Advanced Message Queuing Protocol)는 메세지 지향 미들웨어(MOM, Message Oriented Middleware) 시스템 간의 통신하기 위한 개방형 네트워크 프로토콜로, 메세징 처리 방식을 지원하는 여러 시스템이나 미들웨어 사이의 통신 규약이다.
AMQP가 등장하기 전 기존의 여러 상용화된 Message Broker 제품들은 대부분 플랫폼에 종속적인 제품이라서, 서로 다른 제품 간에 메세지를 교환하기 위해서는 메세지 브리지를 통해 메세지 포맷 컨버전을 수행하거나 제품을 통일시켜야 한다는 불편함과 비효율적이라는 문제가 있었다. 이러한 문제를 해결하기 위해 MQ를 사용하는 서로 다른 시스템 간에 효율적으로 메세지를 교환하기 위해 규정한 통신 규약이 AMQP이다.
간단히 말해, AMQP는 송신자(Producer)와 수신자(Consumer) 사이에서 메세지를 안전하게 교환하는 표준 프로토콜이다.
AMQP의 규약은 대략 아래와 같다.
모든 Broker는 동일한 방식으로 동작할 것
모든 Client는 동일한 방식으로 동작할 것
네트워크 상으로 전송되는 명령어의 표준화
프로그래밍 언어 중립적

AMQP 장단점

MSA나 분산 시스템에서는 여러 서버들이 서로 메세지를 주고 받아야하는 상황이 발생한다. HTTP 통신으로 이를 수행하면, 요청에 대한 응답을 받을 때까지 서버의 수행이 중단된다. 이러한 상황에서 AMQP를 지키는 Message Broker를 활용해 비동기 방식으로 메세지를 전달할 수 있다. 이를 통해 시스템 간 통신을 향상시키고, 확장성 있는 아키텍처를 구축하는데 유용하다. 또한 메세지를 정상적으로 받았는지 확인하지 않기 때문에, 서버 간의 결합성이 감소한다.
하지만 메세지 전달 과정에서의 약간의 성능 오버헤드가 발생하고, AMQP 제품이더라도 버전에 따라 호환되지 않을수도 있다. AMQP는 서로 다른 시스템 간의 상호 운용성 제공을 목표로하지만, 일부 Message Broker나 Client는 AMQP 사양을 완전히 준수하지는 않아서 호환이 되지 않기도 한다.

AMQP 구현 제품

이러한 AMQP를 지키며 작성된 Message Broker는 다음과 같다.
RabbitMQ
OpenAMQ
StromMQ
Apache Qpid

AMQP Routing model

AMQP의 라우팅 모델은 다음과 같이 구성된다.
Exchange
Publisher로부터 수신한 메세지를 적절한 메세지 큐나 다른 exchange로 분배하는 라우팅 기능을 수행한다.
각 메세지 큐나 exchange는 라우팅 되어 메세지를 전달받기 위해, exchange에 binding되어 있다.
exchange에서 binding된 메세지 큐나 다른 exchange를 찾기 위한 알고리즘을 표준화하여 정의한 것을 Standard Exchange Type이라 한다.
Broker는 여러 개의 exchange 인스턴스를 가질 수 있다.
Message Queue
메세지를 수신하여 메모리나 디스크에 저장하고, 이후 consumer에게 전달하는 기능을 수행한다.
각 메세지 큐는 binding을 통해 구독(관심) 해둘 메세지 타입을 정한 후 exchange에 의해 메세지를 전달받는다.
Virtual Host
가상 호스트로, 사용자마다 가상 호스트에 권한을 부여해 권한이 있는 사용자만 해당 호스트에 접근하도록 제어한다.
Routing Key
Publisher에서 메세지를 송신할 때 헤더에 담아서 보내는 key로, 일종의 가상 주소이다.
Exchange는 Routing key를 통해 어떤 큐로 메세지를 라우팅할 지 결정할 수 있다.

Standard Exchange Type

Exchange Type은 어떻게 라우팅을 수행할 지에 대해 결정하는 알고리즘의 일종이다. AMQP에서는 표준 Exchange Type으로 Routing key에 기반한 Direct Exchange, Topic Exchange, Fanout Exchange와 key-value 헤더에 기반한 Headers Exchange를 반드시 정의하게 되어 있다.
Direct Exchange
Routing key를 메세지 큐에 1:N으로 매칭시키는 방법이다.
Topic Exchange
정규식을 사용해 메세지를 메세지 큐에 매칭시키는 방법이다.
Fanout Exchange
모든 메세지를 모든 메세지 큐에 라우팅하는 방법이다.
Headers Exchange
key-value로 정의된 헤더에 의해 라우팅을 결정하는 방법이다. all과 any를 사용해 모든 조건을 충족 시켜야 매칭할 지, 하나만 충족해도 매칭할 지 결정할 수 있다.

그 외 메세지 프로토콜들

메세지 전달 프로토콜에는 이러한 AMQP 말고도 웹소켓에서 동작하는 텍스트 기반 메세징 프로토콜인 STOMP나, 사물 인터넷과 같이 대역폭이 제한된 통신 환경에 최적화된 경량 메세지 전송 프로토콜인 MQTT도 있다.

RabbitMQ 설치하기

homebrew를 이용해서 rabbitMQ를 설치한 후,
rabbitmq-server 명령어를 입력하여 rabbitmq 서버를 켤 수 있다. 그 후 웹 브라우저 기본 설정 포트인 15672번 포트로 연결하면,
이와 같이 로그인 화면이 뜬다. 여기에 ID - password를 guest - guest 넣으면,
이처럼 현재 rabbitmq 서버의 상태를 확인할 수 있다.

Kafka

Kafka란?

Kafka는 세계 최대 비즈니스 네트워크 사이트인 LinkedIn에서 하나의 서비스에 과도하게 많은 시스템이 연결되어 유지보수 및 서비스 관리에 어려움을 겪어, 이를 해결하기 위해 데이터를 스트림 파이프라인을 통해 실시간으로 관리하고 전달하는 고성능 분산 스트리밍 플랫폼을 만든 것이 Kafka이다.
Kafka는 이와 같이 데이터를 생성하는 애플리케이션(Producer)와 소비하는 애플리케이션(Consumer) 분리하고 그 사이의 중재자 역할을 수행함으로써, 보내는쪽과 받는쪽 모두 상대 측을 신경 쓰지 않고 메세지 송수신 할 수 있고 하나의 메세지를 여러 Consumer에게 전달하는 것을 가능하게 만든다. 이를 통해 모든 시스템으로 데이터를 실시간으로 전송하여 처리할 수 있고 시스템 구조를 확장이 용이하도록 만들 수 있다.
Kafka는 데이터의 전송 제어 역할을 수행하고, Pub-Sub 모델의 메세지 큐 형태로 동작하며 분산 시스템 환경에 특화되어 있다. 실시간 데이터 피드를 관리하기 위해 메세지를 최적화하여 높은 처리량과 낮은 지연 시간을 가지고 있다.

Kafka의 구성요소

Topic
각각의 메세지를 목적에 맞게 구분할 때 사용한다.
메세지를 생산하거나 소비할 때 반드시 Topic을 입력하며, Consumer는 자신이 담당하는 Topic 메세지를 처리한다.
병렬처리를 위해 한 개의 Topic은 한 개 이상의 Partiton으로 구성되어 분산 저장한다.
Partition
분산 처리를 위해 Topic에 대해 여러 개로 쪼개어 처리할 수 있도록 만든 기능이다.
Topic 생성 시 partition의 개수를 지정할 수 있다.
partition 내부에 각 메세지는 offset(고유 번호)로 구분된다.
partition이 1개이면 모든 메세지 순서가 보장되고, 여러 개라면 라운드-로빈 방식으로 분배되어 분산처리되어 순서가 보장되지 않는다.
partition이 많을수록 한 번에 많은 양을 처리할 수 있지만, 장애 복구 시간이 늘어난다.
Offset
Consumer에서 메세지를 어디까지 읽었는지 저장하는 값이다.
Consumer Group은 각각 파티션에 자신이 읽은 메세지 위치 정보(offset)을 기록하여, 장애 시에도 전에 마지막으로 읽었던 위치에서부터 다시 읽을 수 있다.
Producer
메세지를 생산하여 Kafka Cluster에 전송한다.
메세지 전송 시 Batch 처리가 가능하며, key 값을 지정하여 특정 파티션으로만 전송하는 것도 가능하다.
Ack 설정으로 partition reader가 받았는지 확인 여부를 바꿀 수 있어, 효율성을 높일 수 있다.
Consumer
Kafka Cluster에 저장된 메세지를 읽어 처리한다.
메세지 수신 시 Batch 처리가 가능하며, 한 개의 Consumer는 여러 개의 Topic을 처리할 수 있다.
policy에 따라 다르지만, 기본적으로 메세지를 소비하여도 삭제하지 않고 여러 번 재소비하는 것도 가능하다.
Consumer는 Consumer Group에 속한다.
하나의 partition은 같은 Consumer Group의 여러 개의 Consumer에서 연결할 수 없다.
Broker
실행된 Kafka 서버를 지칭하는 말이며, 별도의 애플리케이션인 Producer나 Consumer와 달리 Broker는 Kafka 그 자체이다.
Broker 내부에 메세지를 저장하고 관리하는 역할을 수행한다.
각 Broker(서버)는 Kafka Cluster 내부에 존재한다.
Zookeeper
분산 애플리케이션 관리를 위한 코디네이션 시스템으로, 분산 메세지 큐의 메타 정보를 중앙에서 관리한다.
Broker Cluster는 일반적으로 애플리케이션의 서버 상태, 서버의 리더, 문제가 생겼을 때 장애 체크 및 복구와 같은 코디네이터를 같이 연동하여 사용하는데, Kafka Cluster에서는 해당 코디네이터 역할을 맡는 것이 Apache의 Zookeeper이다.
Kafka Cluster를 구성하는 여러 대의 Broker 중 하나는 각 Broker에 파티션을 할당하거나 Broker들이 정상적으로 동작하는 지 모니터링하는 리더(Controller) 역할을 수행한다. Zookeeper는 연결되어 있는 Worker Broker와 Controller Broker의 메타데이터(Broker ID, Controller ID 등)를 저장한다. 만약 리더 Broker에 문제가 발생하면 해당 Broker를 잠시 정지하고, 나머지 워커 Borker 중 하나를 선택하여 리더로 승격 시키는 역할도 수행한다.
일반적으로 Broker Cluster는 하나의 Broker가 고장 났을 때 빠른 장애 복구를 위해, Zookeeper를 연동하여 3대 이상의 Broker로 구성하는 것이 권장된다.

Kafka 장단점

위에도 언급했지만 Kafka는 비동기식으로 데이터를 처리하여 짧은 지연 시간과 높은 처리량을 가지고 있어 실시간성이 좋다. 이를 통해 복잡한 시스템 구조를 실시간 데이터 처리를 통해 확장 가능한 분산 시스템으로 만들기에 용이하다.
하지만 Kafka가 개발된 기간이 길지 않아 아직 도구들이 많이 개발 중이며, Message 수정에 비용이 들고 topic selection에 제한적이라는 단점이 있다. 또한 압축과 압축 해제에서 성능 저하가 발생한다.

RabbitMQ vs Kafka

Kafka
RabbitMQ
라우팅
기본 기능에는 라우팅을 지원하지 않는다. Kafka Streams를 활용해 동적 라우팅 구현 가능
Direct, Topic, Fanout, Headers의 라우팅 옵션을 제공하여 유연한 라우팅이 가능하다.
프로토콜
단순한 메세지 헤더를 가진 TCP 기반 custom 프로토콜을 사용하여 대체하기가 어렵다.
AMQP, MQTT, STOMP 등 여러 메세징 프로토콜 지원한다.
우선 순위
메세징 큐가 변경 불가능한 시퀀스 큐로, 한 파티션 내에서는 시간 순서를 보장한다. 하지만 여러 파티션이 병렬로 처리할 때는 보장하지 않는다.
Priority Queue를 지원하여 우선순위에 따라 처리 가능하다.
Queue의 이벤트 저장
이벤트를 삭제하지 않고 디스크에 저장하여 영속성이 보장되고 재처리가 가능하다.
메세지가 성공적으로 전달되었다고 판단되면 메세지가 큐에서 삭제되어 재처리가 어렵다.
속도
100 KB/sec
20 message/sec
장점
이벤트가 영속성 보장, 이벤트 재처리 가능 고성능, 고가용성, 분산처리에 효율적 Producer 중심적(많은 데이터를 병렬 처리)
오래전에 개발되어 제품 성숙도가 높다 필요에 따라 동기/비동기 가능 유연한 라우팅 Producer - Consumer 간 메세지 전달 보장 Mange UI 기본 제공 Broker, Consumer 중심적(관리적 측면이나 다양한 기능 구현에 사용)
단점
범용 메세징 시스템에서 제공되는 다양한 기능을 지원하지 않는다.
Kafka에 비해 느리다.

Kafka Client 사용해보기

각 Kafka-Client는 Kafka Cluster를 통해서 메세지를 주고 받으면서, 보내는 메세지를 누가 받을 것인지나 받은 메세지를 누가 보낸 것인지 관심을 두지 않고 메세지 자체로만 주고받는 Producer-Consumer 형태로 사용해보자.
이처럼 kafka.apache.org에서 다운로드하고, 받은 파일을 압축을 해제하면
이처럼 shell 환경에서 동작할 수 있는 명령어 형태로 파일들이 들어있다.
Kafka를 다운로드하여 압출을 풀어둔 폴더로 이동하여, 서버와 클라이언트를 실행해보자.
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
Shell
복사
위 명령어를 입력하여 zookeeper를 실행시키고,
./bin/kafka-server-start.sh ./config/server.properties
Shell
복사
이를 통해 kafka 서버를 실행시킨다.
클라이언트를 실행 시키기 전에 각 클라이언트가 구독(관심)하고 있는 Topic을 만들어야 한다.
# Topic 목록 확인 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --list # Topic 생성 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic quickstart-events --partitions 1 # Topic 정보 자세히 보기 ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe quickstart-events
Shell
복사
이 명령어들을 통해 Topic을 생성했다면,
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic quickstart-events
Shell
복사
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic quickstart-events --from-beginning
Shell
복사
이와 같이 Producer와 Consumer 클라이언트를 실행시키자.
Producer와 Consumer 모두 같은 Topic에 대해 등록했기 때문에,
이처럼 producer에서 전달하는 메세지를 consumer에서 받는 것을 확인할 수 있다.

Kafka Connect 설치하기

Kafka Connect는 데이터를 자유롭게 Import/Export 할 수 있는 기능을 말한다. Producer-Consumer처럼 Kafka를 거쳐 단순히 메세지를 주고 받는 것이 아니라, 다양한 Data Source에서 메세지를 가져와 Kafka를 통해 다른 시스템이나 애플리케이션에 전달하고 싶을 때 사용하는 것이 Kafka Connect이다.
별도의 코드 작성 없이도 Configuration만으로도 데이터를 가져와서 전달하는 것이 가능하고, standalone 모드나 분산 모드로 사용하는 것이 가능하다. RESTful API를 사용할 수 있기 때문에, 커넥터 생성, 삭제, 변경 등 커넥터를 관리할 수 있다. Stream이나 Batch 형태로 데이터 전송하는 것이 가능하고, 다양한 플러그인과 조합하여 커스텀 커넥터를 사용할 수 있다.
파일이나 데이터베이스와 같은 소스에서 데이터를 가져와 Kafka Cluster에 저장하는 것이 Kafka Connect Source의 역할이고, 반대로 Kafka Cluster에 저장되어 있던 데이터를 S3나 다른 타겟 시스템으로 export하는 것이 Kafka Connect Sink의 역할이다.
이와 같은 Kafka Connect를 사용하여 데이터베이스에서 데이터를 꺼내 다른 file로 저장해보자.
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
Shell
복사
위 명령어를 통해 Kafka Connect를 설치하고 압축을 해제하면,
Kafka를 설치했을 때와 동일하게 여러 명령어들과 Kafka에서 사용하는 라이브러리가 들어있다.
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Shell
복사
Kafka Connect는 위 명령어를 통해 실행 시킬 수 있다. Kafka Connect를 실행시키고 나면,
이와 같이 우리가 등록하지 않은 몇 가지 Topic이 추가된다. 이는 Kafka Connect가 Source에서 데이터를 읽어 저장하기 위한 Topic을 관리하기 위해 생성해둔 것이다.
다음으로 Kafka 커넥터에서 사용할 JDBC 커넥터를 설치해야한다.
이처럼 http://confluent.io/hub/confluentinc/kafka-connect-jdbc에서 JDBC 커넥터를 다운로드하고, 파일을 압축해제 하면
이처럼 여러 파일들이 들어있을 것이다. 여기에서 lib 폴더 안의 kafka-connect-jdbc-x.x.x.jar 파일을 Kafka Connect에서 사용할 수 있도록 등록해주어야 한다.
Kafka Connect가 설치된 폴더에서 etc - kafka 폴더 안의 http://connect-distributed.properties 파일을 열고,
파일 제일 하단의 plugin.path부분에 아까 kafka-connect-jdbc-x.x.x.jar 파일이 있던 경로로 변경해준다.
마지막으로 우리가 프로젝트를 생성하면서 라이브러리 파일이 저장되어있는 곳에서, Mariadb에 대한 jar 파일을 복사하여 넣어주어야 한다.
${HOME}/.m2/repository/org/mariadb 경로에서 mariadb의 jar 파일을 복사하여, Kafka Connect 폴더의 share - java - kafka 폴더에 붙여넣자.
만약 이 과정까지 수행했는데 No suitable driver found for jdbc 오류가 발생하며 실행이 되지 않는다면, https://repo1.maven.org/maven2/com/mysql/mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar에서 mysql의 jar 파일을 다운로드 받아 동일한 폴더에 넣어주면 해결된다.

Kafka Source Connector 추가하기

./bin/connect-distributed ./etc/kafka/connect-distributed.properties
Shell
복사
먼저 zookeeper와 Kafka 서버가 켜져있는 상태에서 위 명령어를 통해 Kafka Connect 서버를 켜주고,
{ "name": "my-source-connect", "config" : { "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url":"jdbc:mysql://localhost:3306/mydb", "connection.user":"root", "connection.password":"test1357", "mode": "incrementing", "incrementing.column.name" : "id", "table.whitelist":"users", "topic.prefix" : "my_topic_", "tasks.max" : "1" } }
Shell
복사
위와 같이 등록할 connector의 종류, url, db의 아이디/비밀번호 등 설정을 JSON 형태로 담아 Kafka Connect 서버에 POST 요청을 보내 등록한다.
제대로 등록이 되었다면 이와 같이 connectors 목록을 조회하거나 상세 정보 보기를 했을 때, 위처럼 정상 동작된 것을 확인할 수 있다.
Source Connect를 막 등록하면 이처럼 Kafka Connect에서 관리하는 기본 Topic 밖에 없었지만,
이처럼 등록한 DB의 변경사항이 발생하면
자동으로 Topic을 등록하고 해당 Topic에 변경된 사항에 대한 정보를 저장한다.
때문에 consumer를 통해 데이터를 읽어보면, 위처럼 DB에 직접 값을 추가했던 내용이 JSON 형태로 Kafka에 저장된 것을 확인할 수 있다.
이와 같이 새로운 유저를 추가해도
마찬가지로 새로운 데이터가 Kafka의 Topic에 저장되고 consumer에서 해당 내용을 볼 수 있다.
해당 내용을 Json Parser를 통해 펼쳐보면,
이와 같이 되어있는데, 이는 데이터베이스의 필드 타입들(schema)과 거기에 저장할/저장된 값(payload) 형태로 되어있다. 데이터의 변경을 감지할 때 이러한 형태로 Kafka에 저장되기 때문에, 우리가 인위적으로 데이터 변경을 시키고 싶다면 위와 동일한 포맷을 지켜 작성해야한다.

Kafka Sink Connector 추가하기

{ "name": "my-sink-connect", "config" : { "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector", "connection.url":"jdbc:mysql://localhost:3306/mydb", "connection.user":"root", "connection.password":"test1357", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "false", "tasks.max" : "1", "topics" : "my_topic_users" } }
Shell
복사
Sink Connector를 등록하는 과정도 Source Connector를 등록하는 과정과 동일하게 Kafka Connect 서버에 등록할 Sink Connector 정보를 JSON 형태로 담아서 POST 요청을 보내면 등록이 된다.
Sink Connector가 등록된 후 DB를 살펴보면,
이처럼 우리가 직접 생성하지 않은 my_topic_users라는 새로운 테이블이 Sink Connector를 등록하면서 추가된다.
my_topic_users 테이블 내에는 우리가 이전에 user 테이블에 추가해두었던 값이 들어있는데, 이는 Kafka Topic에 저장되어 있던 값을 Sink Connector가 등록된 후 데이터들을 읽어 추가해둔 것이다.
만약 이와 같이 새로운 유저를 추가한다면,
consumer를 통해 Kafka Topic에 새로운 데이터가 저장되는 것을 볼 수 있고
이 내용이 Sink Connector를 통해서 다시 my_topic_users 테이블에 저장된다.
추가적으로
이와 같은 데이터가 저장되어 있는 DB에서
우리가 consumer를 통해 보았던 포맷 형태로 새로 추가할 데이터들을 넣어 producer에 입력하면,
Kafka Connect의 해당 Topic에 데이터가 저장되면서 Sink Connector가 데이터를 전달하여 DB에서도 새로운 데이터가 추가되는 것을 확인할 수 있다.

참고