diff --git a/src/producer.cpp b/src/producer.cpp index af138d04..29f11751 100644 --- a/src/producer.cpp +++ b/src/producer.cpp @@ -131,10 +131,16 @@ void Producer::do_produce(const MessageBuilder& builder, RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(builder.timestamp().count()), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), - RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka + RD_KAFKA_V_HEADERS(headers.get_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(builder.user_data()), RD_KAFKA_V_END); + // We only want to release the handle on the headers data + // if the rd_kafka_producev function returned no error, otherwise + // the function doesn't take ownership of the headers data. + if(result == RD_KAFKA_RESP_ERR_NO_ERROR) { + headers.release_handle(); + } check_error(result); } @@ -150,10 +156,16 @@ void Producer::do_produce(const Message& message, RD_KAFKA_V_MSGFLAGS(policy), RD_KAFKA_V_TIMESTAMP(duration), RD_KAFKA_V_KEY((void*)key.get_data(), key.get_size()), - RD_KAFKA_V_HEADERS(headers.release_handle()), //pass ownership to rdkafka + RD_KAFKA_V_HEADERS(headers.get_handle()), //pass ownership to rdkafka RD_KAFKA_V_VALUE((void*)payload.get_data(), payload.get_size()), RD_KAFKA_V_OPAQUE(message.get_user_data()), RD_KAFKA_V_END); + // We only want to release the handle on the headers data + // if the rd_kafka_producev function returned no error, otherwise + // the function doesn't take ownership of the headers data. + if(result == RD_KAFKA_RESP_ERR_NO_ERROR) { + headers.release_handle(); + } check_error(result); }