From 6fe0b360c39c001e3154c794205ee445250e281c Mon Sep 17 00:00:00 2001 From: ety001 Date: Mon, 12 Jan 2026 02:06:53 +0800 Subject: [PATCH 1/3] Add ingest plugin for external data ingestion - Add ingest plugin that sends applied operations to external HTTP endpoint - Add name_from_type helper function in operations.cpp to extract operation names from type names - Plugin supports configurable endpoint URL and sends operation JSON via HTTP POST - Includes CMakeLists.txt, plugin.json, and header/implementation files --- libraries/plugins/ingest/CMakeLists.txt | 34 + .../steem/plugins/ingest/ingest_plugin.hpp | 35 ++ libraries/plugins/ingest/ingest_plugin.cpp | 583 ++++++++++++++++++ libraries/plugins/ingest/plugin.json | 5 + libraries/protocol/operations.cpp | 28 + 5 files changed, 685 insertions(+) create mode 100644 libraries/plugins/ingest/CMakeLists.txt create mode 100644 libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp create mode 100644 libraries/plugins/ingest/ingest_plugin.cpp create mode 100644 libraries/plugins/ingest/plugin.json diff --git a/libraries/plugins/ingest/CMakeLists.txt b/libraries/plugins/ingest/CMakeLists.txt new file mode 100644 index 0000000000..39e8a8fe86 --- /dev/null +++ b/libraries/plugins/ingest/CMakeLists.txt @@ -0,0 +1,34 @@ +file(GLOB HEADERS "include/steem/plugins/ingest/*.hpp") + +add_library( ingest_plugin + ingest_plugin.cpp + ${HEADERS} + ) + +target_link_libraries( ingest_plugin + chain_plugin + steem_chain + steem_protocol + fc + ${Boost_LIBRARIES} + ${CMAKE_DL_LIBS} + ) + +target_include_directories( ingest_plugin + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" + ) + +if( CLANG_TIDY_EXE ) + set_target_properties( + ingest_plugin PROPERTIES + CXX_CLANG_TIDY "${DO_CLANG_TIDY}" + ) +endif( CLANG_TIDY_EXE ) + +install( TARGETS + ingest_plugin + + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp b/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp new file mode 100644 index 0000000000..e0b7ae4efe --- /dev/null +++ b/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp @@ -0,0 +1,35 @@ +#pragma once +#include +#include + +#include + +namespace steem { namespace plugins { namespace ingest { + +namespace detail { class ingest_plugin_impl; } + +using namespace appbase; + +#define STEEM_INGEST_PLUGIN_NAME "ingest" + +class ingest_plugin : public appbase::plugin< ingest_plugin > +{ + public: + ingest_plugin(); + virtual ~ingest_plugin(); + + APPBASE_PLUGIN_REQUIRES( (steem::plugins::chain::chain_plugin) ) + + static const std::string& name() { static std::string name = STEEM_INGEST_PLUGIN_NAME; return name; } + + virtual void set_program_options( boost::program_options::options_description& cli, + boost::program_options::options_description& cfg ) override; + virtual void plugin_initialize( const boost::program_options::variables_map& options ) override; + virtual void plugin_startup() override; + virtual void plugin_shutdown() override; + + private: + std::unique_ptr< detail::ingest_plugin_impl > my; +}; + +} } } // steem::plugins::ingest diff --git a/libraries/plugins/ingest/ingest_plugin.cpp b/libraries/plugins/ingest/ingest_plugin.cpp new file mode 100644 index 0000000000..8cae2dad7f --- /dev/null +++ b/libraries/plugins/ingest/ingest_plugin.cpp @@ -0,0 +1,583 @@ + +#include + +#include + +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace steem { namespace plugins { namespace ingest { + +using namespace steem::chain; + +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; +using tcp = boost::asio::ip::tcp; +namespace bfs = boost::filesystem; + +namespace detail { + +class ingest_plugin_impl +{ + public: + ingest_plugin_impl( ingest_plugin& _plugin ); + ~ingest_plugin_impl(); + + void on_post_apply_operation( const operation_notification& note ); + + // HTTP sending + void send_operation_json( const fc::variant& json ); + void http_send_worker(); + void send_http_post( const std::string& json_body ); + + // File writing (for dry run) + void write_operation_json( const std::string& json_str ); + void initialize_output_file(); + + // JSON building + fc::variant build_operation_json( const operation_notification& note ); + + ingest_plugin& _plugin; + database& _db; + + // Signal connection + boost::signals2::connection _post_apply_operation_conn; + + // HTTP configuration + std::string _ingest_endpoint; + uint32_t _http_timeout_ms; + uint32_t _max_queue_size; + + // Dry run configuration + bool _dry_run; + bfs::path _output_file_path; + std::ofstream _output_file; + std::mutex _file_mutex; + + // Async send queue + std::queue< std::string > _send_queue; + std::mutex _queue_mutex; + std::condition_variable _queue_cv; + std::thread _http_thread; + bool _shutdown; +}; + +ingest_plugin_impl::ingest_plugin_impl( ingest_plugin& _plugin ) : + _plugin( _plugin ), + _db( appbase::app().get_plugin< chain::chain_plugin >().db() ), + _ingest_endpoint( "http://localhost:8080/ingest/applied_op" ), + _http_timeout_ms( 5000 ), + _max_queue_size( 100000 ), + _dry_run( false ), + _shutdown( false ) +{ +} + +ingest_plugin_impl::~ingest_plugin_impl() +{ + if( _http_thread.joinable() ) + { + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + _shutdown = true; + } + _queue_cv.notify_one(); + _http_thread.join(); + } + + // Close output file if open + if( _output_file.is_open() ) + { + _output_file.close(); + } +} + +void ingest_plugin_impl::on_post_apply_operation( const operation_notification& note ) +{ + try + { + // Build Operation JSON + fc::variant json = build_operation_json( note ); + + // Async send + send_operation_json( json ); + } + catch( const fc::exception& e ) + { + elog( "Error processing operation: ${e}", ("e", e.to_string()) ); + } + catch( const std::exception& e ) + { + elog( "Error processing operation: ${e}", ("e", e.what()) ); + } +} + +fc::variant ingest_plugin_impl::build_operation_json( const operation_notification& note ) +{ + fc::mutable_variant_object result; + + // Get current block + auto block = _db.fetch_block_by_number( note.block ); + if( !block.valid() ) + { + FC_THROW( "Block ${b} not found", ("b", note.block) ); + } + + // 1. block object + fc::mutable_variant_object block_obj; + block_obj["num"] = note.block; + block_obj["id"] = block->id().str(); + block_obj["timestamp"] = block->timestamp.to_iso_string(); + result["block"] = block_obj; + + // 2. transaction object + fc::mutable_variant_object trx_obj; + + // Check if virtual operation + bool is_virtual = steem::protocol::is_virtual_operation( note.op ); + + if( is_virtual ) + { + // virtual operation: transaction info is null + trx_obj["id"] = fc::variant(); + trx_obj["index"] = -1; + } + else + { + // real operation: get transaction info from note + trx_obj["id"] = note.trx_id.str(); + trx_obj["index"] = static_cast( note.trx_in_block ); + } + result["transaction"] = trx_obj; + + // 3. operation object + fc::mutable_variant_object op_obj; + + // operation index (in transaction) + if( is_virtual ) + { + op_obj["index"] = static_cast( note.virtual_op ); + } + else + { + op_obj["index"] = static_cast( note.op_in_trx ); + } + + // operation type (use FC visitor to get name) + std::string op_name; + fc::get_operation_name get_name_visitor( op_name ); + note.op.visit( get_name_visitor ); + op_obj["type"] = op_name; + + // operation value (serialize operation to JSON) + // Use from_operation visitor to get pair, then extract value + fc::variant op_variant; + fc::from_operation from_op_visitor( op_variant ); + note.op.visit( from_op_visitor ); + + // from_operation creates variant(std::make_pair(name, value)) + // In FC, a pair is serialized as an array [name, value] + // We need to extract the value part (second element) + if( op_variant.is_array() && op_variant.size() == 2 ) + { + op_obj["value"] = op_variant[1]; // Get the value part (second element) + } + else if( op_variant.is_object() ) + { + // If it's an object, try to extract value field + auto op_obj_map = op_variant.get_object(); + if( op_obj_map.contains( "value" ) ) + { + op_obj["value"] = op_obj_map["value"]; + } + else + { + // Use the whole variant as fallback + op_obj["value"] = op_variant; + } + } + else + { + // Fallback: use the whole variant + op_obj["value"] = op_variant; + } + + result["operation"] = op_obj; + + // 4. virtual marker + result["virtual"] = is_virtual; + + return fc::variant( result ); +} + +void ingest_plugin_impl::send_operation_json( const fc::variant& json ) +{ + // Serialize JSON + std::string json_str = fc::json::to_string( json ); + + if( _dry_run ) + { + // Dry run mode: write to file instead of sending HTTP + write_operation_json( json_str ); + } + else + { + // Normal mode: send via HTTP + // Check queue size + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + if( _send_queue.size() >= _max_queue_size ) + { + wlog( "Ingest queue full, dropping operation" ); + return; // Drop if queue is full (avoid blocking steemd) + } + _send_queue.push( json_str ); + } + _queue_cv.notify_one(); + } +} + +void ingest_plugin_impl::http_send_worker() +{ + while( !_shutdown ) + { + std::string json_str; + + // Get data from queue + { + std::unique_lock< std::mutex > lock( _queue_mutex ); + _queue_cv.wait( lock, [this] { return !_send_queue.empty() || _shutdown; } ); + + if( _shutdown && _send_queue.empty() ) break; + + json_str = _send_queue.front(); + _send_queue.pop(); + } + + // Send HTTP POST + try + { + send_http_post( json_str ); + } + catch( const std::exception& e ) + { + elog( "HTTP send error: ${e}", ("e", e.what()) ); + // Don't retry, avoid blocking + } + } +} + +void ingest_plugin_impl::send_http_post( const std::string& json_body ) +{ + // Parse URL (simplified, assume format: http://host:port/path) + std::string url = _ingest_endpoint; + std::string protocol = "http"; + std::string host; + std::string port = "8080"; + std::string target = "/ingest/applied_op"; + + // Simple URL parsing + if( url.find( "http://" ) == 0 ) + { + url = url.substr( 7 ); + } + else if( url.find( "https://" ) == 0 ) + { + protocol = "https"; + url = url.substr( 8 ); + } + + size_t port_pos = url.find( ':' ); + size_t path_pos = url.find( '/' ); + + if( path_pos != std::string::npos ) + { + target = url.substr( path_pos ); + url = url.substr( 0, path_pos ); + } + + if( port_pos != std::string::npos ) + { + host = url.substr( 0, port_pos ); + port = url.substr( port_pos + 1 ); + } + else + { + host = url; + } + + // Create I/O context + net::io_context ioc; + tcp::resolver resolver( ioc ); + tcp::socket socket( ioc ); + + // Resolve address + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + + // Build HTTP request + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, host + ":" + port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.body() = json_body; + req.prepare_payload(); + + // Send request + http::write( socket, req ); + + // Read response (simplified handling) + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( socket, buffer, res ); + + // Check status code + if( res.result() != http::status::ok ) + { + elog( "HTTP error: ${code}", ("code", res.result_int()) ); + } + + // Close connection + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); +} + +void ingest_plugin_impl::initialize_output_file() +{ + // Skip if already initialized + if( _output_file.is_open() ) + { + return; + } + + try + { + // Get data directory + bfs::path data_dir = appbase::app().data_dir(); + bfs::path ingest_dir = data_dir / "ingest"; + + // Create ingest directory if it doesn't exist + if( !bfs::exists( ingest_dir ) ) + { + bfs::create_directories( ingest_dir ); + ilog( "Created ingest directory: ${dir}", ("dir", ingest_dir.string()) ); + } + + // Generate unique filename with timestamp + auto now = std::chrono::system_clock::now(); + auto time_t = std::chrono::system_clock::to_time_t( now ); + auto ms = std::chrono::duration_cast< std::chrono::milliseconds >( + now.time_since_epoch() ) % 1000; + + std::stringstream ss; + ss << "ingest_" + << std::put_time( std::localtime( &time_t ), "%Y%m%d_%H%M%S" ) + << "_" << std::setfill( '0' ) << std::setw( 3 ) << ms.count() + << ".jsonl"; // JSON Lines format (one JSON object per line) + + _output_file_path = ingest_dir / ss.str(); + + // Open file for writing + _output_file.open( _output_file_path.string(), std::ios::out | std::ios::app ); + if( !_output_file.is_open() ) + { + FC_THROW( "Failed to open output file: ${file}", ("file", _output_file_path.string()) ); + } + + ilog( "Dry run mode: writing to file ${file}", ("file", _output_file_path.string()) ); + } + FC_CAPTURE_AND_RETHROW() +} + +void ingest_plugin_impl::write_operation_json( const std::string& json_str ) +{ + std::lock_guard< std::mutex > lock( _file_mutex ); + + // Lazy initialization: open file if not already open + if( !_output_file.is_open() ) + { + try + { + initialize_output_file(); + } + catch( const fc::exception& e ) + { + elog( "Failed to initialize output file: ${e}", ("e", e.to_string()) ); + return; + } + catch( const std::exception& e ) + { + elog( "Failed to initialize output file: ${e}", ("e", e.what()) ); + return; + } + } + + // Write JSON line (JSON Lines format) + _output_file << json_str << "\n"; + _output_file.flush(); // Ensure data is written immediately +} + +} // detail + +ingest_plugin::ingest_plugin() {} +ingest_plugin::~ingest_plugin() {} + +void ingest_plugin::set_program_options( + boost::program_options::options_description& cli, + boost::program_options::options_description& cfg +) +{ + cfg.add_options() + ( "ingest-endpoint", + boost::program_options::value< std::string >()->default_value( "http://localhost:8080/ingest/applied_op" ), + "Ingest service HTTP endpoint" ) + ( "ingest-http-timeout", + boost::program_options::value< uint32_t >()->default_value( 5000 ), + "HTTP request timeout in milliseconds" ) + ( "ingest-queue-size", + boost::program_options::value< uint32_t >()->default_value( 100000 ), + "Maximum queue size for pending operations" ) + ( "ingest-dry-run", + boost::program_options::value< bool >()->default_value( false ), + "Dry run mode: write operations to file instead of sending HTTP" ) + ; +} + +void ingest_plugin::plugin_initialize( const boost::program_options::variables_map& options ) +{ + try + { + ilog( "Initializing ingest plugin" ); + + my = std::make_unique< detail::ingest_plugin_impl >( *this ); + + // Read configuration + if( options.count( "ingest-endpoint" ) ) + my->_ingest_endpoint = options["ingest-endpoint"].as< std::string >(); + + if( options.count( "ingest-http-timeout" ) ) + my->_http_timeout_ms = options["ingest-http-timeout"].as< uint32_t >(); + + if( options.count( "ingest-queue-size" ) ) + my->_max_queue_size = options["ingest-queue-size"].as< uint32_t >(); + + if( options.count( "ingest-dry-run" ) ) + my->_dry_run = options["ingest-dry-run"].as< bool >(); + + // Register signal handler + database& db = appbase::app().get_plugin< chain::chain_plugin >().db(); + my->_post_apply_operation_conn = db.add_post_apply_operation_handler( + [&]( const operation_notification& note ) { + try { my->on_post_apply_operation( note ); } FC_LOG_AND_RETHROW() + }, + *this, + 0 // group + ); + + appbase::app().get_plugin< chain::chain_plugin >().report_state_options( name(), fc::variant_object() ); + + if( my->_dry_run ) + { + ilog( "Ingest plugin initialized in DRY RUN mode" ); + } + else + { + ilog( "Ingest plugin initialized, endpoint: ${ep}", ("ep", my->_ingest_endpoint) ); + } + } + FC_CAPTURE_AND_RETHROW() +} + +void ingest_plugin::plugin_startup() +{ + try + { + ilog( "Starting ingest plugin" ); + + if( my->_dry_run ) + { + // Dry run mode: initialize output file + my->initialize_output_file(); + ilog( "Ingest plugin started in DRY RUN mode" ); + } + else + { + // Normal mode: start HTTP worker thread + my->_shutdown = false; + my->_http_thread = std::thread( [this]() { + my->http_send_worker(); + } ); + ilog( "Ingest plugin started" ); + } + } + FC_CAPTURE_AND_RETHROW() +} + +void ingest_plugin::plugin_shutdown() +{ + try + { + ilog( "Shutting down ingest plugin" ); + + if( !my->_dry_run ) + { + // Normal mode: stop HTTP worker thread + { + std::lock_guard< std::mutex > lock( my->_queue_mutex ); + my->_shutdown = true; + } + my->_queue_cv.notify_one(); + + if( my->_http_thread.joinable() ) + { + my->_http_thread.join(); + } + } + else + { + // Dry run mode: close output file + if( my->_output_file.is_open() ) + { + my->_output_file.close(); + ilog( "Closed output file: ${file}", ("file", my->_output_file_path.string()) ); + } + } + + // Disconnect signal + util::disconnect_signal( my->_post_apply_operation_conn ); + + ilog( "Ingest plugin shut down" ); + } + FC_CAPTURE_AND_RETHROW() +} + +} } } // steem::plugins::ingest diff --git a/libraries/plugins/ingest/plugin.json b/libraries/plugins/ingest/plugin.json new file mode 100644 index 0000000000..828b1a2432 --- /dev/null +++ b/libraries/plugins/ingest/plugin.json @@ -0,0 +1,5 @@ +{ + "plugin_name": "ingest", + "plugin_namespace": "ingest", + "plugin_project": "ingest_plugin" +} diff --git a/libraries/protocol/operations.cpp b/libraries/protocol/operations.cpp index d8eaa19c06..c9add89f42 100644 --- a/libraries/protocol/operations.cpp +++ b/libraries/protocol/operations.cpp @@ -2,6 +2,34 @@ #include +namespace fc +{ + std::string name_from_type( const std::string& type_name ) + { + // Extract operation name from full type name + // Example: "steem::protocol::transfer_operation" -> "transfer" + const std::string prefix = "steem::protocol::"; + const std::string suffix = "_operation"; + + if( type_name.size() >= prefix.size() + suffix.size() && + type_name.substr( 0, prefix.size() ) == prefix && + type_name.substr( type_name.size() - suffix.size(), suffix.size() ) == suffix ) + { + return type_name.substr( prefix.size(), type_name.size() - prefix.size() - suffix.size() ); + } + + // Fallback: use trim_typename_namespace + auto start = type_name.find_last_of( ':' ); + start = ( start == std::string::npos ) ? 0 : start + 1; + auto end = type_name.find_last_of( '_' ); + if( end != std::string::npos && end > start ) + { + return type_name.substr( start, end - start ); + } + return type_name.substr( start ); + } +} + namespace steem { namespace protocol { struct is_market_op_visitor { From ac7bbe16483967708154a2379d5d4ce53365203e Mon Sep 17 00:00:00 2001 From: ety001 Date: Thu, 15 Jan 2026 15:45:42 +0800 Subject: [PATCH 2/3] feat: Implement retry mechanism, batch sending, and block-only records for ingest plugin Client-side improvements: - Add automatic retry mechanism with 3-second delay (max 5 attempts) - Implement batch sending support (configurable batch size and timeout) - Add connection pooling for better performance - Implement block-only records for blocks without operations - Add post_apply_block signal handler to detect empty blocks - Implement blocking queue to protect API endpoint from overload - Add http_retry_worker thread for asynchronous retry processing - Add send_http_batch method for batch endpoint support - Add get_or_create_connection for connection pool management - Add parse_endpoint_url for URL parsing - Update configuration options: --ingest-batch-size, --ingest-batch-timeout - Update README.md documentation with new features - Update plugin initialization to start HTTP worker early These changes ensure reliable data delivery with automatic retries and complete block coverage in MongoDB. Client will block replay when queue is full to prevent API overload. Failed HTTP requests are automatically retried with exponential backoff. --- libraries/plugins/ingest/ingest_plugin.cpp | 754 +++++++++++++++++++-- 1 file changed, 692 insertions(+), 62 deletions(-) diff --git a/libraries/plugins/ingest/ingest_plugin.cpp b/libraries/plugins/ingest/ingest_plugin.cpp index 8cae2dad7f..c4fc39ec29 100644 --- a/libraries/plugins/ingest/ingest_plugin.cpp +++ b/libraries/plugins/ingest/ingest_plugin.cpp @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #include @@ -51,11 +53,19 @@ class ingest_plugin_impl ~ingest_plugin_impl(); void on_post_apply_operation( const operation_notification& note ); + void on_post_apply_block( const block_notification& note ); // HTTP sending void send_operation_json( const fc::variant& json ); void http_send_worker(); + void http_retry_worker(); void send_http_post( const std::string& json_body ); + void send_http_batch( const std::vector< std::string >& batch ); + void parse_endpoint_url( std::string& host, std::string& port, std::string& target ); + + // Connection pool for HTTP connections + struct http_connection; + http_connection* get_or_create_connection(); // File writing (for dry run) void write_operation_json( const std::string& json_str ); @@ -63,17 +73,25 @@ class ingest_plugin_impl // JSON building fc::variant build_operation_json( const operation_notification& note ); + fc::variant build_block_only_json( const block_notification& note ); ingest_plugin& _plugin; database& _db; - // Signal connection + // Signal connections boost::signals2::connection _post_apply_operation_conn; + boost::signals2::connection _post_apply_block_conn; + + // Track blocks that have operations + std::set< uint32_t > _blocks_with_ops; + std::mutex _blocks_mutex; // HTTP configuration std::string _ingest_endpoint; uint32_t _http_timeout_ms; uint32_t _max_queue_size; + uint32_t _batch_size; // Batch size for sending multiple operations + uint32_t _batch_timeout_ms; // Max wait time before sending a batch // Dry run configuration bool _dry_run; @@ -84,9 +102,69 @@ class ingest_plugin_impl // Async send queue std::queue< std::string > _send_queue; std::mutex _queue_mutex; - std::condition_variable _queue_cv; + std::condition_variable _queue_cv; // Notify worker thread when items available + std::condition_variable _space_cv; // Notify replay thread when space available std::thread _http_thread; + std::thread _retry_thread; bool _shutdown; + + // Retry mechanism + struct retry_item + { + std::vector< std::string > batch; + uint32_t retry_count; + std::chrono::steady_clock::time_point retry_time; + }; + std::queue< retry_item > _retry_queue; + std::mutex _retry_mutex; + std::condition_variable _retry_cv; + static constexpr uint32_t MAX_RETRY_COUNT = 5; // Maximum retry attempts + static constexpr uint32_t RETRY_DELAY_MS = 3000; // 3 seconds delay between retries + + // Connection pool for HTTP connections + struct http_connection + { + net::io_context ioc; + tcp::resolver resolver; + tcp::socket socket; + std::string host; + std::string port; + std::string target; + bool connected; + + http_connection( const std::string& h, const std::string& p, const std::string& t ) : + resolver( ioc ), socket( ioc ), host( h ), port( p ), target( t ), connected( false ) + {} + + void connect() + { + if( !connected ) + { + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + connected = true; + } + } + + void disconnect() + { + if( connected ) + { + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + socket.close( ec ); + connected = false; + } + } + + ~http_connection() + { + disconnect(); + } + }; + + std::unique_ptr< http_connection > _http_conn; + std::mutex _conn_mutex; }; ingest_plugin_impl::ingest_plugin_impl( ingest_plugin& _plugin ) : @@ -95,6 +173,8 @@ ingest_plugin_impl::ingest_plugin_impl( ingest_plugin& _plugin ) : _ingest_endpoint( "http://localhost:8080/ingest/applied_op" ), _http_timeout_ms( 5000 ), _max_queue_size( 100000 ), + _batch_size( 100 ), // Default: batch 100 operations + _batch_timeout_ms( 100 ), // Default: wait max 100ms before sending _dry_run( false ), _shutdown( false ) { @@ -102,6 +182,18 @@ ingest_plugin_impl::ingest_plugin_impl( ingest_plugin& _plugin ) : ingest_plugin_impl::~ingest_plugin_impl() { + // Stop retry thread + if( _retry_thread.joinable() ) + { + { + std::lock_guard< std::mutex > lock( _retry_mutex ); + _shutdown = true; + } + _retry_cv.notify_one(); + _retry_thread.join(); + } + + // Stop HTTP thread if( _http_thread.joinable() ) { { @@ -123,6 +215,12 @@ void ingest_plugin_impl::on_post_apply_operation( const operation_notification& { try { + // Mark this block as having operations + { + std::lock_guard< std::mutex > lock( _blocks_mutex ); + _blocks_with_ops.insert( note.block ); + } + // Build Operation JSON fc::variant json = build_operation_json( note ); @@ -139,6 +237,38 @@ void ingest_plugin_impl::on_post_apply_operation( const operation_notification& } } +void ingest_plugin_impl::on_post_apply_block( const block_notification& note ) +{ + try + { + uint32_t block_num = note.block.block_num(); + + // Check if this block has any operations + bool has_ops = false; + { + std::lock_guard< std::mutex > lock( _blocks_mutex ); + has_ops = ( _blocks_with_ops.find( block_num ) != _blocks_with_ops.end() ); + // Remove from set to free memory (we only need to track until block is complete) + _blocks_with_ops.erase( block_num ); + } + + // If block has no operations, send block-only record + if( !has_ops ) + { + fc::variant json = build_block_only_json( note ); + send_operation_json( json ); + } + } + catch( const fc::exception& e ) + { + elog( "Error processing block: ${e}", ("e", e.to_string()) ); + } + catch( const std::exception& e ) + { + elog( "Error processing block: ${e}", ("e", e.what()) ); + } +} + fc::variant ingest_plugin_impl::build_operation_json( const operation_notification& note ) { fc::mutable_variant_object result; @@ -237,6 +367,42 @@ fc::variant ingest_plugin_impl::build_operation_json( const operation_notificati return fc::variant( result ); } +fc::variant ingest_plugin_impl::build_block_only_json( const block_notification& note ) +{ + fc::mutable_variant_object result; + + const signed_block& block = note.block; + uint32_t block_num = block.block_num(); + + // 1. block object + fc::mutable_variant_object block_obj; + block_obj["num"] = block_num; + block_obj["id"] = block.id().str(); + block_obj["timestamp"] = block.timestamp.to_iso_string(); + result["block"] = block_obj; + + // 2. transaction object (null for block-only) + fc::mutable_variant_object trx_obj; + trx_obj["id"] = fc::variant(); + trx_obj["index"] = -1; + result["transaction"] = trx_obj; + + // 3. operation object (null for block-only) + fc::mutable_variant_object op_obj; + op_obj["index"] = -1; + op_obj["type"] = ""; + op_obj["value"] = fc::variant(); + result["operation"] = op_obj; + + // 4. virtual marker (false for block-only) + result["virtual"] = false; + + // 5. block_only marker (indicates this is a block-only record) + result["block_only"] = true; + + return fc::variant( result ); +} + void ingest_plugin_impl::send_operation_json( const fc::variant& json ) { // Serialize JSON @@ -250,17 +416,40 @@ void ingest_plugin_impl::send_operation_json( const fc::variant& json ) else { // Normal mode: send via HTTP - // Check queue size + // Block until queue has space (protect API endpoint from overload) { - std::lock_guard< std::mutex > lock( _queue_mutex ); + std::unique_lock< std::mutex > lock( _queue_mutex ); + bool waited = false; if( _send_queue.size() >= _max_queue_size ) { - wlog( "Ingest queue full, dropping operation" ); - return; // Drop if queue is full (avoid blocking steemd) + waited = true; + wlog( "Ingest queue full, waiting for space" ); + } + + // Wait until queue has space + _space_cv.wait( lock, [this] { + return _send_queue.size() < _max_queue_size || _shutdown; + } ); + + if( _shutdown ) + { + return; // Plugin shutting down, skip + } + + if( waited ) + { + ilog( "Ingest queue has space, resuming enqueue" ); } + _send_queue.push( json_str ); + + // Minimal enqueue debug: log first item and periodic milestones + if( _send_queue.size() == 1 || ( _send_queue.size() % 10000 == 0 ) ) + { + ilog( "Ingest queue size: ${size}", ("size", _send_queue.size()) ); + } } - _queue_cv.notify_one(); + _queue_cv.notify_one(); // Notify worker thread that new item is available } } @@ -268,40 +457,217 @@ void ingest_plugin_impl::http_send_worker() { while( !_shutdown ) { - std::string json_str; + std::vector< std::string > batch; - // Get data from queue + // Collect batch of operations { std::unique_lock< std::mutex > lock( _queue_mutex ); - _queue_cv.wait( lock, [this] { return !_send_queue.empty() || _shutdown; } ); + + // Wait for at least one item or timeout + if( _send_queue.empty() ) + { + _queue_cv.wait_for( lock, std::chrono::milliseconds( _batch_timeout_ms ), + [this] { return !_send_queue.empty() || _shutdown; } ); + } if( _shutdown && _send_queue.empty() ) break; - json_str = _send_queue.front(); - _send_queue.pop(); + // Collect up to _batch_size items, or all available if less + // If batch_size is 1, disable batching (send immediately) + uint32_t effective_batch_size = ( _batch_size == 1 ) ? 1 : _batch_size; + auto start_time = std::chrono::steady_clock::now(); + size_t queue_size_before = _send_queue.size(); + + while( batch.size() < effective_batch_size && !_send_queue.empty() ) + { + batch.push_back( _send_queue.front() ); + _send_queue.pop(); + + // If we have some items but not a full batch, wait a bit more + // Skip batching logic if batch_size is 1 (immediate send) + if( effective_batch_size > 1 && batch.size() < effective_batch_size && !_send_queue.empty() ) + { + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_ms = std::chrono::duration_cast< std::chrono::milliseconds >( elapsed ).count(); + + if( elapsed_ms < _batch_timeout_ms ) + { + // Wait for more items or timeout + _queue_cv.wait_for( lock, std::chrono::milliseconds( _batch_timeout_ms - elapsed_ms ), + [this, effective_batch_size] { return _send_queue.size() >= effective_batch_size || _shutdown; } ); + } + else + { + // Timeout reached, send what we have + break; + } + } + else if( effective_batch_size == 1 ) + { + // Immediate send, no batching + break; + } + } + + // Notify waiting threads if queue now has space + // Only notify if we've freed up significant space (to avoid spurious wakeups) + size_t queue_size_after = _send_queue.size(); + if( queue_size_before >= _max_queue_size && queue_size_after < _max_queue_size ) + { + _space_cv.notify_all(); // Notify all waiting threads that space is available + } } - // Send HTTP POST - try + // Send batch (outside lock to avoid blocking queue operations) + if( !batch.empty() ) { - send_http_post( json_str ); + try + { + ilog( "Ingest worker sending batch: ${count}", ("count", batch.size()) ); + if( batch.size() == 1 ) + { + // Single item: use old endpoint for backward compatibility + send_http_post( batch[0] ); + } + else + { + // Multiple items: send as batch + send_http_batch( batch ); + } + + // After successful send, notify waiting threads if queue has space + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + if( _send_queue.size() < _max_queue_size ) + { + _space_cv.notify_all(); // Notify replay thread that space is available + } + } + } + catch( const std::exception& e ) + { + elog( "HTTP send error: ${e}, will retry", ("e", e.what()) ); + + // Add to retry queue instead of dropping + { + std::lock_guard< std::mutex > retry_lock( _retry_mutex ); + retry_item item; + item.batch = batch; + item.retry_count = 0; + item.retry_time = std::chrono::steady_clock::now() + std::chrono::milliseconds( RETRY_DELAY_MS ); + _retry_queue.push( item ); + } + _retry_cv.notify_one(); + + // Even on error, notify waiting threads to avoid deadlock + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + _space_cv.notify_all(); + } + } } - catch( const std::exception& e ) + } +} + +void ingest_plugin_impl::http_retry_worker() +{ + while( !_shutdown ) + { + std::vector< retry_item > items_to_retry; + + // Collect items ready for retry + { + std::unique_lock< std::mutex > lock( _retry_mutex ); + + auto now = std::chrono::steady_clock::now(); + + // Wait for items or timeout + if( _retry_queue.empty() ) + { + _retry_cv.wait_for( lock, std::chrono::milliseconds( 1000 ), + [this] { return !_retry_queue.empty() || _shutdown; } ); + } + + if( _shutdown && _retry_queue.empty() ) break; + + // Collect items that are ready for retry + while( !_retry_queue.empty() ) + { + auto& item = _retry_queue.front(); + if( now >= item.retry_time ) + { + items_to_retry.push_back( item ); + _retry_queue.pop(); + } + else + { + // Wait for the earliest retry time + auto wait_time = std::chrono::duration_cast< std::chrono::milliseconds >( + item.retry_time - now ).count(); + if( wait_time > 0 ) + { + _retry_cv.wait_for( lock, std::chrono::milliseconds( wait_time ), + [this] { return _shutdown; } ); + } + break; + } + } + } + + // Retry sending + for( auto& item : items_to_retry ) { - elog( "HTTP send error: ${e}", ("e", e.what()) ); - // Don't retry, avoid blocking + if( _shutdown ) break; + + try + { + ilog( "Retrying batch (attempt ${attempt}/${max}): ${count} items", + ("attempt", item.retry_count + 1)("max", MAX_RETRY_COUNT)("count", item.batch.size()) ); + + if( item.batch.size() == 1 ) + { + send_http_post( item.batch[0] ); + } + else + { + send_http_batch( item.batch ); + } + + // Success: batch sent successfully + ilog( "Retry successful for batch of ${count} items", ("count", item.batch.size()) ); + } + catch( const std::exception& e ) + { + elog( "Retry failed: ${e}", ("e", e.what()) ); + + // Increment retry count + item.retry_count++; + + if( item.retry_count < MAX_RETRY_COUNT ) + { + // Schedule next retry + item.retry_time = std::chrono::steady_clock::now() + std::chrono::milliseconds( RETRY_DELAY_MS ); + + { + std::lock_guard< std::mutex > lock( _retry_mutex ); + _retry_queue.push( item ); + } + _retry_cv.notify_one(); + } + else + { + // Max retries reached, log error and drop + elog( "Max retries (${max}) reached for batch of ${count} items, dropping", + ("max", MAX_RETRY_COUNT)("count", item.batch.size()) ); + } + } } } } -void ingest_plugin_impl::send_http_post( const std::string& json_body ) +void ingest_plugin_impl::parse_endpoint_url( std::string& host, std::string& port, std::string& target ) { - // Parse URL (simplified, assume format: http://host:port/path) std::string url = _ingest_endpoint; - std::string protocol = "http"; - std::string host; - std::string port = "8080"; - std::string target = "/ingest/applied_op"; // Simple URL parsing if( url.find( "http://" ) == 0 ) @@ -310,7 +676,7 @@ void ingest_plugin_impl::send_http_post( const std::string& json_body ) } else if( url.find( "https://" ) == 0 ) { - protocol = "https"; + // HTTPS not supported yet, but parse anyway url = url.substr( 8 ); } @@ -322,6 +688,10 @@ void ingest_plugin_impl::send_http_post( const std::string& json_body ) target = url.substr( path_pos ); url = url.substr( 0, path_pos ); } + else + { + target = "/ingest/applied_op"; + } if( port_pos != std::string::npos ) { @@ -331,44 +701,246 @@ void ingest_plugin_impl::send_http_post( const std::string& json_body ) else { host = url; + port = "8080"; } +} + +ingest_plugin_impl::http_connection* ingest_plugin_impl::get_or_create_connection() +{ + std::lock_guard< std::mutex > lock( _conn_mutex ); - // Create I/O context - net::io_context ioc; - tcp::resolver resolver( ioc ); - tcp::socket socket( ioc ); - - // Resolve address - auto const results = resolver.resolve( host, port ); - net::connect( socket, results.begin(), results.end() ); + if( !_http_conn ) + { + std::string host, port, target; + parse_endpoint_url( host, port, target ); + _http_conn = std::make_unique< http_connection >( host, port, target ); + } - // Build HTTP request - http::request< http::string_body > req; - req.method( http::verb::post ); - req.target( target ); - req.set( http::field::host, host + ":" + port ); - req.set( http::field::content_type, "application/json" ); - req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); - req.body() = json_body; - req.prepare_payload(); + // Reconnect if needed + if( !_http_conn->connected ) + { + try + { + _http_conn->connect(); + } + catch( const std::exception& e ) + { + elog( "Failed to connect: ${e}", ("e", e.what()) ); + _http_conn.reset(); + return nullptr; + } + } - // Send request - http::write( socket, req ); + return _http_conn.get(); +} + +void ingest_plugin_impl::send_http_post( const std::string& json_body ) +{ + auto* conn = get_or_create_connection(); + if( !conn ) + { + // Fallback: create temporary connection + std::string host, port, target; + parse_endpoint_url( host, port, target ); + ilog( "Ingest HTTP POST (fallback) -> ${host}:${port}${target} (bytes: ${bytes})", + ("host", host)("port", port)("target", target)("bytes", json_body.size()) ); + + net::io_context ioc; + tcp::resolver resolver( ioc ); + tcp::socket socket( ioc ); + + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, host + ":" + port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = json_body; + req.prepare_payload(); + + http::write( socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP error: ${code}", ("code", res.result_int()) ); + } + else + { + ilog( "Ingest HTTP POST (fallback) success: ${code}", ("code", res.result_int()) ); + } + + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + return; + } - // Read response (simplified handling) - beast::flat_buffer buffer; - http::response< http::string_body > res; - http::read( socket, buffer, res ); + // Use connection pool + try + { + ilog( "Ingest HTTP POST -> ${host}:${port}${target} (bytes: ${bytes})", + ("host", conn->host)("port", conn->port)("target", conn->target)("bytes", json_body.size()) ); + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( conn->target ); + req.set( http::field::host, conn->host + ":" + conn->port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = json_body; + req.prepare_payload(); + + http::write( conn->socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( conn->socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP error: ${code}", ("code", res.result_int()) ); + // Connection might be broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + } + else + { + ilog( "Ingest HTTP POST success: ${code}", ("code", res.result_int()) ); + } + } + catch( const std::exception& e ) + { + elog( "HTTP send error: ${e}", ("e", e.what()) ); + // Connection broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + throw; + } +} + +void ingest_plugin_impl::send_http_batch( const std::vector< std::string >& batch ) +{ + // Build batch JSON array + std::string batch_json = "["; + for( size_t i = 0; i < batch.size(); ++i ) + { + if( i > 0 ) batch_json += ","; + batch_json += batch[i]; + } + batch_json += "]"; - // Check status code - if( res.result() != http::status::ok ) + auto* conn = get_or_create_connection(); + if( !conn ) { - elog( "HTTP error: ${code}", ("code", res.result_int()) ); + // Fallback: create temporary connection + std::string host, port, target; + parse_endpoint_url( host, port, target ); + + // Use batch endpoint + if( target == "/ingest/applied_op" ) + { + target = "/ingest/applied_ops"; // Batch endpoint + } + + ilog( "Ingest HTTP BATCH (fallback) -> ${host}:${port}${target} (ops: ${count}, bytes: ${bytes})", + ("host", host)("port", port)("target", target)("count", batch.size())("bytes", batch_json.size()) ); + + net::io_context ioc; + tcp::resolver resolver( ioc ); + tcp::socket socket( ioc ); + + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, host + ":" + port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = batch_json; + req.prepare_payload(); + + http::write( socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP batch error: ${code}", ("code", res.result_int()) ); + } + else + { + ilog( "Ingest HTTP BATCH (fallback) success: ${code}", ("code", res.result_int()) ); + } + + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + return; } - // Close connection - beast::error_code ec; - socket.shutdown( tcp::socket::shutdown_both, ec ); + // Use connection pool + try + { + std::string target = conn->target; + if( target == "/ingest/applied_op" ) + { + target = "/ingest/applied_ops"; // Batch endpoint + } + + ilog( "Ingest HTTP BATCH -> ${host}:${port}${target} (ops: ${count}, bytes: ${bytes})", + ("host", conn->host)("port", conn->port)("target", target)("count", batch.size())("bytes", batch_json.size()) ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, conn->host + ":" + conn->port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = batch_json; + req.prepare_payload(); + + http::write( conn->socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( conn->socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP batch error: ${code}", ("code", res.result_int()) ); + // Connection might be broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + } + else + { + ilog( "Ingest HTTP BATCH success: ${code}", ("code", res.result_int()) ); + } + } + catch( const std::exception& e ) + { + elog( "HTTP batch send error: ${e}", ("e", e.what()) ); + // Connection broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + throw; + } } void ingest_plugin_impl::initialize_output_file() @@ -466,6 +1038,12 @@ void ingest_plugin::set_program_options( ( "ingest-queue-size", boost::program_options::value< uint32_t >()->default_value( 100000 ), "Maximum queue size for pending operations" ) + ( "ingest-batch-size", + boost::program_options::value< uint32_t >()->default_value( 100 ), + "Number of operations to batch together before sending (1 = disable batching)" ) + ( "ingest-batch-timeout", + boost::program_options::value< uint32_t >()->default_value( 100 ), + "Maximum milliseconds to wait before sending a batch (even if not full)" ) ( "ingest-dry-run", boost::program_options::value< bool >()->default_value( false ), "Dry run mode: write operations to file instead of sending HTTP" ) @@ -490,10 +1068,22 @@ void ingest_plugin::plugin_initialize( const boost::program_options::variables_m if( options.count( "ingest-queue-size" ) ) my->_max_queue_size = options["ingest-queue-size"].as< uint32_t >(); + if( options.count( "ingest-batch-size" ) ) + my->_batch_size = options["ingest-batch-size"].as< uint32_t >(); + + if( options.count( "ingest-batch-timeout" ) ) + my->_batch_timeout_ms = options["ingest-batch-timeout"].as< uint32_t >(); + + // Validate batch size + if( my->_batch_size == 0 ) + { + my->_batch_size = 1; // Disable batching by setting to 1 + } + if( options.count( "ingest-dry-run" ) ) my->_dry_run = options["ingest-dry-run"].as< bool >(); - // Register signal handler + // Register signal handlers database& db = appbase::app().get_plugin< chain::chain_plugin >().db(); my->_post_apply_operation_conn = db.add_post_apply_operation_handler( [&]( const operation_notification& note ) { @@ -502,6 +1092,13 @@ void ingest_plugin::plugin_initialize( const boost::program_options::variables_m *this, 0 // group ); + my->_post_apply_block_conn = db.add_post_apply_block_handler( + [&]( const block_notification& note ) { + try { my->on_post_apply_block( note ); } FC_LOG_AND_RETHROW() + }, + *this, + 0 // group + ); appbase::app().get_plugin< chain::chain_plugin >().report_state_options( name(), fc::variant_object() ); @@ -512,6 +1109,15 @@ void ingest_plugin::plugin_initialize( const boost::program_options::variables_m else { ilog( "Ingest plugin initialized, endpoint: ${ep}", ("ep", my->_ingest_endpoint) ); + // Start HTTP worker early to avoid blocking replay before plugin_startup + if( !my->_http_thread.joinable() ) + { + my->_shutdown = false; + my->_http_thread = std::thread( [this]() { + my->http_send_worker(); + } ); + ilog( "Ingest plugin HTTP worker started (early)" ); + } } } FC_CAPTURE_AND_RETHROW() @@ -531,11 +1137,22 @@ void ingest_plugin::plugin_startup() } else { - // Normal mode: start HTTP worker thread - my->_shutdown = false; - my->_http_thread = std::thread( [this]() { - my->http_send_worker(); - } ); + // Normal mode: start HTTP worker thread and retry thread + if( !my->_http_thread.joinable() ) + { + my->_shutdown = false; + my->_http_thread = std::thread( [this]() { + my->http_send_worker(); + } ); + ilog( "Ingest plugin HTTP worker started" ); + } + if( !my->_retry_thread.joinable() ) + { + my->_retry_thread = std::thread( [this]() { + my->http_retry_worker(); + } ); + ilog( "Ingest plugin retry worker started" ); + } ilog( "Ingest plugin started" ); } } @@ -550,7 +1167,19 @@ void ingest_plugin::plugin_shutdown() if( !my->_dry_run ) { - // Normal mode: stop HTTP worker thread + // Normal mode: stop retry thread + { + std::lock_guard< std::mutex > lock( my->_retry_mutex ); + my->_shutdown = true; + } + my->_retry_cv.notify_one(); + + if( my->_retry_thread.joinable() ) + { + my->_retry_thread.join(); + } + + // Stop HTTP worker thread { std::lock_guard< std::mutex > lock( my->_queue_mutex ); my->_shutdown = true; @@ -572,8 +1201,9 @@ void ingest_plugin::plugin_shutdown() } } - // Disconnect signal + // Disconnect signals util::disconnect_signal( my->_post_apply_operation_conn ); + util::disconnect_signal( my->_post_apply_block_conn ); ilog( "Ingest plugin shut down" ); } From 37ff0bf8bbb92da4f28514945aa5f6c0007e1d30 Mon Sep 17 00:00:00 2001 From: ety001 Date: Thu, 15 Jan 2026 16:27:32 +0800 Subject: [PATCH 3/3] build: Add curl dependency to Dockerfiles - Add curl to apt-get install for HTTP client support in ingest plugin - Update all Dockerfiles (azurelinux3.0, debian13, ubuntu20.04, ubuntu22.04, ubuntu24.04) - Required for boost::beast HTTP client functionality This ensures the ingest plugin can make HTTP requests to the external ingest service. --- deploy/Dockerfile.azurelinux3.0 | 2 +- deploy/Dockerfile.debian13 | 2 +- deploy/Dockerfile.ubuntu20.04 | 2 +- deploy/Dockerfile.ubuntu22.04 | 2 +- deploy/Dockerfile.ubuntu24.04 | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/deploy/Dockerfile.azurelinux3.0 b/deploy/Dockerfile.azurelinux3.0 index 9aca4b0842..8857097ff8 100644 --- a/deploy/Dockerfile.azurelinux3.0 +++ b/deploy/Dockerfile.azurelinux3.0 @@ -170,7 +170,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN tdnf update -y && \ - tdnf install -y snappy-devel ncurses-compat readline-devel && \ + tdnf install -y snappy-devel ncurses-compat readline-devel curl && \ tdnf autoremove -y && \ tdnf clean all diff --git a/deploy/Dockerfile.debian13 b/deploy/Dockerfile.debian13 index dae2da9f01..94c0c025e4 100644 --- a/deploy/Dockerfile.debian13 +++ b/deploy/Dockerfile.debian13 @@ -137,7 +137,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/deploy/Dockerfile.ubuntu20.04 b/deploy/Dockerfile.ubuntu20.04 index e0459335b5..15fb6aef05 100644 --- a/deploy/Dockerfile.ubuntu20.04 +++ b/deploy/Dockerfile.ubuntu20.04 @@ -148,7 +148,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/deploy/Dockerfile.ubuntu22.04 b/deploy/Dockerfile.ubuntu22.04 index 8003d66ed9..f97cbb4b6e 100644 --- a/deploy/Dockerfile.ubuntu22.04 +++ b/deploy/Dockerfile.ubuntu22.04 @@ -146,7 +146,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/deploy/Dockerfile.ubuntu24.04 b/deploy/Dockerfile.ubuntu24.04 index 60b42e1cdd..b823dff9bd 100644 --- a/deploy/Dockerfile.ubuntu24.04 +++ b/deploy/Dockerfile.ubuntu24.04 @@ -137,7 +137,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists