Python 으로 Kafka 데이터를 연동하는 프로젝트를 진행했다.
개발환경
서버에 설치되어 있는 파이썬이 구버전이었는데 버전을 올려서 하기가 어려운 상황이라 그냥 진행했다.
1
2
3
Python: 2.7
Kafka-python: 2.0.2
Elasticsearch: 6.8.2
프로세스
프로세스는 심플하다. 데이터를 가져와서 가공한 후에 각각의 서버로 넣어주는 파이프 역할이었다.
- Kafka Data Poll -> 전처리 -> Kafka Producer 전송
- Kafka Data Poll -> 전처리 -> ES 적재
2개의 프로세스를 실시간으로 처리해야 해서 Thread 2개를 사용했다.
전처리 속도가 크게 문제되지 않는 수준이라 Thread를 사용했지만,
파이썬 속도 이슈가 있다면 multiprocessing 을 쓰는것도 하나의 방법일 것 같다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# -*- encoding:utf-8 -*-
import time
from json import loads
from json import dumps
from threading import Thread
from datetime import datetime
from kafka import KafkaConsumer
from kafka import KafkaProducer
from kafka import TopicPartition
from elasticsearch import Elasticsearch
CONSUMER_SERVERS=['127.0.0.1:9092', '127.0.0.2:9092']
PRODUCER_SERVERS=['127.0.1.1:9092', '127.0.1.2:9092']
ELASTIC_SERVERS=['http://127.0.0.1:9400','http://127.0.0.2:9400']
# Consumer 생성
def create_consumer(group_id):
consumer = KafkaConsumer(
bootstrap_servers=CONSUMER_SERVERS, # 카프카 컨슈머 서버
enable_auto_commit=False, # 오토커밋 여부 (수동 커밋할거라 false)
group_id=group_id, # Group ID
auto_offset_reset='earliest', # 데이터를 어디서부터 끌어올지 여부 (earliest: 제일 오래된 데이터부터 / lastest: 최신 데이터부터)
consumer_timeout_ms=10000, # 타임아웃 설정
value_deserializer=lambda x: loads(x.decode('utf-8')) # 데이터 인코딩
)
return consumer
# Producer 생성
def create_producer():
producer = KafkaProducer(
bootstrap_servers=PRODUCER_SERVERS, # 카프카 프로듀서 서버
acks="all", # Kafka 프로듀서가 데이터를 받았는지 확인 (0: No, 1: Yes (Follower는 확인 X), all: Yes (Follower 포함 - 손실률이 제일 적음))
value_serializer=lambda x: dumps(x).encode('utf-8') # 데이터 인코딩
)
return producer
# Elastic Search 생성
def create_elastic():
es = Elasticsearch(
ELASTIC_SERVERS,
request_timeout=60,
max_retries=1,
retry_on_timeout=True
)
return es
# Redis 데이터 가져오기
def get_redis(redis_key):
connection = redis.Redis("127.0.0.1", port=6379, charset='utf-8', decode_response=True)
data = connection.hgetall(redis_key)
return data
# Kafka -> Kafka
def proc_kafka(topic, group_id):
# 컨슈머 생성
consumer = create_consumer(group_id)
# 구독
consumer.subscribe([topic])
# 프로듀서 생성
producer = create_producer()
while True:
# 데이터 Poll
message = consumer.poll()
# 데이터가 없을 경우 30초 대기
if len(message) == 0:
time.sleep(30)
continue
# 전처리 성공여부
check_bool = True
for topic_partition, records in message.items():
for record in records:
try:
# Data 전처리
data = record.value['result']
# Producer 전송
producer.send('토픽명', value=data)
producer.flush()
except:
# 오류 발생 시 최종 offset 위치 저장
consumer.seek(TopicPartition(record.topic, record.partition), record.offset)
# Log 출력 후 for문 종료
check_bool = False
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' Kafka -> Kafka 데이터 연동 오류 발생')
time.sleep(60)
break
# 전처리 실패 시 데이터 다시 구독
if check_bool == False:
break
# 전처리 성공 시 Commit 후 다음 메시지 구독
if check_bool == True:
consumer.commit()
# Kafka -> ES
def proc_es(topic, group_id):
# 컨슈머 생성
consumer = create_consumer(group_id)
# 구독
consumer.subscribe([topic])
# Reids Data 가져오기
REDIS_DATA = get_redis('redis_key')
# ES 생성
es = create_elastic()
while True:
# 데이터 Poll
message = consumer.poll()
# 데이터가 없을 경우 30초 대기
if len(message) == 0:
time.sleep(30)
continue
# 전처리 성공여부
check_bool = True
for topic_partition, records in message.items():
for record in records:
try:
# Data 전처리
data = record.value['result']
# ES 입력
es.update(index='인덱스명', doc_type='itmes', id=키값, body={'doc': data, 'doc_as_upsert': True})
except:
# 오류 발생 시 최종 offset 위치 저장
consumer.seek(TopicPartition(record.topic, record.partition), record.offset)
# Log 출력 후 for문 종료
check_bool = False
print(datetime.now().strftime("%Y-%m-%d %H:%M:%S") + ' Kafka -> ES 데이터 연동 오류 발생')
time.sleep(60)
break
# 전처리 실패 시 데이터 다시 구독
if check_bool == False:
break
# 전처리 성공 시 Commit 후 다음 메시지 구독
if check_bool == True:
consumer.commit()
if __name__=="__main__":
th1 = Thread(target=proc_kafka, args=('토픽명1', '그룹ID'))
th2 = Thread(target=proc_es, args=('토픽명2', '그룹ID'))
th1.start()
th2.start()
th1.join()
th2.join()
python 으로 만들긴 했지만 일반적으로는 java를 많이 사용하는 듯 하다.