diff --git a/deploy/Dockerfile.azurelinux3.0 b/deploy/Dockerfile.azurelinux3.0 index 9aca4b0842..8857097ff8 100644 --- a/deploy/Dockerfile.azurelinux3.0 +++ b/deploy/Dockerfile.azurelinux3.0 @@ -170,7 +170,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN tdnf update -y && \ - tdnf install -y snappy-devel ncurses-compat readline-devel && \ + tdnf install -y snappy-devel ncurses-compat readline-devel curl && \ tdnf autoremove -y && \ tdnf clean all diff --git a/deploy/Dockerfile.debian13 b/deploy/Dockerfile.debian13 index dae2da9f01..94c0c025e4 100644 --- a/deploy/Dockerfile.debian13 +++ b/deploy/Dockerfile.debian13 @@ -137,7 +137,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/deploy/Dockerfile.ubuntu20.04 b/deploy/Dockerfile.ubuntu20.04 index e0459335b5..15fb6aef05 100644 --- a/deploy/Dockerfile.ubuntu20.04 +++ b/deploy/Dockerfile.ubuntu20.04 @@ -148,7 +148,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/deploy/Dockerfile.ubuntu22.04 b/deploy/Dockerfile.ubuntu22.04 index 8003d66ed9..f97cbb4b6e 100644 --- a/deploy/Dockerfile.ubuntu22.04 +++ b/deploy/Dockerfile.ubuntu22.04 @@ -146,7 +146,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git 
a/deploy/Dockerfile.ubuntu24.04 b/deploy/Dockerfile.ubuntu24.04 index 60b42e1cdd..b823dff9bd 100644 --- a/deploy/Dockerfile.ubuntu24.04 +++ b/deploy/Dockerfile.ubuntu24.04 @@ -137,7 +137,7 @@ COPY --from=builder /usr/local/steemd /usr/local/steemd WORKDIR /var/steem VOLUME [ "/var/steem" ] RUN apt-get update && \ - apt-get install -y libsnappy-dev libreadline-dev && \ + apt-get install -y libsnappy-dev libreadline-dev curl && \ apt-get autoremove -y && \ rm -rf /var/lib/apt/lists diff --git a/libraries/plugins/ingest/CMakeLists.txt b/libraries/plugins/ingest/CMakeLists.txt new file mode 100644 index 0000000000..39e8a8fe86 --- /dev/null +++ b/libraries/plugins/ingest/CMakeLists.txt @@ -0,0 +1,34 @@ +file(GLOB HEADERS "include/steem/plugins/ingest/*.hpp") + +add_library( ingest_plugin + ingest_plugin.cpp + ${HEADERS} + ) + +target_link_libraries( ingest_plugin + chain_plugin + steem_chain + steem_protocol + fc + ${Boost_LIBRARIES} + ${CMAKE_DL_LIBS} + ) + +target_include_directories( ingest_plugin + PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include" + ) + +if( CLANG_TIDY_EXE ) + set_target_properties( + ingest_plugin PROPERTIES + CXX_CLANG_TIDY "${DO_CLANG_TIDY}" + ) +endif( CLANG_TIDY_EXE ) + +install( TARGETS + ingest_plugin + + RUNTIME DESTINATION bin + LIBRARY DESTINATION lib + ARCHIVE DESTINATION lib +) diff --git a/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp b/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp new file mode 100644 index 0000000000..e0b7ae4efe --- /dev/null +++ b/libraries/plugins/ingest/include/steem/plugins/ingest/ingest_plugin.hpp @@ -0,0 +1,35 @@ +#pragma once +#include +#include + +#include + +namespace steem { namespace plugins { namespace ingest { + +namespace detail { class ingest_plugin_impl; } + +using namespace appbase; + +#define STEEM_INGEST_PLUGIN_NAME "ingest" + +class ingest_plugin : public appbase::plugin< ingest_plugin > +{ + public: + ingest_plugin(); + virtual ~ingest_plugin(); + + 
APPBASE_PLUGIN_REQUIRES( (steem::plugins::chain::chain_plugin) ) + + static const std::string& name() { static std::string name = STEEM_INGEST_PLUGIN_NAME; return name; } + + virtual void set_program_options( boost::program_options::options_description& cli, + boost::program_options::options_description& cfg ) override; + virtual void plugin_initialize( const boost::program_options::variables_map& options ) override; + virtual void plugin_startup() override; + virtual void plugin_shutdown() override; + + private: + std::unique_ptr< detail::ingest_plugin_impl > my; +}; + +} } } // steem::plugins::ingest diff --git a/libraries/plugins/ingest/ingest_plugin.cpp b/libraries/plugins/ingest/ingest_plugin.cpp new file mode 100644 index 0000000000..c4fc39ec29 --- /dev/null +++ b/libraries/plugins/ingest/ingest_plugin.cpp @@ -0,0 +1,1213 @@ + +#include + +#include + +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace steem { namespace plugins { namespace ingest { + +using namespace steem::chain; + +namespace beast = boost::beast; +namespace http = beast::http; +namespace net = boost::asio; +using tcp = boost::asio::ip::tcp; +namespace bfs = boost::filesystem; + +namespace detail { + +class ingest_plugin_impl +{ + public: + ingest_plugin_impl( ingest_plugin& _plugin ); + ~ingest_plugin_impl(); + + void on_post_apply_operation( const operation_notification& note ); + void on_post_apply_block( const block_notification& note ); + + // HTTP sending + void send_operation_json( const fc::variant& json ); + void http_send_worker(); + void http_retry_worker(); + void send_http_post( const std::string& json_body ); + void send_http_batch( const std::vector< std::string >& batch ); + void parse_endpoint_url( std::string& host, std::string& port, std::string& 
target ); + + // Connection pool for HTTP connections + struct http_connection; + http_connection* get_or_create_connection(); + + // File writing (for dry run) + void write_operation_json( const std::string& json_str ); + void initialize_output_file(); + + // JSON building + fc::variant build_operation_json( const operation_notification& note ); + fc::variant build_block_only_json( const block_notification& note ); + + ingest_plugin& _plugin; + database& _db; + + // Signal connections + boost::signals2::connection _post_apply_operation_conn; + boost::signals2::connection _post_apply_block_conn; + + // Track blocks that have operations + std::set< uint32_t > _blocks_with_ops; + std::mutex _blocks_mutex; + + // HTTP configuration + std::string _ingest_endpoint; + uint32_t _http_timeout_ms; + uint32_t _max_queue_size; + uint32_t _batch_size; // Batch size for sending multiple operations + uint32_t _batch_timeout_ms; // Max wait time before sending a batch + + // Dry run configuration + bool _dry_run; + bfs::path _output_file_path; + std::ofstream _output_file; + std::mutex _file_mutex; + + // Async send queue + std::queue< std::string > _send_queue; + std::mutex _queue_mutex; + std::condition_variable _queue_cv; // Notify worker thread when items available + std::condition_variable _space_cv; // Notify replay thread when space available + std::thread _http_thread; + std::thread _retry_thread; + bool _shutdown; + + // Retry mechanism + struct retry_item + { + std::vector< std::string > batch; + uint32_t retry_count; + std::chrono::steady_clock::time_point retry_time; + }; + std::queue< retry_item > _retry_queue; + std::mutex _retry_mutex; + std::condition_variable _retry_cv; + static constexpr uint32_t MAX_RETRY_COUNT = 5; // Maximum retry attempts + static constexpr uint32_t RETRY_DELAY_MS = 3000; // 3 seconds delay between retries + + // Connection pool for HTTP connections + struct http_connection + { + net::io_context ioc; + tcp::resolver resolver; + tcp::socket 
socket; + std::string host; + std::string port; + std::string target; + bool connected; + + http_connection( const std::string& h, const std::string& p, const std::string& t ) : + resolver( ioc ), socket( ioc ), host( h ), port( p ), target( t ), connected( false ) + {} + + void connect() + { + if( !connected ) + { + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + connected = true; + } + } + + void disconnect() + { + if( connected ) + { + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + socket.close( ec ); + connected = false; + } + } + + ~http_connection() + { + disconnect(); + } + }; + + std::unique_ptr< http_connection > _http_conn; + std::mutex _conn_mutex; +}; + +ingest_plugin_impl::ingest_plugin_impl( ingest_plugin& _plugin ) : + _plugin( _plugin ), + _db( appbase::app().get_plugin< chain::chain_plugin >().db() ), + _ingest_endpoint( "http://localhost:8080/ingest/applied_op" ), + _http_timeout_ms( 5000 ), + _max_queue_size( 100000 ), + _batch_size( 100 ), // Default: batch 100 operations + _batch_timeout_ms( 100 ), // Default: wait max 100ms before sending + _dry_run( false ), + _shutdown( false ) +{ +} + +ingest_plugin_impl::~ingest_plugin_impl() +{ + // Stop retry thread + if( _retry_thread.joinable() ) + { + { + std::lock_guard< std::mutex > lock( _retry_mutex ); + _shutdown = true; + } + _retry_cv.notify_one(); + _retry_thread.join(); + } + + // Stop HTTP thread + if( _http_thread.joinable() ) + { + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + _shutdown = true; + } + _queue_cv.notify_one(); + _http_thread.join(); + } + + // Close output file if open + if( _output_file.is_open() ) + { + _output_file.close(); + } +} + +void ingest_plugin_impl::on_post_apply_operation( const operation_notification& note ) +{ + try + { + // Mark this block as having operations + { + std::lock_guard< std::mutex > lock( _blocks_mutex ); + _blocks_with_ops.insert( note.block 
); + } + + // Build Operation JSON + fc::variant json = build_operation_json( note ); + + // Async send + send_operation_json( json ); + } + catch( const fc::exception& e ) + { + elog( "Error processing operation: ${e}", ("e", e.to_string()) ); + } + catch( const std::exception& e ) + { + elog( "Error processing operation: ${e}", ("e", e.what()) ); + } +} + +void ingest_plugin_impl::on_post_apply_block( const block_notification& note ) +{ + try + { + uint32_t block_num = note.block.block_num(); + + // Check if this block has any operations + bool has_ops = false; + { + std::lock_guard< std::mutex > lock( _blocks_mutex ); + has_ops = ( _blocks_with_ops.find( block_num ) != _blocks_with_ops.end() ); + // Remove from set to free memory (we only need to track until block is complete) + _blocks_with_ops.erase( block_num ); + } + + // If block has no operations, send block-only record + if( !has_ops ) + { + fc::variant json = build_block_only_json( note ); + send_operation_json( json ); + } + } + catch( const fc::exception& e ) + { + elog( "Error processing block: ${e}", ("e", e.to_string()) ); + } + catch( const std::exception& e ) + { + elog( "Error processing block: ${e}", ("e", e.what()) ); + } +} + +fc::variant ingest_plugin_impl::build_operation_json( const operation_notification& note ) +{ + fc::mutable_variant_object result; + + // Get current block + auto block = _db.fetch_block_by_number( note.block ); + if( !block.valid() ) + { + FC_THROW( "Block ${b} not found", ("b", note.block) ); + } + + // 1. block object + fc::mutable_variant_object block_obj; + block_obj["num"] = note.block; + block_obj["id"] = block->id().str(); + block_obj["timestamp"] = block->timestamp.to_iso_string(); + result["block"] = block_obj; + + // 2. 
transaction object + fc::mutable_variant_object trx_obj; + + // Check if virtual operation + bool is_virtual = steem::protocol::is_virtual_operation( note.op ); + + if( is_virtual ) + { + // virtual operation: transaction info is null + trx_obj["id"] = fc::variant(); + trx_obj["index"] = -1; + } + else + { + // real operation: get transaction info from note + trx_obj["id"] = note.trx_id.str(); + trx_obj["index"] = static_cast( note.trx_in_block ); + } + result["transaction"] = trx_obj; + + // 3. operation object + fc::mutable_variant_object op_obj; + + // operation index (in transaction) + if( is_virtual ) + { + op_obj["index"] = static_cast( note.virtual_op ); + } + else + { + op_obj["index"] = static_cast( note.op_in_trx ); + } + + // operation type (use FC visitor to get name) + std::string op_name; + fc::get_operation_name get_name_visitor( op_name ); + note.op.visit( get_name_visitor ); + op_obj["type"] = op_name; + + // operation value (serialize operation to JSON) + // Use from_operation visitor to get pair, then extract value + fc::variant op_variant; + fc::from_operation from_op_visitor( op_variant ); + note.op.visit( from_op_visitor ); + + // from_operation creates variant(std::make_pair(name, value)) + // In FC, a pair is serialized as an array [name, value] + // We need to extract the value part (second element) + if( op_variant.is_array() && op_variant.size() == 2 ) + { + op_obj["value"] = op_variant[1]; // Get the value part (second element) + } + else if( op_variant.is_object() ) + { + // If it's an object, try to extract value field + auto op_obj_map = op_variant.get_object(); + if( op_obj_map.contains( "value" ) ) + { + op_obj["value"] = op_obj_map["value"]; + } + else + { + // Use the whole variant as fallback + op_obj["value"] = op_variant; + } + } + else + { + // Fallback: use the whole variant + op_obj["value"] = op_variant; + } + + result["operation"] = op_obj; + + // 4. 
virtual marker + result["virtual"] = is_virtual; + + return fc::variant( result ); +} + +fc::variant ingest_plugin_impl::build_block_only_json( const block_notification& note ) +{ + fc::mutable_variant_object result; + + const signed_block& block = note.block; + uint32_t block_num = block.block_num(); + + // 1. block object + fc::mutable_variant_object block_obj; + block_obj["num"] = block_num; + block_obj["id"] = block.id().str(); + block_obj["timestamp"] = block.timestamp.to_iso_string(); + result["block"] = block_obj; + + // 2. transaction object (null for block-only) + fc::mutable_variant_object trx_obj; + trx_obj["id"] = fc::variant(); + trx_obj["index"] = -1; + result["transaction"] = trx_obj; + + // 3. operation object (null for block-only) + fc::mutable_variant_object op_obj; + op_obj["index"] = -1; + op_obj["type"] = ""; + op_obj["value"] = fc::variant(); + result["operation"] = op_obj; + + // 4. virtual marker (false for block-only) + result["virtual"] = false; + + // 5. block_only marker (indicates this is a block-only record) + result["block_only"] = true; + + return fc::variant( result ); +} + +void ingest_plugin_impl::send_operation_json( const fc::variant& json ) +{ + // Serialize JSON + std::string json_str = fc::json::to_string( json ); + + if( _dry_run ) + { + // Dry run mode: write to file instead of sending HTTP + write_operation_json( json_str ); + } + else + { + // Normal mode: send via HTTP + // Block until queue has space (protect API endpoint from overload) + { + std::unique_lock< std::mutex > lock( _queue_mutex ); + bool waited = false; + if( _send_queue.size() >= _max_queue_size ) + { + waited = true; + wlog( "Ingest queue full, waiting for space" ); + } + + // Wait until queue has space + _space_cv.wait( lock, [this] { + return _send_queue.size() < _max_queue_size || _shutdown; + } ); + + if( _shutdown ) + { + return; // Plugin shutting down, skip + } + + if( waited ) + { + ilog( "Ingest queue has space, resuming enqueue" ); + } + + 
_send_queue.push( json_str ); + + // Minimal enqueue debug: log first item and periodic milestones + if( _send_queue.size() == 1 || ( _send_queue.size() % 10000 == 0 ) ) + { + ilog( "Ingest queue size: ${size}", ("size", _send_queue.size()) ); + } + } + _queue_cv.notify_one(); // Notify worker thread that new item is available + } +} + +void ingest_plugin_impl::http_send_worker() +{ + while( !_shutdown ) + { + std::vector< std::string > batch; + + // Collect batch of operations + { + std::unique_lock< std::mutex > lock( _queue_mutex ); + + // Wait for at least one item or timeout + if( _send_queue.empty() ) + { + _queue_cv.wait_for( lock, std::chrono::milliseconds( _batch_timeout_ms ), + [this] { return !_send_queue.empty() || _shutdown; } ); + } + + if( _shutdown && _send_queue.empty() ) break; + + // Collect up to _batch_size items, or all available if less + // If batch_size is 1, disable batching (send immediately) + uint32_t effective_batch_size = ( _batch_size == 1 ) ? 1 : _batch_size; + auto start_time = std::chrono::steady_clock::now(); + size_t queue_size_before = _send_queue.size(); + + while( batch.size() < effective_batch_size && !_send_queue.empty() ) + { + batch.push_back( _send_queue.front() ); + _send_queue.pop(); + + // If we have some items but not a full batch, wait a bit more + // Skip batching logic if batch_size is 1 (immediate send) + if( effective_batch_size > 1 && batch.size() < effective_batch_size && !_send_queue.empty() ) + { + auto elapsed = std::chrono::steady_clock::now() - start_time; + auto elapsed_ms = std::chrono::duration_cast< std::chrono::milliseconds >( elapsed ).count(); + + if( elapsed_ms < _batch_timeout_ms ) + { + // Wait for more items or timeout + _queue_cv.wait_for( lock, std::chrono::milliseconds( _batch_timeout_ms - elapsed_ms ), + [this, effective_batch_size] { return _send_queue.size() >= effective_batch_size || _shutdown; } ); + } + else + { + // Timeout reached, send what we have + break; + } + } + else if( 
effective_batch_size == 1 ) + { + // Immediate send, no batching + break; + } + } + + // Notify waiting threads if queue now has space + // Only notify if we've freed up significant space (to avoid spurious wakeups) + size_t queue_size_after = _send_queue.size(); + if( queue_size_before >= _max_queue_size && queue_size_after < _max_queue_size ) + { + _space_cv.notify_all(); // Notify all waiting threads that space is available + } + } + + // Send batch (outside lock to avoid blocking queue operations) + if( !batch.empty() ) + { + try + { + ilog( "Ingest worker sending batch: ${count}", ("count", batch.size()) ); + if( batch.size() == 1 ) + { + // Single item: use old endpoint for backward compatibility + send_http_post( batch[0] ); + } + else + { + // Multiple items: send as batch + send_http_batch( batch ); + } + + // After successful send, notify waiting threads if queue has space + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + if( _send_queue.size() < _max_queue_size ) + { + _space_cv.notify_all(); // Notify replay thread that space is available + } + } + } + catch( const std::exception& e ) + { + elog( "HTTP send error: ${e}, will retry", ("e", e.what()) ); + + // Add to retry queue instead of dropping + { + std::lock_guard< std::mutex > retry_lock( _retry_mutex ); + retry_item item; + item.batch = batch; + item.retry_count = 0; + item.retry_time = std::chrono::steady_clock::now() + std::chrono::milliseconds( RETRY_DELAY_MS ); + _retry_queue.push( item ); + } + _retry_cv.notify_one(); + + // Even on error, notify waiting threads to avoid deadlock + { + std::lock_guard< std::mutex > lock( _queue_mutex ); + _space_cv.notify_all(); + } + } + } + } +} + +void ingest_plugin_impl::http_retry_worker() +{ + while( !_shutdown ) + { + std::vector< retry_item > items_to_retry; + + // Collect items ready for retry + { + std::unique_lock< std::mutex > lock( _retry_mutex ); + + auto now = std::chrono::steady_clock::now(); + + // Wait for items or timeout + if( 
_retry_queue.empty() ) + { + _retry_cv.wait_for( lock, std::chrono::milliseconds( 1000 ), + [this] { return !_retry_queue.empty() || _shutdown; } ); + } + + if( _shutdown && _retry_queue.empty() ) break; + + // Collect items that are ready for retry + while( !_retry_queue.empty() ) + { + auto& item = _retry_queue.front(); + if( now >= item.retry_time ) + { + items_to_retry.push_back( item ); + _retry_queue.pop(); + } + else + { + // Wait for the earliest retry time + auto wait_time = std::chrono::duration_cast< std::chrono::milliseconds >( + item.retry_time - now ).count(); + if( wait_time > 0 ) + { + _retry_cv.wait_for( lock, std::chrono::milliseconds( wait_time ), + [this] { return _shutdown; } ); + } + break; + } + } + } + + // Retry sending + for( auto& item : items_to_retry ) + { + if( _shutdown ) break; + + try + { + ilog( "Retrying batch (attempt ${attempt}/${max}): ${count} items", + ("attempt", item.retry_count + 1)("max", MAX_RETRY_COUNT)("count", item.batch.size()) ); + + if( item.batch.size() == 1 ) + { + send_http_post( item.batch[0] ); + } + else + { + send_http_batch( item.batch ); + } + + // Success: batch sent successfully + ilog( "Retry successful for batch of ${count} items", ("count", item.batch.size()) ); + } + catch( const std::exception& e ) + { + elog( "Retry failed: ${e}", ("e", e.what()) ); + + // Increment retry count + item.retry_count++; + + if( item.retry_count < MAX_RETRY_COUNT ) + { + // Schedule next retry + item.retry_time = std::chrono::steady_clock::now() + std::chrono::milliseconds( RETRY_DELAY_MS ); + + { + std::lock_guard< std::mutex > lock( _retry_mutex ); + _retry_queue.push( item ); + } + _retry_cv.notify_one(); + } + else + { + // Max retries reached, log error and drop + elog( "Max retries (${max}) reached for batch of ${count} items, dropping", + ("max", MAX_RETRY_COUNT)("count", item.batch.size()) ); + } + } + } + } +} + +void ingest_plugin_impl::parse_endpoint_url( std::string& host, std::string& port, std::string& 
target ) +{ + std::string url = _ingest_endpoint; + + // Simple URL parsing + if( url.find( "http://" ) == 0 ) + { + url = url.substr( 7 ); + } + else if( url.find( "https://" ) == 0 ) + { + // HTTPS not supported yet, but parse anyway + url = url.substr( 8 ); + } + + size_t port_pos = url.find( ':' ); + size_t path_pos = url.find( '/' ); + + if( path_pos != std::string::npos ) + { + target = url.substr( path_pos ); + url = url.substr( 0, path_pos ); + } + else + { + target = "/ingest/applied_op"; + } + + if( port_pos != std::string::npos ) + { + host = url.substr( 0, port_pos ); + port = url.substr( port_pos + 1 ); + } + else + { + host = url; + port = "8080"; + } +} + +ingest_plugin_impl::http_connection* ingest_plugin_impl::get_or_create_connection() +{ + std::lock_guard< std::mutex > lock( _conn_mutex ); + + if( !_http_conn ) + { + std::string host, port, target; + parse_endpoint_url( host, port, target ); + _http_conn = std::make_unique< http_connection >( host, port, target ); + } + + // Reconnect if needed + if( !_http_conn->connected ) + { + try + { + _http_conn->connect(); + } + catch( const std::exception& e ) + { + elog( "Failed to connect: ${e}", ("e", e.what()) ); + _http_conn.reset(); + return nullptr; + } + } + + return _http_conn.get(); +} + +void ingest_plugin_impl::send_http_post( const std::string& json_body ) +{ + auto* conn = get_or_create_connection(); + if( !conn ) + { + // Fallback: create temporary connection + std::string host, port, target; + parse_endpoint_url( host, port, target ); + ilog( "Ingest HTTP POST (fallback) -> ${host}:${port}${target} (bytes: ${bytes})", + ("host", host)("port", port)("target", target)("bytes", json_body.size()) ); + + net::io_context ioc; + tcp::resolver resolver( ioc ); + tcp::socket socket( ioc ); + + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( 
target ); + req.set( http::field::host, host + ":" + port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = json_body; + req.prepare_payload(); + + http::write( socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP error: ${code}", ("code", res.result_int()) ); + } + else + { + ilog( "Ingest HTTP POST (fallback) success: ${code}", ("code", res.result_int()) ); + } + + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + return; + } + + // Use connection pool + try + { + ilog( "Ingest HTTP POST -> ${host}:${port}${target} (bytes: ${bytes})", + ("host", conn->host)("port", conn->port)("target", conn->target)("bytes", json_body.size()) ); + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( conn->target ); + req.set( http::field::host, conn->host + ":" + conn->port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = json_body; + req.prepare_payload(); + + http::write( conn->socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( conn->socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP error: ${code}", ("code", res.result_int()) ); + // Connection might be broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + } + else + { + ilog( "Ingest HTTP POST success: ${code}", ("code", res.result_int()) ); + } + } + catch( const std::exception& e ) + { + elog( "HTTP send error: ${e}", ("e", e.what()) ); + // Connection broken, reset it + 
std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + throw; + } +} + +void ingest_plugin_impl::send_http_batch( const std::vector< std::string >& batch ) +{ + // Build batch JSON array + std::string batch_json = "["; + for( size_t i = 0; i < batch.size(); ++i ) + { + if( i > 0 ) batch_json += ","; + batch_json += batch[i]; + } + batch_json += "]"; + + auto* conn = get_or_create_connection(); + if( !conn ) + { + // Fallback: create temporary connection + std::string host, port, target; + parse_endpoint_url( host, port, target ); + + // Use batch endpoint + if( target == "/ingest/applied_op" ) + { + target = "/ingest/applied_ops"; // Batch endpoint + } + + ilog( "Ingest HTTP BATCH (fallback) -> ${host}:${port}${target} (ops: ${count}, bytes: ${bytes})", + ("host", host)("port", port)("target", target)("count", batch.size())("bytes", batch_json.size()) ); + + net::io_context ioc; + tcp::resolver resolver( ioc ); + tcp::socket socket( ioc ); + + auto const results = resolver.resolve( host, port ); + net::connect( socket, results.begin(), results.end() ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, host + ":" + port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = batch_json; + req.prepare_payload(); + + http::write( socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP batch error: ${code}", ("code", res.result_int()) ); + } + else + { + ilog( "Ingest HTTP BATCH (fallback) success: ${code}", ("code", res.result_int()) ); + } + + beast::error_code ec; + socket.shutdown( tcp::socket::shutdown_both, ec ); + return; + } + + // Use connection pool + try 
+ { + std::string target = conn->target; + if( target == "/ingest/applied_op" ) + { + target = "/ingest/applied_ops"; // Batch endpoint + } + + ilog( "Ingest HTTP BATCH -> ${host}:${port}${target} (ops: ${count}, bytes: ${bytes})", + ("host", conn->host)("port", conn->port)("target", target)("count", batch.size())("bytes", batch_json.size()) ); + + http::request< http::string_body > req; + req.method( http::verb::post ); + req.target( target ); + req.set( http::field::host, conn->host + ":" + conn->port ); + req.set( http::field::content_type, "application/json" ); + req.set( http::field::user_agent, "steemd-ingest-plugin/1.0" ); + req.set( http::field::connection, "keep-alive" ); + req.body() = batch_json; + req.prepare_payload(); + + http::write( conn->socket, req ); + + beast::flat_buffer buffer; + http::response< http::string_body > res; + http::read( conn->socket, buffer, res ); + + if( res.result() != http::status::ok ) + { + elog( "HTTP batch error: ${code}", ("code", res.result_int()) ); + // Connection might be broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + } + else + { + ilog( "Ingest HTTP BATCH success: ${code}", ("code", res.result_int()) ); + } + } + catch( const std::exception& e ) + { + elog( "HTTP batch send error: ${e}", ("e", e.what()) ); + // Connection broken, reset it + std::lock_guard< std::mutex > lock( _conn_mutex ); + _http_conn->disconnect(); + _http_conn->connected = false; + throw; + } +} + +void ingest_plugin_impl::initialize_output_file() +{ + // Skip if already initialized + if( _output_file.is_open() ) + { + return; + } + + try + { + // Get data directory + bfs::path data_dir = appbase::app().data_dir(); + bfs::path ingest_dir = data_dir / "ingest"; + + // Create ingest directory if it doesn't exist + if( !bfs::exists( ingest_dir ) ) + { + bfs::create_directories( ingest_dir ); + ilog( "Created ingest directory: ${dir}", ("dir", ingest_dir.string()) 
);
+   }
+
+   // Generate unique filename with timestamp
+   auto now = std::chrono::system_clock::now();
+   auto time_t = std::chrono::system_clock::to_time_t( now );
+   auto ms = std::chrono::duration_cast< std::chrono::milliseconds >(
+      now.time_since_epoch() ) % 1000;
+
+   // NOTE(review): std::localtime is not thread-safe — presumably only called during
+   // startup or under _file_mutex (lazy init path); confirm before relying on it.
+   std::stringstream ss;
+   ss << "ingest_"
+      << std::put_time( std::localtime( &time_t ), "%Y%m%d_%H%M%S" )
+      << "_" << std::setfill( '0' ) << std::setw( 3 ) << ms.count()
+      << ".jsonl"; // JSON Lines format (one JSON object per line)
+
+   _output_file_path = ingest_dir / ss.str();
+
+   // Open file for writing
+   _output_file.open( _output_file_path.string(), std::ios::out | std::ios::app );
+   if( !_output_file.is_open() )
+   {
+      FC_THROW( "Failed to open output file: ${file}", ("file", _output_file_path.string()) );
+   }
+
+   ilog( "Dry run mode: writing to file ${file}", ("file", _output_file_path.string()) );
+   }
+   FC_CAPTURE_AND_RETHROW()
+}
+
+// Appends one JSON document as a single line (JSON Lines) to the dry-run output
+// file, opening it lazily on first use. Serialized by _file_mutex; init failures
+// are logged and the record is dropped rather than rethrown.
+void ingest_plugin_impl::write_operation_json( const std::string& json_str )
+{
+   std::lock_guard< std::mutex > lock( _file_mutex );
+
+   // Lazy initialization: open file if not already open
+   if( !_output_file.is_open() )
+   {
+      try
+      {
+         initialize_output_file();
+      }
+      catch( const fc::exception& e )
+      {
+         elog( "Failed to initialize output file: ${e}", ("e", e.to_string()) );
+         return;
+      }
+      catch( const std::exception& e )
+      {
+         elog( "Failed to initialize output file: ${e}", ("e", e.what()) );
+         return;
+      }
+   }
+
+   // Write JSON line (JSON Lines format)
+   _output_file << json_str << "\n";
+   _output_file.flush(); // Ensure data is written immediately
+}
+
+} // detail
+
+ingest_plugin::ingest_plugin() {}
+ingest_plugin::~ingest_plugin() {}
+
+// Registers the plugin's config-file options (endpoint, timeouts, queue/batch
+// sizing, dry-run switch). CLI-only options are not used; `cli` is unused.
+void ingest_plugin::set_program_options(
+   boost::program_options::options_description& cli,
+   boost::program_options::options_description& cfg
+)
+{
+   cfg.add_options()
+      ( "ingest-endpoint",
+        boost::program_options::value< std::string >()->default_value( "http://localhost:8080/ingest/applied_op" ),
+        "Ingest service HTTP endpoint" )
+      ( "ingest-http-timeout",
+        boost::program_options::value< uint32_t >()->default_value( 5000 ),
+        "HTTP request timeout in milliseconds" )
+      ( "ingest-queue-size",
+        boost::program_options::value< uint32_t >()->default_value( 100000 ),
+        "Maximum queue size for pending operations" )
+      ( "ingest-batch-size",
+        boost::program_options::value< uint32_t >()->default_value( 100 ),
+        "Number of operations to batch together before sending (1 = disable batching)" )
+      ( "ingest-batch-timeout",
+        boost::program_options::value< uint32_t >()->default_value( 100 ),
+        "Maximum milliseconds to wait before sending a batch (even if not full)" )
+      ( "ingest-dry-run",
+        boost::program_options::value< bool >()->default_value( false ),
+        "Dry run mode: write operations to file instead of sending HTTP" )
+      ;
+}
+
+// Reads configuration, wires the post-apply operation/block signal handlers into
+// the chain database, and (non-dry-run only) starts the HTTP worker thread early
+// so replay is not blocked waiting for plugin_startup.
+void ingest_plugin::plugin_initialize( const boost::program_options::variables_map& options )
+{
+   try
+   {
+      ilog( "Initializing ingest plugin" );
+
+      my = std::make_unique< detail::ingest_plugin_impl >( *this );
+
+      // Read configuration
+      if( options.count( "ingest-endpoint" ) )
+         my->_ingest_endpoint = options["ingest-endpoint"].as< std::string >();
+
+      if( options.count( "ingest-http-timeout" ) )
+         my->_http_timeout_ms = options["ingest-http-timeout"].as< uint32_t >();
+
+      if( options.count( "ingest-queue-size" ) )
+         my->_max_queue_size = options["ingest-queue-size"].as< uint32_t >();
+
+      if( options.count( "ingest-batch-size" ) )
+         my->_batch_size = options["ingest-batch-size"].as< uint32_t >();
+
+      if( options.count( "ingest-batch-timeout" ) )
+         my->_batch_timeout_ms = options["ingest-batch-timeout"].as< uint32_t >();
+
+      // Validate batch size
+      if( my->_batch_size == 0 )
+      {
+         my->_batch_size = 1; // Disable batching by setting to 1
+      }
+
+      if( options.count( "ingest-dry-run" ) )
+         my->_dry_run = options["ingest-dry-run"].as< bool >();
+
+      // Register signal handlers
+      database& db = appbase::app().get_plugin< chain::chain_plugin >().db();
+      my->_post_apply_operation_conn = db.add_post_apply_operation_handler(
+         [&]( const operation_notification& note ) {
+            try { my->on_post_apply_operation( note ); } FC_LOG_AND_RETHROW()
+         },
+         *this,
+         0 // group
+      );
+      my->_post_apply_block_conn = db.add_post_apply_block_handler(
+         [&]( const block_notification& note ) {
+            try { my->on_post_apply_block( note ); } FC_LOG_AND_RETHROW()
+         },
+         *this,
+         0 // group
+      );
+
+      appbase::app().get_plugin< chain::chain_plugin >().report_state_options( name(), fc::variant_object() );
+
+      if( my->_dry_run )
+      {
+         ilog( "Ingest plugin initialized in DRY RUN mode" );
+      }
+      else
+      {
+         ilog( "Ingest plugin initialized, endpoint: ${ep}", ("ep", my->_ingest_endpoint) );
+         // Start HTTP worker early to avoid blocking replay before plugin_startup
+         if( !my->_http_thread.joinable() )
+         {
+            my->_shutdown = false;
+            my->_http_thread = std::thread( [this]() {
+               my->http_send_worker();
+            } );
+            ilog( "Ingest plugin HTTP worker started (early)" );
+         }
+      }
+   }
+   FC_CAPTURE_AND_RETHROW()
+}
+
+// Startup: in dry-run mode opens the output file; otherwise (idempotently — the
+// HTTP worker may already have been started in plugin_initialize) starts the
+// HTTP send worker and the retry worker threads.
+void ingest_plugin::plugin_startup()
+{
+   try
+   {
+      ilog( "Starting ingest plugin" );
+
+      if( my->_dry_run )
+      {
+         // Dry run mode: initialize output file
+         my->initialize_output_file();
+         ilog( "Ingest plugin started in DRY RUN mode" );
+      }
+      else
+      {
+         // Normal mode: start HTTP worker thread and retry thread
+         if( !my->_http_thread.joinable() )
+         {
+            my->_shutdown = false;
+            my->_http_thread = std::thread( [this]() {
+               my->http_send_worker();
+            } );
+            ilog( "Ingest plugin HTTP worker started" );
+         }
+         if( !my->_retry_thread.joinable() )
+         {
+            my->_retry_thread = std::thread( [this]() {
+               my->http_retry_worker();
+            } );
+            ilog( "Ingest plugin retry worker started" );
+         }
+         ilog( "Ingest plugin started" );
+      }
+   }
+   FC_CAPTURE_AND_RETHROW()
+}
+
+// Shutdown: sets _shutdown under each worker's mutex, wakes and joins the retry
+// then HTTP threads (non-dry-run), or closes the output file (dry-run), then
+// disconnects the database signal handlers.
+void ingest_plugin::plugin_shutdown()
+{
+   try
+   {
+      ilog( "Shutting down ingest plugin" );
+
+      if( !my->_dry_run )
+      {
+         // Normal mode: stop retry thread
+         {
+            std::lock_guard< std::mutex > lock( my->_retry_mutex );
+            my->_shutdown = true;
+         }
+         my->_retry_cv.notify_one();
+
+         if( my->_retry_thread.joinable() )
+         {
+            my->_retry_thread.join();
+         }
+
+         // Stop HTTP worker thread
+         {
+            std::lock_guard< std::mutex > lock( my->_queue_mutex );
+            my->_shutdown = true;
+         }
+         my->_queue_cv.notify_one();
+
+         if( my->_http_thread.joinable() )
+         {
+            my->_http_thread.join();
+         }
+      }
+      else
+      {
+         // Dry run mode: close output file
+         // NOTE(review): file is closed without taking _file_mutex, and the signal
+         // handlers are disconnected only below — verify no write_operation_json
+         // can still race with this close.
+         if( my->_output_file.is_open() )
+         {
+            my->_output_file.close();
+            ilog( "Closed output file: ${file}", ("file", my->_output_file_path.string()) );
+         }
+      }
+
+      // Disconnect signals
+      util::disconnect_signal( my->_post_apply_operation_conn );
+      util::disconnect_signal( my->_post_apply_block_conn );
+
+      ilog( "Ingest plugin shut down" );
+   }
+   FC_CAPTURE_AND_RETHROW()
+}
+
+} } } // steem::plugins::ingest
diff --git a/libraries/plugins/ingest/plugin.json b/libraries/plugins/ingest/plugin.json
new file mode 100644
index 0000000000..828b1a2432
--- /dev/null
+++ b/libraries/plugins/ingest/plugin.json
@@ -0,0 +1,5 @@
+{
+   "plugin_name": "ingest",
+   "plugin_namespace": "ingest",
+   "plugin_project": "ingest_plugin"
+}
diff --git a/libraries/protocol/operations.cpp b/libraries/protocol/operations.cpp
index d8eaa19c06..c9add89f42 100644
--- a/libraries/protocol/operations.cpp
+++ b/libraries/protocol/operations.cpp
@@ -2,6 +2,34 @@
 #include
+namespace fc
+{
+   // Maps a fully-qualified operation type name to its short operation name,
+   // e.g. "steem::protocol::transfer_operation" -> "transfer".
+   std::string name_from_type( const std::string& type_name )
+   {
+      // Extract operation name from full type name
+      // Example: "steem::protocol::transfer_operation" -> "transfer"
+      const std::string prefix = "steem::protocol::";
+      const std::string suffix = "_operation";
+
+      if( type_name.size() >= prefix.size() + suffix.size() &&
+          type_name.substr( 0, prefix.size() ) == prefix &&
+          type_name.substr( type_name.size() - suffix.size(), suffix.size() ) == suffix )
+      {
+         return type_name.substr( prefix.size(), type_name.size() - prefix.size() - suffix.size() );
+      }
+
+      // Fallback: use trim_typename_namespace
+      // NOTE(review): this fallback trims at the LAST '_' after the final "::",
+      // so a type without the "_operation" suffix (e.g. "foo_bar") comes back as
+      // "foo" — confirm that truncation is intended.
+      auto start = type_name.find_last_of( ':' );
+      start = ( start == std::string::npos ) ? 0 : start + 1;
+      auto end = type_name.find_last_of( '_' );
+      if( end != std::string::npos && end > start )
+      {
+         return type_name.substr( start, end - start );
+      }
+      return type_name.substr( start );
+   }
+}
+
 namespace steem { namespace protocol {
 struct is_market_op_visitor {