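The Asio C++ Library is a free, open-source, cross-platform C++ library for network programming. It gives developers a consistent asynchronous I/O model covering timers, files, pipes, serial ports, and the network protocols TCP, UDP, and ICMP. Boost.Asio was accepted into the Boost libraries on 30 December 2005 after a 20-day review. Asio is currently distributed in two forms: the standalone Asio C++ library, and Boost.Asio, which is integrated with Boost. The two share the same core. The difference is that Boost.Asio follows the Boost release schedule, so a bug fix sometimes reaches users a little later, with the next Boost release. Since I already have Boost installed, I use Boost.Asio.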
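Asio is designed around the Proactor pattern. Proactor is a software design pattern for event handling in which long-running activities run in an asynchronous part (in Asio, the I/O operations). When the asynchronous part finishes, a completion handler is called. Every program that uses Asio needs at least one I/O execution context, such as an io_context or thread_pool object. The I/O execution context provides access to I/O functionality. For an asynchronous operation, you also supply a completion handler, which is what gets notified when the work is done.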
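To make the flow concrete, here is a minimal toy sketch of the idea using only the standard library. It is not how Asio is implemented: toy_context and async_add are hypothetical names made up for this illustration, and the "asynchronous" work is just an addition whose result is handed to a completion handler when run() drains the queue.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

// Hypothetical toy model of an execution context (NOT Asio's implementation).
class toy_context {
public:
    // Queue a completion handler to be invoked later by run().
    void post(std::function<void()> handler) {
        handlers_.push(std::move(handler));
    }

    // Invoke queued handlers until none remain; returns how many ran.
    std::size_t run() {
        std::size_t executed = 0;
        while (!handlers_.empty()) {
            std::function<void()> handler = std::move(handlers_.front());
            handlers_.pop();
            handler(); // a handler may post more work
            ++executed;
        }
        return executed;
    }

private:
    std::queue<std::function<void()>> handlers_;
};

// Hypothetical "asynchronous" operation: nothing is reported to the caller
// until run() invokes the completion handler with the result.
void async_add(toy_context &ctx, int a, int b,
               std::function<void(int)> on_done) {
    ctx.post([a, b, on_done] { on_done(a + b); });
}
```

In this toy, calling run() on a context with nothing queued returns 0 right away, and a handler passed to async_add only fires once run() is called.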
Below is a test program taken from the Asio tutorial, Using a timer synchronously. boost::asio::io_context is the object that provides the I/O functionality.
#include <boost/asio.hpp>
#include <iostream>

int main() {
    boost::asio::io_context io;
    boost::asio::steady_timer t(io, boost::asio::chrono::seconds(3));
    t.wait();
    std::cout << "Hello, world!" << std::endl;
    return 0;
}
Build it with CMake. The CMakeLists.txt is as follows:
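cmake_minimum_required(VERSION 3.18)
project(timer)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
# Boost.Asio is header-only, so no components are requested.
find_package(Boost 1.89.0 REQUIRED CONFIG)
add_executable(timer timer.cpp)
# Boost::headers carries the Boost include directories.
target_link_libraries(timer PRIVATE Boost::headers)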
Using a timer asynchronously
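Using Asio's asynchronous functionality means supplying a completion token, which determines how the result is delivered to a completion handler when the asynchronous operation finishes. Here we use a function named print, which is called once the asynchronous wait ends.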
Remember to give the io_context some work before calling boost::asio::io_context::run(). If no work has been queued (in this example, the work is steady_timer::async_wait()), boost::asio::io_context::run() returns immediately.
#include <boost/asio.hpp>
#include <iostream>

void print(const boost::system::error_code & /*e*/) {
    std::cout << "Hello, world!" << std::endl;
}

int main() {
    boost::asio::io_context io;
    boost::asio::steady_timer t(io, boost::asio::chrono::seconds(3));
    t.async_wait(&print);
    io.run();
    return 0;
}
Binding arguments to a completion handler
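To implement a repeating timer with Asio, change the timer's expiry time inside the completion handler and then start a new asynchronous wait. This means the completion handler needs access to the timer object.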
#include <boost/asio.hpp>
#include <functional>
#include <iostream>

void print(const boost::system::error_code & /*e*/,
           boost::asio::steady_timer *t, int *count) {
    if (*count < 5) {
        std::cout << *count << std::endl;
        ++(*count);
        t->expires_at(t->expiry() + boost::asio::chrono::seconds(1));
        t->async_wait(
            std::bind(print, boost::asio::placeholders::error, t, count));
    }
}

int main() {
    boost::asio::io_context io;
    int count = 0;
    boost::asio::steady_timer t(io, boost::asio::chrono::seconds(1));
    t.async_wait(
        std::bind(print, boost::asio::placeholders::error, &t, &count));
    io.run();
    std::cout << "Final count is " << count << std::endl;
    return 0;
}
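The binding step can be looked at in isolation with the standard library alone. The sketch below is only an illustration: on_tick and make_handler are hypothetical names, and a plain int stands in for the error code that Asio would pass. It shows how std::bind fixes the extra argument up front and leaves one placeholder for the value supplied when the handler is called.

```cpp
#include <functional>

// Hypothetical handler: the first parameter stands in for the error code
// supplied by the caller; the second is the extra state we bind in advance.
void on_tick(int error, int *count) {
    if (error == 0)
        ++(*count);
}

// Returns a one-argument callable with `count` already bound.
// std::placeholders::_1 marks the slot filled in at call time.
std::function<void(int)> make_handler(int *count) {
    return std::bind(on_tick, std::placeholders::_1, count);
}
```

Calling the returned object with 0 increments the counter it was bound to, while a non-zero value leaves the counter unchanged.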
Using a member function as a completion handler
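std::bind works with class member functions just as it does with free functions. Because every non-static member function has an implicit this parameter, we need to bind this to the function. std::bind turns our completion handler (now a member function) into a function object.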
#include <boost/asio.hpp>
#include <functional>
#include <iostream>

class printer {
public:
    printer(boost::asio::io_context &io)
        : timer_(io, boost::asio::chrono::seconds(1)), count_(0) {
        timer_.async_wait(std::bind(&printer::print, this));
    }

    ~printer() { std::cout << "Final count is " << count_ << std::endl; }

    void print() {
        if (count_ < 5) {
            std::cout << count_ << std::endl;
            ++count_;
            timer_.expires_at(timer_.expiry() +
                              boost::asio::chrono::seconds(1));
            timer_.async_wait(std::bind(&printer::print, this));
        }
    }

private:
    boost::asio::steady_timer timer_;
    int count_;
};

int main() {
    boost::asio::io_context io;
    printer p(io);
    io.run();
    return 0;
}
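One detail in the program above is easy to miss: print() takes no parameters, yet the timer calls its handler with an error code. That works because a std::bind object ignores any call arguments that no placeholder refers to. A small standard-library sketch of just that behavior follows. The counter class is a hypothetical name for illustration, and an int stands in for the error code.

```cpp
#include <functional>

// Hypothetical class used only to illustrate binding a member function.
class counter {
public:
    // Binds `this` to tick(). The resulting callable accepts an int
    // (standing in for an error code) and silently discards it.
    std::function<void(int)> handler() {
        return std::bind(&counter::tick, this);
    }

    int value() const { return value_; }

private:
    void tick() { ++value_; }

    int value_ = 0;
};
```

Invoking the returned handler twice, with any argument values, leaves value() at 2.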
Synchronising completion handlers in multithreaded programs
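The strand class template is an executor adapter. It guarantees that, for handlers dispatched through it, the handler currently executing completes before the next one starts. This guarantee holds no matter how many threads are calling boost::asio::io_context::run(). Of course, those handlers may still run concurrently with handlers that were not dispatched through a strand, or that were dispatched through a different strand object.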
#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <thread>

class printer {
public:
    printer(boost::asio::io_context &io)
        : strand_(boost::asio::make_strand(io)),
          timer1_(io, boost::asio::chrono::seconds(1)),
          timer2_(io, boost::asio::chrono::seconds(1)), count_(0) {
        timer1_.async_wait(boost::asio::bind_executor(
            strand_, std::bind(&printer::print1, this)));
        timer2_.async_wait(boost::asio::bind_executor(
            strand_, std::bind(&printer::print2, this)));
    }

    ~printer() { std::cout << "Final count is " << count_ << std::endl; }

    void print1() {
        if (count_ < 10) {
            std::cout << "Timer 1: " << count_ << std::endl;
            ++count_;
            timer1_.expires_at(timer1_.expiry() +
                               boost::asio::chrono::seconds(1));
            timer1_.async_wait(boost::asio::bind_executor(
                strand_, std::bind(&printer::print1, this)));
        }
    }

    void print2() {
        if (count_ < 10) {
            std::cout << "Timer 2: " << count_ << std::endl;
            ++count_;
            timer2_.expires_at(timer2_.expiry() +
                               boost::asio::chrono::seconds(1));
            timer2_.async_wait(boost::asio::bind_executor(
                strand_, std::bind(&printer::print2, this)));
        }
    }

private:
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
    boost::asio::steady_timer timer1_;
    boost::asio::steady_timer timer2_;
    int count_;
};

int main() {
    boost::asio::io_context io;
    printer p(io);
    std::thread t([&] { io.run(); });
    io.run();
    t.join();
    return 0;
}
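The serialization guarantee can be pictured with a toy model built from a mutex and a queue. This is only a sketch under my own assumptions, not Asio's strand implementation, and toy_strand is a hypothetical name. Whichever caller finds the strand idle becomes the one that drains the queue, so handlers run one at a time and in the order they were posted.

```cpp
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical toy model of a strand (NOT Asio's implementation).
class toy_strand {
public:
    // Queue a handler. If nobody is draining the queue, the calling
    // thread drains it; otherwise the handler waits its turn.
    void post(std::function<void()> handler) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push(std::move(handler));
            if (running_)
                return; // someone else is already running handlers
            running_ = true;
        }
        drain();
    }

private:
    // Run queued handlers one by one, outside the lock, until empty.
    void drain() {
        for (;;) {
            std::function<void()> handler;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (queue_.empty()) {
                    running_ = false;
                    return;
                }
                handler = std::move(queue_.front());
                queue_.pop();
            }
            handler(); // the next handler starts only after this returns
        }
    }

    std::mutex mutex_;
    std::queue<std::function<void()>> queue_;
    bool running_ = false;
};
```

A handler that posts another handler from inside itself finishes first, and the nested one runs afterwards. The same rule applies when post() is called from several threads at once.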
File
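Linux io_uring was added in kernel 5.1. Its main goal is to provide an efficient asynchronous I/O framework that addresses the system-call and context-switch bottlenecks of traditional I/O models. It removes much of the cost of the frequent switches between user space and kernel space that traditional synchronous I/O and the epoll readiness-notification model incur, which can substantially improve performance when handling many concurrent I/O operations. liburing is a helper library maintained by Jens Axboe whose main purpose is to simplify the use of io_uring. Asio provides a wrapper over liburing on Linux (for now it has to be enabled explicitly with a flag). Below is my test program, which reads /etc/os-release to get the Linux distribution name: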
#include <boost/asio.hpp>
#include <boost/asio/stream_file.hpp>
#include <filesystem>
#include <iostream>
#include <string>
#include <vector>

namespace asio = boost::asio;
namespace fs = std::filesystem;

// Split str on delim, dropping empty tokens.
std::vector<std::string> split(const std::string &str,
                               const std::string &delim) {
    std::vector<std::string> tokens;
    size_t prev = 0, pos = 0;
    do {
        pos = str.find(delim, prev);
        if (pos == std::string::npos)
            pos = str.length();
        std::string token = str.substr(prev, pos - prev);
        if (!token.empty())
            tokens.push_back(token);
        prev = pos + delim.length();
    } while (pos < str.length() && prev < str.length());
    return tokens;
}

// Read one line asynchronously; stop at the NAME entry, else read the next.
void read_next_line(asio::stream_file &file, asio::streambuf &buffer) {
    asio::async_read_until(
        file, buffer, '\n',
        [&](const boost::system::error_code &ec,
            std::size_t /*bytes_transferred*/) {
            if (!ec) {
                std::istream is(&buffer);
                std::string line;
                std::getline(is, line);
                auto splitArray = split(line, "=");
                // Blank lines and comments yield fewer than two tokens,
                // so check the size before indexing.
                if (splitArray.size() >= 2 && splitArray[0] == "NAME") {
                    std::cout << splitArray[1] << std::endl;
                } else {
                    read_next_line(file, buffer);
                }
            } else if (ec == asio::error::eof) {
                std::cout << "End of file reached." << std::endl;
            } else {
                std::cerr << "Error reading file: " << ec.message()
                          << std::endl;
            }
        });
}

int main() {
    fs::path test_file_path = "/etc/os-release";
    asio::io_context io_context;
    boost::system::error_code ec_open;
    asio::stream_file file(io_context);
    file.open(test_file_path.string(), asio::stream_file::read_only, ec_open);
    if (ec_open) {
        std::cerr << "Failed to open file: " << ec_open.message() << std::endl;
        return 1;
    }
    asio::streambuf buffer;
    read_next_line(file, buffer);
    io_context.run();
    file.close();
    return 0;
}
Build it with CMake. The CMakeLists.txt is as follows:
cmake_minimum_required(VERSION 3.18)
project(name)
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED True)
find_package(PkgConfig REQUIRED)
pkg_check_modules(uring REQUIRED IMPORTED_TARGET liburing)
# Boost.Asio is header-only, so no components are requested.
find_package(Boost 1.89.0 REQUIRED CONFIG)
add_executable(name name.cpp)
target_link_libraries(name PRIVATE Boost::headers PkgConfig::uring)
# Enable the io_uring backend and turn off epoll so io_uring is used throughout.
target_compile_definitions(name PRIVATE BOOST_ASIO_HAS_IO_URING BOOST_ASIO_DISABLE_EPOLL)
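The NAME lookup in the file-reading program is where most of the edge cases live, so here is a small standalone sketch of one way to parse a single line. parse_key_value is a hypothetical helper of my own, not part of Asio or of the program above. It assumes the KEY=value layout of /etc/os-release, skips blank lines and comments, and strips one pair of matching quotes around the value.

```cpp
#include <string>

// Hypothetical helper: parses one KEY=value line in the style of
// /etc/os-release. Returns false for blank lines, comments, and lines
// without a key before '='. Matching single or double quotes around the
// value are removed.
bool parse_key_value(const std::string &line, std::string &key,
                     std::string &value) {
    if (line.empty() || line[0] == '#')
        return false;
    const std::string::size_type eq = line.find('=');
    if (eq == std::string::npos || eq == 0)
        return false;
    key = line.substr(0, eq);
    value = line.substr(eq + 1); // everything after the first '='
    if (value.size() >= 2 &&
        (value.front() == '"' || value.front() == '\'') &&
        value.back() == value.front()) {
        value = value.substr(1, value.size() - 2);
    }
    return true;
}
```

Unlike splitting on every '=', this keeps any later '=' characters inside the value, and a caller can compare key against "NAME" without indexing into a possibly empty vector.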
TCP
A synchronous TCP daytime client
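We need to turn the server name passed to the application as an argument into a TCP endpoint. For this we use an ip::tcp::resolver object. The resolver takes a host name and a service name and turns them into a list of endpoints. The program then creates and connects the socket. The list of endpoints obtained above may contain both IPv4 and IPv6 endpoints, so we need to try each one until we find one that works. This keeps the client program independent of a specific IP version. The boost::asio::connect() function does this for us automatically.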
#include <array>
#include <boost/asio.hpp>
#include <iostream>

namespace asio = boost::asio;

int main(int argc, char *argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_context io_context;
        asio::ip::tcp::resolver resolver(io_context);
        asio::ip::tcp::resolver::results_type endpoints =
            resolver.resolve(argv[1], "daytime");
        asio::ip::tcp::socket socket(io_context);
        asio::connect(socket, endpoints);
        for (;;) {
            std::array<char, 128> buf;
            boost::system::error_code error;
            size_t len = socket.read_some(asio::buffer(buf), error);
            if (error == asio::error::eof)
                break; // Connection closed cleanly by peer.
            else if (error)
                throw boost::system::system_error(error); // Some other error.
            std::cout.write(buf.data(), len);
        }
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
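The "try each endpoint until one works" step that boost::asio::connect() performs can be sketched without any networking. In the toy below, connect_first is a hypothetical helper, the strings stand in for resolved endpoints, and the try_connect callback stands in for a real connection attempt. It is an illustration of the loop, not Asio's code.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper: returns the index of the first candidate for which
// try_connect succeeds, or candidates.size() if every attempt fails.
std::size_t connect_first(
    const std::vector<std::string> &candidates,
    const std::function<bool(const std::string &)> &try_connect) {
    for (std::size_t i = 0; i < candidates.size(); ++i) {
        if (try_connect(candidates[i]))
            return i; // stop at the first endpoint that accepts
    }
    return candidates.size();
}
```

With candidates such as an IPv6 address followed by an IPv4 address, a callback that only accepts the IPv4 one yields index 1, which is how a client can stay independent of the IP version.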
A synchronous TCP daytime server
We need to create an ip::tcp::acceptor object to listen for new connections. It is initialised to listen on TCP port 13, for IP version 6.
#include <boost/asio.hpp>
#include <ctime>
#include <iostream>
#include <string>

namespace asio = boost::asio;

std::string make_daytime_string() {
    std::time_t now = std::time(0);
    return std::ctime(&now);
}

int main() {
    try {
        asio::io_context io_context;
        asio::ip::tcp::acceptor acceptor(
            io_context, asio::ip::tcp::endpoint(asio::ip::tcp::v6(), 13));
        for (;;) {
            asio::ip::tcp::socket socket(io_context);
            acceptor.accept(socket);
            std::string message = make_daytime_string();
            boost::system::error_code ignored_error;
            asio::write(socket, boost::asio::buffer(message), ignored_error);
        }
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
An asynchronous TCP daytime server
#include <boost/asio.hpp>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;

std::string make_daytime_string() {
    std::time_t now = std::time(0);
    return std::ctime(&now);
}

class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
public:
    typedef std::shared_ptr<tcp_connection> pointer;

    static pointer create(asio::io_context &io_context) {
        return pointer(new tcp_connection(io_context));
    }

    asio::ip::tcp::socket &socket() { return socket_; }

    void start() {
        message_ = make_daytime_string();
        asio::async_write(socket_, asio::buffer(message_),
                          std::bind(&tcp_connection::handle_write,
                                    shared_from_this(),
                                    asio::placeholders::error,
                                    asio::placeholders::bytes_transferred));
    }

private:
    tcp_connection(asio::io_context &io_context) : socket_(io_context) {}

    void handle_write(const boost::system::error_code & /*error*/,
                      size_t /*bytes_transferred*/) {}

    asio::ip::tcp::socket socket_;
    std::string message_;
};

class tcp_server {
public:
    tcp_server(asio::io_context &io_context)
        : io_context_(io_context),
          acceptor_(io_context,
                    asio::ip::tcp::endpoint(asio::ip::tcp::v6(), 13)) {
        start_accept();
    }

private:
    void start_accept() {
        tcp_connection::pointer new_connection =
            tcp_connection::create(io_context_);
        acceptor_.async_accept(new_connection->socket(),
                               std::bind(&tcp_server::handle_accept, this,
                                         new_connection,
                                         asio::placeholders::error));
    }

    void handle_accept(tcp_connection::pointer new_connection,
                       const boost::system::error_code &error) {
        if (!error) {
            new_connection->start();
        }
        start_accept();
    }

    asio::io_context &io_context_;
    asio::ip::tcp::acceptor acceptor_;
};

int main() {
    try {
        asio::io_context io_context;
        tcp_server server(io_context);
        io_context.run();
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
Below is my practice program, which rewrites the client to be asynchronous:
#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <vector>

namespace asio = boost::asio;

const int BUFFER_SIZE = 128;

void handle_read(const boost::system::error_code &error,
                 std::size_t bytes_transferred, asio::ip::tcp::socket &socket,
                 std::vector<char> &buffer) {
    if (!error) {
        std::cout.write(buffer.data(), bytes_transferred);
        // Keep reading until the server closes the connection.
        socket.async_read_some(asio::buffer(buffer),
                               std::bind(handle_read, std::placeholders::_1,
                                         std::placeholders::_2,
                                         std::ref(socket), std::ref(buffer)));
    } else if (error != asio::error::eof) {
        // eof means the peer closed the connection cleanly; not an error.
        std::cerr << "Error during read: " << error.message() << std::endl;
    }
}

int main(int argc, char *argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_context io_context;
        asio::ip::tcp::resolver resolver(io_context);
        asio::ip::tcp::resolver::results_type endpoints =
            resolver.resolve(argv[1], "daytime");
        asio::ip::tcp::socket socket(io_context);
        asio::connect(socket, endpoints);
        std::vector<char> buffer(BUFFER_SIZE);
        socket.async_read_some(asio::buffer(buffer),
                               std::bind(handle_read, std::placeholders::_1,
                                         std::placeholders::_2,
                                         std::ref(socket), std::ref(buffer)));
        io_context.run();
        socket.close();
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
UDP
A synchronous UDP daytime client
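We use an ip::udp::resolver object to find the correct remote endpoint from the host name and service name. The ip::udp::v6() argument restricts the query to return only IPv6 endpoints. If ip::udp::resolver::resolve() does not fail, it is guaranteed to return at least one endpoint in the list, so dereferencing begin() of the result directly is safe.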
#include <boost/asio.hpp>
#include <array>
#include <iostream>

namespace asio = boost::asio;

int main(int argc, char *argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_context io_context;
        asio::ip::udp::resolver resolver(io_context);
        asio::ip::udp::endpoint receiver_endpoint =
            *resolver.resolve(asio::ip::udp::v6(), argv[1], "daytime").begin();
        asio::ip::udp::socket socket(io_context);
        socket.open(asio::ip::udp::v6());
        std::array<char, 1> send_buf = {{0}};
        socket.send_to(asio::buffer(send_buf), receiver_endpoint);
        std::array<char, 128> recv_buf;
        asio::ip::udp::endpoint sender_endpoint;
        size_t len =
            socket.receive_from(asio::buffer(recv_buf), sender_endpoint);
        std::cout.write(recv_buf.data(), len);
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
A synchronous UDP daytime server
#include <boost/asio.hpp>
#include <array>
#include <ctime>
#include <iostream>
#include <string>

namespace asio = boost::asio;

std::string make_daytime_string() {
    std::time_t now = std::time(0);
    return std::ctime(&now);
}

int main() {
    try {
        asio::io_context io_context;
        asio::ip::udp::socket socket(
            io_context, asio::ip::udp::endpoint(asio::ip::udp::v6(), 13));
        for (;;) {
            std::array<char, 1> recv_buf;
            asio::ip::udp::endpoint remote_endpoint;
            socket.receive_from(asio::buffer(recv_buf), remote_endpoint);
            std::string message = make_daytime_string();
            boost::system::error_code ignored_error;
            socket.send_to(asio::buffer(message), remote_endpoint, 0,
                           ignored_error);
        }
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
An asynchronous UDP daytime server
#include <boost/asio.hpp>
#include <array>
#include <ctime>
#include <functional>
#include <iostream>
#include <memory>
#include <string>

namespace asio = boost::asio;

std::string make_daytime_string() {
    std::time_t now = std::time(0);
    return std::ctime(&now);
}

class udp_server {
public:
    udp_server(asio::io_context &io_context)
        : socket_(io_context,
                  asio::ip::udp::endpoint(asio::ip::udp::v6(), 13)) {
        start_receive();
    }

private:
    void start_receive() {
        socket_.async_receive_from(
            asio::buffer(recv_buffer_), remote_endpoint_,
            std::bind(&udp_server::handle_receive, this,
                      asio::placeholders::error,
                      asio::placeholders::bytes_transferred));
    }

    void handle_receive(const boost::system::error_code &error,
                        std::size_t /*bytes_transferred*/) {
        if (!error) {
            std::shared_ptr<std::string> message(
                new std::string(make_daytime_string()));
            socket_.async_send_to(
                asio::buffer(*message), remote_endpoint_,
                std::bind(&udp_server::handle_send, this, message,
                          asio::placeholders::error,
                          asio::placeholders::bytes_transferred));
            start_receive();
        }
    }

    void handle_send(std::shared_ptr<std::string> /*message*/,
                     const boost::system::error_code & /*error*/,
                     std::size_t /*bytes_transferred*/) {}

    asio::ip::udp::socket socket_;
    asio::ip::udp::endpoint remote_endpoint_;
    std::array<char, 1> recv_buffer_;
};

int main() {
    try {
        asio::io_context io_context;
        udp_server server(io_context);
        io_context.run();
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}
Below is my practice program, which rewrites the client to be asynchronous:
#include <array>
#include <boost/asio.hpp>
#include <functional>
#include <iostream>
#include <string>

namespace asio = boost::asio;

class udp_client {
public:
    udp_client(asio::io_context &io_context, const std::string &host)
        : socket_(io_context) {
        asio::ip::udp::resolver resolver(io_context);
        remote_endpoint_ =
            *resolver.resolve(asio::ip::udp::v6(), host, "daytime").begin();
        socket_.open(asio::ip::udp::v6());
        start_send();
    }

private:
    // Send a one-byte request; the reply is awaited in handle_send().
    void start_send() {
        socket_.async_send_to(asio::buffer(send_buffer_), remote_endpoint_,
                              std::bind(&udp_client::handle_send, this,
                                        asio::placeholders::error,
                                        asio::placeholders::bytes_transferred));
    }

    void handle_send(const boost::system::error_code &error,
                     std::size_t /*bytes_transferred*/) {
        if (!error) {
            socket_.async_receive_from(
                asio::buffer(recv_buffer_), remote_endpoint_,
                std::bind(&udp_client::handle_receive, this,
                          asio::placeholders::error,
                          asio::placeholders::bytes_transferred));
        } else {
            std::cerr << "Error during send: " << error.message() << std::endl;
        }
    }

    void handle_receive(const boost::system::error_code &error,
                        std::size_t bytes_transferred) {
        if (!error) {
            std::cout.write(recv_buffer_.data(), bytes_transferred);
        } else {
            std::cerr << "Error during receive: " << error.message()
                      << std::endl;
        }
    }

    asio::ip::udp::socket socket_;
    asio::ip::udp::endpoint remote_endpoint_;
    std::array<char, 1> send_buffer_ = {{0}};
    std::array<char, 128> recv_buffer_;
};

int main(int argc, char *argv[]) {
    try {
        if (argc != 2) {
            std::cerr << "Usage: client <host>" << std::endl;
            return 1;
        }
        asio::io_context io_context;
        udp_client client(io_context, argv[1]);
        io_context.run();
    } catch (std::exception &e) {
        std::cerr << e.what() << std::endl;
    }
    return 0;
}