Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions splunk_handler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import traceback

from threading import Timer
from multiprocessing import JoinableQueue

import requests
from requests.packages.urllib3.util.retry import Retry
Expand Down Expand Up @@ -48,7 +49,7 @@ def __init__(self, host, port, token, index,
hostname=None, source=None, sourcetype='text',
verify=True, timeout=60, flush_interval=15.0,
queue_size=5000, debug=False, retry_count=5,
retry_backoff=2.0):
retry_backoff=2.0, multiple_process=False):

global instances
instances.append(self)
Expand All @@ -67,8 +68,12 @@ def __init__(self, host, port, token, index,
self.SIGTERM = False # 'True' if application requested exit
self.timer = None
self.testing = False # Used for slightly altering logic during unit testing
self.multiple_process = multiple_process
# It is possible to get 'behind' and never catch up, so we limit the queue size
self.queue = Queue(maxsize=queue_size)
if self.multiple_process:
self.queue = JoinableQueue(maxsize=queue_size)
else:
self.queue = Queue(maxsize=queue_size)
self.debug = debug
self.session = requests.Session()
self.retry_count = retry_count
Expand Down
54 changes: 53 additions & 1 deletion tests/test_splunk_handler.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
import logging
import unittest
import sys
from multiprocessing.queues import Queue as MQueue

import mock
is_py2 = sys.version[0] == '2'

if is_py2:
import mock
from Queue import Queue
else:
from unittest import mock
from queue import Queue

from splunk_handler import SplunkHandler

Expand All @@ -20,6 +29,7 @@
SPLUNK_DEBUG = False
SPLUNK_RETRY_COUNT = 1
SPLUNK_RETRY_BACKOFF = 0.1
SPLUNK_MULTIPLE_PROCESS = False

RECEIVER_URL = 'https://%s:%s/services/collector' % (SPLUNK_HOST, SPLUNK_PORT)

Expand All @@ -41,6 +51,7 @@ def setUp(self):
debug=SPLUNK_DEBUG,
retry_count=SPLUNK_RETRY_COUNT,
retry_backoff=SPLUNK_RETRY_BACKOFF,
multiple_process=SPLUNK_MULTIPLE_PROCESS,
)
self.splunk.testing = True

Expand All @@ -65,10 +76,51 @@ def test_init(self):
self.assertEqual(self.splunk.debug, SPLUNK_DEBUG)
self.assertEqual(self.splunk.retry_count, SPLUNK_RETRY_COUNT)
self.assertEqual(self.splunk.retry_backoff, SPLUNK_RETRY_BACKOFF)
self.assertEqual(self.splunk.multiple_process, SPLUNK_MULTIPLE_PROCESS)

self.assertFalse(logging.getLogger('requests').propagate)
self.assertFalse(logging.getLogger('splunk_handler').propagate)

def test_init_queue_class_type_when_multiple_process(self):
    """When constructed with multiple_process=True, the handler's queue
    must be a multiprocessing JoinableQueue (MQueue) rather than the
    thread-only queue.Queue."""
    handler_config = dict(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        token=SPLUNK_TOKEN,
        index=SPLUNK_INDEX,
        hostname=SPLUNK_HOSTNAME,
        source=SPLUNK_SOURCE,
        sourcetype=SPLUNK_SOURCETYPE,
        verify=SPLUNK_VERIFY,
        timeout=SPLUNK_TIMEOUT,
        flush_interval=SPLUNK_FLUSH_INTERVAL,
        queue_size=SPLUNK_QUEUE_SIZE,
        debug=SPLUNK_DEBUG,
        retry_count=SPLUNK_RETRY_COUNT,
        retry_backoff=SPLUNK_RETRY_BACKOFF,
        multiple_process=True,
    )
    handler = SplunkHandler(**handler_config)
    # Same pass condition as assertTrue(isinstance(...)), clearer failure output.
    self.assertIsInstance(handler.queue, MQueue)

def test_init_queue_class_type_when_not_multiple_process(self):
    """When constructed with multiple_process=False, the handler's queue
    must be a plain (thread-safe, single-process) queue.Queue."""
    handler_config = dict(
        host=SPLUNK_HOST,
        port=SPLUNK_PORT,
        token=SPLUNK_TOKEN,
        index=SPLUNK_INDEX,
        hostname=SPLUNK_HOSTNAME,
        source=SPLUNK_SOURCE,
        sourcetype=SPLUNK_SOURCETYPE,
        verify=SPLUNK_VERIFY,
        timeout=SPLUNK_TIMEOUT,
        flush_interval=SPLUNK_FLUSH_INTERVAL,
        queue_size=SPLUNK_QUEUE_SIZE,
        debug=SPLUNK_DEBUG,
        retry_count=SPLUNK_RETRY_COUNT,
        retry_backoff=SPLUNK_RETRY_BACKOFF,
        multiple_process=False,
    )
    handler = SplunkHandler(**handler_config)
    # Same pass condition as assertTrue(isinstance(...)), clearer failure output.
    self.assertIsInstance(handler.queue, Queue)

@mock.patch('requests.Session.post')
def test_splunk_worker(self, mock_request):
# Silence root logger
Expand Down