git: d43cf0650ae2 - main - net-p2p/pulsar-client-cpp: update 3.7.0 → 4.1.0

From: Yuri Victorovich <yuri_at_FreeBSD.org>
Date: Fri, 17 Apr 2026 06:50:45 UTC
The branch main has been updated by yuri:

URL: https://cgit.FreeBSD.org/ports/commit/?id=d43cf0650ae2bf87cf08041e31ad9c2d476eca4d

commit d43cf0650ae2bf87cf08041e31ad9c2d476eca4d
Author:     Yuri Victorovich <yuri@FreeBSD.org>
AuthorDate: 2026-04-17 06:27:31 +0000
Commit:     Yuri Victorovich <yuri@FreeBSD.org>
CommitDate: 2026-04-17 06:50:35 +0000

    net-p2p/pulsar-client-cpp: update 3.7.0 → 4.1.0
---
 net-p2p/pulsar-client-cpp/Makefile                 |    3 +-
 net-p2p/pulsar-client-cpp/distinfo                 |    6 +-
 .../patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 | 1111 --------------------
 .../files/patch-lib_AutoClusterFailover.cc         |   11 +
 net-p2p/pulsar-client-cpp/pkg-plist                |    4 +
 5 files changed, 19 insertions(+), 1116 deletions(-)

diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile
index cbc0de2f52ad..2e8611438f09 100644
--- a/net-p2p/pulsar-client-cpp/Makefile
+++ b/net-p2p/pulsar-client-cpp/Makefile
@@ -1,7 +1,6 @@
 PORTNAME=	pulsar-client-cpp # this port requires instruction sets crc32, pclmul above the default sse2
 DISTVERSIONPREFIX=	v
-DISTVERSION=	3.7.0
-PORTREVISION=	3
+DISTVERSION=	4.1.0
 CATEGORIES=	net-p2p
 
 MAINTAINER=	yuri@FreeBSD.org
diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo
index 3d7c80b515e0..0648f4df6ce5 100644
--- a/net-p2p/pulsar-client-cpp/distinfo
+++ b/net-p2p/pulsar-client-cpp/distinfo
@@ -1,3 +1,3 @@
-TIMESTAMP = 1743152964
-SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671
-SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627
+TIMESTAMP = 1776406245
+SHA256 (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 172c9697caf62551c336e0bf64a136ccdff0674e9ea66f94f3f60962c5d41958
+SIZE (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 1635369
diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
deleted file mode 100644
index ca6cb6a02135..000000000000
--- a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
+++ /dev/null
@@ -1,1111 +0,0 @@
-- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+
-
-diff --git CMakeLists.txt CMakeLists.txt
-index b0046534..2efeec89 100644
---- CMakeLists.txt
-+++ CMakeLists.txt
-@@ -19,15 +19,16 @@
- 
- cmake_minimum_required(VERSION 3.13)
- 
--option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
--
- option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
- if (INTEGRATE_VCPKG)
--    set(USE_ASIO ON)
-+    option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
-     if (NOT CMAKE_TOOLCHAIN_FILE)
-         set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
-     endif ()
-+else ()
-+    option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
- endif ()
-+message(STATUS "USE_ASIO: ${USE_ASIO}")
- 
- option(BUILD_TESTS "Build tests" ON)
- message(STATUS "BUILD_TESTS:  " ${BUILD_TESTS})
-diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc
-index 7233b2c9..bc8da970 100644
---- lib/AckGroupingTrackerEnabled.cc
-+++ lib/AckGroupingTrackerEnabled.cc
-@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() {
-     this->flush();
-     std::lock_guard<std::mutex> lock(this->mutexTimer_);
-     if (this->timer_) {
--        ASIO_ERROR ec;
--        this->timer_->cancel(ec);
-+        this->timer_->cancel();
-     }
- }
- 
-@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
- 
-     std::lock_guard<std::mutex> lock(this->mutexTimer_);
-     this->timer_ = this->executor_->createDeadlineTimer();
--    this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
-+    this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
-     auto self = shared_from_this();
-     this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
-         if (!ec) {
-diff --git lib/ClientConnection.cc lib/ClientConnection.cc
-index 2037722f..de226a85 100644
---- lib/ClientConnection.cc
-+++ lib/ClientConnection.cc
-@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
-         if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
-             LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
-             std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
--            tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
-+            tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
-         }
- 
-         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
-@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
-         // Only send keep-alive probes if the broker supports it
-         keepAliveTimer_ = executor_->createDeadlineTimer();
-         if (keepAliveTimer_) {
--            keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
-+            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
-             auto weakSelf = weak_from_this();
-             keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
-                 auto self = weakSelf.lock();
-@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
-     // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
-     // Check if we have a timer still before we set the request timer to pop again.
-     if (consumerStatsRequestTimer_) {
--        consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
-+        consumerStatsRequestTimer_->expires_after(operationsTimeout_);
-         auto weakSelf = weak_from_this();
-         consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
-             auto self = weakSelf.lock();
-@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee
- typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
- #endif
- 
--/*
-- *  TCP Connect handler
-- *
-- *  if async_connect without any error, connected_ would be set to true
-- *  at this point the connection is deemed valid to be used by clients of this class
-- */
--void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
--    if (!err) {
--        std::stringstream cnxStringStream;
--        try {
--            cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
--                            << "] ";
--            cnxString_ = cnxStringStream.str();
--        } catch (const ASIO_SYSTEM_ERROR& e) {
--            LOG_ERROR("Failed to get endpoint: " << e.what());
--            close(ResultRetryable);
--            return;
--        }
--        if (logicalAddress_ == physicalAddress_) {
--            LOG_INFO(cnxString_ << "Connected to broker");
--        } else {
--            LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
--                                << ", proxy: " << proxyServiceUrl_
--                                << ", physical address:" << physicalAddress_);
--        }
-+void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) {
-+    std::stringstream cnxStringStream;
-+    try {
-+        cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
-+        cnxString_ = cnxStringStream.str();
-+    } catch (const ASIO_SYSTEM_ERROR& e) {
-+        LOG_ERROR("Failed to get endpoint: " << e.what());
-+        close(ResultRetryable);
-+        return;
-+    }
-+    if (logicalAddress_ == physicalAddress_) {
-+        LOG_INFO(cnxString_ << "Connected to broker");
-+    } else {
-+        LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
-+                            << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_);
-+    }
- 
--        Lock lock(mutex_);
--        if (isClosed()) {
--            LOG_INFO(cnxString_ << "Connection already closed");
--            return;
--        }
--        state_ = TcpConnected;
--        lock.unlock();
-+    Lock lock(mutex_);
-+    if (isClosed()) {
-+        LOG_INFO(cnxString_ << "Connection already closed");
-+        return;
-+    }
-+    state_ = TcpConnected;
-+    lock.unlock();
- 
--        ASIO_ERROR error;
--        socket_->set_option(tcp::no_delay(true), error);
--        if (error) {
--            LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
--        }
-+    ASIO_ERROR error;
-+    socket_->set_option(tcp::no_delay(true), error);
-+    if (error) {
-+        LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
-+    }
- 
--        socket_->set_option(tcp::socket::keep_alive(true), error);
--        if (error) {
--            LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
--        }
-+    socket_->set_option(tcp::socket::keep_alive(true), error);
-+    if (error) {
-+        LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
-+    }
- 
--        // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
--        // should never happen, given that we're sending our own keep-alive probes (within the TCP
--        // connection) every 30 seconds
--        socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
--        if (error) {
--            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
--        }
-+    // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
-+    // should never happen, given that we're sending our own keep-alive probes (within the TCP
-+    // connection) every 30 seconds
-+    socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
-+    if (error) {
-+        LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
-+    }
- 
--        // Send up to 10 probes before declaring the connection broken
--        socket_->set_option(tcp_keep_alive_count(10), error);
--        if (error) {
--            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
--        }
-+    // Send up to 10 probes before declaring the connection broken
-+    socket_->set_option(tcp_keep_alive_count(10), error);
-+    if (error) {
-+        LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
-+    }
- 
--        // Interval between probes: 6 seconds
--        socket_->set_option(tcp_keep_alive_interval(6), error);
--        if (error) {
--            LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
--        }
-+    // Interval between probes: 6 seconds
-+    socket_->set_option(tcp_keep_alive_interval(6), error);
-+    if (error) {
-+        LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
-+    }
- 
--        if (tlsSocket_) {
--            if (!isTlsAllowInsecureConnection_) {
--                ASIO_ERROR err;
--                Url service_url;
--                if (!Url::parse(physicalAddress_, service_url)) {
--                    LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
--                    close();
--                    return;
--                }
--            }
--            auto weakSelf = weak_from_this();
--            auto socket = socket_;
--            auto tlsSocket = tlsSocket_;
--            // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
--            // fault might happen
--            auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
--                auto self = weakSelf.lock();
--                if (self) {
--                    self->handleHandshake(err);
--                }
--            };
--            tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
--                                        ASIO::bind_executor(strand_, callback));
--        } else {
--            handleHandshake(ASIO_SUCCESS);
--        }
--    } else if (endpointIterator != tcp::resolver::iterator()) {
--        LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
--        // The connection failed. Try the next endpoint in the list.
--        ASIO_ERROR closeError;
--        socket_->close(closeError);  // ignore the error of close
--        if (closeError) {
--            LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
--        }
--        connectTimeoutTask_->stop();
--        ++endpointIterator;
--        if (endpointIterator != tcp::resolver::iterator()) {
--            LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
--            connectTimeoutTask_->start();
--            tcp::endpoint endpoint = *endpointIterator;
--            auto weakSelf = weak_from_this();
--            socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
--                auto self = weakSelf.lock();
--                if (self) {
--                    self->handleTcpConnected(err, endpointIterator);
--                }
--            });
--        } else {
--            if (err == ASIO::error::operation_aborted) {
--                // TCP connect timeout, which is not retryable
-+    if (tlsSocket_) {
-+        if (!isTlsAllowInsecureConnection_) {
-+            ASIO_ERROR err;
-+            Url service_url;
-+            if (!Url::parse(physicalAddress_, service_url)) {
-+                LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
-                 close();
--            } else {
--                close(ResultRetryable);
-+                return;
-             }
-         }
-+        auto weakSelf = weak_from_this();
-+        auto socket = socket_;
-+        auto tlsSocket = tlsSocket_;
-+        // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
-+        // fault might happen
-+        auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
-+            auto self = weakSelf.lock();
-+            if (self) {
-+                self->handleHandshake(err);
-+            }
-+        };
-+        tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
-+                                    ASIO::bind_executor(strand_, callback));
-     } else {
--        LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
--        close(ResultRetryable);
-+        handleHandshake(ASIO_SUCCESS);
-     }
- }
- 
-@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() {
-     }
- 
-     LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
--    tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
-+    tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port());
-     auto weakSelf = weak_from_this();
--    resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
--        auto self = weakSelf.lock();
--        if (self) {
--            self->handleResolve(err, iterator);
--        }
--    });
-+    resolver_->async_resolve(
-+        endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
-+            auto self = weakSelf.lock();
-+            if (!self) {
-+                return;
-+            }
-+            if (err) {
-+                std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
-+                LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
-+                close();
-+                return;
-+            }
-+            if (results.empty()) {
-+                LOG_ERROR(cnxString_ << "No IP address found");
-+                close();
-+                return;
-+            }
-+            connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
-+                ClientConnectionPtr ptr = weakSelf.lock();
-+                if (!ptr) {
-+                    // Connection was already destroyed
-+                    return;
-+                }
-+
-+                if (ptr->state_ != Ready) {
-+                    LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
-+                                              << ptr->connectTimeoutTask_->getPeriodMs()
-+                                              << " ms, close the socket");
-+                    PeriodicTask::ErrorCode err;
-+                    ptr->socket_->close(err);
-+                    if (err) {
-+                        LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
-+                    }
-+                }
-+                ptr->connectTimeoutTask_->stop();
-+            });
-+            connectTimeoutTask_->start();
-+            std::vector<tcp::resolver::endpoint_type> endpoints;
-+            for (const auto& result : results) {
-+                endpoints.emplace_back(result.endpoint());
-+            }
-+            asyncConnect(endpoints, 0);
-+        });
- }
- 
--void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
--    if (err) {
--        std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
--        LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
--        close();
-+void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) {
-+    if (index >= endpoints.size()) {
-+        close(ResultRetryable);
-         return;
-     }
--
-     auto weakSelf = weak_from_this();
--    connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
--        ClientConnectionPtr ptr = weakSelf.lock();
--        if (!ptr) {
--            // Connection was already destroyed
-+    socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) {
-+        auto self = weakSelf.lock();
-+        if (!self) {
-             return;
-         }
--
--        if (ptr->state_ != Ready) {
--            LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
--                                      << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
--            PeriodicTask::ErrorCode err;
--            ptr->socket_->close(err);
--            if (err) {
--                LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
--            }
-+        if (err) {
-+            LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
-+            asyncConnect(endpoints, index + 1);
-+            return;
-         }
--        ptr->connectTimeoutTask_->stop();
-+        completeConnect(endpoints[index]);
-     });
--
--    LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
--    connectTimeoutTask_->start();
--    if (endpointIterator != tcp::resolver::iterator()) {
--        LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name()  //
--                             << " to " << endpointIterator->endpoint());
--        socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
--            auto self = weakSelf.lock();
--            if (self) {
--                self->handleTcpConnected(err, endpointIterator);
--            }
--        });
--    } else {
--        LOG_WARN(cnxString_ << "No IP address found");
--        close();
--        return;
--    }
- }
- 
- void ClientConnection::readNextCommand() {
-@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
-     LookupRequestData requestData;
-     requestData.promise = promise;
-     requestData.timer = executor_->createDeadlineTimer();
--    requestData.timer->expires_from_now(operationsTimeout_);
-+    requestData.timer->expires_after(operationsTimeout_);
-     auto weakSelf = weak_from_this();
-     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
-         auto self = weakSelf.lock();
-@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() {
-             PairSharedBuffer buffer =
-                 Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
- 
--            // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
--            // callback is called, an invalid buffer range might be passed to the underlying socket send.
-+            // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before
-+            // the callback is called, an invalid buffer range might be passed to the underlying socket
-+            // send.
-             asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
-                            handleSendPair(err);
-                        }));
-@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
- 
-     PendingRequestData requestData;
-     requestData.timer = executor_->createDeadlineTimer();
--    requestData.timer->expires_from_now(operationsTimeout_);
-+    requestData.timer->expires_after(operationsTimeout_);
-     auto weakSelf = weak_from_this();
-     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
-         auto self = weakSelf.lock();
-@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() {
-         // be zero And we do not attempt to dereference the pointer.
-         Lock lock(mutex_);
-         if (keepAliveTimer_) {
--            keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
-+            keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
-             auto weakSelf = weak_from_this();
-             keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
-                 auto self = weakSelf.lock();
-@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
-     LastMessageIdRequestData requestData;
-     requestData.promise = promise;
-     requestData.timer = executor_->createDeadlineTimer();
--    requestData.timer->expires_from_now(operationsTimeout_);
-+    requestData.timer->expires_after(operationsTimeout_);
-     auto weakSelf = weak_from_this();
-     requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
-         auto self = weakSelf.lock();
-@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
-     lock.unlock();
- 
-     auto weakSelf = weak_from_this();
--    timer->expires_from_now(operationsTimeout_);
-+    timer->expires_after(operationsTimeout_);
-     timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
-         auto self = weakSelf.lock();
-         if (!self) {
-@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
-     auto it = pendingRequests_.find(requestId);
-     if (it != pendingRequests_.end()) {
-         it->second.promise.setFailed(ResultDisconnected);
--        ASIO_ERROR ec;
--        it->second.timer->cancel(ec);
-+        it->second.timer->cancel();
-         pendingRequests_.erase(it);
-     }
- }
-diff --git lib/ClientConnection.h lib/ClientConnection.h
-index 7646f85e..14e07652 100644
---- lib/ClientConnection.h
-+++ lib/ClientConnection.h
-@@ -25,13 +25,13 @@
- #include <atomic>
- #ifdef USE_ASIO
- #include <asio/bind_executor.hpp>
--#include <asio/io_service.hpp>
-+#include <asio/io_context.hpp>
- #include <asio/ip/tcp.hpp>
- #include <asio/ssl/stream.hpp>
- #include <asio/strand.hpp>
- #else
- #include <boost/asio/bind_executor.hpp>
--#include <boost/asio/io_service.hpp>
-+#include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/ssl/stream.hpp>
- #include <boost/asio/strand.hpp>
-@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
-         DeadlineTimerPtr timer;
-     };
- 
--    /*
--     * handler for connectAsync
--     * creates a ConnectionPtr which has a valid ClientConnection object
--     * although not usable at this point, since this is just tcp connection
--     * Pulsar - Connect/Connected has yet to happen
--     */
--    void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
-+    void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index);
-+    void completeConnect(ASIO::ip::tcp::endpoint endpoint);
- 
-     void handleHandshake(const ASIO_ERROR& err);
- 
-@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
- 
-     void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
- 
--    void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
--
-     void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
-     void handleSendPair(const ASIO_ERROR& err);
-     void sendPendingCommands();
-@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
-      */
-     SocketPtr socket_;
-     TlsSocketPtr tlsSocket_;
--    ASIO::strand<ASIO::io_service::executor_type> strand_;
-+    ASIO::strand<ASIO::io_context::executor_type> strand_;
- 
-     const std::string logicalAddress_;
-     /*
-diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc
-index 250845b3..cfdb0b2d 100644
---- lib/ConsumerImpl.cc
-+++ lib/ConsumerImpl.cc
-@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b
- }
- 
- void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
--    checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
-+    checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
-     std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
-     checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
-         auto self = weakSelf.lock();
-@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
-         }
-         remainTime -= next;
- 
--        timer->expires_from_now(next);
-+        timer->expires_after(next);
- 
-         auto self = shared_from_this();
-         timer->async_wait([this, backoff, remainTime, timer, next, callback,
-@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
- }
- 
- void ConsumerImpl::cancelTimers() noexcept {
--    ASIO_ERROR ec;
--    batchReceiveTimer_->cancel(ec);
--    checkExpiredChunkedTimer_->cancel(ec);
-+    batchReceiveTimer_->cancel();
-+    checkExpiredChunkedTimer_->cancel();
-     unAckedMessageTrackerPtr_->stop();
-     consumerStatsBasePtr_->stop();
- }
-diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc
-index 098f2d5b..76d99370 100644
---- lib/ConsumerImplBase.cc
-+++ lib/ConsumerImplBase.cc
-@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi
- 
- void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
-     if (timeoutMs > 0) {
--        batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
-+        batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
-         std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
-         batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
-             auto self = weakSelf.lock();
-diff --git lib/ExecutorService.cc lib/ExecutorService.cc
-index 794e3619..7f2a2c14 100644
---- lib/ExecutorService.cc
-+++ lib/ExecutorService.cc
-@@ -18,6 +18,12 @@
-  */
- #include "ExecutorService.h"
- 
-+#ifdef USE_ASIO
-+#include <asio/post.hpp>
-+#else
-+#include <boost/asio/post.hpp>
-+#endif
-+
- #include "LogUtils.h"
- #include "TimeUtils.h"
- DECLARE_LOG_OBJECT()
-@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); }
- void ExecutorService::start() {
-     auto self = shared_from_this();
-     std::thread t{[this, self] {
--        LOG_DEBUG("Run io_service in a single thread");
--        ASIO_ERROR ec;
-+        LOG_DEBUG("Run io_context in a single thread");
-         while (!closed_) {
--            io_service_.restart();
--            IOService::work work{getIOService()};
--            io_service_.run(ec);
--        }
--        if (ec) {
--            LOG_ERROR("Failed to run io_service: " << ec.message());
--        } else {
--            LOG_DEBUG("Event loop of ExecutorService exits successfully");
-+            io_context_.restart();
-+            auto work{ASIO::make_work_guard(io_context_)};
-+            io_context_.run();
-         }
-+        LOG_DEBUG("Event loop of ExecutorService exits successfully");
-         {
-             std::lock_guard<std::mutex> lock{mutex_};
-             ioServiceDone_ = true;
-@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() {
- }
- 
- /*
-- *  factory method of ASIO::ip::tcp::socket associated with io_service_ instance
-+ *  factory method of ASIO::ip::tcp::socket associated with io_context_ instance
-  *  @ returns shared_ptr to this socket
-  */
- SocketPtr ExecutorService::createSocket() {
-     try {
--        return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
-+        return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
-     } catch (const ASIO_SYSTEM_ERROR &e) {
-         restart();
-         auto error = std::string("Failed to create socket: ") + e.what();
-@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont
- }
- 
- /*
-- *  factory method of Resolver object associated with io_service_ instance
-+ *  factory method of Resolver object associated with io_context_ instance
-  *  @returns shraed_ptr to resolver object
-  */
- TcpResolverPtr ExecutorService::createTcpResolver() {
-     try {
--        return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
-+        return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
-     } catch (const ASIO_SYSTEM_ERROR &e) {
-         restart();
-         auto error = std::string("Failed to create resolver: ") + e.what();
-@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
- 
- DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-     try {
--        return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
-+        return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
-     } catch (const ASIO_SYSTEM_ERROR &e) {
-         restart();
-         auto error = std::string("Failed to create steady_timer: ") + e.what();
-@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-     }
- }
- 
--void ExecutorService::restart() { io_service_.stop(); }
-+void ExecutorService::restart() { io_context_.stop(); }
- 
- void ExecutorService::close(long timeoutMs) {
-     bool expectedState = false;
-@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) {
-         return;
-     }
-     if (timeoutMs == 0) {  // non-blocking
--        io_service_.stop();
-+        io_context_.stop();
-         return;
-     }
- 
-     std::unique_lock<std::mutex> lock{mutex_};
--    io_service_.stop();
-+    io_context_.stop();
-     if (timeoutMs > 0) {
-         cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; });
-     } else {  // < 0
-@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) {
-     }
- }
- 
--void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
-+void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); }
- 
- /////////////////////
- 
-diff --git lib/ExecutorService.h lib/ExecutorService.h
-index 89d06d30..626cb203 100644
---- lib/ExecutorService.h
-+++ lib/ExecutorService.h
-@@ -23,11 +23,11 @@
- 
- #include <atomic>
- #ifdef USE_ASIO
--#include <asio/io_service.hpp>
-+#include <asio/io_context.hpp>
- #include <asio/ip/tcp.hpp>
- #include <asio/ssl.hpp>
- #else
--#include <boost/asio/io_service.hpp>
-+#include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/ssl.hpp>
- #endif
-@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
- typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
- class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
-    public:
--    using IOService = ASIO::io_service;
-+    using IOService = ASIO::io_context;
-     using SharedPtr = std::shared_ptr<ExecutorService>;
- 
-     static SharedPtr create();
-@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
-     // See TimeoutProcessor for the semantics of the parameter.
-     void close(long timeoutMs = 3000);
- 
--    IOService &getIOService() { return io_service_; }
-+    IOService &getIOService() { return io_context_; }
-     bool isClosed() const noexcept { return closed_; }
- 
-    private:
-     /*
--     * io_service is our interface to os, io object schedule async ops on this object
-+     * io_context is our interface to os, io object schedule async ops on this object
-      */
--    IOService io_service_;
-+    IOService io_context_;
- 
-     std::atomic_bool closed_{false};
-     std::mutex mutex_;
-diff --git lib/HandlerBase.cc lib/HandlerBase.cc
-index 65aa0db1..71902481 100644
---- lib/HandlerBase.cc
-+++ lib/HandlerBase.cc
-@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
-       redirectedClusterURI_("") {}
- 
- HandlerBase::~HandlerBase() {
--    ASIO_ERROR ignored;
--    timer_->cancel(ignored);
--    creationTimer_->cancel(ignored);
-+    timer_->cancel();
-+    creationTimer_->cancel();
- }
- 
- void HandlerBase::start() {
-@@ -61,15 +60,14 @@ void HandlerBase::start() {
-     if (state_.compare_exchange_strong(state, Pending)) {
-         grabCnx();
-     }
--    creationTimer_->expires_from_now(operationTimeut_);
-+    creationTimer_->expires_after(operationTimeut_);
-     std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
-     creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
-         auto self = weakSelf.lock();
-         if (self && !error) {
-             LOG_WARN("Cancel the pending reconnection due to the start timeout");
-             connectionFailed(ResultTimeout);
--            ASIO_ERROR ignored;
--            timer_->cancel(ignored);
-+            timer_->cancel();
-         }
-     });
- }
-@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
-                     connectionTimeMs_ =
-                         duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
-                     // Prevent the creationTimer_ from cancelling the timer_ in future
--                    ASIO_ERROR ignored;
--                    creationTimer_->cancel(ignored);
-+                    creationTimer_->cancel();
-                     LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
-                 } else if (isResultRetryable(result)) {
-                     scheduleReconnection();
-@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
-         TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
- 
-         LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
--        timer_->expires_from_now(delay);
-+        timer_->expires_after(delay);
-         // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
-         // so we will not run into the case where grabCnx is invoked on out of scope handler
-         auto name = getName();
-diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc
-index dddade5c..61fbf7b8 100644
---- lib/MultiTopicsConsumerImpl.cc
-+++ lib/MultiTopicsConsumerImpl.cc
-@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
-     return numberOfConnectedConsumer;
- }
- void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
--    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-+    partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
-     auto weakSelf = weak_from_this();
-     partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
-         // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
-@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
- 
- void MultiTopicsConsumerImpl::cancelTimers() noexcept {
-     if (partitionsUpdateTimer_) {
--        ASIO_ERROR ec;
--        partitionsUpdateTimer_->cancel(ec);
-+        partitionsUpdateTimer_->cancel();
-     }
- }
- 
-diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc
-index e443496d..e50b4ca2 100644
---- lib/NegativeAcksTracker.cc
-+++ lib/NegativeAcksTracker.cc
-@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() {
-         return;
-     }
-     std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
--    timer_->expires_from_now(timerInterval_);
-+    timer_->expires_after(timerInterval_);
-     timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
-         if (auto self = weakSelf.lock()) {
-             self->handleTimer(ec);
-@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
- 
- void NegativeAcksTracker::close() {
-     closed_ = true;
--    ASIO_ERROR ec;
--    timer_->cancel(ec);
-+    timer_->cancel();
-     std::lock_guard<std::mutex> lock(mutex_);
-     nackedMessages_.clear();
- }
-diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc
-index 4178096c..923c038b 100644
---- lib/PartitionedProducerImpl.cc
-+++ lib/PartitionedProducerImpl.cc
-@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
- 
- void PartitionedProducerImpl::runPartitionUpdateTask() {
-     auto weakSelf = weak_from_this();
--    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-+    partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
-     partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
-         auto self = weakSelf.lock();
-         if (self) {
-@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
- 
- void PartitionedProducerImpl::cancelTimers() noexcept {
-     if (partitionsUpdateTimer_) {
--        ASIO_ERROR ec;
--        partitionsUpdateTimer_->cancel(ec);
-+        partitionsUpdateTimer_->cancel();
-     }
- }
- 
-diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc
-index 4fc7bb61..07d9a7bc 100644
---- lib/PatternMultiTopicsConsumerImpl.cc
-+++ lib/PatternMultiTopicsConsumerImpl.cc
-@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()
- 
- void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
-     autoDiscoveryRunning_ = false;
--    autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-+    autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
- 
-     auto weakSelf = weak_from_this();
-     autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
-@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() {
-     LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
- 
-     if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
--        autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-+        autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-         auto weakSelf = weak_from_this();
-         autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
-             if (auto self = weakSelf.lock()) {
-@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
-     MultiTopicsConsumerImpl::closeAsync(callback);
- }
- 
--void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
--    ASIO_ERROR ec;
--    autoDiscoveryTimer_->cancel(ec);
--}
-+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); }
-diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc
-index 9fde012a..4b5f9621 100644
---- lib/PeriodicTask.cc
-+++ lib/PeriodicTask.cc
-@@ -29,7 +29,7 @@ void PeriodicTask::start() {
-     state_ = Ready;
-     if (periodMs_ >= 0) {
-         std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
--        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
-+        timer_->expires_after(std::chrono::milliseconds(periodMs_));
-         timer_->async_wait([weakSelf](const ErrorCode& ec) {
-             auto self = weakSelf.lock();
-             if (self) {
-@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
-     if (!state_.compare_exchange_strong(state, Closing)) {
-         return;
-     }
--    ErrorCode ec;
--    timer_->cancel(ec);
-+    timer_->cancel();
-     state_ = Pending;
- }
- 
-@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
-     // state_ may be changed in handleTimeout, so we check state_ again
-     if (state_ == Ready) {
-         auto self = shared_from_this();
--        timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
-+        timer_->expires_after(std::chrono::milliseconds(periodMs_));
-         timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
-     }
- }
-diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc
-index 4399ce5f..8b112bf1 100644
---- lib/ProducerImpl.cc
-+++ lib/ProducerImpl.cc
-@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
-         bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
-         bool isFull = batchMessageContainer_->add(msg, callback);
-         if (isFirstMessage) {
--            batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
-+            batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
-             auto weakSelf = weak_from_this();
-             batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
-                 auto self = weakSelf.lock();
-@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() {
- 
- void ProducerImpl::cancelTimers() noexcept {
-     dataKeyRefreshTask_.stop();
--    ASIO_ERROR ec;
--    batchTimer_->cancel(ec);
--    sendTimer_->cancel(ec);
-+    batchTimer_->cancel();
-+    sendTimer_->cancel();
- }
- 
- bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
-@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() {
- }
- 
- void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
--    sendTimer_->expires_from_now(expiryTime);
-+    sendTimer_->expires_after(expiryTime);
*** 217 LINES SKIPPED ***