Producer Internals

ν”„λ‘œλ“€μ„œμ˜ λ©”μ‹œμ§€ 전솑 κ³Όμ •

ν”„λ‘œλ“€μ„œκ°€ send() λ©”μ†Œλ“œλ₯Ό ν˜ΈμΆœν•˜λ©΄ λ©”μ‹œμ§€λŠ” μ¦‰μ‹œ λ„€νŠΈμ›Œν¬λ₯Ό 톡해 μ „μ†‘ν•˜λŠ” 것이 μ•„λ‹Œ, λ‚΄λΆ€ 버퍼에 μ €μž₯된 ν›„ λ³„λ„μ˜ μŠ€λ ˆλ“œμ— μ˜ν•΄ 브둜컀둜 μ „μ†‘λ˜λŠ” 비동기 ꡬ쑰λ₯Ό κ°€μ§„λ‹€.

  • 메인 μŠ€λ ˆλ“œ(μ• ν”Œλ¦¬μΌ€μ΄μ…˜ μŠ€λ ˆλ“œ)

    • ProducerRecord 객체λ₯Ό μƒμ„±ν•˜κ³  send() λ©”μ†Œλ“œλ₯Ό 호좜

    • λ©”μ‹œμ§€λŠ” 직렬화(Serialize)되고, νŒŒν‹°μ…”λ„ˆ(Partitioner)에 μ˜ν•΄ λŒ€μƒ νŒŒν‹°μ…˜μ΄ κ²°μ •

    • κ²°μ •λœ νŒŒν‹°μ…˜μ— ν•΄λ‹Ήν•˜λŠ” 버퍼(RecordAccumulator λ‚΄λΆ€μ˜ Deque)에 λ©”μ‹œμ§€κ°€ μ €μž₯

  • Sender μŠ€λ ˆλ“œ

    • ν”„λ‘œλ“€μ„œ 내뢀에 μ‘΄μž¬ν•˜λŠ” λ³„λ„μ˜ λ°±κ·ΈλΌμš΄λ“œ μŠ€λ ˆλ“œ

    • RecordAccumulatorμ—μ„œ 전솑할 μ€€λΉ„κ°€ 된 λ©”μ‹œμ§€ 배치(Batch)λ₯Ό κ°€μ Έμ˜΄

    • ν•΄λ‹Ή 배치λ₯Ό λŒ€μƒ 브둜컀둜 μ „μ†‘ν•˜κ³ , 응닡(Acknowledgement)을 처리

  • 전솑 κ²°κ³Ό 확인

    • send() λ©”μ†Œλ“œλŠ” Future 객체 λ°˜ν™˜

    • future.get()을 ν˜ΈμΆœν•˜λ©΄, Sender μŠ€λ ˆλ“œκ°€ λΈŒλ‘œμ»€λ‘œλΆ€ν„° 응닡을 받을 λ•ŒκΉŒμ§€ 메인 μŠ€λ ˆλ“œλŠ” λΈ”λ‘œν‚Ή

    • send() λ©”μ†Œλ“œμ— 콜백(Callback) ν•¨μˆ˜λ₯Ό μ „λ‹¬ν•˜μ—¬ 응닡을 λ°›μ•˜μ„ λ•Œ λΉ„λ™κΈ°μ μœΌλ‘œ κ²°κ³Ό 처리 κ°€λŠ₯

      • 일반적으둜 비동기 방식을 μ‚¬μš©ν•΄ 높은 μ²˜λ¦¬λŸ‰μ„ 확보

νŒŒν‹°μ…”λ‹ μ „λž΅

λ©”μ‹œμ§€κ°€ ν† ν”½μ˜ μ—¬λŸ¬ νŒŒν‹°μ…˜ 쀑 μ–΄λŠ 곳으둜 전솑될지 κ²°μ •ν•˜λŠ” 방법은 λ‹€μŒκ³Ό κ°™λ‹€.

1. ProducerRecord에 νŒŒν‹°μ…˜ λ²ˆν˜Έκ°€ λͺ…μ‹œλœ 경우

κ°€μž₯ 높은 μš°μ„ μˆœμœ„λ₯Ό κ°€μ§€λ©°, 무쑰건 ν•΄λ‹Ή νŒŒν‹°μ…˜μœΌλ‘œ λ©”μ‹œμ§€κ°€ μ „μ†‘λœλ‹€.

2. ν‚€κ°€ μžˆλŠ” 경우

λ©”μ‹œμ§€μ— νŠΉμ • ν‚€λ₯Ό ν• λ‹Ήν•˜λ©΄, ν”„λ‘œλ“€μ„œλŠ” ν‚€μ˜ ν•΄μ‹œκ°’(Hash Value)을 κ³„μ‚°ν•˜μ—¬ 데이터λ₯Ό 보낼 νŒŒν‹°μ…˜μ„ μΌκ΄€λ˜κ²Œ κ²°μ •ν•œλ‹€.

  • λ™μž‘ 원리: λ™μΌν•œ ν‚€λ₯Ό κ°€μ§„ λ©”μ‹œμ§€λ“€μ€ 항상 같은 ν•΄μ‹œ 값을 κ°€μ§€λ―€λ‘œ, λ°˜λ“œμ‹œ λ™μΌν•œ νŒŒν‹°μ…˜μœΌλ‘œ 전솑

  • μ£Όμš” λͺ©μ : νŠΉμ • μ‹λ³„μž(예: μ‚¬μš©μž ID, μ£Όλ¬Έ 번호)λ₯Ό κΈ°μ€€μœΌλ‘œ λ°μ΄ν„°μ˜ 처리 μˆœμ„œλ₯Ό 보μž₯ν•΄μ•Ό ν•  λ•Œ μ‚¬μš©

3. ν‚€κ°€ μ—†λŠ” 경우: μ²˜λ¦¬λŸ‰ κ·ΉλŒ€ν™”

λ©”μ‹œμ§€μ— ν‚€κ°€ μ—†μœΌλ©΄, ν”„λ‘œλ“€μ„œλŠ” μˆœμ„œλ₯Ό 보μž₯ν•  ν•„μš”κ°€ μ—†λ‹€κ³  νŒλ‹¨ν•˜κ³  μ²˜λ¦¬λŸ‰μ„ κ·ΉλŒ€ν™”ν•˜λŠ” λ°©ν–₯으둜 λ™μž‘ν•œλ‹€.

  • κ³Όκ±° 방식 (Round-Robin): λ©”μ‹œμ§€λ₯Ό νŒŒν‹°μ…˜λ³„λ‘œ ν•˜λ‚˜μ”© 순차 λΆ„λ°°

    • λΆ€ν•˜λŠ” κ· λ“±ν•΄μ§€μ§€λ§Œ, 각 νŒŒν‹°μ…˜μœΌλ‘œ λ³΄λ‚΄λŠ” λ°°μΉ˜κ°€ μž‘κ²Œ ν˜•μ„±λ˜μ–΄ λ„€νŠΈμ›Œν¬ μ˜€λ²„ν—€λ“œκ°€ μ¦κ°€ν•˜κ³  전체 μ²˜λ¦¬λŸ‰μ΄ μ €ν•˜λ  수 있음

  • μ΅œμ‹  방식 (Sticky Partitioner): ν•˜λ‚˜μ˜ νŒŒν‹°μ…˜μ— λ©”μ‹œμ§€λ₯Ό μ§‘μ€‘μ μœΌλ‘œ 보내 배치λ₯Ό μ΅œλŒ€ν™”

    • 이 방식은 λ©”μ‹œμ§€λ₯Ό μ΅œλŒ€ν•œ 큰 배치(Batch)둜 λ¬Άμ–΄ μ „μ†‘ν•˜λ―€λ‘œ λ„€νŠΈμ›Œν¬ μ˜€λ²„ν—€λ“œλ₯Ό μ΅œμ†Œν™”ν•˜κ³  전체 μ²˜λ¦¬λŸ‰μ„ κ·ΉλŒ€ν™”ν•©λ‹ˆλ‹€.

    • λ™μž‘ 방식

      1. ν•˜λ‚˜μ˜ νŒŒν‹°μ…˜μ„ μž„μ˜λ‘œ 선택

      2. 배치 버퍼가 가득 μ°¨κ±°λ‚˜ linger.ms μ‹œκ°„μ΄ 초과될 λ•ŒκΉŒμ§€ ν•΄λ‹Ή νŒŒν‹°μ…˜μ—λ§Œ λ©”μ‹œμ§€λ₯Ό 계속 전솑

      3. λ°°μΉ˜κ°€ μ „μ†‘λ˜λ©΄, λ‹€μŒ νŒŒν‹°μ…˜μ„ μ„ νƒν•˜μ—¬ μœ„ 과정을 반볡

배치 μ²˜λ¦¬μ™€ μ••μΆ•

ν”„λ‘œλ“€μ„œλŠ” λ„€νŠΈμ›Œν¬ νš¨μœ¨μ„±κ³Ό 브둜컀 λΆ€ν•˜λ₯Ό 쀄이기 μœ„ν•΄ λ©”μ‹œμ§€λ₯Ό μ••μΆ•ν•˜κ³  배치 λ‹¨μœ„λ‘œ λ¬Άμ–΄ μ „μ†‘ν•˜λŠ” 방식을 μ‚¬μš©ν•œλ‹€.

  • batch.size

    • ν•˜λ‚˜μ˜ λ°°μΉ˜μ— 담을 수 μžˆλŠ” λ©”μ‹œμ§€μ˜ μ΅œλŒ€ 크기(byte)

    • 이 크기에 λ„λ‹¬ν•˜λ©΄ Sender μŠ€λ ˆλ“œλŠ” μ¦‰μ‹œ λ©”μ‹œμ§€ 배치λ₯Ό 전솑

    • λ„ˆλ¬΄ μž‘μœΌλ©΄ 배치 효율이 λ–¨μ–΄μ§€κ³ , λ„ˆλ¬΄ 크면 λ©”λͺ¨λ¦¬ μ‚¬μš©λŸ‰μ΄ 증가

  • linger.ms

    • λ°°μΉ˜κ°€ batch.size에 λ„λ‹¬ν•˜μ§€ μ•Šλ”λΌλ„, Sender μŠ€λ ˆλ“œκ°€ λ©”μ‹œμ§€λ₯Ό 보내기 μ „κΉŒμ§€ λŒ€κΈ°ν•˜λŠ” μ΅œλŒ€ μ‹œκ°„(ms)

    • 이 값을 늘리면 μ§€μ—° μ‹œκ°„μ€ μ¦κ°€ν•˜μ§€λ§Œ, 더 λ§Žμ€ λ©”μ‹œμ§€λ₯Ό ν•˜λ‚˜μ˜ 배치둜 묢을 수 μžˆμ–΄ μ²˜λ¦¬λŸ‰ ν–₯상

  • compression.type

    • λ©”μ‹œμ§€ 배치λ₯Ό 브둜컀둜 보내기 전에 μ••μΆ•ν•˜μ—¬ λ„€νŠΈμ›Œν¬ λŒ€μ—­ν­ μ‚¬μš©λŸ‰ κ°μ†Œ

    • snappy, lz4, gzip, zstd λ“±μ˜ μ˜΅μ…˜

    • 압좕은 κ°œλ³„ λ©”μ‹œμ§€κ°€ μ•„λ‹Œ 배치 λ‹¨μœ„λ‘œ μˆ˜ν–‰

    • λ„€νŠΈμ›Œν¬ μ˜€λ²„ν—€λ“œλ₯Ό 쀄여 μ²˜λ¦¬λŸ‰μ„ λ†’μ΄λŠ” 데 νš¨κ³Όμ μ΄μ§€λ§Œ, μ••μΆ•/ν•΄μ œ κ³Όμ •μ—μ„œ Producer / Consumer CPU μ‚¬μš©λŸ‰μ΄ 증가할 수 있음

전솑 보μž₯을 μœ„ν•œ Ack λ©”μ»€λ‹ˆμ¦˜

전솑 보μž₯을 μœ„ν•΄ ν”„λ‘œλ“€μ„œλŠ” λΈŒλ‘œμ»€λ‘œλΆ€ν„° λ©”μ‹œμ§€ 전솑 성곡 μ—¬λΆ€λ₯Ό ν™•μΈν•˜λŠ” λ©”μ»€λ‹ˆμ¦˜μœΌλ‘œ, acks 섀정에 따라 λ‹€μŒ μ„Έ κ°€μ§€ λͺ¨λ“œκ°€ μ œκ³΅λœλ‹€.

acks λͺ¨λ“œ
λ™μž‘
μž₯점
단점

acks=0

브둜컀 응닡 없이 성곡 처리

λΉ λ₯Έ 전솑 속도

데이터 손싀 κ°€λŠ₯μ„± λ†’μŒ

acks=1

리더가 λ©”μ‹œμ§€ μˆ˜μ‹  μ‹œ 성곡

μ μ ˆν•œ μ„±λŠ₯κ³Ό μ‹ λ’°μ„± κ· ν˜•

리더 μž₯μ•  μ‹œ 데이터 손싀 κ°€λŠ₯

acks=all

λͺ¨λ“  ISR 볡제 μ™„λ£Œ μ‹œ 성곡

κ°•λ ₯ν•œ 내ꡬ성 및 데이터 무결성

처리 μ§€μ—° 및 μ„±λŠ₯ μ €ν•˜ κ°€λŠ₯

μž¬μ‹œλ„μ™€ λ©±λ“±μ„±

λ„€νŠΈμ›Œν¬μ˜ μΌμ‹œμ μΈ λ¬Έμ œλ‚˜ 브둜컀의 리더 μ„ μΆœ κ³Όμ •μ—μ„œ λ©”μ‹œμ§€ 전솑은 μ‹€νŒ¨ν•  수 있기 λ•Œλ¬Έμ—, ν”„λ‘œλ“€μ„œλŠ” μž¬μ‹œλ„ λ©”μ»€λ‹ˆμ¦˜μ„ μ œκ³΅ν•œλ‹€.

  • μž¬μ‹œλ„(retries)

    • 전솑 μ‹€νŒ¨ μ‹œ, ν”„λ‘œλ“€μ„œκ°€ μžλ™μœΌλ‘œ μž¬μ‹œλ„ν•˜λŠ” 횟수 μ§€μ •

    • μž¬μ‹œλ„ κ³Όμ •μ—μ„œ λ©”μ‹œμ§€ 쀑볡이 λ°œμƒ κ°€λŠ₯

      1. ν”„λ‘œλ“€μ„œκ°€ λ©”μ‹œμ§€λ₯Ό λ³΄λƒˆκ³  리더가 μ„±κ³΅μ μœΌλ‘œ μ €μž₯

      2. ν”„λ‘œλ“€μ„œμ—κ²Œ Ack을 보내기 직전 λ„€νŠΈμ›Œν¬ 문제 λ°œμƒ

      3. ν”„λ‘œλ“€μ„œλŠ” μ‹€νŒ¨λ‘œ κ°„μ£Όν•˜κ³  μž¬μ‹œλ„ν•˜μ—¬ λ™μΌν•œ λ©”μ‹œμ§€λ₯Ό λ‹€μ‹œ 전솑

  • λ©±λ“±μ„± 보μž₯(enable.idempotence=true)

    • 쀑볡 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•œ κΈ°λŠ₯

    • ν”„λ‘œλ“€μ„œλŠ” κ³ μœ ν•œ PID(Producer ID)λ₯Ό ν• λ‹Ήλ°›κ³ , λ³΄λ‚΄λŠ” 각 λ©”μ‹œμ§€μ— μ‹œν€€μŠ€ 번호λ₯Ό λΆ€μ—¬

    • λΈŒλ‘œμ»€λŠ” (PID, νŒŒν‹°μ…˜) μ‘°ν•©λ³„λ‘œ λ§ˆμ§€λ§‰ μ‹œν€€μŠ€ 번호λ₯Ό 기둝

      • 이보닀 μž‘κ±°λ‚˜ 같은 번호의 λ©”μ‹œμ§€κ°€ 였면 μ€‘λ³΅μœΌλ‘œ κ°„μ£Όν•˜μ—¬ μ €μž₯ν•˜μ§€ μ•Šκ³  Ack만 λ°˜ν™˜

    • enable.idempotence=true둜 μ„€μ •ν•˜λ©΄, acksλŠ” all둜, retriesλŠ” Integer.MAX_VALUE둜 μžλ™ μ‘°μ •

      • 데이터 μœ μ‹€ 없이 μ •ν™•νžˆ ν•œ 번만 μ „μ†‘λ˜λŠ” 것을 보μž₯

    • max.in.flight.requests.per.connection을 5 μ΄ν•˜λ‘œ μ„€μ •ν•˜μ—¬ μž¬μ‹œλ„ μ‹œ λ©”μ‹œμ§€ μˆœμ„œκ°€ λ’€λ°”λ€ŒλŠ” 문제 λ°©μ§€ κ°€λŠ₯

Last updated

Was this helpful?