-- mqtt_recieve.lua (10 KB)
  1. module(...,package.seeall)
  --- Count the key/value pairs stored in a table.
  -- Every pair visited by `pairs` is counted, so both array-style and
  -- keyed entries contribute to the total.
  -- @param t table to measure
  -- @return number of entries found in t
  function table_leng(t)
    local total = 0
    for _ in pairs(t) do
      total = total + 1
    end
    return total
  end
  -- Directory shared by most of the fixed-location content types below.
  local COMMON_DIR = "/sdcard0/common"

  -- Content types that are stored in a fixed directory. Types that are not
  -- listed here are stored by operateFs under a path built from the content.
  local DIR_PATH = {
    Users           = COMMON_DIR,
    Wrench          = COMMON_DIR,
    WorkingParts    = COMMON_DIR,
    WorkingPosition = COMMON_DIR,
    WorkPlan        = COMMON_DIR,
    File            = "/sdcard0/log",
    Wind            = COMMON_DIR,
  }

  -- Content types for which operateFs calls delBaseInfo(cntType, 0, dir)
  -- before saving on an "add".
  -- NOTE(review): "Fan" appears here while DIR_PATH lists "Wind"; confirm
  -- whether the two names are meant to refer to the same content type.
  local ALLOWCUSTOM_FILE = {
    Fan             = 1,
    WorkingParts    = 1,
    WorkingPosition = 1,
  }
  -- Description prefix this module looks for after updateBaseInfo to decide
  -- that the record does not exist yet and must be created instead.
  local NOT_FOUND_PREFIX = "this data not found where id="

  -- True when desc is a string starting with NOT_FOUND_PREFIX.
  -- A nil or non-string desc is simply "not a not-found message".
  local function isNotFoundDesc(desc)
    return type(desc) == "string"
      and desc:sub(1, #NOT_FOUND_PREFIX) == NOT_FOUND_PREFIX
  end

  -- Directory a record of the given type is stored in: the fixed DIR_PATH
  -- entry when there is one, otherwise a path built from content.wnum
  -- (plus content.fnum for "WorkRecord").
  local function resolveStorageDir(cntType, content)
    local fixed = DIR_PATH[cntType]
    if fixed then
      return fixed
    end
    if cntType == "WorkRecord" then
      return "/sdcard0/" .. content.wnum .. "/" .. content.fnum
    end
    return "/sdcard0/" .. content.wnum
  end

  -- Canonical text form of a version: leading zeros stripped when the value
  -- is all digits, otherwise the plain tostring() form. nil stays nil.
  local function versionText(v)
    if v == nil then
      return nil
    end
    local s = tostring(v)
    if type(v) == "number" then
      -- Some Lua versions print integral floats as "123.0".
      s = s:gsub("%.0$", "")
    end
    return s:match("^0*(%d+)$") or s
  end

  -- True when candidate is a strictly newer version than current.
  -- Digit-only versions are ordered by length first and then
  -- lexicographically, which equals numeric order without converting to a
  -- number (so it does not depend on the numeric range of the Lua build).
  -- A missing candidate is never newer; a missing current is always older.
  local function isNewerVersion(candidate, current)
    local a, b = versionText(candidate), versionText(current)
    if a == nil then
      return false
    end
    if b == nil then
      return true
    end
    local bothNumeric = a:find("^%d+$") and b:find("^%d+$")
    if bothNumeric and #a ~= #b then
      return #a > #b
    end
    return a > b
  end

  --- Apply one server operation to local storage / device state.
  -- @param op         "add", "update", "delete" or "directive"
  -- @param cntType    content type; selects the storage directory or, for
  --                   directives, the action ("File", "SysCMD", "Config")
  -- @param cntVersion content version carried by the operation; stored in
  --                   nvm as "localCntVersion" when newer than the stored one
  -- @param content    operation payload (record fields or directive arguments)
  -- @return opRes  result of the operation; falsy means failure. It stays ""
  --                when no branch assigns it (e.g. an unrecognised op).
  -- @return desc   description returned by the storage helper; "" when the
  --                operation failed without one
  function operateFs(op, cntType, cntVersion, content)
    local opRes, desc = "", nil
    local _ -- receives the unused count returned by the storage helpers

    if op == "add" then
      local dir = resolveStorageDir(cntType, content)
      if ALLOWCUSTOM_FILE[cntType] then
        delBaseInfo(cntType, 0, dir)
      end
      opRes, desc = saveBaseInfo(cntType, content, dir)

    elseif op == "update" then
      local dir = resolveStorageDir(cntType, content)
      opRes, _, desc = updateBaseInfo(cntType, content, dir)
      -- "update" falls back to "add" when the update failed or reported
      -- that the record does not exist.
      if not opRes or isNotFoundDesc(desc) then
        opRes, desc = saveBaseInfo(cntType, content, dir)
      end

    elseif op == "delete" then
      opRes, _, desc = delBaseInfo(cntType, content.id, resolveStorageDir(cntType, content))

    elseif op == "directive" then
      if cntType == "File" then
        -- The special name "/sdcard0/log/0" is passed to readfile as nil.
        if content.file_name == "/sdcard0/log/0" then
          opRes = logModuel.readfile(nil)
        else
          opRes = logModuel.readfile(content.file_name)
        end
      end
      if cntType == "SysCMD" then
        if content.cmd_content == "Reboot" then
          opRes = true
          sys.restart("Sever send reboot CMD! ")
        end
        if content.cmd_content == "ResetUploadCache" then
          rtos.remove_dir("/sdcard0/common/temp")
          sys.restart("Server send ResetUploadCache CMD! ")
        end
        if content.cmd_content == "FormatSD" then
          local fg = io.format(io.SDCARD)
          log.info("format res", fg)
          if fg == 1 then
            nvm.set("localCntVersion", "0")
            sys.restart("Server send Format SDcard CMD! ")
          else
            opRes = false
            desc = "格式化SDcard失败"
          end
        end
      end
      if cntType == "Config" then
        -- Every key of the payload is written straight into nvm.
        for k, v in pairs(content) do
          nvm.set(k, v)
        end
        opRes = true
        sys.restart("Sever send config update! ")
      end
    end

    -- Record the content version whenever the incoming one is newer; this
    -- happens regardless of whether the operation itself succeeded.
    local localVersion = nvm.get("localCntVersion")
    if isNewerVersion(cntVersion, localVersion) then
      local now = os.date("*t")
      -- NOTE(review): fields are concatenated without zero padding, so the
      -- resulting timestamp text can be ambiguous; format kept as-is.
      local updateTime = now.year .. now.month .. now.day .. now.hour .. now.min .. now.sec
      nvm.set("localCntVersion", cntVersion)
      nvm.set("localCntUpdateTime", updateTime)
      log.info("nvm set success, value:", cntVersion)
    end

    if not opRes then
      desc = desc or ""
      -- tostring() keeps the failure log from raising when a field is nil.
      logModuel.debug_log("mqtt_recieve operate fs failed! desc:" .. tostring(desc)
        .. ",optype:" .. tostring(op) .. ",cntVersion:" .. tostring(cntVersion))
    end
    return opRes, desc
  end
  -- Decode a JSON string without letting a decoder error escape.
  -- Returns the decoded table, or nil when decoding failed or produced
  -- something that is not a table.
  local function decodeJsonTable(text)
    local ok, decoded = pcall(json.decode, text)
    if ok and type(decoded) == "table" then
      return decoded
    end
    return nil
  end

  -- Run every operation contained in a server message through operateFs and
  -- publish one acknowledgement on "SHEGCL/IntelligenTool/Ack".
  local function applyServerOperations(mqttClient, operations)
    local imei = misc.getImei()
    local succ = ""
    local extra = {}
    local hasFile = false

    for _, item in pairs(operations) do
      if type(item) == "table" then
        local opRes, desc = operateFs(item.OpType, item.CntType, item.CntVersion, item.Content)
        if not opRes then
          succ = "-1"
          table.insert(extra, "error_version:" .. tostring(item.CntVersion)
            .. ",OpType:" .. tostring(item.OpType)
            .. ",CntType:" .. tostring(item.CntType)
            .. ",desc:" .. tostring(desc))
        end
        if item.OpType == "directive" and item.CntType == "File" then
          -- opRes is false/nil when the file could not be read; tostring()
          -- keeps the acknowledgement from raising in exactly that case.
          table.insert(extra, "file_name:" .. (item.Content.file_name or "unknown_file")
            .. ",content:" .. tostring(opRes))
          hasFile = true
        end
      else
        log.error("mqtt_recieve.proc", "operation entry is not a table, skipped:", item)
      end
    end

    if succ == "" then
      succ = "1"
    end
    local cntVersion = nvm.get("localCntVersion")

    local body
    if hasFile then
      -- The file acknowledgement is assembled by hand rather than through
      -- json.encode. NOTE(review): the result is not strict JSON (values are
      -- unquoted); the wire format is kept exactly as it was — confirm with
      -- the server side before changing it.
      body = '{"Imei":' .. tostring(imei) .. ',' .. '"CntVersion":' .. tostring(cntVersion)
        .. ',' .. '"Succ":' .. succ .. ',' .. '"Extra":' .. table.concat(extra) .. '}'
    else
      body = json.encode({
        Imei = imei,
        CntVersion = cntVersion,
        Succ = succ,
        Extra = extra,
      })
    end
    mqttClient:publish("SHEGCL/IntelligenTool/Ack", body)
  end

  -- Publish a locally queued message. rawParam is the JSON text delivered
  -- with the "LOCAL_PUB_MSG" event and must decode to {topic=..., data={...}}.
  local function publishLocalMessage(mqttClient, rawParam)
    if type(rawParam) ~= "string" then
      log.error("mqtt_recieve.proc", "param type invalid!! param:", rawParam)
      return
    end
    local msg = decodeJsonTable(rawParam)
    if not msg then
      log.error("mqtt_recieve.proc", "param is not valid JSON! param:", rawParam)
      return
    end
    -- data must be a table because two fields are added to it below.
    if msg.topic and type(msg.data) == "table" then
      local upData = msg.data
      upData.PublishVersion = nvm.get("localCntVersion")
      upData.IMEI = misc.getImei()
      mqttClient:publish(msg.topic, json.encode(upData))
    end
  end

  --- MQTT client receive loop.
  -- Waits on the client (30 s per wait) and handles three kinds of events:
  -- server messages (an upload acknowledgement, or a list of operations that
  -- is applied and acknowledged), "LOCAL_PUB_MSG" events (local data to
  -- publish), and timeouts. The loop ends after an upload acknowledgement or
  -- when receive reports anything else.
  -- @param mqttClient MQTT client object
  -- @return truthy when the loop ended on a successful receive; false otherwise
  -- @usage mqttInMsg.proc(mqttClient)
  function proc(mqttClient)
    local result, data, param
    while true do
      result, data, param = mqttClient:receive(30000, "LOCAL_PUB_MSG")
      if result then
        log.info("origin mqttrec data:", data.payload)
        local resData = decodeJsonTable(data.payload)
        if not resData then
          -- A malformed payload is dropped instead of aborting the loop.
          log.error("mqtt_recieve.proc", "payload is not valid JSON, ignored")
        elseif resData.msgid and resData.success then
          -- Acknowledgement of an earlier upload: wake the waiting uploader
          -- and leave the receive loop.
          log.info("three topic msgid:", resData.msgid, "type:", type(resData.msgid))
          sys.publish("WAIT_UPDATA_RESP_SUCCESS", resData.msgid)
          break
        else
          applyServerOperations(mqttClient, resData)
        end
      elseif data == "LOCAL_PUB_MSG" then
        publishLocalMessage(mqttClient, param)
      elseif data == "timeout" then
        log.info("mqtt_recieve.proc", "mqtt wait receive timeout!")
      else
        break
      end
    end
    return result or data == "timeout" or data == "LOCAL_PUB_MSG"
  end
  --- Periodically re-send data that could not be uploaded because of
  -- network problems.
  -- After an initial 5 s delay the loop runs forever: whenever the socket is
  -- ready it reads the "wait2upload" records from /sdcard0/common/temp,
  -- republishes each one through "LOCAL_PUB_MSG", and deletes a record once a
  -- "WAIT_UPDATA_RESP_SUCCESS" event arrives within 3 s. It then sleeps 10 s.
  function uploadOfflineData()
    local QUEUE_NAME = "wait2upload"
    local QUEUE_DIR = "/sdcard0/common/temp"

    -- Re-send one queued record and remove it when an acknowledgement arrives.
    -- NOTE(review): the msgid delivered with the event is only checked for
    -- presence, not compared with record.data.msgid — confirm this is intended.
    local function resend(record)
      sys.publish("LOCAL_PUB_MSG", json.encode(record))
      local acked, msgid = sys.waitUntil("WAIT_UPDATA_RESP_SUCCESS", 3000)
      if acked and msgid then
        delBaseInfo(QUEUE_NAME, record.data.msgid, QUEUE_DIR)
      end
      sys.wait(500)
    end

    sys.wait(5000)
    while true do
      if socket.isReady() then
        local pending, cn, des = getBaseInfo_ununique(QUEUE_NAME, nil, QUEUE_DIR)
        if pending then
          for _, record in pairs(pending) do
            resend(record)
          end
        else
          log.info("wait2upload file is empty!", "count:", cn, "des:", des)
        end
      end
      sys.wait(10000)
    end
  end
  -- Hand the offline re-upload loop to sys.taskInit when the module loads.
  sys.taskInit(uploadOfflineData)