Bug #61254

kafka: potential race condition between removing and adding connection to the connection list

Added by Yuval Lifshitz 11 months ago. Updated 11 months ago.

Status:
Pending Backport
Priority:
Normal
Target version:
-
% Done:
0%

Source:
Q/A
Tags:
kafka backport_processed
Backport:
reef, quincy
Regression:
No
Severity:
3 - minor
Reviewed:
Affected Versions:
ceph-qa-suite:
Pull request ID:
Crash signature (v1):
Crash signature (v2):

Description

After the connection goes idle, we remove it from the list; however, we don't guard this operation with a mutex.
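
A minimal sketch of what guarding the list with a mutex could look like (the connection_t struct, the connections map and the function names below are simplified stand-ins for illustration, not the actual rgw_kafka code):

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// simplified stand-in for the real connection type
struct connection_t {
  bool idle = false;
  bool is_idle() const { return idle; }
};

using connections_t = std::unordered_map<std::string, std::shared_ptr<connection_t>>;

connections_t connections;
std::mutex connections_lock;  // guards every add/remove/lookup on the list

// called from the kafka manager thread when a connection went idle
void remove_idle(const std::string& broker) {
  std::lock_guard<std::mutex> guard(connections_lock);
  const auto it = connections.find(broker);
  if (it != connections.end() && it->second->is_idle()) {
    connections.erase(it);
  }
}

// called from the request threads when a notification needs a connection
std::shared_ptr<connection_t> add_connection(const std::string& broker) {
  std::lock_guard<std::mutex> guard(connections_lock);
  auto& conn = connections[broker];
  if (!conn) {
    conn = std::make_shared<connection_t>();
  }
  return conn;
}

The point is simply that the removal on idle and the addition of a new connection take the same lock, so the kafka thread and the request threads cannot interleave on the list.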


Files

xab (48.4 KB) Oguzhan Ozmen, 05/22/2023 07:57 PM
xaa (1000 KB) Oguzhan Ozmen, 05/22/2023 07:57 PM

Related issues (2 open, 0 closed)

Copied to rgw - Backport #61478: quincy: kafka: potential race condition between removing and adding connection to the connection list (In Progress, Yuval Lifshitz)
Copied to rgw - Backport #61479: reef: kafka: potential race condition between removing and adding connection to the connection list (In Progress, Yuval Lifshitz)
Actions #1

Updated by Yuval Lifshitz 11 months ago

  • Status changed from New to Fix Under Review
  • Pull request ID set to 51575

Actions #2

Updated by Oguzhan Ozmen 11 months ago

The gzip'ed log file is split into 2 parts; reassemble with `cat xaa xab > rgw_log.txt.gz`.
Only 1 rgw instance was left in the cluster. After setting `debug_rgw` to 20, that single instance was restarted and ~1 min later the thread crash was observed:

2023-05-22T19:04:51.649+0000 7f3d10189700 -1 *** Caught signal (Segmentation fault) **
 in thread 7f3d10189700 thread_name:kafka_manager

 ceph version 18.0.0-0.bb.feature_pr5k_debug (94cd23f4947b81cb72d7f1111f67ffead49d632b) reef (dev)
 1: /lib64/libpthread.so.0(+0x12ce0) [0x7f455cdcdce0]
 2: __pthread_mutex_lock()
 3: /lib64/librdkafka.so.1(+0x73a4d) [0x7f455d6c3a4d]
 4: rd_kafka_flush()
 5: (rgw::kafka::connection_t::destroy(int)+0x6e) [0x561c080a29ce]
 6: (rgw::kafka::Manager::publish_internal(rgw::kafka::message_wrapper_t*)+0xc5b) [0x561c080a39cb]
 7: (rgw::kafka::Manager::run()+0x28c) [0x561c080a5b2c]
 8: /lib64/libstdc++.so.6(+0xc2ba3) [0x7f455c7d4ba3]
 9: /lib64/libpthread.so.0(+0x81cf) [0x7f455cdc31cf]
 10: clone()
Actions #3

Updated by Krunal Chheda 11 months ago

This is the log line before the crash, and there are a lot of these log lines:

Kafka run: n/ack received, invoking callback with tag=14 and result=Broker: Unknown topic or partition

So the broker is definitely not offline, but it's ack'ing back saying unknown topic!

Actions #4

Updated by Krunal Chheda 11 months ago

So it looks like there are 2 places where the connection is destroyed but not removed from the connection_list; during the destruction of the connection, it is not removed from the list.
So for some reason you get the following error while you post:

2023-05-22T19:04:51.647+0000 7f3d10189700 10 Kafka publish: failed to produce: Local: Unknown topic

And due to that error, the connection is destroyed as seen below, but not removed from the list:
const auto rc = rd_kafka_produce(
            topic,
            // TODO: non builtin partitioning
            RD_KAFKA_PARTITION_UA,
            // make a copy of the payload
            // so it is safe to pass the pointer from the string
            RD_KAFKA_MSG_F_COPY,
            message->message.data(),
            message->message.length(),
            // optional key and its length
            nullptr, 
            0,
            // opaque data: tag, used in the global callback
            // in order to invoke the real callback
            // null if no callback exists
            tag);
    if (rc == -1) {
      const auto err = rd_kafka_last_error();
      ldout(conn->cct, 10) << "Kafka publish: failed to produce: " << rd_kafka_err2str(err) << dendl;
      // TODO: dont error on full queue, and don't destroy connection, retry instead
      // immediatly invoke callback on error if needed
      if (message->cb) {
        message->cb(err);
      }
      conn->destroy(err);
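
For illustration only (the types and names below are simplified stand-ins, not the actual rgw_kafka code): if destroy() is kept on this error path, the list could be kept consistent by erasing the entry under the same lock that guards additions; comment #5 below argues the simpler fix is to not call destroy() here at all.

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// simplified stand-ins for the real rgw::kafka types
struct connection_t {
  std::string broker;
  void destroy(int err) { /* tear down the librdkafka handles, fire pending callbacks */ }
};

std::unordered_map<std::string, std::shared_ptr<connection_t>> connections;
std::mutex connections_lock;

// hypothetical error path: destroy the connection *and* drop it from the list
void on_produce_error(const std::shared_ptr<connection_t>& conn, int err) {
  conn->destroy(err);
  std::lock_guard<std::mutex> guard(connections_lock);
  connections.erase(conn->broker);
}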

Actions #5

Updated by Yuval Lifshitz 11 months ago

Krunal, thanks for providing the information and logs.
I have this PR: https://github.com/ceph/ceph/pull/51662 to make sure that the destroy() call and the callbacks are only invoked from the kafka thread.
Currently the connection is used as a shared pointer by both the persistent notification thread and the kafka thread. The danger is that both threads will call "destroy()" on the connection at the same time, invoking the same callback.

In addition, I agree with your doubt about destroying the connection when "rd_kafka_produce()" returns with an error, without removing it from the list.
The best solution is probably not to call "destroy()" in these cases and let the internal kafka library handle the errors. Will submit a separate PR for that.
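
A rough sketch of that direction, under the assumption that connections are tracked in a mutex-guarded map (the names below are illustrative, not taken from the PR): other threads only mark a connection for destruction, and the kafka manager thread is the only one that actually destroys it and edits the list.

#include <atomic>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

// illustrative stand-in for rgw::kafka::connection_t
struct connection_t {
  std::atomic<bool> marked_for_destruction{false};
  void destroy(int status) { /* tear down librdkafka handles, fire callbacks */ }
};

std::unordered_map<std::string, std::shared_ptr<connection_t>> connections;
std::mutex connections_lock;

// any thread may request destruction, but never performs it itself
void request_destroy(const std::shared_ptr<connection_t>& conn) {
  conn->marked_for_destruction = true;
}

// only the kafka manager thread destroys connections and edits the list
void kafka_thread_tick() {
  std::lock_guard<std::mutex> guard(connections_lock);
  for (auto it = connections.begin(); it != connections.end(); ) {
    if (it->second->marked_for_destruction) {
      it->second->destroy(0);
      it = connections.erase(it);
    } else {
      ++it;
    }
  }
}

This keeps destroy() and the callback invocation on a single thread, which closes the double-destroy window described above.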

Actions #6

Updated by Casey Bodley 11 months ago

I'm seeing a strange kafka-related failure in the reef bucket notification tests: https://pulpito.ceph.com/yuriw-2023-05-26_11:45:23-rgw-wip-yuri-testing-2023-05-25-1927-reef-distro-default-smithi/7287055/

2023-05-26T12:52:08.185 INFO:teuthology.orchestra.run.smithi008.stderr:test pushing persistent notification kafka ... FAIL
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:======================================================================
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:FAIL: test pushing persistent notification kafka
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:----------------------------------------------------------------------
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:Traceback (most recent call last):
2023-05-26T12:52:08.186 INFO:teuthology.orchestra.run.smithi008.stderr:  File "/usr/lib/python3.6/site-packages/nose/case.py", line 197, in runTest
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:    self.test(*self.arg)
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:  File "/home/ubuntu/cephtest/ceph/src/test/rgw/bucket_notification/test_bn.py", line 3558, in test_ps_s3_persistent_notification_kafka
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:    persistent_notification('kafka')
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:  File "/home/ubuntu/cephtest/ceph/src/test/rgw/bucket_notification/test_bn.py", line 3513, in persistent_notification
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:    receiver.verify_s3_events(keys, exact_match=exact_match, deletions=False)
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:  File "/home/ubuntu/cephtest/ceph/src/test/rgw/bucket_notification/test_bn.py", line 440, in verify_s3_events
2023-05-26T12:52:08.187 INFO:teuthology.orchestra.run.smithi008.stderr:    verify_s3_records_by_elements(self.events, keys, exact_match=exact_match, deletions=deletions, etags=etags)
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:  File "/home/ubuntu/cephtest/ceph/src/test/rgw/bucket_notification/test_bn.py", line 393, in verify_s3_records_by_elements
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:    assert False, err
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:AssertionError: no creation event found for key: <Key: skgtxr-3,84>
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:-------------------- >> begin captured logging << --------------------
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket! 
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket! 
2023-05-26T12:52:08.188 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket! 
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket! 
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket! 
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket!
2023-05-26T12:52:08.189 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.190 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket!
2023-05-26T12:52:08.190 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T12:52:08.190 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket!
2023-05-26T12:52:08.190 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40

Those WARNING/DEBUG messages then repeated in a tight loop for 20+ minutes, filling up the teuthology.log, until:

2023-05-26T13:16:22.193 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.client: WARNING: Unable to send to wakeup socket!
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:kafka.consumer.fetcher: DEBUG: Adding fetch request for partition TopicPartition(topic='skgtxr-2_topic_1', partition=0) at offset 40
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:--------------------- >> end captured logging << ---------------------
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:----------------------------------------------------------------------
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:Ran 3 tests in 1232.641s
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:
2023-05-26T13:16:22.194 INFO:teuthology.orchestra.run.smithi008.stderr:FAILED (failures=1)
2023-05-26T13:16:22.195 INFO:teuthology.orchestra.run.smithi008.stdout:Kafka consumer created on topic: skgtxr-1_topic_1
2023-05-26T13:16:22.195 INFO:teuthology.orchestra.run.smithi008.stdout:Kafka receiver started
2023-05-26T13:16:22.195 INFO:teuthology.orchestra.run.smithi008.stdout:average time for creation + kafka notification is: 113.54601383209229 milliseconds
2023-05-26T13:16:22.195 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:average time for deletion + kafka notification is: 52.690887451171875 milliseconds
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:send: b'DELETE /skgtxr-1?notification HTTP/1.1\r\nAccept-Encoding: identity\r\nContent-Length: 0\r\nAuthorization: AWS LTLEWFFFRWPECLGMMWEY:A/s4sgKciY0kJhbWVAdyL236kac=\r\nDate: Fri, 26 May 2023 12:31:47 +0000\r\nHost: smithi008.front.sepia.ceph.com:80\r\n\r\n'
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:Zero length chunk ignored
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:reply: 'HTTP/1.1 200 OK\r\n'
2023-05-26T13:16:22.196 INFO:teuthology.orchestra.run.smithi008.stdout:header: x-amz-request-id: tx0000056c0450a76e9b41d-006470a6b3-11ac-default
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:header: Content-Length: 0
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:header: Date: Fri, 26 May 2023 12:31:47 GMT
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:header: Connection: Keep-Alive
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:Kafka consumer created on topic: skgtxr-2_topic_1
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:Kafka receiver started
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:average time for creation + kafka notification is: 31477.905297279358 milliseconds
2023-05-26T13:16:22.197 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:average time for deletion + kafka notification is: 437.2687816619873 milliseconds
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:waiting for 10sec for checking idleness
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:average time for creation + kafka notification is: 30841.525721549988 milliseconds
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:average time for deletion + kafka notification is: 453.22699546813965 milliseconds
2023-05-26T13:16:22.198 INFO:teuthology.orchestra.run.smithi008.stdout:wait for 5sec for the messages...

Do you think the attached PR would resolve this? Or should we track it separately?

Actions #7

Updated by Yuval Lifshitz 11 months ago

I don't think these issues are related; they are not happening because of an RGW crash.
These are issues with the kafka python client we are using to consume the messages.

I think this is related to: https://github.com/dpkp/kafka-python/issues/2073
Submitted a test tracker: https://tracker.ceph.com/issues/61477

Actions #8

Updated by Yuval Lifshitz 11 months ago

  • Status changed from Fix Under Review to Pending Backport
Actions #9

Updated by Backport Bot 11 months ago

  • Copied to Backport #61478: quincy: kafka: potential race condition between removing and adding connection to the connection list added
Actions #10

Updated by Backport Bot 11 months ago

  • Copied to Backport #61479: reef: kafka: potential race condition between removing and adding connection to the connection list added
Actions #11

Updated by Backport Bot 11 months ago

  • Tags changed from kafka to kafka backport_processed
Actions #12

Updated by Krunal Chheda 11 months ago

Hi Yuval,
Do we have a consistent reproducer for this issue? Is the crash happening on persistent kafka notifications?
