From a3a90c16df7b1d1511f84b0aa24bee3a95487d49 Mon Sep 17 00:00:00 2001 From: George Niculae Date: Wed, 17 Aug 2016 16:24:28 +0300 Subject: [PATCH 01/23] Update README trigger build process --- README | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README b/README index faafd1ad768..c5ad3ad5451 100644 --- a/README +++ b/README @@ -1,5 +1,5 @@ -To install from source -======================= +To install sipxcom from source +============================== Step 1. Build makefile. From 6a59b4857d8def007522e3f22dcd154ed0faeedd Mon Sep 17 00:00:00 2001 From: George Niculae Date: Mon, 22 Aug 2016 12:27:09 +0300 Subject: [PATCH 02/23] Update README --- README | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README b/README index c5ad3ad5451..0790969137d 100644 --- a/README +++ b/README @@ -1,5 +1,5 @@ -To install sipxcom from source -============================== +To install sipxcom from source +=============================== Step 1. Build makefile. From 79a09f26b5037174bf4bd7ee1b125a8f6311d19e Mon Sep 17 00:00:00 2001 From: George Niculae Date: Mon, 22 Aug 2016 13:13:03 +0300 Subject: [PATCH 03/23] Update README --- README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README b/README index 0790969137d..08341fde1ec 100644 --- a/README +++ b/README @@ -1,4 +1,4 @@ -To install sipxcom from source +To install sipxcom from source =============================== Step 1. Build makefile. From afa076c82d37f55258d225a415cb13f0d8727b0e Mon Sep 17 00:00:00 2001 From: George Niculae Date: Mon, 22 Aug 2016 13:20:02 +0300 Subject: [PATCH 04/23] Update README --- sipXcounterpath/README | 1 + 1 file changed, 1 insertion(+) diff --git a/sipXcounterpath/README b/sipXcounterpath/README index e69de29bb2d..cbcf4fbab0a 100644 --- a/sipXcounterpath/README +++ b/sipXcounterpath/README @@ -0,0 +1 @@ +counterpath support From 6d8560772a413dd87b05e315c046054de436303d Mon Sep 17 00:00:00 2001 From: George Niculae Date: Mon, 22 Aug 2016 13:47:04 +0300 Subject: [PATCH 05/23] Update README --- sipXcounterpath/README | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sipXcounterpath/README b/sipXcounterpath/README index cbcf4fbab0a..9d5c79d62b1 100644 --- a/sipXcounterpath/README +++ b/sipXcounterpath/README @@ -1 +1 @@ -counterpath support +#counterpath support From dd0c2826439e7cd466093d761f6b7562dce27d4d Mon Sep 17 00:00:00 2001 From: "martin.harcar" Date: Fri, 26 Aug 2016 10:45:44 +0200 Subject: [PATCH 06/23] UW-338 UC-4216 UW-371 UW-231 unitelite 16.08 --- sipXconfig/web/unitelite/index.html | 2 +- ...js => 140ade2372521985202c726b.scripts.js} | 20 +++++++++++-------- sipXconfig/web/unitelite/views/settings.html | 6 ++++++ 3 files changed, 19 insertions(+), 9 deletions(-) rename sipXconfig/web/unitelite/scripts/{136f5540ba8b86199d328694.scripts.js => 140ade2372521985202c726b.scripts.js} (99%) diff --git a/sipXconfig/web/unitelite/index.html b/sipXconfig/web/unitelite/index.html index 6c3ef44ca11..1b01e77f00f 100644 --- a/sipXconfig/web/unitelite/index.html +++ b/sipXconfig/web/unitelite/index.html @@ -1 +1 @@ -sipXcom User Portal
\ No newline at end of file +sipXcom User Portal
\ No newline at end of file diff --git a/sipXconfig/web/unitelite/scripts/136f5540ba8b86199d328694.scripts.js b/sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js similarity index 99% rename from sipXconfig/web/unitelite/scripts/136f5540ba8b86199d328694.scripts.js rename to sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js index 22382cd8df6..e88e23b5c5c 100644 --- a/sipXconfig/web/unitelite/scripts/136f5540ba8b86199d328694.scripts.js +++ b/sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js @@ -3777,13 +3777,14 @@ uw.service('restService', [ data: null, init: function () { + secondary.settings.errors.speedDials = null; restService.getSpeeddial().then(function (data) { - for( var i = 0; i < data.buttons.length; i++) - data.buttons[i].pattern = /.*/; - secondary.settings.speed.data = data; - }).catch(function () { - secondary.settings.errors.notAvailable = true; - }) + for( var i = 0; i < data.buttons.length; i++) + data.buttons[i].pattern = /(^\d+|^([*][0-9]+)|^([+][0-9]+)|(^(\w+)@([a-zA-Z0-9_.]+)))$/; + secondary.settings.speed.data = data; + }).catch(function () { + secondary.settings.errors.notAvailable = true; + }) }, removeEntry: function (index) { @@ -3795,7 +3796,7 @@ uw.service('restService', [ var obj = { number: '', label: '', - pattern: /(^\d+|(^(\w+)@([a-zA-Z0-9_.]+)))$/, + pattern: /(^\d+|^([*][0-9]+)|^([+][0-9]+)|(^(\w+)@([a-zA-Z0-9_.]+)))$/, blf: false } secondary.settings.speed.data.buttons.push(obj); @@ -3825,6 +3826,7 @@ uw.service('restService', [ formSpeed.$setPristine(); restService.getSpeeddial().then(function (data) { secondary.settings.loading = null; + secondary.settings.errors.speedDials = null; for( var i = 0; i < data.buttons.length; i++) data.buttons[i].pattern = /.*/; secondary.settings.speed.data = data; @@ -3836,6 +3838,7 @@ uw.service('restService', [ secondary.settings.warning = true; secondary.settings.loading = null; console.log(err); + secondary.settings.errors.speedDials = true; }) }, @@ -4138,7 +4141,8 @@ uw.service('restService', [ notAvailable: false, voicemail: false, confBridge: false, - myBuddy: false + myBuddy: false, + speedDials: false } }, diff --git a/sipXconfig/web/unitelite/views/settings.html b/sipXconfig/web/unitelite/views/settings.html index 4f0bb18dac5..870c2c5ef5b 100644 --- a/sipXconfig/web/unitelite/views/settings.html +++ b/sipXconfig/web/unitelite/views/settings.html @@ -637,6 +637,12 @@ +
+
+

Wrong Speed Dial typed

+ You can not subcribed to presence with invalid speed dial. +
+
From 5c29a865b9dbb2e2f5d7c31d5fd07ed38ed686d9 Mon Sep 17 00:00:00 2001 From: "martin.harcar" Date: Fri, 26 Aug 2016 11:45:34 +0200 Subject: [PATCH 07/23] UW-338 UC-4216 UW-371 UW-231 unitelite 16.08 fixed previous commit --- sipXconfig/web/unitelite/index.html | 2 +- ...5202c726b.scripts.js => 668494ad528c8b5f63df46c4.scripts.js} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename sipXconfig/web/unitelite/scripts/{140ade2372521985202c726b.scripts.js => 668494ad528c8b5f63df46c4.scripts.js} (99%) diff --git a/sipXconfig/web/unitelite/index.html b/sipXconfig/web/unitelite/index.html index 1b01e77f00f..1d65e39ece5 100644 --- a/sipXconfig/web/unitelite/index.html +++ b/sipXconfig/web/unitelite/index.html @@ -1 +1 @@ -sipXcom User Portal
\ No newline at end of file +sipXcom User Portal
\ No newline at end of file diff --git a/sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js b/sipXconfig/web/unitelite/scripts/668494ad528c8b5f63df46c4.scripts.js similarity index 99% rename from sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js rename to sipXconfig/web/unitelite/scripts/668494ad528c8b5f63df46c4.scripts.js index e88e23b5c5c..fa4c683a7b7 100644 --- a/sipXconfig/web/unitelite/scripts/140ade2372521985202c726b.scripts.js +++ b/sipXconfig/web/unitelite/scripts/668494ad528c8b5f63df46c4.scripts.js @@ -3828,7 +3828,7 @@ uw.service('restService', [ secondary.settings.loading = null; secondary.settings.errors.speedDials = null; for( var i = 0; i < data.buttons.length; i++) - data.buttons[i].pattern = /.*/; + data.buttons[i].pattern = /(^\d+|^([*][0-9]+)|^([+][0-9]+)|(^(\w+)@([a-zA-Z0-9_.]+)))$/; secondary.settings.speed.data = data; }, function (err) { secondary.settings.loading = null; From c7d9b9c7c4990f24a1e3ddbb22c1484aa58f7e87 Mon Sep 17 00:00:00 2001 From: "martin.harcar" Date: Wed, 31 Aug 2016 07:55:43 +0200 Subject: [PATCH 08/23] UW-338 unitelite, changed warning message --- sipXconfig/web/unitelite/views/settings.html | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sipXconfig/web/unitelite/views/settings.html b/sipXconfig/web/unitelite/views/settings.html index 870c2c5ef5b..88ca8f5bdc7 100644 --- a/sipXconfig/web/unitelite/views/settings.html +++ b/sipXconfig/web/unitelite/views/settings.html @@ -639,8 +639,8 @@
-

Wrong Speed Dial typed

- You can not subcribed to presence with invalid speed dial. +

Invalid Speed Dial value entered.

+ You can not subscribe to presence with this speed dial value.
From 1d44023a561a2fce202c9f56e51ed5e7452b70b0 Mon Sep 17 00:00:00 2001 From: dizzy Date: Thu, 29 Sep 2016 10:29:02 +0300 Subject: [PATCH 09/23] UC-4189: barge in functionality broken --- freeswitch | 2 +- mak/freeswitch.sipxecs.mk | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/freeswitch b/freeswitch index 8b4fc8c3d9a..4134569616a 160000 --- a/freeswitch +++ b/freeswitch @@ -1 +1 @@ -Subproject commit 8b4fc8c3d9ad470a7e68c5a0115f74f9b3fd2cde +Subproject commit 4134569616aad09ec4ffdd5eb645ba5f10117334 diff --git a/mak/freeswitch.sipxecs.mk b/mak/freeswitch.sipxecs.mk index 37814c5f43c..6cd9e41da35 100644 --- a/mak/freeswitch.sipxecs.mk +++ b/mak/freeswitch.sipxecs.mk @@ -1,4 +1,4 @@ -freeswitch_VER = 1.4.20 +freeswitch_VER = 1.4.20.1 freeswitch_TAG = 1.4.4 freeswitch_PACKAGE_REVISION = $(shell cd $(SRC)/$(PROJ); ../config/revision-gen $(freeswitch_TAG)) freeswitch_SRPM = freeswitch-$(freeswitch_VER)-$(freeswitch_PACKAGE_REVISION).src.rpm From 2863e7252466f888bef09e9d4ac40c249b3f7820 Mon Sep 17 00:00:00 2001 From: Mircea Carasel Date: Wed, 3 Aug 2016 17:43:33 +0300 Subject: [PATCH 10/23] UC-4193: zen 6013 : StateQueueClient 100ms hard coded timeout -added tcp-timeout setting into etc/sipxpbx/sipxsqa-config default is 100 ms --- sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.properties | 3 +++ sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.xml | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.properties b/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.properties index 53275aa81c1..105c78288c2 100644 --- a/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.properties +++ b/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.properties @@ -2,6 +2,9 @@ sqa-config.label=Network Queue Configuration sqa-config.log-level.label=Logging Level sqa-config.sqa-control-port.label=Control Port sqa-config.zmq-subscription-port.label=Queue Port +sqa-config.tcp-timeout.label=TCP timeout +sqa-config.tcp-timeout.description=Value in milliseconds to drop the TCP connection + type.loglevel.0=EMERG type.loglevel.1=ALERT type.loglevel.2=CRIT diff --git a/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.xml b/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.xml index 2b5c43e6302..60df361ee00 100644 --- a/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.xml +++ b/sipXconfig/etc/sipxpbx/sipxsqa/sipxsqa.xml @@ -47,5 +47,11 @@ 5242 + + + + + 100 + From b0d62b400cfc0c4706601714388b3550609b5ba5 Mon Sep 17 00:00:00 2001 From: Mircea Carasel Date: Thu, 4 Aug 2016 17:12:24 +0300 Subject: [PATCH 11/23] UC-4193: zen 6013 : StateQueueClient 100ms hard coded timeout -replicate tcp timeout in sipxsqa-client.ini too --- .../sipxconfig/networkqueue/NetworkQueueManagerImpl.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/networkqueue/NetworkQueueManagerImpl.java b/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/networkqueue/NetworkQueueManagerImpl.java index b0175b3ace1..d0037d5fd71 100644 --- a/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/networkqueue/NetworkQueueManagerImpl.java +++ b/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/networkqueue/NetworkQueueManagerImpl.java @@ -72,7 +72,7 @@ public void replicate(ConfigManager manager, ConfigRequest request) throws IOExc Address queue = manager.getAddressManager().getSingleAddress(CONTROL_ADDRESS, location); Writer client = new FileWriter(new File(dir, "sipxsqa-client.ini")); try { - writeClientConfig(client, queue); + writeClientConfig(client, queue, settings); } finally { IOUtils.closeQuietly(client); } @@ -96,13 +96,14 @@ void writeServerConfig(Writer w, NetworkQueueSettings settings) throws IOExcepti config.writeSettings(settings.getSettings().getSetting("sqa-config")); } - void writeClientConfig(Writer w, Address queue) throws IOException { + void writeClientConfig(Writer w, Address queue, NetworkQueueSettings settings) throws IOException { KeyValueConfiguration config = KeyValueConfiguration.equalsSeparated(w); config.write("enabled", queue != null); if (queue != null) { config.write("sqa-control-port", queue.getCanonicalPort()); config.write("sqa-control-address", queue.getAddress()); } + config.writeSettings(settings.getSettings().getSetting("sqa-config/tcp-timeout")); } @Override From ad8363bf78c43be94efb4cb06b9a27d0212f2f34 Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Thu, 4 Aug 2016 15:15:10 +0300 Subject: [PATCH 12/23] UC-4193 StateQueueClient 100ms hard coded timeout - refactoring before actual fix: - BlockingTcpClient moved to separate source/include files - friendship of StateQueueClient and BlockingTcpClient removed to avoid interface confusion - some of the BlockingTcpClient methods made private --- sipXsqa/include/sqa/BlockingTcpClient.h | 160 +++++ sipXsqa/include/sqa/StateQueueClient.h | 742 ++++-------------------- sipXsqa/src/BlockingTcpClient.cpp | 583 +++++++++++++++++++ sipXsqa/src/Makefile.am | 1 + 4 files changed, 859 insertions(+), 627 deletions(-) create mode 100644 sipXsqa/include/sqa/BlockingTcpClient.h create mode 100644 sipXsqa/src/BlockingTcpClient.cpp diff --git a/sipXsqa/include/sqa/BlockingTcpClient.h b/sipXsqa/include/sqa/BlockingTcpClient.h new file mode 100644 index 00000000000..1a07106c53a --- /dev/null +++ b/sipXsqa/include/sqa/BlockingTcpClient.h @@ -0,0 +1,160 @@ +/* + * Copyright (c) eZuce, Inc. All rights reserved. + * Contributed to SIPfoundry under a Contributor Agreement + * + * This software is free software; you can redistribute it and/or modify it under + * the terms of the Affero General Public License (AGPL) as published by the + * Free Software Foundation; either version 3 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more + * details. + */ + +#ifndef BlockingTcpClient_H +#define BlockingTcpClient_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "StateQueueMessage.h" +#include "BlockingQueue.h" +#include "BlockingTcpClient.h" + +#define SQA_LINGER_TIME_MILLIS 5000 +#define SQA_TERMINATE_STRING "__TERMINATE__" +#define SQA_CONN_MAX_READ_BUFF_SIZE 65536 +#define SQA_CONN_CONNECTION_TIMEOUT_MSEC 5000 +#define SQA_CONN_READ_TIMEOUT 1000 +#define SQA_CONN_WRITE_TIMEOUT 1000 +#define SQA_KEY_MIN 22172 +#define SQA_KEY_ALPHA 22180 +#define SQA_KEY_DEFAULT SQA_KEY_MIN +#define SQA_KEY_MAX 22200 +#define SQA_KEEP_ALIVE_TICKS 30 + +// Defines the interval, in seconds, to wait between keep alive loop calls +#define SQA_KEEP_ALIVE_LOOP_INTERVAL_SECS 1 + + class BlockingTcpClient + { +public: + + typedef boost::shared_ptr Ptr; + + BlockingTcpClient( + boost::asio::io_service& ioService, + int readTimeout = SQA_CONN_READ_TIMEOUT, + int writeTimeout = SQA_CONN_WRITE_TIMEOUT, + short key = SQA_KEY_DEFAULT); + + ~BlockingTcpClient(); + + bool connect(); + bool connect(const std::string& serviceAddress, const std::string& servicePort); + bool send(const StateQueueMessage& request); + bool receive(StateQueueMessage& response); + bool sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response); + bool isConnected() const; + std::string getLocalAddress(); + const std::string &getServicePort() const; + const std::string &getServiceAddress() const; + +private: + void setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds); + void setWriteTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds); + void startReadTimer(); + void startWriteTimer(); + void startConnectTimer(); + void cancelReadTimer(); + void cancelWriteTimer(); + void cancelConnectTimer(); + void onReadTimeout(const boost::system::error_code& e); + void onWriteTimeout(const boost::system::error_code& e); + void onConnectTimeout(const boost::system::error_code& e); + void close(); + bool timedWaitUntilDataAvailable(boost::function onTimeoutCb, int timeoutMs, short int requestedEvents); + bool timedWaitUntilReadDataAvailable(); + bool timedWaitUntilWriteDataAvailable(); + + unsigned long getNextReadSize(); + + +private: + class ConnectTimer + { + public: + ConnectTimer(BlockingTcpClient* pOwner) : + _pOwner(pOwner) + { + _pOwner->startConnectTimer(); + } + + ~ConnectTimer() + { + _pOwner->cancelConnectTimer(); + } + BlockingTcpClient* _pOwner; + }; + + class ReadTimer + { + public: + ReadTimer(BlockingTcpClient* pOwner) : + _pOwner(pOwner) + { + _pOwner->startReadTimer(); + } + + ~ReadTimer() + { + _pOwner->cancelReadTimer(); + } + BlockingTcpClient* _pOwner; + }; + + class WriteTimer + { + public: + WriteTimer(BlockingTcpClient* pOwner) : + _pOwner(pOwner) + { + _pOwner->startWriteTimer(); + } + + ~WriteTimer() + { + _pOwner->cancelWriteTimer(); + } + BlockingTcpClient* _pOwner; + }; + + const std::string& className(); + + boost::asio::io_service& _ioService; + boost::asio::ip::tcp::resolver _resolver; + boost::asio::ip::tcp::socket *_pSocket; + std::string _serviceAddress; + std::string _servicePort; + bool _isConnected; + int _readTimeout; + int _writeTimeout; + short _key; + boost::asio::deadline_timer _readTimer; + boost::asio::deadline_timer _writeTimer; + boost::asio::deadline_timer _connectTimer; + }; + +#endif /* BlockingTcpClient_H */ + diff --git a/sipXsqa/include/sqa/StateQueueClient.h b/sipXsqa/include/sqa/StateQueueClient.h index 7b4cc4ffc38..c4d76d5203a 100644 --- a/sipXsqa/include/sqa/StateQueueClient.h +++ b/sipXsqa/include/sqa/StateQueueClient.h @@ -14,7 +14,7 @@ */ #ifndef StateQueueClient_H -#define StateQueueClient_H +#define StateQueueClient_H #include #include @@ -30,17 +30,10 @@ #include "StateQueueMessage.h" #include "BlockingQueue.h" +#include "BlockingTcpClient.h" #define SQA_LINGER_TIME_MILLIS 5000 #define SQA_TERMINATE_STRING "__TERMINATE__" -#define SQA_CONN_MAX_READ_BUFF_SIZE 65536 -#define SQA_CONN_CONNECTION_TIMEOUT_MSEC 5000 -#define SQA_CONN_READ_TIMEOUT 1000 -#define SQA_CONN_WRITE_TIMEOUT 1000 -#define SQA_KEY_MIN 22172 -#define SQA_KEY_ALPHA 22180 -#define SQA_KEY_DEFAULT SQA_KEY_MIN -#define SQA_KEY_MAX 22200 #define SQA_KEEP_ALIVE_TICKS 30 // Defines the interval, in seconds, to wait between keep alive loop calls @@ -57,598 +50,6 @@ class StateQueueClient : public boost::enable_shared_from_this Watcher }; - class BlockingTcpClient - { - public: - typedef boost::shared_ptr Ptr; - - class ConnectTimer - { - public: - ConnectTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) - { - _pOwner->startConnectTimer(); - } - - ~ConnectTimer() - { - _pOwner->cancelConnectTimer(); - } - BlockingTcpClient* _pOwner; - }; - - class ReadTimer - { - public: - ReadTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) - { - _pOwner->startReadTimer(); - } - - ~ReadTimer() - { - _pOwner->cancelReadTimer(); - } - BlockingTcpClient* _pOwner; - }; - - class WriteTimer - { - public: - WriteTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) - { - _pOwner->startWriteTimer(); - } - - ~WriteTimer() - { - _pOwner->cancelWriteTimer(); - } - BlockingTcpClient* _pOwner; - }; - - const std::string& className() - { - static const std::string className("StateQueueClient::BlockingTcpClient"); - - return className; - } - - BlockingTcpClient( - boost::asio::io_service& ioService, - int readTimeout = SQA_CONN_READ_TIMEOUT, - int writeTimeout = SQA_CONN_WRITE_TIMEOUT, - short key = SQA_KEY_DEFAULT) : - _ioService(ioService), - _resolver(_ioService), - _pSocket(0), - _isConnected(false), - _readTimeout(readTimeout), - _writeTimeout(writeTimeout), - _key(key), - _readTimer(_ioService), - _writeTimer(_ioService), - _connectTimer(_ioService) - { - } - - ~BlockingTcpClient() - { - if (_pSocket) - { - delete _pSocket; - _pSocket = 0; - } - } - - void setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) - { - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = milliseconds * 1000; - setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); - } - - void setWriteTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) - { - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = milliseconds * 1000; - setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); - } - - void startReadTimer() - { - boost::system::error_code ec; - _readTimer.expires_from_now(boost::posix_time::milliseconds(_readTimeout), ec); - _readTimer.async_wait(boost::bind(&BlockingTcpClient::onReadTimeout, this, boost::asio::placeholders::error)); - } - - void startWriteTimer() - { - boost::system::error_code ec; - _writeTimer.expires_from_now(boost::posix_time::milliseconds(_writeTimeout), ec); - _writeTimer.async_wait(boost::bind(&BlockingTcpClient::onWriteTimeout, this, boost::asio::placeholders::error)); - } - - void startConnectTimer() - { - boost::system::error_code ec; - _connectTimer.expires_from_now(boost::posix_time::milliseconds(SQA_CONN_CONNECTION_TIMEOUT_MSEC), ec); - _connectTimer.async_wait(boost::bind(&BlockingTcpClient::onConnectTimeout, this, boost::asio::placeholders::error)); - } - - void cancelReadTimer() - { - boost::system::error_code ec; - _readTimer.cancel(ec); - } - - void cancelWriteTimer() - { - boost::system::error_code ec; - _writeTimer.cancel(ec); - } - - void cancelConnectTimer() - { - boost::system::error_code ec; - _connectTimer.cancel(ec); - } - - void onReadTimeout(const boost::system::error_code& e) - { - if (e) - return; - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _readTimeout << " milliseconds."); - } - - - void onWriteTimeout(const boost::system::error_code& e) - { - if (e) - return; - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _writeTimeout << " milliseconds."); - } - - void onConnectTimeout(const boost::system::error_code& e) - { - if (e) - return; - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << SQA_CONN_CONNECTION_TIMEOUT_MSEC << " milliseconds."); - } - - void close() - { - if (_pSocket) - { - boost::system::error_code ignored_ec; - _pSocket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); - _pSocket->close(ignored_ec); - _isConnected = false; - OS_LOG_INFO(FAC_NET, CLASS_INFO() "- socket deleted."); - } - } - - bool timedWaitUntilDataAvailable(boost::function onTimeoutCb, - int timeoutMs, - short int requestedEvents) - { - int error = 0; - bool ret = false; - int nativeSocket = _pSocket->native(); - - struct pollfd fds[1] = {{nativeSocket, requestedEvents, 0}}; - - - int pollResult = poll(fds, sizeof(fds) / sizeof(fds[0]), timeoutMs); - if (1 == pollResult) - { - if (fds[0].revents & POLLERR) - { - error = errno; - } - else if (fds[0].revents & requestedEvents) - { - ret = true; - } - else - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "unexpected return from poll(): pollResult = " << pollResult - << ", fds[0].revents =" << fds[0].revents); - } - } - else if(0 == pollResult) - { // timeout - const boost::system::error_code e; - - onTimeoutCb(e); - error = ETIMEDOUT; - } - else - { - error = errno; - } - - if (0 != error) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "(" << nativeSocket << ", " << timeoutMs << " ms) error: " << - error << "=" << strerror(error)); - } - - return ret; - } - - bool timedWaitUntilReadDataAvailable() - { - // check for normal or out-of-band - return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onReadTimeout, this, _1), - _readTimeout, - POLLIN | POLLPRI); - } - - bool timedWaitUntilWriteDataAvailable() - { - return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onWriteTimeout, this, _1), - _writeTimeout, - POLLOUT); - } - - bool connect(const std::string& serviceAddress, const std::string& servicePort) - { - // - // Close the previous socket; - // - close(); - - if (_pSocket) - { - delete _pSocket; - _pSocket = 0; - } - - _pSocket = new boost::asio::ip::tcp::socket(_ioService); - - OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort); - - _serviceAddress = serviceAddress; - _servicePort = servicePort; - - try - { - boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), serviceAddress.c_str(), servicePort.c_str()); - boost::asio::ip::tcp::resolver::iterator hosts = _resolver.resolve(query); - - ConnectTimer timer(this); - - // this flag may be reset by ConnectTimer's timer during connect() call - _isConnected = true; - - ////////////////////////////////////////////////////////////////////////// - // Only works in 1.47 version of asio. 1.46 doesnt have this utility func - // boost::asio::connect(*_pSocket, hosts); - _pSocket->connect(hosts->endpoint()); // so we use the connect member - ////////////////////////////////////////////////////////////////////////// - OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort << " SUCESSFUL."); - } - catch(std::exception &e) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "failed with error " << e.what()); - _isConnected = false; - } - - return _isConnected; - } - - - bool connect() - { - // - // Initialize State Queue Agent Publisher if an address is provided - // - //if (_serviceAddress.empty() || _servicePort.empty()) - std::string sqaControlAddress; - std::string sqaControlPort; - std::ostringstream sqaconfig; - sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; - OsServiceOptions configOptions(sqaconfig.str()); - std::string controlAddress; - std::string controlPort; - { - if (configOptions.parseOptions()) - { - bool enabled = false; - if (configOptions.getOption("enabled", enabled, enabled) && enabled) - { - configOptions.getOption("sqa-control-address", _serviceAddress); - configOptions.getOption("sqa-control-port", _servicePort); - } - else - { - OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); - return false; - } - } - } - - if(_serviceAddress.empty() || _servicePort.empty()) - { - OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " remote address is not set"); - return false; - } - - return connect(_serviceAddress, _servicePort); - } - - bool send(const StateQueueMessage& request) - { - assert(_pSocket); - std::string data = request.data(); - - if (data.size() > SQA_CONN_MAX_READ_BUFF_SIZE - 1) /// Account for the terminating char "_" - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "data size: " << data.size() << " maximum buffer length of " << SQA_CONN_MAX_READ_BUFF_SIZE - 1); - return false; - } - - short version = 1; - unsigned long len = (unsigned long)data.size() + 1; /// Account for the terminating char "_" - std::stringstream strm; - strm.write((char*)(&version), sizeof(version)); - strm.write((char*)(&_key), sizeof(_key)); - strm.write((char*)(&len), sizeof(len)); - strm << data << "_"; - std::string packet = strm.str(); - boost::system::error_code ec; - bool ok = false; - - { - if (false == timedWaitUntilWriteDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilWriteDataAvailable failed: " - << "Unable to send request"); - - _isConnected = false; - return false; - } - - //ok = boost::asio::write(*_pSocket, boost::asio::buffer(packet.c_str(), packet.size()), boost::asio::transfer_all(), ec) > 0; - ok = _pSocket->write_some(boost::asio::buffer(packet.c_str(), packet.size()), ec) > 0; - } - - if (!ok || ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "write_some error: " << ec.message()); - _isConnected = false; - return false; - } - return true; - } - - bool receive(StateQueueMessage& response) - { - assert(_pSocket); - unsigned long len = getNextReadSize(); - if (!len) - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() "next read size is empty."); - return false; - } - - char responseBuff[len]; - boost::system::error_code ec; - { - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to receive response"); - - _isConnected = false; - return false; - } - - _pSocket->read_some(boost::asio::buffer((char*)responseBuff, len), ec); - } - - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "read_some error: " << ec.message()); - } - - _isConnected = false; - return false; - } - std::string responseData(responseBuff, len); - return response.parseData(responseData); - } - - bool sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response) - { - if (send(request)) - return receive(response); - return false; - } - - unsigned long getNextReadSize() - { - short version = 1; - bool hasVersion = false; - bool hasKey = false; - unsigned long remoteLen = 0; - while (!hasVersion || !hasKey) - { - short remoteVersion; - short remoteKey; - - //TODO: Refactor the code below to do one read for the three fields - // - // Read the version (must be 1) - // - while (true) - { - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read version"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteVersion, sizeof(remoteVersion)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read version " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - else - { - if (remoteVersion == version) - { - hasVersion = true; - break; - } - } - } - - while (true) - { - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read secret key"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteKey, sizeof(remoteKey)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read secret key " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - else - { - if (remoteKey >= SQA_KEY_MIN && remoteKey <= SQA_KEY_MAX) - { - hasKey = true; - break; - } - } - } - } - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read secret packet length"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteLen, sizeof(remoteLen)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read secret packet length " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - - return remoteLen; - } - - bool isConnected() const - { - return _isConnected; - } - - std::string getLocalAddress() - { - try - { - if (!_pSocket) - return ""; - return _pSocket->local_endpoint().address().to_string(); - } - catch(...) - { - return ""; - } - } - private: - boost::asio::io_service& _ioService; - boost::asio::ip::tcp::resolver _resolver; - boost::asio::ip::tcp::socket *_pSocket; - std::string _serviceAddress; - std::string _servicePort; - bool _isConnected; - int _readTimeout; - int _writeTimeout; - short _key; - boost::asio::deadline_timer _readTimer; - boost::asio::deadline_timer _writeTimer; - boost::asio::deadline_timer _connectTimer; - friend class StateQueueClient; - }; - protected: typedef BlockingQueue ClientPool; typedef BlockingQueue EventQueue; @@ -685,7 +86,6 @@ class StateQueueClient : public boost::enable_shared_from_this const std::string& className() { static const std::string className("StateQueueClient"); - return className; } @@ -725,7 +125,6 @@ class StateQueueClient : public boost::enable_shared_from_this _currentKeepAliveTicks(keepAliveTicks), _isAlive(true) { - if (_type != Publisher) { createZmqSocket(); @@ -739,21 +138,25 @@ class StateQueueClient : public boost::enable_shared_from_this pClient->connect(_serviceAddress, _servicePort); if (_localAddress.empty()) + { _localAddress = pClient->getLocalAddress(); + } BlockingTcpClient::Ptr client(pClient); _clientPointers.push_back(client); _clientPool.enqueue(client); } - _pKeepAliveThread.reset( - new boost::thread( - boost::bind(&StateQueueClient::keepAliveThreadRun, this))); + _pKeepAliveThread.reset(new boost::thread( boost::bind(&StateQueueClient::keepAliveThreadRun, this))); if (_type == Watcher) + { _zmqEventId = "sqw."; + } else + { _zmqEventId = "sqa."; + } _zmqEventId += zmqEventId; @@ -824,9 +227,7 @@ class StateQueueClient : public boost::enable_shared_from_this setServiceAddressAndPort(); - _pKeepAliveThread.reset( - new boost::thread( - boost::bind(&StateQueueClient::keepAliveThreadRun, this))); + _pKeepAliveThread.reset(new boost::thread(boost::bind(&StateQueueClient::keepAliveThreadRun, this))); if (_type == Watcher) _zmqEventId = "sqw."; @@ -861,8 +262,8 @@ class StateQueueClient : public boost::enable_shared_from_this } BlockingTcpClient::Ptr pClient = *_clientPointers.begin(); - _serviceAddress = pClient->_serviceAddress; - _servicePort = pClient->_servicePort; + _serviceAddress = pClient->getServiceAddress(); + _servicePort = pClient->getServicePort(); } void createZmqSocket() @@ -1032,20 +433,17 @@ class StateQueueClient : public boost::enable_shared_from_this void keepAliveThreadRun() // Runs the keep alive loop at 1 secs interval { - OS_LOG_INFO(FAC_SIP, CLASS_INFO() - << "starting"); + OS_LOG_INFO(FAC_SIP, CLASS_INFO() << "starting"); while (!_terminate) { // sleep the current thread boost::this_thread::sleep(boost::posix_time::seconds(SQA_KEEP_ALIVE_LOOP_INTERVAL_SECS)); - keepAliveLoop(); } - OS_LOG_INFO(FAC_SIP, CLASS_INFO() - << "exiting"); + OS_LOG_INFO(FAC_SIP, CLASS_INFO() << "exiting"); } private: @@ -1054,16 +452,19 @@ class StateQueueClient : public boost::enable_shared_from_this assert(_type != Publisher); OS_LOG_INFO(FAC_NET, CLASS_INFO() "eventId=" << eventId << " address=" << sqaAddress); + try { _zmqSocket->connect(sqaAddress.c_str()); _zmqSocket->setsockopt(ZMQ_SUBSCRIBE, eventId.c_str(), eventId.size()); - }catch(std::exception e) + } + catch(std::exception e) { OS_LOG_INFO(FAC_NET, CLASS_INFO() "eventId=" << eventId << " address=" << sqaAddress << " FAILED! Error: " << e.what()); return false; } + return true; } @@ -1105,8 +506,11 @@ class StateQueueClient : public boost::enable_shared_from_this StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } bool ok = response.get("message-data", publisherAddress); @@ -1152,7 +556,7 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("service-type", "watcher"); clientType = "watcher"; } - + StateQueueMessage response; return sendAndReceive(request, response); } @@ -1202,14 +606,14 @@ class StateQueueClient : public boost::enable_shared_from_this while (!_terminate) { bool lastReadFailed = false; - // Although ZeroMQ assures that a sub socket will reconnect to publisher automatically, + // Although ZeroMQ assures that a sub socket will reconnect to publisher automatically, // this seems to be not a 100% certainty. There are cases when publisher disappears, // it takes 8 minutes for the client to receive events again. // // There is also the case of socket reads where a cann to recv is assured to only return when // it receives the complete packet. We will not treat this at 100% certainty and treat // a read failure as a trigger for resubscribe. - + if (false == _isAlive || lastReadFailed) { OS_LOG_ERROR(FAC_NET, "StateQueueClient::eventLoop connection is not alive, closing socket, resubscribe"); @@ -1221,17 +625,20 @@ class StateQueueClient : public boost::enable_shared_from_this // // This will block until it succeeds or _terminated flag is set // So there is no danger of a runaway loop here - // + // subscribeForEvents(); } std::string id; std::string data; int count = 0; + if (readEvent(id, data, count)) { if (_terminate) + { break; + } OS_LOG_INFO(FAC_NET, CLASS_INFO() "received event: " << id); OS_LOG_DEBUG(FAC_NET, CLASS_INFO() "received data: " << std::endl << data); @@ -1240,7 +647,8 @@ class StateQueueClient : public boost::enable_shared_from_this { OS_LOG_DEBUG(FAC_NET, CLASS_INFO() "popping data: " << id); do_pop(firstHit, count, id, data); - }else if (_type == Watcher) + } + else if (_type == Watcher) { OS_LOG_DEBUG(FAC_NET, CLASS_INFO() "watching data: " << id); do_watch(firstHit, count, id, data); @@ -1249,11 +657,13 @@ class StateQueueClient : public boost::enable_shared_from_this else { lastReadFailed = true; + if (_terminate) { break; } } + firstHit = false; } @@ -1301,6 +711,7 @@ class StateQueueClient : public boost::enable_shared_from_this _backoffCount++; // this will ensure that we participate next time boost::this_thread::yield(); } + // // Check if we are in the exlude list // @@ -1313,6 +724,7 @@ class StateQueueClient : public boost::enable_shared_from_this OS_LOG_DEBUG(FAC_NET, CLASS_INFO() "do_pop is not allowed to pop " << id); return; } + // // Pop it // @@ -1326,6 +738,7 @@ class StateQueueClient : public boost::enable_shared_from_this << " Popping event " << id); StateQueueMessage popResponse; + if (!sendAndReceive(pop, popResponse)) { OS_LOG_ERROR(FAC_NET, CLASS_INFO() "do_pop unable to send pop command for event " << id); @@ -1338,6 +751,7 @@ class StateQueueClient : public boost::enable_shared_from_this // std::string messageResponse; popResponse.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1359,7 +773,7 @@ class StateQueueClient : public boost::enable_shared_from_this _backoffCount = 0; } } - + void do_watch(bool firstHit, int count, const std::string& id, const std::string& data) { OS_LOG_DEBUG(FAC_NET, CLASS_INFO() "Received watcher data " << id); @@ -1421,6 +835,7 @@ class StateQueueClient : public boost::enable_shared_from_this { free (data); } + // Convert string to 0MQ string and send to socket static bool zmq_send (zmq::socket_t & socket, const std::string & data) { @@ -1445,8 +860,12 @@ class StateQueueClient : public boost::enable_shared_from_this { zmq::message_t message; socket->recv(&message); + if (!message.size()) + { return false; + } + value = std::string(static_cast(message.data()), message.size()); return true; } @@ -1493,6 +912,7 @@ class StateQueueClient : public boost::enable_shared_from_this } BlockingTcpClient::Ptr conn; + if (!_clientPool.dequeue(conn)) { OS_LOG_ERROR(FAC_NET, CLASS_INFO() "Unable to retrieve a TCP connection for pool."); @@ -1530,10 +950,16 @@ class StateQueueClient : public boost::enable_shared_from_this // Enqueue it // StateQueueMessage enqueueRequest; + if (!publish) + { enqueueRequest.setType(StateQueueMessage::Enqueue); + } else + { enqueueRequest.setType(StateQueueMessage::EnqueueAndPublish); + } + enqueueRequest.set("message-id", id.c_str()); enqueueRequest.set("message-app-id", _applicationId.c_str()); enqueueRequest.set("message-expires", _expires); @@ -1551,6 +977,7 @@ class StateQueueClient : public boost::enable_shared_from_this // std::string messageResponse; enqueueResponse.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1586,14 +1013,18 @@ class StateQueueClient : public boost::enable_shared_from_this else { StateQueueMessage enqueueResponse; + if (!sendAndReceive(enqueueRequest, enqueueResponse)) + { return false; + } // // Check if Queue is successful // std::string messageResponse; enqueueResponse.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1622,8 +1053,11 @@ class StateQueueClient : public boost::enable_shared_from_this OS_LOG_INFO(FAC_NET, CLASS_INFO() "publishing data ID=" << id); StateQueueMessage enqueueResponse; + if (!sendAndReceive(enqueueRequest, enqueueResponse)) + { return false; + } // // Check if Queue is successful @@ -1645,12 +1079,16 @@ class StateQueueClient : public boost::enable_shared_from_this public: - + bool pop(std::string& id, std::string& data) { StateQueueMessage message; + if (!pop(message)) + { return false; + } + return message.get("message-id", id) && message.get("message-data", data); } @@ -1658,21 +1096,28 @@ class StateQueueClient : public boost::enable_shared_from_this { OS_LOG_INFO(FAC_NET, CLASS_INFO() "(" << id << ") INVOKED" ); StateQueueMessage message; + if (!pop(message)) + { return false; + } + return message.get("message-id", id) && message.get("message-data", data); } bool enqueue(const std::string& data, int expires = 30, bool publish = false) { if (_type != Publisher) + { return false; + } std::ostringstream ss; ss << _zmqEventId << "." << std::hex << std::uppercase << std::setw(4) << std::setfill('0') << (int) ((float) (0x10000) * random () / (RAND_MAX + 1.0)) << "-" << std::setw(4) << std::setfill('0') << (int) ((float) (0x10000) * random () / (RAND_MAX + 1.0)); + return enqueue(ss.str(), data, expires, publish); } @@ -1680,13 +1125,16 @@ class StateQueueClient : public boost::enable_shared_from_this bool publish(const std::string& eventId, const std::string& data, bool noresponse) { if (_type != Publisher) + { return false; + } std::ostringstream ss; ss << "sqw." << eventId << "." << std::hex << std::uppercase << std::setw(4) << std::setfill('0') << (int) ((float) (0x10000) * random () / (RAND_MAX + 1.0)) << "-" << std::setw(4) << std::setfill('0') << (int) ((float) (0x10000) * random () / (RAND_MAX + 1.0)); + return internal_publish(ss.str(), data, noresponse); } @@ -1699,11 +1147,13 @@ class StateQueueClient : public boost::enable_shared_from_this bool publishAndPersist(int workspace, const std::string& eventId, const std::string& data, int expires) { if (_type != Publisher) + { return false; + } std::ostringstream ss; ss << "sqw." << eventId; - + return internal_publish_and_persist(workspace, ss.str(), data, expires); } @@ -1716,11 +1166,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-app-id", _applicationId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1730,6 +1184,7 @@ class StateQueueClient : public boost::enable_shared_from_this << " Error: " << messageResponseError); return false; } + OS_LOG_ERROR(FAC_NET, CLASS_INFO() << "Successfully erased " << id); return true; @@ -1754,11 +1209,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data-id", dataId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1790,11 +1249,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data", data); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1827,11 +1290,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data", data); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1863,11 +1330,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data-id", dataId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1900,11 +1371,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data-id", dataId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1936,11 +1411,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-map-id", mapId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -1955,8 +1434,11 @@ class StateQueueClient : public boost::enable_shared_from_this response.get("message-data", data); StateQueueMessage message; + if (!message.parseData(data)) + { return false; + } return message.getMap(smap); } @@ -1980,8 +1462,11 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("message-data-id", dataId.c_str()); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); @@ -2016,11 +1501,15 @@ class StateQueueClient : public boost::enable_shared_from_this request.set("workspace", workspace); StateQueueMessage response; + if (!sendAndReceive(request, response)) + { return false; + } std::string messageResponse; response.get("message-response", messageResponse); + if (messageResponse != "ok") { std::string messageResponseError; @@ -2035,6 +1524,5 @@ class StateQueueClient : public boost::enable_shared_from_this } }; - -#endif /* StateQueueClient_H */ +#endif /* StateQueueClient_H */ diff --git a/sipXsqa/src/BlockingTcpClient.cpp b/sipXsqa/src/BlockingTcpClient.cpp new file mode 100644 index 00000000000..420631d2114 --- /dev/null +++ b/sipXsqa/src/BlockingTcpClient.cpp @@ -0,0 +1,583 @@ +/* + * Copyright (c) eZuce, Inc. All rights reserved. + * Contributed to SIPfoundry under a Contributor Agreement + * + * This software is free software; you can redistribute it and/or modify it under + * the terms of the Affero General Public License (AGPL) as published by the + * Free Software Foundation; either version 3 of the License, or (at your option) + * any later version. + * + * This software is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS + * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more + * details. + */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "sqa/StateQueueMessage.h" +#include "sqa/BlockingTcpClient.h" + +#define SQA_CONN_MAX_READ_BUFF_SIZE 65536 +#define SQA_CONN_CONNECTION_TIMEOUT_MSEC 5000 +#define SQA_CONN_READ_TIMEOUT 1000 +#define SQA_CONN_WRITE_TIMEOUT 1000 +#define SQA_KEY_MIN 22172 +#define SQA_KEY_ALPHA 22180 +#define SQA_KEY_DEFAULT SQA_KEY_MIN +#define SQA_KEY_MAX 22200 + +const std::string& BlockingTcpClient::className() +{ + static const std::string className("StateQueueClient::BlockingTcpClient"); + + return className; +} + +const std::string & BlockingTcpClient::getServicePort() const +{ + return _servicePort; +} + +const std::string & BlockingTcpClient::getServiceAddress() const +{ + return _serviceAddress; +} + +BlockingTcpClient::BlockingTcpClient( + boost::asio::io_service& ioService, + int readTimeout, + int writeTimeout, + short key) : + _ioService(ioService), + _resolver(_ioService), + _pSocket(0), + _isConnected(false), + _readTimeout(readTimeout), + _writeTimeout(writeTimeout), + _key(key), + _readTimer(_ioService), + _writeTimer(_ioService), + _connectTimer(_ioService) +{ +} + +BlockingTcpClient::~BlockingTcpClient() +{ + if (_pSocket) + { + delete _pSocket; + _pSocket = 0; + } +} + +void BlockingTcpClient::setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) +{ + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = milliseconds * 1000; + setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); +} + +void BlockingTcpClient::setWriteTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) +{ + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = milliseconds * 1000; + setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); +} + +void BlockingTcpClient::startReadTimer() +{ + boost::system::error_code ec; + _readTimer.expires_from_now(boost::posix_time::milliseconds(_readTimeout), ec); + _readTimer.async_wait(boost::bind(&BlockingTcpClient::onReadTimeout, this, boost::asio::placeholders::error)); +} + +void BlockingTcpClient::startWriteTimer() +{ + boost::system::error_code ec; + _writeTimer.expires_from_now(boost::posix_time::milliseconds(_writeTimeout), ec); + _writeTimer.async_wait(boost::bind(&BlockingTcpClient::onWriteTimeout, this, boost::asio::placeholders::error)); +} + +void BlockingTcpClient::startConnectTimer() +{ + boost::system::error_code ec; + _connectTimer.expires_from_now(boost::posix_time::milliseconds(SQA_CONN_CONNECTION_TIMEOUT_MSEC), ec); + _connectTimer.async_wait(boost::bind(&BlockingTcpClient::onConnectTimeout, this, boost::asio::placeholders::error)); +} + +void BlockingTcpClient::cancelReadTimer() +{ + boost::system::error_code ec; + _readTimer.cancel(ec); +} + +void BlockingTcpClient::cancelWriteTimer() +{ + boost::system::error_code ec; + _writeTimer.cancel(ec); +} + +void BlockingTcpClient::cancelConnectTimer() +{ + boost::system::error_code ec; + _connectTimer.cancel(ec); +} + +void BlockingTcpClient::onReadTimeout(const boost::system::error_code& e) +{ + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _readTimeout << " milliseconds."); +} + + +void BlockingTcpClient::onWriteTimeout(const boost::system::error_code& e) +{ + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _writeTimeout << " milliseconds."); +} + +void BlockingTcpClient::onConnectTimeout(const boost::system::error_code& e) +{ + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << SQA_CONN_CONNECTION_TIMEOUT_MSEC << " milliseconds."); +} + +void BlockingTcpClient::close() +{ + if (_pSocket) + { + boost::system::error_code ignored_ec; + _pSocket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); + _pSocket->close(ignored_ec); + _isConnected = false; + OS_LOG_INFO(FAC_NET, CLASS_INFO() "- socket deleted."); + } +} + +bool BlockingTcpClient::timedWaitUntilDataAvailable(boost::function onTimeoutCb, + int timeoutMs, + short int requestedEvents) +{ + int error = 0; + bool ret = false; + int nativeSocket = _pSocket->native(); + + struct pollfd fds[1] = {{nativeSocket, requestedEvents, 0}}; + + + int pollResult = poll(fds, sizeof(fds) / sizeof(fds[0]), timeoutMs); + if (1 == pollResult) + { + if (fds[0].revents & POLLERR) + { + error = errno; + } + else if (fds[0].revents & requestedEvents) + { + ret = true; + } + else + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "unexpected return from poll(): pollResult = " << pollResult + << ", fds[0].revents =" << fds[0].revents); + } + } + else if(0 == pollResult) + { // timeout + const boost::system::error_code e; + + onTimeoutCb(e); + error = ETIMEDOUT; + } + else + { + error = errno; + } + + if (0 != error) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "(" << nativeSocket << ", " << timeoutMs << " ms) error: " << + error << "=" << strerror(error)); + } + + return ret; +} + +bool BlockingTcpClient::timedWaitUntilReadDataAvailable() +{ + // check for normal or out-of-band + return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onReadTimeout, this, _1), + _readTimeout, + POLLIN | POLLPRI); +} + +bool BlockingTcpClient::timedWaitUntilWriteDataAvailable() +{ + return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onWriteTimeout, this, _1), + _writeTimeout, + POLLOUT); +} + +bool BlockingTcpClient::connect(const std::string& serviceAddress, const std::string& servicePort) +{ + // Close the previous socket; + close(); + + if (_pSocket) + { + delete _pSocket; + _pSocket = 0; + } + + _pSocket = new boost::asio::ip::tcp::socket(_ioService); + + OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort); + + _serviceAddress = serviceAddress; + _servicePort = servicePort; + + try + { + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), serviceAddress.c_str(), servicePort.c_str()); + boost::asio::ip::tcp::resolver::iterator hosts = _resolver.resolve(query); + + ConnectTimer timer(this); + + // this flag may be reset by ConnectTimer's timer during connect() call + _isConnected = true; + + ////////////////////////////////////////////////////////////////////////// + // Only works in 1.47 version of asio. 1.46 doesnt have this utility func + // boost::asio::connect(*_pSocket, hosts); + _pSocket->connect(hosts->endpoint()); // so we use the connect member + ////////////////////////////////////////////////////////////////////////// + OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort << " SUCESSFUL."); + } + catch(std::exception &e) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "failed with error " << e.what()); + _isConnected = false; + } + + return _isConnected; +} + +bool BlockingTcpClient::connect() +{ + // Initialize State Queue Agent Publisher if an address is provided + // + //if (_serviceAddress.empty() || _servicePort.empty()) + std::string sqaControlAddress; + std::string sqaControlPort; + std::ostringstream sqaconfig; + sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; + OsServiceOptions configOptions(sqaconfig.str()); + std::string controlAddress; + std::string controlPort; + + { + if (configOptions.parseOptions()) + { + bool enabled = false; + if (configOptions.getOption("enabled", enabled, enabled) && enabled) + { + configOptions.getOption("sqa-control-address", _serviceAddress); + configOptions.getOption("sqa-control-port", _servicePort); + } + else + { + OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); + return false; + } + } + } + + if(_serviceAddress.empty() || _servicePort.empty()) + { + OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " remote address is not set"); + return false; + } + + return connect(_serviceAddress, _servicePort); +} + +bool BlockingTcpClient::send(const StateQueueMessage& request) +{ + assert(_pSocket); + std::string data = request.data(); + + if (data.size() > SQA_CONN_MAX_READ_BUFF_SIZE - 1) /// Account for the terminating char "_" + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "data size: " << data.size() << " maximum buffer length of " << SQA_CONN_MAX_READ_BUFF_SIZE - 1); + return false; + } + + short version = 1; + unsigned long len = (unsigned long)data.size() + 1; /// Account for the terminating char "_" + std::stringstream strm; + strm.write((char*)(&version), sizeof(version)); + strm.write((char*)(&_key), sizeof(_key)); + strm.write((char*)(&len), sizeof(len)); + strm << data << "_"; + std::string packet = strm.str(); + boost::system::error_code ec; + bool ok = false; + + { + if (false == timedWaitUntilWriteDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilWriteDataAvailable failed: " + << "Unable to send request"); + + _isConnected = false; + return false; + } + + //ok = boost::asio::write(*_pSocket, boost::asio::buffer(packet.c_str(), packet.size()), boost::asio::transfer_all(), ec) > 0; + ok = _pSocket->write_some(boost::asio::buffer(packet.c_str(), packet.size()), ec) > 0; + } + + if (!ok || ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "write_some error: " << ec.message()); + _isConnected = false; + return false; + } + return true; +} + +bool BlockingTcpClient::receive(StateQueueMessage& response) +{ + assert(_pSocket); + unsigned long len = getNextReadSize(); + if (!len) + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() "next read size is empty."); + return false; + } + + char responseBuff[len]; + boost::system::error_code ec; + { + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to receive response"); + + _isConnected = false; + return false; + } + + _pSocket->read_some(boost::asio::buffer((char*)responseBuff, len), ec); + } + + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "read_some error: " << ec.message()); + } + + _isConnected = false; + return false; + } + std::string responseData(responseBuff, len); + return response.parseData(responseData); +} + +bool BlockingTcpClient::sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response) +{ + if (send(request)) + return receive(response); + return false; +} + +unsigned long BlockingTcpClient::getNextReadSize() +{ + short version = 1; + bool hasVersion = false; + bool hasKey = false; + unsigned long remoteLen = 0; + + while (!hasVersion || !hasKey) + { + short remoteVersion; + short remoteKey; + + //TODO: Refactor the code below to do one read for the three fields + // + // Read the version (must be 1) + // + while (true) + { + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read version"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteVersion, sizeof(remoteVersion)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read version " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + else + { + if (remoteVersion == version) + { + hasVersion = true; + break; + } + } + } + + while (true) + { + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read secret key"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteKey, sizeof(remoteKey)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read secret key " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + else + { + if (remoteKey >= SQA_KEY_MIN && remoteKey <= SQA_KEY_MAX) + { + hasKey = true; + break; + } + } + } + } + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read secret packet length"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteLen, sizeof(remoteLen)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read secret packet length " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + + return remoteLen; +} + +bool BlockingTcpClient::isConnected() const +{ + return _isConnected; +} + +std::string BlockingTcpClient::getLocalAddress() +{ + try + { + if (!_pSocket) + { + return ""; + } + + return _pSocket->local_endpoint().address().to_string(); + } + catch(...) + { + return ""; + } +} + diff --git a/sipXsqa/src/Makefile.am b/sipXsqa/src/Makefile.am index 44907c52fdf..cc66adb28ae 100755 --- a/sipXsqa/src/Makefile.am +++ b/sipXsqa/src/Makefile.am @@ -58,6 +58,7 @@ libsipXsqa_la_LDFLAGS = \ -version-info ${version_Current}:${version_Revision}:${version_Age} libsipXsqa_la_SOURCES = \ + BlockingTcpClient.cpp \ StateQueueAgent.cpp \ StateQueueConnection.cpp \ StateQueueDriverTest.cpp \ From c0818567d0f2b003c0157583c9a2687ff326d470 Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Fri, 5 Aug 2016 13:38:31 +0300 Subject: [PATCH 13/23] UC-4193 StateQueueClient 100ms hard coded timeout - magic numbers removed - clean unused code from BlockingClient - read of "tcp-timeout" option added --- sipXsqa/include/sqa/BlockingTcpClient.h | 51 +----------- sipXsqa/include/sqa/StateQueueClient.h | 9 +- .../include/sqa/StateQueueDialogDataClient.h | 4 +- sipXsqa/include/sqa/StateQueueDriverTest.h | 3 +- sipXsqa/include/sqa/StateQueueNotification.h | 10 ++- sipXsqa/include/sqa/sqaclient.h | 2 + sipXsqa/src/BlockingTcpClient.cpp | 82 +++++-------------- sipXsqa/src/StateQueueDriverTest.cpp | 18 ++-- sipXsqa/src/main.cpp | 2 +- 9 files changed, 50 insertions(+), 131 deletions(-) diff --git a/sipXsqa/include/sqa/BlockingTcpClient.h b/sipXsqa/include/sqa/BlockingTcpClient.h index 1a07106c53a..e83e08331cd 100644 --- a/sipXsqa/include/sqa/BlockingTcpClient.h +++ b/sipXsqa/include/sqa/BlockingTcpClient.h @@ -53,12 +53,7 @@ typedef boost::shared_ptr Ptr; - BlockingTcpClient( - boost::asio::io_service& ioService, - int readTimeout = SQA_CONN_READ_TIMEOUT, - int writeTimeout = SQA_CONN_WRITE_TIMEOUT, - short key = SQA_KEY_DEFAULT); - + explicit BlockingTcpClient(boost::asio::io_service& ioService, int readTimeout, int writeTimeout, short key); ~BlockingTcpClient(); bool connect(); @@ -72,13 +67,7 @@ const std::string &getServiceAddress() const; private: - void setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds); - void setWriteTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds); - void startReadTimer(); - void startWriteTimer(); void startConnectTimer(); - void cancelReadTimer(); - void cancelWriteTimer(); void cancelConnectTimer(); void onReadTimeout(const boost::system::error_code& e); void onWriteTimeout(const boost::system::error_code& e); @@ -90,13 +79,10 @@ unsigned long getNextReadSize(); - private: - class ConnectTimer + struct ConnectTimer { - public: - ConnectTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) + ConnectTimer(BlockingTcpClient* pOwner) : _pOwner(pOwner) { _pOwner->startConnectTimer(); } @@ -105,38 +91,7 @@ { _pOwner->cancelConnectTimer(); } - BlockingTcpClient* _pOwner; - }; - - class ReadTimer - { - public: - ReadTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) - { - _pOwner->startReadTimer(); - } - ~ReadTimer() - { - _pOwner->cancelReadTimer(); - } - BlockingTcpClient* _pOwner; - }; - - class WriteTimer - { - public: - WriteTimer(BlockingTcpClient* pOwner) : - _pOwner(pOwner) - { - _pOwner->startWriteTimer(); - } - - ~WriteTimer() - { - _pOwner->cancelWriteTimer(); - } BlockingTcpClient* _pOwner; }; diff --git a/sipXsqa/include/sqa/StateQueueClient.h b/sipXsqa/include/sqa/StateQueueClient.h index c4d76d5203a..73b660667d4 100644 --- a/sipXsqa/include/sqa/StateQueueClient.h +++ b/sipXsqa/include/sqa/StateQueueClient.h @@ -26,7 +26,6 @@ #include #include -#include #include "StateQueueMessage.h" #include "BlockingQueue.h" @@ -96,8 +95,8 @@ class StateQueueClient : public boost::enable_shared_from_this const std::string& servicePort, const std::string& zmqEventId, std::size_t poolSize, - int readTimeout = SQA_CONN_READ_TIMEOUT, - int writeTimeout = SQA_CONN_WRITE_TIMEOUT, + int readTimeout, + int writeTimeout, int keepAliveTicks = SQA_KEEP_ALIVE_TICKS ) : _type(type), @@ -178,8 +177,8 @@ class StateQueueClient : public boost::enable_shared_from_this const std::string& applicationId, const std::string& zmqEventId, std::size_t poolSize, - int readTimeout = SQA_CONN_READ_TIMEOUT, - int writeTimeout = SQA_CONN_WRITE_TIMEOUT, + int readTimeout, + int writeTimeout, int keepAliveTicks = SQA_KEEP_ALIVE_TICKS ) : _type(type), diff --git a/sipXsqa/include/sqa/StateQueueDialogDataClient.h b/sipXsqa/include/sqa/StateQueueDialogDataClient.h index fc17326b2cf..0186d70d4f3 100644 --- a/sipXsqa/include/sqa/StateQueueDialogDataClient.h +++ b/sipXsqa/include/sqa/StateQueueDialogDataClient.h @@ -48,7 +48,9 @@ inline StateQueueDialogDataClient::StateQueueDialogDataClient( serviceAddress, servicePort, "dialog-state", - poolSize), + poolSize, + SQA_CONN_READ_TIMEOUT, + SQA_CONN_WRITE_TIMEOUT), _workspace(3) { } diff --git a/sipXsqa/include/sqa/StateQueueDriverTest.h b/sipXsqa/include/sqa/StateQueueDriverTest.h index 2ad1a890b8e..c745a044ba4 100644 --- a/sipXsqa/include/sqa/StateQueueDriverTest.h +++ b/sipXsqa/include/sqa/StateQueueDriverTest.h @@ -31,6 +31,7 @@ #include #include "TimedMap.h" +#define TEST_TCP_TIMEOUT 1000 class StateQueueDriverTest : boost::noncopyable { @@ -56,7 +57,7 @@ class ThreadedPop : public StateQueueClient const std::string& servicePort, const std::string& zmqEventId, std::size_t poolSize = 1) : - StateQueueClient(StateQueueClient::Worker, applicationId, serviceAddress, servicePort, zmqEventId, poolSize), + StateQueueClient(StateQueueClient::Worker, applicationId, serviceAddress, servicePort, zmqEventId, poolSize, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT), _pThread(0), total(0) { diff --git a/sipXsqa/include/sqa/StateQueueNotification.h b/sipXsqa/include/sqa/StateQueueNotification.h index 208677531fb..50541ee743f 100644 --- a/sipXsqa/include/sqa/StateQueueNotification.h +++ b/sipXsqa/include/sqa/StateQueueNotification.h @@ -62,6 +62,7 @@ class StateQueueNotification SQAPublisher* _pPublisher; std::string _sqaControlAddress; std::string _sqaControlPort; + int _tcpTimeout; }; @@ -72,7 +73,8 @@ class StateQueueNotification inline StateQueueNotification::StateQueueNotification(OsServiceOptions& options) : _queueThread(0), _isRunning(false), - _pPublisher(0) + _pPublisher(0), + _tcpTimeout(SQA_DEFAULT_TCP_TIMEOUT) { #define within(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) std::ostringstream ss; @@ -82,6 +84,7 @@ inline StateQueueNotification::StateQueueNotification(OsServiceOptions& options) options.getOption("sqa-control-address", _sqaControlAddress); options.getOption("sqa-control-port", _sqaControlPort); + options.getOption("tcp-timeout", _tcpTimeout); } inline StateQueueNotification::StateQueueNotification(const std::string& sqaControlAddress, @@ -90,7 +93,8 @@ inline StateQueueNotification::StateQueueNotification(const std::string& sqaCont _isRunning(false), _pPublisher(0), _sqaControlAddress(sqaControlAddress), - _sqaControlPort(sqaControlPort) + _sqaControlPort(sqaControlPort), + _tcpTimeout(SQA_DEFAULT_TCP_TIMEOUT) { #define within(num) (int) ((float) (num) * random () / (RAND_MAX + 1.0)) std::ostringstream ss; @@ -133,7 +137,7 @@ inline void StateQueueNotification::internal_run() ss << "ssw-" << std::hex << std::uppercase << std::setw(4) << std::setfill('0') << (int) ((float) (0x10000) * random () / (RAND_MAX + 1.0)); - _pPublisher = new SQAPublisher(ss.str().c_str(), _sqaControlAddress.c_str(), _sqaControlPort.c_str(), 1, 100, 100); + _pPublisher = new SQAPublisher(ss.str().c_str(), _sqaControlAddress.c_str(), _sqaControlPort.c_str(), 1, _tcpTimeout, _tcpTimeout); std::string key; diff --git a/sipXsqa/include/sqa/sqaclient.h b/sipXsqa/include/sqa/sqaclient.h index e48fb67c5d7..77d3ea16bef 100644 --- a/sipXsqa/include/sqa/sqaclient.h +++ b/sipXsqa/include/sqa/sqaclient.h @@ -46,6 +46,8 @@ namespace std #endif +#define SQA_DEFAULT_TCP_TIMEOUT 100 + class SQALogger { public: diff --git a/sipXsqa/src/BlockingTcpClient.cpp b/sipXsqa/src/BlockingTcpClient.cpp index 420631d2114..75a71501e82 100644 --- a/sipXsqa/src/BlockingTcpClient.cpp +++ b/sipXsqa/src/BlockingTcpClient.cpp @@ -30,8 +30,6 @@ #define SQA_CONN_MAX_READ_BUFF_SIZE 65536 #define SQA_CONN_CONNECTION_TIMEOUT_MSEC 5000 -#define SQA_CONN_READ_TIMEOUT 1000 -#define SQA_CONN_WRITE_TIMEOUT 1000 #define SQA_KEY_MIN 22172 #define SQA_KEY_ALPHA 22180 #define SQA_KEY_DEFAULT SQA_KEY_MIN @@ -81,36 +79,6 @@ BlockingTcpClient::~BlockingTcpClient() } } -void BlockingTcpClient::setReadTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) -{ - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = milliseconds * 1000; - setsockopt(socket.native(), SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); -} - -void BlockingTcpClient::setWriteTimeout(boost::asio::ip::tcp::socket& socket, int milliseconds) -{ - struct timeval tv; - tv.tv_sec = 0; - tv.tv_usec = milliseconds * 1000; - setsockopt(socket.native(), SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); -} - -void BlockingTcpClient::startReadTimer() -{ - boost::system::error_code ec; - _readTimer.expires_from_now(boost::posix_time::milliseconds(_readTimeout), ec); - _readTimer.async_wait(boost::bind(&BlockingTcpClient::onReadTimeout, this, boost::asio::placeholders::error)); -} - -void BlockingTcpClient::startWriteTimer() -{ - boost::system::error_code ec; - _writeTimer.expires_from_now(boost::posix_time::milliseconds(_writeTimeout), ec); - _writeTimer.async_wait(boost::bind(&BlockingTcpClient::onWriteTimeout, this, boost::asio::placeholders::error)); -} - void BlockingTcpClient::startConnectTimer() { boost::system::error_code ec; @@ -118,18 +86,6 @@ void BlockingTcpClient::startConnectTimer() _connectTimer.async_wait(boost::bind(&BlockingTcpClient::onConnectTimeout, this, boost::asio::placeholders::error)); } -void BlockingTcpClient::cancelReadTimer() -{ - boost::system::error_code ec; - _readTimer.cancel(ec); -} - -void BlockingTcpClient::cancelWriteTimer() -{ - boost::system::error_code ec; - _writeTimer.cancel(ec); -} - void BlockingTcpClient::cancelConnectTimer() { boost::system::error_code ec; @@ -295,31 +251,31 @@ bool BlockingTcpClient::connect(const std::string& serviceAddress, const std::st bool BlockingTcpClient::connect() { // Initialize State Queue Agent Publisher if an address is provided - // - //if (_serviceAddress.empty() || _servicePort.empty()) - std::string sqaControlAddress; - std::string sqaControlPort; - std::ostringstream sqaconfig; - sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; - OsServiceOptions configOptions(sqaconfig.str()); - std::string controlAddress; - std::string controlPort; + std::string sqaControlAddress; + std::string sqaControlPort; + std::ostringstream sqaconfig; + sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; + OsServiceOptions configOptions(sqaconfig.str()); + if (configOptions.parseOptions()) { - if (configOptions.parseOptions()) + bool enabled = false; + if (configOptions.getOption("enabled", enabled, enabled) && enabled) { - bool enabled = false; - if (configOptions.getOption("enabled", enabled, enabled) && enabled) - { - configOptions.getOption("sqa-control-address", _serviceAddress); - configOptions.getOption("sqa-control-port", _servicePort); - } - else + configOptions.getOption("sqa-control-address", _serviceAddress); + configOptions.getOption("sqa-control-port", _servicePort); + + if (configOptions.hasOption("tcp-timeout")) { - OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); - return false; + configOptions.getOption("tcp-timeout", _readTimeout); + configOptions.getOption("tcp-timeout", _writeTimeout); } } + else + { + OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); + return false; + } } if(_serviceAddress.empty() || _servicePort.empty()) diff --git a/sipXsqa/src/StateQueueDriverTest.cpp b/sipXsqa/src/StateQueueDriverTest.cpp index 66637cd8e81..60fd8bfa311 100644 --- a/sipXsqa/src/StateQueueDriverTest.cpp +++ b/sipXsqa/src/StateQueueDriverTest.cpp @@ -130,7 +130,7 @@ DEFINE_TEST(TestDriver, TestWatcher) _pAgent->options().getOption("sqa-control-address", address); _pAgent->options().getOption("sqa-control-port", port); StateQueueClient* pPublisher = GET_RESOURCE(TestDriver, StateQueueClient*, "simple_publisher"); - StateQueueClient watcher(StateQueueClient::Watcher, "StateQueueDriverTest", address, port, "watcher-data", 1); + StateQueueClient watcher(StateQueueClient::Watcher, "StateQueueDriverTest", address, port, "watcher-data", 1, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT); boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); ASSERT_COND(pPublisher->publish("watcher-data-sample", "Hello SQA!", false)); std::string watcherData; @@ -147,8 +147,8 @@ DEFINE_TEST(TestDriver, TestPublishAndPersist) _pAgent->options().getOption("sqa-control-address", address); _pAgent->options().getOption("sqa-control-port", port); - SQAPublisher publisher("TestPublishAndPersist", address.c_str(), port.c_str(), 1, 100, 100); - SQAWatcher watcher("TestPublishAndPersist", address.c_str(), port.c_str(), "pub&persist", 1, 100, 100); + SQAPublisher publisher("TestPublishAndPersist", address.c_str(), port.c_str(), 1, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT); + SQAWatcher watcher("TestPublishAndPersist", address.c_str(), port.c_str(), "pub&persist", 1, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT); boost::this_thread::sleep(boost::posix_time::milliseconds(100)); ASSERT_COND(publisher.publishAndPersist(5, "pub&persist", "test-data", 10)); SQAEvent* pEvent = watcher.watch(); @@ -178,9 +178,9 @@ DEFINE_TEST(TestDriver, TestDealAndPublish) int poolSize // Number of active connections to SQA ) */ - SQADealer dealer("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, 100, 100); - SQAWatcher watcher("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, 100, 100); - SQAWorker worker("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, 100, 100); + SQADealer dealer("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, SQA_DEFAULT_TCP_TIMEOUT, SQA_DEFAULT_TCP_TIMEOUT); + SQAWatcher watcher("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, SQA_DEFAULT_TCP_TIMEOUT, SQA_DEFAULT_TCP_TIMEOUT); + SQAWorker worker("TestDealAndPublish", address.c_str(), port.c_str(), "not", 1, SQA_DEFAULT_TCP_TIMEOUT, SQA_DEFAULT_TCP_TIMEOUT); boost::this_thread::sleep(boost::posix_time::milliseconds(100)); ASSERT_COND(dealer.dealAndPublish("test-data", 20)); SQAEvent* pEvent = worker.fetchTask(); @@ -243,7 +243,7 @@ DEFINE_TEST(TestDriver, TestMapGetSetPlugin) std::string port; _pAgent->options().getOption("sqa-control-address", address); _pAgent->options().getOption("sqa-control-port", port); - SQAWatcher watcher("TestMapGetSetPlugin", address.c_str(), port.c_str(), "dummy", 1, 100, 100); + SQAWatcher watcher("TestMapGetSetPlugin", address.c_str(), port.c_str(), "dummy", 1, SQA_DEFAULT_TCP_TIMEOUT, SQA_DEFAULT_TCP_TIMEOUT); watcher.mset(10, "TestMapGetSetPlugin", "cseq", "0", 10); char* cseq = watcher.mget(10, "TestMapGetSetPlugin", "cseq"); ASSERT_STR_EQ(cseq, "0"); @@ -275,8 +275,8 @@ bool StateQueueDriverTest::runTests() // Define common resource accessible by all unit tests // DEFINE_RESOURCE(TestDriver, "state_agent", &_agent); - DEFINE_RESOURCE(TestDriver, "simple_pop_client", new StateQueueClient(StateQueueClient::Worker, "StateQueueDriverTest", address, port, "reg", 2)); - DEFINE_RESOURCE(TestDriver, "simple_publisher", new StateQueueClient(StateQueueClient::Publisher, "StateQueueDriverTest", address, port, "reg", 2)); + DEFINE_RESOURCE(TestDriver, "simple_pop_client", new StateQueueClient(StateQueueClient::Worker, "StateQueueDriverTest", address, port, "reg", 2, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT)); + DEFINE_RESOURCE(TestDriver, "simple_publisher", new StateQueueClient(StateQueueClient::Publisher, "StateQueueDriverTest", address, port, "reg", 2, TEST_TCP_TIMEOUT, TEST_TCP_TIMEOUT)); boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); diff --git a/sipXsqa/src/main.cpp b/sipXsqa/src/main.cpp index 60eb1c00467..24a17c704c6 100644 --- a/sipXsqa/src/main.cpp +++ b/sipXsqa/src/main.cpp @@ -18,7 +18,6 @@ #include "sqa/ServiceOptions.h" #include "sqa/StateQueueAgent.h" #include "sqa/StateQueueDriverTest.h" -//#include "sqa/StateQueueConnection.h" #include "sipXecsService/SipXApplication.h" #define SIPXSQA_APP_NAME "StateQueueAgent" @@ -50,6 +49,7 @@ int main(int argc, char** argv) osServiceOptions.addOptionString("sqa-control-address", ": Address where to send control commands.", OsServiceOptions::ConfigOption); osServiceOptions.addOptionInt("id", ": Address where to send control commands.", OsServiceOptions::ConfigOption); osServiceOptions.addOptionFlag("test-driver", ": Set this flag if you want to run the driver unit tests to ensure proper operations.", OsServiceOptions::ConfigOption); + osServiceOptions.addOptionInt("tcp-timeout", ": TCP read/write timout.", OsServiceOptions::ConfigOption); // NOTE: this might exit application in case of failure sipXApplication.init(argc, argv, sqaData); From a51ebc97b9717e2cf5eef34672c309b31d91662f Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Fri, 5 Aug 2016 13:38:31 +0300 Subject: [PATCH 14/23] UC-4193 StateQueueClient 100ms hard coded timeout - magic numbers removed - clean unused code from BlockingClient - read of "tcp-timeout" option added --- sipXsqa/include/sqa/BlockingTcpClient.h | 1 - 1 file changed, 1 deletion(-) diff --git a/sipXsqa/include/sqa/BlockingTcpClient.h b/sipXsqa/include/sqa/BlockingTcpClient.h index e83e08331cd..5301f47f8de 100644 --- a/sipXsqa/include/sqa/BlockingTcpClient.h +++ b/sipXsqa/include/sqa/BlockingTcpClient.h @@ -30,7 +30,6 @@ #include "StateQueueMessage.h" #include "BlockingQueue.h" -#include "BlockingTcpClient.h" #define SQA_LINGER_TIME_MILLIS 5000 #define SQA_TERMINATE_STRING "__TERMINATE__" From e4924b2cc58a2e68cff63157ca4cc131db5b0f0b Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Fri, 5 Aug 2016 13:38:31 +0300 Subject: [PATCH 15/23] UC-4193 StateQueueClient 100ms hard coded timeout - magic numbers removed - clean unused code from BlockingClient - read of "tcp-timeout" option added --- sipXsqa/include/Makefile.am | 1 + 1 file changed, 1 insertion(+) diff --git a/sipXsqa/include/Makefile.am b/sipXsqa/include/Makefile.am index c8cd7602f77..6fbe31f97e1 100755 --- a/sipXsqa/include/Makefile.am +++ b/sipXsqa/include/Makefile.am @@ -4,6 +4,7 @@ nobase_includeHEADERS_INSTALL = $(INSTALL) -D -p -c -m 644 nobase_include_HEADERS = \ sqa/BlockingQueue.h \ + sqa/BlockingTcpClient.h \ sqa/RedisClient.h \ sqa/RedisClientAsync.h \ sqa/Semaphore.h \ From 2043f43994734d7b9c325bd4441bc80aec49154b Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Mon, 8 Aug 2016 12:23:48 +0300 Subject: [PATCH 16/23] UC-4193 StateQueueClient 100ms hard coded timeout - BlockingTcpClient.cpp merged back to BlockingTcpClient.h to avoid compiltaion issues --- sipXsqa/include/sqa/BlockingTcpClient.h | 564 ++++++++++++++++++++++-- sipXsqa/src/BlockingTcpClient.cpp | 539 ---------------------- sipXsqa/src/Makefile.am | 1 - 3 files changed, 517 insertions(+), 587 deletions(-) delete mode 100644 sipXsqa/src/BlockingTcpClient.cpp diff --git a/sipXsqa/include/sqa/BlockingTcpClient.h b/sipXsqa/include/sqa/BlockingTcpClient.h index 5301f47f8de..c70776db0bf 100644 --- a/sipXsqa/include/sqa/BlockingTcpClient.h +++ b/sipXsqa/include/sqa/BlockingTcpClient.h @@ -46,69 +46,539 @@ // Defines the interval, in seconds, to wait between keep alive loop calls #define SQA_KEEP_ALIVE_LOOP_INTERVAL_SECS 1 - class BlockingTcpClient - { +class BlockingTcpClient +{ public: typedef boost::shared_ptr Ptr; - explicit BlockingTcpClient(boost::asio::io_service& ioService, int readTimeout, int writeTimeout, short key); - ~BlockingTcpClient(); + BlockingTcpClient(boost::asio::io_service& ioService, int readTimeout, int writeTimeout, short key) : + _ioService(ioService), + _resolver(_ioService), + _pSocket(0), + _isConnected(false), + _readTimeout(readTimeout), + _writeTimeout(writeTimeout), + _key(key), + _readTimer(_ioService), + _writeTimer(_ioService), + _connectTimer(_ioService) + { + } - bool connect(); - bool connect(const std::string& serviceAddress, const std::string& servicePort); - bool send(const StateQueueMessage& request); - bool receive(StateQueueMessage& response); - bool sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response); - bool isConnected() const; - std::string getLocalAddress(); - const std::string &getServicePort() const; - const std::string &getServiceAddress() const; + ~BlockingTcpClient() + { + if (_pSocket) + { + delete _pSocket; + _pSocket = 0; + } + } -private: - void startConnectTimer(); - void cancelConnectTimer(); - void onReadTimeout(const boost::system::error_code& e); - void onWriteTimeout(const boost::system::error_code& e); - void onConnectTimeout(const boost::system::error_code& e); - void close(); - bool timedWaitUntilDataAvailable(boost::function onTimeoutCb, int timeoutMs, short int requestedEvents); - bool timedWaitUntilReadDataAvailable(); - bool timedWaitUntilWriteDataAvailable(); - - unsigned long getNextReadSize(); + bool connect() + { + // Initialize State Queue Agent Publisher if an address is provided + std::string sqaControlAddress; + std::string sqaControlPort; + std::ostringstream sqaconfig; + sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; + OsServiceOptions configOptions(sqaconfig.str()); + + if (configOptions.parseOptions()) + { + bool enabled = false; + if (configOptions.getOption("enabled", enabled, enabled) && enabled) + { + configOptions.getOption("sqa-control-address", _serviceAddress); + configOptions.getOption("sqa-control-port", _servicePort); + + if (configOptions.hasOption("tcp-timeout")) + { + configOptions.getOption("tcp-timeout", _readTimeout); + configOptions.getOption("tcp-timeout", _writeTimeout); + } + } + else + { + OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); + return false; + } + } + + if(_serviceAddress.empty() || _servicePort.empty()) + { + OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " remote address is not set"); + return false; + } + + return connect(_serviceAddress, _servicePort); + } + + bool connect(const std::string& serviceAddress, const std::string& servicePort) + { + // Close the previous socket; + close(); + + if (_pSocket) + { + delete _pSocket; + _pSocket = 0; + } + + _pSocket = new boost::asio::ip::tcp::socket(_ioService); + + OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort); + + _serviceAddress = serviceAddress; + _servicePort = servicePort; + + try + { + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), serviceAddress.c_str(), servicePort.c_str()); + boost::asio::ip::tcp::resolver::iterator hosts = _resolver.resolve(query); + + ConnectTimer timer(this); + + // this flag may be reset by ConnectTimer's timer during connect() call + _isConnected = true; + + ////////////////////////////////////////////////////////////////////////// + // Only works in 1.47 version of asio. 1.46 doesnt have this utility func + // boost::asio::connect(*_pSocket, hosts); + _pSocket->connect(hosts->endpoint()); // so we use the connect member + ////////////////////////////////////////////////////////////////////////// + OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort << " SUCESSFUL."); + } + catch(std::exception &e) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "failed with error " << e.what()); + _isConnected = false; + } + + return _isConnected; + } + + bool send(const StateQueueMessage& request) + { + assert(_pSocket); + std::string data = request.data(); + + if (data.size() > SQA_CONN_MAX_READ_BUFF_SIZE - 1) /// Account for the terminating char "_" + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "data size: " << data.size() << " maximum buffer length of " << SQA_CONN_MAX_READ_BUFF_SIZE - 1); + return false; + } + + short version = 1; + unsigned long len = (unsigned long)data.size() + 1; /// Account for the terminating char "_" + std::stringstream strm; + strm.write((char*)(&version), sizeof(version)); + strm.write((char*)(&_key), sizeof(_key)); + strm.write((char*)(&len), sizeof(len)); + strm << data << "_"; + std::string packet = strm.str(); + boost::system::error_code ec; + bool ok = false; + + { + if (false == timedWaitUntilWriteDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilWriteDataAvailable failed: " + << "Unable to send request"); + + _isConnected = false; + return false; + } + + //ok = boost::asio::write(*_pSocket, boost::asio::buffer(packet.c_str(), packet.size()), boost::asio::transfer_all(), ec) > 0; + ok = _pSocket->write_some(boost::asio::buffer(packet.c_str(), packet.size()), ec) > 0; + } + + if (!ok || ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "write_some error: " << ec.message()); + _isConnected = false; + return false; + } + return true; + } + + bool receive(StateQueueMessage& response) + { + assert(_pSocket); + unsigned long len = getNextReadSize(); + if (!len) + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() "next read size is empty."); + return false; + } + + char responseBuff[len]; + boost::system::error_code ec; + { + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to receive response"); + + _isConnected = false; + return false; + } + + _pSocket->read_some(boost::asio::buffer((char*)responseBuff, len), ec); + } + + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "read_some error: " << ec.message()); + } + + _isConnected = false; + return false; + } + std::string responseData(responseBuff, len); + return response.parseData(responseData); + } + + bool sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response) + { + if (send(request)) + { + return receive(response); + } + + return false; + } + + bool isConnected() const + { + return _isConnected; + } + + std::string getLocalAddress() + { + try + { + if (!_pSocket) + { + return ""; + } + + return _pSocket->local_endpoint().address().to_string(); + } + catch(...) + { + return ""; + } + } + + const std::string &getServicePort() const + { + return _servicePort; + } + + const std::string &getServiceAddress() const + { + return _serviceAddress; + } private: - struct ConnectTimer + void startConnectTimer() + { + boost::system::error_code ec; + _connectTimer.expires_from_now(boost::posix_time::milliseconds(SQA_CONN_CONNECTION_TIMEOUT_MSEC), ec); + _connectTimer.async_wait(boost::bind(&BlockingTcpClient::onConnectTimeout, this, boost::asio::placeholders::error)); + } + + void cancelConnectTimer() + { + boost::system::error_code ec; + _connectTimer.cancel(ec); + } + + void onReadTimeout(const boost::system::error_code& e) + { + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _readTimeout << " milliseconds."); + } + + void onWriteTimeout(const boost::system::error_code& e) + { + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _writeTimeout << " milliseconds."); + } + + void onConnectTimeout(const boost::system::error_code& e) + { + if (e) + { + return; + } + + close(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << SQA_CONN_CONNECTION_TIMEOUT_MSEC << " milliseconds."); + } + + void close() + { + if (_pSocket) { - ConnectTimer(BlockingTcpClient* pOwner) : _pOwner(pOwner) + boost::system::error_code ignored_ec; + _pSocket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); + _pSocket->close(ignored_ec); + _isConnected = false; + OS_LOG_INFO(FAC_NET, CLASS_INFO() "- socket deleted."); + } + } + + bool timedWaitUntilDataAvailable(boost::function onTimeoutCb, int timeoutMs, short int requestedEvents) + { + int error = 0; + bool ret = false; + int nativeSocket = _pSocket->native(); + + struct pollfd fds[1] = {{nativeSocket, requestedEvents, 0}}; + + + int pollResult = poll(fds, sizeof(fds) / sizeof(fds[0]), timeoutMs); + if (1 == pollResult) + { + if (fds[0].revents & POLLERR) + { + error = errno; + } + else if (fds[0].revents & requestedEvents) + { + ret = true; + } + else { - _pOwner->startConnectTimer(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "unexpected return from poll(): pollResult = " << pollResult + << ", fds[0].revents =" << fds[0].revents); } + } + else if(0 == pollResult) + { // timeout + const boost::system::error_code e; + + onTimeoutCb(e); + error = ETIMEDOUT; + } + else + { + error = errno; + } + + if (0 != error) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "(" << nativeSocket << ", " << timeoutMs << " ms) error: " << + error << "=" << strerror(error)); + } - ~ConnectTimer() + return ret; + } + + bool timedWaitUntilReadDataAvailable() + { + // check for normal or out-of-band + return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onReadTimeout, this, _1), + _readTimeout, + POLLIN | POLLPRI); + } + + bool timedWaitUntilWriteDataAvailable() + { + return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onWriteTimeout, this, _1), + _writeTimeout, + POLLOUT); + } + + unsigned long getNextReadSize() + { + short version = 1; + bool hasVersion = false; + bool hasKey = false; + unsigned long remoteLen = 0; + + while (!hasVersion || !hasKey) + { + short remoteVersion; + short remoteKey; + + //TODO: Refactor the code below to do one read for the three fields + // + // Read the version (must be 1) + // + while (true) + { + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read version"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteVersion, sizeof(remoteVersion)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read version " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + else + { + if (remoteVersion == version) + { + hasVersion = true; + break; + } + } + } + + while (true) + { + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read secret key"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteKey, sizeof(remoteKey)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); + } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read secret key " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + else + { + if (remoteKey >= SQA_KEY_MIN && remoteKey <= SQA_KEY_MAX) + { + hasKey = true; + break; + } + } + } + } + + boost::system::error_code ec; + if (false == timedWaitUntilReadDataAvailable()) + { + OS_LOG_ERROR(FAC_NET, CLASS_INFO() + << "timedWaitUntilReadDataAvailable failed: " + << "Unable to read secret packet length"); + + _isConnected = false; + return 0; + } + + _pSocket->read_some(boost::asio::buffer((char*)&remoteLen, sizeof(remoteLen)), ec); + if (ec) + { + if (boost::asio::error::eof == ec) { - _pOwner->cancelConnectTimer(); + OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); } + else + { + OS_LOG_INFO(FAC_NET, CLASS_INFO() + << "Unable to read secret packet length " + << "ERROR: " << ec.message()); + } + + _isConnected = false; + return 0; + } + + return remoteLen; + } - BlockingTcpClient* _pOwner; - }; - - const std::string& className(); - - boost::asio::io_service& _ioService; - boost::asio::ip::tcp::resolver _resolver; - boost::asio::ip::tcp::socket *_pSocket; - std::string _serviceAddress; - std::string _servicePort; - bool _isConnected; - int _readTimeout; - int _writeTimeout; - short _key; - boost::asio::deadline_timer _readTimer; - boost::asio::deadline_timer _writeTimer; - boost::asio::deadline_timer _connectTimer; +private: + struct ConnectTimer + { + ConnectTimer(BlockingTcpClient* pOwner) : _pOwner(pOwner) + { + _pOwner->startConnectTimer(); + } + + ~ConnectTimer() + { + _pOwner->cancelConnectTimer(); + } + + BlockingTcpClient* _pOwner; }; + const std::string& className() + { + static const std::string className("StateQueueClient::BlockingTcpClient"); + return className; + } + + boost::asio::io_service& _ioService; + boost::asio::ip::tcp::resolver _resolver; + boost::asio::ip::tcp::socket *_pSocket; + std::string _serviceAddress; + std::string _servicePort; + bool _isConnected; + int _readTimeout; + int _writeTimeout; + short _key; + boost::asio::deadline_timer _readTimer; + boost::asio::deadline_timer _writeTimer; + boost::asio::deadline_timer _connectTimer; +}; + #endif /* BlockingTcpClient_H */ diff --git a/sipXsqa/src/BlockingTcpClient.cpp b/sipXsqa/src/BlockingTcpClient.cpp deleted file mode 100644 index 75a71501e82..00000000000 --- a/sipXsqa/src/BlockingTcpClient.cpp +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Copyright (c) eZuce, Inc. All rights reserved. - * Contributed to SIPfoundry under a Contributor Agreement - * - * This software is free software; you can redistribute it and/or modify it under - * the terms of the Affero General Public License (AGPL) as published by the - * Free Software Foundation; either version 3 of the License, or (at your option) - * any later version. - * - * This software is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS - * FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more - * details. - */ - -#include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - -#include "sqa/StateQueueMessage.h" -#include "sqa/BlockingTcpClient.h" - -#define SQA_CONN_MAX_READ_BUFF_SIZE 65536 -#define SQA_CONN_CONNECTION_TIMEOUT_MSEC 5000 -#define SQA_KEY_MIN 22172 -#define SQA_KEY_ALPHA 22180 -#define SQA_KEY_DEFAULT SQA_KEY_MIN -#define SQA_KEY_MAX 22200 - -const std::string& BlockingTcpClient::className() -{ - static const std::string className("StateQueueClient::BlockingTcpClient"); - - return className; -} - -const std::string & BlockingTcpClient::getServicePort() const -{ - return _servicePort; -} - -const std::string & BlockingTcpClient::getServiceAddress() const -{ - return _serviceAddress; -} - -BlockingTcpClient::BlockingTcpClient( - boost::asio::io_service& ioService, - int readTimeout, - int writeTimeout, - short key) : - _ioService(ioService), - _resolver(_ioService), - _pSocket(0), - _isConnected(false), - _readTimeout(readTimeout), - _writeTimeout(writeTimeout), - _key(key), - _readTimer(_ioService), - _writeTimer(_ioService), - _connectTimer(_ioService) -{ -} - -BlockingTcpClient::~BlockingTcpClient() -{ - if (_pSocket) - { - delete _pSocket; - _pSocket = 0; - } -} - -void BlockingTcpClient::startConnectTimer() -{ - boost::system::error_code ec; - _connectTimer.expires_from_now(boost::posix_time::milliseconds(SQA_CONN_CONNECTION_TIMEOUT_MSEC), ec); - _connectTimer.async_wait(boost::bind(&BlockingTcpClient::onConnectTimeout, this, boost::asio::placeholders::error)); -} - -void BlockingTcpClient::cancelConnectTimer() -{ - boost::system::error_code ec; - _connectTimer.cancel(ec); -} - -void BlockingTcpClient::onReadTimeout(const boost::system::error_code& e) -{ - if (e) - { - return; - } - - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _readTimeout << " milliseconds."); -} - - -void BlockingTcpClient::onWriteTimeout(const boost::system::error_code& e) -{ - if (e) - { - return; - } - - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << _writeTimeout << " milliseconds."); -} - -void BlockingTcpClient::onConnectTimeout(const boost::system::error_code& e) -{ - if (e) - { - return; - } - - close(); - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "- " << SQA_CONN_CONNECTION_TIMEOUT_MSEC << " milliseconds."); -} - -void BlockingTcpClient::close() -{ - if (_pSocket) - { - boost::system::error_code ignored_ec; - _pSocket->shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignored_ec); - _pSocket->close(ignored_ec); - _isConnected = false; - OS_LOG_INFO(FAC_NET, CLASS_INFO() "- socket deleted."); - } -} - -bool BlockingTcpClient::timedWaitUntilDataAvailable(boost::function onTimeoutCb, - int timeoutMs, - short int requestedEvents) -{ - int error = 0; - bool ret = false; - int nativeSocket = _pSocket->native(); - - struct pollfd fds[1] = {{nativeSocket, requestedEvents, 0}}; - - - int pollResult = poll(fds, sizeof(fds) / sizeof(fds[0]), timeoutMs); - if (1 == pollResult) - { - if (fds[0].revents & POLLERR) - { - error = errno; - } - else if (fds[0].revents & requestedEvents) - { - ret = true; - } - else - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "unexpected return from poll(): pollResult = " << pollResult - << ", fds[0].revents =" << fds[0].revents); - } - } - else if(0 == pollResult) - { // timeout - const boost::system::error_code e; - - onTimeoutCb(e); - error = ETIMEDOUT; - } - else - { - error = errno; - } - - if (0 != error) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "(" << nativeSocket << ", " << timeoutMs << " ms) error: " << - error << "=" << strerror(error)); - } - - return ret; -} - -bool BlockingTcpClient::timedWaitUntilReadDataAvailable() -{ - // check for normal or out-of-band - return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onReadTimeout, this, _1), - _readTimeout, - POLLIN | POLLPRI); -} - -bool BlockingTcpClient::timedWaitUntilWriteDataAvailable() -{ - return timedWaitUntilDataAvailable(boost::bind(&BlockingTcpClient::onWriteTimeout, this, _1), - _writeTimeout, - POLLOUT); -} - -bool BlockingTcpClient::connect(const std::string& serviceAddress, const std::string& servicePort) -{ - // Close the previous socket; - close(); - - if (_pSocket) - { - delete _pSocket; - _pSocket = 0; - } - - _pSocket = new boost::asio::ip::tcp::socket(_ioService); - - OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort); - - _serviceAddress = serviceAddress; - _servicePort = servicePort; - - try - { - boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), serviceAddress.c_str(), servicePort.c_str()); - boost::asio::ip::tcp::resolver::iterator hosts = _resolver.resolve(query); - - ConnectTimer timer(this); - - // this flag may be reset by ConnectTimer's timer during connect() call - _isConnected = true; - - ////////////////////////////////////////////////////////////////////////// - // Only works in 1.47 version of asio. 1.46 doesnt have this utility func - // boost::asio::connect(*_pSocket, hosts); - _pSocket->connect(hosts->endpoint()); // so we use the connect member - ////////////////////////////////////////////////////////////////////////// - OS_LOG_INFO(FAC_NET, CLASS_INFO() "creating new connection to " << serviceAddress << ":" << servicePort << " SUCESSFUL."); - } - catch(std::exception &e) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "failed with error " << e.what()); - _isConnected = false; - } - - return _isConnected; -} - -bool BlockingTcpClient::connect() -{ - // Initialize State Queue Agent Publisher if an address is provided - std::string sqaControlAddress; - std::string sqaControlPort; - std::ostringstream sqaconfig; - sqaconfig << SIPX_CONFDIR << "/" << "sipxsqa-client.ini"; - OsServiceOptions configOptions(sqaconfig.str()); - - if (configOptions.parseOptions()) - { - bool enabled = false; - if (configOptions.getOption("enabled", enabled, enabled) && enabled) - { - configOptions.getOption("sqa-control-address", _serviceAddress); - configOptions.getOption("sqa-control-port", _servicePort); - - if (configOptions.hasOption("tcp-timeout")) - { - configOptions.getOption("tcp-timeout", _readTimeout); - configOptions.getOption("tcp-timeout", _writeTimeout); - } - } - else - { - OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " Unable to read connection information from " << sqaconfig.str()); - return false; - } - } - - if(_serviceAddress.empty() || _servicePort.empty()) - { - OS_LOG_ERROR(FAC_NET, "BlockingTcpClient::connect() this:" << this << " remote address is not set"); - return false; - } - - return connect(_serviceAddress, _servicePort); -} - -bool BlockingTcpClient::send(const StateQueueMessage& request) -{ - assert(_pSocket); - std::string data = request.data(); - - if (data.size() > SQA_CONN_MAX_READ_BUFF_SIZE - 1) /// Account for the terminating char "_" - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "data size: " << data.size() << " maximum buffer length of " << SQA_CONN_MAX_READ_BUFF_SIZE - 1); - return false; - } - - short version = 1; - unsigned long len = (unsigned long)data.size() + 1; /// Account for the terminating char "_" - std::stringstream strm; - strm.write((char*)(&version), sizeof(version)); - strm.write((char*)(&_key), sizeof(_key)); - strm.write((char*)(&len), sizeof(len)); - strm << data << "_"; - std::string packet = strm.str(); - boost::system::error_code ec; - bool ok = false; - - { - if (false == timedWaitUntilWriteDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilWriteDataAvailable failed: " - << "Unable to send request"); - - _isConnected = false; - return false; - } - - //ok = boost::asio::write(*_pSocket, boost::asio::buffer(packet.c_str(), packet.size()), boost::asio::transfer_all(), ec) > 0; - ok = _pSocket->write_some(boost::asio::buffer(packet.c_str(), packet.size()), ec) > 0; - } - - if (!ok || ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "write_some error: " << ec.message()); - _isConnected = false; - return false; - } - return true; -} - -bool BlockingTcpClient::receive(StateQueueMessage& response) -{ - assert(_pSocket); - unsigned long len = getNextReadSize(); - if (!len) - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() "next read size is empty."); - return false; - } - - char responseBuff[len]; - boost::system::error_code ec; - { - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to receive response"); - - _isConnected = false; - return false; - } - - _pSocket->read_some(boost::asio::buffer((char*)responseBuff, len), ec); - } - - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "read_some error: " << ec.message()); - } - - _isConnected = false; - return false; - } - std::string responseData(responseBuff, len); - return response.parseData(responseData); -} - -bool BlockingTcpClient::sendAndReceive(const StateQueueMessage& request, StateQueueMessage& response) -{ - if (send(request)) - return receive(response); - return false; -} - -unsigned long BlockingTcpClient::getNextReadSize() -{ - short version = 1; - bool hasVersion = false; - bool hasKey = false; - unsigned long remoteLen = 0; - - while (!hasVersion || !hasKey) - { - short remoteVersion; - short remoteKey; - - //TODO: Refactor the code below to do one read for the three fields - // - // Read the version (must be 1) - // - while (true) - { - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read version"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteVersion, sizeof(remoteVersion)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read version " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - else - { - if (remoteVersion == version) - { - hasVersion = true; - break; - } - } - } - - while (true) - { - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read secret key"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteKey, sizeof(remoteKey)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read secret key " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - else - { - if (remoteKey >= SQA_KEY_MIN && remoteKey <= SQA_KEY_MAX) - { - hasKey = true; - break; - } - } - } - } - - boost::system::error_code ec; - if (false == timedWaitUntilReadDataAvailable()) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() - << "timedWaitUntilReadDataAvailable failed: " - << "Unable to read secret packet length"); - - _isConnected = false; - return 0; - } - - _pSocket->read_some(boost::asio::buffer((char*)&remoteLen, sizeof(remoteLen)), ec); - if (ec) - { - if (boost::asio::error::eof == ec) - { - OS_LOG_ERROR(FAC_NET, CLASS_INFO() "remote closed the connection, read_some error: " << ec.message()); - } - else - { - OS_LOG_INFO(FAC_NET, CLASS_INFO() - << "Unable to read secret packet length " - << "ERROR: " << ec.message()); - } - - _isConnected = false; - return 0; - } - - return remoteLen; -} - -bool BlockingTcpClient::isConnected() const -{ - return _isConnected; -} - -std::string BlockingTcpClient::getLocalAddress() -{ - try - { - if (!_pSocket) - { - return ""; - } - - return _pSocket->local_endpoint().address().to_string(); - } - catch(...) - { - return ""; - } -} - diff --git a/sipXsqa/src/Makefile.am b/sipXsqa/src/Makefile.am index cc66adb28ae..44907c52fdf 100755 --- a/sipXsqa/src/Makefile.am +++ b/sipXsqa/src/Makefile.am @@ -58,7 +58,6 @@ libsipXsqa_la_LDFLAGS = \ -version-info ${version_Current}:${version_Revision}:${version_Age} libsipXsqa_la_SOURCES = \ - BlockingTcpClient.cpp \ StateQueueAgent.cpp \ StateQueueConnection.cpp \ StateQueueDriverTest.cpp \ From ab7ee5dac7d8a04f6ce5f31120d82e86741fb05c Mon Sep 17 00:00:00 2001 From: Roman Romanchenko Date: Fri, 7 Oct 2016 08:49:11 +0300 Subject: [PATCH 17/23] UC-4044 uniteme setup to bind network interface of choice - preparation for IP moval script development. --- .../freeswitch/eslrequest/AbstractEslRequestController.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sipXcommons/src/main/java/org/sipfoundry/commons/freeswitch/eslrequest/AbstractEslRequestController.java b/sipXcommons/src/main/java/org/sipfoundry/commons/freeswitch/eslrequest/AbstractEslRequestController.java index bb156cfd2ae..152f32347b0 100644 --- a/sipXcommons/src/main/java/org/sipfoundry/commons/freeswitch/eslrequest/AbstractEslRequestController.java +++ b/sipXcommons/src/main/java/org/sipfoundry/commons/freeswitch/eslrequest/AbstractEslRequestController.java @@ -289,7 +289,7 @@ public String getSipxchangeDomainName() { } public String getFreeswitchIpAndPort() { - return getVariable("FreeSWITCH-IPv4") + ":15060"; + return getVariable("variable_sip_local_network_addr") + ":15060"; } public void setBridgedTransfer(boolean bridge) { From 53dbeb1c7355818e4416ade42683aa32dab5ff06 Mon Sep 17 00:00:00 2001 From: Benedikt Machens Date: Wed, 12 Oct 2016 17:09:22 +0200 Subject: [PATCH 18/23] SIPX-524 Backport from 16.12 --- sipXyealink/etc/yealink/phone-8X.properties | 2 ++ sipXyealink/etc/yealink/phone-8X.xml | 1 + 2 files changed, 3 insertions(+) diff --git a/sipXyealink/etc/yealink/phone-8X.properties b/sipXyealink/etc/yealink/phone-8X.properties index 098ff36b8bc..73fdf3fcd20 100644 --- a/sipXyealink/etc/yealink/phone-8X.properties +++ b/sipXyealink/etc/yealink/phone-8X.properties @@ -804,6 +804,8 @@ iant.bw.feature_key_sync.label=Feature Key Sync iant.bw.feature_key_sync.description=Use BroadSoft Like feature Key Synchronisation for Forwaring and DND iant.bw.enable.label=BroadWorks external Phonebook support iant.bw.enable.description=Use BroadWorks XSI Server +iant.features.dnd.feature_key_sync.enable.label=Global DND Feature Sync +iant.features.dnd.feature_key_sync.enable.description=This feature enables or disables the DND Feature Sync for all accounts iant.sip.notify_reboot_enable.label=Check-Sync Reboot iant.sip.notify_reboot_enable.description=Should the phone reboot if it receives SIP-NOTIFY with check-sync iant.phone_setting.backgrounds.label=Preselect Phone Background diff --git a/sipXyealink/etc/yealink/phone-8X.xml b/sipXyealink/etc/yealink/phone-8X.xml index 3fd67944af1..d86f47d53f6 100644 --- a/sipXyealink/etc/yealink/phone-8X.xml +++ b/sipXyealink/etc/yealink/phone-8X.xml @@ -1024,6 +1024,7 @@ From 2bad51539662a0b7cd192d03e883956bb008554e Mon Sep 17 00:00:00 2001 From: dizzy Date: Wed, 26 Oct 2016 08:48:20 +0300 Subject: [PATCH 19/23] UC-4267 - Config generation fails with external Line on Phone --- .../sipxconfig/phone/polycom/RegAdvancedConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sipXpolycom/src/org/sipfoundry/sipxconfig/phone/polycom/RegAdvancedConfiguration.java b/sipXpolycom/src/org/sipfoundry/sipxconfig/phone/polycom/RegAdvancedConfiguration.java index 668afffb7dc..14ef30f6d66 100644 --- a/sipXpolycom/src/org/sipfoundry/sipxconfig/phone/polycom/RegAdvancedConfiguration.java +++ b/sipXpolycom/src/org/sipfoundry/sipxconfig/phone/polycom/RegAdvancedConfiguration.java @@ -59,7 +59,7 @@ public Collection getLines() { for (Line line : lines) { String lineLabel = line.getSettingValue(PolycomPhone.REGISTRATION_LABEL); if (lineLabel == null || lineLabel.isEmpty()) { - line.setSettingValue(PolycomPhone.REGISTRATION_LABEL, line.getUser().getUserName()); + line.setSettingValue(PolycomPhone.REGISTRATION_LABEL, line.getUserName()); } linesSettings.add(line.getSettings()); } From e774da9997807a3b2ff2892aeaca2a7873d03daf Mon Sep 17 00:00:00 2001 From: Benedikt Machens Date: Fri, 11 Nov 2016 08:43:49 +0100 Subject: [PATCH 20/23] SIPX-529 Allow to self should only override name check for BLF, not BLF/Speed dial check --- .../org/sipfoundry/sipxconfig/commserver/imdb/SpeedDials.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/commserver/imdb/SpeedDials.java b/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/commserver/imdb/SpeedDials.java index e1bad710c88..1b3052cfba4 100644 --- a/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/commserver/imdb/SpeedDials.java +++ b/sipXconfig/neoconf/src/org/sipfoundry/sipxconfig/commserver/imdb/SpeedDials.java @@ -73,8 +73,8 @@ public void generate(Replicable entity, DBObject top) { List