MQTT rework started
parent
4e5ee2cf59
commit
c9d4796936
|
|
@ -0,0 +1,46 @@
|
|||
class Attr_Dict (dict) :
|
||||
|
||||
def _convert_dict (self, o) :
|
||||
return self.__class__ (o)
|
||||
# end def _convert_dict
|
||||
|
||||
def _convert_sequence (self, o) :
|
||||
result = []
|
||||
for v in o :
|
||||
if isinstance (v, (list, tuple)) :
|
||||
result.append (self._convert_sequence (v))
|
||||
elif isinstance (v, dict) :
|
||||
result.append (self._convert_dict (v))
|
||||
else :
|
||||
result.append (v)
|
||||
if isinstance (o, tuple) :
|
||||
return tuple (result)
|
||||
return result
|
||||
# end def _convert_sequence
|
||||
|
||||
def __getitem__ (self, key) :
|
||||
value = super ().__getitem__ (key)
|
||||
if isinstance (value, dict) :
|
||||
value = self [key] = self._convert_dict (value)
|
||||
if isinstance (value, (tuple, list)) :
|
||||
value = self [key] = self._convert_sequence (value)
|
||||
return value
|
||||
# end def __getitem__
|
||||
|
||||
def __getattr__ (self, name) :
|
||||
try :
|
||||
return self [name]
|
||||
except KeyError :
|
||||
raise AttributeError (name)
|
||||
# end def __getattr__
|
||||
|
||||
def __setattr__ (self, name, value) :
|
||||
self [name] = value
|
||||
# end def __setattr__
|
||||
|
||||
# end class Attr_Dict
|
||||
|
||||
if __name__ == "__main__" :
|
||||
a = Attr_Dict (a = {"b" : 1}, b = [{"a": 1}], c = ({"a" : 1}, ))
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,145 @@
|
|||
from pathlib import Path
|
||||
from lxml import etree
|
||||
from App_State import App_State, Attr_Dict
|
||||
from filesystem_watcher import \
|
||||
_Logger_, WindowsApiObserverExcpetionHandler, \
|
||||
WindowsApiEmitterExceptionHandling, Directory_Handler
|
||||
import time
|
||||
import json
|
||||
import datetime
|
||||
|
||||
class Order_Handler (Directory_Handler) :
|
||||
|
||||
def __init__ (self, path, client, config) :
|
||||
super ().__init__ (path)
|
||||
self.client = client
|
||||
mqr = config.mqtt ["topic-root"]
|
||||
baskets = f"{mqr}/baskets"
|
||||
self.mqtt = Attr_Dict \
|
||||
( basket_topic = f"{baskets}/{{basket_no}}"
|
||||
, basket_state = f"{baskets}/{{basket_no}}/state"
|
||||
, basket_user = f"{baskets}/{{basket_no}}/user"
|
||||
, basket_changed = f"{baskets}/{{basket_no}}/changed"
|
||||
, baskets = baskets
|
||||
)
|
||||
# end def __init__
|
||||
|
||||
def on_any_event (self, event) :
|
||||
if event.event_type == 'created' :
|
||||
file = Path (event.src_path)
|
||||
if file.suffix == ".txt" :
|
||||
xml = file.with_suffix (".xml")
|
||||
if xml.exists () :
|
||||
self.new_order (xml)
|
||||
if file.suffix == ".xml" :
|
||||
txt = file.with_suffix (".txt")
|
||||
if txt.exists () :
|
||||
self.new_order (file)
|
||||
elif event.event_type == 'deleted' :
|
||||
file = Path (event.src_path)
|
||||
if file.suffix == ".xml" :
|
||||
basket_no = file.stem
|
||||
for t in self.mqtt.values () :
|
||||
self.client.publish \
|
||||
( t.format (basket_no = basket_no)
|
||||
, payload = None, retain = False
|
||||
)
|
||||
self.L.info ("Order removed %s", basket_no)
|
||||
super ().on_any_event (event)
|
||||
# end def on_any_event
|
||||
|
||||
def new_order (self, xml_file : Path) :
|
||||
try :
|
||||
self.L.info ("New basket %s", xml_file.name)
|
||||
xml = etree.parse (xml_file).getroot ()
|
||||
com = xml.xpath ("//TEXT_SHORT") [0].text
|
||||
user, line = xml.xpath ("//INFO8") [0].text.split ("/")
|
||||
basket_no = xml_file.stem
|
||||
self.client.publish \
|
||||
( self.mqtt.basket_state.format (basket_no = basket_no)
|
||||
, payload = "new", retain = True
|
||||
)
|
||||
self.client.publish \
|
||||
( self.mqtt.basket_changed.format (basket_no = basket_no)
|
||||
, payload = App_State.snow ("import"), retain = True
|
||||
)
|
||||
basket = dict \
|
||||
(user = user, line = line, commission = com)
|
||||
basket ["positions"] = self._scan_order_pos (xml)
|
||||
self.client.publish \
|
||||
( self.mqtt.basket_topic.format (basket_no = basket_no)
|
||||
, payload = json.dumps (basket), retain = True
|
||||
)
|
||||
self.L.info ("Created new basket %s", basket_no)
|
||||
except Exception as e :
|
||||
self.L.error (e, exc_info = True)
|
||||
# end def new_order
|
||||
|
||||
def _scan_order_pos (self, xml) :
|
||||
result = []
|
||||
bl = xml.xpath ("//BuilderList") [0]
|
||||
for a in bl.getchildren () :
|
||||
art_name = a.xpath ("Pname") [0].text.strip ()
|
||||
count = int (a.xpath ("Count") [0].text)
|
||||
pos = a.get ("LineNo")
|
||||
text = a.xpath ("ARTICLE_TEXT_INFO1" ) [0].text.strip ()
|
||||
price = float (a.xpath ("ARTICLE_PRICE_INFO1") [0].text)
|
||||
result.append \
|
||||
( dict ( art_name = art_name
|
||||
, count = count
|
||||
, pos = pos
|
||||
, text = text
|
||||
, price = price
|
||||
)
|
||||
)
|
||||
return result
|
||||
# end def _scan_order_pos
|
||||
|
||||
# end class Order_Handler
|
||||
|
||||
class Order_Watch (App_State) :
|
||||
|
||||
order_incomming_dir = r"n:\glueck\watch-me\inbox"
|
||||
|
||||
def __init__ (self, config_file) :
|
||||
self.load_config (config_file)
|
||||
self.client = self.connect_to_mqtt ()
|
||||
# end def __init__
|
||||
|
||||
def _start_watchdog (self) :
|
||||
handler = Order_Handler \
|
||||
(self.order_incomming_dir, self.client, self.config)
|
||||
return WindowsApiObserverExcpetionHandler.Start_Directory_Watcher \
|
||||
(handler)
|
||||
# end def _start_watchdog
|
||||
|
||||
def run (self) :
|
||||
observer = None
|
||||
try:
|
||||
observer = self._start_watchdog ()
|
||||
while True:
|
||||
time.sleep (1)
|
||||
if WindowsApiEmitterExceptionHandling.Exception :
|
||||
App_State.L.exception \
|
||||
(str (WindowsApiEmitterExceptionHandling.Exception))
|
||||
WindowsApiEmitterExceptionHandling.Exception = None
|
||||
if observer :
|
||||
observer.stop ()
|
||||
observer = self._start_watchdog ()
|
||||
finally :
|
||||
if observer :
|
||||
observer.stop ()
|
||||
observer.join ()
|
||||
# end def run
|
||||
|
||||
# end class Order_Watch
|
||||
|
||||
if __name__ == "__main__" :
|
||||
parser = Order_Watch.Add_Logging_Attributes ()
|
||||
parser.add_argument ("config_file", type = str)
|
||||
cmd = parser.parse_args ()
|
||||
|
||||
_Logger_.L = Order_Watch.Setup_Logging (cmd)
|
||||
ow = Order_Watch (cmd.config_file)
|
||||
ow.run ()
|
||||
|
||||
|
|
@ -0,0 +1,121 @@
|
|||
import time
|
||||
from watchdog.events import FileSystemEventHandler
|
||||
from watchdog.observers import Observer
|
||||
from watchdog.observers.read_directory_changes import \
|
||||
WindowsApiEmitter, DEFAULT_OBSERVER_TIMEOUT, BaseObserver
|
||||
|
||||
class _Logger_ : pass
|
||||
|
||||
class WindowsApiEmitterExceptionHandling (WindowsApiEmitter, _Logger_) :
|
||||
|
||||
Exception = None
|
||||
|
||||
def run (self, * args, ** kw) :
|
||||
try :
|
||||
super ().run (* args, ** kw)
|
||||
except Exception as err :
|
||||
WindowsApiEmitterExceptionHandling.Exception = err
|
||||
# end def run
|
||||
|
||||
# end class WindowsApiEmitterExceptionHandling
|
||||
|
||||
class WindowsApiObserverExcpetionHandler (Observer, _Logger_) :
|
||||
|
||||
def __init__ (self, timeout = DEFAULT_OBSERVER_TIMEOUT) :
|
||||
BaseObserver.__init__ \
|
||||
( self
|
||||
, emitter_class = WindowsApiEmitterExceptionHandling
|
||||
, timeout = timeout
|
||||
)
|
||||
# end def __init__
|
||||
|
||||
@classmethod
|
||||
def Start_Directory_Watcher (cls, handler, ** kw) :
|
||||
L = cls.L
|
||||
retry_wait = kw.pop ("RETRY_SLEEP", 1)
|
||||
while True :
|
||||
try :
|
||||
observer = WindowsApiObserverExcpetionHandler ()
|
||||
observer.schedule (handler, handler.directory, ** kw)
|
||||
observer.start ()
|
||||
L.info ("Directory watching for %s started", handler.directory)
|
||||
return observer
|
||||
except Exception as e :
|
||||
if getattr (e, "errno", None) == 2 :
|
||||
L.error ("%s does not exists, retry", path)
|
||||
observer.stop ()
|
||||
time.sleep (retry_wait)
|
||||
else :
|
||||
raise
|
||||
# end def Start_Directory_Watcher
|
||||
|
||||
# end class WindowsApiObserverExcpetionHandler
|
||||
|
||||
class Directory_Handler (FileSystemEventHandler, _Logger_) :
|
||||
|
||||
def __init__ (self, directory) :
|
||||
super ().__init__ ()
|
||||
self.directory = directory
|
||||
# end def __init__
|
||||
|
||||
def on_any_event (self, event):
|
||||
self.L.debug ("%s %s", event.event_type, event.src_path)
|
||||
if event.event_type == 'deleted' and event.src_path == self.directory :
|
||||
WindowsApiEmitterExceptionHandling.Exception = \
|
||||
f"Directory has been deleted {self.directory}"
|
||||
#if event.event_type == 'created' :
|
||||
# try:
|
||||
# fileBase = os.path.basename(event.src_path).split('.')[0]
|
||||
# filePath = os.path.dirname(event.src_path) (fileBase,filePath))
|
||||
# except Exception as e:
|
||||
# pass
|
||||
|
||||
# if event.is_directory:
|
||||
# return None
|
||||
#
|
||||
# elif event.event_type == 'created':
|
||||
# # Take any action here when a file is first created.
|
||||
# print(datetime.datetime.now())
|
||||
# print("Received created event - %s." % event.src_path)
|
||||
#
|
||||
# elif event.event_type == 'modified':
|
||||
# # Taken any action here when a file is modified.
|
||||
# print(datetime.datetime.now())
|
||||
# print("Received modified event - %s." % event.src_path)
|
||||
#
|
||||
# else:
|
||||
# print(event.event_type,event.src_path)
|
||||
# end def on_any_event
|
||||
|
||||
# end class Directory_Handler
|
||||
|
||||
if __name__ == "__main__" :
|
||||
from App_State import App_State
|
||||
import time
|
||||
import os
|
||||
|
||||
cmd = App_State.Add_Logging_Attributes ().parse_args ()
|
||||
L = _Logger_.L = App_State.Setup_Logging (cmd)
|
||||
L.info ("PID_ %d", os.getpid ())
|
||||
def start_watching (path) :
|
||||
return WindowsApiObserverExcpetionHandler.Start_Directory_Watcher \
|
||||
(Directory_Handler, path)
|
||||
# end def start_watching
|
||||
|
||||
path = r"k:\glueck\watch-me"
|
||||
observer = None
|
||||
try:
|
||||
observer = start_watching (path)
|
||||
while True:
|
||||
time.sleep (1)
|
||||
if WindowsApiEmitterExceptionHandling.Exception :
|
||||
App_State.L.exception \
|
||||
(str (WindowsApiEmitterExceptionHandling.Exception))
|
||||
WindowsApiEmitterExceptionHandling.Exception = None
|
||||
if observer :
|
||||
observer.stop ()
|
||||
observer = start_watching (path)
|
||||
finally :
|
||||
if observer :
|
||||
observer.stop ()
|
||||
observer.join ()
|
||||
|
|
@ -0,0 +1,6 @@
|
|||
mqtt:
|
||||
broker: odoo-test
|
||||
port: 1883
|
||||
user: imos
|
||||
password: gAAAAABmTy4fipVAw10oKCE2aLWq79BF9Id5H8-lEEm8tOgcoaGgIJRWwBdR0nj-lrN-70hcLMHVqNudk7FTJjOvE0KOCkDr0A==
|
||||
topic-root: imos-order
|
||||
Loading…
Reference in New Issue