dev/Cloud & Infra

kafka 개발 - AdminClient 로 관리 기능 개발하기 - Broker 정보 보기

lugi 2019. 9. 7. 23:35

KAFKA 한국 사용자 모임(https://www.facebook.com/groups/kafka.kru/)에는 Kafka를 사용하는 사람들이 많은 정보와 질문, 답변을 올려주는 곳이다. 이곳에서 내가 설정한 기본 Property 말고 Broker에 설정된 전체 Property를 볼 수 없을지에 대한 질문이 올라와 그 부분에 대해서 코드를 작성하다. 예전부터 해 오던 AdminClient 와 관련된 내용을 전체적으로 한 번 정리하자는 취지에서 포스팅을 하게 되었다.

 

2.3.0 기준으로 Kafka가 제공하는 API는 아래와 같다

  • Producer API
  • Consumer API
  • Streams API
  • Connect API
  • AdminClient API

오늘 살펴볼 API인 AdminClient는 0.11.0.0 에서 소개된 API로 버전마다 많은 기능이 개선되고 있다. 대부분의 API의 호환성은 유지되지만, 일부 API의 경우는 minor version update 임에도 불구하고 deprecated 되거나 없던 API가 추가되기도 한다. 예를 들어, 오늘 소개할 예제는 내가 테스트를 위해 쓰고 있는 2.0.0 환경(회사에서 쓰는 버전과 맞춰서 굴리고 있다)의 브로커에 맞춰서 개발하고 있는데, 2.3.0부터는 alterConfigs 메소드가 Deprecated 되고 incrementalAlterConfigs가 새로이 도입되었다.

 

Kafka 의 AdminClient 를 사용하기 위해서는 Kafka Client 가 필요하다.

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<!-- 버전은 본인이 사용하는 Broker의 버전에 맞게 -->
	<version>2.0.0</version>
</dependency>

Consumer나 Producer API는 major 버전이 같으면 어느 정도 호환성이 확보된 것 같은데, AdminClient 는 대놓고 Evolve 중이라고 말하고 있기 때문에 될 수 있으면 Broker 버전과 맞추는 것이 좋을 것 같다.

 

 

일단 올라온 질문의 취지에 맞게, Broker의 전체 Config를 보는 code를 한 번 살펴보자

동작에 관한 설명은 코드에 달아놓은 주석으로 갈음하겠다.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;

public class GetBrokerConfigTest {
    private static final Logger LOG = LoggerFactory.getLogger(GetBrokerConfigTest.class);

    private static final String BROKERS = "192.168.0.201:9092,192.168.0.202:9092,192.168.0.203:9092";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. AdminClient로 Broker에 연결
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        // AdminClient 는 객체 생성 후 종료시 닫아줘야 하는 객체인데 try 로 감싸서 생성하는 그걸 신경 쓸 필요가 없음
        try (AdminClient client = AdminClient.create(props)) {
            // Broker 1번의 Config를 describeConfigs 를 통해 얻음, 현재 Type.Broker와 Type.TOPIC 을 지원함
            ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, "1");

            /*
            describeConfigs 의 결과는 ConfigResource (어떤 타입(Broker, TOPIC)의 어디의(Broker Id, TOPIC명) 정보인가? 를 Key로 하고
            결과값, 예외, 완료 여부를 비동기로 반환하는 Future를 Value 로 하는 Map 구조를 가진다
            예를 들어, describeConfigs 의 수행 결과를 즉시 찍어보면, 비동기 작업이 아직 완료되지 않았기 때문에
            {ConfigResource(type=BROKER, name='1')=KafkaFuture{value=null,exception=null,done=false}}
            의 형태로 나타난다.
            */
            Set<Map.Entry<ConfigResource, KafkaFuture<Config>>> entries = client.describeConfigs(Arrays.asList(resource)).values().entrySet();

            // 방법1. 비동기식으로 사용하는 방법
            for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : entries) {
                /*
                호출 결과의 Value는 비동기로 돌아온다. Future의 비동기 값을 처리하는 방법은 thenApply 를 통해 처리하는 방법이 있다.
                thenApply 메소드의 콜백으로 BaseFunction을 전달해주면, 결과가 돌아왔을 때 전달한 BaseFunction의 apply 메소드가 실행된다.
                Java8 이상에서는 lambda식(() -> {})을 활용해서 좀 더 간결하게 표현할 수 있다.
                */
                entry.getValue().thenApply(value -> {
                    // 비동기의 결과가 전달되었을 경우, 이 block이 실행된다. value는 ConfigEntry의 Collection이다.
                    value.entries().forEach(configEntry -> { // Collection 을 순회한다.
                        // source (default 인가 아니면 설정된 값인가)와 name, value를 표시한다.
                        LOG.info("[{}] {} = {}", configEntry.source().name(), configEntry.name(), configEntry.value());
                    });
                    return value;
                });
            }
            // 방법2. 동기식으로 사용하는 방법
            for (Map.Entry<ConfigResource, KafkaFuture<Config>> entry : entries) { // entries 는 동일하게 사용한다.
                // getValue() 에 .thenApply 를 사용하지 않고 .get() 을 사용하면 자체적으로 동기식으로 동작한다.
                for(ConfigEntry configEntry : entry.getValue().get().entries()){
                    // entry의 순회도 forEach가 아니라 enhanced-loops 로 처리하였다.
                    LOG.info("[{}] {} = {}", configEntry.source().name(), configEntry.name(), configEntry.value());
                }
            }

        }
    }
}

 

위의 코드를 수행한 결과는 아래와 같다

[DEFAULT_CONFIG] advertised.host.name = null
[DEFAULT_CONFIG] log.cleaner.min.compaction.lag.ms = 0
[DEFAULT_CONFIG] metric.reporters = 
[DEFAULT_CONFIG] quota.producer.default = 9223372036854775807
[DEFAULT_CONFIG] offsets.topic.num.partitions = 50
[DEFAULT_CONFIG] log.flush.interval.messages = 9223372036854775807
[DEFAULT_CONFIG] auto.create.topics.enable = true
[DEFAULT_CONFIG] controller.socket.timeout.ms = 30000
[DEFAULT_CONFIG] log.flush.interval.ms = null
[DEFAULT_CONFIG] principal.builder.class = null
[DEFAULT_CONFIG] replica.socket.receive.buffer.bytes = 65536
[DEFAULT_CONFIG] min.insync.replicas = 1
[DEFAULT_CONFIG] replica.fetch.wait.max.ms = 500
[STATIC_BROKER_CONFIG] num.recovery.threads.per.data.dir = 1
[DEFAULT_CONFIG] ssl.keystore.type = JKS
[DEFAULT_CONFIG] password.encoder.iterations = 4096
[DEFAULT_CONFIG] sasl.mechanism.inter.broker.protocol = GSSAPI
[DEFAULT_CONFIG] default.replication.factor = 1
[DEFAULT_CONFIG] ssl.truststore.password = null
[DEFAULT_CONFIG] log.preallocate = false
[DEFAULT_CONFIG] sasl.kerberos.principal.to.local.rules = DEFAULT
[DEFAULT_CONFIG] fetch.purgatory.purge.interval.requests = 1000
[DEFAULT_CONFIG] ssl.endpoint.identification.algorithm = https
[DEFAULT_CONFIG] replica.socket.timeout.ms = 30000
[DEFAULT_CONFIG] message.max.bytes = 1000012
[DEFAULT_CONFIG] transactional.id.expiration.ms = 604800000
[STATIC_BROKER_CONFIG] transaction.state.log.replication.factor = 1
[STATIC_BROKER_CONFIG] num.io.threads = 8
[DEFAULT_CONFIG] sasl.login.refresh.buffer.seconds = 300
[DEFAULT_CONFIG] offsets.commit.required.acks = -1
[STATIC_BROKER_CONFIG] delete.topic.enable = true
[DEFAULT_CONFIG] log.flush.offset.checkpoint.interval.ms = 60000
[DEFAULT_CONFIG] quota.window.size.seconds = 1
[DEFAULT_CONFIG] ssl.truststore.type = JKS
[DEFAULT_CONFIG] offsets.commit.timeout.ms = 5000
[DEFAULT_CONFIG] quota.window.num = 11
[STATIC_BROKER_CONFIG] zookeeper.connect = kafka1:2181,kafka2:2181,kafka3.2181/gnu-kafka
[DEFAULT_CONFIG] authorizer.class.name = 
[DEFAULT_CONFIG] password.encoder.secret = null
[DEFAULT_CONFIG] num.replica.fetchers = 1
[DEFAULT_CONFIG] alter.log.dirs.replication.quota.window.size.seconds = 1
[DEFAULT_CONFIG] log.retention.ms = null
[DEFAULT_CONFIG] alter.log.dirs.replication.quota.window.num = 11
[DEFAULT_CONFIG] log.roll.jitter.hours = 0
[DEFAULT_CONFIG] log.cleaner.enable = true
[DEFAULT_CONFIG] password.encoder.old.secret = null
[DEFAULT_CONFIG] offsets.load.buffer.size = 5242880
[DEFAULT_CONFIG] log.cleaner.delete.retention.ms = 86400000
[DEFAULT_CONFIG] ssl.client.auth = none
[DEFAULT_CONFIG] controlled.shutdown.max.retries = 3
[STATIC_BROKER_CONFIG] offsets.topic.replication.factor = 1
[DEFAULT_CONFIG] queued.max.requests = 500
[STATIC_BROKER_CONFIG] transaction.state.log.min.isr = 1
[DEFAULT_CONFIG] log.cleaner.threads = 1
[DEFAULT_CONFIG] ssl.secure.random.implementation = null
[DEFAULT_CONFIG] sasl.kerberos.service.name = null
[DEFAULT_CONFIG] sasl.kerberos.ticket.renew.jitter = 0.05
[STATIC_BROKER_CONFIG] socket.request.max.bytes = 104857600
[DEFAULT_CONFIG] ssl.trustmanager.algorithm = PKIX
[DEFAULT_CONFIG] zookeeper.session.timeout.ms = 6000
[DEFAULT_CONFIG] log.retention.bytes = -1
[DEFAULT_CONFIG] sasl.jaas.config = null
[DEFAULT_CONFIG] log.message.timestamp.type = CreateTime
[DEFAULT_CONFIG] sasl.kerberos.min.time.before.relogin = 60000
[DEFAULT_CONFIG] zookeeper.set.acl = false
[DEFAULT_CONFIG] connections.max.idle.ms = 600000
[DEFAULT_CONFIG] offsets.retention.minutes = 10080
[DEFAULT_CONFIG] delegation.token.expiry.time.ms = 86400000
[DEFAULT_CONFIG] replica.fetch.backoff.ms = 1000
[DEFAULT_CONFIG] inter.broker.protocol.version = 2.0-IV1
[DEFAULT_CONFIG] transaction.state.log.num.partitions = 50
[DEFAULT_CONFIG] listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
[STATIC_BROKER_CONFIG] log.retention.hours = 48
[STATIC_BROKER_CONFIG] num.partitions = 3
[DEFAULT_CONFIG] client.quota.callback.class = null
[STATIC_BROKER_CONFIG] listeners = PLAINTEXT://192.168.0.201:9092
[DEFAULT_CONFIG] broker.id.generation.enable = true
[DEFAULT_CONFIG] ssl.provider = null
[DEFAULT_CONFIG] ssl.enabled.protocols = TLSv1.2,TLSv1.1,TLSv1
[DEFAULT_CONFIG] inter.broker.listener.name = null
[DEFAULT_CONFIG] delete.records.purgatory.purge.interval.requests = 1
[DEFAULT_CONFIG] log.roll.ms = null
[DEFAULT_CONFIG] alter.config.policy.class.name = null
[DEFAULT_CONFIG] delegation.token.expiry.check.interval.ms = 3600000
[DEFAULT_CONFIG] zookeeper.max.in.flight.requests = 10
[DEFAULT_CONFIG] log.flush.scheduler.interval.ms = 9223372036854775807
[DEFAULT_CONFIG] ssl.cipher.suites = 
[DEFAULT_CONFIG] log.index.size.max.bytes = 10485760
[STATIC_BROKER_CONFIG] allow.auto.create.topics = null
[DEFAULT_CONFIG] ssl.keymanager.algorithm = SunX509
[DEFAULT_CONFIG] sasl.login.callback.handler.class = null
[DEFAULT_CONFIG] security.inter.broker.protocol = PLAINTEXT
[DEFAULT_CONFIG] replica.fetch.max.bytes = 1048576
[DEFAULT_CONFIG] sasl.server.callback.handler.class = null
[DEFAULT_CONFIG] advertised.port = null
[DEFAULT_CONFIG] log.cleaner.dedupe.buffer.size = 134217728
[DEFAULT_CONFIG] replica.high.watermark.checkpoint.interval.ms = 5000
[DEFAULT_CONFIG] replication.quota.window.size.seconds = 1
[DEFAULT_CONFIG] log.cleaner.io.buffer.size = 524288
[DEFAULT_CONFIG] sasl.kerberos.ticket.renew.window.factor = 0.8
[DEFAULT_CONFIG] create.topic.policy.class.name = null
[STATIC_BROKER_CONFIG] zookeeper.connection.timeout.ms = 6000
[DEFAULT_CONFIG] metrics.recording.level = INFO
[DEFAULT_CONFIG] password.encoder.cipher.algorithm = AES/CBC/PKCS5Padding
[DEFAULT_CONFIG] controlled.shutdown.retry.backoff.ms = 5000
[DEFAULT_CONFIG] log.roll.hours = 168
[DEFAULT_CONFIG] log.cleanup.policy = delete
[DEFAULT_CONFIG] log.flush.start.offset.checkpoint.interval.ms = 60000
[DEFAULT_CONFIG] host.name = 
[DEFAULT_CONFIG] log.roll.jitter.ms = null
[DEFAULT_CONFIG] transaction.state.log.segment.bytes = 104857600
[DEFAULT_CONFIG] max.connections.per.ip = 2147483647
[DEFAULT_CONFIG] offsets.topic.segment.bytes = 104857600
[DEFAULT_CONFIG] background.threads = 10
[DEFAULT_CONFIG] quota.consumer.default = 9223372036854775807
[DEFAULT_CONFIG] request.timeout.ms = 30000
[STATIC_BROKER_CONFIG] group.initial.rebalance.delay.ms = 0
[DEFAULT_CONFIG] log.message.format.version = 2.0-IV1
[DEFAULT_CONFIG] sasl.login.class = null
[DEFAULT_CONFIG] log.index.interval.bytes = 4096
[DEFAULT_CONFIG] log.dir = /tmp/kafka-logs
[STATIC_BROKER_CONFIG] log.segment.bytes = 1073741824
[DEFAULT_CONFIG] log.cleaner.backoff.ms = 15000
[DEFAULT_CONFIG] offset.metadata.max.bytes = 4096
[DEFAULT_CONFIG] ssl.truststore.location = null
[DEFAULT_CONFIG] replica.fetch.response.max.bytes = 10485760
[DEFAULT_CONFIG] group.max.session.timeout.ms = 300000
[DEFAULT_CONFIG] ssl.keystore.password = null
[DEFAULT_CONFIG] zookeeper.sync.time.ms = 2000
[DEFAULT_CONFIG] port = 9092
[DEFAULT_CONFIG] log.retention.minutes = null
[DEFAULT_CONFIG] log.segment.delete.delay.ms = 60000
[STATIC_BROKER_CONFIG] log.dirs = /data/kafka
[DEFAULT_CONFIG] controlled.shutdown.enable = true
[DEFAULT_CONFIG] compression.type = producer
[DEFAULT_CONFIG] max.connections.per.ip.overrides = 
[DEFAULT_CONFIG] log.message.timestamp.difference.max.ms = 9223372036854775807
[DEFAULT_CONFIG] sasl.login.refresh.window.factor = 0.8
[DEFAULT_CONFIG] sasl.login.refresh.min.period.seconds = 60
[DEFAULT_CONFIG] password.encoder.key.length = 128
[DEFAULT_CONFIG] transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
[DEFAULT_CONFIG] sasl.kerberos.kinit.cmd = /usr/bin/kinit
[DEFAULT_CONFIG] log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
[DEFAULT_CONFIG] auto.leader.rebalance.enable = true
[DEFAULT_CONFIG] leader.imbalance.check.interval.seconds = 300
[DEFAULT_CONFIG] log.cleaner.min.cleanable.ratio = 0.5
[DEFAULT_CONFIG] replica.lag.time.max.ms = 10000
[DEFAULT_CONFIG] max.incremental.fetch.session.cache.slots = 1000
[DEFAULT_CONFIG] delegation.token.master.key = null
[STATIC_BROKER_CONFIG] num.network.threads = 3
[DEFAULT_CONFIG] ssl.key.password = null
[DEFAULT_CONFIG] reserved.broker.max.id = 1000
[DEFAULT_CONFIG] sasl.client.callback.handler.class = null
[DEFAULT_CONFIG] transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
[DEFAULT_CONFIG] metrics.num.samples = 2
[STATIC_BROKER_CONFIG] socket.send.buffer.bytes = 102400
[DEFAULT_CONFIG] log.message.downconversion.enable = true
[DEFAULT_CONFIG] ssl.protocol = TLS
[DEFAULT_CONFIG] password.encoder.keyfactory.algorithm = null
[DEFAULT_CONFIG] transaction.state.log.load.buffer.size = 5242880
[STATIC_BROKER_CONFIG] socket.receive.buffer.bytes = 102400
[DEFAULT_CONFIG] ssl.keystore.location = null
[DEFAULT_CONFIG] replica.fetch.min.bytes = 1
[DEFAULT_CONFIG] broker.rack = null
[DEFAULT_CONFIG] num.replica.alter.log.dirs.threads = null
[DEFAULT_CONFIG] unclean.leader.election.enable = false
[DEFAULT_CONFIG] sasl.enabled.mechanisms = GSSAPI
[DEFAULT_CONFIG] group.min.session.timeout.ms = 6000
[DEFAULT_CONFIG] log.cleaner.io.buffer.load.factor = 0.9
[DEFAULT_CONFIG] offsets.retention.check.interval.ms = 600000
[DEFAULT_CONFIG] transaction.max.timeout.ms = 900000
[DEFAULT_CONFIG] producer.purgatory.purge.interval.requests = 1000
[DEFAULT_CONFIG] metrics.sample.window.ms = 30000
[STATIC_BROKER_CONFIG] broker.id = 1
[DEFAULT_CONFIG] offsets.topic.compression.codec = 0
[DEFAULT_CONFIG] delegation.token.max.lifetime.ms = 604800000
[DEFAULT_CONFIG] replication.quota.window.num = 11
[STATIC_BROKER_CONFIG] log.retention.check.interval.ms = 300000
[DEFAULT_CONFIG] advertised.listeners = null
[DEFAULT_CONFIG] queued.max.request.bytes = -1
[DEFAULT_CONFIG] leader.imbalance.per.broker.percentage = 10
[DEFAULT_CONFIG] sasl.login.refresh.window.jitter = 0.05

기본값으로 설정된 것들은 DEFAULT_CONFIG, 사용자가 server.properties 에 설정한 부분은 STATIC_BROKER_CONFIG로 표시됨을 알 수 있다.

 

다만 이 부분에 대해서 Kafka 소스 코드를 살펴보면 enum이 굉장히 많은데, 실제로 사용되는 부분은 몇 개 되지 않는 것 같다. (dynamic config 에 관련해서 BROKER 쪽의 enum도 있지만, 실제로 AdminClient 에서 구동 중인 Broker 의 Property 를 동적으로 바꿀 수는 없으며, 실제로 시도할 경우에도 브로커쪽에서는 바꿀 수 없다는 경고 메시지가 나온다) 다만, AdminClient 및 관련 소스 코드는 매우 활발하게 바뀌고 있는 듯 하다. github 의 로그를 보더라도 AdminClient 쪽이 며칠 전에도 매우 활발하게 바뀌고 있어서 이 부분은 계속 지켜봐야 할 것 같다.

 

AdminClient의 구체적인 활용법이나 이용해서 좀 더 많은 제어를 하는 방법은 다음 포스트에서 소개하려 한다.