diff --git a/.gitignore b/.gitignore index 6dee9d2..18b7dd8 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ CMakeFiles cmake_install.cmake lib tags +install_manifest.txt diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..1e316b0 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,19 @@ +language: cpp +compiler: + - gcc + - clang + +before_install: + - echo $LANG + - echo $LC_ALL + - cmake --version + - gcc -v + - sudo apt-get install -y libboost-dev + +script: + - mkdir build && cd build && cmake .. && make + +notifications: + email: + - guweigang@bullsoft.org + - shangyuanchun@bullsoft.org diff --git a/CMakeLists.txt b/CMakeLists.txt index d86aa26..6cf8619 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,9 +1,10 @@ project (mysql-replication-listener) -cmake_minimum_required (VERSION 2.6) +cmake_minimum_required (VERSION 2.8) set(MySQL_BINLOG_VERSION_MAJOR "0") -set(MySQL_BINLOG_VERSION_MINOR "0.1") -set(MRL_VERSION "${MySQL_BINLOG_VERSION_MAJOR}.${MySQL_BINLOG_VERSION_MINOR}") +set(MySQL_BINLOG_VERSION_MINOR "2") +set(MySQL_BINLOG_VERSION_PATCH "0") +set(MySQL_BINLOG_VERSION "${MySQL_BINLOG_VERSION_MAJOR}.${MySQL_BINLOG_VERSION_MINOR}.${MySQL_BINLOG_VERSION_PATCH}") set(CMAKE_VERSION_STRING "${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION}") @@ -14,9 +15,13 @@ set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${PROJECT_BINARY_DIR}/lib) include_directories(include) link_directories(${PROJECT_BINARY_DIR}/lib) -# TODO: order: find asio -> allow to define asio directory -> use include/asio -# For ASIO headers -INCLUDE_DIRECTORIES(include/asio) +find_package(Boost) +if(Boost_FOUND) + include_directories(${Boost_INCLUDE_DIRS}) + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DHAVE_BOOST") +else(Boost_FOUND) + message(INFO "No Boost found, will not use Boost") +endif(Boost_FOUND) # --------- Find crypt ------------------------------------------ FIND_LIBRARY(LIB_CRYPTO crypto /opt/local/lib /opt/lib /usr/lib /usr/local/lib) @@ -26,5 +31,5 @@ add_subdirectory(src) include(InstallRequiredSystemLibraries) # installation configuration -install(DIRECTORY include DESTINATION /usr FILES_MATCHING PATTERN "*.h") +install(DIRECTORY include DESTINATION . FILES_MATCHING PATTERN "*.h" PATTERN "*.hpp" PATTERN "*.ipp") diff --git a/README.md b/README.md index 25fdc06..ee5ee57 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,40 @@ -mysql-replication-listener +MySQL Replication Listener ========================= +[![Build Status](https://img.shields.io/travis/BullSoft/mysql-replication-listener/master.svg?style=flat)](http://travis-ci.org/BullSoft/mysql-replication-listener) + A listener to fetch binlog from mysql server. Reference: -https://code.launchpad.net/mysql-replication-listener -https://github.com/wangmh/mysql-replication-listener + - Origin repo: https://code.launchpad.net/mysql-replication-listener + - We forked from: https://github.com/wangmh/mysql-replication-listener + +Install +-------------------- +mysql-replication-listener uses cmake, so you need cmake first. + +We use asio for socket communication, and it involes some **boost** libraries, so maybe you need install boost(1.47 and newer). + +And then: + +$ cmake . -DCMAKE_INSTALL_PREFIX=/path/to/install + +$ make + +$ make install + + +Notice +-------------------- +We need to make mysql server to dump binlog-stream to us, so we need **REPLICATION SLAVE** privilege. +And We need to get master status, so we need **REPLICATION CLIENT** privilege. + +If you want to set_position to a specified binlog-file and the position, we need to ensure the binlog-file and it's position are legal, so we need to execute "SHOW BINARY LOGS" on the server. Unfortunately, + In MySQL 5.1.63 and earlier, the **SUPER** privilege was required to use this statement. Starting with MYSQL 5.1.64, a user with the **REPLICATION CLIENT** privilege may also execute this statement. + +So If you use MySQL 5.1.63 and earlier, you need **SUPER** privilege too. + +Also, We auth privilege before binlog_dump, so we will access database **mysql** for accout checking... That means you have to give at least **READ** privilege for database mysql ... + +Also notice that besides `log-bin` and `binlog-format` set in mysqld section, you should also set a `server-id` for mysql master. + diff --git a/include/asio/basic_socket_iostream.hpp b/include/asio/basic_socket_iostream.hpp index e90f9ae..1c70f5a 100644 --- a/include/asio/basic_socket_iostream.hpp +++ b/include/asio/basic_socket_iostream.hpp @@ -19,11 +19,15 @@ #if !defined(BOOST_NO_IOSTREAM) -#include -#include -#include -#include -#include +#ifdef HAVE_BOOST +# include +# include +# include +# include +# include +#else +#endif + #include "asio/basic_socket_streambuf.hpp" #include "asio/stream_socket_service.hpp" diff --git a/include/asio/basic_socket_streambuf.hpp b/include/asio/basic_socket_streambuf.hpp index 91b9d3c..c2c884c 100644 --- a/include/asio/basic_socket_streambuf.hpp +++ b/include/asio/basic_socket_streambuf.hpp @@ -20,11 +20,31 @@ #if !defined(BOOST_NO_IOSTREAM) #include -#include -#include -#include -#include -#include +#ifdef HAVE_BOOST +# include +# include +# include +# include +# include +#else +namespace boost { + template + class base_from_member + { + protected: + MemberType member; + + base_from_member(): member() + { + } + }; +} +# ifndef BOOST_PP_CAT +# define BOOST_PP_CAT(a, b) a ## b +# endif +# define BOOST_PP_INC(x) (x) +#endif + #include "asio/basic_socket.hpp" #include "asio/deadline_timer_service.hpp" #include "asio/detail/array.hpp" diff --git a/include/asio/detail/config.hpp b/include/asio/detail/config.hpp index 3aa7b48..df0dca3 100644 --- a/include/asio/detail/config.hpp +++ b/include/asio/detail/config.hpp @@ -11,7 +11,19 @@ #ifndef ASIO_DETAIL_CONFIG_HPP #define ASIO_DETAIL_CONFIG_HPP -#include +#ifdef HAVE_BOOST +# include +#else +# if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) +# define BOOST_WINDOWS +# if !defined(__GNUC__) && !defined(BOOST_HAS_DECLSPEC) +# define BOOST_HAS_DECLSPEC +# endif +# ifdef _MSC_VER +# define BOOST_MSVC _MSC_VER +# endif +# endif +#endif // Default to a header-only implementation. The user must specifically request // separate compilation by defining either ASIO_SEPARATE_COMPILATION or diff --git a/include/binlog_api.h b/include/binlog_api.h index c49dee9..277652b 100644 --- a/include/binlog_api.h +++ b/include/binlog_api.h @@ -21,6 +21,11 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA #ifndef _REPEVENT_H #define _REPEVENT_H +// temp fix +#ifndef HAVE_BOOST +#define HAVE_BOOST +#endif + #include #include #include @@ -49,12 +54,16 @@ enum Error_code { class Dummy_driver : public system::Binary_log_driver { -public: + public: Dummy_driver() : Binary_log_driver("", 0) {} virtual ~Dummy_driver() {} virtual int connect() { return 1; } + virtual int disconnect() { return ERR_OK; } + + virtual int set_server_id(int server_id) { return server_id; } + virtual int wait_for_next_event(mysql::Binary_log_event **event) { return ERR_EOF; } @@ -66,6 +75,7 @@ class Dummy_driver : public system::Binary_log_driver virtual int get_position(std::string *str, unsigned long *position) { return ERR_OK; } + }; class Content_handler; @@ -73,24 +83,25 @@ class Content_handler; typedef std::list Content_handler_pipeline; class Binary_log { -private: + private: system::Binary_log_driver *m_driver; Dummy_driver m_dummy_driver; Content_handler_pipeline m_content_handlers; unsigned long m_binlog_position; std::string m_binlog_file; -public: + public: Binary_log(system::Binary_log_driver *drv); ~Binary_log() {} - + int connect(); + int disconnect(); + /** * Blocking attempt to get the next binlog event from the stream */ int wait_for_next_event(Binary_log_event **event); - /** * Inserts/removes content handlers in and out of the chain * The Content_handler_pipeline is a derived std::list @@ -101,9 +112,9 @@ class Binary_log { * Set the binlog position (filename, position) * * @return Error_code - * @retval ERR_OK The position is updated. - * @retval ERR_EOF The position is out-of-range - * @retval >= ERR_CODE_COUNT An unspecified error occurred + * @retval ERR_OK The position is updated. + * @retval ERR_EOF The position is out-of-range + * @retval >= ERR_CODE_COUNT An unspecified error occurred */ int set_position(const std::string &filename, unsigned long position); @@ -112,12 +123,15 @@ class Binary_log { * @param position Requested position * * @return Error_code - * @retval ERR_OK The position is updated. - * @retval ERR_EOF The position is out-of-range - * @retval >= ERR_CODE_COUNT An unspecified error occurred + * @retval ERR_OK The position is updated. + * @retval ERR_EOF The position is out-of-range + * @retval >= ERR_CODE_COUNT An unspecified error occurred */ int set_position(unsigned long position); + + int set_server_id(int server_id); + /** * Fetch the binlog position for the current file */ diff --git a/include/binlog_driver.h b/include/binlog_driver.h index 37774e9..4656c3d 100644 --- a/include/binlog_driver.h +++ b/include/binlog_driver.h @@ -31,7 +31,7 @@ class Binary_log_driver { public: template - Binary_log_driver(const FilenameT& filename = FilenameT(), unsigned int offset = 0) + Binary_log_driver(const FilenameT& filename = FilenameT(), unsigned int offset = 0) : m_binlog_file_name(filename), m_binlog_offset(offset) { } @@ -44,9 +44,12 @@ class Binary_log_driver * @retval 0 Success * @retval >0 Error code (to be specified) */ - virtual int connect()= 0; + virtual int connect() = 0; + virtual int disconnect() = 0; + virtual int set_server_id(int server_id) = 0; + /** * Blocking attempt to get the next binlog event from the stream * @param event [out] Pointer to a binary log event to be fetched. diff --git a/include/binlog_event.h b/include/binlog_event.h index b6eeccb..e863c66 100644 --- a/include/binlog_event.h +++ b/include/binlog_event.h @@ -81,31 +81,32 @@ enum Log_event_type DELETE_ROWS_EVENT = 25, /* - Something out of the ordinary happened on the master + * Something out of the ordinary happened on the master */ INCIDENT_EVENT= 26, - /* - * A user defined event - */ - USER_DEFINED= 27, + /* + * A user defined event + */ + USER_DEFINED= 27, + /* Add new events here - right above this comment! Existing events (except ENUM_END_EVENT) should never change their numbers */ - ENUM_END_EVENT /* end marker */ }; namespace system { -/** - * Convenience function to get the string representation of a binlog event. - */ -const char* get_event_type_str(Log_event_type type); + /** + * Convenience function to get the string representation of a binlog event. + */ + const char* get_event_type_str(Log_event_type type); } // end namespace system #define LOG_EVENT_HEADER_SIZE 20 + class Log_event_header { public: @@ -129,16 +130,16 @@ class Binary_log_event public: Binary_log_event() { - /* - An event length of 0 indicates that the header isn't initialized - */ - m_header.event_length= 0; - m_header.type_code= 0; + /* + An event length of 0 indicates that the header isn't initialized + */ + m_header.event_length = 0; + m_header.type_code = 0; } Binary_log_event(Log_event_header *header) { - m_header= *header; + m_header= *header; } virtual ~Binary_log_event(); @@ -151,11 +152,18 @@ class Binary_log_event return (enum Log_event_type) m_header.type_code; } + uint32_t get_next_position() const + { + return m_header.next_position; + } + /** * Return a pointer to the header of the log event */ Log_event_header *header() { return &m_header; } + virtual bool is_valid() const { return true; } + private: Log_event_header m_header; }; @@ -171,6 +179,8 @@ class Query_event: public Binary_log_event std::string db_name; std::string query; + + bool is_valid() const { return query.length() != 0; } }; class Rotate_event: public Binary_log_event diff --git a/include/bounded_buffer.h b/include/bounded_buffer.h index 46649f1..8ec6dfa 100644 --- a/include/bounded_buffer.h +++ b/include/bounded_buffer.h @@ -50,7 +50,7 @@ class bounded_buffer void push_front(const value_type& item) { pthread_mutex_lock(&m_mutex); - if (m_unread == capacity) + while (m_unread == capacity) pthread_cond_wait(&m_not_full, &m_mutex); for (int i = m_unread; i > 0; i--) { m_container[i] = m_container[i-1]; diff --git a/include/decimal.h b/include/decimal.h new file mode 100644 index 0000000..27f1ad8 --- /dev/null +++ b/include/decimal.h @@ -0,0 +1,88 @@ +/* Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef _decimal_h +#define _decimal_h + +#include +#include + +#if !defined(__GNUC__) || (__GNUC__ == 2 && __GNUC_MINOR__ < 96) +#define __builtin_expect(x, expected_value) (x) +#endif + +#define likely(x) __builtin_expect((x),1) +#define unlikely(x) __builtin_expect((x),0) + +typedef enum +{ + TRUNCATE=0, + HALF_EVEN, + HALF_UP, + CEILING, + FLOOR +} decimal_round_mode; + +typedef int32_t decimal_digit_t; + +/** + intg is the number of *decimal* digits (NOT number of decimal_digit_t's !) + before the point + frac is the number of decimal digits after the point + len is the length of buf (length of allocated space) in decimal_digit_t's, + not in bytes + sign false means positive, true means negative + buf is an array of decimal_digit_t's + */ +typedef struct st_decimal_t { + int intg, frac, len; + char sign; + decimal_digit_t *buf; +} decimal_t; + +#ifdef __cplusplus +extern "C" { +#endif + +int decimal_bin_size(int precision, int scale); +int decimal2string(const decimal_t *from, char *to, int *to_len, + int fixed_precision, int fixed_decimals, + char filler); + +int bin2decimal(const u_char *from, decimal_t *to, int precision, int scale); + +#ifdef __cplusplus +} +#endif + +#define E_DEC_OK 0 +#define E_DEC_TRUNCATED 1 +#define E_DEC_OVERFLOW 2 +#define E_DEC_DIV_ZERO 4 +#define E_DEC_BAD_NUM 8 +#define E_DEC_OOM 16 + +#define E_DEC_ERROR 31 +#define E_DEC_FATAL_ERROR 30 + +#define decimal_make_zero(dec) do { \ + (dec)->buf[0]=0; \ + (dec)->intg=1; \ + (dec)->frac=0; \ + (dec)->sign=0; \ + } while(0) + +#endif + diff --git a/include/field_iterator.h b/include/field_iterator.h index 294f5a4..32b9dee 100644 --- a/include/field_iterator.h +++ b/include/field_iterator.h @@ -33,7 +33,7 @@ namespace mysql { bool is_null(unsigned char *bitmap, int index); int lookup_metadata_field_size(enum mysql::system::enum_field_types field_type); -uint32_t extract_metadata(const Table_map_event *map, int col_no); +uint16_t extract_metadata(const Table_map_event *map, int col_no); template class Row_event_iterator : public std::iterator size_t Row_event_iterator::fields(Iterator_value_type& fields_vector ) { @@ -87,8 +85,8 @@ size_t Row_event_iterator::fields(Iterator_value_type& fiel for(unsigned col_no=0; col_no < m_table_map->columns.size(); ++col_no) { ++row_field_col_index; - unsigned int type= m_table_map->columns[col_no]&0xFF; - uint32_t metadata= extract_metadata(m_table_map, col_no); + unsigned int type = m_table_map->columns[col_no]&0xFF; + uint16_t metadata = extract_metadata(m_table_map, col_no); mysql::Value val((enum mysql::system::enum_field_types)type, metadata, (const char *)&m_row_event->row[field_offset]); diff --git a/include/file_driver.h b/include/file_driver.h index 3c711bb..7019b8b 100644 --- a/include/file_driver.h +++ b/include/file_driver.h @@ -48,6 +48,7 @@ class Binlog_file_driver } int connect(); + int set_server_id(int server_id); int disconnect(); int wait_for_next_event(mysql::Binary_log_event **event); int set_position(const std::string &str, unsigned long position); diff --git a/include/protocol.h b/include/protocol.h index bf27f5b..b8907c1 100644 --- a/include/protocol.h +++ b/include/protocol.h @@ -165,6 +165,7 @@ enum enum_field_types { MYSQL_TYPE_DECIMAL, MYSQL_TYPE_TINY, }; + #define int3store(T,A) do { *(T)= (unsigned char) ((A));\ *(T+1)=(unsigned char) (((unsigned int) (A) >> 8));\ *(T+2)=(unsigned char) (((A) >> 16)); } while (0) @@ -220,6 +221,9 @@ class Protocol { public: Protocol() { m_length_encoded_binary= false; } + + virtual ~Protocol() { } + /** Return the number of bytes which is read or written by this protocol chunk. The default size is equal to the underlying storage data type. @@ -254,6 +258,11 @@ class Protocol friend std::istream &operator>>(std::istream &is, std::string &str); }; +enum enum_pchunk_memory { + NEED_NOT_ALLOC, + NEED_ALLOC, +}; + template class Protocol_chunk : public Protocol { @@ -261,20 +270,31 @@ class Protocol_chunk : public Protocol Protocol_chunk() : Protocol() { - m_size= 0; - m_data= 0; + m_size = 0; + m_data = 0; + m_alloc = NEED_NOT_ALLOC; } - Protocol_chunk(T &chunk) : Protocol() + Protocol_chunk(T &chunk, enum enum_pchunk_memory alloc = NEED_NOT_ALLOC) : Protocol(), m_alloc(alloc) { - m_data= (const char *)&chunk; - m_size= sizeof(T); + m_size = sizeof(T); + if (m_alloc == NEED_ALLOC) { + m_data = (char *) new char[m_size]; + std::memcpy(m_data, &chunk, m_size); + } else { + m_data = (char *) &chunk; + } } - Protocol_chunk(const T &chunk) : Protocol () + Protocol_chunk(const T &chunk, enum enum_pchunk_memory alloc = NEED_NOT_ALLOC) : Protocol (), m_alloc(alloc) { - m_data= (const char *) &chunk; - m_size= sizeof(T); + m_size = sizeof(T); + if (m_alloc == NEED_ALLOC) { + m_data = (char *) new char[m_size]; + std::memcpy(m_data, &chunk, m_size); + } else { + m_data = (char *) &chunk; + } } /** @@ -286,8 +306,16 @@ class Protocol_chunk : public Protocol */ Protocol_chunk(T *buffer, unsigned long size) : Protocol () { - m_data= (const char *)buffer; - m_size= size; + m_data = (char *)buffer; + m_size = size; + m_alloc = NEED_NOT_ALLOC; + } + + virtual ~Protocol_chunk() + { + if (m_alloc == NEED_ALLOC) { + delete [] m_data; + } } virtual unsigned int size() { return m_size; } @@ -296,11 +324,14 @@ class Protocol_chunk : public Protocol { //assert(new_size <= m_size); memset((char *)m_data+new_size,'\0', m_size-new_size); - m_size= new_size; + m_size = new_size; } + private: - const char *m_data; - unsigned long m_size; + + char *m_data; + unsigned long m_size; + enum enum_pchunk_memory m_alloc; }; std::ostream &operator<<(std::ostream &os, Protocol &chunk); @@ -315,7 +346,7 @@ class Protocol_chunk_string //: public Protocol_chunk_uint8 m_str= &chunk; m_str->assign(size,'*'); } - + ~ Protocol_chunk_string() {} virtual unsigned int size() const { return m_str->size(); } virtual const char *data() const { return m_str->data(); } virtual void collapse_size(unsigned int new_size) @@ -380,6 +411,8 @@ class Protocol_chunk_string_len m_storage= &str; } + ~Protocol_chunk_string_len() {} + private: friend std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr); std::string *m_storage; diff --git a/include/resultset_iterator.h b/include/resultset_iterator.h index b23c676..b793b0e 100644 --- a/include/resultset_iterator.h +++ b/include/resultset_iterator.h @@ -52,7 +52,7 @@ struct Field_packet //uint64_t default_value; // Length coded binary; only in table descr. }; -typedef std::list String_storage; +typedef std::list String_storage; namespace system { void digest_result_header(std::istream &is, uint64_t &field_count, uint64_t extra); @@ -76,6 +76,12 @@ class Result_set iterator end(); const_iterator begin() const; const_iterator end() const; + ~Result_set() { + for (String_storage::iterator it=m_storage.begin(); it != m_storage.end(); ++it) { + delete *it; + } + m_storage.clear(); + } private: void digest_row_set(); diff --git a/include/tcp_driver.h b/include/tcp_driver.h index 83134de..f31ec6a 100644 --- a/include/tcp_driver.h +++ b/include/tcp_driver.h @@ -44,12 +44,12 @@ class Binlog_tcp_driver : public Binary_log_driver { public: - Binlog_tcp_driver(const std::string& user, const std::string& passwd, - const std::string& host, unsigned long port) - : Binary_log_driver("", 4), m_host(host), m_user(user), m_passwd(passwd), - m_port(port), m_socket(NULL), m_waiting_event(0), m_event_loop(0), - m_total_bytes_transferred(0), m_shutdown(false), - m_event_queue(new bounded_buffer(50)) + Binlog_tcp_driver(const std::string& user, const std::string& passwd, + const std::string& host, unsigned long port) + : Binary_log_driver("", 4), m_host(host), m_user(user), m_passwd(passwd), + m_port(port), m_socket(NULL), m_waiting_event(0), m_event_loop(0), + m_total_bytes_transferred(0), m_shutdown(false), m_server_id(1), + m_event_queue(new bounded_buffer(50)) { } @@ -77,6 +77,8 @@ class Binlog_tcp_driver : public Binary_log_driver int get_position(std::string *str, unsigned long *position); + int set_server_id(int server_id); + const std::string& user() const { return m_user; } const std::string& password() const { return m_passwd; } const std::string& host() const { return m_host; } @@ -162,7 +164,7 @@ class Binlog_tcp_driver : public Binary_log_driver * this function is called. * The event queue is emptied. */ - void disconnect(void); + int disconnect(); /** * Terminates the io service and sets the shudown flag. @@ -174,7 +176,8 @@ class Binlog_tcp_driver : public Binary_log_driver asio::io_service m_io_service; tcp::socket *m_socket; bool m_shutdown; - + // slave server id + int m_server_id; /** * Temporary storage for a handshake package */ @@ -216,7 +219,7 @@ class Binlog_tcp_driver : public Binary_log_driver * constructed yet. */ Log_event_header *m_waiting_event; - Log_event_header m_log_event_header; + /** * A ring buffer used to dispatch aggregated events to the user application */ @@ -272,7 +275,7 @@ int authenticate(tcp::socket *socket, const std::string& user, tcp::socket * sync_connect_and_authenticate(asio::io_service &io_service, const std::string &user, - const std::string &passwd, const std::string &host, long port); + const std::string &passwd, const std::string &host, long port, int server_id); } } diff --git a/include/value.h b/include/value.h index f3a4c25..3468635 100644 --- a/include/value.h +++ b/include/value.h @@ -40,9 +40,10 @@ namespace mysql { @return The size in bytes of a particular field */ -int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, - uint32_t metadata); +int calc_field_size(enum mysql::system::enum_field_types column_type, const unsigned char *field_ptr, + uint16_t metadata); +uint8_t calc_newdecimal_size(uint8_t m, uint8_t d); /** * A value object class which encapsluate a tuple (value type, metadata, storage) @@ -53,21 +54,30 @@ int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, class Value { public: - Value(enum system::enum_field_types type, uint32_t metadata, const char *storage) : + Value(enum system::enum_field_types type, uint16_t metadata, const char *storage) : m_type(type), m_storage(storage), m_metadata(metadata), m_is_null(false) { - m_size = calc_field_size((unsigned char)type, + m_size = calc_field_size(type, (const unsigned char*)storage, metadata); - //std::cout << "TYPE: " << type << " SIZE: " << m_size << std::endl; + // metadata: 16bits, lower byte is real type, high byte is length + if (type == mysql::system::MYSQL_TYPE_STRING) { + enum system::enum_field_types lower = (enum system::enum_field_types)(metadata & 0xFF); + if (lower == mysql::system::MYSQL_TYPE_SET + || lower == mysql::system::MYSQL_TYPE_ENUM) { + m_type = lower; + m_metadata = (metadata >> 8U); + } + } + // std::cout << "TYPE: " << type << " SIZE: " << m_size << std::endl; }; Value() { - m_size= 0; - m_storage= 0; - m_metadata= 0; - m_is_null= false; + m_size = 0; + m_storage = 0; + m_metadata = 0; + m_is_null = false; } /** @@ -92,7 +102,7 @@ class Value */ size_t length() const { return m_size; } enum system::enum_field_types type() const { return m_type; } - uint32_t metadata() const { return m_metadata; } + uint16_t metadata() const { return m_metadata; } /** * Returns the integer representation of a storage of a pre-specified @@ -146,7 +156,7 @@ class Value enum system::enum_field_types m_type; size_t m_size; const char *m_storage; - uint32_t m_metadata; + uint16_t m_metadata; bool m_is_null; }; @@ -154,7 +164,7 @@ class Converter { public: - void to(long long &out, const Value &val) const; + void to(long long &out, const Value &val) const; /** * Converts and copies the sql value to a std::string object. * @param[out] str The target string diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b0308c1..d456193 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -5,10 +5,11 @@ set(replication_sources binlog_driver.cpp basic_transaction_parser.cpp tcp_driver.cpp file_driver.cpp binary_log.cpp protocol.cpp value.cpp binlog_event.cpp resultset_iterator.cpp basic_transaction_parser.cpp - basic_content_handler.cpp utilities.cpp) + basic_content_handler.cpp utilities.cpp decimal.cpp) # Configure for building static library add_library(replication_static STATIC ${replication_sources}) +set_target_properties(replication_static PROPERTIES COMPILE_FLAGS -fPIC) target_link_libraries(replication_static crypto pthread) set_target_properties(replication_static PROPERTIES OUTPUT_NAME "replication") @@ -18,8 +19,12 @@ add_library(replication_shared SHARED ${replication_sources}) target_link_libraries(replication_shared crypto pthread) set_target_properties(replication_shared PROPERTIES - VERSION 0.1 SOVERSION 1 - OUTPUT_NAME "replication") +VERSION "${MySQL_BINLOG_VERSION}" SOVERSION 2 OUTPUT_NAME "replication" +) + +if(APPLE) +set_target_properties(replication_shared PROPERTIES MACOSX_RPATH ON) +endif(APPLE) install(TARGETS replication_shared LIBRARY DESTINATION lib) install(TARGETS replication_static ARCHIVE DESTINATION lib) diff --git a/src/binary_log.cpp b/src/binary_log.cpp index 47b540a..3575d2a 100644 --- a/src/binary_log.cpp +++ b/src/binary_log.cpp @@ -32,9 +32,9 @@ Binary_log::Binary_log(Binary_log_driver *drv) : m_binlog_position(4), m_binlog_ if (drv == NULL) { m_driver= &m_dummy_driver; + } else { + m_driver= drv; } - else - m_driver= drv; } Content_handler_pipeline *Binary_log::content_handler_pipeline(void) @@ -56,12 +56,11 @@ int Binary_log::wait_for_next_event(mysql::Binary_log_event **event_ptr) { event= reinjection_queue.front(); reinjection_queue.pop_front(); - } - else - { + } else { // Return in case of non-ERR_OK. - if(rc= m_driver->wait_for_next_event(&event)) + if(rc= m_driver->wait_for_next_event(&event)) { return rc; + } } m_binlog_position= event->header()->next_position; mysql::Content_handler *handler; @@ -79,8 +78,7 @@ int Binary_log::wait_for_next_event(mysql::Binary_log_event **event_ptr) } } while(event == 0 || !reinjection_queue.empty()); - if (event_ptr) - *event_ptr= event; + if (event_ptr) *event_ptr= event; return 0; } @@ -120,4 +118,14 @@ int Binary_log::connect() return m_driver->connect(); } +int Binary_log::disconnect() +{ + return m_driver->disconnect(); +} + +int Binary_log::set_server_id(int server_id) +{ + return m_driver->set_server_id(server_id); +} + } diff --git a/src/binlog_driver.cpp b/src/binlog_driver.cpp index 1495d0f..9f00efa 100644 --- a/src/binlog_driver.cpp +++ b/src/binlog_driver.cpp @@ -20,56 +20,55 @@ #include "binlog_driver.h" -namespace mysql { namespace system { +namespace mysql { -/* -Binary_log_event* Binary_log_driver::parse_event(asio::streambuf - &sbuff, Log_event_header - *header) - */ -Binary_log_event* Binary_log_driver::parse_event(std::istream &is, - Log_event_header *header) -{ - Binary_log_event *parsed_event= 0; + namespace system { - switch (header->type_code) { - case TABLE_MAP_EVENT: - parsed_event= proto_table_map_event(is, header); - break; - case QUERY_EVENT: - parsed_event= proto_query_event(is, header); - break; - case INCIDENT_EVENT: - parsed_event= proto_incident_event(is, header); - break; - case WRITE_ROWS_EVENT: - case UPDATE_ROWS_EVENT: - case DELETE_ROWS_EVENT: - parsed_event= proto_rows_event(is, header); - break; - case ROTATE_EVENT: - { - Rotate_event *rot= proto_rotate_event(is, header); - m_binlog_file_name= rot->binlog_file; - m_binlog_offset= (unsigned long)rot->binlog_pos; - parsed_event= rot; - } - break; - case INTVAR_EVENT: - parsed_event= proto_intvar_event(is, header); - break; - case USER_VAR_EVENT: - parsed_event= proto_uservar_event(is, header); - break; - default: - { - // Create a dummy driver. - parsed_event= new Binary_log_event(header); - } - } + /* Binary_log_event* Binary_log_driver::parse_event(asio::streambuf &sbuff, Log_event_header *header) */ - return parsed_event; -} + Binary_log_event* Binary_log_driver::parse_event(std::istream &is, Log_event_header *header) + { + Binary_log_event *parsed_event= 0; -} + switch (header->type_code) { + case TABLE_MAP_EVENT: + parsed_event= proto_table_map_event(is, header); + break; + case QUERY_EVENT: + parsed_event= proto_query_event(is, header); + break; + case INCIDENT_EVENT: + parsed_event= proto_incident_event(is, header); + break; + case WRITE_ROWS_EVENT: + case UPDATE_ROWS_EVENT: + case DELETE_ROWS_EVENT: + parsed_event= proto_rows_event(is, header); + break; + case ROTATE_EVENT: + { + Rotate_event *rot= proto_rotate_event(is, header); + m_binlog_file_name= rot->binlog_file; + m_binlog_offset= (unsigned long)rot->binlog_pos; + parsed_event= rot; + } + break; + case INTVAR_EVENT: + parsed_event= proto_intvar_event(is, header); + break; + case USER_VAR_EVENT: + parsed_event= proto_uservar_event(is, header); + break; + default: + { + // Create a dummy driver. + parsed_event= new Binary_log_event(header); + } + } + + return parsed_event; + } + + } + } diff --git a/src/binlog_event.cpp b/src/binlog_event.cpp index c2f52c6..67d26aa 100644 --- a/src/binlog_event.cpp +++ b/src/binlog_event.cpp @@ -1,21 +1,21 @@ /* -Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights -reserved. + Copyright (c) 2003, 2011, Oracle and/or its affiliates. All rights + reserved. -This program is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; version 2 of -the License. + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. -You should have received a copy of the GNU General Public License -along with this program; if not, write to the Free Software -Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA -02110-1301 USA + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA + 02110-1301 USA */ #include "binlog_event.h" @@ -25,59 +25,59 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA namespace mysql { -namespace system { + namespace system { -const char *get_event_type_str(Log_event_type type) -{ - switch(type) { - case START_EVENT_V3: return "Start_v3"; - case STOP_EVENT: return "Stop"; - case QUERY_EVENT: return "Query"; - case ROTATE_EVENT: return "Rotate"; - case INTVAR_EVENT: return "Intvar"; - case LOAD_EVENT: return "Load"; - case NEW_LOAD_EVENT: return "New_load"; - case SLAVE_EVENT: return "Slave"; - case CREATE_FILE_EVENT: return "Create_file"; - case APPEND_BLOCK_EVENT: return "Append_block"; - case DELETE_FILE_EVENT: return "Delete_file"; - case EXEC_LOAD_EVENT: return "Exec_load"; - case RAND_EVENT: return "RAND"; - case XID_EVENT: return "Xid"; - case USER_VAR_EVENT: return "User var"; - case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; - case TABLE_MAP_EVENT: return "Table_map"; - case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old"; - case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old"; - case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old"; - case WRITE_ROWS_EVENT: return "Write_rows"; - case UPDATE_ROWS_EVENT: return "Update_rows"; - case DELETE_ROWS_EVENT: return "Delete_rows"; - case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; - case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; - case INCIDENT_EVENT: return "Incident"; - case USER_DEFINED: return "User defined"; - default: return "Unknown"; - } -} + const char *get_event_type_str(Log_event_type type) + { + switch(type) { + case START_EVENT_V3: return "Start_v3"; + case STOP_EVENT: return "Stop"; + case QUERY_EVENT: return "Query"; + case ROTATE_EVENT: return "Rotate"; + case INTVAR_EVENT: return "Intvar"; + case LOAD_EVENT: return "Load"; + case NEW_LOAD_EVENT: return "New_load"; + case SLAVE_EVENT: return "Slave"; + case CREATE_FILE_EVENT: return "Create_file"; + case APPEND_BLOCK_EVENT: return "Append_block"; + case DELETE_FILE_EVENT: return "Delete_file"; + case EXEC_LOAD_EVENT: return "Exec_load"; + case RAND_EVENT: return "RAND"; + case XID_EVENT: return "Xid"; + case USER_VAR_EVENT: return "User var"; + case FORMAT_DESCRIPTION_EVENT: return "Format_desc"; + case TABLE_MAP_EVENT: return "Table_map"; + case PRE_GA_WRITE_ROWS_EVENT: return "Write_rows_event_old"; + case PRE_GA_UPDATE_ROWS_EVENT: return "Update_rows_event_old"; + case PRE_GA_DELETE_ROWS_EVENT: return "Delete_rows_event_old"; + case WRITE_ROWS_EVENT: return "Write_rows"; + case UPDATE_ROWS_EVENT: return "Update_rows"; + case DELETE_ROWS_EVENT: return "Delete_rows"; + case BEGIN_LOAD_QUERY_EVENT: return "Begin_load_query"; + case EXECUTE_LOAD_QUERY_EVENT: return "Execute_load_query"; + case INCIDENT_EVENT: return "Incident"; + case USER_DEFINED: return "User defined"; + default: return "Unknown"; + } + } -} // end namespace system + } // end namespace system -Binary_log_event::~Binary_log_event() -{ -} + Binary_log_event::~Binary_log_event() + { + } -Binary_log_event * create_incident_event(unsigned int type, const char *message, unsigned long pos) -{ - Incident_event *incident= new Incident_event(); - incident->header()->type_code= INCIDENT_EVENT; - incident->header()->next_position= pos; - incident->header()->event_length= LOG_EVENT_HEADER_SIZE + 2 + strlen(message); - incident->type= type; - incident->message.append(message); - return incident; -} + Binary_log_event * create_incident_event(unsigned int type, const char *message, unsigned long pos) + { + Incident_event *incident= new Incident_event(); + incident->header()->type_code= INCIDENT_EVENT; + incident->header()->next_position= pos; + incident->header()->event_length= LOG_EVENT_HEADER_SIZE + 2 + strlen(message); + incident->type= type; + incident->message.append(message); + return incident; + } } // end namespace mysql diff --git a/src/decimal.cpp b/src/decimal.cpp new file mode 100644 index 0000000..d66adb3 --- /dev/null +++ b/src/decimal.cpp @@ -0,0 +1,422 @@ +/* Copyright (c) 2004, 2013, Oracle and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include +#include +#include +#include + +#include "decimal.h" + +typedef decimal_digit_t dec1; +typedef long long dec2; + +#define DBUG_ASSERT(a) assert(a) +#define test(a) ((a) ? 1 : 0) +#define max(a, b) ((a) > (b) ? (a) : (b)) +#define min(a, b) ((a) < (b) ? (a) : (b)) +#define mi_sint1korr(A) ((int8_t)(*A)) +#define mi_uint1korr(A) ((uint8_t)(*A)) +#define mi_sint2korr(A) ((int16_t) (((int16_t) (((u_char*) (A))[1])) +\ + ((int16_t) ((int16_t) ((char *) (A))[0]) << 8))) +#define mi_sint3korr(A) ((int32_t) (((((u_char*) (A))[0]) & 128) ? \ + (((uint32_t) 255L << 24) | \ + (((uint32_t) ((u_char*) (A))[0]) << 16) |\ + (((uint32_t) ((u_char*) (A))[1]) << 8) | \ + ((uint32_t) ((u_char*) (A))[2])) : \ + (((uint32_t) ((u_char*) (A))[0]) << 16) |\ + (((uint32_t) ((u_char*) (A))[1]) << 8) | \ + ((uint32_t) ((u_char*) (A))[2]))) +#define mi_sint4korr(A) ((int32_t) (((int32_t) (((u_char*) (A))[3])) +\ + ((int32_t) (((u_char*) (A))[2]) << 8) +\ + ((int32_t) (((u_char*) (A))[1]) << 16) +\ + ((int32_t) ((int16_t) ((char*) (A))[0]) << 24))) + +#define DIG_PER_DEC1 9 +#define DIG_MASK 100000000 +#define DIG_BASE 1000000000 +#define DIG_MAX (DIG_BASE-1) +#define DIG_BASE2 ((dec2)DIG_BASE * (dec2)DIG_BASE) +#define ROUND_UP(X) (((X)+DIG_PER_DEC1-1)/DIG_PER_DEC1) + +static const dec1 powers10[DIG_PER_DEC1+1]={ + 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000, 1000000000}; + +static const int dig2bytes[DIG_PER_DEC1+1]={0, 1, 1, 2, 2, 3, 3, 4, 4, 4}; + +static const dec1 frac_max[DIG_PER_DEC1-1]={ + 900000000, 990000000, 999000000, + 999900000, 999990000, 999999000, + 999999900, 999999990 }; + +#define sanity(d) DBUG_ASSERT((d)->len >0 && ((d)->buf[0] | \ + (d)->buf[(d)->len-1] | 1)) + +#define FIX_INTG_FRAC_ERROR(len, intg1, frac1, error) \ + do \ + { \ + if (unlikely(intg1+frac1 > (len))) \ + { \ + if (unlikely(intg1 > (len))) \ + { \ + intg1=(len); \ + frac1=0; \ + error=E_DEC_OVERFLOW; \ + } \ + else \ + { \ + frac1=(len)-intg1; \ + error=E_DEC_TRUNCATED; \ + } \ + } \ + else \ + error=E_DEC_OK; \ + } while(0) + +#define ADD(to, from1, from2, carry) /* assume carry <= 1 */ \ + do \ + { \ + dec1 a=(from1)+(from2)+(carry); \ + DBUG_ASSERT((carry) <= 1); \ + if (((carry)= a >= DIG_BASE)) /* no division here! */ \ + a-=DIG_BASE; \ + (to)=a; \ + } while(0) + +#define ADD2(to, from1, from2, carry) \ + do \ + { \ + dec2 a=((dec2)(from1))+(from2)+(carry); \ + if (((carry)= a >= DIG_BASE)) \ + a-=DIG_BASE; \ + if (unlikely(a >= DIG_BASE)) \ + { \ + a-=DIG_BASE; \ + carry++; \ + } \ + (to)=(dec1) a; \ + } while(0) + +#define SUB(to, from1, from2, carry) /* to=from1-from2 */ \ + do \ + { \ + dec1 a=(from1)-(from2)-(carry); \ + if (((carry)= a < 0)) \ + a+=DIG_BASE; \ + (to)=a; \ + } while(0) + +#define SUB2(to, from1, from2, carry) /* to=from1-from2 */ \ + do \ + { \ + dec1 a=(from1)-(from2)-(carry); \ + if (((carry)= a < 0)) \ + a+=DIG_BASE; \ + if (unlikely(a < 0)) \ + { \ + a+=DIG_BASE; \ + carry++; \ + } \ + (to)=a; \ + } while(0) + +static dec1 *remove_leading_zeroes(const decimal_t *from, int *intg_result) +{ + int intg= from->intg, i; + dec1 *buf0= from->buf; + i= ((intg - 1) % DIG_PER_DEC1) + 1; + while (intg > 0 && *buf0 == 0) + { + intg-= i; + i= DIG_PER_DEC1; + buf0++; + } + if (intg > 0) + { + for (i= (intg - 1) % DIG_PER_DEC1; *buf0 < powers10[i--]; intg--) ; + DBUG_ASSERT(intg > 0); + } + else + intg=0; + *intg_result= intg; + return buf0; +} + +/* + Convert decimal to its printable string representation + + SYNOPSIS + decimal2string() + from - value to convert + to - points to buffer where string representation + should be stored + *to_len - in: size of to buffer (incl. terminating '\0') + out: length of the actually written string (excl. '\0') + fixed_precision - 0 if representation can be variable length and + fixed_decimals will not be checked in this case. + Put number as with fixed point position with this + number of digits (sign counted and decimal point is + counted) + fixed_decimals - number digits after point. + filler - character to fill gaps in case of fixed_precision > 0 + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW +*/ + +int decimal2string(const decimal_t *from, char *to, int *to_len, + int fixed_precision, int fixed_decimals, + char filler) +{ + /* {intg_len, frac_len} output widths; {intg, frac} places in input */ + int len, intg, frac= from->frac, i, intg_len, frac_len, fill; + /* number digits before decimal point */ + int fixed_intg= (fixed_precision ? + (fixed_precision - fixed_decimals) : 0); + int error=E_DEC_OK; + char *s=to; + dec1 *buf, *buf0=from->buf, tmp; + + DBUG_ASSERT(*to_len >= 2+from->sign); + + /* removing leading zeroes */ + buf0 = remove_leading_zeroes(from, &intg); + if (unlikely(intg+frac==0)) + { + intg=1; + tmp=0; + buf0=&tmp; + } + + if (!(intg_len= fixed_precision ? fixed_intg : intg)) + intg_len= 1; + frac_len= fixed_precision ? fixed_decimals : frac; + len= from->sign + intg_len + test(frac) + frac_len; + if (fixed_precision) + { + if (frac > fixed_decimals) + { + error= E_DEC_TRUNCATED; + frac= fixed_decimals; + } + if (intg > fixed_intg) + { + error= E_DEC_OVERFLOW; + intg= fixed_intg; + } + } + else if (unlikely(len > --*to_len)) /* reserve one byte for \0 */ + { + int j= len-*to_len; + error= (frac && j <= frac + 1) ? E_DEC_TRUNCATED : E_DEC_OVERFLOW; + if (frac && j >= frac + 1) j--; + if (j > frac) + { + intg-= j-frac; + frac= 0; + } + else + frac-=j; + len= from->sign + intg_len + test(frac) + frac_len; + } + *to_len=len; + s[len]=0; + + if (from->sign) + *s++='-'; + + if (frac) + { + char *s1= s + intg_len; + fill= frac_len - frac; + buf=buf0+ROUND_UP(intg); + *s1++='.'; + for (; frac>0; frac-=DIG_PER_DEC1) + { + dec1 x=*buf++; + for (i=min(frac, DIG_PER_DEC1); i; i--) + { + dec1 y=x/DIG_MASK; + *s1++='0'+(u_char)y; + x-=y*DIG_MASK; + x*=10; + } + } + for(; fill; fill--) + *s1++=filler; + } + + fill= intg_len - intg; + if (intg == 0) + fill--; /* symbol 0 before digital point */ + for(; fill; fill--) + *s++=filler; + if (intg) + { + s+=intg; + for (buf=buf0+ROUND_UP(intg); intg>0; intg-=DIG_PER_DEC1) + { + dec1 x=*--buf; + for (i=min(intg, DIG_PER_DEC1); i; i--) + { + dec1 y=x/10; + *--s='0'+(u_char)(x-y*10); + x=y; + } + } + } + else + *s= '0'; + return error; +} + +/* + Restores decimal from its binary fixed-length representation + + SYNOPSIS + bin2decimal() + from - value to convert + to - result + precision/scale - see decimal_bin_size() below + + NOTE + see decimal2bin() + the buffer is assumed to be of the size decimal_bin_size(precision, scale) + + RETURN VALUE + E_DEC_OK/E_DEC_TRUNCATED/E_DEC_OVERFLOW +*/ + +int bin2decimal(const u_char *from, decimal_t *to, int precision, int scale) +{ + int error=E_DEC_OK, intg=precision-scale, + intg0=intg/DIG_PER_DEC1, frac0=scale/DIG_PER_DEC1, + intg0x=intg-intg0*DIG_PER_DEC1, frac0x=scale-frac0*DIG_PER_DEC1, + intg1=intg0+(intg0x>0), frac1=frac0+(frac0x>0); + dec1 *buf=to->buf, mask=(*from & 0x80) ? 0 : -1; + const u_char *stop; + u_char *d_copy; + int bin_size= decimal_bin_size(precision, scale); + + sanity(to); + d_copy= (u_char*) malloc(bin_size); + memcpy(d_copy, from, bin_size); + d_copy[0]^= 0x80; + from= d_copy; + + FIX_INTG_FRAC_ERROR(to->len, intg1, frac1, error); + if (unlikely(error)) + { + if (intg1 < intg0+(intg0x>0)) + { + from+=dig2bytes[intg0x]+sizeof(dec1)*(intg0-intg1); + frac0=frac0x=intg0x=0; + intg0=intg1; + } + else + { + frac0x=0; + frac0=frac1; + } + } + + to->sign=(mask != 0); + to->intg=intg0*DIG_PER_DEC1+intg0x; + to->frac=frac0*DIG_PER_DEC1+frac0x; + + if (intg0x) + { + int i=dig2bytes[intg0x]; + dec1 x = 0; + switch (i) + { + case 1: x=mi_sint1korr(from); break; + case 2: x=mi_sint2korr(from); break; + case 3: x=mi_sint3korr(from); break; + case 4: x=mi_sint4korr(from); break; + default: DBUG_ASSERT(0); + } + from+=i; + *buf=x ^ mask; + if (((unsigned long long)*buf) >= (unsigned long long) powers10[intg0x+1]) + goto err; + if (buf > to->buf || *buf != 0) + buf++; + else + to->intg-=intg0x; + } + for (stop=from+intg0*sizeof(dec1); from < stop; from+=sizeof(dec1)) + { + DBUG_ASSERT(sizeof(dec1) == 4); + *buf=mi_sint4korr(from) ^ mask; + if (((uint32_t)*buf) > DIG_MAX) + goto err; + if (buf > to->buf || *buf != 0) + buf++; + else + to->intg-=DIG_PER_DEC1; + } + DBUG_ASSERT(to->intg >=0); + for (stop=from+frac0*sizeof(dec1); from < stop; from+=sizeof(dec1)) + { + DBUG_ASSERT(sizeof(dec1) == 4); + *buf=mi_sint4korr(from) ^ mask; + if (((uint32_t)*buf) > DIG_MAX) + goto err; + buf++; + } + if (frac0x) + { + int i=dig2bytes[frac0x]; + dec1 x = 0; + switch (i) + { + case 1: x=mi_sint1korr(from); break; + case 2: x=mi_sint2korr(from); break; + case 3: x=mi_sint3korr(from); break; + case 4: x=mi_sint4korr(from); break; + default: DBUG_ASSERT(0); + } + *buf=(x ^ mask) * powers10[DIG_PER_DEC1 - frac0x]; + if (((uint32_t)*buf) > DIG_MAX) + goto err; + buf++; + } + free(d_copy); + + /* + No digits? We have read the number zero, of unspecified precision. + Make it a proper zero, with non-zero precision. + */ + if (to->intg == 0 && to->frac == 0) + decimal_make_zero(to); + return error; + +err: + free(d_copy); + decimal_make_zero(to); + return(E_DEC_BAD_NUM); +} + +int decimal_bin_size(int precision, int scale) +{ + int intg=precision-scale, + intg0=intg/DIG_PER_DEC1, frac0=scale/DIG_PER_DEC1, + intg0x=intg-intg0*DIG_PER_DEC1, frac0x=scale-frac0*DIG_PER_DEC1; + + return intg0*sizeof(dec1)+dig2bytes[intg0x]+ + frac0*sizeof(dec1)+dig2bytes[frac0x]; +} + diff --git a/src/field_iterator.cpp b/src/field_iterator.cpp index 1198a11..6f4dba7 100644 --- a/src/field_iterator.cpp +++ b/src/field_iterator.cpp @@ -34,7 +34,7 @@ bool is_null(unsigned char *bitmap, int index) } -uint32_t extract_metadata(const Table_map_event *map, int col_no) +uint16_t extract_metadata(const Table_map_event *map, int col_no) { int offset= 0; @@ -44,21 +44,19 @@ uint32_t extract_metadata(const Table_map_event *map, int col_no) offset += lookup_metadata_field_size((enum mysql::system::enum_field_types)type); } - uint32_t metadata= 0; + uint16_t metadata= 0; unsigned int type= (unsigned int)map->columns[col_no]&0xFF; switch(lookup_metadata_field_size((enum mysql::system::enum_field_types)type)) { - case 1: - metadata= map->metadata[offset]; - break; - case 2: - { - unsigned int tmp= ((unsigned int)map->metadata[offset])&0xFF; - metadata= static_cast(tmp); - tmp= (((unsigned int)map->metadata[offset+1])&0xFF) << 8; - metadata+= static_cast(tmp); - } - break; + case 1: + metadata = (unsigned char)map->metadata[offset]; + break; + case 2: + { + metadata = ((unsigned char)map->metadata[offset]) & 0xFF; + metadata += ((((unsigned char)map->metadata[offset+1]) & 0xFF) << 8); + } + break; } return metadata; } diff --git a/src/file_driver.cpp b/src/file_driver.cpp index cfdc267..e502522 100644 --- a/src/file_driver.cpp +++ b/src/file_driver.cpp @@ -70,6 +70,10 @@ using namespace std; return ERR_OK; } + int Binlog_file_driver::set_server_id(int server_id) + { + return server_id; + } int Binlog_file_driver::set_position(const string &str, unsigned long position) { diff --git a/src/protocol.cpp b/src/protocol.cpp index adace5b..2f1695f 100644 --- a/src/protocol.cpp +++ b/src/protocol.cpp @@ -96,18 +96,22 @@ void prot_parse_error_message(std::istream &is, struct st_error_package &err, int packet_length) { uint8_t marker; - + int message_size; + Protocol_chunk prot_errno(err.error_code); Protocol_chunk prot_marker(marker); - Protocol_chunk prot_sql_state(err.sql_state,5); is >> prot_errno - >> prot_marker - >> prot_sql_state; - - // TODO is the number of bytes read = is.tellg() ? - - int message_size= packet_length -2 -1 -5; // the remaining part of the package + >> prot_marker; + + // TODO is the number of bytes read = is.tellg() ? + if(marker == '#') { + Protocol_chunk prot_sql_state(err.sql_state,5); + is >> prot_sql_state; + message_size = packet_length - 2 - 1 - 5; + } else { + message_size= packet_length - 2 - 1; // the remaining part of the package + } Protocol_chunk_string prot_message(err.message, message_size); is >> prot_message; err.message[message_size]= '\0'; @@ -115,8 +119,7 @@ void prot_parse_error_message(std::istream &is, struct st_error_package &err, void prot_parse_ok_message(std::istream &is, struct st_ok_package &ok, int packet_length) { - // TODO: Assure that zero length messages can be but on the input stream. - + // TODO: Assure that zero length messages can be but on the input stream. //Protocol_chunk prot_result_type(result_type); Protocol_chunk prot_affected_rows(ok.affected_rows); Protocol_chunk prot_insert_id(ok.insert_id); @@ -315,7 +318,7 @@ std::istream &operator>>(std::istream &is, Protocol_chunk_string_len &lenstr) std::ostream &operator<<(std::ostream &os, Protocol &chunk) { if (!os.bad()) - os.write((const char *) chunk.data(),chunk.size()); + os.write((const char *) chunk.data(), chunk.size()); return os; } @@ -446,7 +449,7 @@ Row_event *proto_rows_event(std::istream &is, Log_event_header *header) bytes_read+=used_column_len; unsigned long row_len= header->event_length - bytes_read - LOG_EVENT_HEADER_SIZE + 1; - std::cout << "Bytes read: " << bytes_read << " Bytes expected: " << row_len << std::endl; + //std::cout << "Bytes read: " << bytes_read << " Bytes expected: " << row_len << std::endl; Protocol_chunk_vector proto_row(rev->row, row_len); is >> proto_row; diff --git a/src/resultset_iterator.cpp b/src/resultset_iterator.cpp index 169e3d7..58ca058 100644 --- a/src/resultset_iterator.cpp +++ b/src/resultset_iterator.cpp @@ -78,10 +78,9 @@ void Result_set::digest_row_set() bool is_eof= false; Row_of_fields row(0); system::digest_row_content(response_stream, m_field_count, row, m_storage, is_eof); - if (is_eof) + if (is_eof) { m_current_state= EOF_PACKET; - else - { + } else { m_rows.push_back(row); ++m_row_count; } @@ -110,7 +109,7 @@ void digest_result_header(std::istream &is, uint64_t &field_count, uint64_t extr //proto_extra.set_length_encoded_binary(true); is >> proto_field_count; - //>> proto_extra; + //>> proto_extra; } void digest_field_packet(std::istream &is, Field_packet &field_packet) @@ -154,6 +153,7 @@ void digest_marker(std::istream &is) void digest_row_content(std::istream &is, int field_count, Row_of_fields &row, String_storage &storage, bool &is_eof) { uint8_t size; + Protocol_chunk proto_size(size); is >> proto_size; if (size == 0xfe) @@ -167,12 +167,18 @@ void digest_row_content(std::istream &is, int field_count, Row_of_fields &row, S is.putback((char)size); for(int field_no=0; field_no < field_count; ++field_no) { - std::string *storage= new std::string; + // std::string storage_str; + std::string *storage_str = new std::string; - Protocol_chunk_string_len proto_value(*storage); + // Protocol_chunk_string_len proto_value(storage_str); + Protocol_chunk_string_len proto_value(*storage_str); + is >> proto_value; - - Value value(MYSQL_TYPE_VAR_STRING, storage->length(), storage->c_str()); + storage.push_back(storage_str); + + // Value value(MYSQL_TYPE_VAR_STRING, storage.back().length(), storage.back().c_str()); + Value value(MYSQL_TYPE_VAR_STRING, storage.back()->length(), storage.back()->c_str()); + row.push_back(value); } } diff --git a/src/tcp_driver.cpp b/src/tcp_driver.cpp index 1c7e015..df05ca5 100644 --- a/src/tcp_driver.cpp +++ b/src/tcp_driver.cpp @@ -51,9 +51,19 @@ static int encrypt_password(uint8_t *reply, /* buffer at least EVP_MAX_MD_SIZE const char *pass); static int hash_sha1(uint8_t *output, ...); - int Binlog_tcp_driver::connect(const std::string& user, const std::string& passwd, - const std::string& host, long port, - const std::string& binlog_filename, size_t offset) +int Binlog_tcp_driver::set_server_id(int server_id) +{ + if(server_id < 1) { + srand((unsigned int)(time(NULL))); + server_id = rand() % 10000 + 10000; + } + m_server_id = server_id; + return m_server_id; +} + +int Binlog_tcp_driver::connect(const std::string& user, const std::string& passwd, + const std::string& host, long port, + const std::string& binlog_filename, size_t offset) { m_user=user; m_passwd=passwd; @@ -62,7 +72,7 @@ static int hash_sha1(uint8_t *output, ...); if (!m_socket) { - if ((m_socket=sync_connect_and_authenticate(m_io_service, user, passwd, host, port)) == 0) + if ((m_socket=sync_connect_and_authenticate(m_io_service, user, passwd, host, port, m_server_id)) == 0) return 1; } @@ -73,20 +83,18 @@ static int hash_sha1(uint8_t *output, ...); { if (fetch_master_status(m_socket, &m_binlog_file_name, &m_binlog_offset)) return 1; - } else - { - m_binlog_file_name=binlog_filename; - m_binlog_offset=offset; + } else { + m_binlog_file_name = binlog_filename; + m_binlog_offset = offset; } - /* We're ready to start the io service and request the binlog dump. */ start_binlog_dump(m_binlog_file_name, m_binlog_offset); return 0; } -tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const std::string &user, const std::string &passwd, const std::string &host, long port) +tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const std::string &user, const std::string &passwd, const std::string &host, long port, int server_id) { tcp::resolver resolver(io_service); @@ -94,31 +102,29 @@ tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const s asio::error_code error = asio::error::host_not_found; - if (port == 0) - port= 3306; + if (port == 0) port = 3306; tcp::socket *socket=new tcp::socket(io_service); /* Try each endpoint until we successfully establish a connection. */ try { - tcp::resolver::iterator endpoint_iterator=resolver.resolve(query); - tcp::resolver::iterator end; - - while (error && endpoint_iterator != end) - { - /* - Hack to set port number from a long int instead of a service. - */ - tcp::endpoint endpoint=endpoint_iterator->endpoint(); - endpoint.port(port); + tcp::resolver::iterator endpoint_iterator=resolver.resolve(query); + tcp::resolver::iterator end; - socket->close(); - socket->connect(endpoint, error); - endpoint_iterator++; - } - } catch(...) - { + while (error && endpoint_iterator != end) + { + /* + Hack to set port number from a long int instead of a service. + */ + tcp::endpoint endpoint=endpoint_iterator->endpoint(); + endpoint.port(port); + + socket->close(); + socket->connect(endpoint, error); + endpoint_iterator++; + } + } catch(...) { return 0; } @@ -153,8 +159,7 @@ tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const s * Get server handshake package */ std::streamsize inbuffer=server_messages.in_avail(); - if (inbuffer < 0) - inbuffer=0; + if (inbuffer < 0) inbuffer=0; asio::read(*socket, server_messages, asio::transfer_at_least(packet_length - inbuffer)); std::istream server_stream(&server_messages); @@ -162,23 +167,25 @@ tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const s proto_get_handshake_package(server_stream, handshake_package, packet_length); - if (authenticate(socket, user, passwd, handshake_package)) - return 0; + if (authenticate(socket, user, passwd, handshake_package)) return 0; /* * Register slave to master */ std::ostream command_request_stream(&server_messages); - Protocol_chunk prot_command(COM_REGISTER_SLAVE); - Protocol_chunk prot_connection_port(port); - Protocol_chunk prot_rpl_recovery_rank(0); - Protocol_chunk prot_server_id(1); - Protocol_chunk prot_master_server_id(1); + Protocol_chunk prot_command(COM_REGISTER_SLAVE, NEED_ALLOC); + Protocol_chunk prot_connection_port((uint16_t)port, NEED_ALLOC); + Protocol_chunk prot_rpl_recovery_rank(0, NEED_ALLOC); + Protocol_chunk prot_server_id(server_id, NEED_ALLOC); // slave server-id + /* + * See document at http://dev.mysql.com/doc/internals/en/replication-protocol.html + */ + Protocol_chunk prot_master_server_id(0, NEED_ALLOC); - Protocol_chunk prot_report_host_strlen(host.size()); - Protocol_chunk prot_user_strlen(user.size()); - Protocol_chunk prot_passwd_strlen(passwd.size()); + Protocol_chunk prot_report_host_strlen(host.size(), NEED_ALLOC); + Protocol_chunk prot_user_strlen(user.size(), NEED_ALLOC); + Protocol_chunk prot_passwd_strlen(passwd.size(), NEED_ALLOC); command_request_stream << prot_command << prot_server_id @@ -193,18 +200,18 @@ tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const s << prot_master_server_id; int size=server_messages.size(); + char command_packet_header[4]; try { write_packet_header(command_packet_header, size, 0); // packet_no= 0 // Send the request. asio::write(*socket, - asio::buffer(command_packet_header, 4), - asio::transfer_at_least(4)); + asio::buffer(command_packet_header, 4), + asio::transfer_at_least(4)); asio::write(*socket, server_messages, - asio::transfer_at_least(size)); - } catch( asio::error_code e) - { + asio::transfer_at_least(size)); + } catch( asio::error_code e) { return 0; } @@ -215,15 +222,14 @@ tcp::socket *sync_connect_and_authenticate(asio::io_service &io_service, const s uint8_t result_type; Protocol_chunk prot_result_type(result_type); - + // printf("Result type:\t%x\n", result_type); cmd_response_stream >> prot_result_type; if (result_type == 0) { struct st_ok_package ok_package; prot_parse_ok_message(cmd_response_stream, ok_package, packet_length); - } else - { + } else { struct st_error_package error_package; prot_parse_error_message(cmd_response_stream, error_package, packet_length); return 0; @@ -239,10 +245,10 @@ void Binlog_tcp_driver::start_binlog_dump(const std::string &binlog_file_name, s std::ostream command_request_stream(&server_messages); - Protocol_chunk prot_command(COM_BINLOG_DUMP); - Protocol_chunk prot_binlog_offset(offset); // binlog position to start at - Protocol_chunk prot_binlog_flags(0); // not used - Protocol_chunk prot_server_id(1); // must not be 0; see handshake package + Protocol_chunk prot_command(COM_BINLOG_DUMP, NEED_ALLOC); + Protocol_chunk prot_binlog_offset(offset, NEED_ALLOC); // binlog position to start at + Protocol_chunk prot_binlog_flags(0, NEED_ALLOC); // not used + Protocol_chunk prot_server_id(m_server_id, NEED_ALLOC); // must not be 0; see handshake package command_request_stream << prot_command @@ -279,7 +285,10 @@ void Binlog_tcp_driver::start_binlog_dump(const std::string &binlog_file_name, s if (!m_event_loop) { this->thread_data->tcp_driver = this; m_event_loop = (pthread_t *)malloc(sizeof(pthread_t)); - pthread_create(m_event_loop, NULL, &Binlog_tcp_driver::start, (void *)this->thread_data); + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + pthread_create(m_event_loop, &attr, &Binlog_tcp_driver::start, (void *)this->thread_data); } } @@ -299,12 +308,12 @@ static void proto_event_packet_header(asio::streambuf &event_src, Log_event_head Protocol_chunk prot_flags(h->flags); is >> prot_marker - >> prot_timestamp - >> prot_type_code - >> prot_server_id - >> prot_event_length - >> prot_next_position - >> prot_flags; + >> prot_timestamp + >> prot_type_code + >> prot_server_id + >> prot_event_length + >> prot_next_position + >> prot_flags; } void Binlog_tcp_driver::handle_net_packet(const asio::error_code& err, std::size_t bytes_transferred) @@ -312,7 +321,7 @@ void Binlog_tcp_driver::handle_net_packet(const asio::error_code& err, std::size if (err) { Binary_log_event * ev= create_incident_event(175, err.message().c_str(), m_binlog_offset); - std::cout << "1:" << err.message() << std::endl; + // std::cout << "1:" << err.message() << std::endl; m_event_queue->push_front(ev); return; } @@ -326,12 +335,16 @@ void Binlog_tcp_driver::handle_net_packet(const asio::error_code& err, std::size << bytes_transferred << " instead."; Binary_log_event * ev= create_incident_event(175, os.str().c_str(), m_binlog_offset); - std::cout << "2:" << os.str() << std::endl; + // std::cout << "2:" << os.str() << std::endl; m_event_queue->push_front(ev); return; } //assert(m_waiting_event != 0); + if (!m_waiting_event) { + return; + } + //std::cerr << "Committing '"<< bytes_transferred << "' bytes to the event stream." << std::endl; m_event_stream_buffer.commit(bytes_transferred); /* @@ -339,13 +352,13 @@ void Binlog_tcp_driver::handle_net_packet(const asio::error_code& err, std::size hasn't been parsed. If the event stream also contains enough bytes we make the assumption that the next bytes waiting in the stream is the event header and attempt to parse it. - */ + */ if (m_waiting_event->event_length == 0 && m_event_stream_buffer.size() >= 19) { /* Copy and remove from the event stream, the remaining bytes might be dynamic payload. - */ + */ //std::cerr << "Consuming event stream for header. Size before: " << m_event_stream_buffer.size() << std::endl; proto_event_packet_header(m_event_stream_buffer, m_waiting_event); //std::cerr << " Size after: " << m_event_stream_buffer.size() << std::endl; @@ -388,7 +401,7 @@ void Binlog_tcp_driver::handle_net_packet_header(const asio::error_code& err, st if (err) { Binary_log_event * ev= create_incident_event(175, err.message().c_str(), m_binlog_offset); - std::cout << "3:" << err.message() << std::endl; + // std::cout << "3:" << err.message() << std::endl; m_event_queue->push_front(ev); return; } @@ -402,7 +415,7 @@ void Binlog_tcp_driver::handle_net_packet_header(const asio::error_code& err, st << bytes_transferred << " instead."; Binary_log_event * ev= create_incident_event(175, os.str().c_str(), m_binlog_offset); - std::cout << "4:" << os.str() << std::endl; + // std::cout << "4:" << os.str() << std::endl; m_event_queue->push_front(ev); return; } @@ -427,8 +440,8 @@ void Binlog_tcp_driver::handle_net_packet_header(const asio::error_code& err, st read_handler.method = &Binlog_tcp_driver::handle_net_packet; read_handler.tcp_driver = this; asio::async_read(*m_socket, - asio::buffer(m_event_packet, packet_length), - read_handler); + asio::buffer(m_event_packet, packet_length), + read_handler); } int authenticate(tcp::socket *socket, const std::string& user, const std::string& passwd, @@ -526,8 +539,7 @@ int Binlog_tcp_driver::wait_for_next_event(mysql::Binary_log_event **event_ptr) { // poll for new event until one event is found. // return the event - if (event_ptr) - *event_ptr = 0; + if (event_ptr) *event_ptr = 0; m_event_queue->pop_back(event_ptr); return 0; } @@ -587,7 +599,7 @@ void Binlog_tcp_driver::reconnect() connect(m_user, m_passwd, m_host, m_port); } -void Binlog_tcp_driver::disconnect() +int Binlog_tcp_driver::disconnect() { Binary_log_event * event; m_waiting_event= 0; @@ -597,9 +609,27 @@ void Binlog_tcp_driver::disconnect() m_event_queue->pop_back(&event); delete(event); } - if (m_socket) - m_socket->close(); + if (m_socket) m_socket->close(); m_socket= 0; + + /* + By posting to the io service we guarantee that the operations are + executed in the same thread as the io_service is running in. + */ + // shut down io service + Shutdown_handler shutdown_handler; + shutdown_handler.method = &Binlog_tcp_driver::shutdown; + shutdown_handler.tcp_driver = this; + m_io_service.post(shutdown_handler); + + // free pthread + if (m_event_loop) { + pthread_join(*m_event_loop, NULL); + free(m_event_loop); + } + m_event_loop= 0; + + return ERR_OK; } @@ -616,14 +646,19 @@ int Binlog_tcp_driver::set_position(const std::string &str, unsigned long positi position we won't know if it succeded because the binlog dump is running in another thread asynchronously. */ + + if(position >= m_binlog_offset) { + return ERR_FAIL; + } + asio::io_service io_service; tcp::socket *socket; - if ((socket= sync_connect_and_authenticate(io_service, m_user, m_passwd, m_host, m_port)) == 0) + if ((socket= sync_connect_and_authenticate(io_service, m_user, m_passwd, m_host, m_port, m_server_id)) == 0) return ERR_FAIL; std::map binlog_map; - fetch_binlogs_name_and_size(socket, binlog_map); + if(fetch_binlogs_name_and_size(socket, binlog_map)) return ERR_FAIL; socket->close(); delete socket; @@ -647,16 +682,16 @@ int Binlog_tcp_driver::set_position(const std::string &str, unsigned long positi By posting to the io service we guarantee that the operations are executed in the same thread as the io_service is running in. */ - Shutdown_handler shutdown_handler; - shutdown_handler.method = &Binlog_tcp_driver::shutdown; - shutdown_handler.tcp_driver = this; - m_io_service.post(shutdown_handler); - if (m_event_loop) - { - pthread_join(*m_event_loop, NULL); - free(m_event_loop); - } - m_event_loop= 0; + // Shutdown_handler shutdown_handler; + // shutdown_handler.method = &Binlog_tcp_driver::shutdown; + // shutdown_handler.tcp_driver = this; + // m_io_service.post(shutdown_handler); + // if (m_event_loop) + // { + // pthread_join(*m_event_loop, NULL); + // free(m_event_loop); + // } + // m_event_loop= 0; disconnect(); /* Uppon return of connect we only know if we succesfully authenticated @@ -675,7 +710,7 @@ int Binlog_tcp_driver::get_position(std::string *filename_ptr, unsigned long *po tcp::socket *socket; - if ((socket=sync_connect_and_authenticate(io_service, m_user, m_passwd, m_host, m_port)) == 0) + if ((socket=sync_connect_and_authenticate(io_service, m_user, m_passwd, m_host, m_port, m_server_id)) == 0) return ERR_FAIL; if (fetch_master_status(socket, &m_binlog_file_name, &m_binlog_offset)) @@ -696,7 +731,7 @@ bool fetch_master_status(tcp::socket *socket, std::string *filename, unsigned lo std::ostream command_request_stream(&server_messages); - Protocol_chunk prot_command(COM_QUERY); + Protocol_chunk prot_command(COM_QUERY, NEED_ALLOC); command_request_stream << prot_command << "SHOW MASTER STATUS"; @@ -717,7 +752,7 @@ bool fetch_master_status(tcp::socket *socket, std::string *filename, unsigned lo it++) { Row_of_fields row(*it); - *filename= ""; + *filename = ""; conv.to(*filename, row[0]); long pos; conv.to(pos, row[1]); @@ -732,7 +767,7 @@ bool fetch_binlogs_name_and_size(tcp::socket *socket, std::map prot_command(COM_QUERY); + Protocol_chunk prot_command(COM_QUERY, NEED_ALLOC); command_request_stream << prot_command << "SHOW BINARY LOGS"; @@ -783,6 +818,7 @@ int hash_sha1(uint8_t *output, ...) EVP_DigestUpdate(hash_context, data, length); } EVP_DigestFinal_ex(hash_context, (unsigned char *)output, (unsigned int *)&result); + EVP_MD_CTX_destroy(hash_context); va_end(ap); return result; } diff --git a/src/value.cpp b/src/value.cpp index 05044a1..7cf36f5 100644 --- a/src/value.cpp +++ b/src/value.cpp @@ -17,27 +17,71 @@ along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ -#include "value.h" #include + +#include "value.h" #include "binlog_event.h" +#include "decimal.h" using namespace mysql; using namespace mysql::system; namespace mysql { -int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, uint32_t metadata) +// calculate mysql NEW DECIMAL digits's size in bytes +static uint8_t calc_digits_size(uint8_t digits) +{ + uint8_t size = 0; + + while (digits > 9) { + digits -= 9; + size += 4; + } + if (digits >=7) { + size += 4; + } else if (digits >= 5 && digits <= 6) { + size += 3; + } else if (digits >= 3 && digits <= 4) { + size += 2; + } else if (digits >= 1 && digits <= 2) { + size += 1; + } + return size; +} + +uint8_t calc_newdecimal_size(uint8_t m, uint8_t d) +{ + // unsigned char digits_per_bytes[] = { 0, 1, 1, 2, 2, 3, 3, 4, 4, 4 }; + // uint8_t i_digits = m - d; + // uint8_t f_digits = d; + // uint8_t decimal_full_blocks = i_digits / 9; + // uint8_t decimal_last_block_digits = i_digits % 9; + // uint8_t scale_full_blocks = f_digits / 9; + // uint8_t scale_last_block_digits = f_digits % 9; + // uint8_t size = 0; + + // size += decimal_full_blocks * digits_per_bytes[9] + digits_per_bytes[decimal_last_block_digits]; + // size += scale_full_blocks * digits_per_bytes[9] + digits_per_bytes[scale_last_block_digits]; + + // (m - d) is left digits + + return calc_digits_size(m-d) + calc_digits_size(d); + +} + +int calc_field_size(enum mysql::system::enum_field_types column_type, const unsigned char *field_ptr, uint16_t metadata) { uint32_t length; switch (column_type) { case mysql::system::MYSQL_TYPE_VAR_STRING: /* This type is hijacked for result set types. */ - length= metadata; + length = metadata; break; case mysql::system::MYSQL_TYPE_NEWDECIMAL: - //length= my_decimal_get_binary_size(metadata_ptr[col] >> 8, - // metadata_ptr[col] & 0xff); - length= 0; + // higher byte is number of digits to the right of the decimal point + // lower byte is the maximum number of digits + length = calc_newdecimal_size(metadata & 0xff, metadata >> 8); + // length = 0; break; case mysql::system::MYSQL_TYPE_DECIMAL: case mysql::system::MYSQL_TYPE_FLOAT: @@ -48,29 +92,36 @@ int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, u The cases for SET and ENUM are include for completeness, however both are mapped to type MYSQL_TYPE_STRING and their real types are encoded in the field metadata. + + THIS WILL NEVER BE EXECUTED. */ case mysql::system::MYSQL_TYPE_SET: case mysql::system::MYSQL_TYPE_ENUM: + { + length = (metadata >> 8U); + break; + } case mysql::system::MYSQL_TYPE_STRING: { - unsigned char type= metadata >> 8U; - if ((type == mysql::system::MYSQL_TYPE_SET) || (type == mysql::system::MYSQL_TYPE_ENUM)) - length= metadata & 0x00ff; - else - { - /* - We are reading the actual size from the master_data record - because this field has the actual lengh stored in the first - byte. - */ - length= (unsigned int) *field_ptr+1; - //DBUG_ASSERT(length != 0); + uint32_t maxlen = 0; + uint8_t lower = metadata & 0xFF; + if (lower == mysql::system::MYSQL_TYPE_SET || lower == mysql::system::MYSQL_TYPE_ENUM) { + length = (metadata >> 8U); + break; + } + uint8_t higher = metadata >> 8U; + if ((lower & 0x30) != 0x30) { + maxlen = (((lower & 0x30) ^ 0x30) << 4) | higher; + } else { + maxlen = higher; } + length = maxlen >= 256 ? (uint16_t)(*field_ptr) + 2 : (uint8_t) (*field_ptr) + 1; + // std::cout << "Len: " << length << std::endl; break; } case mysql::system::MYSQL_TYPE_YEAR: case mysql::system::MYSQL_TYPE_TINY: - length= 1; + length = 1; break; case mysql::system::MYSQL_TYPE_SHORT: length= 2; @@ -104,22 +155,21 @@ int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, u { /* Decode the size of the bit field from the master. - from_len is the length in bytes from the master - from_bit_len is the number of extra bits stored in the master record + from_len is the length in bytes from the master + from_bit_len is the number of extra bits stored in the master record If from_bit_len is not 0, add 1 to the length to account for accurate number of bytes needed. */ - uint32_t from_len= (metadata >> 8U) & 0x00ff; - uint32_t from_bit_len= metadata & 0x00ff; - //DBUG_ASSERT(from_bit_len <= 7); - length= from_len + ((from_bit_len > 0) ? 1 : 0); + uint8_t from_len = metadata >> 8U; + uint8_t from_bit_len = metadata & 0xff; + length = from_len + ((from_bit_len > 0) ? 1 : 0); break; } case mysql::system::MYSQL_TYPE_VARCHAR: { - length= metadata > 255 ? 2 : 1; + length = metadata > 255 ? 2 : 1; - length+= length == 1 ? (uint32_t) *field_ptr : *((uint16_t *)field_ptr); + length += length == 1 ? (uint32_t) *field_ptr : *((uint16_t *)field_ptr); break; } @@ -129,7 +179,7 @@ int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, u case mysql::system::MYSQL_TYPE_BLOB: case mysql::system::MYSQL_TYPE_GEOMETRY: { - switch (metadata) + switch (metadata) { case 1: length= 1+ (uint32_t) field_ptr[0]; @@ -157,17 +207,6 @@ int calc_field_size(unsigned char column_type, const unsigned char *field_ptr, u return length; } -/* -Value::Value(Value &val) -{ - m_size= val.length(); - m_storage= val.storage(); - m_type= val.type(); - m_metadata= val.metadata(); - m_is_null= val.is_null(); -} -*/ - Value::Value(const Value& val) { m_size= val.m_size; @@ -202,24 +241,42 @@ bool Value::operator!=(const Value &val) const char *Value::as_c_str(unsigned long &size) const { - if (m_is_null || m_size == 0) - { + if (m_is_null || m_size == 0) { size = 0; return NULL; } + + if (m_size == 1) { + size = 0; + return const_cast(m_storage); + } /* Length encoded; First byte is length of string. */ - int metadata_length= m_metadata > 255 ? 2 : 1; - - // std::cout << "m_size " << metadata_length << std::endl; + uint32_t maxlen = 0; + + if(m_type == mysql::system::MYSQL_TYPE_STRING) { + uint8_t lower = m_metadata & 0xFF; + uint8_t higher = m_metadata >> 8U; + if ((lower & 0x30) != 0x30) { + maxlen = (((lower & 0x30) ^ 0x30) << 4) | higher; + } else { + maxlen = higher; + } + } else { + maxlen = m_metadata; + } + + int metadata_length = maxlen > 255 ? 2 : 1; /* Size is length of the character string; not of the entire storage */ - size= m_size - metadata_length; + size = m_size - metadata_length; + // std::cout << "Len: " << m_size << " Metadata: " << m_metadata << " Size: " << size << " Max Len: " << maxlen << std::endl; + return const_cast(m_storage + metadata_length); } @@ -250,8 +307,8 @@ int32_t Value::as_int32() const { return 0; } - uint32_t to_int; - Protocol_chunk prot_integer(to_int); + int32_t to_int; + Protocol_chunk prot_integer(to_int); buffer_source buff(m_storage, m_size); buff >> prot_integer; @@ -319,48 +376,98 @@ void Converter::to(std::string &str, const Value &val) const str = "NULL"; return; } + if (!val.storage()) { + str = "NULL"; + return; + } std::ostringstream os; + switch(val.type()) { case MYSQL_TYPE_DECIMAL: + { str = "not implemented"; - return; + str = os.str(); + break; + } case MYSQL_TYPE_TINY: + { os << static_cast(val.as_int8()); + str = os.str(); break; + } case MYSQL_TYPE_SHORT: + { os << val.as_int16(); + str = os.str(); break; + } case MYSQL_TYPE_LONG: + { os << val.as_int32(); + str = os.str(); break; + } case MYSQL_TYPE_FLOAT: + { os << val.as_float(); + str = os.str(); break; + } case MYSQL_TYPE_DOUBLE: + { os << val.as_double(); + str = os.str(); break; + } case MYSQL_TYPE_NULL: + { str = "not implemented"; - return; + str = os.str(); + break; + } case MYSQL_TYPE_TIMESTAMP: + { os << (uint32_t)val.as_int32(); + str = os.str(); break; + } case MYSQL_TYPE_LONGLONG: + { os << val.as_int64(); + str = os.str(); break; + } case MYSQL_TYPE_INT24: + { str = "not implemented"; - return; + str = os.str(); + break; + } case MYSQL_TYPE_DATE: - str = "not implemented"; - return; + { + const char *storage = val.storage(); + uint32_t date = (storage[0] & 0xff) + ((storage[1] & 0xff) << 8) + ((storage[2] & 0xff) << 16); + // actually date occupies 3 bytes, so below will work + uint32_t year = date >> 9; + date -= (year << 9); + uint32_t month = date >> 5; + date -= (month << 5); + uint32_t day = date; + os << std::setfill('0') << std::setw(4) << year + << std::setw(1) << '-' + << std::setw(2) << month + << std::setw(1) << '-' + << std::setw(2) << day; + str = os.str(); + break; + } case MYSQL_TYPE_DATETIME: - { - uint64_t timestamp= val.as_int64(); - unsigned long d= timestamp / 1000000; - unsigned long t= timestamp % 1000000; + { + uint64_t timestamp = val.as_int64(); + unsigned long d = timestamp / 1000000; + unsigned long t = timestamp % 1000000; os << std::setfill('0') << std::setw(4) << d / 10000 << std::setw(1) << '-' @@ -373,40 +480,80 @@ void Converter::to(std::string &str, const Value &val) const << std::setw(2) << (t % 10000) / 100 << std::setw(1) << ':' << std::setw(2) << t % 100; - } + str = os.str(); break; + } case MYSQL_TYPE_TIME: + { + const char *storage = val.storage(); + uint32_t time = (storage[0] & 0xff) + ((storage[1] & 0xff) << 8) + ((storage[2] & 0xff) << 16); + uint32_t sec = time % 100; + time -= sec; + uint32_t min = (time % 10000) / 100; + uint32_t hour = (time - min) / 10000; + os << std::setfill('0') << std::setw(2) << hour + << std::setw(1) << ':' + << std::setw(2) << min + << std::setw(1) << ':' + << std::setw(2) << sec; + str = os.str(); + break; + } case MYSQL_TYPE_YEAR: + { + const char *storage = val.storage(); + uint32_t year = (storage[0] & 0xff); + if (year > 0) { + year += 1900; + } + os << std::setfill('0') << std::setw(4) << year; + str = os.str(); + break; + } case MYSQL_TYPE_NEWDATE: + { str = "not implemented"; - return; + break; + } case MYSQL_TYPE_VARCHAR: { unsigned long size; - char *ptr= val.as_c_str(size); - - str.assign(ptr, size); + str.append(ptr, size); + break; } - return; case MYSQL_TYPE_VAR_STRING: { - str.assign(val.storage(), val.length()); + str.append(val.storage(), val.length()); + break; } - return; case MYSQL_TYPE_STRING: { unsigned long size; char *ptr = val.as_c_str(size); - str.assign(ptr, size); + str.append(ptr, size); + break; } - return; case MYSQL_TYPE_BIT: case MYSQL_TYPE_NEWDECIMAL: + { + char buffer[100], result[100]; + int len = 100; + decimal_t decimal; + decimal.buf = (decimal_digit_t *)buffer; + decimal.len = sizeof(buffer)/sizeof(decimal_digit_t); + bin2decimal((const u_char *)val.storage(), &decimal, val.metadata() & 0xff, val.metadata() >> 8); + decimal2string(&decimal, result, &len, 0, 0, 0); + str.append(result, len); + break; + } case MYSQL_TYPE_ENUM: case MYSQL_TYPE_SET: - str = "not implemented"; - return ; + { + os << (val.length() == 1 ? static_cast(val.as_int8()) : val.as_int16()); + str = os.str(); + break; + } case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: case MYSQL_TYPE_LONG_BLOB: @@ -414,30 +561,28 @@ void Converter::to(std::string &str, const Value &val) const { unsigned long size; unsigned char *ptr= val.as_blob(size); - str.assign((const char *)ptr, size); + str.append((const char *)ptr, size); + break; } - return; case MYSQL_TYPE_GEOMETRY: default: str = "not implemented"; - return; + break; } - str = os.str(); } void Converter::to(float &out, const Value &val) const { switch(val.type()) { - case MYSQL_TYPE_FLOAT: - out= val.as_float(); - break; - default: - out= 0; + case MYSQL_TYPE_FLOAT: + out= val.as_float(); + break; + default: + out= 0; } } - void Converter::to(long long &out, const Value &val) const { switch(val.type()) @@ -498,10 +643,8 @@ void Converter::to(long long &out, const Value &val) const out= 0; break; case MYSQL_TYPE_ENUM: - out= 0; - break; - case MYSQL_TYPE_SET: - out= 0; + case MYSQL_TYPE_SET: + out = (val.length() == 1 ? static_cast(val.as_int8()) : val.as_int16()); break; case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: @@ -511,10 +654,10 @@ void Converter::to(long long &out, const Value &val) const break; case MYSQL_TYPE_VAR_STRING: { - std::string str; - str.append(val.storage(), val.length()); - std::istringstream is(str); - is >> out; + std::string str; + str.append(val.storage(), val.length()); + std::istringstream is(str); + is >> out; } break; case MYSQL_TYPE_STRING: @@ -589,10 +732,8 @@ void Converter::to(long &out, const Value &val) const out= 0; break; case MYSQL_TYPE_ENUM: - out= 0; - break; case MYSQL_TYPE_SET: - out= 0; + out = (val.length() == 1 ? static_cast(val.as_int8()) : val.as_int16()); break; case MYSQL_TYPE_TINY_BLOB: case MYSQL_TYPE_MEDIUM_BLOB: