logDebug($connection->id. ' connected'); }

    /**
     * Connection-closed callback: logs the event and drops the connection
     * from the pool.
     *
     * @param object $connection Connection handed over by the worker's onClose event.
     */
    private function onClose( $connection ){
        $this->logDebug($connection->id. ' closed');
        // Remove the long-lived connection from the pool.
        $this->removeFromPool($connection);
    }

    /**
     * Send-buffer-full callback. Intentionally a no-op.
     *
     * @param object $connection
     */
    private function onBufferFull( $connection ){
    }

    /**
     * Send-buffer-drained callback. Intentionally a no-op.
     *
     * @param object $connection
     */
    private function onBufferDrain( $connection ){
    }

    /**
     * Connection error callback. Currently does nothing; the original logging
     * call is kept below, disabled, exactly as it was found.
     *
     * @param object $connection
     * @param mixed  $code
     * @param mixed  $msg
     */
    private function onError( $connection, $code, $msg ){
        //log_error("onError:" . $connection->imei . $msg);
    }

    /**
     * Writes a DEBUG entry to the application's file logger.
     * Does nothing unless APP_DEBUG is truthy.
     *
     * @param string $msg
     */
    protected function logDebug( $msg ){
        if (!APP_DEBUG) {
            return;
        }
        $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH . APP_PREFIX .'/');
        $logger->log('',\Logger\FileLogger::DEBUG,$msg);
    }

    /**
     * Writes an ERROR entry to the application's file logger.
     *
     * @param string $msg
     */
    protected function logError( $msg ){
        $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH .APP_PREFIX .'/');
        $logger->log('',\Logger\FileLogger::ERROR,$msg);
    }

    /**
     * Registers a connection in the pool under $key.
     *
     * Tags the connection with its pool key and resets its per-method
     * response cache (protoData). An existing entry under the same key is
     * overwritten.
     *
     * @param mixed  $key
     * @param object $connection
     */
    protected function addToPool( $key, $connection ){
        $connection->poolKey = $key;
        $connection->protoData = array();
        $this->mConnectionPool[$key] = $connection;
        $this->logDebug($key . " in pool");
    }

    /**
     * Removes a connection from the pool, if it is the one currently
     * registered under its pool key.
     *
     * @param object $connection
     */
    protected function removeFromPool( $connection ){
        // A connection that was never pooled has no poolKey property.
        $key = isset($connection->poolKey) ? $connection->poolKey : null;
        // Explicit checks instead of truthiness so that keys 0 / '0' are removable.
        if ($key === null || $key === false || $key === '') {
            return;
        }
        // Do not evict a newer connection that re-registered under the same
        // key (e.g. a reconnect that happened before the old socket closed).
        if (!isset($this->mConnectionPool[$key]) || $this->mConnectionPool[$key] !== $connection) {
            return;
        }
        $this->logDebug("remove " . $key);
        unset($this->mConnectionPool[$key]);
    }

    /**
     * Looks up a pooled connection.
     *
     * @param mixed $key
     * @return object|null The connection, or null when nothing is registered under $key.
     */
    protected function getFromPool( $key ){
        if (!isset($this->mConnectionPool[$key])) {
            return null;
        }
        $this->logDebug("use " . $key);
        return $this->mConnectionPool[$key];
    }

    /**
     * Sends a request over a pooled connection and polls for the matching
     * response.
     *
     * The response is whatever onMessage() stored in
     * $connection->protoData[$respMethod]. The outcome is reported through
     * $respCallback as (errno) on failure or (ERRNO_SUCCESS, response) on
     * success.
     *
     * @param mixed         $key          Pool key of the target connection.
     * @param mixed         $request      Payload passed to $connection->send().
     * @param string        $respMethod   Protocol method name of the expected response.
     * @param callable|null $respCallback Required when $respMethod is set.
     * @param int|float     $respTimeout  Seconds to wait for the response.
     */
    protected function sendWait( $key, $request, $respMethod, $respCallback, $respTimeout = 3 ){
        // A response method without a callback makes no sense.
        if($respMethod && !$respCallback){
            throw_exception('resp callback needed');
        }

        // Fetch the connection object.
        $connection = $this->getFromPool($key);
        if(!$connection){
            if ($respCallback) {
                $respCallback(self::ERRNO_POOL_NOT_FOUND);
            }
            return;
        }

        // Send the request data.
        $result = $connection->send($request);
        if($result === false){
            if ($respCallback) {
                $respCallback(self::ERRNO_SEND_FAIL);
            }
            return;
        }

        // Nobody to notify: fire-and-forget, no point in polling.
        if (!$respCallback) {
            return;
        }

        // Clear any stale response cached for this method.
        $connection->protoData[$respMethod] = null;

        // Poll on a timer until the response shows up or the timeout elapses.
        $interval = 0.5;
        $startTime = microtime(true);
        $timerId = null;
        $timerId = \Workerman\Lib\Timer::add($interval, function() use($connection,$startTime,$respMethod,$respTimeout,$respCallback,&$timerId) {
            // Check for the response first so that a reply received during
            // the last polling interval is not misreported as a timeout.
            $resp = isset($connection->protoData[$respMethod]) ? $connection->protoData[$respMethod] : null;
            if( $resp ){
                \Workerman\Lib\Timer::del($timerId);
                $respCallback(self::ERRNO_SUCCESS,$resp);
                return;
            }
            if(microtime(true) - $startTime > $respTimeout){
                \Workerman\Lib\Timer::del($timerId);
                $respCallback(self::ERRNO_RESP_TIMEOUT);
            }
        });
    }

    /**
     * Maps an ERRNO_* code to its human-readable text.
     *
     * @param mixed $errno
     * @return string Text for the code, or a generic "invalid code" text when unknown.
     */
    protected function getErrnoText( $errno ){
        $text = array(
            self::ERRNO_POOL_NOT_FOUND => '连接池内不存在',
            self::ERRNO_RESP_TIMEOUT => '等待回应超时',
            self::ERRNO_SEND_FAIL => '发送失败',
            self::ERRNO_SUCCESS => '发送成功',
        );
        // isset() instead of ?: so unknown codes do not raise an undefined-index notice.
        return isset($text[$errno]) ? $text[$errno] : '无效错误码';
    }

    /**
     * Creates the worker, wires every worker/connection event to the
     * corresponding handler of this class, and runs the worker loop.
     *
     * @param string      $proto     Protocol/scheme used in the listen address.
     * @param int|string  $port      Listening port.
     * @param int         $count     Number of worker processes.
     * @param string|null $transport Optional transport assigned to the worker when truthy.
     */
    public function start( $proto, $port, $count = 1, $transport = null ){
        // The listening port must be set.
        if(!$port){
            echo 'error:please set listening port'.PHP_EOL;
            exit;
        }
        // The protocol type must be set.
        if(!$proto){
            echo 'error:please set proto type'.PHP_EOL;
            exit;
        }

        // Start listening.
        Vendor('Workerman352.Autoloader');
        $worker = new \Workerman\Worker($proto ."://0.0.0.0:".$port);
        $worker->count = $count;
        if($transport) {
            $worker->transport = $transport;
        }

        $worker->onWorkerStart = function($worker){
            $this->onWorkerStart($worker);
        };
        $worker->onWorkerStop = function($worker) {
            $this->onWorkerStop($worker);
        };
        $worker->onConnect = function($connection){
            $this->onConnect($connection);
        };
        $worker->onMessage = function($connection,$data,$raw){
            $this->onMessage($connection,$data,$raw);
        };
        $worker->onClose = function($connection){
            $this->onClose($connection);
        };
        $worker->onBufferFull = function($connection){
            $this->onBufferFull($connection);
        };
        $worker->onBufferDrain = function($connection){
            $this->onBufferDrain($connection);
        };
        $worker->onError = function($connection, $code, $msg){
            $this->onError($connection, $code, $msg);
        };

        \Workerman\Worker::runAll();
    }

    /**
     * Message callback: validates the decoded payload and dispatches it to
     * the on<Method> handler(s).
     *
     * $data may be a single message (object or array) or an array of
     * messages; each message names its handler in its "method" field.
     *
     * @param object $connection
     * @param mixed  $data Decoded message.
     * @param string $raw  Raw message as received.
     */
    protected function onMessage( $connection, $data, $raw ){
        $raw_msg = $this->mIsBinaryProto ? $this->bin2str($raw,true) : $raw;
        $this->logDebug("message:" . $raw_msg );

        if(!$data){
            $this->logError("message decode failed " . $raw_msg);
            return;
        }
        if(!is_array($data) && !is_object($data)){
            $this->logError("message type should be array or object");
            return;
        }

        // Single message: an object, an array carrying its own "method" key
        // (even when some of its fields are arrays), or a flat array.
        // Only a nested array without a top-level "method" is a batch.
        $isSingle = is_object($data)
            || isset($data['method'])
            || count($data) == count($data, COUNT_RECURSIVE);

        if ($isSingle) {
            $this->dispatchProtoMessage($connection, $data, $raw);
            return;
        }
        foreach($data as $row_data){
            $this->dispatchProtoMessage($connection, $row_data, $raw);
        }
    }

    /**
     * Caches one protocol message on the connection and invokes its
     * on<Method> handler.
     *
     * @param object $connection
     * @param mixed  $message One decoded message (array or object expected).
     * @param string $raw     Raw message as received.
     */
    private function dispatchProtoMessage( $connection, $message, $raw ){
        $method = null;
        if (is_array($message)) {
            $method = isset($message['method']) ? $message['method'] : null;
        } elseif (is_object($message)) {
            $method = isset($message->method) ? $message->method : null;
        }

        // A non-scalar method cannot be used as an array key or handler name.
        if (!$method || !is_scalar($method)) {
            $this->logError('proto method not exists' . json_encode($message));
            return;
        }

        $handler = 'on' . ucfirst($method);

        // The method name comes from the peer. Never let it reach the
        // worker/connection lifecycle callbacks ("message" would recurse
        // into onMessage without bound). PHP method names are
        // case-insensitive, hence the lower-casing.
        $reserved = array(
            'onworkerstart', 'onworkerstop', 'onconnect', 'onmessage',
            'onclose', 'onbufferfull', 'onbufferdrain', 'onerror',
        );
        if (in_array(strtolower($handler), $reserved, true)) {
            $this->logError('method not exists . method=' . $handler);
            return;
        }

        // Cached so that sendWait() can pick the message up as a response.
        $connection->protoData[$method] = $message;

        if (method_exists($this, $handler)) {
            $this->$handler($connection, $message, $raw);
        } else {
            $this->logError('method not exists . method=' . $handler);
        }
    }

    /**
     * Renders a binary string as upper-case hexadecimal.
     *
     * @param string $hex   Binary input (despite the name, this is the raw byte string).
     * @param bool   $space Separate the bytes with single spaces.
     * @return string e.g. "0A FF" with $space, "0AFF" without.
     */
    protected function bin2str( $hex, $space = false ){
        $digits = strtoupper(bin2hex((string) $hex));
        if (!$space || $digits === '') {
            return $digits;
        }
        return implode(' ', str_split($digits, 2));
    }
}