order-processing/App_State.py

333 lines
11 KiB
Python

import logging
import logging.handlers
from rich.logging import RichHandler
import yaml
import time
import argparse
import datetime
import threading
try :
import paho.mqtt.client as mqtt
except :
mqtt = None
try :
import redis
except :
redis = None
from Attr_Dict import Attr_Dict
try :
from cryptography.fernet import Fernet
except :
print ("Decryption supported disabled")
logging.DEBUG5 = logging.DEBUG - 5
logging.addLevelName (logging.DEBUG5, "DEBUG5")
def debug5 (self, msg, *args, **kwargs):
"""
Log 'msg % args' with severity 'DEBUG4'.
To pass exception information, use the keyword argument exc_info with
a true value, e.g.
logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)
"""
if self.isEnabledFor(logging.DEBUG5):
self._log(logging.DEBUG5, msg, args, **kwargs)
# end def debug5
logging.Logger.debug5 = debug5
class App_State :
user = None
imos_done_dir = "IMOS-Done"
imos_order_dir = "ImOrder"
sim_batches = "Batches"
pgiq_dir = "OrderXML"
def _setup_topics (self, config = None) :
config = config or getattr (self, "config", {})
mqr = config.mqtt ["topic-root"]
baskets = f"{mqr}/baskets"
self.mqtt = Attr_Dict \
( basket = Attr_Dict
( state = f"{baskets}/{{basket_no}}/state"
, user = f"{baskets}/{{basket_no}}/user"
, changed = f"{baskets}/{{basket_no}}/changed"
, data = f"{baskets}/{{basket_no}}"
)
, baskets = baskets
)
# end def _setup_topics
@classmethod
def Add_Logging_Attributes (cls, parser = None) :
if parser is None :
parser = argparse.ArgumentParser ()
parser.add_argument ("--log-level", type = str, default = "INFO")
parser.add_argument ("--log-file", type = str)
parser.add_argument ("--log-file-level", type = str, default = "INFO")
return parser
# end def Add_Logging_Attributes
@classmethod
def Setup_Logging (cls, cmd, name = None) :
logging.basicConfig \
( level = "DEBUG5"
, format = "%(message)s"
, handlers = []
)
cls.L = logging.getLogger (name)
handler = RichHandler ()
handler.setLevel (cmd.log_level)
cls.L.addHandler (handler)
if cmd.log_file :
handler = logging.handlers.TimedRotatingFileHandler \
(cmd.log_file, when = "midnight", encoding = "utf-8")
formatter = logging.Formatter \
( "%(asctime)s - %(levelname)-7s - %(message)s"
, datefmt = "%Y-%m-%d %H:%M:%S"
)
handler.setFormatter (formatter)
if cmd.log_file_level :
handler.setLevel (cmd.log_file_level)
cls.L.addHandler (handler)
return cls.L
# end def Setup_Logging
@staticmethod
def snow (user = None) :
date = datetime.datetime.now ().strftime ("%Y-%m-%d %H-%M-%S")
if user :
return f"{date} -- {user}"
return date
# end def snow
@classmethod
def Load_Yaml (cls, file_name) :
with open (file_name, "r") as f :
return Attr_Dict (yaml.safe_load (f))
# end def Load_Yaml
def load_config (self, file_name) :
with open (file_name, "r") as f :
self.config = Attr_Dict (yaml.safe_load (f))
self.L.info ("Configuration loaded from %s", file_name)
return self.config
# end def load_config
def connect_to_db (self) :
if self.config.get ("database", "redis") == "mqtt" :
return MQTT \
(self.config, self._db_connected, self._db_changes)
return Redis (self.config, self._db_connected, self._db_changes)
# end def connect_to_db
def _db_connected (self, client) : pass
def _db_changes (self, key, value) : pass
def change_basket_user (self, basket_no, user, data) :
if user != data.get ("user") :
self.L.debug \
( "Change basket user for %s from %s to %s"
, basket_no, data.get ("user", "<>"), user
)
data ["user"] = user
bt = self.mqtt.basket
data ["changed"] = self.snow (self.User)
self.client.publish \
( bt.user.format (basket_no = basket_no)
, data ["user"]
)
self.client.publish \
( bt.changed.format (basket_no = basket_no)
, data ["changed"]
)
return True
return False
# end def change_basket_user
def change_basket_state (self, basket_no, state, data) :
old_state = int (data.get ("state", "0"))
if state != old_state :
self.L.debug \
( "Change basket status for %s from %s to %s"
, basket_no, old_state, state
)
data ["state"] = str (state)
bt = self.mqtt.basket
data ["changed"] = self.snow (self.User)
self.client.publish \
( bt.state.format (basket_no = basket_no)
, data ["state"]
)
self.client.publish \
( bt.changed.format (basket_no = basket_no)
, data ["changed"]
)
return True
return False
# end def change_basket_state
_key = b'Mgl1Wae_JILP0rbLazTL5gOpfrp4tqOFOiEsD_IUH8c='
@classmethod
def Decrypt (cls, data: bytes) -> bytes:
"""Decrypt the passed data"""
return Fernet (cls._key).decrypt (data)
# end def Decrypt
@classmethod
def Encrypt (cls, data: bytes) -> bytes:
"""Encrypt the passed data"""
return Fernet (cls._key).encrypt (data.encode ("ascii"))
# end def Encrypt
# end class App_State
class _Database_Interface_ (App_State) :
def __init__ (self, config, connected, changes) :
self.connected = connected
self.changes = changes
# end def __init__
def subscribe (self, key) : pass
def publish (self, key, value) : pass
def delete (self, key): pass
def get_all_values (self, key) : pass
# end class _Database_Interface_
class MQTT (_Database_Interface_) :
def __init__ (self, config, connected, changes) :
super ().__init__ (config, connected, changes)
self.client = client = mqtt.Client ()
client.on_connect = self._on_connect
client.on_message = self._on_message
mqc = config ["mqtt"]
pw = self.Decrypt (mqc.password)
client.username_pw_set (mqc.user, pw)
self.L.info ("Try to connect to MQTT %s:%s", mqc.broker, mqc.port)
client.connect (mqc.broker, port = mqc.port)
self._mqtt_thread = threading.Thread \
(target = self.client.loop_forever, daemon = True)
self._mqtt_thread.start ()
# end def __init__
def _on_connect (self, client, user_data, flags, rc) :
self.L.info ("Connected to MQTT Broker")
if self.connected :
self.connected (self)
# end def _on_connect
def _on_message (self, client, user_data, msg) :
self.L.debug5 ("MQTT Message: %s: %s", msg.topic, msg.payload)
if self.changes :
self.changes (msg.topic, msg.payload)
# end def _on_message
def subscribe (self, key, recursie) :
if recursie :
key = f"{key}/#"
self.client.subscribe (key)
# end def subscribe
def publish (self, key, value) :
self.client.publish (key, payload = value, retain = True)
self.L.debug5 ("Publish to topic %s: %s", key, value)
# end def publish
def delete (self, key) :
self.client.publish (key, payload = None, retain = False)
self.L.debug5 ("Delete topic %s", key)
# end def delete
# end class MQTT
class Redis (_Database_Interface_) :
def __init__ (self, config, connected, changes) :
super ().__init__ (config, connected, changes)
RC = config.redis
host = RC.host
port = RC.get ("port", 6379)
self.L.info ("Try to connect to redis %s:%s", host, port)
self.client = client = redis.StrictRedis \
( host = host
, port = port
, decode_responses = True
)
self.subscribtions = {}
if connected :
connected (self)
threading.Thread (target = self._listen, daemon = True).start ()
# end def __init__
def _listen (self) :
self.L.debug ("Start redis listener",)
pubsub = self.client.pubsub (ignore_subscribe_messages = True)
pubsub.execute_command (b"CLIENT", b"ID")
client_id = pubsub.connection.read_response ()
pubsub.execute_command \
(f"CLIENT TRACKING on REDIRECT {client_id} BCAST")
res = pubsub.connection.read_response ()
self.L.debug ("Redis tracking enabled: %s", res)
pubsub.subscribe ('__redis__:invalidate')
while True :
m = pubsub.get_message ()
if m and m ["channel"] == "__redis__:invalidate" :
if self.changes :
for key in m ["data"] :
for k, r in self.subscribtions.items () :
if ( ( r and key.startswith (k))
or (not r and (key == k))
) :
val = self.client.get (key)
self.changes (key.replace (":", "/"), val)
else :
time.sleep (0.01)
# end def _listen
def get_all_values (self, key) :
key = key.replace ("/", ":")
for k in self.client.keys (f"{key}:*") :
v = self.client.get (k)
yield k.replace (":", "/"), v
# end def get_all_values
def subscribe (self, key, recursive) :
self.subscribtions [key.replace ("/", ":")] = recursive
# end def subscribe
def publish (self, key, value) :
key = key.replace ("/", ":")
self.client.set (key, value)
self.L.debug5 ("Set %s: %s", key, value)
# end def publish
def delete (self, key) :
key = key.replace ("/", ":")
self.client.delete (key)
self.L.debug5 ("Delete key %s", key)
# end def delete
# end class Redis
if __name__ == "__main__" :
import argparse
parser = argparse.ArgumentParser ()
parser.add_argument ("-e", "--encrypt", type = str)
parser.add_argument ("-d", "--decrypt", type = str)
cmd = parser.parse_args ()
if cmd.encrypt :
print (App_State.Encrypt (cmd.encrypt))
if cmd.decrypt :
print (App_State.Decrypt (cmd.decrypt))