From c5bd9c099f35c60f3972a3f70848357bf43ec99b Mon Sep 17 00:00:00 2001 From: mason turner Date: Sun, 28 Jul 2019 18:07:50 +0000 Subject: [PATCH 1/3] Add ability for LogReader to read in next log file --- infrastructure/logging/log_reader.cc | 100 +++++++++++++++++---------- infrastructure/logging/log_reader.hh | 4 +- 2 files changed, 67 insertions(+), 37 deletions(-) diff --git a/infrastructure/logging/log_reader.cc b/infrastructure/logging/log_reader.cc index 813c2f1..c550a61 100644 --- a/infrastructure/logging/log_reader.cc +++ b/infrastructure/logging/log_reader.cc @@ -20,10 +20,8 @@ LogReader::LogReader(const std::string& log_path, const std::vector { for (auto& channel_name : channels_in_log_) { ChannelState channel; - const std::string file_path = generate_log_file_path(log_path, channel_name, channel.current_file_number); - if (!open_file(file_path, channel.current_file)) - { - throw std::runtime_error("Couldn't open file: " + file_path); + if (!get_logfile(channel, channel_name)) { + throw std::runtime_error("Couldn't open file for " + channel_name); } channels_.emplace(channel_name, std::move(channel)); } @@ -34,10 +32,8 @@ LogReader::LogReader(const std::string& log_path, const std::vector if (channel_it != channels_in_log_.end()) { ChannelState channel; - const std::string file_path = generate_log_file_path(log_path, channel_name, channel.current_file_number); - if (!open_file(file_path, channel.current_file)) - { - throw std::runtime_error("Couldn't open file: " + file_path); + if (!get_logfile(channel, channel_name)) { + throw std::runtime_error("Couldn't open file for " + channel_name); } channels_.emplace(channel_name, std::move(channel)); } @@ -48,6 +44,7 @@ LogReader::LogReader(const std::string& log_path, const std::vector } } + LogReader::~LogReader() { for (auto& channel : channels_) { @@ -55,48 +52,79 @@ LogReader::~LogReader() { } } +bool LogReader::get_logfile(ChannelState& channel, std::string channel_name) { + if (channel.current_file.is_open()) { + channel.current_file.close(); + } + const std::string file_path = generate_log_file_path(log_path_, channel_name, channel.current_file_number); + return open_file(file_path, channel.current_file); + +} + +bool LogReader::try_read_next_message(ChannelState& channel, Message& message) { + auto& file = channel.current_file; + + uint32_t channel_id; + uint32_t message_length; + file.read((char*) &channel_id, sizeof(uint32_t)); + file.read((char*) &message_length, sizeof(uint32_t)); + + if (file.eof()) { + return false; + } + + std::string message_data(message_length, ' '); + file.read(&message_data[0], message_length); + if (!file) { + return false; + } + message.deserialize(message_data); + return true; +} + bool LogReader::read_next_message(const std::string& channel_name, Message& message) { auto channel_it = channels_.find(channel_name); if (channel_it != channels_.end()) { - auto& file = channel_it->second.current_file; - - uint32_t channel_id; - uint32_t message_length; - file.read((char*) &channel_id, sizeof(uint32_t)); - file.read((char*) &message_length, sizeof(uint32_t)); - - if (file.eof()) { - return false; + // Try to read a message from the current log file + if (try_read_next_message(channel_it->second, message)) { + return true; } - std::string message_data(message_length, ' '); - file.read(&message_data[0], message_length); - if (!file) { - // TODO: Check to see if there's another log file after this one before assuming we've reached the end of the log. - // std::cerr << "Failed to read from file for channel: " << channel_name << ". Reached end of file." << std::endl; + // Otherwise get the next logfile if it exists + channel_it->second.current_file_number++; + if (!get_logfile(channel_it->second, channel_name)) { return false; } - message.deserialize(message_data); - return true; + // If we can't read from next logfile then give up + return try_read_next_message(channel_it->second, message); } return false; } +bool LogReader::try_read_next_message_raw(ChannelState& channel, std::string& message_data) { + auto& file = channel.current_file; + uint32_t channel_id; + uint32_t message_length; + file.read((char*) &channel_id, sizeof(uint32_t)); + file.read((char*) &message_length, sizeof(uint32_t)); + message_data.resize(message_length, ' '); + file.read(&message_data[0], message_length); + return file.good(); +} + bool LogReader::read_next_message_raw(const std::string& channel_name, std::string& message_data) { auto channel_it = channels_.find(channel_name); if (channel_it != channels_.end()) { - auto& file = channel_it->second.current_file; - uint32_t channel_id; - uint32_t message_length; - file.read((char*) &channel_id, sizeof(uint32_t)); - file.read((char*) &message_length, sizeof(uint32_t)); - message_data.resize(message_length, ' '); - file.read(&message_data[0], message_length); - if (!file) { - // TODO: Check to see if there's another log file after this one before assuming we've reached the end of the log. - // std::cerr << "Failed to read from file for channel: " << channel_name << ". Reached end of file." << std::endl; + // Try to read a message from the current log file + if (try_read_next_message_raw(channel_it->second, message_data)) { + return true; + } + // Otherwise get the next log file if it exists + channel_it->second.current_file_number++; + if (!get_logfile(channel_it->second, channel_name)) { return false; } - return true; + // If we can't read from teh next logfile, then give up + return try_read_next_message_raw(channel_it->second, message_data); } return false; } @@ -125,7 +153,7 @@ bool LogReader::open_file(const std::string& file_path, std::ifstream& file) { std::cerr << "Could not open file " << file_path << " because " << e.what() << std::endl; return false; } - return true; + return file.good(); } } // namespace jet diff --git a/infrastructure/logging/log_reader.hh b/infrastructure/logging/log_reader.hh index 618fa26..226f565 100644 --- a/infrastructure/logging/log_reader.hh +++ b/infrastructure/logging/log_reader.hh @@ -38,8 +38,10 @@ class LogReader { private: bool open_file(const std::string& file_path, std::ifstream& file); + bool try_read_next_message(ChannelState& channel, Message& message); + bool try_read_next_message_raw(ChannelState& channel, std::string& message_data); + bool get_logfile(ChannelState& channel, std::string channel_name); bool read_metadata(const std::string& log_path, std::vector& channel_names); - std::string log_path_; std::unordered_map channels_; std::vector channels_in_log_; From b0e3eab359eb96111b0240d60c5b9084aada71a2 Mon Sep 17 00:00:00 2001 From: mason turner Date: Sun, 28 Jul 2019 18:16:17 +0000 Subject: [PATCH 2/3] Formatting and better file checks --- infrastructure/logging/log_reader.cc | 78 ++++++++++++++-------------- 1 file changed, 39 insertions(+), 39 deletions(-) diff --git a/infrastructure/logging/log_reader.cc b/infrastructure/logging/log_reader.cc index c550a61..b334c82 100644 --- a/infrastructure/logging/log_reader.cc +++ b/infrastructure/logging/log_reader.cc @@ -16,8 +16,7 @@ LogReader::LogReader(const std::string& log_path, const std::vector throw std::runtime_error("Couldn't open metadata file for log: " + log_path); } - if (channel_names.empty()) - { + if (channel_names.empty()) { for (auto& channel_name : channels_in_log_) { ChannelState channel; if (!get_logfile(channel, channel_name)) { @@ -29,8 +28,7 @@ LogReader::LogReader(const std::string& log_path, const std::vector else { for (auto& channel_name : channel_names) { auto channel_it = std::find(channels_in_log_.begin(), channels_in_log_.end(), channel_name); - if (channel_it != channels_in_log_.end()) - { + if (channel_it != channels_in_log_.end()) { ChannelState channel; if (!get_logfile(channel, channel_name)) { throw std::runtime_error("Couldn't open file for " + channel_name); @@ -44,10 +42,8 @@ LogReader::LogReader(const std::string& log_path, const std::vector } } - LogReader::~LogReader() { - for (auto& channel : channels_) - { + for (auto& channel : channels_) { channel.second.current_file.close(); } } @@ -69,13 +65,13 @@ bool LogReader::try_read_next_message(ChannelState& channel, Message& message) { file.read((char*) &channel_id, sizeof(uint32_t)); file.read((char*) &message_length, sizeof(uint32_t)); - if (file.eof()) { + if (!file.good()) { return false; } std::string message_data(message_length, ' '); file.read(&message_data[0], message_length); - if (!file) { + if (!file.good()) { return false; } message.deserialize(message_data); @@ -84,20 +80,20 @@ bool LogReader::try_read_next_message(ChannelState& channel, Message& message) { bool LogReader::read_next_message(const std::string& channel_name, Message& message) { auto channel_it = channels_.find(channel_name); - if (channel_it != channels_.end()) { - // Try to read a message from the current log file - if (try_read_next_message(channel_it->second, message)) { - return true; - } - // Otherwise get the next logfile if it exists - channel_it->second.current_file_number++; - if (!get_logfile(channel_it->second, channel_name)) { - return false; - } - // If we can't read from next logfile then give up - return try_read_next_message(channel_it->second, message); + if (channel_it == channels_.end()) { + return false; + } + // Try to read a message from the current log file + if (try_read_next_message(channel_it->second, message)) { + return true; } - return false; + // Otherwise get the next logfile if it exists + channel_it->second.current_file_number++; + if (!get_logfile(channel_it->second, channel_name)) { + return false; + } + // If we can't read from next logfile then give up + return try_read_next_message(channel_it->second, message); } bool LogReader::try_read_next_message_raw(ChannelState& channel, std::string& message_data) { @@ -106,40 +102,44 @@ bool LogReader::try_read_next_message_raw(ChannelState& channel, std::string& me uint32_t message_length; file.read((char*) &channel_id, sizeof(uint32_t)); file.read((char*) &message_length, sizeof(uint32_t)); + + if (!file.good()) { + return false; + } + message_data.resize(message_length, ' '); file.read(&message_data[0], message_length); + return file.good(); } bool LogReader::read_next_message_raw(const std::string& channel_name, std::string& message_data) { auto channel_it = channels_.find(channel_name); - if (channel_it != channels_.end()) { - // Try to read a message from the current log file - if (try_read_next_message_raw(channel_it->second, message_data)) { - return true; - } - // Otherwise get the next log file if it exists - channel_it->second.current_file_number++; - if (!get_logfile(channel_it->second, channel_name)) { - return false; - } - // If we can't read from teh next logfile, then give up - return try_read_next_message_raw(channel_it->second, message_data); + if (channel_it == channels_.end()) { + return false; + } + // Try to read a message from the current log file + if (try_read_next_message_raw(channel_it->second, message_data)) { + return true; + } + // Otherwise get the next log file if it exists + channel_it->second.current_file_number++; + if (!get_logfile(channel_it->second, channel_name)) { + return false; } - return false; + // If we can't read from teh next logfile, then give up + return try_read_next_message_raw(channel_it->second, message_data); } bool LogReader::read_metadata(const std::string& log_path, std::vector& channel_names) { const std::string metadata_path = log_path + "/metadata.txt"; std::ifstream metadata_file; - if (!open_file(metadata_path, metadata_file)) - { + if (!open_file(metadata_path, metadata_file)) { return false; } std::string line; - while (std::getline(metadata_file, line)) - { + while (std::getline(metadata_file, line)) { channels_in_log_.emplace_back(line); } metadata_file.close(); From a40d5c7fc0b9f5b53896581d90c6b172d05d2381 Mon Sep 17 00:00:00 2001 From: mason turner Date: Sun, 28 Jul 2019 18:17:43 +0000 Subject: [PATCH 3/3] misspell --- infrastructure/logging/log_reader.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/infrastructure/logging/log_reader.cc b/infrastructure/logging/log_reader.cc index b334c82..ade55c1 100644 --- a/infrastructure/logging/log_reader.cc +++ b/infrastructure/logging/log_reader.cc @@ -109,7 +109,7 @@ bool LogReader::try_read_next_message_raw(ChannelState& channel, std::string& me message_data.resize(message_length, ' '); file.read(&message_data[0], message_length); - + return file.good(); } @@ -127,7 +127,7 @@ bool LogReader::read_next_message_raw(const std::string& channel_name, std::stri if (!get_logfile(channel_it->second, channel_name)) { return false; } - // If we can't read from teh next logfile, then give up + // If we can't read from the next logfile, then give up return try_read_next_message_raw(channel_it->second, message_data); }