123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238 |
- <?php
- class V1Action extends Action {
-
-
- public function kafka2createFile( ){
-
- $broker_list = C('KAFKA_BROKER_LIST');
-
- if (empty($broker_list)) {
- exit("KAFKA_BROKER_LIST must be config!".PHP_EOL);
- }
- $group = C('ROUTE_INDEX_KAFKA_GROUP_FTP');
- if (empty($group)) {
- exit("ROUTE_INDEX_KAFKA_GROUP_FTP must be config!".PHP_EOL);
- }
- $topics = C('ROUTE_INDEX_KAFKA_TOPIC');
- if (empty($topics)) {
- exit("ROUTE_INDEX_KAFKA_TOPIC must be config!".PHP_EOL);
- }
- $topics = explode(',',$topics);
-
- $conf = new RdKafka\Conf();
- // Set a rebalance callback to log partition assignments (optional)
- $conf->setRebalanceCb(function (RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
- switch ($err) {
- case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
- echo "Assign: ";
- var_dump($partitions);
- $kafka->assign($partitions);
- break;
-
- case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
- echo "Revoke: ";
- var_dump($partitions);
- $kafka->assign(NULL);
- break;
-
- default:
- throw new \Exception($err);
- }
- });
-
- // Configure the group.id. All consumer with the same group.id will consume
- // different partitions.
- $conf->set('group.id', $group);
-
- $conf->set('metadata.broker.list', $broker_list);
-
- $topicConf = new RdKafka\TopicConf();
-
- // Set where to start consuming messages when there is no initial offset in
- // offset store or the desired offset is out of range.
- // 'smallest': start from the beginning
- $topicConf->set('auto.offset.reset', 'smallest');
-
- // Set the configuration to use for subscribed/assigned topics
- $conf->setDefaultTopicConf($topicConf);
-
- $consumer = new RdKafka\KafkaConsumer($conf);
-
- // Subscribe to topic 'test'
- $consumer->subscribe($topics);
-
- $tmp_list = array();
- $start = time();
- $count = 0;
-
-
- //HC_YYYYMMDD_HHIISS.dat
- //HC_YYYYMMDD_HHIISS.md5
- $localDir = C('FTP_LOCAL_DIR');
- $timeFram = time();
- $createTime = date('Ymd_His', $timeFram);
- $fileTimeInterval = C('FTP_FILE_CREATE_INTERVAL');
- if(!$fileTimeInterval){
- $fileTimeInterval = 30;
- }
- // $x = 0;
- // $sum = 0;
- while (true) {
- $message = $consumer->consume(30*1000);
- // $_st = microtime(TRUE);
- switch ($message->err) {
- case RD_KAFKA_RESP_ERR_NO_ERROR:
- $locationPack = ($message->payload).'\n';
-
- $fileName = $localDir.'/HC_'.json_decode($message->payload)->mac.$createTime . '.dat';
- $runTime = time() - $timeFram;
-
- if($runTime < $fileTimeInterval){
- $datRes = $this->writeRouteFile($fileName, $locationPack);
- }else{
- $md5Res = $this->createRouteMD5file($fileName);
-
- $timeFram = time();
- $createTime = date('Ymd_His', $timeFram);
- $datRes = $this->writeRouteFile($fileName);
- }
- break;
- case RD_KAFKA_RESP_ERR__PARTITION_EOF:
- echo "No more messages; will wait for more".PHP_EOL;
- break;
- case RD_KAFKA_RESP_ERR__TIMED_OUT:
- $fileName = $localDir.'/HC_'.json_decode($message->payload)->mac.$createTime . '.dat';
- $md5Res = $this->createRouteMD5file($fileName);
- $timeFram = time();
- $createTime = date('Ymd_His', $timeFram);
- echo "Timed out\n";
- break;
- default:
- throw new \Exception($message->errstr(), $message->err);
- break;
- }
- }
- }
-
-
- private function writeRouteFile( $fileName, $data ){
- //deviceid,positioninfo.longitude,positioninfo.latitude,receivetime
- $config = C('FTP_CONFIG');
-
- $sRedis = Redis('open_vehicle', 'hash');
-
- $writeData = $data;
-
- if(!file_exists($fileName)){
- $writeData = 'deviceid,positioninfo.longitude,positioninfo.latitude,receivetime'.PHP_EOL.$writeData;
- }
- debug_log('route_info.log',json_encode($writeData,JSON_UNESCAPED_UNICODE));
-
- $writeData = utf8_encode($writeData);
-
- $res = Zmcoding\FtpFile::getInstance($config)->writeFile($fileName, $writeData.PHP_EOL);
-
- return $res;
- }
-
-
- private function createRouteMD5file( $fileName ){
- if(!file_exists($fileName)){
- return false;
- }
- $contents = strlen( file_get_contents($fileName, null, null, 0, 1) );
- if($contents == 0){
- unlink($fileName);
- return false;
- }
- $config = C('FTP_CONFIG');
- $writeData = md5_file($fileName);
-
- $pathInfo = pathinfo($fileName);
- $md5File = $pathInfo['dirname'].'/'.$pathInfo['filename'].'.md5';
-
- $writeData = $writeData. ' '. $pathInfo['basename'];
-
- $res = Zmcoding\FtpFile::getInstance($config)->writeFile($md5File, $writeData);
-
- return $res;
- }
-
-
- private function getT61GpsParseReslut( $data ){
- if(!$data){
- echo 'data empty!'.PHP_EOL;
- return false;
- }
- $imei = $data['imei']['value'];
- if(!$imei){
- echo 'imei not existed!'.PHP_EOL;
- return false;
- }
- if(!isset($data['lng']['value'])){
- echo '['.$imei.']data.lng.value not set!'.PHP_EOL;
- return false;
- }
- //检测是否有打包位置参数
- $is_have_pkgfields = $data['extctrl']['extctrl_conf']['is_have_pkgfields'];
- if(!$is_have_pkgfields){
- echo '['.$imei.']is_have_pkgfields empty!'.PHP_EOL;
- return false;
- }
-
- //检测是否有打包位置参数
- $pkt_count = hexdec($data['pkt_count']['hex_value']);
- if(!$pkt_count){
- echo '['.$imei.']pkt_count empty!'.PHP_EOL;
- return false;
- }
-
- $pkt_position_params = $data['pkt_position_params'];
- if(!$pkt_position_params){
- echo '['.$imei.']pkt_position_params empty!'.PHP_EOL;
- return false;
- }
-
- //解析基准点定位信息
- $base_points_info = array(
- 'DeviceId' => $imei,
- 'AlarmStatus' => $data['alarm_status']['hex_value'],//告警状态
- 'DeviceStatus' => $data['device_status']['hex_value'],//设备状态
- 'Longitude' => $data['lng']['value'],
- 'Latitude' => $data['lat']['value'],
- 'DeviceTime' => $data['timestamp']['value'],//本次打包时,第一个终端位置数据采集时间
- 'Voltage' => $data['voltage']['value'],//终端外部供电电压
- );
- //var_dump($base_points_info);
- //echo 'lat_hex = '.$data['lat']['hex_value'].',lng_hex = '.$data['lng']['hex_value'].PHP_EOL;
- //解析打包位置参数
- $list = array();
- foreach($pkt_position_params as $key => $row){
- $tmp = array();
- $tmp['DeviceId'] = $base_points_info['DeviceId'];
- $tmp['Altitude'] = $row['altitude']['value'];
- $tmp['Speed'] = $row['speed']['value'];
- $tmp['Direction'] = $row['direction']['value'];
- $tmp['SatelliteCount'] = $row['satellite_count']['value'];
- if($key < 1){
- $tmp['DeviceTime'] = $row['relative_time']['value'] + $base_points_info['DeviceTime'];
- $tmp['Latitude'] = ($row['lat']['value'] + $base_points_info['Latitude'])/1000000;
- $tmp['Longitude'] = ($row['lng']['value'] + $base_points_info['Longitude'])/1000000;
- }else{
- $tmp['DeviceTime'] = $row['relative_time']['value'] + $list[$key-1]['DeviceTime'];
- $tmp['Latitude'] = $row['lat']['value']/1000000 + $list[$key-1]['Latitude'];
- $tmp['Longitude'] = $row['lng']['value']/1000000 + $list[$key-1]['Longitude'];
- }
- $list[$key] = $tmp;
- }
-
- return $list;
- }
-
- }
|