Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions splunk_handler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

instances = [] # For keeping track of running class instances
DEFAULT_QUEUE_SIZE = 5000
DEFAULT_PAYLOAD_SIZE = 524288


# Called when application exit imminent (main thread ended / got kill signal)
Expand Down Expand Up @@ -51,7 +52,9 @@ def __init__(self, host, port, token, index,
force_keep_ahead=False, hostname=None, protocol='https',
proxies=None, queue_size=DEFAULT_QUEUE_SIZE, record_format=False,
retry_backoff=2.0, retry_count=5, source=None,
sourcetype='text', timeout=60, url=None, verify=True):
sourcetype='text', timeout=60, url=None, verify=True,
keep_payload_on_fail=False, payload_size=DEFAULT_PAYLOAD_SIZE,
):
"""
Args:
host (str): The Splunk host param
Expand All @@ -68,6 +71,8 @@ def __init__(self, host, port, token, index,
queue_size (int): The max number of logs to queue, set to 0 for no max
record_format (bool): Whether the log record will be json
retry_backoff (float): The requests lib backoff factor
keep_payload_on_fail (bool): Clear payload only after successfully sending
payload_size (int): The max size of payload (bytes)
retry_count (int): The number of times to retry a failed request
source (str): The Splunk source param
sourcetype (str): The Splunk sourcetype param
Expand Down Expand Up @@ -101,6 +106,8 @@ def __init__(self, host, port, token, index,
self.session = requests.Session()
self.retry_count = retry_count
self.retry_backoff = retry_backoff
self.keep_payload_on_fail = keep_payload_on_fail
self.payload_size = payload_size
self.protocol = protocol
self.proxies = proxies
self.record_format = record_format
Expand Down Expand Up @@ -269,6 +276,9 @@ def _splunk_worker(self, payload=None):
self.write_debug_log("Payload sent successfully")

except Exception as e:
# return unsuccessfully sent part of payload to log_payload
if self.keep_payload_on_fail:
self.log_payload += payload
try:
self.write_log("Exception in Splunk logging handler: %s" % str(e))
self.write_log(traceback.format_exc())
Expand Down Expand Up @@ -307,13 +317,15 @@ def empty_queue(self):
self.log_payload += ''.join(self.queue)
self.queue.clear()
else:
# without looking at each item, estimate how many can fit in 50 MB
apprx_size_base = len(self.queue[0])
# dont eval max/event size ration as less than 1
# dont count more than what is in queue to ensure the same number as pulled are deleted
count = min(max(int(524288 / apprx_size_base), 1), len(self.queue))
self.log_payload += ''.join(self.queue[:count])
del self.queue[:count]
# Flush queue data to payload, only if payload is not too big
if len(self.log_payload) < self.payload_size:
# without looking at each item, estimate how many can fit in payload_size
apprx_size_base = len(self.queue[0])
# dont eval max/event size ration as less than 1
# dont count more than what is in queue to ensure the same number as pulled are deleted
count = min(max(int(self.payload_size / apprx_size_base), 1), len(self.queue))
self.log_payload += ''.join(self.queue[:count])
del self.queue[:count]
self.write_debug_log("Queue task completed")

return len(self.queue) > 0
Expand Down