Producer Internals
νλ‘λμμ λ©μμ§ μ μ‘ κ³Όμ
νλ‘λμκ° send() λ©μλλ₯Ό νΈμΆνλ©΄ λ©μμ§λ μ¦μ λ€νΈμν¬λ₯Ό ν΅ν΄ μ μ‘νλ κ²μ΄ μλ, λ΄λΆ λ²νΌμ μ μ₯λ ν λ³λμ μ€λ λμ μν΄ λΈλ‘μ»€λ‘ μ μ‘λλ λΉλκΈ° ꡬ쑰λ₯Ό κ°μ§λ€.
λ©μΈ μ€λ λ(μ ν리μΌμ΄μ μ€λ λ)
ProducerRecordκ°μ²΄λ₯Ό μμ±νκ³send()λ©μλλ₯Ό νΈμΆλ©μμ§λ μ§λ ¬ν(Serialize)λκ³ , νν°μ λ(Partitioner)μ μν΄ λμ νν°μ μ΄ κ²°μ
κ²°μ λ νν°μ μ ν΄λΉνλ λ²νΌ(RecordAccumulator λ΄λΆμ Deque)μ λ©μμ§κ° μ μ₯
Sender μ€λ λ
νλ‘λμ λ΄λΆμ μ‘΄μ¬νλ λ³λμ λ°±κ·ΈλΌμ΄λ μ€λ λ
RecordAccumulatorμμ μ μ‘ν μ€λΉκ° λ λ©μμ§ λ°°μΉ(Batch)λ₯Ό κ°μ Έμ΄ν΄λΉ λ°°μΉλ₯Ό λμ λΈλ‘μ»€λ‘ μ μ‘νκ³ , μλ΅(Acknowledgement)μ μ²λ¦¬
μ μ‘ κ²°κ³Ό νμΈ
send()λ©μλλFutureκ°μ²΄ λ°νfuture.get()μ νΈμΆνλ©΄, Sender μ€λ λκ° λΈλ‘컀λ‘λΆν° μλ΅μ λ°μ λκΉμ§ λ©μΈ μ€λ λλ λΈλ‘νΉsend()λ©μλμ μ½λ°±(Callback) ν¨μλ₯Ό μ λ¬νμ¬ μλ΅μ λ°μμ λ λΉλκΈ°μ μΌλ‘ κ²°κ³Ό μ²λ¦¬ κ°λ₯μΌλ°μ μΌλ‘ λΉλκΈ° λ°©μμ μ¬μ©ν΄ λμ μ²λ¦¬λμ ν보
νν°μ
λ μ λ΅
λ©μμ§κ° ν ν½μ μ¬λ¬ νν°μ μ€ μ΄λ κ³³μΌλ‘ μ μ‘λ μ§ κ²°μ νλ λ°©λ²μ λ€μκ³Ό κ°λ€.
1. ProducerRecordμ νν°μ
λ²νΈκ° λͺ
μλ κ²½μ°
ProducerRecordμ νν°μ
λ²νΈκ° λͺ
μλ κ²½μ°κ°μ₯ λμ μ°μ μμλ₯Ό κ°μ§λ©°, 무쑰건 ν΄λΉ νν°μ μΌλ‘ λ©μμ§κ° μ μ‘λλ€.
2. ν€κ° μλ κ²½μ°
λ©μμ§μ νΉμ ν€λ₯Ό ν λΉνλ©΄, νλ‘λμλ ν€μ ν΄μκ°(Hash Value)μ κ³μ°νμ¬ λ°μ΄ν°λ₯Ό λ³΄λΌ νν°μ μ μΌκ΄λκ² κ²°μ νλ€.
λμ μ리: λμΌν ν€λ₯Ό κ°μ§ λ©μμ§λ€μ νμ κ°μ ν΄μ κ°μ κ°μ§λ―λ‘, λ°λμ λμΌν νν°μ μΌλ‘ μ μ‘
μ£Όμ λͺ©μ : νΉμ μλ³μ(μ: μ¬μ©μ ID, μ£Όλ¬Έ λ²νΈ)λ₯Ό κΈ°μ€μΌλ‘ λ°μ΄ν°μ μ²λ¦¬ μμλ₯Ό 보μ₯ν΄μΌ ν λ μ¬μ©
3. ν€κ° μλ κ²½μ°: μ²λ¦¬λ κ·Ήλν
λ©μμ§μ ν€κ° μμΌλ©΄, νλ‘λμλ μμλ₯Ό 보μ₯ν νμκ° μλ€κ³ νλ¨νκ³ μ²λ¦¬λμ κ·Ήλννλ λ°©ν₯μΌλ‘ λμνλ€.
κ³Όκ±° λ°©μ (Round-Robin): λ©μμ§λ₯Ό νν°μ λ³λ‘ νλμ© μμ°¨ λΆλ°°
λΆνλ κ· λ±ν΄μ§μ§λ§, κ° νν°μ μΌλ‘ 보λ΄λ λ°°μΉκ° μκ² νμ±λμ΄ λ€νΈμν¬ μ€λ²ν€λκ° μ¦κ°νκ³ μ 체 μ²λ¦¬λμ΄ μ νλ μ μμ
μ΅μ λ°©μ (Sticky Partitioner): νλμ νν°μ μ λ©μμ§λ₯Ό μ§μ€μ μΌλ‘ λ³΄λ΄ λ°°μΉλ₯Ό μ΅λν
μ΄ λ°©μμ λ©μμ§λ₯Ό μ΅λν ν° λ°°μΉ(Batch)λ‘ λ¬Άμ΄ μ μ‘νλ―λ‘ λ€νΈμν¬ μ€λ²ν€λλ₯Ό μ΅μννκ³ μ 체 μ²λ¦¬λμ κ·Ήλνν©λλ€.
λμ λ°©μ
νλμ νν°μ μ μμλ‘ μ ν
λ°°μΉ λ²νΌκ° κ°λ μ°¨κ±°λ
linger.msμκ°μ΄ μ΄κ³Όλ λκΉμ§ ν΄λΉ νν°μ μλ§ λ©μμ§λ₯Ό κ³μ μ μ‘λ°°μΉκ° μ μ‘λλ©΄, λ€μ νν°μ μ μ ννμ¬ μ κ³Όμ μ λ°λ³΅
λ°°μΉ μ²λ¦¬μ μμΆ
νλ‘λμλ λ€νΈμν¬ ν¨μ¨μ±κ³Ό λΈλ‘컀 λΆνλ₯Ό μ€μ΄κΈ° μν΄ λ©μμ§λ₯Ό μμΆνκ³ λ°°μΉ λ¨μλ‘ λ¬Άμ΄ μ μ‘νλ λ°©μμ μ¬μ©νλ€.
batch.sizeνλμ λ°°μΉμ λ΄μ μ μλ λ©μμ§μ μ΅λ ν¬κΈ°(byte)
μ΄ ν¬κΈ°μ λλ¬νλ©΄ Sender μ€λ λλ μ¦μ λ©μμ§ λ°°μΉλ₯Ό μ μ‘
λ무 μμΌλ©΄ λ°°μΉ ν¨μ¨μ΄ λ¨μ΄μ§κ³ , λ무 ν¬λ©΄ λ©λͺ¨λ¦¬ μ¬μ©λμ΄ μ¦κ°
linger.msλ°°μΉκ°
batch.sizeμ λλ¬νμ§ μλλΌλ, Sender μ€λ λκ° λ©μμ§λ₯Ό 보λ΄κΈ° μ κΉμ§ λκΈ°νλ μ΅λ μκ°(ms)μ΄ κ°μ λ리면 μ§μ° μκ°μ μ¦κ°νμ§λ§, λ λ§μ λ©μμ§λ₯Ό νλμ λ°°μΉλ‘ λ¬Άμ μ μμ΄ μ²λ¦¬λ ν₯μ
compression.typeλ©μμ§ λ°°μΉλ₯Ό λΈλ‘μ»€λ‘ λ³΄λ΄κΈ° μ μ μμΆνμ¬ λ€νΈμν¬ λμν μ¬μ©λ κ°μ
snappy,lz4,gzip,zstdλ±μ μ΅μ μμΆμ κ°λ³ λ©μμ§κ° μλ λ°°μΉ λ¨μλ‘ μν
λ€νΈμν¬ μ€λ²ν€λλ₯Ό μ€μ¬ μ²λ¦¬λμ λμ΄λ λ° ν¨κ³Όμ μ΄μ§λ§, μμΆ/ν΄μ κ³Όμ μμ Producer / Consumer CPU μ¬μ©λμ΄ μ¦κ°ν μ μμ
μ μ‘ λ³΄μ₯μ μν Ack λ©μ»€λμ¦
μ μ‘ λ³΄μ₯μ μν΄ νλ‘λμλ λΈλ‘컀λ‘λΆν° λ©μμ§ μ μ‘ μ±κ³΅ μ¬λΆλ₯Ό νμΈνλ λ©μ»€λμ¦μΌλ‘, acks μ€μ μ λ°λΌ λ€μ μΈ κ°μ§ λͺ¨λκ° μ 곡λλ€.
acks=0
λΈλ‘컀 μλ΅ μμ΄ μ±κ³΅ μ²λ¦¬
λΉ λ₯Έ μ μ‘ μλ
λ°μ΄ν° μμ€ κ°λ₯μ± λμ
acks=1
리λκ° λ©μμ§ μμ μ μ±κ³΅
μ μ ν μ±λ₯κ³Ό μ λ’°μ± κ· ν
리λ μ₯μ μ λ°μ΄ν° μμ€ κ°λ₯
acks=all
λͺ¨λ ISR 볡μ μλ£ μ μ±κ³΅
κ°λ ₯ν λ΄κ΅¬μ± λ° λ°μ΄ν° 무결μ±
μ²λ¦¬ μ§μ° λ° μ±λ₯ μ ν κ°λ₯
μ¬μλμ λ©±λ±μ±
λ€νΈμν¬μ μΌμμ μΈ λ¬Έμ λ λΈλ‘컀μ 리λ μ μΆ κ³Όμ μμ λ©μμ§ μ μ‘μ μ€ν¨ν μ μκΈ° λλ¬Έμ, νλ‘λμλ μ¬μλ λ©μ»€λμ¦μ μ 곡νλ€.
μ¬μλ(
retries)μ μ‘ μ€ν¨ μ, νλ‘λμκ° μλμΌλ‘ μ¬μλνλ νμ μ§μ
μ¬μλ κ³Όμ μμ λ©μμ§ μ€λ³΅μ΄ λ°μ κ°λ₯
νλ‘λμκ° λ©μμ§λ₯Ό 보λκ³ λ¦¬λκ° μ±κ³΅μ μΌλ‘ μ μ₯
νλ‘λμμκ² Ackμ 보λ΄κΈ° μ§μ λ€νΈμν¬ λ¬Έμ λ°μ
νλ‘λμλ μ€ν¨λ‘ κ°μ£Όνκ³ μ¬μλνμ¬ λμΌν λ©μμ§λ₯Ό λ€μ μ μ‘
λ©±λ±μ± 보μ₯(
enable.idempotence=true)μ€λ³΅ λ¬Έμ λ₯Ό ν΄κ²°νκΈ° μν κΈ°λ₯
νλ‘λμλ κ³ μ ν PID(Producer ID)λ₯Ό ν λΉλ°κ³ , 보λ΄λ κ° λ©μμ§μ μνμ€ λ²νΈλ₯Ό λΆμ¬
λΈλ‘컀λ
(PID, νν°μ )μ‘°ν©λ³λ‘ λ§μ§λ§ μνμ€ λ²νΈλ₯Ό κΈ°λ‘μ΄λ³΄λ€ μκ±°λ κ°μ λ²νΈμ λ©μμ§κ° μ€λ©΄ μ€λ³΅μΌλ‘ κ°μ£Όνμ¬ μ μ₯νμ§ μκ³ Ackλ§ λ°ν
enable.idempotence=trueλ‘ μ€μ νλ©΄,acksλallλ‘,retriesλInteger.MAX_VALUEλ‘ μλ μ‘°μ λ°μ΄ν° μ μ€ μμ΄ μ νν ν λ²λ§ μ μ‘λλ κ²μ 보μ₯
max.in.flight.requests.per.connectionμ 5 μ΄νλ‘ μ€μ νμ¬ μ¬μλ μ λ©μμ§ μμκ° λ€λ°λλ λ¬Έμ λ°©μ§ κ°λ₯
Last updated
Was this helpful?