ServerAction.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266
  1. <?php
  2. namespace Workerman;
  3. class ServerAction extends \Action {
  4. const ERRNO_SUCCESS = 0;
  5. const ERRNO_POOL_NOT_FOUND = 100;
  6. const ERRNO_SEND_FAIL = 101;
  7. const ERRNO_RESP_TIMEOUT = 102;
  8. private $mConnectionPool = array();
  9. protected $mIsBinaryProto = true;
  10. protected function onWorkerStart( $worker ){
  11. }
  12. protected function onWorkerStop( $worker ){
  13. }
  14. private function onConnect( $connection ){
  15. $this->logDebug($connection->id. ' connected');
  16. }
  17. private function onClose( $connection ){
  18. $this->logDebug($connection->id. ' closed');
  19. //移除长连接
  20. $this->removeFromPool($connection);
  21. }
  22. private function onBufferFull( $connection ){
  23. }
  24. private function onBufferDrain( $connection ){
  25. }
  26. private function onError( $connection, $code, $msg ){
  27. //log_error("onError:" . $connection->imei . $msg);
  28. }
  29. protected function logDebug( $msg ){
  30. if (!APP_DEBUG)
  31. return;
  32. $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH . APP_PREFIX .'/');
  33. $logger->log('',\Logger\FileLogger::DEBUG,$msg);
  34. }
  35. protected function logError( $msg ){
  36. $logger = \Logger\FileLogger::getInstance(SOLUTION_LOG_PATH .APP_PREFIX .'/');
  37. $logger->log('',\Logger\FileLogger::ERROR,$msg);
  38. }
  39. protected function addToPool( $key, $connection ){
  40. $connection->poolKey = $key;
  41. $connection->protoData = array();
  42. $this->mConnectionPool[$key] = $connection;
  43. $this->logDebug($key . " in pool");
  44. }
  45. protected function removeFromPool( $connection ){
  46. $key = $connection->poolKey;
  47. if($key){
  48. $this->logDebug("remove " . $key);
  49. unset( $this->mConnectionPool[$key]);
  50. }
  51. }
  52. protected function getFromPool( $key ){
  53. return $this->mConnectionPool[$key];
  54. $this->logDebug("use " . $key);
  55. }
  56. protected function sendWait( $key, $request, $respMethod, $respCallback, $respTimeout = 3 ){
  57. //检查是否回应的回调必填
  58. if($respMethod && !$respCallback){
  59. throw_exception('resp callback needed');
  60. }
  61. //获取连接对象
  62. $connection = $this->getFromPool($key);
  63. if(!$connection){
  64. $respCallback(self::ERRNO_POOL_NOT_FOUND);
  65. return;
  66. }
  67. //发送请求数据(需要等待回应)
  68. $result = $connection->send($request);
  69. if($result === false){
  70. $respCallback(self::ERRNO_SEND_FAIL);
  71. return;
  72. }
  73. //清空回应方法的数据
  74. $connection->protoData[$respMethod] = null;
  75. //开启时钟等待回应
  76. $interval = 0.5;
  77. $startTime = microtime(true);
  78. $timerId = \Workerman\Lib\Timer::add($interval, function() use($connection,$request,$startTime,$respMethod,$respTimeout,$respCallback,&$timerId)
  79. {
  80. //判断是否超时
  81. if(microtime(true) - $startTime > $respTimeout){
  82. \Workerman\Lib\Timer::del($timerId);
  83. $respCallback(self::ERRNO_RESP_TIMEOUT);
  84. return;
  85. }
  86. //判断是否接收到app的响应
  87. $resp = $connection->protoData[$respMethod];
  88. if( $resp ){
  89. \Workerman\Lib\Timer::del($timerId);
  90. $respCallback(self::ERRNO_SUCCESS,$resp);
  91. }
  92. });
  93. }
  94. protected function getErrnoText( $errno ){
  95. $text = array(
  96. self::ERRNO_POOL_NOT_FOUND => '连接池内不存在',
  97. self::ERRNO_RESP_TIMEOUT => '等待回应超时',
  98. self::ERRNO_SEND_FAIL => '发送失败',
  99. self::ERRNO_SUCCESS => '发送成功',
  100. );
  101. return $text[$errno] ? : '无效错误码';
  102. }
  103. public function start( $proto, $port, $count = 1, $transport ){
  104. //检查监听端口是否设置
  105. if(!$port){
  106. echo 'error:please set listening port'.PHP_EOL;
  107. exit;
  108. }
  109. //检查协议类型是否设置
  110. if(!$proto){
  111. echo 'error:please set proto type'.PHP_EOL;
  112. exit;
  113. }
  114. //启动监听
  115. Vendor('Workerman352.Autoloader');
  116. $worker = new \Workerman\Worker($proto ."://0.0.0.0:".$port);
  117. $worker->count = $count;
  118. if($transport)
  119. $worker->transport = $transport;
  120. $worker->onWorkerStart = function($worker){
  121. $this->onWorkerStart($worker);
  122. };
  123. $worker->onWorkerStop = function($worker) {
  124. $this->onWorkerStop($worker);
  125. };
  126. $worker->onConnect = function($connection){
  127. $this->onConnect($connection);
  128. };
  129. $worker->onMessage = function($connection,$data,$raw){
  130. $this->onMessage($connection,$data,$raw);
  131. };
  132. $worker->onClose = function($connection){
  133. $this->onClose($connection);
  134. };
  135. $worker->onBufferFull = function($connection){
  136. $this->onBufferFull($connection);
  137. };
  138. $worker->onBufferDrain = function($connection){
  139. $this->onBufferDrain($connection);
  140. };
  141. $worker->onError = function($connection, $code, $msg){
  142. $this->onError($connection, $code, $msg);
  143. };
  144. \Workerman\Worker::runAll();
  145. }
  146. protected function onMessage( $connection, $data, $raw ){
  147. $raw_msg = $this->mIsBinaryProto ? $this->bin2str($raw,true) : $raw;
  148. $this->logDebug("message:" . $raw_msg );
  149. if(!$data){
  150. $this->logError("message decode failed " . $raw_msg);
  151. return;
  152. }
  153. if(!is_array($data) && !is_object($data)){
  154. $this->logError("message type should be array or object");
  155. return;
  156. }
  157. //根据协议方法进行分发
  158. //检测data类型
  159. if (!is_array($data)) {
  160. $method = $data->method;
  161. if($method){
  162. $connection->protoData[$method] = $data;
  163. $method = 'on' . ucfirst($method);
  164. if(method_exists($this,$method))
  165. $this->$method($connection,$data,$raw);
  166. else
  167. $this->logError('method not exists . method=' . $method);
  168. }else{
  169. $this->logError('proto method not exists' . json_encode($data));
  170. }
  171. }elseif (count($data) == count($data, 1)) {
  172. $method = is_array($data) ? $data['method'] : $data->method;
  173. if($method){
  174. $connection->protoData[$method] = $data;
  175. $method = 'on' . ucfirst($method);
  176. if(method_exists($this,$method))
  177. $this->$method($connection,$data,$raw);
  178. else
  179. $this->logError('method not exists . method=' . $method);
  180. }else{
  181. $this->logError('proto method not exists' . json_encode($data));
  182. }
  183. }else{
  184. foreach($data as $row_data){
  185. $method = is_array($row_data) ? $row_data['method'] : $row_data->method;
  186. if($method){
  187. $connection->protoData[$method] = $row_data;
  188. $method = 'on' . ucfirst($method);
  189. if(method_exists($this,$method))
  190. $this->$method($connection,$row_data,$raw);
  191. else
  192. $this->logError('method not exists . method=' . $method);
  193. }else{
  194. $this->logError('proto method not exists' . json_encode($row_data));
  195. }
  196. }
  197. }
  198. }
  199. protected function bin2str( $hex, $space = false ){
  200. $data = unpack("C*chars",$hex);
  201. $bin = '';
  202. foreach($data as $key=>$value){
  203. $bin .= sprintf('%02X',$value);
  204. if($space)
  205. $bin .= ' ';
  206. }
  207. return trim($bin);
  208. }
  209. }