본문 바로가기
DevOps

아파치 카프카 개발

qbang 2022. 1. 25.

AWS에 카프카 클러스터 설치 및 실행

1. AWS ec2 3대를 준비하고 각각 접속한다.

2. 각 인스턴스에서 jdk를 설치한다.

yum install java-1.8.0-openjdk-devel.x86_64

3. 각 인스턴스에서 주키퍼를 다운받는다.

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.5.9/apache-zookeeper-3.5.9-bin.tar.gz

4. 각 인스턴스에서 다운받은 주키퍼의 압축을 풀어준다.

tar xvf apache-zookeeper-3.5.9-bin.tar.gz

5. 주키퍼 앙상블을 구축하기 위해서 각 서버마다 주키퍼 설정을 해준다.

cd apache-zookeeper-3.5.9-bin/conf

위 경로에 들어가서 zoo.cfg를 다음과 같이 편집해준다. 해당 버전을 설치했더니 zoo-example.cfg 파일이 있어서 mv zoo-exmaple.cfg zoo.cfg 명령어를 이용해 파일명을 바꾸어주었다.

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=test-broker01:2888:3888
server.2=test-broker02:2888:3888
server.3=test-broker03:2888:3888

6. 각 서버별로 ip가 아닌 hostname으로 통신하기 위해 /etc/hosts를 수정한다.

0.0.0.0 test-broker01
{test-broker02의 퍼블릭 IPv4 주소} test-broker02
{test-broker03의 퍼블릭 IPv4 주소} test-broker03
0.0.0.0 test-broker02
{test-broker01의 퍼블릭 IPv4 주소} test-broker01
{test-broker03의 퍼블릭 IPv4 주소} test-broker03
0.0.0.0 test-broker03
{test-broker01의 퍼블릭 IPv4 주소} test-broker01
{test-broker02의 퍼블릭 IPv4 주소} test-broker02

7. 보안 그룹 설정

인바운드 규칙을 추가한다.

8. 주키퍼 실행

./zkServer.sh start

각 서버의 apache-zookeeper-3.5.9-bin/bin 에서 주키퍼를 실행한다.

9. AWS 주키퍼에 연결

brew install zookeeper

로컬 환경에 zookeeper를 설치해준다(본인은 homebrew를 통해 설치). 

./zkCli -server {인스턴스 주소:2181}

/usr/local/Cellar/zookeeper/3.7.0_1/bin 경로에 들어간다. 위 명령어를 이용해 각 인스턴스 주소를 바꿔가면서 연결이 잘 되는지 확인한다.

10. 카프카 설치

# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
# tar xzf kafka_2.11-2.1.0.tgz

/home/ec2-user 경로에서 주키퍼와 마찬가지로 wget을 이용하여 각 서버마다 카프카를 설치해준다.

각 서버마다 kafka_2.11-2.1.0/config 경로에 있는 server.properties 파일을 수정해준다.(꼭 각기다른 숫자들로 설정해주어야 한다)

listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{브로커 이름 ex.test-broker01}:9092

listener와 advertise listener도 설정하고, 아까 실행한 주키퍼의 hostname과 port도 넣어준다.

./kafka-server-start.sh ../config/server.properties

이제 kafka_2.11-2.1.0/bin 경로에서 위 명령어를 통해 카프카를 실행해본다. 만약 There is insufficient memory ... 에러가 출력된다면 면 접은 글을 참조하자.

더보기

이런 에러가 난다면 먼저 free -h 명령어를 통해 swap 메모리 공간을 확인한다.

sudo dd if=/dev/zero of=/swapfile bs=128M count=32

만약 0이라면 위 명령어를 통해 스왑 파일을 생성한다.

sudo chmod 600 /swapfile

 스왑 파일의 권한을 업데이트한다.

# sudo mkswap /swapfile
# sudo swapon /swapfile

 리눅스 스왑 영역을 설정하고, 스왑 공간에 스왑 파일을 추가하여 즉시 사용할 수 있도록 한다. 그리고 /etc/fstab 파일을 열어서 파일 끝에 /swapfile swap swap defaults 0 0 를 추가한다.

설정이 제대로 되었는지 확인한다.

11. 테스트

./kafka-topics.sh --create --zookeeper test-broker01:2181,test-broker02:2181,test-broker03:2181/test --replication-factor 3 --partitions 1 --topic test

테스트 전 하나의 서버에서 토픽을 생성해준다.

./kafka-console-producer.sh --broker-list test-broker01:9092,test-broker02:9092,test-broker03:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server test-broker01:9092,test-broker02:9092,test-broker03:9092 --topic test --from-beginning

하나의 서버에서는 producer를 실행해주고, 동시에 다른 서버에 consumer를 실행해준다.

프로듀서에서 데이터 생성
컨슈머에서 데이터 확인

프로듀서에서 This is a message from test-broker02 라는 데이터를 생성했고, 컨슈머 측에서 데이터가 정상적으로 확인되었다.

 

참고

https://www.youtube.com/watch?v=Qr0HVvtMFhg 

https://blog.voidmainvoid.net/325

댓글