์ด๋ฒ์ ํ๋ก์ ํธ์์ MSA๋ฅผ ์ ์ฉํ๊ฒ ๋๋ฉด์ ๋น๋๊ธฐ ํต์ ์์ ์ ์ํด Kafka๋ฅผ ํ์ตํ์๊ณ ํ๊ฒฝ์ค์ ์ ํ๊ฒ ๋์๋ค.
1. kafka, zookeeper Docker Compose๋ก ์ค์น
version: '3.8'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_CREATE_TOPICS: "test:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
2. Kafka ํ ์คํธ
(1) ํ ํฝ์์ฑํ๊ธฐ
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-events --partitions 1
(2) Producer ์คํ
./opt/kafka_2.13-2.8.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-events
(3) Consumer ์คํ
./opt/kafka_2.13-2.8.1/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-events \ --from-beginning
(4) ํ ์คํธ ๊ฒฐ๊ณผ
Producer๊ฐ ํ ํฝ์ ๋ฉ์์ง๋ฅผ ๋ณด๋ด๋ฉด ํด๋น ํ ํฝ์ ๊ตฌ๋ ํ๊ณ ์๋ Consumer๊ฐ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์ค๋ ๊ฒ์ ๋ณผ ์ ์๋ค.
3. Kafka Connect ์ค์น
(1) kafka connect ์ค์น
curl -O http://packages.confluent.io/archive/6.1/confluent-community-6.1.0.tar.gz
(1) kafka connect ์คํ (zookeeper์ kafka๊ฐ ์คํ์ค์ผ ๋ Mac๊ธฐ์ค)
./bin/connect-distributed ./etc/kafka/connect-distributed.properties
์คํํ์ ํ ํฝ์ ์กฐํํด ๋ณด๋ฉด ๊ธฐ์กด์ ์กด์ฌํ์ง ์๋ ๊ฒ๋ค์ด ์๋์ผ๋ก ์๊ธด๋ค.
4. Kafka Connect JDBC ์ค์น ๋ฐ Kafka Connect ์ค์
https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc
์ URL๋ก ์ ๊ทผํ๋ฉด JDBC Connector๋ฅผ ์ค์นํ ์ ์๋ค.
๊ทธ๋ค์์ Kafka Connect ์ค์นํ ํด๋์ ๋ค์ด๊ฐ์ connect-distributed.properties ํ์ผ์ ์ด์ด์ ๋ฐฉ๊ธ ์ค์นํ Kafka Connect JDBC์ confluentic-kafka-connect-jdbc-10.7.2/lib ํด๋ ๋ด์ kafka-connect-jdbc-10.7.2.jar๋ฅผ ์ฌ์ฉํ๋๋ก ์ฐ๊ฒฐํด์ผ ํ๋ค.
plugin.path๋ฅผ ์ํ๋ jarํ์ผ์ด ์๋ path๋ก ๋ณ๊ฒฝํด์ค๋ค.
5. Kafka Conenct ํด๋ ๋ด๋ก mariadb-java-client.jar ํ์ผ ๋ณต์ฌ
~/.m2/repository/org/mariadb/jdbc/mariadb-java-client/2.7.2 ๊ฒฝ๋ก๋ก ์ด๋
mariadb-java-client-2.7.2.jar ํ์ผ์ ~/xxx/confluent-6.1.0/share/java/kafka ํด๋ ๋ด๋ก ๋ณต์ฌ
cp ./mariadb-java-client-2.7.2.jar ~/xxx/confluent-6.1.0/share/java/kafka
6. Kafka Source Connect ๋ฑ๋ก
Postman์ ์คํํ์ฌ http://127.0.0.1:8083/connectors๋ก ์๋ json์ body์ ํฌํจํ์ฌ POST ์์ฒญํ๋ค.
{
"name" : "my-source-connect",
"config" : {
"connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mariadb://localhost:3305/mydb",
"connection.user":"root",
"connection.password":"๋น๋ฐ๋ฒํธ",
"mode": "incrementing",
"incrementing.column.name" : "id", //์๋ ์ฆ๊ฐ์ํฌ ์ปฌ๋ผ
"table.whitelist":"users", //๊ฐ์งํ ํ
์ด๋ธ ์ด๋ฆ
"topic.prefix" : "my_topic_",
"tasks.max" : "1"
}
}
๋ง์ฝ ์์ฒญ์ ์๋ชป ๋ณด๋ด์ ์ญ์ ํด์ผ ํ๋ ๊ฒฝ์ฐ http://127.0.0.1:8083/connectors/์์ค์ปค๋ฅํฐ์ด๋ฆ DELETE ์์ฒญ์ ํ๋ค.
๊ทธ๋ฌ๊ณ ๋์ ๊ฐ์งํ ํ
์ด๋ธ ์ฆ, ์ฌ๊ธฐ์๋ users ํ
์ด๋ธ์ row๋ฅผ ์ถ๊ฐํด ๋ณธ๋ค.
๊ทธ๋ฌ๋ฉด ๊ฐ์ง๋์ด ์๋์ ๊ฐ์ด ํ ํฝ์ด ์์ฑ๋๊ณ ์ปจ์๋จธ๋ฅผ ์คํํ๋ฉด ๊ฐ์ ๋ณผ ์ ์๋ค.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": true,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "pwd"
},
{
"type": "string",
"optional": true,
"field": "name"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "created_at"
}
],
"optional": false,
"name": "users"
},
"payload": {
"id": 1,
"user_id": "user1",
"pwd": "test111",
"name": "username",
"created_at": 1685872665000
}
}
7. Kafka Sink Connect ๋ฑ๋ก
Kafka Source Connector์ ๋๊ฐ์ด Postman์ ํตํด sink connector๋ฅผ ๋ฑ๋กํ๋ค.
{
"name":"my-sink-connect",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://localhost:3305/mydb",
"connection.user":"root",
"connection.password":"love1994",
"auto.create":"true",
"auto.evolve":"true",
"delete.enabled":"false",
"tasks.max":"1",
"topics":"my_topic_users"
}
}
8. Kafka Source Connector, Sink Connector ๋ฑ๋ก ํ์ธ
http://127.0.0.1:8083/connectors Get ํธ์ถํ์ฌ ์ปค๋ฅํฐ๊ฐ ์ ๋๋ก ๋ฑ๋ก๋์๋์ง ํ์ธํ๋ค.
sink connector ๋ฑ๋ก ์์ auto.create=ture ์ค์ ์ ์คฌ์ผ๋ฏ๋ก ์์ ๊ฐ์ด my_topic_users ํ
์ด๋ธ์ด ๋ฑ๋ก๋์๋ค.
๋ํ ์ด์ ์ source connector ๋ฑ๋ก ํ์ usersํ
์ด๋ธ์ insert ๋์๋ 2๊ฐ์ user row๊ฐ ๋ค์ด๊ฐ ์๋ ๊ฒ๋ ํ์ธํ ์ ์๋ค.
์์ง ์ด ๊ตฌ์กฐ๊ฐ ์ต์ํ์ง ์์ kafka source connector, sink connector๋ ์ปจํ ์ด๋ ํ๊ฒฝ์ด ์๋ ๋ก์ปฌ์์ ๋์ํ๋๋ก ํ์๋๋ฐ ์ถํ์ ์ต์ํด์ง๋ฉด ๋์ ํด๋ด์ผ๊ฒ ๋ค..
'๐ณ DevOps > ๐ Kafka' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Kafka์ ์ดํด2 - ISR (0) | 2023.05.31 |
---|---|
Kafka์ ์ดํด1 - ๋ฐฐ๊ฒฝ๋ถํฐ ๊ธฐ๋ณธ ์ฉ์ด ์ค๋ช (0) | 2023.05.31 |