git: d43cf0650ae2 - main - net-p2p/pulsar-client-cpp: update 3.7.0 → 4.1.0
- Go to: [ bottom of page ] [ top of archives ] [ this month ]
Date: Fri, 17 Apr 2026 06:50:45 UTC
The branch main has been updated by yuri:
URL: https://cgit.FreeBSD.org/ports/commit/?id=d43cf0650ae2bf87cf08041e31ad9c2d476eca4d
commit d43cf0650ae2bf87cf08041e31ad9c2d476eca4d
Author: Yuri Victorovich <yuri@FreeBSD.org>
AuthorDate: 2026-04-17 06:27:31 +0000
Commit: Yuri Victorovich <yuri@FreeBSD.org>
CommitDate: 2026-04-17 06:50:35 +0000
net-p2p/pulsar-client-cpp: update 3.7.0 → 4.1.0
---
net-p2p/pulsar-client-cpp/Makefile | 3 +-
net-p2p/pulsar-client-cpp/distinfo | 6 +-
.../patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 | 1111 --------------------
.../files/patch-lib_AutoClusterFailover.cc | 11 +
net-p2p/pulsar-client-cpp/pkg-plist | 4 +
5 files changed, 19 insertions(+), 1116 deletions(-)
diff --git a/net-p2p/pulsar-client-cpp/Makefile b/net-p2p/pulsar-client-cpp/Makefile
index cbc0de2f52ad..2e8611438f09 100644
--- a/net-p2p/pulsar-client-cpp/Makefile
+++ b/net-p2p/pulsar-client-cpp/Makefile
@@ -1,7 +1,6 @@
PORTNAME= pulsar-client-cpp # this port requires instruction sets crc32, pclmul above the default sse2
DISTVERSIONPREFIX= v
-DISTVERSION= 3.7.0
-PORTREVISION= 3
+DISTVERSION= 4.1.0
CATEGORIES= net-p2p
MAINTAINER= yuri@FreeBSD.org
diff --git a/net-p2p/pulsar-client-cpp/distinfo b/net-p2p/pulsar-client-cpp/distinfo
index 3d7c80b515e0..0648f4df6ce5 100644
--- a/net-p2p/pulsar-client-cpp/distinfo
+++ b/net-p2p/pulsar-client-cpp/distinfo
@@ -1,3 +1,3 @@
-TIMESTAMP = 1743152964
-SHA256 (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 33d6ea82e1f03a2e77f85d3b6ee8e3ac37bfd760ea450537ec2e59ef122c4671
-SIZE (apache-pulsar-client-cpp-v3.7.0_GH0.tar.gz) = 1604627
+TIMESTAMP = 1776406245
+SHA256 (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 172c9697caf62551c336e0bf64a136ccdff0674e9ea66f94f3f60962c5d41958
+SIZE (apache-pulsar-client-cpp-v4.1.0_GH0.tar.gz) = 1635369
diff --git a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83 b/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
deleted file mode 100644
index ca6cb6a02135..000000000000
--- a/net-p2p/pulsar-client-cpp/files/patch-cc30a7b5f52c6d6d7ff94d93a9509fc3f6becc83
+++ /dev/null
@@ -1,1111 +0,0 @@
-- backport of https://github.com/apache/pulsar-client-cpp/pull/477 unbreaking for boost 1.87+
-
-diff --git CMakeLists.txt CMakeLists.txt
-index b0046534..2efeec89 100644
---- CMakeLists.txt
-+++ CMakeLists.txt
-@@ -19,15 +19,16 @@
-
- cmake_minimum_required(VERSION 3.13)
-
--option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
--
- option(INTEGRATE_VCPKG "Integrate with Vcpkg" OFF)
- if (INTEGRATE_VCPKG)
-- set(USE_ASIO ON)
-+ option(USE_ASIO "Use Asio instead of Boost.Asio" ON)
- if (NOT CMAKE_TOOLCHAIN_FILE)
- set(CMAKE_TOOLCHAIN_FILE "${CMAKE_SOURCE_DIR}/vcpkg/scripts/buildsystems/vcpkg.cmake")
- endif ()
-+else ()
-+ option(USE_ASIO "Use Asio instead of Boost.Asio" OFF)
- endif ()
-+message(STATUS "USE_ASIO: ${USE_ASIO}")
-
- option(BUILD_TESTS "Build tests" ON)
- message(STATUS "BUILD_TESTS: " ${BUILD_TESTS})
-diff --git lib/AckGroupingTrackerEnabled.cc lib/AckGroupingTrackerEnabled.cc
-index 7233b2c9..bc8da970 100644
---- lib/AckGroupingTrackerEnabled.cc
-+++ lib/AckGroupingTrackerEnabled.cc
-@@ -117,8 +117,7 @@ void AckGroupingTrackerEnabled::close() {
- this->flush();
- std::lock_guard<std::mutex> lock(this->mutexTimer_);
- if (this->timer_) {
-- ASIO_ERROR ec;
-- this->timer_->cancel(ec);
-+ this->timer_->cancel();
- }
- }
-
-@@ -168,7 +167,7 @@ void AckGroupingTrackerEnabled::scheduleTimer() {
-
- std::lock_guard<std::mutex> lock(this->mutexTimer_);
- this->timer_ = this->executor_->createDeadlineTimer();
-- this->timer_->expires_from_now(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
-+ this->timer_->expires_after(std::chrono::milliseconds(std::max(1L, this->ackGroupingTimeMs_)));
- auto self = shared_from_this();
- this->timer_->async_wait([this, self](const ASIO_ERROR& ec) -> void {
- if (!ec) {
-diff --git lib/ClientConnection.cc lib/ClientConnection.cc
-index 2037722f..de226a85 100644
---- lib/ClientConnection.cc
-+++ lib/ClientConnection.cc
-@@ -266,7 +266,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
- if (!clientConfiguration.isTlsAllowInsecureConnection() && clientConfiguration.isValidateHostName()) {
- LOG_DEBUG("Validating hostname for " << serviceUrl.host() << ":" << serviceUrl.port());
- std::string urlHost = isSniProxy_ ? proxyUrl.host() : serviceUrl.host();
-- tlsSocket_->set_verify_callback(ASIO::ssl::rfc2818_verification(urlHost));
-+ tlsSocket_->set_verify_callback(ASIO::ssl::host_name_verification(urlHost));
- }
-
- LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
-@@ -309,7 +309,7 @@ void ClientConnection::handlePulsarConnected(const proto::CommandConnected& cmdC
- // Only send keep-alive probes if the broker supports it
- keepAliveTimer_ = executor_->createDeadlineTimer();
- if (keepAliveTimer_) {
-- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
-+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
- auto weakSelf = weak_from_this();
- keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
- auto self = weakSelf.lock();
-@@ -354,7 +354,7 @@ void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> consumerSta
- // If the close operation has reset the consumerStatsRequestTimer_ then the use_count will be zero
- // Check if we have a timer still before we set the request timer to pop again.
- if (consumerStatsRequestTimer_) {
-- consumerStatsRequestTimer_->expires_from_now(operationsTimeout_);
-+ consumerStatsRequestTimer_->expires_after(operationsTimeout_);
- auto weakSelf = weak_from_this();
- consumerStatsRequestTimer_->async_wait([weakSelf, consumerStatsRequests](const ASIO_ERROR& err) {
- auto self = weakSelf.lock();
-@@ -388,129 +388,87 @@ typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPALIVE> tcp_kee
- typedef ASIO::detail::socket_option::integer<IPPROTO_TCP, TCP_KEEPIDLE> tcp_keep_alive_idle;
- #endif
-
--/*
-- * TCP Connect handler
-- *
-- * if async_connect without any error, connected_ would be set to true
-- * at this point the connection is deemed valid to be used by clients of this class
-- */
--void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
-- if (!err) {
-- std::stringstream cnxStringStream;
-- try {
-- cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint()
-- << "] ";
-- cnxString_ = cnxStringStream.str();
-- } catch (const ASIO_SYSTEM_ERROR& e) {
-- LOG_ERROR("Failed to get endpoint: " << e.what());
-- close(ResultRetryable);
-- return;
-- }
-- if (logicalAddress_ == physicalAddress_) {
-- LOG_INFO(cnxString_ << "Connected to broker");
-- } else {
-- LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
-- << ", proxy: " << proxyServiceUrl_
-- << ", physical address:" << physicalAddress_);
-- }
-+void ClientConnection::completeConnect(ASIO::ip::tcp::endpoint endpoint) {
-+ std::stringstream cnxStringStream;
-+ try {
-+ cnxStringStream << "[" << socket_->local_endpoint() << " -> " << socket_->remote_endpoint() << "] ";
-+ cnxString_ = cnxStringStream.str();
-+ } catch (const ASIO_SYSTEM_ERROR& e) {
-+ LOG_ERROR("Failed to get endpoint: " << e.what());
-+ close(ResultRetryable);
-+ return;
-+ }
-+ if (logicalAddress_ == physicalAddress_) {
-+ LOG_INFO(cnxString_ << "Connected to broker");
-+ } else {
-+ LOG_INFO(cnxString_ << "Connected to broker through proxy. Logical broker: " << logicalAddress_
-+ << ", proxy: " << proxyServiceUrl_ << ", physical address:" << physicalAddress_);
-+ }
-
-- Lock lock(mutex_);
-- if (isClosed()) {
-- LOG_INFO(cnxString_ << "Connection already closed");
-- return;
-- }
-- state_ = TcpConnected;
-- lock.unlock();
-+ Lock lock(mutex_);
-+ if (isClosed()) {
-+ LOG_INFO(cnxString_ << "Connection already closed");
-+ return;
-+ }
-+ state_ = TcpConnected;
-+ lock.unlock();
-
-- ASIO_ERROR error;
-- socket_->set_option(tcp::no_delay(true), error);
-- if (error) {
-- LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
-- }
-+ ASIO_ERROR error;
-+ socket_->set_option(tcp::no_delay(true), error);
-+ if (error) {
-+ LOG_WARN(cnxString_ << "Socket failed to set tcp::no_delay: " << error.message());
-+ }
-
-- socket_->set_option(tcp::socket::keep_alive(true), error);
-- if (error) {
-- LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
-- }
-+ socket_->set_option(tcp::socket::keep_alive(true), error);
-+ if (error) {
-+ LOG_WARN(cnxString_ << "Socket failed to set tcp::socket::keep_alive: " << error.message());
-+ }
-
-- // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
-- // should never happen, given that we're sending our own keep-alive probes (within the TCP
-- // connection) every 30 seconds
-- socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
-- if (error) {
-- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
-- }
-+ // Start TCP keep-alive probes after connection has been idle after 1 minute. Ideally this
-+ // should never happen, given that we're sending our own keep-alive probes (within the TCP
-+ // connection) every 30 seconds
-+ socket_->set_option(tcp_keep_alive_idle(1 * 60), error);
-+ if (error) {
-+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_idle: " << error.message());
-+ }
-
-- // Send up to 10 probes before declaring the connection broken
-- socket_->set_option(tcp_keep_alive_count(10), error);
-- if (error) {
-- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
-- }
-+ // Send up to 10 probes before declaring the connection broken
-+ socket_->set_option(tcp_keep_alive_count(10), error);
-+ if (error) {
-+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_count: " << error.message());
-+ }
-
-- // Interval between probes: 6 seconds
-- socket_->set_option(tcp_keep_alive_interval(6), error);
-- if (error) {
-- LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
-- }
-+ // Interval between probes: 6 seconds
-+ socket_->set_option(tcp_keep_alive_interval(6), error);
-+ if (error) {
-+ LOG_DEBUG(cnxString_ << "Socket failed to set tcp_keep_alive_interval: " << error.message());
-+ }
-
-- if (tlsSocket_) {
-- if (!isTlsAllowInsecureConnection_) {
-- ASIO_ERROR err;
-- Url service_url;
-- if (!Url::parse(physicalAddress_, service_url)) {
-- LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
-- close();
-- return;
-- }
-- }
-- auto weakSelf = weak_from_this();
-- auto socket = socket_;
-- auto tlsSocket = tlsSocket_;
-- // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
-- // fault might happen
-- auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
-- auto self = weakSelf.lock();
-- if (self) {
-- self->handleHandshake(err);
-- }
-- };
-- tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
-- ASIO::bind_executor(strand_, callback));
-- } else {
-- handleHandshake(ASIO_SUCCESS);
-- }
-- } else if (endpointIterator != tcp::resolver::iterator()) {
-- LOG_WARN(cnxString_ << "Failed to establish connection: " << err.message());
-- // The connection failed. Try the next endpoint in the list.
-- ASIO_ERROR closeError;
-- socket_->close(closeError); // ignore the error of close
-- if (closeError) {
-- LOG_WARN(cnxString_ << "Failed to close socket: " << err.message());
-- }
-- connectTimeoutTask_->stop();
-- ++endpointIterator;
-- if (endpointIterator != tcp::resolver::iterator()) {
-- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
-- connectTimeoutTask_->start();
-- tcp::endpoint endpoint = *endpointIterator;
-- auto weakSelf = weak_from_this();
-- socket_->async_connect(endpoint, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
-- auto self = weakSelf.lock();
-- if (self) {
-- self->handleTcpConnected(err, endpointIterator);
-- }
-- });
-- } else {
-- if (err == ASIO::error::operation_aborted) {
-- // TCP connect timeout, which is not retryable
-+ if (tlsSocket_) {
-+ if (!isTlsAllowInsecureConnection_) {
-+ ASIO_ERROR err;
-+ Url service_url;
-+ if (!Url::parse(physicalAddress_, service_url)) {
-+ LOG_ERROR(cnxString_ << "Invalid Url, unable to parse: " << err << " " << err.message());
- close();
-- } else {
-- close(ResultRetryable);
-+ return;
- }
- }
-+ auto weakSelf = weak_from_this();
-+ auto socket = socket_;
-+ auto tlsSocket = tlsSocket_;
-+ // socket and ssl::stream objects must exist until async_handshake is done, otherwise segmentation
-+ // fault might happen
-+ auto callback = [weakSelf, socket, tlsSocket](const ASIO_ERROR& err) {
-+ auto self = weakSelf.lock();
-+ if (self) {
-+ self->handleHandshake(err);
-+ }
-+ };
-+ tlsSocket_->async_handshake(ASIO::ssl::stream<tcp::socket>::client,
-+ ASIO::bind_executor(strand_, callback));
- } else {
-- LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
-- close(ResultRetryable);
-+ handleHandshake(ASIO_SUCCESS);
- }
- }
-
-@@ -603,60 +561,71 @@ void ClientConnection::tcpConnectAsync() {
- }
-
- LOG_DEBUG(cnxString_ << "Resolving " << service_url.host() << ":" << service_url.port());
-- tcp::resolver::query query(service_url.host(), std::to_string(service_url.port()));
-+ tcp::resolver::endpoint_type endpoint(ASIO::ip::make_address(service_url.host()), service_url.port());
- auto weakSelf = weak_from_this();
-- resolver_->async_resolve(query, [weakSelf](const ASIO_ERROR& err, tcp::resolver::iterator iterator) {
-- auto self = weakSelf.lock();
-- if (self) {
-- self->handleResolve(err, iterator);
-- }
-- });
-+ resolver_->async_resolve(
-+ endpoint, [this, weakSelf](const ASIO_ERROR& err, tcp::resolver::results_type results) {
-+ auto self = weakSelf.lock();
-+ if (!self) {
-+ return;
-+ }
-+ if (err) {
-+ std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
-+ LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
-+ close();
-+ return;
-+ }
-+ if (results.empty()) {
-+ LOG_ERROR(cnxString_ << "No IP address found");
-+ close();
-+ return;
-+ }
-+ connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
-+ ClientConnectionPtr ptr = weakSelf.lock();
-+ if (!ptr) {
-+ // Connection was already destroyed
-+ return;
-+ }
-+
-+ if (ptr->state_ != Ready) {
-+ LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
-+ << ptr->connectTimeoutTask_->getPeriodMs()
-+ << " ms, close the socket");
-+ PeriodicTask::ErrorCode err;
-+ ptr->socket_->close(err);
-+ if (err) {
-+ LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
-+ }
-+ }
-+ ptr->connectTimeoutTask_->stop();
-+ });
-+ connectTimeoutTask_->start();
-+ std::vector<tcp::resolver::endpoint_type> endpoints;
-+ for (const auto& result : results) {
-+ endpoints.emplace_back(result.endpoint());
-+ }
-+ asyncConnect(endpoints, 0);
-+ });
- }
-
--void ClientConnection::handleResolve(const ASIO_ERROR& err, tcp::resolver::iterator endpointIterator) {
-- if (err) {
-- std::string hostUrl = isSniProxy_ ? cnxString_ : proxyServiceUrl_;
-- LOG_ERROR(hostUrl << "Resolve error: " << err << " : " << err.message());
-- close();
-+void ClientConnection::asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index) {
-+ if (index >= endpoints.size()) {
-+ close(ResultRetryable);
- return;
- }
--
- auto weakSelf = weak_from_this();
-- connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
-- ClientConnectionPtr ptr = weakSelf.lock();
-- if (!ptr) {
-- // Connection was already destroyed
-+ socket_->async_connect(endpoints[index], [this, weakSelf, endpoints, index](const ASIO_ERROR& err) {
-+ auto self = weakSelf.lock();
-+ if (!self) {
- return;
- }
--
-- if (ptr->state_ != Ready) {
-- LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
-- << ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
-- PeriodicTask::ErrorCode err;
-- ptr->socket_->close(err);
-- if (err) {
-- LOG_WARN(ptr->cnxString_ << "Failed to close socket: " << err.message());
-- }
-+ if (err) {
-+ LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
-+ asyncConnect(endpoints, index + 1);
-+ return;
- }
-- ptr->connectTimeoutTask_->stop();
-+ completeConnect(endpoints[index]);
- });
--
-- LOG_DEBUG(cnxString_ << "Connecting to " << endpointIterator->endpoint() << "...");
-- connectTimeoutTask_->start();
-- if (endpointIterator != tcp::resolver::iterator()) {
-- LOG_DEBUG(cnxString_ << "Resolved hostname " << endpointIterator->host_name() //
-- << " to " << endpointIterator->endpoint());
-- socket_->async_connect(*endpointIterator, [weakSelf, endpointIterator](const ASIO_ERROR& err) {
-- auto self = weakSelf.lock();
-- if (self) {
-- self->handleTcpConnected(err, endpointIterator);
-- }
-- });
-- } else {
-- LOG_WARN(cnxString_ << "No IP address found");
-- close();
-- return;
-- }
- }
-
- void ClientConnection::readNextCommand() {
-@@ -1058,7 +1027,7 @@ void ClientConnection::newLookup(const SharedBuffer& cmd, const uint64_t request
- LookupRequestData requestData;
- requestData.promise = promise;
- requestData.timer = executor_->createDeadlineTimer();
-- requestData.timer->expires_from_now(operationsTimeout_);
-+ requestData.timer->expires_after(operationsTimeout_);
- auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
-@@ -1174,8 +1143,9 @@ void ClientConnection::sendPendingCommands() {
- PairSharedBuffer buffer =
- Commands::newSend(outgoingBuffer_, outgoingCmd, getChecksumType(), *args);
-
-- // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before the
-- // callback is called, an invalid buffer range might be passed to the underlying socket send.
-+ // Capture the buffer because asio does not copy the buffer, if the buffer is destroyed before
-+ // the callback is called, an invalid buffer range might be passed to the underlying socket
-+ // send.
- asyncWrite(buffer, customAllocWriteHandler([this, self, buffer](const ASIO_ERROR& err, size_t) {
- handleSendPair(err);
- }));
-@@ -1198,7 +1168,7 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(SharedBuffer cm
-
- PendingRequestData requestData;
- requestData.timer = executor_->createDeadlineTimer();
-- requestData.timer->expires_from_now(operationsTimeout_);
-+ requestData.timer->expires_after(operationsTimeout_);
- auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
-@@ -1251,7 +1221,7 @@ void ClientConnection::handleKeepAliveTimeout() {
- // be zero And we do not attempt to dereference the pointer.
- Lock lock(mutex_);
- if (keepAliveTimer_) {
-- keepAliveTimer_->expires_from_now(std::chrono::seconds(keepAliveIntervalInSeconds_));
-+ keepAliveTimer_->expires_after(std::chrono::seconds(keepAliveIntervalInSeconds_));
- auto weakSelf = weak_from_this();
- keepAliveTimer_->async_wait([weakSelf](const ASIO_ERROR&) {
- auto self = weakSelf.lock();
-@@ -1430,7 +1400,7 @@ Future<Result, GetLastMessageIdResponse> ClientConnection::newGetLastMessageId(u
- LastMessageIdRequestData requestData;
- requestData.promise = promise;
- requestData.timer = executor_->createDeadlineTimer();
-- requestData.timer->expires_from_now(operationsTimeout_);
-+ requestData.timer->expires_after(operationsTimeout_);
- auto weakSelf = weak_from_this();
- requestData.timer->async_wait([weakSelf, requestData](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
-@@ -1478,7 +1448,7 @@ Future<Result, SchemaInfo> ClientConnection::newGetSchema(const std::string& top
- lock.unlock();
-
- auto weakSelf = weak_from_this();
-- timer->expires_from_now(operationsTimeout_);
-+ timer->expires_after(operationsTimeout_);
- timer->async_wait([this, weakSelf, requestId](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
- if (!self) {
-@@ -2047,8 +2017,7 @@ void ClientConnection::unsafeRemovePendingRequest(long requestId) {
- auto it = pendingRequests_.find(requestId);
- if (it != pendingRequests_.end()) {
- it->second.promise.setFailed(ResultDisconnected);
-- ASIO_ERROR ec;
-- it->second.timer->cancel(ec);
-+ it->second.timer->cancel();
- pendingRequests_.erase(it);
- }
- }
-diff --git lib/ClientConnection.h lib/ClientConnection.h
-index 7646f85e..14e07652 100644
---- lib/ClientConnection.h
-+++ lib/ClientConnection.h
-@@ -25,13 +25,13 @@
- #include <atomic>
- #ifdef USE_ASIO
- #include <asio/bind_executor.hpp>
--#include <asio/io_service.hpp>
-+#include <asio/io_context.hpp>
- #include <asio/ip/tcp.hpp>
- #include <asio/ssl/stream.hpp>
- #include <asio/strand.hpp>
- #else
- #include <boost/asio/bind_executor.hpp>
--#include <boost/asio/io_service.hpp>
-+#include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/ssl/stream.hpp>
- #include <boost/asio/strand.hpp>
-@@ -231,13 +231,8 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
- DeadlineTimerPtr timer;
- };
-
-- /*
-- * handler for connectAsync
-- * creates a ConnectionPtr which has a valid ClientConnection object
-- * although not usable at this point, since this is just tcp connection
-- * Pulsar - Connect/Connected has yet to happen
-- */
-- void handleTcpConnected(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
-+ void asyncConnect(const std::vector<ASIO::ip::tcp::endpoint>& endpoints, size_t index);
-+ void completeConnect(ASIO::ip::tcp::endpoint endpoint);
-
- void handleHandshake(const ASIO_ERROR& err);
-
-@@ -260,8 +255,6 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
-
- void handlePulsarConnected(const proto::CommandConnected& cmdConnected);
-
-- void handleResolve(const ASIO_ERROR& err, ASIO::ip::tcp::resolver::iterator endpointIterator);
--
- void handleSend(const ASIO_ERROR& err, const SharedBuffer& cmd);
- void handleSendPair(const ASIO_ERROR& err);
- void sendPendingCommands();
-@@ -324,7 +317,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
- */
- SocketPtr socket_;
- TlsSocketPtr tlsSocket_;
-- ASIO::strand<ASIO::io_service::executor_type> strand_;
-+ ASIO::strand<ASIO::io_context::executor_type> strand_;
-
- const std::string logicalAddress_;
- /*
-diff --git lib/ConsumerImpl.cc lib/ConsumerImpl.cc
-index 250845b3..cfdb0b2d 100644
---- lib/ConsumerImpl.cc
-+++ lib/ConsumerImpl.cc
-@@ -422,7 +422,7 @@ void ConsumerImpl::discardChunkMessages(std::string uuid, MessageId messageId, b
- }
-
- void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
-- checkExpiredChunkedTimer_->expires_from_now(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
-+ checkExpiredChunkedTimer_->expires_after(milliseconds(expireTimeOfIncompleteChunkedMessageMs_));
- std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
- checkExpiredChunkedTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) -> void {
- auto self = weakSelf.lock();
-@@ -1668,7 +1668,7 @@ void ConsumerImpl::internalGetLastMessageIdAsync(const BackoffPtr& backoff, Time
- }
- remainTime -= next;
-
-- timer->expires_from_now(next);
-+ timer->expires_after(next);
-
- auto self = shared_from_this();
- timer->async_wait([this, backoff, remainTime, timer, next, callback,
-@@ -1791,9 +1791,8 @@ std::shared_ptr<ConsumerImpl> ConsumerImpl::get_shared_this_ptr() {
- }
-
- void ConsumerImpl::cancelTimers() noexcept {
-- ASIO_ERROR ec;
-- batchReceiveTimer_->cancel(ec);
-- checkExpiredChunkedTimer_->cancel(ec);
-+ batchReceiveTimer_->cancel();
-+ checkExpiredChunkedTimer_->cancel();
- unAckedMessageTrackerPtr_->stop();
- consumerStatsBasePtr_->stop();
- }
-diff --git lib/ConsumerImplBase.cc lib/ConsumerImplBase.cc
-index 098f2d5b..76d99370 100644
---- lib/ConsumerImplBase.cc
-+++ lib/ConsumerImplBase.cc
-@@ -51,7 +51,7 @@ ConsumerImplBase::ConsumerImplBase(ClientImplPtr client, const std::string& topi
-
- void ConsumerImplBase::triggerBatchReceiveTimerTask(long timeoutMs) {
- if (timeoutMs > 0) {
-- batchReceiveTimer_->expires_from_now(std::chrono::milliseconds(timeoutMs));
-+ batchReceiveTimer_->expires_after(std::chrono::milliseconds(timeoutMs));
- std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
- batchReceiveTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
-diff --git lib/ExecutorService.cc lib/ExecutorService.cc
-index 794e3619..7f2a2c14 100644
---- lib/ExecutorService.cc
-+++ lib/ExecutorService.cc
-@@ -18,6 +18,12 @@
- */
- #include "ExecutorService.h"
-
-+#ifdef USE_ASIO
-+#include <asio/post.hpp>
-+#else
-+#include <boost/asio/post.hpp>
-+#endif
-+
- #include "LogUtils.h"
- #include "TimeUtils.h"
- DECLARE_LOG_OBJECT()
-@@ -31,18 +37,13 @@ ExecutorService::~ExecutorService() { close(0); }
- void ExecutorService::start() {
- auto self = shared_from_this();
- std::thread t{[this, self] {
-- LOG_DEBUG("Run io_service in a single thread");
-- ASIO_ERROR ec;
-+ LOG_DEBUG("Run io_context in a single thread");
- while (!closed_) {
-- io_service_.restart();
-- IOService::work work{getIOService()};
-- io_service_.run(ec);
-- }
-- if (ec) {
-- LOG_ERROR("Failed to run io_service: " << ec.message());
-- } else {
-- LOG_DEBUG("Event loop of ExecutorService exits successfully");
-+ io_context_.restart();
-+ auto work{ASIO::make_work_guard(io_context_)};
-+ io_context_.run();
- }
-+ LOG_DEBUG("Event loop of ExecutorService exits successfully");
- {
- std::lock_guard<std::mutex> lock{mutex_};
- ioServiceDone_ = true;
-@@ -63,12 +64,12 @@ ExecutorServicePtr ExecutorService::create() {
- }
-
- /*
-- * factory method of ASIO::ip::tcp::socket associated with io_service_ instance
-+ * factory method of ASIO::ip::tcp::socket associated with io_context_ instance
- * @ returns shared_ptr to this socket
- */
- SocketPtr ExecutorService::createSocket() {
- try {
-- return SocketPtr(new ASIO::ip::tcp::socket(io_service_));
-+ return SocketPtr(new ASIO::ip::tcp::socket(io_context_));
- } catch (const ASIO_SYSTEM_ERROR &e) {
- restart();
- auto error = std::string("Failed to create socket: ") + e.what();
-@@ -82,12 +83,12 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, ASIO::ssl::cont
- }
-
- /*
-- * factory method of Resolver object associated with io_service_ instance
-+ * factory method of Resolver object associated with io_context_ instance
- * @returns shraed_ptr to resolver object
- */
- TcpResolverPtr ExecutorService::createTcpResolver() {
- try {
-- return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_service_));
-+ return TcpResolverPtr(new ASIO::ip::tcp::resolver(io_context_));
- } catch (const ASIO_SYSTEM_ERROR &e) {
- restart();
- auto error = std::string("Failed to create resolver: ") + e.what();
-@@ -97,7 +98,7 @@ TcpResolverPtr ExecutorService::createTcpResolver() {
-
- DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
- try {
-- return DeadlineTimerPtr(new ASIO::steady_timer(io_service_));
-+ return DeadlineTimerPtr(new ASIO::steady_timer(io_context_));
- } catch (const ASIO_SYSTEM_ERROR &e) {
- restart();
- auto error = std::string("Failed to create steady_timer: ") + e.what();
-@@ -105,7 +106,7 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
- }
- }
-
--void ExecutorService::restart() { io_service_.stop(); }
-+void ExecutorService::restart() { io_context_.stop(); }
-
- void ExecutorService::close(long timeoutMs) {
- bool expectedState = false;
-@@ -113,12 +114,12 @@ void ExecutorService::close(long timeoutMs) {
- return;
- }
- if (timeoutMs == 0) { // non-blocking
-- io_service_.stop();
-+ io_context_.stop();
- return;
- }
-
- std::unique_lock<std::mutex> lock{mutex_};
-- io_service_.stop();
-+ io_context_.stop();
- if (timeoutMs > 0) {
- cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_; });
- } else { // < 0
-@@ -126,7 +127,7 @@ void ExecutorService::close(long timeoutMs) {
- }
- }
-
--void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
-+void ExecutorService::postWork(std::function<void(void)> task) { ASIO::post(io_context_, task); }
-
- /////////////////////
-
-diff --git lib/ExecutorService.h lib/ExecutorService.h
-index 89d06d30..626cb203 100644
---- lib/ExecutorService.h
-+++ lib/ExecutorService.h
-@@ -23,11 +23,11 @@
-
- #include <atomic>
- #ifdef USE_ASIO
--#include <asio/io_service.hpp>
-+#include <asio/io_context.hpp>
- #include <asio/ip/tcp.hpp>
- #include <asio/ssl.hpp>
- #else
--#include <boost/asio/io_service.hpp>
-+#include <boost/asio/io_context.hpp>
- #include <boost/asio/ip/tcp.hpp>
- #include <boost/asio/ssl.hpp>
- #endif
-@@ -46,7 +46,7 @@ typedef std::shared_ptr<ASIO::ssl::stream<ASIO::ip::tcp::socket &> > TlsSocketPt
- typedef std::shared_ptr<ASIO::ip::tcp::resolver> TcpResolverPtr;
- class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
- public:
-- using IOService = ASIO::io_service;
-+ using IOService = ASIO::io_context;
- using SharedPtr = std::shared_ptr<ExecutorService>;
-
- static SharedPtr create();
-@@ -67,14 +67,14 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
- // See TimeoutProcessor for the semantics of the parameter.
- void close(long timeoutMs = 3000);
-
-- IOService &getIOService() { return io_service_; }
-+ IOService &getIOService() { return io_context_; }
- bool isClosed() const noexcept { return closed_; }
-
- private:
- /*
-- * io_service is our interface to os, io object schedule async ops on this object
-+ * io_context is our interface to os, io object schedule async ops on this object
- */
-- IOService io_service_;
-+ IOService io_context_;
-
- std::atomic_bool closed_{false};
- std::mutex mutex_;
-diff --git lib/HandlerBase.cc lib/HandlerBase.cc
-index 65aa0db1..71902481 100644
---- lib/HandlerBase.cc
-+++ lib/HandlerBase.cc
-@@ -50,9 +50,8 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
- redirectedClusterURI_("") {}
-
- HandlerBase::~HandlerBase() {
-- ASIO_ERROR ignored;
-- timer_->cancel(ignored);
-- creationTimer_->cancel(ignored);
-+ timer_->cancel();
-+ creationTimer_->cancel();
- }
-
- void HandlerBase::start() {
-@@ -61,15 +60,14 @@ void HandlerBase::start() {
- if (state_.compare_exchange_strong(state, Pending)) {
- grabCnx();
- }
-- creationTimer_->expires_from_now(operationTimeut_);
-+ creationTimer_->expires_after(operationTimeut_);
- std::weak_ptr<HandlerBase> weakSelf{shared_from_this()};
- creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) {
- auto self = weakSelf.lock();
- if (self && !error) {
- LOG_WARN("Cancel the pending reconnection due to the start timeout");
- connectionFailed(ResultTimeout);
-- ASIO_ERROR ignored;
-- timer_->cancel(ignored);
-+ timer_->cancel();
- }
- });
- }
-@@ -133,8 +131,7 @@ void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl)
- connectionTimeMs_ =
- duration_cast<milliseconds>(high_resolution_clock::now() - before).count();
- // Prevent the creationTimer_ from cancelling the timer_ in future
-- ASIO_ERROR ignored;
-- creationTimer_->cancel(ignored);
-+ creationTimer_->cancel();
- LOG_INFO("Finished connecting to broker after " << connectionTimeMs_ << " ms")
- } else if (isResultRetryable(result)) {
- scheduleReconnection();
-@@ -188,7 +185,7 @@ void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assig
- TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next();
-
- LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s");
-- timer_->expires_from_now(delay);
-+ timer_->expires_after(delay);
- // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled
- // so we will not run into the case where grabCnx is invoked on out of scope handler
- auto name = getName();
-diff --git lib/MultiTopicsConsumerImpl.cc lib/MultiTopicsConsumerImpl.cc
-index dddade5c..61fbf7b8 100644
---- lib/MultiTopicsConsumerImpl.cc
-+++ lib/MultiTopicsConsumerImpl.cc
-@@ -962,7 +962,7 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
- return numberOfConnectedConsumer;
- }
- void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
-- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
- auto weakSelf = weak_from_this();
- partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
- // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
-@@ -1115,8 +1115,7 @@ void MultiTopicsConsumerImpl::beforeConnectionChange(ClientConnection& cnx) {
-
- void MultiTopicsConsumerImpl::cancelTimers() noexcept {
- if (partitionsUpdateTimer_) {
-- ASIO_ERROR ec;
-- partitionsUpdateTimer_->cancel(ec);
-+ partitionsUpdateTimer_->cancel();
- }
- }
-
-diff --git lib/NegativeAcksTracker.cc lib/NegativeAcksTracker.cc
-index e443496d..e50b4ca2 100644
---- lib/NegativeAcksTracker.cc
-+++ lib/NegativeAcksTracker.cc
-@@ -50,7 +50,7 @@ void NegativeAcksTracker::scheduleTimer() {
- return;
- }
- std::weak_ptr<NegativeAcksTracker> weakSelf{shared_from_this()};
-- timer_->expires_from_now(timerInterval_);
-+ timer_->expires_after(timerInterval_);
- timer_->async_wait([weakSelf](const ASIO_ERROR &ec) {
- if (auto self = weakSelf.lock()) {
- self->handleTimer(ec);
-@@ -107,8 +107,7 @@ void NegativeAcksTracker::add(const MessageId &m) {
-
- void NegativeAcksTracker::close() {
- closed_ = true;
-- ASIO_ERROR ec;
-- timer_->cancel(ec);
-+ timer_->cancel();
- std::lock_guard<std::mutex> lock(mutex_);
- nackedMessages_.clear();
- }
-diff --git lib/PartitionedProducerImpl.cc lib/PartitionedProducerImpl.cc
-index 4178096c..923c038b 100644
---- lib/PartitionedProducerImpl.cc
-+++ lib/PartitionedProducerImpl.cc
-@@ -421,7 +421,7 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
-
- void PartitionedProducerImpl::runPartitionUpdateTask() {
- auto weakSelf = weak_from_this();
-- partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-+ partitionsUpdateTimer_->expires_after(partitionsUpdateInterval_);
- partitionsUpdateTimer_->async_wait([weakSelf](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
- if (self) {
-@@ -524,8 +524,7 @@ uint64_t PartitionedProducerImpl::getNumberOfConnectedProducer() {
-
- void PartitionedProducerImpl::cancelTimers() noexcept {
- if (partitionsUpdateTimer_) {
-- ASIO_ERROR ec;
-- partitionsUpdateTimer_->cancel(ec);
-+ partitionsUpdateTimer_->cancel();
- }
- }
-
-diff --git lib/PatternMultiTopicsConsumerImpl.cc lib/PatternMultiTopicsConsumerImpl.cc
-index 4fc7bb61..07d9a7bc 100644
---- lib/PatternMultiTopicsConsumerImpl.cc
-+++ lib/PatternMultiTopicsConsumerImpl.cc
-@@ -48,7 +48,7 @@ const PULSAR_REGEX_NAMESPACE::regex PatternMultiTopicsConsumerImpl::getPattern()
-
- void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
- autoDiscoveryRunning_ = false;
-- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-
- auto weakSelf = weak_from_this();
- autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
-@@ -228,7 +228,7 @@ void PatternMultiTopicsConsumerImpl::start() {
- LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
-
- if (conf_.getPatternAutoDiscoveryPeriod() > 0) {
-- autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
-+ autoDiscoveryTimer_->expires_after(seconds(conf_.getPatternAutoDiscoveryPeriod()));
- auto weakSelf = weak_from_this();
- autoDiscoveryTimer_->async_wait([weakSelf](const ASIO_ERROR& err) {
- if (auto self = weakSelf.lock()) {
-@@ -248,7 +248,4 @@ void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
- MultiTopicsConsumerImpl::closeAsync(callback);
- }
-
--void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept {
-- ASIO_ERROR ec;
-- autoDiscoveryTimer_->cancel(ec);
--}
-+void PatternMultiTopicsConsumerImpl::cancelTimers() noexcept { autoDiscoveryTimer_->cancel(); }
-diff --git lib/PeriodicTask.cc lib/PeriodicTask.cc
-index 9fde012a..4b5f9621 100644
---- lib/PeriodicTask.cc
-+++ lib/PeriodicTask.cc
-@@ -29,7 +29,7 @@ void PeriodicTask::start() {
- state_ = Ready;
- if (periodMs_ >= 0) {
- std::weak_ptr<PeriodicTask> weakSelf{shared_from_this()};
-- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
-+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
- timer_->async_wait([weakSelf](const ErrorCode& ec) {
- auto self = weakSelf.lock();
- if (self) {
-@@ -44,8 +44,7 @@ void PeriodicTask::stop() noexcept {
- if (!state_.compare_exchange_strong(state, Closing)) {
- return;
- }
-- ErrorCode ec;
-- timer_->cancel(ec);
-+ timer_->cancel();
- state_ = Pending;
- }
-
-@@ -59,7 +58,7 @@ void PeriodicTask::handleTimeout(const ErrorCode& ec) {
- // state_ may be changed in handleTimeout, so we check state_ again
- if (state_ == Ready) {
- auto self = shared_from_this();
-- timer_->expires_from_now(std::chrono::milliseconds(periodMs_));
-+ timer_->expires_after(std::chrono::milliseconds(periodMs_));
- timer_->async_wait([this, self](const ErrorCode& ec) { handleTimeout(ec); });
- }
- }
-diff --git lib/ProducerImpl.cc lib/ProducerImpl.cc
-index 4399ce5f..8b112bf1 100644
---- lib/ProducerImpl.cc
-+++ lib/ProducerImpl.cc
-@@ -570,7 +570,7 @@ void ProducerImpl::sendAsyncWithStatsUpdate(const Message& msg, SendCallback&& c
- bool isFirstMessage = batchMessageContainer_->isFirstMessageToAdd(msg);
- bool isFull = batchMessageContainer_->add(msg, callback);
- if (isFirstMessage) {
-- batchTimer_->expires_from_now(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
-+ batchTimer_->expires_after(milliseconds(conf_.getBatchingMaxPublishDelayMs()));
- auto weakSelf = weak_from_this();
- batchTimer_->async_wait([this, weakSelf](const ASIO_ERROR& ec) {
- auto self = weakSelf.lock();
-@@ -1007,9 +1007,8 @@ void ProducerImpl::shutdown() {
-
- void ProducerImpl::cancelTimers() noexcept {
- dataKeyRefreshTask_.stop();
-- ASIO_ERROR ec;
-- batchTimer_->cancel(ec);
-- sendTimer_->cancel(ec);
-+ batchTimer_->cancel();
-+ sendTimer_->cancel();
- }
-
- bool ProducerImplCmp::operator()(const ProducerImplPtr& a, const ProducerImplPtr& b) const {
-@@ -1030,7 +1029,7 @@ void ProducerImpl::startSendTimeoutTimer() {
- }
-
- void ProducerImpl::asyncWaitSendTimeout(DurationType expiryTime) {
-- sendTimer_->expires_from_now(expiryTime);
-+ sendTimer_->expires_after(expiryTime);
*** 217 LINES SKIPPED ***