토리맘의 한글라이즈 프로젝트 logo 토리맘의 한글라이즈 프로젝트

아파치 카프카 공식 레퍼런스를 한글로 번역한 문서입니다.

전체 목차는 여기에 있습니다.

목차


7.1 Security Overview

0.9.0.0 릴리즈에서 카프카 커뮤니티는 카프카 클러스터에 여러 가지 보안 기능을 추가했다. 이 기능들은 단독으로도, 여러 가지 조합으로도 사용할 수 있다. 현재는 다음과 같은 보안 조치를 지원한다:

  1. 클라이언트나(프로듀서, 컨슈머), 다른 브로커, 툴을 브로커에 연결할 때, SSL이나 SASL을 통해 커넥션을 인증할 수 있다. 카프카가 지원하는 SASL 메커니즘을 다음과 같다:
    • SASL/GSSAPI (Kerberos) - 0.9.0.0 버전 부터
    • SASL/PLAIN - 0.10.0.0 버전 부터
    • SASL/SCRAM-SHA-256, SASL/SCRAM-SHA-512 - 0.10.2.0 버전 부터
    • SASL/OAUTHBEARER - 2.0 버전 부터
  2. 브로커를 주키퍼에 연결할 때 커넥션을 인증할 수 있다.
  3. 브로커/클라이언트 간, 브로커들 간, 브로커/도구 간에 전송되는 데이터를 SSL을 통해 암호화할 수 있다 (SSL을 활성화하면 성능 저하가 생기며, CPU 타입, JVM 구현체에 따라 정도는 다르다).
  4. 클라이언트의 읽기/쓰기 권한 부여
  5. 인가 서비스는 원하는 걸 선택할 수 있으며, 외부 인가 서비스와도 통합할 수 있다.

보안은 선택 사항이란 점을 알아두자 - 보안 처리가 없는 클러스터도 지원하며, 인증, 비인증, 암호화, 비암호화 클라이언트를 혼합할 수 있다. 아래 가이드에선 클라이언트와 브로커에 보안 기능을 설정하고 사용하는 방법을 설명한다.


7.2 Encryption and Authentication using SSL

아파치 카프카에선 클라이언트가 SSL을 통해 트래픽을 암호화하고 인증을 수행할 수 있다. SSL은 기본적으로 비활성화돼 있지만, 필요하면 활성화할 수 있다. 이어지는 단락에선 자체 PKI 인프라를 세팅하고, 이를 통해 인증서를 만들고, 카프카에서 이 인증서를 사용하는 방법을 자세히 설명한다.

  1. Generate SSL key and certificate for each Kafka broker

    SSL을 지원하는 브로커를 하나 이상 배포할 땐 가정 먼저 모든 서버에 공개키/개인키 쌍을 만드는 것으로 시작한다. 카프카는 모든 키와 인증서를 keystore에서 찾기 때문에, 자바 keytool 명령어로 키와 인증서를 저장해보겠다. 이 툴은 두 가지 keystore 타입을 지원한다. 현재는 deprecated된 자바 전용 jks 포맷과, PKCS12다. PKCS12는 자바 버전 9의 디폴트 포맷이다. 아래 나오는 명령어에선 사용 중인 자바 버전에 관계 없이 이 포맷을 쓸 수 있도록 PKCS12 포맷을 명시한다.

    keytool -keystore {keystorefile} -alias localhost \
      -validity {validity} -genkey -keyalg RSA -storetype pkcs12
    

    위 명령어에선 두 개의 파라미터를 지정해야 한다:

    1. keystorefile: 이 브로커에 대한 키(나중엔 인증서도)를 저장할 keystore 파일. keystore 파일에는 이 브로커의 개인키와 공개키가 들어있으므로 안전하게 보관해야 한다. 키는 의도한 서버 밖으로 전송되거나 나가지 않는 게 좋기 때문에, 이 과정은 키를 사용하는 카프카 브로커에서 실행하는 게 가장 좋다.

    2. validity: 키의 유효 기간 (일수). 이 기간은 인증서를 서명할 때 결정되는 인증서의 유효 기간과는 다르다. 같은 키로 인증서는 여러 개 요청할 수 있다: CA가 1년간 유효한 인증서를 서명하는데 키의 유효 기간이 10년이라면, 시간이 지남에 따라 같은 키를 인증서 10개에 사용할 수 있다.

    이렇게 만든 개인키와 함께 사용할 수 있는 인증서를 얻으려면, 인증서 서명 요청(CSR)을 만들어야 한다. CSR을 통해 신뢰할 수 있는 CA로부터 서명받으면, 실제 인증서를 만들 수 있고, 이 인증서를 keystore에 설치해서 인증에 활용할 수 있다.

    인증서 서명 요청을 생성하려면 지금까지 만든 모든 서버 keystore로 다음 명령어를 실행해라.

    keytool -keystore server.keystore.jks -alias localhost \
      -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 \
      -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
    

    여기에선 인증서에 호스트명 정보를 추가한다고 가정한다. 호스트명을 추가하지 않을 땐 익스텐션 파라미터 -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}은 생략할 수 있다. 자세한 내용은 아래를 참고해라.

    Host Name Verification

    호스트명 검증은 연결 중인 서버에서 제공하는 인증서의 속성을 해당 서버의 실제 호스트명이나 IP 주소와 대조해서, 실제로 올바른 서버에 연결하고 있는지 확인하는 프로세스다 (활성화하면).

    호스트명을 검사하는 가장 큰 이유는 중간자 공격(man-in-the-middle attack)을 방어하기 위해서다. 카프카에선 오랫동안 호스트명 검사를 디폴트로 비활성화했었지만, 카프카 2.0.0부터는 클라이언트 커넥션과 브로커 간 커넥션에 디폴트로 서버의 호스트명 검증을 활성화한다.

    서버 호스트명 검증을 비활성화려면 ssl.endpoint.identification.algorithm을 빈 문자열로 설정하면 된다.

    브로커 리스너를 동적으로 설정했다면, kafka-configs.sh로 호스트명 검증을 비활성화할 수 있다:

    bin/kafka-configs.sh --bootstrap-server localhost:9093 \
      --entity-type brokers --entity-name 0 \
      --alter --add-config "listener.name.internal.ssl.endpoint.identification.algorithm="
    

    참고 사항:

    호스트명을 검증하지 않는게 “일단 동작시키는” 가장 빠른 방법이라거나, “시간 날때 다음에 수정하겠다”라고 미루는 것 말고는 보통 호스트명 검증을 비활성화할 타당한 이유는 없다!

    제때 호스트명을 검증하기는 그다지 어렵지 않지만, 클러스터를 띄우고 나면 훨씬 더 어려워진다 - 스스로를 위해 지금 적용해라!

    호스트명 검증을 활성화하면, 클라이언트는 다음 두 필드 중 하나를 사용해 서버의 FQDN(fully qualified domain name)이나 IP 주소를 검증한다:

    1. CN (Common Name)
    2. SAN (Subject Alternative Name)

    카프카는 두 필드를 확인하긴 하지만, CN 필드로 호스트명을 검증하는 건 2000년부터 deprecated되었으며, 가능하면 피하는 게 좋다. 게다가 SAN 필드는 훨씬 더 유연해서, 인증서 하나에 DNS와 IP 엔트리를 여러 개 선언할 수 있다.

    한 가지 더 SAN 필드를 사용하면 좋은 점은, 권한 부여를 위해 CN을 좀 더 유의미한 값으로 설정할 수 있다는 거다. 인증서를 서명하려면 함께 넣을 SAN 필드가 필요하므로 서명 요청을 생성할 때 SAN 필드를 지정해보겠다. 키 쌍을 만들 때도 지정할 수 있지만, 서명 요청에 자동으로 복사되진 않는다.

    SAN 필드를 추가하려면 keytool 명령에 -ext SAN=DNS:{FQDN},IP:{IPADDRESS} 인자를 추가해라:

    keytool -keystore server.keystore.jks -alias localhost \
      -validity {validity} -genkey -keyalg RSA -destkeystoretype pkcs12 \
      -ext SAN=DNS:{FQDN},IP:{IPADDRESS1}
    
  2. Creating your own CA

    이 단계를 마쳤다면, 클러스터에 있는 모든 장비에는, 트래픽을 암호화하는데 사용할 공개키/개인키 쌍과, 인증서 생성의 기초가 되는 인증서 서명 요청(CSR)이 있을 거다. 인증 기능을 추가하려면 서명 요청을 신뢰할 수 있는 기관에 서명 받아야 한다. 신뢰할 수 있는 기관은 이번에 만들어볼 거다.

    인증 기관(CA)은 인증서 서명을 담당한다. CA는 여권을 발급하는 정부와 유사하다 - 정부는 여권을 위조하기 어렵도록 모든 여권에 도장(서명)을 남긴다. 다른 정부에선 도장을 검증해서 여권의 진위 여부를 가린다. 마찬가지로 CA는 인증서를 서명하고, 암호화를 통해 서명된 인증서는 확률상 위조하기 매우 어렵다. 따라서 CA가 내가 알고 있는 신뢰할 수 있는 기관이라면, 클라이언트는 의도한 실제 시스템에 연결하고 있다는 강한 확신을 가질 수 있다.

    이 가이드에선 우리가 직접 자체 인증 기관이 돼볼 거다. 보통 기업에서 실제 프로덕션 클러스터를 설정할 땐, 회사를 통틀어 신뢰할 수 있는 기업 CA로 인증서를 서명한다. 이때 고려해야 할 점들은 프로덕션 환경에서 빠지기 쉬운 함정들을 참고해라.

    x509 모듈은 OpenSSL의 버그로 인해 요청한 CRS의 익스텐션 필드를 최종 인증서로 복사하지 않는다. 호스트명 검증을 활성화하려면 인증서에 SAN 익스텐션이 있어야 하므로, 그대신 ca 모듈을 사용할 거다. 그러려면 CA 키쌍을 만들기 전에 몇 가지를 추가로 설정해야 한다. 다음 설정을 openssl-ca.cnf라는 파일에 저장하고, 필요에 따라 유효 기간과 공통 속성 값을 변경해라.

    HOME            = .
    RANDFILE        = $ENV::HOME/.rnd
       
    ####################################################################
    [ ca ]
    default_ca    = CA_default      # The default ca section
       
    [ CA_default ]
       
    base_dir      = .
    certificate   = $base_dir/cacert.pem   # The CA certifcate
    private_key   = $base_dir/cakey.pem    # The CA private key
    new_certs_dir = $base_dir              # Location for new certs after signing
    database      = $base_dir/index.txt    # Database index file
    serial        = $base_dir/serial.txt   # The current serial number
       
    default_days     = 1000         # How long to certify for
    default_crl_days = 30           # How long before next CRL
    default_md       = sha256       # Use public key default MD
    preserve         = no           # Keep passed DN ordering
       
    x509_extensions = ca_extensions # The extensions to add to the cert
       
    email_in_dn     = no            # Don't concat the email in the DN
    copy_extensions = copy          # Required to copy SANs from CSR to cert
       
    ####################################################################
    [ req ]
    default_bits       = 4096
    default_keyfile    = cakey.pem
    distinguished_name = ca_distinguished_name
    x509_extensions    = ca_extensions
    string_mask        = utf8only
       
    ####################################################################
    [ ca_distinguished_name ]
    countryName         = Country Name (2 letter code)
    countryName_default = DE
       
    stateOrProvinceName         = State or Province Name (full name)
    stateOrProvinceName_default = Test Province
       
    localityName                = Locality Name (eg, city)
    localityName_default        = Test Town
       
    organizationName            = Organization Name (eg, company)
    organizationName_default    = Test Company
       
    organizationalUnitName         = Organizational Unit (eg, division)
    organizationalUnitName_default = Test Unit
       
    commonName         = Common Name (e.g. server FQDN or YOUR name)
    commonName_default = Test Name
       
    emailAddress         = Email Address
    emailAddress_default = test@test.com
       
    ####################################################################
    [ ca_extensions ]
       
    subjectKeyIdentifier   = hash
    authorityKeyIdentifier = keyid:always, issuer
    basicConstraints       = critical, CA:true
    keyUsage               = keyCertSign, cRLSign
       
    ####################################################################
    [ signing_policy ]
    countryName            = optional
    stateOrProvinceName    = optional
    localityName           = optional
    organizationName       = optional
    organizationalUnitName = optional
    commonName             = supplied
    emailAddress           = optional
       
    ####################################################################
    [ signing_req ]
    subjectKeyIdentifier   = hash
    authorityKeyIdentifier = keyid,issuer
    basicConstraints       = CA:FALSE
    keyUsage               = digitalSignature, keyEncipherment
    

    그다음 데이터베이스 파일과 시리얼 넘버 파일을 만들어라. 이 CA로 서명된 인증서는 이 파일을 통해 추적한다. 둘 모두 CA 키와 같은 디렉토리에 있는 단순한 텍스트 파일이다.

    echo 01 > serial.txt
    touch index.txt
    

    여기까지 했다면 이후 인증서 서명에 사용할 CA를 생성할 준비가 된거다.

    openssl req -x509 -config openssl-ca.cnf -newkey rsa:4096 -sha256 \
      -nodes -out cacert.pem -outform PEM
    

    이 CA는 단순히 공캐키/개인키 쌍과 자체적으로 서명한 인증서일 뿐이며, 다른 인증서를 서명하는 용도로만 사용한다.

    이 키 쌍은 매우 안전하게 보관해야 한다. 누구라도 접근 권한을 받으면, 이 인프라에서 신뢰할 수 있는 인증서를 만들고 서명할 수 있다. 즉, 이 CA를 신뢰하는 어떤 서비스에도 다른 사람으로 가장해 연결할 수 있다는 말이다.

    다음 단계는 이렇게 생성한 CA를 클라이언트의 truststore에 추가해, 클라이언트가 이 CA를 신뢰할 수 있도록 만드는 거다:

    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
    

    참고: 카프카 브로커 설정에서 ssl.client.auth를 “requested”나 “required”로 설정했다면, 카프카 브로커는 클라이언트 인증을 요구하므로, 카프카 브로커에 truststore를 제공해야 하며, truststore에는 클라이언트의 키를 서명한 모든 CA 인증서가 있어야 한다.

    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    

    1단계에서 만들었던 keystore는 각 장비의 고유 정보를 저장하던 것과 달리, 클라이언트의 truststore는 클라이언트가 신뢰해야 하는 모든 인증서를 저장한다. 인증서를 truststore로 임포트한다는 건, 이 인증서로 서명한 모든 인증서를 신뢰한다는 걸 의미하기도 한다. 위에서처럼 정부와 여권에 비유해보면, 정부(CA)를 신뢰한다는 건 정부가 발급한 모든 여권(인증서)을 신뢰한다는 걸 의미하기도 한다. 이 속성을 신뢰할 수 있는 경로(chain of trust)라고 부르며, SSL을 대규모 카프카 클러스터에 배포한다면 특히 유용하다. 단일 CA로 클러스터의 모든 인증서를 서명할 수 있으며, 모든 장비에 이 CA를 신뢰하는 같은 truststore를 공유할 수 있다. 이렇게하면 모든 장비가 다른 모든 장비를 인증할 수 있다.

  3. Signing the certificate

    이제 이 CA로 인증서를 서명한다:

    openssl ca -config openssl-ca.cnf -policy signing_policy \
      -extensions signing_req -out {server certificate} \
      -infiles {certificate signing request}
    

    마지막으로, CA 인증서와 서명된 인증서를 모두 keystore로 임포트해야 한다:

    keytool -keystore {keystore} -alias CARoot -import -file {CA certificate}
    keytool -keystore {keystore} -alias localhost -import -file cert-signed
    

    파라미터 정의는 다음과 같다:

    1. {keystore}: keystore의 위치
    2. {CA certificate}: CA의 인증서
    3. {certificate signing request}: 서버 키로 생성한 CSR
    4. {server certificate}: 서버의 서명된 인증서를 쓸 파일

    이렇게하면 truststore.jks라는 하나의 truststore가 남게된다 - 이 파일은 모든 클라이언트와 브로커에서 동일할 수 있으며, 민감 정보를 포함하지 않으므로 보호할 필요 없다.

    추가로, 노드 키와 인증서, CA 인증서를 가진 server.keystore.jks 파일이 노드마다 하나씩 있을 거다. 이 파일들을 사용하는 방법은 카프카 브로커 설정카프카 클라이언트 설정을 참고해라.

    이 주제에 관한 툴 수준 지원은 easyRSA 프로젝트를 확인해봐라. 이 과정에서 활용할 수 있는 광범위한 스크립팅을 지원한다.

    SSL key and certificates in PEM format

    2.7.0부터는 카프카 브로커와 클라이언트에 SSL 키와 truststore를 PEM 포맷으로 직접 설정할 수 있다. 이렇게하면 파일 시스템에 별도 파일을 저장하지 않아도 되고, 카프카 설정의 패스워드 보호 기능을 활용할 수 있다. PEM은 JKS와 PKCS12 말고도 다른 파일 기반 keystore와 truststore의 저장 타입으로도 사용할 수 있다. 브로커나 클라이언트 설정에 직접 PEM keystore를 설정하려면, PEM 형식의 개인키를 ssl.keystore.key에 제공하고, PEM 형식의 인증서 체인을 ssl.keystore.certificate.chain에 제공해야 한다. truststore를 설정하려면 ssl.truststore.certificates에 신뢰할 수 있는 인증서(예를 들어, CA의 public 인증서) 제공해야 한다. 일반적으로 PEM은 멀티 라인 base-64 문자열로 저장되기 때문에, 설정 값은 라인 끝에 백슬래시(‘\‘)가 있는 멀티 라인 문자열로 추가하면 된다.

    PEM에서는 저장소 패스워드 설정 ssl.keystore.passwordssl.truststore.password를 사용하지 않는다. 패스워드를 사용해 개인키를 암호화했다면, 키 패스워드는 반드시 ssl.key.password에 제공해야 한다. PEM을 설정 값에 직접 지정할 때는, 개인키는 패스워드 없이 암호화하지 않은 형식으로 제공할 수도 있다. 개인키를 암호화하지 않는다면 프로덕션 배포에선, 카프카의 패스워드 보호 기능을 사용해 설정을 암호화하거나, 외부에 보관해야 한다. OpenSSL같은 외부 도구를 사용해 암호화한다면, 디폴트 SSL 엔진 팩토리는 암호화된 개인키를 복호화하는 기능에 제약이 있다는 점에 주의해라. BouncyCastle같은 타사 라이브러리를 커스텀 SslEngineFactory에 통합하면 더 광범위한 암호화된 개인키를 지원할 수 있다.

  4. Common Pitfalls in Production

    위에서는 자체 CA를 만들고 이 CA를 통해 클러스터의 인증서를 서명하는 절차를 보여준다. 샌드박스나, dev, test 등의 시스템엔 매우 유용하지만, 기업에서 사용하는 프로덕션 클러스터의 인증서를 만들기엔 적합하지 않다. 기업은 보통 자체 CA를 운영하며, 사용자는 이 CA로 서명할 CSR을 전송할 수 있다. 이렇게 하면 사용자는 CA를 안전하게 유지하는데엔 신경쓰지 않아도 되고, 모두가 신뢰할 수 있는 중앙 기관을 활용할 수 있다. 하지만 사용자의 인증서를 서명하는 프로세스에 대한 제어권도 그만큼 사라진다. 기업 CA를 운영할 땐 인증서에 엄격한 제한을 두는 경우가 많아서, 카프카에 이 인증서를 사용하려고 하면 문제가 생길 수도 있다.

    1. Extended Key Usage

      인증서는 이 인증서를 사용할 수 있는 목적을 제어하는 확장 필드를 가질 수 있다. 이 필드가 비어 있으면 사용 목적에 제한을 두지 않지만, 뭔가를 지정했다면 이 용도대로 SSL을 구현해야 한다.

      카프카와 관련된 용도는 다음과 같다:

      • 클라이언트 인증
      • 서버 인증

      카프카 브로커는 클러스터 내부 브로커끼리 통신할 때 모든 브로커가 클라이언트로도, 서버로도 동작하기 때문에, 이 두 가지 모두 허용해야 한다. 기업 CA에 웹서버용 서명 프로필을 두고, 카프카에도 이 프로필을 사용하는 경우가 꽤 있다. 이 프로필은 serverAuth 용도만 포함하고 있기 때문에 SSL 핸드셰이크에 실패할 수 밖에 없다.

    2. Intermediate Certificates

      기업 루트 CA는 보통 보안상의 이유로 오프라인 환경에 보관한다. 매일같이 있는 최종 인증서 서명에 사용할, 소위 말해 중간(intermediate) CA를 만들곤 한다. 중간 CA로 서명한 인증서를 keystore로 임포트할때는 루트 CA까지 이르는 신뢰할 수 있는 경로(chain of trust)를 제공해야 한다. 이때는 인증서 파일들을 하나의 인증서 파일로 cat해서 합친 다음 keytool로 임포트하면 된다.

    3. Failure to copy extension fields

      CA operator는 CSR로 요청한 익스텐션 필드를 복사하지 않는 경우도 많으며, 이런 필드는 직접 지정해 악의적으로 오해의 소지가 있거나 허위 정보가 있는 인증서를 얻기 어렵게 만들곤 한다. 호스트명 검증을 활성화하려면, 서명된 인증서에 요청한 모든 SAN 필드가 들어 있는지 다시 한번 확인해보는 게 좋다. 다음 명령어로 인증서 세부 정보를 콘솔에 출력해볼 수 있다. 이 내용을 원래 요청했던 정보와 비교해봐라:

      openssl x509 -in certificate.crt -text -noout
      
  5. Configuring Kafka Brokers

    카프카 브로커는 여러 포트에서 커넥션 수신(listen)을 지원한다. server.properties에 다음과 같은 프로퍼티를 설정해야 하며, 콤마로 구분한 값이 하나 이상 있어야 한다:

    listeners
    

    브로커 간 통신에는 SSL을 활성화하지 않는다면 (활성화 방법은 아래 참고) PLAINTEXT 포트와 SSL 포트가 모두 필요하다.

    listeners=PLAINTEXT://host.name:port,SSL://host.name:port
    

    브로커 측에 필요한 SSL 설정은 다음과 같다.

    ssl.keystore.location=/var/private/ssl/server.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    ssl.truststore.location=/var/private/ssl/server.truststore.jks
    ssl.truststore.password=test1234
    

    참고: 엄밀히 말해 ssl.truststore.password는 필수는 아니지만, 적극 권장한다. 패스워드를 설정하지 않으면 truststore에 접근할 순 있어도, 무결성 검사는 비활성화된다. 필수는 아니지만 고려해봄직한 설정은 다음과 같다:

    1. ssl.client.auth=none ( “required”=> 클라이언트 인증 필요, “requested”=> 클라이언트 인증을 요청하며, 인증서가 없는 클라이언트도 연결할 수 있다. “requested”를 사용하면 사용은 보안에 관한 잘못된 인식을 줄 수 있고, 잘못 설정한 클라이언트도 연결할 수 있어서 권장하지 않는다. )
    2. ssl.cipher.suites (Optional). 암호화 스위트(cipher suite)는 TLS나 SSL 네트워크 프로토콜로 네트워크 커넥션 보안 설정을 협상하는데 사용하는 인증, 암호화, MAC, 키 교환 알고리즘을 하나로 조합해놓은 집합이다. (디폴트는 빈 리스트다)
    3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1 (클라이언트에서 수락할 SSL 프로토콜을 나열해라. 단, SSL은 TLS 이후 deprecated 되었으며, 프로덕션에서 SSL을 사용하는 건 권장하지 않는다.)
    4. ssl.keystore.type=JKS
    5. ssl.truststore.type=JKS
    6. ssl.secure.random.implementation=SHA1PRNG

    브로커 간 통신에 SSL을 활성화하려면 server.properties 파일에 다음을 추가해라 (기본값은 PLAINTEXT).

    security.inter.broker.protocol=SSL
    

    일부 국가의 수입 규정으로 인해 오라클 구현체는 기본으로 사용 가능한 암호화 알고리즘의 강도를 제한한다. 더 강력한 알고리즘이 필요하다면(256-bit 키를 사용하는 AES 등) JCE Unlimited Strength Jurisdiction Policy Files를 가져와 JDK/JRE에 설치해야 한다. 자세한 정보는 JCA 프로바이더 문서를 참고해라.

    JRE/JDK에는 암호화 연산에 사용하는 디폴트 PRNG(pseudo-random number generator)가 있으므로, ssl.secure.random.implementation에 구현체를 설정하지 않아도 된다. 하지만 일부 구현체엔 성능 이슈가 있다 (특히 리눅스 시스템에서 디폴트로 채택한 NativePRNG는 글로벌 잠금을 사용한다). SSL 연결 성능이 문제가 되면 사용할 구현체를 명시하는 것도 좋다. SHA1PRNG 구현체는 논블로킹이고 부하 (브로커당 50MB/sec로 메세지를 생성하고, 복제 트래픽은 별도) 속에서도 매우 우수한 성능을 보여준다.

    브로커를 시작하고 나면 server.log에서 다음과 같은 로그를 보게될 거다:

    with addresses: PLAINTEXT -> EndPoint(192.168.64.1,9092,PLAINTEXT),SSL -> EndPoint(192.168.64.1,9093,SSL)
    

    서버 keystore, truststore가 제대로 설정됐는지 빠르게 확인해보려면 다음 명령어를 실행하면 된다.

    openssl s_client -debug -connect localhost:9093 -tls1
    

    (참고: ssl.enabled.protocols에 TLSv1를 넣었어야 한다.)

    이 명령어의 출력으로 서버의 인증서를 확인할 수 있어야 한다:

    -----BEGIN CERTIFICATE-----
    {variable sized random bytes}
    -----END CERTIFICATE-----
    subject=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=Sriharsha Chintalapani
    issuer=/C=US/ST=CA/L=Santa Clara/O=org/OU=org/CN=kafka/emailAddress=test@test.com
    

    인증서를 볼 수 없거나 다른 에러 메세지가 보인다면 keystore가 제대로 설정되지 않았다는 거다.

  6. Configuring Kafka Clients

    SSL은 신규 카프카 프로듀서, 컨슈머에서만 지원하며 구버전 API에선 지원하지 않는다. SSL 관련 설정은 프로듀서, 컨슈머 모두 동일하다.

    브로커에서 클라이언트를 인증하지 않아도 된다면, 최소한의 설정 예시는 다음과 같다:

    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/client.truststore.jks
    ssl.truststore.password=test1234
    

    참고: 엄밀히 말해 ssl.truststore.password는 필수는 아니지만, 적극 권장한다. 패스워드를 설정하지 않으면 truststore에 접근할 순 있어도, 무결성 검사는 비활성화된다. 클라이언트 인증이 필요하다면, 1단계에서 처럼 keystore를 만들고 다음과 같이 설정해줘야 한다:

    ssl.keystore.location=/var/private/ssl/client.keystore.jks
    ssl.keystore.password=test1234
    ssl.key.password=test1234
    

    요구사항과 브로커 설정에 따라 다른 설정도 필요할 수 있다:

    1. ssl.provider (Optional). SSL 커넥션에 사용할 시큐리티 프로바이더 이름. 기본값은 JVM의 디폴트 시큐리티 프로바이더다.
    2. ssl.cipher.suites (Optional). 암호화 스위트(cipher suite)는 TLS나 SSL 네트워크 프로토콜로 네트워크 커넥션 보안 설정을 협상하는데 사용하는 인증, 암호화, MAC, 키 교환 알고리즘을 하나로 조합해놓은 집합이다. (디폴트는 빈 리스트다)
    3. ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1. 브로커에 설정한 프로토콜 중 하나는 나열해야 한다.
    4. ssl.truststore.type=JKS
    5. ssl.keystore.type=JKS

    console-producer와 console-consumer를 사용한 예시:

    kafka-console-producer.sh --bootstrap-server localhost:9093 \
      --topic test --producer.config client-ssl.properties
    kafka-console-consumer.sh --bootstrap-server localhost:9093 \
      --topic test --consumer.config client-ssl.properties
    

7.3 Authentication using SASL

  1. JAAS configuration

    카프카는 SASL 설정에 JAAS(Java Authentication and Authorization Service)를 사용한다.

    1. JAAS configuration for Kafka brokers

      KafkaServer는 각 KafkaServer/Broker에서 사용하는 JAAS 파일의 섹션 이름이다. 이 섹션은 브로커 간 통신을 위해 브로커가 만드는 모든 SASL 클라이언트 커넥션을 포함한, 브로커의 SASL 설정 옵션을 제공한다. 여러 리스너가 SASL을 사용하도록 설정했다면, 섹션 이름 앞에 소문자로된 리스너 이름과 마침표를 붙일 수 있다 (ex. sasl_ssl.KafkaServer).

      Client 섹션은 주키퍼와의 SASL 커넥션을 인증하는데 사용한다. 브로커에 주키퍼 노드 SASL ACL을 설정해서 이 노드들을 잠그고 해당 브로커만 수정 가능하게 만들 수 있다. 이땐 모든 브로커에서 동일한 principal 이름을 가져야 한다. 섹션 이름을 Client말고 다른 값으로 사용하고 싶다면, 적절한 이름을 시스템 프로퍼티 zookeeper.sasl.clientconfig에 설정하면 된다 (ex. -Dzookeeper.sasl.clientconfig=ZkClient).

      주키퍼는 기본적으로 “zookeeper”를 서비스 이름으로 사용한다. 변경하려면 적절한 이름을 시스템 프로퍼티 zookeeper.sasl.client.username에 설정하면 된다 (ex. -Dzookeeper.sasl.client.username=zk).

      브로커에선 브로커 설정 프로퍼티 sasl.jaas.config로도 JAAS를 구성할 수 있다. 프로퍼티명 앞에는 SASL 메커니즘을 포함하는 리스너 프리픽스가 있어야 한다 (ex. listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config). 설정 값에는 로그인 모듈 하나만 지정할 수 있다. 리스너 하나에 메커니즘을 여러 개 설정했다면, 각 메커니즘마다 리스너와 메커니즘 프리픽스를 달아준 설정을 제공해야 한다. 예를 들어,

      listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
          username="admin" \
          password="admin-secret";
      listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
          username="admin" \
          password="admin-secret" \
          user_admin="admin-secret" \
          user_alice="alice-secret";
      

      JAAS 설정을 여러 가지 레벨로 정의할 때는 다음과 같은 우선 순위를 따른다:

      • 브로커 설정 프로퍼티 listener.name.{listenerName}.{saslMechanism}.sasl.jaas.config
      • 스태틱 JAAS 설정의 {listenerName}.KafkaServer 섹션
      • 스태틱 JAAS 설정의 KafkaServer 섹션

      단, 주키퍼 JAAS 설정은 스태틱 JAAS 설정으로만 가능하다.

      브로커 설정 예시는 GSSAPI (Kerberos), PLAIN, SCRAM, OAUTHBEARER를 참고해라.

    2. JAAS configuration for Kafka clients

      클라이언트는 클라이언트 설정 프로퍼티 sasl.jaas.config를 사용하거나, 브로커와 유사한 스태틱 JAAS 설정 파일을 사용해서 JAAS를 설정할 수 있다.

      1. JAAS configuration using client configuration property

        클라이언트는 물리적인 설정 파일을 만들지 않아도, JAAS 설정을 프로듀서나 컨슈머 프로퍼티로 지정할 수 있다. 게다가 이 모드에선 동일한 JVM 내에 있는 프로듀서와 컨슈머마다 다른 클라이언트 프로퍼티를 지정하면 다른 credential을 사용할 수 있다. 스태틱 JAAS 설정 시스템 프로퍼티 java.security.auth.login.config, 클라이언트 프로퍼티 sasl.jaas.config를 모두 지정하면 클라이언트 프로퍼티를 사용한다.

        설정 예시는 GSSAPI (Kerberos), PLAIN, SCRAM, OAUTHBEARER를 참고해라.

      2. JAAS configuration using static config file

        스태틱 JAAS 설정 파일로 클라이언트에 SASL 인증을 설정하려면:

        1. KafkaClient라는 클라이언트 로그인 섹션을 가진 JAAS 설정 파일을 추가해라. GSSAPI (Kerberos), PLAIN, SCRAM, OAUTHBEARER 설정 예시에서 설명하는대로, KafkaClient에 원하는 메커니즘으로 로그인 모듈을 구성해라. 예를 들어 GSSAPI credential은 다음과 같이 설정할 수 있다:

          KafkaClient {
              com.sun.security.auth.module.Krb5LoginModule required
              useKeyTab=true
              storeKey=true
              keyTab="/etc/security/keytabs/kafka_client.keytab"
              principal="kafka-client-1@EXAMPLE.COM";
          };
          
        2. 각 클라이언트 JVM에 JAAS 설정 파일 위치를 JVM 파라미터로 전달해라. 예를 들면:

          -Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
          
  2. SASL configuration

    SASL에선 보안 프로토콜 SASL_PLAINTEXT나 SASL_SSL을 사용할 수 있다. 각각은 전송 계층으로 PLAINTEXT, SSL을 함께 사용한다. SASL_SSL을 사용한다면 반드시 SSL도 설정해야 한다.

    1. SASL mechanisms

      카프카는 다음과 같은 SASL 메커니즘을 지원한다:

    2. SASL configuration for Kafka brokers

      1. server.properties에 SASL 포트를 설정해라. listeners 파라미터에 SASL_PLAINTEXT나 SASL_SSL 중 최소 하나를 추가하면 된다. 값이 둘 이상일 땐 쉼표로 구분한다:

        listeners=SASL_PLAINTEXT://host.name:port
        

        SASL 포트만 설정하고 싶다면 (또는 카프카 브로커가 SASL로 서로를 인증하도록 만들고 싶다면), 브로커 간 통신에도 같은 SASL 프로토콜을 설정해야 한다:

        security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
        
      2. 지원되는 메커니즘 중에서 브로커에 활성화할 메커니즘을 하나 이상 고르고, 가이드에 따라 SASL을 해당 메커니즘으로 설정해라. 브로커에서 메커니즘을 여러 개 활성화하려면 여기에 있는 절차를 따라해라.

    3. SASL configuration for Kafka clients

      SASL 인증은 신규 자바 카프카 프로듀서와 컨슈머에서만 지원하며, 구버전 API에선 지원하지 않는다.

      클라이언트에 SASL 인증을 설정하려면, 브로커에서 클라이언트 인증에 활성화한 SASL 메커니즘을 선택하고, 가이드에 따라 SASL을 해당 메커니즘으로 설정해라.

  3. Authentication using SASL/Kerberos

    1. Prerequisites

      1. Kerberos

        조직에서 사용하는 커버로스 서버가 이미 있다면 (예를 들어 Active Directory를 사용하는), 카프카만을 위한 새 서버를 설치할 필요는 없다. 그 외는 서버 하나를 설치해야 한다. 리눅스 벤더는 커버로스용 패키지와, 이 패키치를 설치하고 구성하는 방법에 대한 간단한 가이드를 제공할 거다 (Ubuntu, Redhat). 오라클 자바를 사용하고 있다면, 자바 버전에 맞는 JCE 정책 파일을 다운받아 $JAVA_HOME/jre/lib/security에 복사해야 한다.

      2. Create Kerberos Principals

        조직의 커버로스나 Active Directory 서버를 사용한다면, 클러스터에 있는 모든 카프카 브로커와, 커버로스 인증으로 카프카에 접근하는 (클라이언트나 툴을 통해) 모든 OS 사용자의 principal은 커버로스 관리자에게 문의해라.

        자체 커버로스를 설치했다면 다음 명령어로 직접 principal을 만들어야 할 거다:

        sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
        sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"
        
      3. Make sure all hosts can be reachable using hostnames - FQDN으로 모든 호스트를 리졸브할 수 있어야 하는 건 커버로스 요구 사항이다.

    2. Configuring Kafka Brokers

      1. 아래 JAAS 파일을 적절히 수정해서 각 카프카 브로커의 config 디렉토리에 추가해라. 이 예시에선 kafka_server_jaas.conf라고 명명하겠다 (모든 브로커엔 자체 키탭이 있어야 한다):

        KafkaServer {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
                 
        // Zookeeper client authentication
        Client {
            com.sun.security.auth.module.Krb5LoginModule required
            useKeyTab=true
            storeKey=true
            keyTab="/etc/security/keytabs/kafka_server.keytab"
            principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
        };
        

        JAAS 파일에 있는 KafkaServer 섹션은 브로커에 사용할 principal과, 이 principal이 저장된 키탭의 위치를 알려준다. 브로커는 이 섹션에 지정한 키탭을 사용해 로그인할 수 있다. 주키퍼 SASL 설정에 대한 자세한 내용은 참고 사항을 확인해봐라.

      2. 각 카프카 브로커에 JVM 파라미터로 JAAS를 전달해라. 원한다면 krb5 파일 위치도 넘길 수 있다 (자세한 내용은 여기 참고).

        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
            -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
        
      3. 카프카 브로커를 시작하는 OS user가 JAAS 파일에 설정한 키탭을 읽을 수 있는지 확인해봐라.

      4. 여기에서 설명하는대로 server.properties에 SASL 포트와 SASL 메커니즘을 설정해라. 예를 들면:

        listeners=SASL_PLAINTEXT://host.name:port
            security.inter.broker.protocol=SASL_PLAINTEXT
            sasl.mechanism.inter.broker.protocol=GSSAPI
            sasl.enabled.mechanisms=GSSAPI
        

        server.properties에는 반드시 카프카 브로커의 principal 명과 일치하는 서비스 이름도 설정해야 한다. 위 예시에서 principal은 “kafka/kafka1.hostname.com@EXAMPLE.com”이므로, 다음과 같이 설정할 수 있다:

        sasl.kerberos.service.name=kafka
        
    3. Configuring Kafka Clients

      클라이언트에 SASL 인증을 설정하려면:

      1. 클라이언트(프로듀서, 컨슈머, 커넥트 워커 등)는 클러스터에 인증할 때 자체 principal(보통은 클라이언트를 실행하는 user와 동일한 이름)을 사용하므로, 필요에 따라 이 principal을 발급받거나 직접 만들어라. 그런 다음 각 클라이언트에 JAAS 설정 프로퍼티를 추가해라. 같은 JVM 내에 있는 클라이언트가 여럿일 땐 다른 principal을 지정하면 다른 사용자로 실행할 수 있다. producer.properties, consumer.properties의 sasl.jaas.config 프로퍼티는 프로듀서와 컨슈머같은 클라이언트가 카프카 브로커에 연결하는 방법을 나타낸다. 다음은 키탭(오랫동안 실행하는 프로세스에 권장)을 사용하는 클라이언트 설정 예시다:

        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useKeyTab=true \
            storeKey=true  \
            keyTab="/etc/security/keytabs/kafka_client.keytab" \
            principal="kafka-client-1@EXAMPLE.COM";
        

        kafka-console-consumer나 kafka-console-producer같은 커맨드라인 유틸리티에선, 다음과 같이 “useTicketCache=true”를 지정해 kinit을 사용할 수 있다:

        sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required \
            useTicketCache=true;
        

        클라이언트에 대한 JAAS 설정은 여기에서 설명하는대로 브로커와 유사한 JVM 파라미터로 지정할 수도 있다. 클라이언트는 KafkaClient라는 로그인 섹션을 사용한다. 이 옵션은 JVM에 있는 모든 클라이언트 커넥션에 대해 단 한 명의 사용자만 허용한다.

      2. 카프카 클라이언트를 시작하는 OS user가 JAAS 설정에 있는 키탭을 읽을 수 있는지 확인해봐라.

      3. 필요하면 각 클라이언트 JVM에 JVM 파라미터로 krb5 파일 위치를 전달해라 (자세한 내용은 여기 참고):

        -Djava.security.krb5.conf=/etc/kafka/krb5.conf
        
      4. producer.properties 또는 consumer.properties에 다음 프로퍼티를 설정해라:

        security.protocol=SASL_PLAINTEXT (or SASL_SSL)
        sasl.mechanism=GSSAPI
        sasl.kerberos.service.name=kafka
        
  4. Authentication using SASL/PLAIN

    SASL/PLAIN은 보안 인증 구현에서 암호화를 위해 보통 TLS와 함께 사용하는 간단한 사용자 username/password 인증 메커니즘이다. 카프카는 여기에서 설명하는대로 프로덕션 용도로 확장할 수 있는 SASL/PLAIN 디폴트 구현체를 지원다.

    username은 ACL 등을 설정하기 위한 인증된 Principal로 사용된다.

    1. Configuring Kafka Brokers

      1. 아래 JAAS 파일을 적절히 수정해서 각 카프카 브로커의 config 디렉토리에 추가해라. 이 예시에선 kafka_server_jaas.conf라고 명명하겠다:

        KafkaServer {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin-secret"
            user_admin="admin-secret"
            user_alice="alice-secret";
        };
        

        이 설정은 두 명의 사용자(adminalice)를 정의하고 있다. KafkaServer 섹션에 있는 프로퍼티 usernamepassword는 브로커가 다른 브로커와 연결을 시작할 때 사용한다. 이 예시에선 admin이 브로커 간 통신을 위한 사용자다. user_userName 프로퍼티 셋은 브로커에 연결하는 모든 사용자의 비밀번호를 정의하며, 브로커는 이 프로퍼티를 통해 모든 클라이언트 커넥션을 검증한다. 다른 브로커가 연결할 때도 마찬가지다.

      2. 각 카프카 브로커에 JVM 파라미터로 JAAS 설정 파일 위치를 전달해라:

        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
        
      3. 여기에서 설명하는대로 server.properties에 SASL 포트와 SASL 메커니즘을 설정해라. 예를 들면:

        listeners=SASL_SSL://host.name:port
            security.inter.broker.protocol=SASL_SSL
            sasl.mechanism.inter.broker.protocol=PLAIN
            sasl.enabled.mechanisms=PLAIN
        
    2. Configuring Kafka Clients

      클라이언트에 SASL 인증을 설정하려면:

      1. 각 클라이언트에서 producer.properties 또는 consumer.properties에 JAAS 설정 프로퍼티를 추가해라. 로그인 모듈은 프로듀서와 컨슈머같은 클라이언트가 카프카 브로커에 연결하는 방법을 나타낸다. 다음은 PLAIN 메커니즘을 사용하는 클라이언트 설정 예시다:

        sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
            username="alice" \
            password="alice-secret";
        

        username, password 옵션은 클라이언트가 클라이언트 연결을 위한 사용자를 구성할 때 사용한다. 이 예시에선, 사용자 alice로 브로커에 연결한다. 같은 JVM 내에 있는 클라이언트가 여럿일 땐, sasl.jaas.config에 username과 password를 다르게 지정하면 다른 사용자로 연결할 수 있다.

        클라이언트에 대한 JAAS 설정은 여기에서 설명하는대로 브로커와 유사한 JVM 파라미터로 지정할 수도 있다. 클라이언트는 KafkaClient라는 로그인 섹션을 사용한다. 이 옵션은 JVM에 있는 모든 클라이언트 커넥션에 대해 단 한 명의 사용자만 허용한다.

      2. producer.properties 또는 consumer.properties에 다음 프로퍼티를 설정해라:

        security.protocol=SASL_SSL
        sasl.mechanism=PLAIN
        
    3. Use of SASL/PLAIN in production

      • 패스워드를 암호화하지 않은 채로 그대로 네트워크로 전송하지 않도록, SASL/PLAIN은 반드시 전송 계층 SSL과 함께 사용해야 한다.
      • 카프카에서 SASL/PLAIN의 기본 구현체는 여기에 보이는 것처럼 JAAS 설정 파일에 username과 password를 지정한다. 카프카 2.0부터는 설정 옵션 sasl.server.callback.handler.class, sasl.client.callback.handler.class를 사용하면, 외부 소스에서 username과 password를 가져오는 자체 콜백 핸들러를 설정할 수 있다. 이렇게하면 디스크에 패스워드를 그대로 저장하지 않아도 된다.
      • 프로덕션 시스템에선, 외부 인증 서버로 패스워드 인증을 구현할 수도 있다. 카프카 2.0부터는 sasl.server.callback.handler.class를 설정하면, 외부 인증 서버를 사용해 패스워드를 검증하는 자체 콜백 핸들러를 연결할 수 있다.
  5. Authentication using SASL/SCRAM

    SCRAM(Salted Challenge Response Authentication Mechanism)은 PLAIN, DIGEST-MD5같은 username/password 인증을 수행하는 전통적인 메커니즘으로 보안 문제를 해결하는 SASL 계열 메커니즘이다. 이 메커니즘은 RFC 5802에 정의돼 있다. 카프카는 TLS와 함께 사용해 안전하게 인증할 수 있는 SCRAM-SHA-256과 SCRAM-SHA-512를 지원한다. username은 ACL 등을 설정하기 위한 인증된 Principal로 사용된다. 카프카의 디폴트 SCRAM 구현체는 SCRAM credential을 주키퍼에 저장하며, 사설망에 있는 주키퍼를 사용할 때 적합하다. 자세한 내용은 보안상 주의 사항을 참고해라.

    1. Creating SCRAM Credentials

      카프카의 SCRAM 구현체는 주키퍼를 credential 저장소로 사용한다. kafka-configs.sh를 사용하면 주키퍼에 credential을 생성할 수 있다. credential을 만들 땐 반드시 활성화한 SCRAM 메커니즘마다, 메커니즘 이름과 함께 각 설정을 추가해야 한다. 브로커 간 통신을 위한 credential은 카프카 브로커를 시작하기 전에 만들어야 한다. 클라이언트 credential은 동적으로 생성하고 업데이트할 수 있으며, 새 커넥션을 인증할 땐 업데이트된 credential을 사용하게 된다.

      사용자 alice, 패스워드 alice-secret으로 SCRAM credential을 생성해보자:

      > bin/kafka-configs.sh --zookeeper localhost:2182 \
        --zk-tls-config-file zk_tls_config.properties \
        --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-secret],SCRAM-SHA-512=[password=alice-secret]' \
        --entity-type users --entity-name alice
      

      iterations를 지정하지 않으면 디폴트 반복 횟수 4096을 사용한다. 랜덤 솔트가 만들어지며, salt, iterations, StoredKey, ServerKey로 구성된 SCRAM identity가 주키퍼에 저장된다. SCRAM identity와 개별 필드에 대한 자세한 내용은 RFC 5802를 참고해라.

      아래 예제에선 브로커 간의 통신을 위한 사용자 admin도 필요하다:

      > bin/kafka-configs.sh --zookeeper localhost:2182 \
        --zk-tls-config-file zk_tls_config.properties \
        --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA-512=[password=admin-secret]' \
        --entity-type users --entity-name admin
      

      현재 등록된 credential은 --describe 옵션으로 조회할 수 있다:

      > bin/kafka-configs.sh --zookeeper localhost:2182 \
        --zk-tls-config-file zk_tls_config.properties --describe \
        --entity-type users --entity-name alice
      

      --alter --delete-config 옵션을 사용하면 SCRAM 메커니즘에 대한 credential을 하나 이상 삭제할 수 있다:

      > bin/kafka-configs.sh --zookeeper localhost:2182 \
        --zk-tls-config-file zk_tls_config.properties \
        --alter --delete-config 'SCRAM-SHA-512' \
        --entity-type users --entity-name alice
      
    2. Configuring Kafka Brokers

      1. 아래 JAAS 파일을 적절히 수정해서 각 카프카 브로커의 config 디렉토리에 추가해라. 이 예시에선 kafka_server_jaas.conf라고 명명하겠다:

        KafkaServer {
            org.apache.kafka.common.security.scram.ScramLoginModule required
            username="admin"
            password="admin-secret";
        };
        

        KafkaServer 섹션에 있는 프로퍼티 usernamepassword는 브로커가 다른 브로커와 연결을 시작할 때 사용한다. 이 예시에선 admin이 브로커 간 통신을 위한 사용자다.

      2. 각 카프카 브로커에 JVM 파라미터로 JAAS 설정 파일 위치를 전달해라:

        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
        
      3. 여기에서 설명하는대로 server.properties에 SASL 포트와 SASL 메커니즘을 설정해라. 예를 들면:

        listeners=SASL_SSL://host.name:port
        security.inter.broker.protocol=SASL_SSL
        sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256 (or SCRAM-SHA-512)
        sasl.enabled.mechanisms=SCRAM-SHA-256 (or SCRAM-SHA-512)
        
    3. Configuring Kafka Clients

      클라이언트에 SASL 인증을 설정하려면:

      1. 각 클라이언트에서 producer.properties 또는 consumer.properties에 JAAS 설정 프로퍼티를 추가해라. 로그인 모듈은 프로듀서와 컨슈머같은 클라이언트가 카프카 브로커에 연결하는 방법을 나타낸다. 다음은 SCRAM 메커니즘을 사용하는 클라이언트 설정 예시다:

        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
             username="alice" \
             password="alice-secret";
        

        username, password 옵션은 클라이언트가 클라이언트 연결을 위한 사용자를 구성할 때 사용한다. 이 예시에선, 사용자 alice로 브로커에 연결한다. 같은 JVM 내에 있는 클라이언트가 여럿일 땐, sasl.jaas.config에 username과 password를 다르게 지정하면 다른 사용자로 연결할 수 있다.

        클라이언트에 대한 JAAS 설정은 여기에서 설명하는대로 브로커와 유사한 JVM 파라미터로 지정할 수도 있다. 클라이언트는 KafkaClient라는 로그인 섹션을 사용한다. 이 옵션은 JVM에 있는 모든 클라이언트 커넥션에 대해 단 한 명의 사용자만 허용한다.

      2. producer.properties 또는 consumer.properties에 다음 프로퍼티를 설정해라:

        security.protocol=SASL_SSL
        sasl.mechanism=SCRAM-SHA-256 (or SCRAM-SHA-512)
        
    4. Security Considerations for SASL/SCRAM

      • 카프카의 SASL/SCRAM의 기본 구현체는 SCRAM credential을 주키퍼에 저장한다. 따라서 프로덕션에선 주키퍼가 안전한 사설 네트워크에 있을 때 적합하다.
      • 카프카는 최소 반복 횟수가 4096인 강력한 해시 함수, SHA-256과 SHA-512만 지원한다. 이 해시 함수는 강력한 패스워드와 높은 반복 횟수를 지녀, 주키퍼 보안이 손상되더라도, 무차별 대입 공격(brute force attack)을 방어할 수 있다.
      • SCRAM 정보를 교환할 때 누군가 가로채갈 수 없도록 SCRAM은 반드시 TLS 암호화와 함께 사용해야 한다. 이렇게하면 주키퍼가 손상되더라도, 사전 공격(dictionary attack)이나 무차별 대입 공격(brute force attack)을 방어할 수 있다.
      • 주키퍼가 안전하다고 볼 수 없다면, 카프카 2.0부터는 sasl.server.callback.handler.class를 설정해서 커스텀 콜백 핸들러를 통해 디폴트 SASL/SCRAM credential 저장소를 재정의할 수 있다.
      • 보안상 주의 사항과 관련한 상세 내용은 RFC 5802를 참고해라.
  6. Authentication using SASL/OAUTHBEARER

    OAuth 2 인가 프레임워크는 “다른 어플리케이션이 HTTP 서비스에 접근할 때 제한된 접근 권한만 갖도록 해준다. 리소스 소유자를 대신해서 리소스 소유자와 HTTP 서비스 간의 승인 상호 작용을 조정하거나, 다른 어플리케이션 스스로가 접근 권한을 가져가도록 허가하는 식으로 말이다.” SASL OAUTHBEARER 메커니즘은 SASL(즉, HTTP가 아닌) 컨텍스트에서 이 프레임워크를 사용할 수 있도록 해준다. 이는 RFC 7628에 정의돼 있다. 카프카의 기본 OAUTHBEARER 구현체는 unsecured JSON 웹 토큰을 생성하고 검증하며, 프로덕션 환경에는 적합하지 않다. 자세한 내용은 보안상 주의 사항을 참고해라.

    1. Configuring Kafka Brokers

      1. 아래 JAAS 파일을 적절히 수정해서 각 카프카 브로커의 config 디렉토리에 추가해라. 이 예시에선 kafka_server_jaas.conf라고 명명하겠다:

        KafkaServer {
            org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required
            unsecuredLoginStringClaim_sub="admin";
        };
        

        KafkaServer 섹션에 있는 unsecuredLoginStringClaim_sub 프로퍼티는 브로커가 다른 브로커와 연결을 시작할 때 사용한다. 이 예시에선 subject(sub) 클레임에 admin을 설정하게 되며, 바로 이 admin이 브로커 간 통신을 위한 사용자가 된다.

      2. 각 카프카 브로커에 JVM 파라미터로 JAAS 설정 파일 위치를 전달해라:

        -Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
        
      3. 여기에서 설명하는대로 server.properties에 SASL 포트와 SASL 메커니즘을 설정해라. 예를 들면:

        listeners=SASL_SSL://host.name:port (or SASL_PLAINTEXT if non-production)
        security.inter.broker.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism.inter.broker.protocol=OAUTHBEARER
        sasl.enabled.mechanisms=OAUTHBEARER
        
    2. Configuring Kafka Clients

      클라이언트에 SASL 인증을 설정하려면:

      1. 각 클라이언트에서 producer.properties 또는 consumer.properties에 JAAS 설정 프로퍼티를 추가해라. 로그인 모듈은 프로듀서와 컨슈머같은 클라이언트가 카프카 브로커에 연결하는 방법을 나타낸다. 다음은 OAUTHBEARER 메커니즘을 사용하는 클라이언트 설정 예시다:

        sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
             unsecuredLoginStringClaim_sub="alice";
        

        unsecuredLoginStringClaim_sub 옵션은 클라이언트가 subject(sub) 클레임을 구성할 때 사용한다. 이 클레임으로 클라이언트 연결을 위한 사용자를 결정한다. 이 예시에선, 사용자 alice로 브로커에 연결한다. 같은 JVM 내에 있는 클라이언트가 여럿일 땐 sasl.jaas.config에 다른 subject(sub) 클레임을 지정하면 다른 사용자로 연결할 수 있다.

        클라이언트에 대한 JAAS 설정은 여기에서 설명하는대로 브로커와 유사한 JVM 파라미터로 지정할 수도 있다. 클라이언트는 KafkaClient라는 로그인 섹션을 사용한다. 이 옵션은 JVM에 있는 모든 클라이언트 커넥션에 대해 단 한 명의 사용자만 허용한다.

      2. producer.properties 또는 consumer.properties에 다음 프로퍼티를 설정해라:

        security.protocol=SASL_SSL (or SASL_PLAINTEXT if non-production)
        sasl.mechanism=OAUTHBEARER
        
      3. SASL/OAUTHBEARER의 기본 구현체는 jackson-databind 라이브러리에 따라 다르다. 이 라이브러리는 필수 의존성이 아니기 때문에, 사용자가 직접 빌드 툴을 통해 의존성을 설정해야 한다.

    3. Unsecured Token Creation Options for SASL/OAUTHBEARER

      • 카프카의 SASL/OAUTHBEARER 기본 구현체는 Unsecured JSON 웹 토큰을 생성하고 검증한다. 프로덕션 환경에선 적합하지 않지만, DEV나 TEST 환경에서 임의의 토큰을 생성하는 식으로 활용하기 좋다.

      • 다음은 클라이언트 측에 (브로커 간 통신 프로토콜이 OAUTHBEARER라면 브로커에서도) 지원하는 여러 가지 JAAS 모듈 옵션이다:

        JAAS Module Option for Unsecured Token Creation Documentation
        unsecuredLoginStringClaim_<claimname>="value" 주어진 이름과 값으로 String 클레임을 만든다. ‘iat’, ‘exp‘를 제외한(자동으로 생성된다) 모든 유효한 클레임 이름을 지정할 수 있다.
        unsecuredLoginNumberClaim_<claimname>="value" 주어진 이름과 값으로 Number 클레임을 만든다. ‘iat’, ‘exp‘를 제외한(자동으로 생성된다) 모든 유효한 클레임 이름을 지정할 수 있다.
        unsecuredLoginListClaim_<claimname>="value" 주어진 값을 파싱해서 주어진 이름에 해당하는 String List 클레임을 만든다. 값에 있는 첫 번째 문자를 구분자로 사용한다. 예를 들면: unsecuredLoginListClaim_fubar="|value1|value2". ‘iat’, ‘exp‘를 제외한(자동으로 생성된다) 모든 유효한 클레임 이름을 지정할 수 있다.
        unsecuredLoginExtension_<extensionname>="value" 주어진 이름과 값으로 String 익스텐션을 만든다. 예를 들면: unsecuredLoginExtension_traceId="123". 익스텐션 이름에는 소문자 또는 대문자로된 알파벳 문자를 사용할 수 있다. 단, “auth” 익스텐션은 예약돼 있다. 익스텐션 값에는 ASCII 코드 1-127 문자를 사용할 수 있다.
        unsecuredLoginPrincipalClaimName principal 이름을 가진 String 클레임명에 ‘sub‘가 아닌 다른 값을 사용하고 싶다면 커스텀 클레임 이름을 설정해라.
        unsecuredLoginLifetimeSeconds 토큰 만료 시간으로 기본값 3600초(1 시간)가 아닌 다른 값을 설정하고 싶다면 정수 값을 지정해라. ‘exp’ 클레임에 이 만료 시간이 반영된다.
        unsecuredLoginScopeClaimName 토큰 스코프를 가진 String, String List 클레임명에 ‘scope‘가 아닌 다른 값을 사용하고 싶다면 커스텀 클레임 이름을 설정해라.
    4. Unsecured Token Validation Options for SASL/OAUTHBEARER

      • 다음은 브로커 측에 지원하는 Unsecured JSON 웹 토큰 검증을 위한 여러 가지 JAAS 모듈 옵션이다:

        JAAS Module Option for Unsecured Token Validation Documentation
        unsecuredValidatorPrincipalClaimName="value" principal 이름을 가진 특정 String 클레임이 존재하는지 확인하려면 비어 있지 않은 값으로 설정해라. 기본적으로는 ‘sub’ 클레임을 확인한다.
        unsecuredValidatorScopeClaimName="value" 토큰 스코프를 가진 String, String List 클레임명에 ‘scope‘가 아닌 다른 값을 사용하고 싶다면 커스텀 클레임 이름을 설정해라.
        unsecuredValidatorRequiredScope="value" 토큰 스코프를 가진 String/String List 클레임이 특정 값을 가지고 있는지 확인하려면 스코프 값 리스트를 지정해라. 값은 공백으로 구분한다.
        unsecuredValidatorAllowableClockSkewMs="value" 특정 밀리세컨드까지 clock skew를 허용하려면, 양의 정수 값으로 설정해라 (디폴트는 0).
      • 디폴트 unsecured SASL/OAUTHBEARER 구현체는 커스텀 로그인, SASL 서버 콜백 핸들러로 재정의할 수 있다 (프로덕션 환경에선 반드시 재정의해야 한다).

      • 보안상 주의 사항과 관련한 상세 내용은 RFC 6749, Section 10을 참고해라.

    5. Token Refresh for SASL/OAUTHBEARER

      카프카는 토큰이 만료되기 전에 주기적으로 토큰을 갱신하기 때문에, 클라이언트는 브로커에 계속해서 연결할 수 있다. 갱신 알고리즘 동작 방식에 영향을 주는 파라미터는 프로듀서/컨슈머/브로커 설정에 지정할 수 있으며, 아래에 표기했다. 자세한 내용은 이 문서에서 해당 프로퍼티들을 설명하는 곳을 찾아봐라. 보통은 디폴트 값으로도 충분하며, 디폴트 값을 사용한다면 설정 파라미터를 명시하지 않도 된다.

      Producer/Consumer/Broker Configuration Property
      sasl.login.refresh.window.factor
      sasl.login.refresh.window.jitter
      sasl.login.refresh.min.period.seconds
      sasl.login.refresh.min.buffer.seconds
    6. Secure/Production Use of SASL/OAUTHBEARER

      프로덕션에서 사용하려면, org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback 인스턴스를 처리할 수 있는 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 구현체를 작성하고, 브로커가 아닌 클라이언트에선 설정 옵션 sasl.login.callback.handler.class로, 브로커에선(브로커 간의 통신에 SASL/OAUTHBEARER 프로토콜을 사용한다면) 설정 옵션 listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class로 선언해야 한다.

      프로덕션 환경에선, org.apache.kafka.common.security.oauthbearer.OAuthBearerValidatorCallback 인스턴스를 처리할 수 있는 org.apache.kafka.common.security.auth.AuthenticateCallbackHandler 구현체도 작성해서, 브로커 설정 옵션 listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class를 통해 선언해야 한다.

    7. Security Considerations for SASL/OAUTHBEARER

      • 카프카의 기본 SASL/OAUTHBEARER 구현체는 Unsecured JSON 웹 토큰을 생성하고 검증한다. 이 구현체는 프로덕션 환경에는 적합하지 않다.
      • OAUTHBEARER를 프로덕션에서 사용하려면 반드시 토큰을 가로채갈 수 없도록 TLS 암호화와 함께 사용해야 한다.
      • 디폴트 unsecured SASL/OAUTHBEARER 구현체는 위에서 설명한대로, 커스텀 로그인, SASL 서버 콜백 핸들러로 재정의할 수 있다 (프로덕션 환경에선 반드시 재정의해야 한다).
      • 전반적인 OAuth 2와 관련한 보안상 주의 사항은 RFC 6749, Section 10을 참고해라.
  7. Enabling multiple SASL mechanisms in a broker

    1. JAAS 설정 파일의 KafkaServer 섹션에 활성화된 모든 메커니즘의 로그인 모듈 설정을 명시해라. 예를 들면:

      KafkaServer {
          com.sun.security.auth.module.Krb5LoginModule required
          useKeyTab=true
          storeKey=true
          keyTab="/etc/security/keytabs/kafka_server.keytab"
          principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
            
          org.apache.kafka.common.security.plain.PlainLoginModule required
          username="admin"
          password="admin-secret"
          user_admin="admin-secret"
          user_alice="alice-secret";
      };
      
    2. server.properties에 SASL 메커니즘을 활성화해라:

      sasl.enabled.mechanisms=GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
      
    3. 필요하다면 server.properties에 브로커 간 통신에 사용할 SASL 보안 프로토콜과 메커니즘을 지정해라:

      security.inter.broker.protocol=SASL_PLAINTEXT (or SASL_SSL)
      sasl.mechanism.inter.broker.protocol=GSSAPI (or one of the other enabled mechanisms)
      
    4. GSSAPI (Kerberos), PLAIN, SCRAM, OAUTHBEARER에 있는 가이드에 따라, 활성화된 각 메커니즘별로 SASL을 설정해라.

  8. Modifying SASL mechanism in a Running Cluster

    아래처럼 하면 실행 중인 클러스터에서 SASL 메커니즘을 수정할 수 있다:

    1. 모든 브로커에서 새 메커니즘을 활성화해라. server.properties의 sasl.enabled.mechanisms에 새 메커니즘을 추가하면 된다. 여기에서 설명하는대로, JAAS 설정 파일을 수정해서 두 메커니즘을 모두 넣어라. 클러스터 노드를 점진적으로 재시작해라.
    2. 클라이언트를 새 메커니즘으로 재시작해라.
    3. 브로커 간의 통신 메커니즘을 변경하려면 (필요하면), server.properties의 sasl.mechanism.inter.broker.protocol을 새 메커니즘으로 설정하고, 클러스터를 점진적으로 다시 재시작해라.
    4. 이전 메커니즘을 제거하려면 (필요하면), server.properties의 sasl.enabled.mechanisms에서 이전 메커니즘을 제거하고, JAAS 설정 파일에서도 이전 메커니즘과 관련한 엔트리를 제거해라. 클러스터를 점진적으로 다시 재시작해라.
  9. Authentication using Delegation Tokens

    Delegation 토큰 기반 인증은 기존 SASL/SSL을 보완하는 경량 인증 메커니즘이다. Delegation 토큰은 카프카 브로커와 클라이언트간에 공유하는 시크릿이다. 프레임워크에서 delegation 토큰을 활용하면, 2-way SSL을 위한 커버로스 TGT/키탭이나 keystore를 배포하는 추가 비용 없이, 안전한 환경에서 사용 가능한 워커에게 워크로드를 분산할 수 있다. 자세한 내용은 KIP-48을 참고해라.

    delegation 토큰을 사용하는 전형적인 스텝은 다음과 같다:

    1. 사용자는 SASL이나 SSL을 통해 카프카 클러스터에 인증하고 delegation 토큰을 획득한다. 이땐 Admin API나 kafka-delegation-tokens.sh 스크립트를 사용하면 된다.

    2. 사용자는 카프카 클러스터에 인증할 수 있도록 delegation 토큰을 카프카 클라이언트에 안전하게 전달한다.

    3. 토큰 owner/renewer는 delegation 토큰을 갱신/만료할 수 있다.

    1. Token Management

      delegation 토큰을 생성하고 검증할 땐 마스터 키/시크릿을 사용한다. 설정 옵션 delegation.token.master.key로 제공할 수 있다. 모든 브로커에 동일한 시크릿 키를 설정해야 한다. 시크릿을 설정하지 않거나 빈 문자열로 설정하면, 브로커는 delegation 토큰 지원을 비활성화한다.

      현재 구현체에선 토큰 세부 정보를 주키퍼에 저장하며, 사설망에 있는 주키퍼를 사용할 때 적합하다. 또, 현재로썬 마스터 키/시크릿을 server.properties 설정 파일에 일반 텍스트로 저장한다. 향후 카프카 릴리즈에선 이 동작도 설정할 수 있게 만들 예정이다.

      토큰에는 현재 수명과 갱신 가능한 최대 수명이 있다. 기본적으로 토큰은 24시간마다 한 번씩 갱신해야 하며, 최대 7일 동안만 가능하다. 설정 옵션 delegation.token.expiry.time.ms, delegation.token.max.lifetime.ms로 변경할 수 있다.

      토큰은 명시적으로 취소할 수도 있다. 토큰 만료 시간까지 토큰이 갱신되지 않거나, 토큰이 최대 수명을 초과하면 주키퍼뿐만 아니라 모든 브로커 캐시에서도 삭제된다.

    2. Creating Delegation Tokens

      토큰은 Admin API나 kafka-delegation-tokens.sh 스크립트로 생성할 수 있다. Delegation 토큰 요청(생성/갱신/만료/조회)은 SASL이나 SSL로 인증된 채널에서만 발행해야 한다. 초기 인증을 delegation 토큰으로 수행하면 토큰을 요청할 수 없다. kafka-delegation-tokens.sh 스크립트 예시는 아래에 있다.

      delegation 토큰을 생성하려면:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 \
        --create  --max-life-time-period -1 \
        --command-config client.properties --renewer-principal User:user1
      

      delegation 토큰을 갱신하려면:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 \
        --renew  --renew-time-period -1 \
        --command-config client.properties --hmac ABCDEFGHIJK
      

      delegation 토큰을 만료하려면:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 \
        --expire  --expiry-time-period -1 \
        --command-config client.properties  --hmac ABCDEFGHIJK
      

      기존 토큰은 --describe 옵션으로 조회할 수 있다:

      > bin/kafka-delegation-tokens.sh --bootstrap-server localhost:9092 \
        --describe --command-config client.properties --owner-principal User:user1
      
    3. Token Authentication

      Delegation 토큰 인증은 현재 SASL/SCRAM 인증 메커니즘에 편승한다. 여기에서 설명하는대로, 카프카 클러스터에 SASL/SCRAM 메커니즘을 활성화해야 한다.

      Configuring Kafka Clients:

      1. 각 클라이언트에서 producer.properties 또는 consumer.properties에 JAAS 설정 프로퍼티를 추가해라. 로그인 모듈은 프로듀서와 컨슈머같은 클라이언트가 카프카 브로커에 연결하는 방법을 나타낸다. 다음은 토큰 인증을 사용하는 클라이언트 설정 예시다:

        sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
          username="tokenID123" \
          password="lAYYSFmLs4bTjf+lTZ1LCHR/ZZFNA==" \
          tokenauth="true";
        

        username, password 옵션은 클라이언트가 토큰 id와 토큰 HMAC을 구성할 때 사용한다. 그리고 tokenauth 옵션은 서버에 토큰 인증을 알리는 역할을 한다. 이 예시에선 클라이언트는 토큰 id: tokenID123을 사용해 브로커에 연결한다. 같은 JVM 내에 있는 클라이언트가 여럿일 땐, sasl.jaas.config에 토큰 세부 정보를 다르게 지정하면 다른 토큰으로 연결할 수 있다.

        클라이언트에 대한 JAAS 설정은 여기에서 설명하는대로 브로커와 유사한 JVM 파라미터로 지정할 수도 있다. 클라이언트는 KafkaClient라는 로그인 섹션을 사용한다. 이 옵션은 JVM에 있는 모든 클라이언트 커넥션에 대해 단 한 명의 사용자만 허용한다.

    4. Procedure to manually rotate the secret:

      시크릿을 교체해야 할 땐 재배포가 필요하다. 이미 연결된 클라이언트는 이 과정을 진행할 때도 계속 동작할 거다. 하지만 새 커넥션 요청이나, 갱신/만료 요청에서 이전 토큰을 사용하면 실패할 수 있다. 시크릿 교체 절차는 다음과 같다.

      1. 기존에 있는 모든 토큰을 만료한다.
      2. 순차적으로 업그레이드해서 시크릿을 교체한다, 그리고
      3. 새 토큰을 생성한다.

      향후 카프카 릴리즈에선 이 과정을 자동화할 계획이다.

    5. Notes on Delegation Tokens

      • 현재로써는 사용자는 자신의 delegation 토큰만 생성할 수 있다. Owner/Renewer는 토큰을 갱신하거나 만료할 수 있다. Owner/Renewer는 언제나 자신의 토큰을 조회할 수 있다. 다른 사용자의 토큰을 조회하려면, 토큰 리소스에 대한 DESCRIBE 퍼미션을 지원해야 한다.

7.4 Authorization and ACLs

카프카는 Authorizer 인터페이스와, 주키퍼로 모든 ACL을 저장하는, 바로 사용할 수 있는 authorizer 구현체를 함께 제공한다. Authorizer는 server.properties의 authorizer.class.name으로 설정한다. 바로 사용할 수 있는 구현체를 사용하려면:

authorizer.class.name=kafka.security.authorizer.AclAuthorizer

카프카 acl은 일반 포맷 “Principal P에게, 호스트 H에서 리소스 패턴 RP와 매칭되는 모든 리소스에 대한 작업 O를 실행할 수 있는 권한을 [허가/거부]한다”로 정의된다. acl 구조는 KIP-11, 리소스 패턴은 KIP-290에 자세히 나와있다. acl을 추가, 제거, 나열할 땐 카프카 authorizer CLI를 사용할 수 있다. 기본적으로 특정 리소스 R과 일치하는 ResourcePatterns가 없다면, R에는 관련 ACL이 없으므로, 수퍼 유저 외에는 R에 접근할 수 없다. 이 동작을 변경하려면 server.properties에 다음을 추가하면 된다.

allow.everyone.if.no.acl.found=true

server.properties에는 다음과 같이 슈퍼 유저도 추가할 수 있다 (SSL 사용자 이름에 콤마가 있을 수도 있기 때문에, 구분 기호는 세미콜론을 사용한다). 디폴트 PrincipalType 문자열 “User”는 대소 문자를 구분한다.

super.users=User:Bob;User:Alice

Customizing SSL User Name

기본적으로 SSL 사용자 이름은 “CN=writeuser,OU=Unknown,O=Unknown,L=Unknown,ST=Unknown,C=Unknown” 형식이다. server.properties에 ssl.principal.mapping.rules를 커스텀 rule로 설정하면 변경할 수 있다. 이 설정으로 X.500 DN(distinguished name)을 짧은 이름에 매핑하는 rule 리스트를 만들 수 있다. 규칙은 순서대로 평가되며, DN과 일치하는 첫 번째 rule을 사용해 짧은 이름에 매핑한다. 뒤에 있는 rule은 무시한다.

ssl.principal.mapping.rules는 rule의 리스트다. 각 rule은 “RULE:”로 시작하며, 다음과 같은 표현식을 갖는다. 디폴트 rule은 X.500 인증서 DN의 문자열 표현을 반환한다. DN이 패턴과 일치하면, 해당 이름에 replacement 명령을 실행한다. 변환된 결과를 모두 소문자/대문자로 강제하는 옵션도 지원한다. rule 끝에 “/L” 또는 “/U”를 추가하면 된다.

RULE:pattern/replacement/
RULE:pattern/replacement/[LU]

ssl.principal.mapping.rules 값 예시:

RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,
RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,
RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,
DEFAULT

위에 있는 rule은 DN “CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”은 “serviceuser”로, “CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown”은 “adminuser@admin”으로 전환한다.

더 나아가면, 다음과 같이 server.properties에 커스텀 PrincipalBuilder를 설정하는 식으로도 이름을 커스텀할 수 있다.

principal.builder.class=CustomizedPrincipalBuilderClass

Customizing SASL User Name

기본적으로 SASL 사용자 이름은 커버로스 principal의 primary 파트가 된다. server.properties에 sasl.kerberos.principal.to.local.rules를 커스텀 rule로 설정하면 변경할 수 있다. sasl.kerberos.principal.to.local.rules는 rule의 리스트다. 각 rule은 커버로스 설정 파일(krb5.conf)의 auth_to_local과 동일한 방식으로 동작한다. 변환된 결과를 모두 소문자/대문자로 강제하는 규칙도 지원한다. rule 끝에 “/L” 또는 “/U”를 추가하면 된다. 문법은 아래 포맷을 확인해봐라. 각 rule은 RULE:로 시작하며, 다음과 같은 표현식을 갖는다. 자세한 내용은 커버로스 문서를 참고해라.

RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L
RULE:[n:string](regexp)s/pattern/replacement//U
RULE:[n:string](regexp)s/pattern/replacement/g/U

다음은 user@MYDOMAIN.COM을 user로 적절히 변환하는 rule을 추가하는 동시에, 기본 rule도 그대로 유지하는 예시다:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@MYDOMAIN.COM)s/@.*//,DEFAULT

Command Line Interface

카프카 인가 관리 CLI는 다른 CLI들도 전부 들어 있는 bin 디렉토리에서 찾을 수 있다. CLI 스크립트는 kafka-acls.sh라고 부른다. 다음 테이블은 스크립트가 지원하는 모든 옵션을 담고있다:

OPTION DESCRIPTION DEFAULT OPTION TYPE
--add 사용자가 acl을 추가하려고 한다는 걸 스크립트에 알린다.   Action
--remove 사용자가 acl을 제거하려 한다는 걸 스크립트에 알린다.   Action
--list 사용자가 acl을 나열해보려 한다는 걸 스크립트에 알린다.   Action
--authorizer authorizer 클래스의 풀 네임(fully qualified name) kafka.security.authorizer
.AclAuthorizer
Configuration
--authorizer-properties 초기화를 위해 authorizer에게 전달할 key=val 쌍. 디폴트 authorizer로 예를 들면: zookeeper.connect=localhost:2181   Configuration
--bootstrap-server 카프카 클러스터에 대한 커넥션을 구축하는데 사용할 호스트/포트 쌍 리스트. --bootstrap-server, --authorizer 옵션 중 하나만 지정해야 한다.   Configuration
--command-config 어드민 클라이언트에 전달할 설정이 들어있는 프로퍼티 파일. 이 옵션은 --bootstrap-server 옵션과 함께 사용할 때만 쓸 수 있다.   Configuration
--cluster 사용자가 단일 클러스터 리소스의 ACL과 상호 작용하려 한다는 걸 스크립트에 알린다.   ResourcePattern
--topic [topic-name] 사용자가 토픽 리소스 패턴에 대한 acl과 상호 작용하려 한다는 걸 스크립트에 알린다.   ResourcePattern
--group [group-name] 사용자가 컨슈머 그룹 리소스 패턴에 대한 acl과 상호작용하려 한다는 걸 스크립트에 알린다.   ResourcePattern
--transactional-id [transactional-id] acl을 추가하거나 제거할 transactionalId. *은 acl이 모든 transactionalId에 적용돼야 한다는 걸 나타낸다.   ResourcePattern
--delegation-token [delegation-token] acl을 추가하거나 제거할 delegation 토큰. *은 acl이 모든 토큰에 적용돼야 한다는 걸 나타낸다.   ResourcePattern
--resource-pattern-type [pattern-type] 사용하고자 하는 리소스 패턴(--add), 또는 리소스 패턴 필터(--list, --remove)의 타입을 스크립트에 알린다. acl을 추가할 땐 ‘literal’, ‘prefixed’같은 특정한 패턴 타입을 지정해야 한다. acl을 나열하거나 제거할 땐, 패턴 타입 필터를 통해 특정한 타입의 리소스 패턴을 가진 acl을 필터링할 수 있으며, 필터 값에 ‘any’ 또는 ‘match’를 사용할 수도 있다. 여기서 ‘any’는 모든 패턴 타입과 매칭되지만, 리소스 이름과는 정확히 매칭한다. ‘match’는 제공한 리소스에 패턴 매칭으로 영향을 미치는 모든 acl을 나열하거나 제거할 수 있다. 경고: ‘match’를 ‘--remove’스위치와 함께 사용한다면 주의해서 사용해야 한다. literal Configuration
--allow-principal ACL에 Allow 퍼미션으로 추가될 PrincipalType:name 형식의 principal. 디폴트 PrincipalType 문자열 “User”는 대소문자를 구분한다. --allow-principal은 단일 명령어에 여러 번 지정할 수 있다.   Principal
--deny-principal ACL에 Deny 퍼미션으로 추가될 PrincipalType:name 형식의 principal. 디폴트 PrincipalType 문자열 “User”는 대소문자를 구분한다. --deny-principal은 단일 명령어에 여러 번 지정할 수 있다.   Principal
--principal --list 옵션과 함께 사용하는 PrincipalType:name 형식의 principal. 디폴트 PrincipalType 문자열 “User”는 대소문자를 구분한다. 이 옵션은 지정한 principal에 대한 ACL을 나열한다. --principal은 단일 명령어에 여러 번 지정할 수 있다.   Principal
--allow-host --allow-principal에 나열한 principal이 접근할 수 있는 IP 주소. --allow-principal을 지정했다면 기본값은 “all hosts”로 변환되는 *로 설정된다. Host
--deny-host --deny-principal에 나열한 principal의 접근을 거부할 IP 주소. --deny-principal을 지정했다면 기본값은 “all hosts”로 변환되는 *로 설정된다. Host
--operation 허가 또는 거부할 작업. 유효한 값은: ReadWriteCreateDelete
AlterDescribe
ClusterAction
DescribeConfigsAlterConfigs
IdempotentWriteAll
All Operation
--producer 간편하게 프로듀서 role에 대한 ACL을 추가/제거할 수 있는 옵션. 토픽에 WRITE, DESCRIBE, CREATE를 허용하는 acl을 생성한다.   Convenience
--consumer 간편하게 컨슈머 role에 대한 acl을 추가/제거할 수 있는 옵션. 토픽에 READ, DESCRIBE를, 컨슈머 그룹에 READ를 허용하는 acl을 생성한다.   Convenience
--idempotent 프로듀서에 멱등성을 활성화한다. --producer 옵션과 함께 사용해야 한다. 단, 프로듀서가 특정 transactional-id에 권한이 있으면 멱등성은 자동으로 활성화된다.   Convenience
--force 모든 쿼리를 yes로 가정하고 prompt를 사용하지 않는 간편한 옵션.   Convenience
--zk-tls-config-file authorizer에 대한 주키퍼 클라이언트의 TLS 연결 프로퍼티를 정의한 파일을 식별한다. 아래 있는 프로퍼티 외에는 전부 무시한다 (“authorizer.” 프리픽스는 있어도 되고 없어도 된다):
zookeeper.clientCnxnSocket, zookeeper.ssl.cipher.suites, zookeeper.ssl.client.enable,
zookeeper.ssl.crl.enable, zookeeper.ssl.enabled.protocols, zookeeper.ssl.endpoint.
identification.algorithm, zookeeper.ssl.keystore.location, zookeeper.ssl.keystore.password, zookeeper.ssl.keystore.type, zookeeper.ssl.ocsp.enable,
zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type
  Configuration

Examples

Authorization Primitives

프로토콜을 호출하면 보통 카프카의 특정한 리소스에 몇 가지 작업을 수행하게 된다. 보호 정책을 제대로 설정하려면 이 작업과 리소스를 알아야 한다. 이 섹션에선 이런 작업과 리소스를 망라하고, 작업과 리소스 조합에 따른 프로토콜을 함께 정리해 유효한 시나리오를 확인해보겠다.

Operations in Kafka

권한을 만들 때 사용할 수 있는 작업 프리미티브는 몇 가지가 있다. 작업 프리미티브를 특정 리소스와 매칭하면, 주어진 사용자에게 특정 프로토콜 호출을 허용할 수 있다. 작업 프리미티브는 다음과 같다:

Resources in Kafka

위에 있는 작업들은 아래에서 설명하는 리소스를 특정해 적용할 수 있다.

Operations and Resources on Protocols

아래 테이블은 카프카 API 프로토콜에 의해 실행되는 리소스에 대한 유효한 작업들을 담고 있다.

PROTOCOL (API KEY) OPERATION RESOURCE NOTE
PRODUCE (0) Write TransactionalId transactional.id를 설정한 트랜잭션 프로듀서는 이 권한이 필요하다.
PRODUCE (0) IdempotentWrite Cluster 명득성(idempotent)을 보장하도록 메세지를 생성하려면 이 권한이 필요하다.
PRODUCE (0) Write Topic 일반적인 메세지 생성에 적용된다.
FETCH (1) ClusterAction Cluster 팔로워는 파티션 데이터를 가져오려면 반드시 클러스터 리소스에 ClusterAction 권한이 있어야 한다.
FETCH (1) Read Topic 일반 카프카 컨슈머는 조회할 파티션에 대한 READ 퍼미션이 필요하다.
LIST_OFFSETS (2) Describe Topic  
METADATA (3) Describe Topic  
METADATA (3) Create Cluster 토픽 자동 생성을 활성화했다면, 브로커 측 API는 클러스터 레벨 권한이 있는지 확인할 거다. 권한을 찾으면 토픽 생성을 허용하고, 그렇지 않으면 토픽 레벨 권한을 순회한다 (바로 아래 참고).
METADATA (3) Create Topic 활성화했다면 자동 토픽 생성을 허가하지만, 이 사용자는 클러스터 레벨 권한은 없다 (위 참고).
LEADER_AND_ISR (4) ClusterAction Cluster  
STOP_REPLICA (5) ClusterAction Cluster  
UPDATE_METADATA (6) ClusterAction Cluster  
CONTROLLED_
SHUTDOWN (7)
ClusterAction Cluster  
OFFSET_COMMIT (8) Read Group 오프셋은 주어진 그룹과 토픽에 권한이 있다면 커밋할 수 있다 (아래 참고). 먼저 그룹 접근 권한을 확인한 다음, 토픽 접근 권한을 확인한다.
OFFSET_COMMIT (8) Read Topic 오프셋 커밋은 컨슈밍 프로세스의 일환이기 때문에, 읽기 권한이 필요하다.
OFFSET_FETCH (9) Describe Group OFFSET_COMMIT과 유사하게, 어플리케이션에서 메세지를 조회하려면 그룹과 토픽 레벨에 대한 권한도 가지고 있어야 한다. 하지만 이 프로토콜에는 read 대신 describe 권한이 필요하다. 먼저 그룹 접근 권한을 확인한 다음, 토픽 접근 권한을 확인한다.
OFFSET_FETCH (9) Describe Topic  
FIND_COORDINATOR (10) Describe Group FIND_COORDINATOR 요청이 컨슈머 그룹 코디네이터를 찾는 요청이라면 “Group” 타입이 된다. 이 권한은 Group 모드를 나타낸다.
FIND_COORDINATOR (10) Describe TransactionalId 트랜잭션 프로듀서일 때만 적용되며, 프로듀서가 트랜잭션 코디네이터를 찾으려고할 때 확인한다.
JOIN_GROUP (11) Read Group  
HEARTBEAT (12) Read Group  
LEAVE_GROUP (13) Read Group  
SYNC_GROUP (14) Read Group  
DESCRIBE_GROUPS (15) Describe Group  
LIST_GROUPS (16) Describe Cluster list_groups 요청에서 브로커가 권한을 확인할 땐, 먼저 이 클러스터 레벨 권한을 확인한다. 권한을 찾지 못하면 그룹을 개별적으로 확인한다. 이 작업은 CLUSTER_AUTHORIZATION_FAILED를 반환하지 않는다.
LIST_GROUPS (16) Describe Group 권한이 부여된 그룹이 없으면, 에러 대신 빈 응답만 전송한다. 이 작업은 CLUSTER_AUTHORIZATION_FAILED를 반환하지 않는다. 2.1 릴리즈부터 적용된다.
SASL_HANDSHAKE (17)     SASL 핸드셰이크는 인증 프로세스 중에 일어나기 때문에, 여기에선 어떤 인가도 적용할 수 없다.
API_VERSIONS (18)     API_VERSIONS 요청은 카프카 프로토콜 핸드셰이크 중 일어나며, 커넥션을 맺고서 인증을 수행하기 전에 발생한다. 따라서 인가 프로세스로 제어할 수 없다.
CREATE_TOPICS (19) Create Cluster 클러스터 레벨 권한이 없으면 CLUSTER_AUTHORIZATION_FAILED를 반환하지 않고 바로 아래에 있는 토픽 레벨을 사용하도록 폴백한다. 토픽 레벨에서 문제가 있으면 에러를 던질 거다.
CREATE_TOPICS (19) Create Topic 2.0 릴리즈부터 적용된다.
DELETE_TOPICS (20) Delete Topic  
DELETE_RECORDS (21) Delete Topic  
INIT_PRODUCER_ID (22) Write TransactionalId  
INIT_PRODUCER_ID (22) IdempotentWrite Cluster  
OFFSET_FOR_
LEADER_EPOCH (23)
ClusterAction Cluster 이 작업에 대한 클러스터 레벨 권한이 없으면, 토픽 레벨 권한을 확인한다.
OFFSET_FOR_
LEADER_EPOCH (23)
Describe Topic 2.1 릴리즈부터 적용된다.
ADD_PARTITIONS_
TO_TXN (24)
Write TransactionalId 이 API는 트랜잭션 요청에만 적용할 수 있다. 먼저 TransactionalId 리소스에 대한 Write 권한을 확인한 다음 토픽을 확인한다 (아래).
ADD_PARTITIONS_
TO_TXN (24)
Write Topic  
ADD_OFFSETS_
TO_TXN (25)
Write TransactionalId ADD_PARTITIONS_TO_TXN과 유사하게 트랜잭션 요청에만 적용할 수 있다. 먼저 TransactionalId 리소스에 대한 Write 권한을 확인한 다음, 지정한 그룹에 Read 권한이 있는지 확인한다 (아래 참고).
ADD_OFFSETS_
TO_TXN (25)
Read Group  
END_TXN (26) Write TransactionalId  
WRITE_TXN_MARKERS (27) ClusterAction Cluster  
TXN_OFFSET_COMMIT (28) Write TransactionalId  
TXN_OFFSET_COMMIT (28) Read Group  
TXN_OFFSET_COMMIT (28) Read Topic  
DESCRIBE_ACLS (29) Describe Cluster  
CREATE_ACLS (30) Alter Cluster  
DELETE_ACLS (31) Alter Cluster  
DESCRIBE_CONFIGS (32) DescribeConfigs Cluster 브로커는 브로커 설정 요청을 받으면 클러스터 레벨 권한을 확인한다.
DESCRIBE_CONFIGS (32) DescribeConfigs Topic 브로커는 토픽 설정 요청을 받으면 토픽 레벨 권한을 확인한다.
ALTER_CONFIGS (33) AlterConfigs Cluster 브로커는 브로커 설정을 수정할 땐 클러스터 레벨 권한을 확인한다.
ALTER_CONFIGS (33) AlterConfigs Topic 브로커는 토픽 설정을 수정할 땐 토픽 레벨 권한을 확인한다.
ALTER_REPLICA_
LOG_DIRS (34)
Alter Cluster  
DESCRIBE_
LOG_DIRS (35)
Describe Cluster 인가에 실패하면 빈 응답을 반환한다.
SASL_AUTHENTICATE (36)     SASL_AUTHENTICATE는 인증 프로세스 중에 일어나기 때문에, 여기에선 어떤 인가도 적용할 수 없다.
CREATE_PARTITIONS (37) Alter Topic  
CREATE_
DELEGATION_TOKEN
(38)
    delegation 토큰을 생성에는 특별한 규칙이 있다. delegation 토큰을 사용한 인증 섹션을 참고해라.
RENEW_
DELEGATION_TOKEN
(39)
    delegation 토큰 갱신에는 특별한 규칙이 있다. delegation 토큰을 사용한 인증 섹션을 참고해라.
EXPIRE_
DELEGATION_TOKEN
(40)
    delegation 토큰 만료에는 특별한 규칙이 있다. delegation 토큰을 사용한 인증 섹션을 참고해라.
DESCRIBE_
DELEGATION_TOKEN
(41)
Describe DelegationToken delegation 토큰 조회에는 특별한 규칙이 있다. delegation 토큰을 사용한 인증 섹션을 참고해라.
DELETE_GROUPS (42) Delete Group  
ELECT_PREFERRED_
LEADERS (43)
ClusterAction Cluster  
INCREMENTAL_
ALTER_CONFIGS (44)
AlterConfigs Cluster 브로커는 브로커 설정을 수정할 땐 클러스터 레벨 권한을 확인한다.
INCREMENTAL_
ALTER_CONFIGS (44)
AlterConfigs Topic 브로커는 토픽 설정을 수정할 땐 토픽 레벨 권한을 확인한다.
ALTER_PARTITION_
REASSIGNMENTS (45)
Alter Cluster  
LIST_PARTITION_
REASSIGNMENTS (46)
Describe Cluster  
OFFSET_DELETE (47) Delete Group  
OFFSET_DELETE (47) Read Topic  

7.5 Incorporating Security Features in a Running Cluster

실행 중인 클러스터에도, 앞서 설명한 지원 프로토콜 중 하나 이상을 적용해 클러스터를 보호할 수 있다. 절차는 다음과 같다:

SSL과 SASL 설정에 필요한 단계는 섹션 7.2, 7.3에 정리돼 있다. 이 가이드에 따라 원하는 프로토콜로 보안을 활성화해라.

브로커-클라이언트, 브로커-브로커 간 통신에 다른 프로토콜을 설정하는 것도 가능하다. 이땐 반드시 별도 재시작으로 활성화해야 한다. 재시작하는 동안에도 PLAINTEXT 포트는 열려 있어야지 브로커, 클라이언트가 통신을 이어갈 수 있다.

점진적으로 재시작할땐 SIGTERM을 통해 브로커를 깔끔하게 종료할 수 있다. 다음 노드로 넘어가기 전에 ISR 리스트에 재시작한 레플리카가 돌아올 때까지 기다리는 것도 좋은 방법이다.

예를 들어, SSL을 사용해 브로커-클라이언트, 브로커-브로커 통신을 모두 암호화하고 싶다고 해보자. 첫 번째로 클러스터를 재시작할 때 모든 노드에서 SSL 포트가 열린다:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092

그런 다음 클라이언트가 새로 열린 보안 포트를 가리키도록 설정을 변경하고 클라이언트를 재시작한다:

bootstrap.servers = [broker1:9092,...]
security.protocol = SSL
...etc

두 번째로 클러스터를 재기동하면서 카프카에 브로커-브로커 프로토콜로 SSL을 사용하도록 지시한다 (동일한 SSL 포트를 사용할 거다):

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092
security.inter.broker.protocol=SSL

마지막으로 PLAINTEXT 포트를 닫아 클러스터를 보호하고, 한 번 더 클러스터를 재기동한다:

listeners=SSL://broker1:9092
security.inter.broker.protocol=SSL

아니면 포트를 여러 개 열어서, 브로커-브로커, 브로커-클라이언트 통신에 다른 프로토콜을 사용할 수도 있다. 모든 통신을 SSL로 암호화를 사용하고 싶은데 (브로커-브로커, 브로커-클라이언트 통신), 브로커-클라이언트 커넥션엔 SASL 인증도 추가하고 싶다고 해보자. 이럴 땐 첫 번째로 클러스터를 재기동할 때 포트를 두 개 추가하면 된다:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093

그런 다음 클라이언트가 새로 열린 SASL & SSL 보안 포트를 가리키도록 설정을 변경하고 클라이언트를 재시작한다:

bootstrap.servers = [broker1:9093,...]
security.protocol = SASL_SSL
...etc

두 번째로 클러스터를 재기동하면서는, 앞에서 열어준 SSL 포트 9092로 브로커-브로커 커넥션을 암호화하도록 변경한다:

listeners=PLAINTEXT://broker1:9091,SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

마지막으로 PLAINTEXT 포트를 닫아 클러스터를 보호하고, 한 번 더 클러스터를 재기동한다.

listeners=SSL://broker1:9092,SASL_SSL://broker1:9093
security.inter.broker.protocol=SSL

주키퍼는 카프카 클러스터와는 독립적으로 보호할 수 있다. 주키퍼 보안을 위한 절차는 7.6.2 섹션에서 다룬다.


7.6 ZooKeeper Authentication

주키퍼는 3.5.x 버전부터 mutual TLS(mTLS) 인증을 지원한다. 카프카는 2.5 버전부터 SASL, mTLS를 사용해서 주키퍼에 인증할 수 있다. 둘 중 하나만 사용해도되고 함께 사용해도 된다. 자세한 내용은 KIP-515: ZK 클라이언트에서 새로 지원하는 TLS 인증 활성화하기를 참고해라.

mTLS만 사용하려면 모든 브로커와 CLI 툴(주키퍼 보안 마이그레이션 툴같은)은 모두 같은 DN(Distinguished Name)으로 자신을 식별해야 한다. 이 DN에 ACL이 적용되기 때문이다. 아래에서 설명하는대로 변경할 순 있지만, 커스텀 주키퍼 authentication provider를 만들어 배포해야 한다. 보통 각 인증서는 동일한 DN을 가져야 하지만, SAN(Subject Alternative Name)은 달라야지만 주키퍼가 브로커와 CLI 툴의 호스트명을 검증하는데 성공할 거다.

주키퍼에 SASL 인증을 mTLS와 함께 사용한다면, znode를 생성한 DN(즉, 생성할 브로커의 인증서)이나 보안 마이그레이션 툴의 DN(znode를 생성한 이후에 마이그레이션을 수행하는 경우) 중 하나와 SASL identity에 ACL이 적용된다. 모든 브로커와 CLI 툴은 전부 ACL이 적용된 동일한 SASL identity를 사용하기 때문에, 사용하는 DN이 서로 다르더라도 권한이 부여된다. 모든 DN이 일치해야 하는 건 mTLS 인증을 단독으로 사용할 때만이다 (이때는 SAN이 매우 중요해진다 – 다시 말하지만, 아래에서 설명하는대로 커스텀 주키퍼 authentication provider를 만들어 배포하지 않는 경우에 해당한다).

아래에서 설명하는대로, 브로커에는 브로커 properties 파일을 통해 TLS를 설정한다.

주키퍼 보안 마이그레이션 툴에서는 --zk-tls-config-file <file> 옵션을 사용해서 TLS 설정을 세팅한다. CLI 툴 kafka-acls.sh, kafka-configs.sh--zk-tls-config-file <file> 옵션을 지원한다.

zookeeper-shell.sh CLI 툴에는 -zk-tls-config-file <file> 옵션(--가 아니라 -임에 주의)을 사용해서 TLS 설정을 세팅한다.

7.6.1 New clusters

7.6.1.1 ZooKeeper SASL Authentication

브로커에서 주키퍼 SASL 인증을 활성화하려면, 두 가지가 필요하다:

  1. JAAS 로그인 파일을 만들고, 위에서 설명했던대로 적절한 시스템 프로퍼티를 설정해서 이 파일을 가리키키도록 만든다
  2. 각 브로커에서 설정 프로퍼티 zookeeper.set.acl을 true로 설정한다

주키퍼에 저장된 카프카 클러스터의 메타데이터는 누구나 읽을 수 있지만, 수정은 브로커만 할 수 있다. 이렇게 결정한 이유는 주키퍼에 저장된 데이터는 민감한 데이터는 아니지만, 이 데이터를 잘못 조작하면 클러스터가 중단될 수 있기 때문이다. 추가로, 네트워크 분할을 통해 주키퍼에 대한 액세스를 제한하는 게 좋다 (브로커와 일부 어드민 툴만 주키퍼에 접근하면 된다).

7.6.1.2 ZooKeeper Mutual TLS Authentication

주키퍼 mTLS 인증은 SASL 인증을 사용할 때도, 사용하지 않을 때도 활성화할 수 있다. 위에서도 언급했지만, mTLS만 사용할 때는 보통 모든 브로커와 CLI 툴(주키퍼 보안 마이그레이션 툴같은)이 ACL이 적용된 같은 DN(Distinguished Name)으로 자신을 식별해야 한다. 따라서, 각 인증서엔 적절한 SAN(Subject Alternative Name)이 들어있어야만 주키퍼가 브로커와 CLI 툴의 호스트명을 검증하는 데 성공할 수 있다.

org.apache.zookeeper.server.auth.X509AuthenticationProvider를 상속하고 protected String getClientId(X509Certificate clientCert) 메소드를 재정의하면, mTLS 클라이언트의 identity에 DN 대신 다른 걸 사용할 수 있다. 스킴명을 선택하고 주키퍼에서 authProvider.[scheme]을 커스텀 구현체의 풀 네임(fully-qualified class name)으로 설정해라. 그런 다음 등록한 provider를 사용하도록 ssl.authProvider=[scheme]을 설정해라.

다음 샘플은 TLS 인증을 활성화하기 위한 주키퍼 설정의 일부다. 이런 설정들은 주키퍼 어드민 가이드에서 설명하고 있다.

secureClientPort=2182
serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
ssl.keyStore.location=/path/to/zk/keystore.jks
ssl.keyStore.password=zk-ks-passwd
ssl.trustStore.location=/path/to/zk/truststore.jks
ssl.trustStore.password=zk-ts-passwd

중요: 주키퍼는 주키퍼 서버 keystore의 키 패스워드를, keystore 패스워드 값 자체와 다르게 설정하는 걸 지원하지 않는다. 키 패스워드는 keystore 패스워드와 동일하게 설정해야 한다.

다음 샘플은 주키퍼로 연결할 때 mTLS 인증을 수행하기 위한 카프카 브로커 설정의 일부다. 이 설정들은 브로커 설정에서 다루고 있다.

# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define key/trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
zookeeper.ssl.keystore.location=/path/to/kafka/keystore.jks
zookeeper.ssl.keystore.password=kafka-ks-passwd
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes
zookeeper.set.acl=true

중요: 주키퍼는 주키퍼 클라이언트(즉, 브로커) keystore의 키 패스워드를, keystore 패스워드 값 자체와 다르게 설정하는 걸 지원하지 않는다. 키 패스워드는 keystore 패스워드와 동일하게 설정해야 한다.

7.6.2 Migrating clusters

보안을 지원하지 않는 버전으로 카프카를 운영 중이거나, 단순히 비활성화한 상태에서 클러스터를 안전하게 만들고 싶다면, 다음 단계를 통해 운영 중단을 최소화하면서 주키퍼 인증을 활성화해야 한다:

  1. 주키퍼에서 SASL 인증을 활성화한다. 필요하다면 mTLS 인증도 활성화한다. mTLS도 활성화한다면, 다음과 같이 non-TLS 포트와 TLS 포트가 둘 다 있을 거다:

    clientPort=2181
    secureClientPort=2182
    serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
    authProvider.x509=org.apache.zookeeper.server.auth.X509AuthenticationProvider
    ssl.keyStore.location=/path/to/zk/keystore.jks
    ssl.keyStore.password=zk-ks-passwd
    ssl.trustStore.location=/path/to/zk/truststore.jks
    ssl.trustStore.password=zk-ts-passwd
    
  2. 필요에 따라 JAAS 로그인 파일과 주키퍼 mutual TLS 설정(TLS를 활성화한 주키퍼 포트 연결 등)을 정의하고, 브로커가 주키퍼에 인증할 수 있도록 브로커를 순차적으로 재시작한다. 재시작이 다 끝나면 브로커는 엄격한 ACL을 통해 znode를 다룰 순 있지만, 이 ACL로는 znode를 만들지는 않는다.

  3. mTLS를 활성화했다면, 주키퍼에서 non-TLS 포트를 비활성화해라.

  4. 브로커의 두 번째 순차 재시작을 진행한다. 이번에는 설정 파라미터 zookeeper.set.acl을 true로 설정해서, znode를 생성할 때 보안 ACL을 사용하도록 활성화한다.

  5. ZkSecurityMigrator 툴을 실행한다. 이 툴은 zookeeper.acl을 secure로 설정해서 bin/zookeeper-security-migration.sh 스크립트를 실행하면 된다. 이 툴은 znode의 ACL을 변경하는 그에 맞는 하위 트리를 탐색한다. mTLS를 활성화한다면 --zk-tls-config-file <file> 옵션을 사용해라.

보안 클러스터에서 인증을 해제하는 것도 가능하다. 인증을 해제하려면 아래 절차를 따라해라:

  1. JAAS 로그인 파일과 주키퍼 mutual TLS 설정을 정의해서 브로커를 순차적으로 재시작한다. 여기선 브로커에 인증은 활성화하되, zookeeper.set.acl을 false로 설정한다. 재시작이 다 끝나면 브로커는 보안 ACL로 znode를 만드는 걸 중단하지만, 여전히 모든 znode를 인증하고 조작할 수 있다.
  2. ZkSecurityMigrator 툴을 실행한다. 이 툴은 zookeeper.acl을 unsecure로 설정해서 bin/zookeeper-security-migration.sh 스크립트를 실행하면 된다. 이 툴은 znode의 ACL을 변경하는 그에 맞는 하위 트리를 탐색한다. TLS 설정을 세팅해야 한다면 --zk-tls-config-file <file> 옵션을 사용해라.
  3. mTLS를 비활성화한다면, 주키퍼에서 non-TLS 포트를 활성화해라.
  4. 브로커의 두 번째 순차 재시작을 진행한다. 이번에는 JAAS 로그인 파일을 설정하는 시스템 프로퍼티를 생략하고, 필요에 따라 주키퍼 mutual TLS 설정(non-TLS를 활성화한 주키퍼 포트에 대한 연결 등)을 제거한다.
  5. mTLS를 비활성화한다면, 주키퍼에서 TLS 포트를 비활성화해라.

다음은 마이그레이션 툴을 실행하는 예시다:

bin/zookeeper-security-migration.sh --zookeeper.acl=secure --zookeeper.connect=localhost:2181

전체 파라미터 목록을 보고싶다면 다음을 실행해라:

bin/zookeeper-security-migration.sh --help

7.6.3 Migrating the ZooKeeper ensemble

주키퍼 앙상블에서도 SASL/mTLS 인증을 활성화해야 한다. 그러러면 서버를 순차적으로 재시작하고 몇 가지 프로퍼티를 설정해야 한다. mTLS 관련 정보는 위에를 확인해봐라. 자세한 내용은 주키퍼 문서를 참고해라:

  1. Apache ZooKeeper documentation
  2. Apache ZooKeeper wiki

7.6.4 ZooKeeper Quorum Mutual TLS Authentication

주키퍼 서버 자체간에도 mTLS 인증을 활성화할 수 있다. 자세한 내용은 주키퍼 문서를 참고해라.


7.7 ZooKeeper Encryption

mutual TLS를 사용하는 주키퍼 커넥션은 암호화된다. 주키퍼 3.5.7 버전(카프카 2.5와 함께 제공하는 버전)부터 주키퍼는 서버 측 설정 ssl.clientAuth를 지원한다 (대소문자 구분 없음: 유효한 옵션은 want/need/none이며, 기본값은 need다). 주키퍼에서 값을 none으로 설정하면, 클라이언트는 자체 인증서 없이도 TLS로 암호화된 커넥션을 통해 주키퍼에 연결할 수 있다. 아래 샘플은 TLS 암호화만으로 주키퍼에 연결하는 카프카 설정의 일부다. 이 설정들은 브로커 설정에서 다루고 있다.

# connect to the ZooKeeper port configured for TLS
zookeeper.connect=zk1:2182,zk2:2182,zk3:2182
# required to use TLS to ZooKeeper (default is false)
zookeeper.ssl.client.enable=true
# required to use TLS to ZooKeeper
zookeeper.clientCnxnSocket=org.apache.zookeeper.ClientCnxnSocketNetty
# define trust stores to use TLS to ZooKeeper; ignored unless zookeeper.ssl.client.enable=true
# no need to set keystore information assuming ssl.clientAuth=none on ZooKeeper
zookeeper.ssl.truststore.location=/path/to/kafka/truststore.jks
zookeeper.ssl.truststore.password=kafka-ts-passwd
# tell broker to create ACLs on znodes (if using SASL authentication, otherwise do not set this)
zookeeper.set.acl=true

Next :
Kafka Connect
카프카 커넥트 프레임워크 한글 번역. 워커, 싱크/소스 커넥터, 싱크/소스 태스크에 대한 개념을 소개하고 커넥트를 개발하는 방법을 안내합니다.

전체 목차는 여기에 있습니다.

<< >>

TOP