프로젝트 준비로 Change Data Capture를 테스트하다 보니, 테스트 데이터셋을 Kafka에 Producing해야했고, Python에 있는 python-kafka 패키지를 통해 간단하게 구현 할 수 있었다.
0. 버전정보
0.1 테스트 환경
- Linux, Rhel8.6
- Python3.6.4 (Rhel8.6 default python)
- kerberos enabled
0.2 Python Package 정보
$pip list
decorator 5.1.1
gssapi 1.7.3
kafka-python 2.0.2
krbcontext 0.10
pip 21.3.1
setuptools 39.2.0
1. 사전 설정 작업
1.1 Python venv 설정
Linux 환경에서 Python 가상환경을 구성한다.
#python3 -m venv [Path]
python3 -m venv /python/python_kafka
# activate
# cd [Path]/bin
cd /python/python_kafka/bin
source activate
1.2 Python Package 설치
krbcontext는 kerberized된 Kafka 연결에 필요한 패키지
pip install --upgrade pip
pip install kafka-python krbcontext
2. Python kafka Producer file 작성
CDC 테스트에 Json형식의 예제 데이터는 아래와 같다.
date = {"before":{
"id": 1,
"val": 123
},
"after":{
"id": 1,
"val": 321
}
}
2.1 예제 코드
from kafka import KafkaProducer
from krbcontext.context import krbContext
import time
with krbContext(using_keytab=True, principal='test@EXAMPLE.COM', keytab_file='/etc/security/keytabs/test.keytab', ccache_file='/tmp/krb5cc_example'):
try:
producer = KafkaProducer(bootstrap_servers=['bootstrap_server_IP:9092'],
acks=1,
sasl_mechanism='GSSAPI',
security_protocol='SASL_PLAINTEXT',
sasl_kerberos_service_name='kafka',
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
start = time.time() # Producing 시작시간
for i in range(10000):
data = {'id': i,'val': i} # { Key : value, json 형식의 데이터 스키마}
producer.send('test_topic', value=data) # test_topic에 data라는 value를 produsing
producer.flush() #건 바이 건으로 배치 처리하지 않고, 보류 중인 모든 메시지 즉시 전송
print("Total time : ", time.time() - start)
2.2 실제 테스트 코드
Chagne Data Capture(CDC)를 테스트 하기 위해서 Insert, Update, Delete에 대한 테스트데이터를 Producing하기 위한 코드이다.
- Before : Null, After 존재 > Insert
- Before : 존재, Before Null > Delete
- Before : 존재, AFter 존재 > Update
#! /usr/bin/python3
from kafka import KafkaProducer
from krbcontext.context import krbContext
import time
import json
### krbContext를 with문으로 사용해 프로세스가 떠 있는 동안 Keytab파일을 이용한 Kerberos인증을 하도록 작성.
### keytab_file의 저장 위치와, ccache_file의 위치는 사용자 지정
with krbContext(using_keytab=True, principal='test@REALM_INFO', keytab_file='/etc/security/keytabs/test.keytab', ccache_file='/tmp/krb5cc_pid_kafka_prod'):
try:
producer = KafkaProducer(bootstrap_servers=['kafka_server:9092'],
acks=1,
sasl_mechanism='GSSAPI',
security_protocol='SASL_PLAINTEXT',
sasl_kerberos_service_name='kafka',
value_serializer=lambda x: json.dumps(x).encode('utf-8')
)
for i in range(1,1001):
data = {"after":{"id": i,"val": 100}} #insert
producer.send('cdctopic',value=data) # 메세지 전송
producer.flush() #배치 처리하지 않고, 보류 중인 모든 메시지 즉시 전송
for i in range(10,1001):
data = {"before":{"id": i,"val": 200},"after":{"id": i,"val": 724}} #update
producer.send('cdctopic',value=data) # 메세지 전송
producer.flush() #배치 처리하지 않고, 보류 중인 모든 메시지 즉시 전송
for i in range(50,1001):
data = {"before":{"id": i,"val": 200}} #delete
producer.send('cdctopic',value=data) # 메세지 전송
producer.flush() #배치 처리하지 않고, 보류 중인 모든 메시지 즉시 전송
except Exception as e:
print("An error occurred:", str(e))
pass
3. Kafka Topic 생성 (참고용)
Kerberos가 적용된 Kafka Broker에 Topic을 생성하기 위한 작업이다.
3.1 jaas.conf, client.properties 작성
JAAS(Java Authentication and Authorization Service), JAAS(Java Authentication and Authorization Service)는 자바 프로그래밍 언어의 보안 프레임워크이다.
jaas.conf에 Kerberized Kafka Broker에 Connect 하기 위해 필요한 Client 정보를 작성하는 것.
client.propperties 내용도 kafka-topics --option
과 같이 옵션으로 명시가 가능하나, 파일로 명령어 config를 관리하기 위해 작성한다.
3.1.1 jaas.conf
/etc/security/keytabs/ 아래 경로로 관련 keytab파일을 copy해서 사용하면 된다.
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.keytab"
storeKey=true
useTicketCache=false
principal="kafka@REALM.COM";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
storeKey=true
useTicketCache=false
keyTab="/etc/security/keytabs/kafka.keytab"
principal="kafka@REALM.COM";
};
3.1.2 client.properties
security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka
3.2 Kafka Topic 생성
작업 Linux 터미널(Session)에서 커버로스 관련 인증을 위해 아래의 명령어를 실행한다. 추가적으로, CDC의 경우 데이터의 순차성보장이 매우매우 중요하기 때문에 partition의 수는 1로 고정이다… Kafka의 동작원리를 공부해보면 이해가 가능할 것.
# 필자는 권한관리를 위한 Ranger가 세팅되어 있기 때문에 Kafka topic create 권한을 가진 유저의 keytab을 활용했다.
kinit -kw kafka.keytab kafka@REALM
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/security/keytabs/jaas.conf"
# topic list 조회
kafka-topics --list --bootstrap-server=cfm01.goodmit.com:9092 --command-config client.properties
kafka-topics --create --topic test_topic --partitions 1 --bootstrap-server=cfm01.goodmit.com:9092 --command-config client.properties