123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- <?php
- namespace Workerman;
/**
 * Base action that runs a Workerman worker and dispatches decoded protocol
 * messages to `on<Method>` handlers.
 *
 * It also keeps a pool of connections indexed by an application-chosen key,
 * so that a request can be pushed to a specific peer and its reply awaited
 * (see sendWait()).
 */
class ServerAction extends \Action
{
    /** The request was sent (and answered, when a reply was awaited). */
    const ERRNO_SUCCESS = 0;

    /** No connection is registered in the pool under the requested key. */
    const ERRNO_POOL_NOT_FOUND = 100;

    /** The connection's send() call returned false. */
    const ERRNO_SEND_FAIL = 101;

    /** No reply arrived before the timeout elapsed. */
    const ERRNO_RESP_TIMEOUT = 102;

    /**
     * Lower-cased names of the server's own callbacks. Protocol messages must
     * never be routed to them: `method: "message"` would re-enter onMessage()
     * forever, `method: "close"` would evict the connection from the pool, etc.
     */
    private static $reservedHandlers = array(
        'onworkerstart',
        'onworkerstop',
        'onconnect',
        'onclose',
        'onbufferfull',
        'onbufferdrain',
        'onerror',
        'onmessage',
    );

    /** Pooled connections, indexed by the key given to addToPool(). */
    private $mConnectionPool = array();

    /** When true, raw messages are hex-dumped before being written to the log. */
    protected $mIsBinaryProto = true;

    /**
     * Hook invoked from the worker's onWorkerStart callback. No-op by default.
     *
     * @param mixed $worker Worker instance handed over by Workerman.
     */
    protected function onWorkerStart($worker)
    {
    }

    /**
     * Hook invoked from the worker's onWorkerStop callback. No-op by default.
     *
     * @param mixed $worker Worker instance handed over by Workerman.
     */
    protected function onWorkerStop($worker)
    {
    }

    /**
     * Logs a newly accepted connection.
     *
     * @param mixed $connection Connection object; its `id` property is logged.
     */
    private function onConnect($connection)
    {
        $this->logDebug($connection->id . ' connected');
    }

    /**
     * Logs a closed connection and drops it from the pool.
     *
     * @param mixed $connection Connection object; its `id` property is logged.
     */
    private function onClose($connection)
    {
        $this->logDebug($connection->id . ' closed');
        // Remove the long-lived connection from the pool.
        $this->removeFromPool($connection);
    }

    /**
     * Callback wired to the worker's onBufferFull event. No-op.
     *
     * @param mixed $connection
     */
    private function onBufferFull($connection)
    {
    }

    /**
     * Callback wired to the worker's onBufferDrain event. No-op.
     *
     * @param mixed $connection
     */
    private function onBufferDrain($connection)
    {
    }

    /**
     * Callback wired to the worker's onError event. No-op.
     *
     * @param mixed $connection
     * @param mixed $code
     * @param mixed $msg
     */
    private function onError($connection, $code, $msg)
    {
    }

    /**
     * Writes a DEBUG entry to the file logger; does nothing unless APP_DEBUG
     * is truthy.
     *
     * @param string $msg
     */
    protected function logDebug($msg)
    {
        if (!APP_DEBUG) {
            return;
        }
        $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH . APP_PREFIX . '/');
        $logger->log('', \Logger\FileLogger::DEBUG, $msg);
    }

    /**
     * Writes an ERROR entry to the file logger.
     *
     * @param string $msg
     */
    protected function logError($msg)
    {
        $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH . APP_PREFIX . '/');
        $logger->log('', \Logger\FileLogger::ERROR, $msg);
    }

    /**
     * Registers a connection in the pool under $key.
     *
     * The key is remembered on the connection (`poolKey`) and its per-method
     * reply cache (`protoData`) is reset. An entry already stored under the
     * same key is replaced.
     *
     * @param mixed $key
     * @param mixed $connection
     */
    protected function addToPool($key, $connection)
    {
        $connection->poolKey = $key;
        $connection->protoData = array();
        $this->mConnectionPool[$key] = $connection;
        $this->logDebug($key . " in pool");
    }

    /**
     * Removes a connection from the pool.
     *
     * Connections that were never pooled are ignored. The entry is only
     * removed when it still refers to this very connection, so the late close
     * of an old connection cannot evict a newer one registered under the same
     * key.
     *
     * @param mixed $connection
     */
    protected function removeFromPool($connection)
    {
        if (!isset($connection->poolKey)) {
            return;
        }

        $key = $connection->poolKey;
        if (isset($this->mConnectionPool[$key]) && $this->mConnectionPool[$key] === $connection) {
            $this->logDebug("remove " . $key);
            unset($this->mConnectionPool[$key]);
        }
    }

    /**
     * Looks up a pooled connection.
     *
     * @param mixed $key
     * @return mixed The connection, or null when nothing is stored under $key.
     */
    protected function getFromPool($key)
    {
        if (!isset($this->mConnectionPool[$key])) {
            return null;
        }

        $this->logDebug("use " . $key);
        return $this->mConnectionPool[$key];
    }

    /**
     * Sends $request to the pooled connection $key and, when $respMethod is
     * given, waits for a message of that method to arrive on the connection.
     *
     * The outcome is reported through $respCallback:
     *  - (ERRNO_POOL_NOT_FOUND) no connection is pooled under $key;
     *  - (ERRNO_SEND_FAIL)      send() returned false;
     *  - (ERRNO_SUCCESS, $resp) the awaited message arrived;
     *  - (ERRNO_RESP_TIMEOUT)   nothing arrived within $respTimeout seconds;
     *  - (ERRNO_SUCCESS)        no $respMethod was given and the send did not fail.
     *
     * @param mixed         $key          Pool key of the target connection.
     * @param mixed         $request      Payload passed to the connection's send().
     * @param string|null   $respMethod   Protocol method of the expected reply; empty for fire-and-forget.
     * @param callable|null $respCallback Required whenever $respMethod is given.
     * @param int|float     $respTimeout  Seconds to wait for the reply.
     */
    protected function sendWait($key, $request, $respMethod, $respCallback, $respTimeout = 3)
    {
        // A reply can only be delivered through the callback.
        if ($respMethod && !$respCallback) {
            throw_exception('resp callback needed');
        }

        // Resolve the target connection.
        $connection = $this->getFromPool($key);
        if (!$connection) {
            if ($respCallback) {
                $respCallback(self::ERRNO_POOL_NOT_FOUND);
            }
            return;
        }

        // Send the request.
        if ($connection->send($request) === false) {
            if ($respCallback) {
                $respCallback(self::ERRNO_SEND_FAIL);
            }
            return;
        }

        // Fire-and-forget: there is no reply to wait for, so report the send
        // result right away instead of polling until the timeout.
        if (!$respMethod) {
            if ($respCallback) {
                $respCallback(self::ERRNO_SUCCESS);
            }
            return;
        }

        // Forget any earlier reply of this method so only a fresh one counts.
        $connection->protoData[$respMethod] = null;

        // Poll for the reply. $timerId is captured by reference because its
        // value is only known once Timer::add() has returned.
        $interval = 0.5;
        $startTime = microtime(true);
        $timerId = null;
        $timerId = \Workerman\Lib\Timer::add(
            $interval,
            function () use ($connection, $startTime, $respMethod, $respTimeout, $respCallback, &$timerId) {
                // Check for the reply first so one that arrived during the
                // last tick is not misreported as a timeout.
                $resp = isset($connection->protoData[$respMethod])
                    ? $connection->protoData[$respMethod]
                    : null;
                if ($resp) {
                    \Workerman\Lib\Timer::del($timerId);
                    $respCallback(self::ERRNO_SUCCESS, $resp);
                    return;
                }

                if (microtime(true) - $startTime > $respTimeout) {
                    \Workerman\Lib\Timer::del($timerId);
                    $respCallback(self::ERRNO_RESP_TIMEOUT);
                }
            }
        );
    }

    /**
     * Maps an ERRNO_* code to its display text.
     *
     * @param int $errno
     * @return string The text for $errno, or a generic "invalid code" text.
     */
    protected function getErrnoText($errno)
    {
        $text = array(
            self::ERRNO_POOL_NOT_FOUND => '连接池内不存在',
            self::ERRNO_RESP_TIMEOUT   => '等待回应超时',
            self::ERRNO_SEND_FAIL      => '发送失败',
            self::ERRNO_SUCCESS        => '发送成功',
        );

        return isset($text[$errno]) ? $text[$errno] : '无效错误码';
    }

    /**
     * Creates the worker, wires its callbacks to this object and runs it.
     *
     * Prints an error and exits when $port or $proto is empty.
     *
     * @param string      $proto     Scheme placed before `://0.0.0.0:<port>`.
     * @param int|string  $port      Listening port.
     * @param int         $count     Value assigned to the worker's `count` property.
     * @param string|null $transport Assigned to the worker's `transport` property when non-empty.
     */
    public function start($proto, $port, $count = 1, $transport = null)
    {
        // A listening port is mandatory.
        if (!$port) {
            echo 'error:please set listening port' . PHP_EOL;
            exit;
        }
        // So is the protocol type.
        if (!$proto) {
            echo 'error:please set proto type' . PHP_EOL;
            exit;
        }

        // Start listening.
        Vendor('Workerman352.Autoloader');
        $worker = new \Workerman\Worker($proto . "://0.0.0.0:" . $port);
        $worker->count = $count;
        if ($transport) {
            $worker->transport = $transport;
        }

        $worker->onWorkerStart = function ($worker) {
            $this->onWorkerStart($worker);
        };
        $worker->onWorkerStop = function ($worker) {
            $this->onWorkerStop($worker);
        };
        $worker->onConnect = function ($connection) {
            $this->onConnect($connection);
        };
        // NOTE(review): $raw is optional here because the documented Workerman
        // onMessage callback receives only ($connection, $data); the bundled
        // "Workerman352" copy presumably passes the raw buffer as a third
        // argument - confirm.
        $worker->onMessage = function ($connection, $data, $raw = null) {
            $this->onMessage($connection, $data, $raw);
        };
        $worker->onClose = function ($connection) {
            $this->onClose($connection);
        };
        $worker->onBufferFull = function ($connection) {
            $this->onBufferFull($connection);
        };
        $worker->onBufferDrain = function ($connection) {
            $this->onBufferDrain($connection);
        };
        $worker->onError = function ($connection, $code, $msg) {
            $this->onError($connection, $code, $msg);
        };

        \Workerman\Worker::runAll();
    }

    /**
     * Logs an incoming message and dispatches it by protocol method.
     *
     * $data is either one message (an object, or an array that has a `method`
     * key or contains no nested arrays) or a batch (an array of messages).
     *
     * @param mixed $connection Connection the message arrived on.
     * @param mixed $data       Decoded message(s).
     * @param mixed $raw        Raw payload, used for logging and passed on to handlers.
     */
    protected function onMessage($connection, $data, $raw)
    {
        $rawText = $this->mIsBinaryProto ? $this->bin2str($raw, true) : $raw;
        $this->logDebug("message:" . $rawText);

        if (!$data) {
            $this->logError("message decode failed " . $rawText);
            return;
        }
        if (!is_array($data) && !is_object($data)) {
            $this->logError("message type should be array or object");
            return;
        }

        // An array carrying its own `method` key is a single message even when
        // it holds nested arrays; otherwise a nested array is taken as a batch.
        $isSingle = is_object($data)
            || array_key_exists('method', $data)
            || count($data) == count($data, COUNT_RECURSIVE);

        if ($isSingle) {
            $this->dispatchProtoMessage($connection, $data, $raw);
            return;
        }

        foreach ($data as $row_data) {
            $this->dispatchProtoMessage($connection, $row_data, $raw);
        }
    }

    /**
     * Stores one message as the latest reply for its method (polled by
     * sendWait()) and invokes the matching `on<Method>` handler, if any.
     *
     * @param mixed $connection
     * @param mixed $message One decoded message (array or object).
     * @param mixed $raw     Raw payload passed on to the handler.
     */
    private function dispatchProtoMessage($connection, $message, $raw)
    {
        $method = null;
        if (is_array($message)) {
            $method = isset($message['method']) ? $message['method'] : null;
        } elseif (is_object($message)) {
            // Read directly (no isset) so objects that expose the field
            // through __get keep working.
            $method = $message->method;
        }

        // The method is used as an array key and a method name; reject
        // anything that is empty or not a scalar.
        if (!$method || !is_scalar($method)) {
            $this->logError('proto method not exists' . json_encode($message));
            return;
        }

        $connection->protoData[$method] = $message;

        $handler = 'on' . ucfirst($method);
        // PHP method names are case-insensitive, hence the lower-cased compare.
        if (in_array(strtolower($handler), self::$reservedHandlers, true)) {
            $this->logError('method not allowed . method=' . $handler);
            return;
        }

        if (method_exists($this, $handler)) {
            $this->$handler($connection, $message, $raw);
        } else {
            $this->logError('method not exists . method=' . $handler);
        }
    }

    /**
     * Renders binary data as upper-case hexadecimal.
     *
     * @param string $hex   Binary data.
     * @param bool   $space Separate the bytes with single spaces.
     * @return string e.g. "01AB" or, with $space, "01 AB".
     */
    protected function bin2str($hex, $space = false)
    {
        $digits = strtoupper(bin2hex((string) $hex));
        if (!$space || $digits === '') {
            return $digits;
        }

        return implode(' ', str_split($digits, 2));
    }
}
|