From c6dc048bdca03f2067da5fdfde25c5eaffadd513 Mon Sep 17 00:00:00 2001 From: Smite Chow Date: Wed, 28 Feb 2018 18:40:21 +0800 Subject: [PATCH 1/2] fix multiple process queue not sharing --- splunk_handler/__init__.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/splunk_handler/__init__.py b/splunk_handler/__init__.py index b94bb15..46d4e33 100644 --- a/splunk_handler/__init__.py +++ b/splunk_handler/__init__.py @@ -7,6 +7,7 @@ import traceback from threading import Timer +from multiprocessing import JoinableQueue import requests from requests.packages.urllib3.util.retry import Retry @@ -48,7 +49,7 @@ def __init__(self, host, port, token, index, hostname=None, source=None, sourcetype='text', verify=True, timeout=60, flush_interval=15.0, queue_size=5000, debug=False, retry_count=5, - retry_backoff=2.0): + retry_backoff=2.0, multiple_process=False): global instances instances.append(self) @@ -67,8 +68,12 @@ def __init__(self, host, port, token, index, self.SIGTERM = False # 'True' if application requested exit self.timer = None self.testing = False # Used for slightly altering logic during unit testing + self.multiple_process = multiple_process # It is possible to get 'behind' and never catch up, so we limit the queue size - self.queue = Queue(maxsize=queue_size) + if self.multiple_process: + self.queue = JoinableQueue(maxsize=queue_size) + else: + self.queue = Queue(maxsize=queue_size) self.debug = debug self.session = requests.Session() self.retry_count = retry_count From dc8c179e99958127c9b3b5811aee23a0eed3dfbd Mon Sep 17 00:00:00 2001 From: Smite Chow Date: Fri, 2 Mar 2018 11:46:25 +0800 Subject: [PATCH 2/2] add ut --- tests/test_splunk_handler.py | 54 +++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/tests/test_splunk_handler.py b/tests/test_splunk_handler.py index 21447fd..a26a3b3 100644 --- a/tests/test_splunk_handler.py +++ b/tests/test_splunk_handler.py @@ -1,7 +1,16 @@ import logging import unittest +import sys +from multiprocessing.queues import Queue as MQueue -import mock +is_py2 = sys.version[0] == '2' + +if is_py2: + import mock + from Queue import Queue +else: + from unittest import mock + from queue import Queue from splunk_handler import SplunkHandler @@ -20,6 +29,7 @@ SPLUNK_DEBUG = False SPLUNK_RETRY_COUNT = 1 SPLUNK_RETRY_BACKOFF = 0.1 +SPLUNK_MULTIPLE_PROCESS = False RECEIVER_URL = 'https://%s:%s/services/collector' % (SPLUNK_HOST, SPLUNK_PORT) @@ -41,6 +51,7 @@ def setUp(self): debug=SPLUNK_DEBUG, retry_count=SPLUNK_RETRY_COUNT, retry_backoff=SPLUNK_RETRY_BACKOFF, + multiple_process=SPLUNK_MULTIPLE_PROCESS, ) self.splunk.testing = True @@ -65,10 +76,51 @@ def test_init(self): self.assertEqual(self.splunk.debug, SPLUNK_DEBUG) self.assertEqual(self.splunk.retry_count, SPLUNK_RETRY_COUNT) self.assertEqual(self.splunk.retry_backoff, SPLUNK_RETRY_BACKOFF) + self.assertEqual(self.splunk.multiple_process, SPLUNK_MULTIPLE_PROCESS) self.assertFalse(logging.getLogger('requests').propagate) self.assertFalse(logging.getLogger('splunk_handler').propagate) + def test_init_queue_class_type_when_multiple_process(self): + splunk = SplunkHandler( + host=SPLUNK_HOST, + port=SPLUNK_PORT, + token=SPLUNK_TOKEN, + index=SPLUNK_INDEX, + hostname=SPLUNK_HOSTNAME, + source=SPLUNK_SOURCE, + sourcetype=SPLUNK_SOURCETYPE, + verify=SPLUNK_VERIFY, + timeout=SPLUNK_TIMEOUT, + flush_interval=SPLUNK_FLUSH_INTERVAL, + queue_size=SPLUNK_QUEUE_SIZE, + debug=SPLUNK_DEBUG, + retry_count=SPLUNK_RETRY_COUNT, + retry_backoff=SPLUNK_RETRY_BACKOFF, + multiple_process=True, + ) + self.assertTrue(isinstance(splunk.queue, MQueue)) + + def test_init_queue_class_type_when_not_multiple_process(self): + splunk = SplunkHandler( + host=SPLUNK_HOST, + port=SPLUNK_PORT, + token=SPLUNK_TOKEN, + index=SPLUNK_INDEX, + hostname=SPLUNK_HOSTNAME, + source=SPLUNK_SOURCE, + sourcetype=SPLUNK_SOURCETYPE, + verify=SPLUNK_VERIFY, + timeout=SPLUNK_TIMEOUT, + flush_interval=SPLUNK_FLUSH_INTERVAL, + queue_size=SPLUNK_QUEUE_SIZE, + debug=SPLUNK_DEBUG, + retry_count=SPLUNK_RETRY_COUNT, + retry_backoff=SPLUNK_RETRY_BACKOFF, + multiple_process=False, + ) + self.assertTrue(isinstance(splunk.queue, Queue)) + @mock.patch('requests.Session.post') def test_splunk_worker(self, mock_request): # Silence root logger