V1Action.class.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. <?php
  /**
   * Actions that drain route messages from Kafka into local
   * HC_<Ymd_His>.dat files and publish a matching .md5 checksum file,
   * plus a decoder for packed T61 GPS position records.
   */
  class V1Action extends Action
  {
      /**
       * Consume the configured Kafka topics forever and spool the payloads
       * into time-windowed data files.
       *
       * Each payload is buffered as one line. The buffer is written to the
       * current HC_<Ymd_His>.dat file when it reaches 10000 messages, when
       * the file window (FTP_FILE_CREATE_INTERVAL seconds, default 30) has
       * elapsed, or when the consumer has been idle for 30 seconds. When a
       * window ends or the consumer times out, the checksum file for the
       * current data file is created and a new file name is started.
       *
       * Terminates the process via exit() when a required Kafka setting is
       * missing.
       *
       * @return void This method loops until the process is stopped.
       * @throws \Exception When the consumer reports an error other than
       *                    partition EOF or timeout.
       */
      public function kafka2createFile()
      {
          $brokerList = $this->readRequiredConfig('KAFKA_BROKER_LIST');
          $group = $this->readRequiredConfig('ROUTE_INDEX_KAFKA_GROUP_FTP');
          $topics = explode(',', $this->readRequiredConfig('ROUTE_INDEX_KAFKA_TOPIC'));

          $consumer = $this->buildRouteConsumer($brokerList, $group, $topics);

          // File names follow HC_YYYYMMDD_HHIISS.dat / HC_YYYYMMDD_HHIISS.md5
          $localDir = C('FTP_LOCAL_DIR');
          $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
          if (!$fileTimeInterval) {
              $fileTimeInterval = 30;
          }

          $batchSize = 10000;   // buffered messages that trigger a write
          $windowStart = time();
          $fileName = $this->routeDataFileName($localDir, $windowStart);
          $buffer = '';         // payload lines not yet written
          $buffered = 0;        // number of messages currently in $buffer
          $written = 0;         // messages successfully written so far

          while (true) {
              $message = $consumer->consume(30 * 1000);
              switch ($message->err) {
                  case RD_KAFKA_RESP_ERR_NO_ERROR:
                      $buffer .= $message->payload . PHP_EOL;
                      $buffered++;
                      echo $buffered . PHP_EOL;

                      $windowElapsed = (time() - $windowStart) >= $fileTimeInterval;
                      // Flush on a full batch, and always before sealing a
                      // window so the checksum covers everything consumed
                      // during that window.
                      if ($windowElapsed || $buffered % $batchSize == 0) {
                          echo 'start write routefile...' . PHP_EOL;
                          if ($this->writeRouteFile($fileName, $buffer)) {
                              $written += $buffered;
                              $buffer = '';
                              $buffered = 0;
                          }
                          // On failure the buffer is kept and retried on a
                          // later flush.
                      }
                      if ($windowElapsed) {
                          $this->createRouteMD5file($fileName);
                          $windowStart = time();
                          $fileName = $this->routeDataFileName($localDir, $windowStart);
                      }
                      break;

                  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                      echo "No more messages; will wait for more".PHP_EOL;
                      break;

                  case RD_KAFKA_RESP_ERR__TIMED_OUT:
                      // Idle: write whatever is pending, seal the file and
                      // start a new window.
                      if ($buffer != '') {
                          if ($this->writeRouteFile($fileName, $buffer)) {
                              $written += $buffered;
                              $buffer = '';
                              $buffered = 0;
                          }
                      }
                      $this->createRouteMD5file($fileName);
                      $windowStart = time();
                      $fileName = $this->routeDataFileName($localDir, $windowStart);
                      echo "Timed out\n";
                      echo "消息总数total:" . $written . PHP_EOL;
                      break;

                  default:
                      throw new \Exception($message->errstr(), $message->err);
              }
          }
      }

      /**
       * Read a configuration value via C() and stop the process when it is
       * empty.
       *
       * @param string $name Configuration key.
       * @return mixed The non-empty configuration value.
       */
      private function readRequiredConfig($name)
      {
          $value = C($name);
          if (empty($value)) {
              exit($name . " must be config!" . PHP_EOL);
          }
          return $value;
      }

      /**
       * Build the path of the data file for the window starting at $timestamp.
       *
       * @param string $localDir  Directory the files are created in.
       * @param int    $timestamp Unix timestamp of the window start.
       * @return string Path of the form <dir>/HC_<Ymd_His>.dat
       */
      private function routeDataFileName($localDir, $timestamp)
      {
          return $localDir . '/HC_' . date('Ymd_His', $timestamp) . '.dat';
      }

      /**
       * Create a high-level Kafka consumer subscribed to the given topics.
       *
       * @param string $brokerList Value for metadata.broker.list.
       * @param string $group      Consumer group id; consumers sharing a
       *                           group id consume different partitions.
       * @param array  $topics     Topic names to subscribe to.
       * @return RdKafka\KafkaConsumer
       */
      private function buildRouteConsumer($brokerList, $group, array $topics)
      {
          $conf = new RdKafka\Conf();

          // Rebalance callback: log partition assignments and apply them.
          // $partitions is left untyped: "array $partitions = null" is an
          // implicitly nullable type, which newer PHP versions deprecate.
          $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, $partitions = null) {
              switch ($err) {
                  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                      echo "Assign: ";
                      var_dump($partitions);
                      $kafka->assign($partitions);
                      break;
                  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                      echo "Revoke: ";
                      var_dump($partitions);
                      $kafka->assign(NULL);
                      break;
                  default:
                      // Use the readable error text as message and keep the
                      // numeric code as the exception code.
                      throw new \Exception(rd_kafka_err2str($err), $err);
              }
          });

          $conf->set('group.id', $group);
          $conf->set('metadata.broker.list', $brokerList);

          // Where to start when there is no stored offset or the stored
          // offset is out of range: 'smallest' starts from the beginning.
          $topicConf = new RdKafka\TopicConf();
          $topicConf->set('auto.offset.reset', 'smallest');
          $conf->setDefaultTopicConf($topicConf);

          $consumer = new RdKafka\KafkaConsumer($conf);
          $consumer->subscribe($topics);

          return $consumer;
      }

      /**
       * Log a chunk of route data and write it to the given data file.
       *
       * Line layout noted by the original author:
       * deviceid,positioninfo.longitude,positioninfo.latitude,receivetime
       *
       * @param string $fileName Target data file path.
       * @param string $data     Newline-separated payloads.
       * @return mixed Result of FtpFile::writeFile(); callers treat a truthy
       *               value as success.
       */
      private function writeRouteFile( $fileName, $data ){
          $config = C('FTP_CONFIG');
          debug_log('route_info', $data);
          // NOTE(review): utf8_encode() treats its input as ISO-8859-1. If the
          // Kafka payloads are already UTF-8 and contain non-ASCII bytes this
          // would double-encode them - confirm the producer's encoding.
          $writeData = utf8_encode($data);
          // NOTE(review): the same file is written several times per window,
          // so writeFile() presumably appends - verify against FtpFile.
          $res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData);
          return $res;
      }

      /**
       * Create the "<md5> <basename>" checksum file next to a data file.
       *
       * An existing but empty data file is deleted instead of being
       * checksummed.
       *
       * @param string $fileName Path of the data file.
       * @return mixed false when the data file is missing, empty or cannot be
       *               hashed; otherwise the result of FtpFile::writeFile().
       */
      private function createRouteMD5file( $fileName ){
          // This runs inside a long-lived loop; drop cached stat data so the
          // existence and size checks see the file's current state.
          clearstatcache(true, $fileName);
          if (!file_exists($fileName)) {
              return false;
          }
          if (!filesize($fileName)) {
              unlink($fileName);
              return false;
          }
          $checksum = md5_file($fileName);
          if ($checksum === false) {
              // Do not publish a checksum line with an empty hash.
              return false;
          }
          $pathInfo = pathinfo($fileName);
          $md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';
          $writeData = $checksum. ' '. $pathInfo['basename'];
          $config = C('FTP_CONFIG');
          return Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, $writeData);
      }

      /**
       * Expand a parsed T61 GPS packet with packed positions into a list of
       * absolute points.
       *
       * The packet carries a base point (lng/lat/timestamp) and a sequence of
       * packed entries whose time and coordinates are deltas: the first entry
       * is relative to the base point, every later entry is relative to the
       * entry before it. Coordinate values are divided by 1000000 to produce
       * the returned Latitude/Longitude.
       *
       * @param array $data Parsed packet; fields are read as
       *                    $data[<field>]['value'] or ['hex_value'].
       * @return array|false Points keyed like pkt_position_params, each with
       *                     DeviceId, Altitude, Speed, Direction,
       *                     SatelliteCount, DeviceTime, Latitude, Longitude;
       *                     false when a required field is missing or empty.
       */
      private function getT61GpsParseReslut( $data ){
          if (!$data) {
              echo 'data empty!'.PHP_EOL;
              return false;
          }
          $imei = isset($data['imei']['value']) ? $data['imei']['value'] : null;
          if (!$imei) {
              echo 'imei not existed!'.PHP_EOL;
              return false;
          }
          if (!isset($data['lng']['value'])) {
              echo '['.$imei.']data.lng.value not set!'.PHP_EOL;
              return false;
          }
          // The packet must declare that it carries packed position fields.
          if (empty($data['extctrl']['extctrl_conf']['is_have_pkgfields'])) {
              echo '['.$imei.']is_have_pkgfields empty!'.PHP_EOL;
              return false;
          }
          // The packed position count must be non-zero.
          $pktCount = isset($data['pkt_count']['hex_value'])
              ? hexdec($data['pkt_count']['hex_value'])
              : 0;
          if (!$pktCount) {
              echo '['.$imei.']pkt_count empty!'.PHP_EOL;
              return false;
          }
          if (empty($data['pkt_position_params'])) {
              echo '['.$imei.']pkt_position_params empty!'.PHP_EOL;
              return false;
          }

          // Base point. The timestamp is the collection time of the first
          // position in this packet (per the original author's note).
          $baseLng = $data['lng']['value'];
          $baseLat = isset($data['lat']['value']) ? $data['lat']['value'] : 0;
          $baseTime = isset($data['timestamp']['value']) ? $data['timestamp']['value'] : 0;

          $list = array();
          $previous = null; // last decoded point; null until the first entry
          foreach ($data['pkt_position_params'] as $key => $row) {
              $point = array();
              $point['DeviceId'] = $imei;
              $point['Altitude'] = $row['altitude']['value'];
              $point['Speed'] = $row['speed']['value'];
              $point['Direction'] = $row['direction']['value'];
              $point['SatelliteCount'] = $row['satellite_count']['value'];
              if ($previous === null) {
                  // First entry: deltas apply to the raw base values, then
                  // the sum is scaled.
                  $point['DeviceTime'] = $row['relative_time']['value'] + $baseTime;
                  $point['Latitude'] = ($row['lat']['value'] + $baseLat)/1000000;
                  $point['Longitude'] = ($row['lng']['value'] + $baseLng)/1000000;
              } else {
                  // Later entries: the previous point is already scaled, so
                  // only the delta is scaled before adding.
                  $point['DeviceTime'] = $row['relative_time']['value'] + $previous['DeviceTime'];
                  $point['Latitude'] = $row['lat']['value']/1000000 + $previous['Latitude'];
                  $point['Longitude'] = $row['lng']['value']/1000000 + $previous['Longitude'];
              }
              $list[$key] = $point;
              $previous = $point;
          }
          return $list;
      }
  }