94 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
			
		
		
	
	
			94 lines
		
	
	
		
			3.4 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
import requests
 | 
						|
import json
 | 
						|
 | 
						|
FCM_URL = "https://fcm.googleapis.com/fcm/send"
 | 
						|
 | 
						|
class FCMAPI(object):
 | 
						|
 | 
						|
    def __init__(self, api_key):
 | 
						|
        self._API_KEY = api_key
 | 
						|
 | 
						|
    def _headers(self):
 | 
						|
        return {
 | 
						|
            "Content-Type": "application/json",
 | 
						|
            "Authorization": "Key=" + self._API_KEY
 | 
						|
        }
 | 
						|
 | 
						|
    def jsonDumps(self, data):
 | 
						|
        return json.dumps(data).encode('utf8')
 | 
						|
 | 
						|
    def _push(self, payload):
 | 
						|
        response = requests.post(FCM_URL, headers=self._headers(), data=payload)
 | 
						|
        print (response)
 | 
						|
        if response.status_code == 200:
 | 
						|
            if int(response.headers.get('content-length',0)) <= 0:
 | 
						|
                return {}
 | 
						|
            return response.json()
 | 
						|
        elif response.status_code == 401:
 | 
						|
            raise Exception("There was an error authenticating the sender account")
 | 
						|
        elif response.status_code == 400:
 | 
						|
            raise Exception(response.text)
 | 
						|
        else:
 | 
						|
            raise Exception("FCM server is temporarily unavailable")
 | 
						|
 | 
						|
    def send(self, payloads=None):
 | 
						|
        self.all_responses = []
 | 
						|
        for payload in payloads:
 | 
						|
            response = self._push(payload)
 | 
						|
            self.all_responses.append(response)
 | 
						|
        return self.all_responses
 | 
						|
 | 
						|
    def parse_payload(self, registration_ids=None, topic_name=None, message_body=None, message_title=None,
 | 
						|
                      message_icon=None, priority=False, data_message=None, badge=None, color=None, tag=None,
 | 
						|
                      **extra_kwargs):
 | 
						|
        fcm_payload = dict()
 | 
						|
        if registration_ids:
 | 
						|
            if len(registration_ids) > 1:
 | 
						|
                fcm_payload['registration_ids'] = registration_ids
 | 
						|
            else:
 | 
						|
                fcm_payload['to'] = registration_ids[0]
 | 
						|
        if topic_name:
 | 
						|
            fcm_payload['to'] = '/topics/%s' % topic_name
 | 
						|
        if priority:
 | 
						|
            fcm_payload['priority'] = 'normal'
 | 
						|
        else:
 | 
						|
            fcm_payload['priority'] = 'high'
 | 
						|
 | 
						|
        if data_message:
 | 
						|
            if isinstance(data_message, dict):
 | 
						|
                fcm_payload['data'] = data_message
 | 
						|
            else:
 | 
						|
                raise Exception("Provided data_message is in the wrong format")
 | 
						|
 | 
						|
        fcm_payload['notification'] = {}
 | 
						|
        if message_icon:
 | 
						|
            fcm_payload['notification']['icon'] = message_icon
 | 
						|
        if message_body:
 | 
						|
            fcm_payload['notification']['body'] = message_body
 | 
						|
        if message_title:
 | 
						|
            fcm_payload['notification']['title'] = message_title
 | 
						|
 | 
						|
        if badge:
 | 
						|
            fcm_payload['notification']['badge'] = badge
 | 
						|
        if color:
 | 
						|
            fcm_payload['notification']['color'] = color
 | 
						|
        if tag:
 | 
						|
            fcm_payload['notification']['tag'] = tag
 | 
						|
 | 
						|
        if extra_kwargs:
 | 
						|
            fcm_payload['notification'].update(extra_kwargs)
 | 
						|
 | 
						|
        return self.jsonDumps(fcm_payload)
 | 
						|
 | 
						|
if __name__== "__main__":
 | 
						|
    key = "AAAAkdP3f44:APA91bEyls0NapnpJi9IZvMe7jEpkYYx99bdCcFUYtIy4ZDEsEaKaZd1DRhodK6yIXiiWvRfwCE_17HU2uz04s--1qYSw635BgB_ilMxwlQEbBPOqoJkFTPpWnERbbJ2401Eyh27Szyv"
 | 
						|
    push_service = FCMAPI(api_key=key)
 | 
						|
 | 
						|
    registration_id = ["eSUQAef0S2Y:APA91bHm_Q2YJnzBJI77g7mTAW1teFd3i0t8gOM8HqJuv2RgO3_FFSI03ffShGCXus6jNNyILZ3Xng2jNKQdMCJ5FBPneJifkhQ9RJsC52reHfoXOxeEQvLFRbXZw1mKccUIGjf7P7_l"]
 | 
						|
    message_title = "Test"
 | 
						|
    message_body = "Hi Mohit, your customized news for today is ready"
 | 
						|
    payload = push_service.parse_payload(registration_ids=registration_id, message_title=message_title, message_body=message_body)
 | 
						|
    print (payload)
 | 
						|
    print (push_service.send([payload]))
 | 
						|
                
 |