diff --git a/splunk_handler/__init__.py b/splunk_handler/__init__.py index bdb9b14..8a79046 100644 --- a/splunk_handler/__init__.py +++ b/splunk_handler/__init__.py @@ -12,6 +12,7 @@ instances = [] # For keeping track of running class instances DEFAULT_QUEUE_SIZE = 5000 +DEFAULT_PAYLOAD_SIZE = 524288 # Called when application exit imminent (main thread ended / got kill signal) @@ -51,7 +52,9 @@ def __init__(self, host, port, token, index, force_keep_ahead=False, hostname=None, protocol='https', proxies=None, queue_size=DEFAULT_QUEUE_SIZE, record_format=False, retry_backoff=2.0, retry_count=5, source=None, - sourcetype='text', timeout=60, url=None, verify=True): + sourcetype='text', timeout=60, url=None, verify=True, + keep_payload_on_fail=False, payload_size=DEFAULT_PAYLOAD_SIZE, + ): """ Args: host (str): The Splunk host param @@ -68,6 +71,8 @@ def __init__(self, host, port, token, index, queue_size (int): The max number of logs to queue, set to 0 for no max record_format (bool): Whether the log record will be json retry_backoff (float): The requests lib backoff factor + keep_payload_on_fail (bool): Clear payload only after successfully sending + payload_size (int): The max size of payload (bytes) retry_count (int): The number of times to retry a failed request source (str): The Splunk source param sourcetype (str): The Splunk sourcetype param @@ -101,6 +106,8 @@ def __init__(self, host, port, token, index, self.session = requests.Session() self.retry_count = retry_count self.retry_backoff = retry_backoff + self.keep_payload_on_fail = keep_payload_on_fail + self.payload_size = payload_size self.protocol = protocol self.proxies = proxies self.record_format = record_format @@ -269,6 +276,9 @@ def _splunk_worker(self, payload=None): self.write_debug_log("Payload sent successfully") except Exception as e: + # return unsuccessfully sent part of payload to log_payload + if self.keep_payload_on_fail: + self.log_payload += payload try: self.write_log("Exception in Splunk logging handler: %s" % str(e)) self.write_log(traceback.format_exc()) @@ -307,13 +317,15 @@ def empty_queue(self): self.log_payload += ''.join(self.queue) self.queue.clear() else: - # without looking at each item, estimate how many can fit in 50 MB - apprx_size_base = len(self.queue[0]) - # dont eval max/event size ration as less than 1 - # dont count more than what is in queue to ensure the same number as pulled are deleted - count = min(max(int(524288 / apprx_size_base), 1), len(self.queue)) - self.log_payload += ''.join(self.queue[:count]) - del self.queue[:count] + # Flush queue data to payload, only if payload is not too big + if len(self.log_payload) < self.payload_size: + # without looking at each item, estimate how many can fit in payload_size + apprx_size_base = len(self.queue[0]) + # dont eval max/event size ration as less than 1 + # dont count more than what is in queue to ensure the same number as pulled are deleted + count = min(max(int(self.payload_size / apprx_size_base), 1), len(self.queue)) + self.log_payload += ''.join(self.queue[:count]) + del self.queue[:count] self.write_debug_log("Queue task completed") return len(self.queue) > 0