From e6100b86d90f00edf829622fad63f2c1963f98da Mon Sep 17 00:00:00 2001 From: paulaan Date: Sun, 28 Jul 2019 11:49:10 +0700 Subject: [PATCH 1/5] [master] Add FCM - Google Firebase --- pushkin/sender/nordifier/apns2_push_sender.py | 21 +- pushkin/sender/nordifier/apns_push_sender.py | 11 +- pushkin/sender/nordifier/fcm.py | 291 ++++++++++++++++++ pushkin/sender/nordifier/fcm_push_sender.py | 78 +++++ pushkin/sender/nordifier/sender.py | 6 +- 5 files changed, 388 insertions(+), 19 deletions(-) create mode 100644 pushkin/sender/nordifier/fcm.py create mode 100644 pushkin/sender/nordifier/fcm_push_sender.py diff --git a/pushkin/sender/nordifier/apns2_push_sender.py b/pushkin/sender/nordifier/apns2_push_sender.py index cd90586..a4d193f 100644 --- a/pushkin/sender/nordifier/apns2_push_sender.py +++ b/pushkin/sender/nordifier/apns2_push_sender.py @@ -39,7 +39,7 @@ def pop_unregistered_devices(self): self.unregistered_devices = [] return items - def prepare_data(self, notification): + def create_message(self, notification): def to_timestamp(dt, epoch=datetime(1970, 1, 1)): # http://stackoverflow.com/questions/8777753/converting-datetime-date-to-utc-timestamp-in-python td = dt - epoch @@ -66,28 +66,30 @@ def to_timestamp(dt, epoch=datetime(1970, 1, 1)): 'identifier': notification['sending_id'], 'expiry': expiry_utc_ts_seconds, 'priority': 10, + 'payload': Payload(alert=notification['content'], badge=1, sound='default', custom=custom) } - result['payload'] = Payload(alert=notification['content'], badge=1, sound='default', custom=custom) return result - def send(self, notification): - data = self.prepare_data(notification) + data = self.create_message(notification) if data is not None: for i in xrange(self.connection_error_retries): try: - self.apn.send_notification(data['token'], data['payload'], expiration=data['expiry'], topic=self.topic) + self.apn.send_notification(data['token'], + data['payload'], + expiration=data['expiry'], + topic=self.topic) notification['status'] = const.NOTIFICATION_SUCCESS - break #We did it, time to break free! + break # We did it, time to break free! except APNsException as e: if isinstance(e, Unregistered): notification['status'] = const.NOTIFICATION_APNS_DEVICE_UNREGISTERED unregistered_data = { - 'login_id': notification['login_id'], - 'device_token': notification['receiver_id'], - } + 'login_id': notification['login_id'], + 'device_token': notification['receiver_id'], + } self.unregistered_devices.append(unregistered_data) else: self.log.warning('APN got exception {}'.format(type(e).__name__)) @@ -95,4 +97,3 @@ def send(self, notification): def send_batch(self): while len(self.queue): self.send(self.queue.pop()) - diff --git a/pushkin/sender/nordifier/apns_push_sender.py b/pushkin/sender/nordifier/apns_push_sender.py index cef4948..570271d 100644 --- a/pushkin/sender/nordifier/apns_push_sender.py +++ b/pushkin/sender/nordifier/apns_push_sender.py @@ -55,11 +55,9 @@ def process_failed_notification(self, notification_ids_list): notif = self.sent_queue[id] notif['status'] = const.NOTIFICATION_CONNECTION_ERROR - def prepare_data(self, notification): + def create_message(self, notification): def totimestamp(dt, epoch=datetime(1970, 1, 1)): - # http://stackoverflow.com/questions/8777753/converting-datetime-date-to-utc-timestamp-in-python td = dt - epoch - # return td.total_seconds() # Python 2.7 return (td.microseconds + (td.seconds + td.days * 86400) * 10 ** 6) / 10 ** 6 expiry_seconds = (notification['time_to_live_ts_bigint'] - int(round(time.time() * 1000))) / 1000 @@ -82,8 +80,8 @@ def totimestamp(dt, epoch=datetime(1970, 1, 1)): 'identifier': notification['sending_id'], 'expiry': expiry_utc_ts_seconds, 'priority': 10, + 'payload': Payload(alert=notification['content'], badge=1, sound='default', custom=custom) } - result['payload'] = Payload(alert=notification['content'], badge=1, sound='default', custom=custom) return result @@ -91,7 +89,7 @@ def send(self, notification): notification['status'] = const.NOTIFICATION_SUCCESS self.sent_queue[notification['sending_id']] = notification - data = self.prepare_data(notification) + data = self.create_message(notification) if data: self.apns.gateway_server.send_notification( token_hex=data['token'], @@ -110,7 +108,7 @@ def send_batch(self): notification['status'] = const.NOTIFICATION_SUCCESS self.sent_queue[notification['sending_id']] = notification - data = self.prepare_data(notification) + data = self.create_message(notification) if data: frame.add_item( token_hex=data['token'], @@ -122,4 +120,3 @@ def send_batch(self): # batch (frame) prepared, send it self.apns.gateway_server.send_notification_multiple(frame) - diff --git a/pushkin/sender/nordifier/fcm.py b/pushkin/sender/nordifier/fcm.py new file mode 100644 index 0000000..e33520d --- /dev/null +++ b/pushkin/sender/nordifier/fcm.py @@ -0,0 +1,291 @@ +''' +The MIT License (MIT) + +Copyright (c) 2012 Minh Nam Ngo. + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +''' +import urllib +import urllib2 +import json +from collections import defaultdict +import time +import random +import firebase_admin +from firebase_admin import credentials + +FCM_URL = 'https://fcm.googleapis.com/fcm/send' + + +class FCMException(Exception): pass + + +class FCMMalformedJsonException(FCMException): pass + + +class FCMConnectionException(FCMException): pass + + +class FCMAuthenticationException(FCMException): pass + + +class FCMTooManyRegIdsException(FCMException): pass + + +class FCMInvalidTtlException(FCMException): pass + + +# Exceptions from Google responses +class FCMMissingRegistrationException(FCMException): pass + + +class FCMMismatchSenderIdException(FCMException): pass + + +class FCMNotRegisteredException(FCMException): pass + + +class FCMMessageTooBigException(FCMException): pass + + +class FCMInvalidRegistrationException(FCMException): pass + + +class FCMUnavailableException(FCMException): pass + + +# TODO: Refactor this to be more human-readable +def group_response(response, registration_ids, key): + # Pair up results and reg_ids + mapping = zip(registration_ids, response['results']) + # Filter by key + filtered = filter(lambda x: key in x[1], mapping) + # Only consider the value in the dict + tupled = [(s[0], s[1][key]) for s in filtered] + # Grouping of errors and mapping of ids + if key is 'registration_id': + grouping = {} + for k, v in tupled: + grouping[k] = v + else: + grouping = defaultdict(list) + for k, v in tupled: + grouping[v].append(k) + + if len(grouping) == 0: + return + return grouping + + +def urlencode_utf8(params): + """ + UTF-8 safe variant of urllib.urlencode. + http://stackoverflow.com/a/8152242 + """ + + if hasattr(params, 'items'): + params = params.items() + + params = ( + '='.join(( + urllib.quote_plus(k.encode('utf8'), safe='/'), + urllib.quote_plus(v.encode('utf8'), safe='/') + )) for k, v in params + ) + + return '&'.join(params) + + +def generate_token(service_account_file): + cred = credentials.Certificate(service_account_file) + default_app = firebase_admin.initialize_app(cred) + return default_app.credential.get_access_token() + + +class FCM(object): + # Timeunit is milliseconds. + BACKOFF_INITIAL_DELAY = 1000; + MAX_BACKOFF_DELAY = 1024000; + + def __init__(self, token, url=FCM_URL, proxy=None): + """ api_key : google api key + url: url of FCM service. + proxy: can be string "http://host:port" or dict {'https':'host:port'} + """ + self.access_token = token + self.url = url + if proxy: + if isinstance(proxy, basestring): + protocol = url.split(':')[0] + proxy = {protocol: proxy} + + auth = urllib2.HTTPBasicAuthHandler() + opener = urllib2.build_opener(urllib2.ProxyHandler(proxy), auth, urllib2.HTTPHandler) + urllib2.install_opener(opener) + + def construct_payload(self, registration_ids, data=None, collapse_key=None, + delay_while_idle=False, time_to_live=None, is_json=True, dry_run=False): + """ + Construct the dictionary mapping of parameters. + Encodes the dictionary into JSON if for json requests. + Helps appending 'data.' prefix to the plaintext data: 'hello' => 'data.hello' + + :return constructed dict or JSON payload + :raises FCMInvalidTtlException: if time_to_live is invalid + """ + + if time_to_live: + if time_to_live > 2419200 or time_to_live < 0: + raise FCMInvalidTtlException("Invalid time to live value") + + if is_json: + payload = {'registration_ids': registration_ids} + if data: + payload['data'] = data + else: + payload = {'registration_id': registration_ids} + if data: + plaintext_data = data.copy() + for k in plaintext_data.keys(): + plaintext_data['data.%s' % k] = plaintext_data.pop(k) + payload.update(plaintext_data) + + if delay_while_idle: + payload['delay_while_idle'] = delay_while_idle + + if time_to_live >= 0: + payload['time_to_live'] = time_to_live + + if collapse_key: + payload['collapse_key'] = collapse_key + + if dry_run: + payload['dry_run'] = True + + if is_json: + payload = json.dumps(payload) + + return payload + + def make_request(self, data, is_json=True): + """ + Makes a HTTP request to FCM servers with the constructed payload + + :param data: return value from construct_payload method + :raises FCMMalformedJsonException: if malformed JSON request found + :raises FCMAuthenticationException: if there was a problem with authentication, invalid api key + :raises FCMConnectionException: if FCM is screwed + """ + + headers = { + 'Authorization': 'key=%s' % self.access_token, + } + # Default Content-Type is defaulted to application/x-www-form-urlencoded;charset=UTF-8 + if is_json: + headers['Content-Type'] = 'application/json' + + if not is_json: + data = urlencode_utf8(data) + req = urllib2.Request(self.url, data, headers) + + try: + response = urllib2.urlopen(req).read() + except urllib2.HTTPError as e: + if e.code == 400: + raise FCMMalformedJsonException("The request could not be parsed as JSON") + elif e.code == 401: + raise FCMAuthenticationException("There was an error authenticating the nordifier account") + elif e.code == 503: + raise FCMUnavailableException("FCM service is unavailable") + else: + error = "FCM service error: %d" % e.code + raise FCMUnavailableException(error) + except urllib2.URLError as e: + raise FCMConnectionException( + "There was an internal error in the FCM server while trying to process the request") + + if is_json: + response = json.loads(response) + return response + + def raise_error(self, error): + if error == 'InvalidRegistration': + raise FCMInvalidRegistrationException("Registration ID is invalid") + elif error == 'Unavailable': + # Plain-text requests will never return Unavailable as the error code. + # http://developer.android.com/guide/google/FCM/FCM.html#error_codes + raise FCMUnavailableException("Server unavailable. Resent the message") + elif error == 'NotRegistered': + raise FCMNotRegisteredException("Registration id is not valid anymore") + elif error == 'MismatchSenderId': + raise FCMMismatchSenderIdException("A Registration ID is tied to a certain group of senders") + elif error == 'MessageTooBig': + raise FCMMessageTooBigException("Message can't exceed 4096 bytes") + + def handle_plaintext_response(self, response): + + # Split response by line + response_lines = response.strip().split('\n') + # Split the first line by = + key, value = response_lines[0].split('=') + if key == 'Error': + self.raise_error(value) + else: + if len(response_lines) == 2: + return response_lines[1].split('=')[1] + return + + def handle_json_response(self, response, registration_ids): + errors = group_response(response, registration_ids, 'error') + canonical = group_response(response, registration_ids, 'registration_id') + + info = {} + if errors: + info.update({'errors': errors}) + if canonical: + info.update({'canonical': canonical}) + + return info + + def extract_unsent_reg_ids(self, info): + if 'errors' in info and 'Unavailable' in info['errors']: + return info['errors']['Unavailable'] + return [] + + def plaintext_request(self, registration_id, data=None, collapse_key=None, + delay_while_idle=False, time_to_live=None, retries=5, dry_run=False): + """ + Makes a plaintext request to FCM servers + + :param registration_id: string of the registration id + :param data: dict mapping of key-value pairs of messages + :return dict of response body from Google including multicast_id, success, failure, canonical_ids, etc + :raises FCMMissingRegistrationException: if registration_id is not provided + """ + + if not registration_id: + raise FCMMissingRegistrationException("Missing registration_id") + + payload = self.construct_payload( + registration_id, data, collapse_key, + delay_while_idle, time_to_live, False, dry_run + ) + + attempt = 0 + backoff = self.BACKOFF_INITIAL_DELAY + for attempt in range(retries): + try: + response = self.make_request(payload, is_json=False) + return self.handle_plaintext_response(response) + except FCMUnavailableException: + sleep_time = backoff / 2 + random.randrange(backoff) + time.sleep(float(sleep_time) / 1000) + if 2 * backoff < self.MAX_BACKOFF_DELAY: + backoff *= 2 + + raise IOError("Could not make request after %d attempts" % attempt) + diff --git a/pushkin/sender/nordifier/fcm_push_sender.py b/pushkin/sender/nordifier/fcm_push_sender.py new file mode 100644 index 0000000..ba516c6 --- /dev/null +++ b/pushkin/sender/nordifier/fcm_push_sender.py @@ -0,0 +1,78 @@ +''' +The MIT License (MIT) +Copyright (c) 2016 Nordeus LLC + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +''' +import time +from sender import Sender +import constants as const +from fcm import FCM, generate_token +from firebase_admin import messaging +import os + + +class FCMPushSender(Sender): + """ + FCM Push Sender uses python-FCM module: https://github.com/geeknam/python-FCM + FCM documentation: https://developer.android.com/google/FCM/FCM.html + """ + + def __init__(self, config, log): + Sender.__init__(self, config, log) + self.access_key = generate_token(os.getenv("GOOGLE_APPLICATION_CREDENTIALS")) + self.base_deeplink_url = config.get('Messenger', 'base_deeplink_url') + self.FCM = FCM(self.access_key) + self.canonical_ids = [] + self.unregistered_devices = [] + + def pop_canonical_ids(self): + items = self.canonical_ids + self.canonical_ids = [] + return items + + def pop_unregistered_devices(self): + items = self.unregistered_devices + self.unregistered_devices = [] + return items + + def create_message(self, notification): + expiry_seconds = (notification['time_to_live_ts_bigint'] - int(round(time.time() * 1000))) / 1000 + if expiry_seconds < 0: + self.log.warning( + 'FCM: expired notification with sending_id: {0}; expiry_seconds: {1}'.format(notification['sending_id'], + expiry_seconds)) + notification['status'] = const.NOTIFICATION_EXPIRED + return + + utm_source = 'pushnotification' + utm_campaign = str(notification['campaign_id']) + utm_medium = str(notification['message_id']) + data = { + 'title': notification['title'], + 'message': notification['content'], + 'url': self.base_deeplink_url + '://' + notification['screen'] + + '?utm_source=' + utm_source + '&utm_campaign=' + utm_campaign + '&utm_medium=' + utm_medium, + 'notifid': notification['campaign_id'], + } + msg = messaging.Message( + data=data, + token=notification['receiver_id'], + ) + return msg + + def send(self, notification): + message = self.create_message(notification) + response = messaging.send(message) + + return response + + def send_batch(self): + messages = [] + while len(self.queue): + messages.append(self.create_message(self.queue.pop())) + messaging.send_all(messages) diff --git a/pushkin/sender/nordifier/sender.py b/pushkin/sender/nordifier/sender.py index a2f1ff8..bb46c61 100644 --- a/pushkin/sender/nordifier/sender.py +++ b/pushkin/sender/nordifier/sender.py @@ -8,7 +8,6 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' -import time class Sender: @@ -33,4 +32,7 @@ def send(self, notification): raise Exception('Not implemented') def send_batch(self): - raise Exception('Not implemented') \ No newline at end of file + raise Exception('Not implemented') + + def create_message(self, notification): + raise Exception('Not implemented') From f677572b7e353a971f3143e4834b4fb08ee70a61 Mon Sep 17 00:00:00 2001 From: paulaan Date: Sun, 28 Jul 2019 12:15:40 +0700 Subject: [PATCH 2/5] [master] Update config for FCMPushSender --- config.ini | 7 +-- pushkin/sender/nordifier/fcm_push_sender.py | 3 +- pushkin/sender/senders.py | 61 ++++++++++++++++----- 3 files changed, 52 insertions(+), 19 deletions(-) diff --git a/config.ini b/config.ini index cd075ef..dcd9ed2 100644 --- a/config.ini +++ b/config.ini @@ -17,8 +17,7 @@ apns_batch_size = 500 apns_topic = apns_certificate_path = -gcm_access_key = - +google_application_credentials = # for future use apns_sandbox = false connection_error_retries = 3 @@ -38,8 +37,8 @@ request_processor_num_threads = 10 sender_queue_limit = 50000 enabled_senders = - pushkin.sender.senders.ApnNotificationSender {"workers": 50} - pushkin.sender.senders.GcmNotificationSender {"workers": 50} + pushkin.sender.senders.ApnNotificationSender {"workers": 50} + pushkin.sender.senders.FcmNotificationSender {"workers": 50} [Log] # log configuration diff --git a/pushkin/sender/nordifier/fcm_push_sender.py b/pushkin/sender/nordifier/fcm_push_sender.py index ba516c6..73c7dab 100644 --- a/pushkin/sender/nordifier/fcm_push_sender.py +++ b/pushkin/sender/nordifier/fcm_push_sender.py @@ -13,7 +13,6 @@ import constants as const from fcm import FCM, generate_token from firebase_admin import messaging -import os class FCMPushSender(Sender): @@ -24,7 +23,7 @@ class FCMPushSender(Sender): def __init__(self, config, log): Sender.__init__(self, config, log) - self.access_key = generate_token(os.getenv("GOOGLE_APPLICATION_CREDENTIALS")) + self.access_key = generate_token(config.get('Messenger', 'google_application_credentials')) self.base_deeplink_url = config.get('Messenger', 'base_deeplink_url') self.FCM = FCM(self.access_key) self.canonical_ids = [] diff --git a/pushkin/sender/senders.py b/pushkin/sender/senders.py index 6c4afda..b97228f 100644 --- a/pushkin/sender/senders.py +++ b/pushkin/sender/senders.py @@ -8,16 +8,14 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' -from Queue import Empty from threading import Thread -import time import logging import multiprocessing from pushkin.database import database -from pushkin.sender.nordifier.apns_push_sender import APNsPushSender from pushkin.sender.nordifier.gcm_push_sender import GCMPushSender from pushkin.sender.nordifier.apns2_push_sender import APNS2PushSender +from pushkin.sender.nordifier.fcm_push_sender import FCMPushSender from pushkin.util.pool import ProcessPool from pushkin import config, context from pushkin.sender.nordifier import constants @@ -25,7 +23,6 @@ class NotificationSender(ProcessPool): - NUM_WORKERS_DEFAULT = 50 def __init__(self, **kwargs): @@ -51,12 +48,15 @@ def log_notifications(self, notifications): except: main_logger.exception("Error while logging notification to csv log!") + class NotificationOperation(): - '''Represents operation which should be executed outside of process pool''' + """Represents operation which should be executed outside of process pool""" + def __init__(self, operation, data): self.operation = operation self.data = data + class NotificationPostProcessor(Thread): UPDATE_CANONICALS = 1 UPDATE_UNREGISTERED_DEVICES = 2 @@ -84,11 +84,13 @@ def run(self): elif record.operation == NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES: self.update_unregistered_devices(record.data) else: - context.main_logger.error("NotificationPostProcessor - unknown operation: {operation}".format(operation=record.operation)) + context.main_logger.error( + "NotificationPostProcessor - unknown operation: {operation}".format(operation=record.operation)) except: context.main_logger.exception("Exception while post processing notification.") pass + class NotificationStatistics: def __init__(self, name, logger, last_averages=100, log_time_seconds=30): self.name = name @@ -108,13 +110,12 @@ def stop(self): self.running_average += elapsed / self.last_averages elapsed_since_log = timeit.default_timer() - self.last_time_logged if elapsed_since_log > self.log_time_seconds: - self.logger.info('Average time for sending push for {name} is {avg}'.format(name=self.name, avg=self.running_average)) + self.logger.info( + 'Average time for sending push for {name} is {avg}'.format(name=self.name, avg=self.running_average)) self.last_time_logged = timeit.default_timer() - class ApnNotificationSender(NotificationSender): - PLATFORMS = (constants.PLATFORM_IPHONE, constants.PLATFORM_IPAD) @@ -129,7 +130,9 @@ def process(self): statistics.stop() unregistered_devices = sender.pop_unregistered_devices() if len(unregistered_devices) > 0: - NotificationPostProcessor.OPERATION_QUEUE.put(NotificationOperation(NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES, unregistered_devices)) + NotificationPostProcessor.OPERATION_QUEUE.put( + NotificationOperation(NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES, + unregistered_devices)) except Exception: context.main_logger.exception("ApnNotificationProcessor failed to send notifications") finally: @@ -137,7 +140,6 @@ def process(self): class GcmNotificationSender(NotificationSender): - PLATFORMS = (constants.PLATFORM_ANDROID, constants.PLATFORM_ANDROID_TABLET) @@ -152,10 +154,43 @@ def process(self): statistics.stop() canonical_ids = sender.pop_canonical_ids() if len(canonical_ids) > 0: - NotificationPostProcessor.OPERATION_QUEUE.put(NotificationOperation(NotificationPostProcessor.UPDATE_CANONICALS, canonical_ids)) + NotificationPostProcessor.OPERATION_QUEUE.put( + NotificationOperation(NotificationPostProcessor.UPDATE_CANONICALS, canonical_ids)) + unregistered_devices = sender.pop_unregistered_devices() + if len(unregistered_devices) > 0: + NotificationPostProcessor.OPERATION_QUEUE.put( + NotificationOperation(NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES, + unregistered_devices)) + except Exception: + context.main_logger.exception("GcmNotificationProcessor failed to send notifications") + finally: + self.log_notifications([notification]) + + +class FcmNotificationSender(NotificationSender): + PLATFORMS = (constants.PLATFORM_ANDROID, + constants.PLATFORM_ANDROID_TABLET, + constants.PLATFORM_IPAD, + constants.PLATFORM_IPHONE) + + def process(self): + sender = FCMPushSender(config.config, context.main_logger) + statistics = NotificationStatistics('FCM', context.main_logger) + while True: + notification = self.task_queue.get() + try: + statistics.start() + sender.send_in_batch(notification) + statistics.stop() + canonical_ids = sender.pop_canonical_ids() + if len(canonical_ids) > 0: + NotificationPostProcessor.OPERATION_QUEUE.put( + NotificationOperation(NotificationPostProcessor.UPDATE_CANONICALS, canonical_ids)) unregistered_devices = sender.pop_unregistered_devices() if len(unregistered_devices) > 0: - NotificationPostProcessor.OPERATION_QUEUE.put(NotificationOperation(NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES, unregistered_devices)) + NotificationPostProcessor.OPERATION_QUEUE.put( + NotificationOperation(NotificationPostProcessor.UPDATE_UNREGISTERED_DEVICES, + unregistered_devices)) except Exception: context.main_logger.exception("GcmNotificationProcessor failed to send notifications") finally: From f950bcfbc9f9260b55c2b01437d319dd7de0437b Mon Sep 17 00:00:00 2001 From: paulaan Date: Sun, 28 Jul 2019 12:49:28 +0700 Subject: [PATCH 3/5] [master] Dry run and FCM app --- pushkin/sender/nordifier/fcm.py | 251 ++------------------ pushkin/sender/nordifier/fcm_push_sender.py | 9 +- 2 files changed, 24 insertions(+), 236 deletions(-) diff --git a/pushkin/sender/nordifier/fcm.py b/pushkin/sender/nordifier/fcm.py index e33520d..51bcf42 100644 --- a/pushkin/sender/nordifier/fcm.py +++ b/pushkin/sender/nordifier/fcm.py @@ -9,14 +9,8 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. ''' -import urllib -import urllib2 -import json -from collections import defaultdict -import time -import random import firebase_admin -from firebase_admin import credentials +from firebase_admin import credentials, messaging FCM_URL = 'https://fcm.googleapis.com/fcm/send' @@ -24,9 +18,6 @@ class FCMException(Exception): pass -class FCMMalformedJsonException(FCMException): pass - - class FCMConnectionException(FCMException): pass @@ -52,240 +43,36 @@ class FCMNotRegisteredException(FCMException): pass class FCMMessageTooBigException(FCMException): pass -class FCMInvalidRegistrationException(FCMException): pass +class FCMInvalidMessageException(FCMException): pass class FCMUnavailableException(FCMException): pass -# TODO: Refactor this to be more human-readable -def group_response(response, registration_ids, key): - # Pair up results and reg_ids - mapping = zip(registration_ids, response['results']) - # Filter by key - filtered = filter(lambda x: key in x[1], mapping) - # Only consider the value in the dict - tupled = [(s[0], s[1][key]) for s in filtered] - # Grouping of errors and mapping of ids - if key is 'registration_id': - grouping = {} - for k, v in tupled: - grouping[k] = v - else: - grouping = defaultdict(list) - for k, v in tupled: - grouping[v].append(k) - - if len(grouping) == 0: - return - return grouping - - -def urlencode_utf8(params): - """ - UTF-8 safe variant of urllib.urlencode. - http://stackoverflow.com/a/8152242 - """ - - if hasattr(params, 'items'): - params = params.items() - - params = ( - '='.join(( - urllib.quote_plus(k.encode('utf8'), safe='/'), - urllib.quote_plus(v.encode('utf8'), safe='/') - )) for k, v in params - ) - - return '&'.join(params) - - -def generate_token(service_account_file): +def generate_fcm_app(service_account_file): cred = credentials.Certificate(service_account_file) default_app = firebase_admin.initialize_app(cred) - return default_app.credential.get_access_token() + return default_app class FCM(object): - # Timeunit is milliseconds. - BACKOFF_INITIAL_DELAY = 1000; - MAX_BACKOFF_DELAY = 1024000; - - def __init__(self, token, url=FCM_URL, proxy=None): - """ api_key : google api key - url: url of FCM service. - proxy: can be string "http://host:port" or dict {'https':'host:port'} - """ - self.access_token = token - self.url = url - if proxy: - if isinstance(proxy, basestring): - protocol = url.split(':')[0] - proxy = {protocol: proxy} - - auth = urllib2.HTTPBasicAuthHandler() - opener = urllib2.build_opener(urllib2.ProxyHandler(proxy), auth, urllib2.HTTPHandler) - urllib2.install_opener(opener) - - def construct_payload(self, registration_ids, data=None, collapse_key=None, - delay_while_idle=False, time_to_live=None, is_json=True, dry_run=False): - """ - Construct the dictionary mapping of parameters. - Encodes the dictionary into JSON if for json requests. - Helps appending 'data.' prefix to the plaintext data: 'hello' => 'data.hello' - - :return constructed dict or JSON payload - :raises FCMInvalidTtlException: if time_to_live is invalid - """ - - if time_to_live: - if time_to_live > 2419200 or time_to_live < 0: - raise FCMInvalidTtlException("Invalid time to live value") - - if is_json: - payload = {'registration_ids': registration_ids} - if data: - payload['data'] = data - else: - payload = {'registration_id': registration_ids} - if data: - plaintext_data = data.copy() - for k in plaintext_data.keys(): - plaintext_data['data.%s' % k] = plaintext_data.pop(k) - payload.update(plaintext_data) - - if delay_while_idle: - payload['delay_while_idle'] = delay_while_idle - - if time_to_live >= 0: - payload['time_to_live'] = time_to_live - - if collapse_key: - payload['collapse_key'] = collapse_key - - if dry_run: - payload['dry_run'] = True - - if is_json: - payload = json.dumps(payload) - - return payload - - def make_request(self, data, is_json=True): - """ - Makes a HTTP request to FCM servers with the constructed payload - - :param data: return value from construct_payload method - :raises FCMMalformedJsonException: if malformed JSON request found - :raises FCMAuthenticationException: if there was a problem with authentication, invalid api key - :raises FCMConnectionException: if FCM is screwed + def __init__(self, app): + """ app : FCM app """ + self.app = app - headers = { - 'Authorization': 'key=%s' % self.access_token, - } - # Default Content-Type is defaulted to application/x-www-form-urlencoded;charset=UTF-8 - if is_json: - headers['Content-Type'] = 'application/json' - - if not is_json: - data = urlencode_utf8(data) - req = urllib2.Request(self.url, data, headers) - + def send(self, data, dry_run=False): try: - response = urllib2.urlopen(req).read() - except urllib2.HTTPError as e: - if e.code == 400: - raise FCMMalformedJsonException("The request could not be parsed as JSON") - elif e.code == 401: - raise FCMAuthenticationException("There was an error authenticating the nordifier account") - elif e.code == 503: - raise FCMUnavailableException("FCM service is unavailable") - else: - error = "FCM service error: %d" % e.code - raise FCMUnavailableException(error) - except urllib2.URLError as e: - raise FCMConnectionException( - "There was an internal error in the FCM server while trying to process the request") - - if is_json: - response = json.loads(response) + response = messaging.send(data, dry_run=dry_run, app=self.app) + except messaging.ApiCallError as e: + raise FCMException(e) + except Exception as e: + raise FCMInvalidMessageException(e) return response - def raise_error(self, error): - if error == 'InvalidRegistration': - raise FCMInvalidRegistrationException("Registration ID is invalid") - elif error == 'Unavailable': - # Plain-text requests will never return Unavailable as the error code. - # http://developer.android.com/guide/google/FCM/FCM.html#error_codes - raise FCMUnavailableException("Server unavailable. Resent the message") - elif error == 'NotRegistered': - raise FCMNotRegisteredException("Registration id is not valid anymore") - elif error == 'MismatchSenderId': - raise FCMMismatchSenderIdException("A Registration ID is tied to a certain group of senders") - elif error == 'MessageTooBig': - raise FCMMessageTooBigException("Message can't exceed 4096 bytes") - - def handle_plaintext_response(self, response): - - # Split response by line - response_lines = response.strip().split('\n') - # Split the first line by = - key, value = response_lines[0].split('=') - if key == 'Error': - self.raise_error(value) - else: - if len(response_lines) == 2: - return response_lines[1].split('=')[1] - return - - def handle_json_response(self, response, registration_ids): - errors = group_response(response, registration_ids, 'error') - canonical = group_response(response, registration_ids, 'registration_id') - - info = {} - if errors: - info.update({'errors': errors}) - if canonical: - info.update({'canonical': canonical}) - - return info - - def extract_unsent_reg_ids(self, info): - if 'errors' in info and 'Unavailable' in info['errors']: - return info['errors']['Unavailable'] - return [] - - def plaintext_request(self, registration_id, data=None, collapse_key=None, - delay_while_idle=False, time_to_live=None, retries=5, dry_run=False): - """ - Makes a plaintext request to FCM servers - - :param registration_id: string of the registration id - :param data: dict mapping of key-value pairs of messages - :return dict of response body from Google including multicast_id, success, failure, canonical_ids, etc - :raises FCMMissingRegistrationException: if registration_id is not provided - """ - - if not registration_id: - raise FCMMissingRegistrationException("Missing registration_id") - - payload = self.construct_payload( - registration_id, data, collapse_key, - delay_while_idle, time_to_live, False, dry_run - ) - - attempt = 0 - backoff = self.BACKOFF_INITIAL_DELAY - for attempt in range(retries): - try: - response = self.make_request(payload, is_json=False) - return self.handle_plaintext_response(response) - except FCMUnavailableException: - sleep_time = backoff / 2 + random.randrange(backoff) - time.sleep(float(sleep_time) / 1000) - if 2 * backoff < self.MAX_BACKOFF_DELAY: - backoff *= 2 - - raise IOError("Could not make request after %d attempts" % attempt) - + def send_batch(self, data, dry_run=False): + response = messaging.send_all(data, dry_run=dry_run, app=self.app) + if response.failure_count > 0: + raise FCMTooManyRegIdsException("Many reg id is failed %s", + ",".join(response.responses["failed_registration_ids"])) + return response diff --git a/pushkin/sender/nordifier/fcm_push_sender.py b/pushkin/sender/nordifier/fcm_push_sender.py index 73c7dab..9ca11a4 100644 --- a/pushkin/sender/nordifier/fcm_push_sender.py +++ b/pushkin/sender/nordifier/fcm_push_sender.py @@ -11,7 +11,7 @@ import time from sender import Sender import constants as const -from fcm import FCM, generate_token +from fcm import FCM, generate_fcm_app from firebase_admin import messaging @@ -23,9 +23,9 @@ class FCMPushSender(Sender): def __init__(self, config, log): Sender.__init__(self, config, log) - self.access_key = generate_token(config.get('Messenger', 'google_application_credentials')) self.base_deeplink_url = config.get('Messenger', 'base_deeplink_url') - self.FCM = FCM(self.access_key) + app = generate_fcm_app(config.get('Messenger', 'google_application_credentials')) + self.FCM = FCM(app) self.canonical_ids = [] self.unregistered_devices = [] @@ -65,8 +65,9 @@ def create_message(self, notification): return msg def send(self, notification): + dry_run = 'dry_run' in notification and notification['dry_run'] == True message = self.create_message(notification) - response = messaging.send(message) + response = messaging.send(message, dry_run) return response From 6740b82d12b40a9832b1537e43f580afc1658296 Mon Sep 17 00:00:00 2001 From: paulaan Date: Sun, 28 Jul 2019 12:50:55 +0700 Subject: [PATCH 4/5] [master] Refine codebase --- pushkin/sender/nordifier/fcm.py | 34 ++++----------------------------- 1 file changed, 4 insertions(+), 30 deletions(-) diff --git a/pushkin/sender/nordifier/fcm.py b/pushkin/sender/nordifier/fcm.py index 51bcf42..a87253a 100644 --- a/pushkin/sender/nordifier/fcm.py +++ b/pushkin/sender/nordifier/fcm.py @@ -15,38 +15,12 @@ FCM_URL = 'https://fcm.googleapis.com/fcm/send' -class FCMException(Exception): pass +class FCMException(Exception): + pass -class FCMConnectionException(FCMException): pass - - -class FCMAuthenticationException(FCMException): pass - - -class FCMTooManyRegIdsException(FCMException): pass - - -class FCMInvalidTtlException(FCMException): pass - - -# Exceptions from Google responses -class FCMMissingRegistrationException(FCMException): pass - - -class FCMMismatchSenderIdException(FCMException): pass - - -class FCMNotRegisteredException(FCMException): pass - - -class FCMMessageTooBigException(FCMException): pass - - -class FCMInvalidMessageException(FCMException): pass - - -class FCMUnavailableException(FCMException): pass +class FCMInvalidMessageException(FCMException): + pass def generate_fcm_app(service_account_file): From 5ebfdfd3e305d1627ef96a336947e1fbb893c257 Mon Sep 17 00:00:00 2001 From: paulaan Date: Sun, 28 Jul 2019 13:33:16 +0700 Subject: [PATCH 5/5] [master] refine codebase --- pushkin/sender/nordifier/fcm.py | 4 ++++ pushkin/sender/nordifier/fcm_push_sender.py | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pushkin/sender/nordifier/fcm.py b/pushkin/sender/nordifier/fcm.py index a87253a..f29c8e9 100644 --- a/pushkin/sender/nordifier/fcm.py +++ b/pushkin/sender/nordifier/fcm.py @@ -23,6 +23,10 @@ class FCMInvalidMessageException(FCMException): pass +class FCMTooManyRegIdsException(FCMException): + pass + + def generate_fcm_app(service_account_file): cred = credentials.Certificate(service_account_file) default_app = firebase_admin.initialize_app(cred) diff --git a/pushkin/sender/nordifier/fcm_push_sender.py b/pushkin/sender/nordifier/fcm_push_sender.py index 9ca11a4..a15ff96 100644 --- a/pushkin/sender/nordifier/fcm_push_sender.py +++ b/pushkin/sender/nordifier/fcm_push_sender.py @@ -67,7 +67,7 @@ def create_message(self, notification): def send(self, notification): dry_run = 'dry_run' in notification and notification['dry_run'] == True message = self.create_message(notification) - response = messaging.send(message, dry_run) + response = self.FCM.send(message, dry_run) return response @@ -75,4 +75,4 @@ def send_batch(self): messages = [] while len(self.queue): messages.append(self.create_message(self.queue.pop())) - messaging.send_all(messages) + return self.FCM.send_batch(messages)