V1Action.class.php 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. <?php
  /**
   * Long-running export workers that drain a message source (Kafka or a Redis
   * queue) into newline-delimited ".dat" files under FTP_LOCAL_DIR.
   *
   * A data file is considered finished once an empty sibling file with the
   * ".md5" extension has been created next to it (see createRouteMD5file()).
   * NOTE(review): the ".md5" file carries no checksum, it is written empty;
   * presumably a downstream uploader treats it as a "ready" marker - confirm.
   */
  class V1Action extends Action {

      /**
       * Consume route messages from Kafka and append their payloads, one per
       * line, to "<FTP_LOCAL_DIR>/route_<Ymd_His><3 random digits>.dat".
       *
       * The in-memory buffer is flushed to disk when
       *   - 50000 messages have been buffered, or
       *   - the current file has been open for FTP_FILE_CREATE_INTERVAL
       *     seconds (default 30), or
       *   - consume() timed out after 30 seconds without a message.
       * In the last two cases the file is also finalized and a new file name
       * is started.
       *
       * Never returns normally: it loops forever, exits when a required
       * setting is missing, and throws on an unexpected Kafka error.
       *
       * @throws \Exception on a rebalance or consume error that is not handled below
       */
      public function kafka2createFile_route( ){
          ini_set("memory_limit", "1024M");

          $broker_list = $this->requireConfig('KAFKA_BROKER_LIST');
          $group = $this->requireConfig('ROUTE_INDEX_KAFKA_GROUP_FTP');
          $topics = explode(',', $this->requireConfig('ROUTE_INDEX_KAFKA_TOPIC'));
          // Without a target directory files would be written to "/route_...".
          $localDir = $this->requireConfig('FTP_LOCAL_DIR');

          $conf = new RdKafka\Conf();
          // Log partition assignments and apply them to the consumer.
          $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
              switch ($err) {
                  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                      echo "Assign: ";
                      var_dump($partitions);
                      $kafka->assign($partitions);
                      break;
                  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                      echo "Revoke: ";
                      var_dump($partitions);
                      $kafka->assign(NULL);
                      break;
                  default:
                      // Use the readable error text as message, keep the code.
                      throw new \Exception(rd_kafka_err2str($err), $err);
              }
          });
          // Consumers sharing a group.id split the partitions between them.
          $conf->set('group.id', $group);
          $conf->set('metadata.broker.list', $broker_list);

          $topicConf = new RdKafka\TopicConf();
          // With no stored offset (or an out-of-range one) start from the
          // beginning of the partition.
          $topicConf->set('auto.offset.reset', 'smallest');
          $conf->setDefaultTopicConf($topicConf);

          $consumer = new RdKafka\KafkaConsumer($conf);
          $consumer->subscribe($topics);

          $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
          if(!$fileTimeInterval){
              $fileTimeInterval = 30;
          }

          $batchSize = 50000;     // buffered messages that force a write
          $windowStart = time();  // when the current file name was chosen
          $stamp = date('Ymd_His', $windowStart) . rand(100, 999);
          $buffer = '';           // payloads not yet written to disk
          $pending = 0;           // number of messages in $buffer
          $written = 0;           // messages successfully written so far

          while (true) {
              $message = $consumer->consume(30*1000);
              $fileName = $localDir . '/route_' . $stamp . '.dat';

              switch ($message->err) {
                  case RD_KAFKA_RESP_ERR_NO_ERROR:
                      $buffer .= ($message->payload) . PHP_EOL;
                      $pending++;
                      if ($pending % 10000 == 0) {
                          echo $pending . PHP_EOL;
                      }

                      $expired = (time() - $windowStart) >= $fileTimeInterval;
                      $batchFull = ($pending % $batchSize == 0);

                      if ($expired || $batchFull) {
                          echo "start write routefile to $fileName" . PHP_EOL;
                          if ($this->writeRouteFile($fileName, $buffer)) {
                              echo "write success...$pending 条" . PHP_EOL;
                              $written += $pending;
                              $buffer = '';
                              $pending = 0;
                          } else {
                              echo 'file data write failed ,please check filepath or file permission' . PHP_EOL;
                              if ($batchFull) {
                                  // A full batch that cannot be written is
                                  // dropped so the buffer stays bounded.
                                  $buffer = '';
                                  $pending = 0;
                              }
                              // Otherwise the buffer is kept and goes into
                              // the next file.
                          }
                      }

                      if ($expired) {
                          // The buffer was flushed above, so the file now
                          // holds everything received in this window.
                          if ($this->createRouteMD5file($fileName)) {
                              echo "file completed" . PHP_EOL;
                          }
                          $windowStart = time();
                          $stamp = date('Ymd_His', $windowStart) . rand(100, 999);
                      }
                      break;

                  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                      // End of partition reached; keep polling for new messages.
                      break;

                  case RD_KAFKA_RESP_ERR__TIMED_OUT:
                      if ($buffer != '') {
                          echo "start write routefile to $fileName" . PHP_EOL;
                          if ($this->writeRouteFile($fileName, $buffer)) {
                              echo "write success...$pending 条" . PHP_EOL;
                              $written += $pending;
                              $buffer = '';
                              $pending = 0;
                          } else {
                              echo 'file data write failed ,please check filepath or file permission' . PHP_EOL;
                          }
                      }
                      // Finalize even when the buffer was empty: a file whose
                      // last batch was already written would otherwise never
                      // be completed, because its name is rotated below.
                      if ($this->createRouteMD5file($fileName)) {
                          echo "file completed" . PHP_EOL;
                      }
                      $windowStart = time();
                      $stamp = date('Ymd_His', $windowStart) . rand(100, 999);
                      echo "Timed out\n";
                      echo "消息总数total:" . $written . PHP_EOL;
                      break;

                  default:
                      throw new \Exception($message->errstr(), $message->err);
              }
          }
      }

      /**
       * Read a required configuration value via C() and terminate the script
       * with a message when it is empty.
       *
       * @param string $name configuration key
       * @return mixed the non-empty configuration value
       */
      private function requireConfig( $name ){
          $value = C($name);
          if (empty($value)) {
              exit($name . " must be config!" . PHP_EOL);
          }
          return $value;
      }

      /**
       * Append $data to $fileName, creating the file when necessary.
       *
       * The data is also passed to debug_log() under 'route_info' before the
       * write, and a failed write is logged there as well.
       *
       * @param string $fileName path of the data file
       * @param string $data     text to append
       * @return bool true when file_put_contents() did not report failure
       */
      private function writeRouteFile( $fileName, $data ){
          debug_log('route_info', $data);
          if (file_put_contents($fileName, $data, FILE_APPEND) === false) {
              debug_log('route_info', 'file write fail'.$fileName);
              return false;
          }
          return true;
      }

      /**
       * Finalize a data file by creating an empty "<name>.md5" file beside it.
       *
       * A data file of zero bytes is deleted instead of being finalized.
       *
       * @param string $fileName path of the data file
       * @return bool true when the ".md5" file was written; false when the data
       *              file is missing, empty, cannot be inspected, or the
       *              ".md5" file could not be written
       */
      private function createRouteMD5file( $fileName ){
          // The file may have been appended to earlier in this process, so
          // drop any cached stat result first.
          clearstatcache(true, $fileName);
          if(!file_exists($fileName)){
              return false;
          }
          $size = filesize($fileName);
          if($size === false){
              // Size unknown: do not delete a file that may contain data.
              debug_log('md5', 'file size check fail'.$fileName);
              return false;
          }
          if($size === 0){
              unlink($fileName);
              return false;
          }
          $pathInfo = pathinfo($fileName);
          $md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';
          if(file_put_contents($md5File, '', FILE_APPEND) === false){
              debug_log('md5', 'file write fail'.$md5File);
              return false;
          }
          return true;
      }

      /**
       * Return $row[$name]['value'], or null when that entry is not set.
       *
       * @param mixed  $row  one parsed field group
       * @param string $name field name
       * @return mixed
       */
      private function fieldValue( $row, $name ){
          return isset($row[$name]['value']) ? $row[$name]['value'] : null;
      }

      /**
       * Expand a parsed packet that carries packed positions into a list of
       * absolute positions.
       *
       * Each entry of $data['pkt_position_params'] holds offsets: the first one
       * relative to the packet's base point (lat / lng / timestamp), every
       * later one relative to the entry before it. Latitude and longitude are
       * divided by 1000000 on output.
       * NOTE(review): the scale suggests the inputs are micro-degrees - confirm.
       *
       * Progress and rejection reasons are echoed to standard output.
       *
       * @param array $data parsed packet
       * @return array|false positions keyed like pkt_position_params, each with
       *                     DeviceId, Altitude, Speed, Direction,
       *                     SatelliteCount, DeviceTime, Latitude, Longitude;
       *                     false when a required part of $data is missing
       */
      private function getT61GpsParseReslut( $data ){
          if(!$data){
              echo 'data empty!'.PHP_EOL;
              return false;
          }
          if(empty($data['imei']['value'])){
              echo 'imei not existed!'.PHP_EOL;
              return false;
          }
          $imei = $data['imei']['value'];

          // The base point is needed to resolve the first packed position.
          foreach (array('lng', 'lat', 'timestamp') as $required) {
              if(!isset($data[$required]['value'])){
                  echo '['.$imei.']data.'.$required.'.value not set!'.PHP_EOL;
                  return false;
              }
          }
          // Does the packet declare packed position fields at all?
          if(empty($data['extctrl']['extctrl_conf']['is_have_pkgfields'])){
              echo '['.$imei.']is_have_pkgfields empty!'.PHP_EOL;
              return false;
          }
          // Number of packed positions, transmitted as a hex string.
          $pkt_count = isset($data['pkt_count']['hex_value']) ? hexdec($data['pkt_count']['hex_value']) : 0;
          if(!$pkt_count){
              echo '['.$imei.']pkt_count empty!'.PHP_EOL;
              return false;
          }
          if(empty($data['pkt_position_params']) || !is_array($data['pkt_position_params'])){
              echo '['.$imei.']pkt_position_params empty!'.PHP_EOL;
              return false;
          }

          $baseLng = $data['lng']['value'];
          $baseLat = $data['lat']['value'];
          $baseTime = $data['timestamp']['value'];

          $list = array();
          $previous = null; // last resolved position, null before the first one
          foreach($data['pkt_position_params'] as $key => $row){
              $point = array(
                  'DeviceId' => $imei,
                  'Altitude' => $this->fieldValue($row, 'altitude'),
                  'Speed' => $this->fieldValue($row, 'speed'),
                  'Direction' => $this->fieldValue($row, 'direction'),
                  'SatelliteCount' => $this->fieldValue($row, 'satellite_count'),
              );
              $timeOffset = $this->fieldValue($row, 'relative_time');
              $latOffset = $this->fieldValue($row, 'lat');
              $lngOffset = $this->fieldValue($row, 'lng');

              if($previous === null){
                  // First entry: offset from the (unscaled) base point.
                  $point['DeviceTime'] = $timeOffset + $baseTime;
                  $point['Latitude'] = ($latOffset + $baseLat)/1000000;
                  $point['Longitude'] = ($lngOffset + $baseLng)/1000000;
              }else{
                  // Later entries: offset from the previous, already scaled, point.
                  $point['DeviceTime'] = $timeOffset + $previous['DeviceTime'];
                  $point['Latitude'] = $latOffset/1000000 + $previous['Latitude'];
                  $point['Longitude'] = $lngOffset/1000000 + $previous['Longitude'];
              }
              $list[$key] = $point;
              $previous = $point;
          }
          return $list;
      }

      /**
       * Drain the Redis queue 'ningbo_fangdao_sync_data' and append each item,
       * JSON-encoded on its own line, to
       * "<FTP_LOCAL_DIR>/station_<Ymd_His>.dat".
       *
       * The in-memory buffer is flushed to disk when
       *   - the number of buffered items reaches a multiple of 10000, or
       *   - the current file has been open for FTP_FILE_CREATE_INTERVAL
       *     seconds (default 30), or
       *   - the queue returned nothing.
       * In the last two cases the file is also finalized and a new file name
       * is started. When the queue is empty the loop sleeps 3 seconds.
       *
       * Never returns normally: it loops forever, or exits when
       * FTP_LOCAL_DIR is not configured.
       */
      public function redis2createFile_station_vehicle( ){
          ini_set("memory_limit", "1024M");
          $redis = Redis('ningbo_fangdao_sync_data','queue');
          // Without a target directory files would be written to "/station_...".
          $localDir = $this->requireConfig('FTP_LOCAL_DIR');

          $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
          if(!$fileTimeInterval){
              $fileTimeInterval = 30;
          }

          $batchSize = 10000;     // buffered items that force a write
          $windowStart = time();  // when the current file name was chosen
          $stamp = date('Ymd_His', $windowStart);
          $buffer = '';           // encoded items not yet written to disk
          $pending = 0;           // number of items in $buffer
          $written = 0;           // items successfully written so far

          while (true) {
              $message = $redis->pop();
              $fileName = $localDir . '/station_' . $stamp . '.dat';

              if(!$message){
                  echo 'waiting for data from redis'.PHP_EOL;
                  $hadPending = ($buffer != '');
                  if ($hadPending && $this->writeRouteFile($fileName, $buffer)) {
                      $written += $pending;
                      $buffer = '';
                      $pending = 0;
                  }
                  // Also finalizes a file whose last batch was already written.
                  $finalized = $this->createRouteMD5file($fileName);
                  // Start a fresh name so the next item does not land in a
                  // file stamped long before it arrived.
                  $windowStart = time();
                  $stamp = date('Ymd_His', $windowStart);
                  if ($hadPending || $finalized) {
                      echo "Timed out\n";
                      echo "消息总数total:" . $written . PHP_EOL;
                  }
                  sleep(3);
                  continue;
              }

              $line = json_encode($message,JSON_UNESCAPED_UNICODE);
              if ($line === false) {
                  // Do not emit a blank record for an item that cannot be encoded.
                  echo 'skip message, json_encode failed, error code ' . json_last_error() . PHP_EOL;
                  continue;
              }
              $buffer .= $line . PHP_EOL;
              $pending++;
              if($pending % 10000 ==0){
                  echo $pending . PHP_EOL;
              }

              $expired = (time() - $windowStart) >= $fileTimeInterval;
              if ($expired || $pending % $batchSize == 0) {
                  echo 'locationPack length ' .strlen($buffer).PHP_EOL;
                  if ($this->writeRouteFile($fileName, $buffer)) {
                      $written += $pending;
                      $buffer = '';
                      $pending = 0;
                  }
                  // On failure the buffer is kept and retried with a later flush.
              }

              if ($expired) {
                  $this->createRouteMD5file($fileName);
                  $windowStart = time();
                  $stamp = date('Ymd_His', $windowStart);
              }
          }
      }
  }