PocketMine-MP 5.21.1 git-2ff647079265e7c600203af4fd902b15e99d49a4
vendor/pocketmine/raklib/src/server/Server.php
1<?php
2
3/*
4 * This file is part of RakLib.
5 * Copyright (C) 2014-2022 PocketMine Team <https://github.com/pmmp/RakLib>
6 *
7 * RakLib is not affiliated with Jenkins Software LLC nor RakNet.
8 *
9 * RakLib is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 3 of the License, or
12 * (at your option) any later version.
13 */
14
15declare(strict_types=1);
16
17namespace raklib\server;
18
31use function asort;
32use function bin2hex;
33use function count;
34use function get_class;
35use function microtime;
36use function ord;
37use function preg_match;
38use function strlen;
39use function time;
40use function time_sleep_until;
41use const PHP_INT_MAX;
42use const SOCKET_ECONNRESET;
43
44class Server implements ServerInterface{
45
46 private const RAKLIB_TPS = 100;
47 private const RAKLIB_TIME_PER_TICK = 1 / self::RAKLIB_TPS;
48
49 protected int $receiveBytes = 0;
50 protected int $sendBytes = 0;
51
53 protected array $sessionsByAddress = [];
55 protected array $sessions = [];
56
57 protected UnconnectedMessageHandler $unconnectedMessageHandler;
58
59 protected string $name = "";
60
61 protected int $packetLimit = 200;
62
63 protected bool $shutdown = false;
64
65 protected int $ticks = 0;
66
68 protected array $block = [];
70 protected array $ipSec = [];
71
73 protected array $rawPacketFilters = [];
74
75 public bool $portChecking = false;
76
77 protected int $nextSessionId = 0;
78
83 public function __construct(
84 protected int $serverId,
85 protected \Logger $logger,
86 protected ServerSocket $socket,
87 protected int $maxMtuSize,
88 ProtocolAcceptor $protocolAcceptor,
89 private ServerEventSource $eventSource,
90 private ServerEventListener $eventListener,
91 private ExceptionTraceCleaner $traceCleaner,
92 private int $recvMaxSplitParts = ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT,
93 private int $recvMaxConcurrentSplits = ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT
94 ){
95 if($maxMtuSize < Session::MIN_MTU_SIZE){
96 throw new \InvalidArgumentException("MTU size must be at least " . Session::MIN_MTU_SIZE . ", got $maxMtuSize");
97 }
98 $this->socket->setBlocking(false);
99
100 $this->unconnectedMessageHandler = new UnconnectedMessageHandler($this, $protocolAcceptor);
101 }
102
103 public function getPort() : int{
104 return $this->socket->getBindAddress()->getPort();
105 }
106
107 public function getMaxMtuSize() : int{
108 return $this->maxMtuSize;
109 }
110
111 public function getLogger() : \Logger{
112 return $this->logger;
113 }
114
115 public function tickProcessor() : void{
116 $start = microtime(true);
117
118 /*
119 * The below code is designed to allow co-op between sending and receiving to avoid slowing down either one
120 * when high traffic is coming either way. Yielding will occur after 100 messages.
121 */
122 do{
123 $stream = !$this->shutdown;
124 for($i = 0; $i < 100 && $stream && !$this->shutdown; ++$i){ //if we received a shutdown event, we don't care about any more messages from the event source
125 $stream = $this->eventSource->process($this);
126 }
127
128 $socket = true;
129 for($i = 0; $i < 100 && $socket; ++$i){
130 $socket = $this->receivePacket();
131 }
132 }while($stream || $socket);
133
134 $this->tick();
135
136 $time = microtime(true) - $start;
137 if($time < self::RAKLIB_TIME_PER_TICK){
138 @time_sleep_until(microtime(true) + self::RAKLIB_TIME_PER_TICK - $time);
139 }
140 }
141
145 public function waitShutdown() : void{
146 $this->shutdown = true;
147
148 while($this->eventSource->process($this)){
149 //Ensure that any late messages are processed before we start initiating server disconnects, so that if the
150 //server implementation used a custom disconnect mechanism (e.g. a server transfer), we don't break it in
151 //race conditions.
152 }
153
154 foreach($this->sessions as $session){
155 $session->initiateDisconnect(DisconnectReason::SERVER_SHUTDOWN);
156 }
157
158 while(count($this->sessions) > 0){
159 $this->tickProcessor();
160 }
161
162 $this->socket->close();
163 $this->logger->debug("Graceful shutdown complete");
164 }
165
166 private function tick() : void{
167 $time = microtime(true);
168 foreach($this->sessions as $session){
169 $session->update($time);
170 if($session->isFullyDisconnected()){
171 $this->removeSessionInternal($session);
172 }
173 }
174
175 $this->ipSec = [];
176
177 if(!$this->shutdown and ($this->ticks % self::RAKLIB_TPS) === 0){
178 if($this->sendBytes > 0 or $this->receiveBytes > 0){
179 $this->eventListener->onBandwidthStatsUpdate($this->sendBytes, $this->receiveBytes);
180 $this->sendBytes = 0;
181 $this->receiveBytes = 0;
182 }
183
184 if(count($this->block) > 0){
185 asort($this->block);
186 $now = time();
187 foreach($this->block as $address => $timeout){
188 if($timeout <= $now){
189 unset($this->block[$address]);
190 }else{
191 break;
192 }
193 }
194 }
195 }
196
197 ++$this->ticks;
198 }
199
201 private function receivePacket() : bool{
202 try{
203 $buffer = $this->socket->readPacket($addressIp, $addressPort);
204 }catch(SocketException $e){
205 $error = $e->getCode();
206 if($error === SOCKET_ECONNRESET){ //client disconnected improperly, maybe crash or lost connection
207 return true;
208 }
209
210 $this->logger->debug($e->getMessage());
211 return false;
212 }
213 if($buffer === null){
214 return false; //no data
215 }
216 $len = strlen($buffer);
217
218 $this->receiveBytes += $len;
219 if(isset($this->block[$addressIp])){
220 return true;
221 }
222
223 if(isset($this->ipSec[$addressIp])){
224 if(++$this->ipSec[$addressIp] >= $this->packetLimit){
225 $this->blockAddress($addressIp);
226 return true;
227 }
228 }else{
229 $this->ipSec[$addressIp] = 1;
230 }
231
232 if($len < 1){
233 return true;
234 }
235
236 $address = new InternetAddress($addressIp, $addressPort, $this->socket->getBindAddress()->getVersion());
237 try{
238 $session = $this->getSessionByAddress($address);
239 if($session !== null){
240 $header = ord($buffer[0]);
241 if(($header & Datagram::BITFLAG_VALID) !== 0){
242 if(($header & Datagram::BITFLAG_ACK) !== 0){
243 $packet = new ACK();
244 }elseif(($header & Datagram::BITFLAG_NAK) !== 0){
245 $packet = new NACK();
246 }else{
247 $packet = new Datagram();
248 }
249 $packet->decode(new PacketSerializer($buffer));
250 $session->handlePacket($packet);
251 return true;
252 }elseif($session->isConnected()){
253 //allows unconnected packets if the session is stuck in DISCONNECTING state, useful if the client
254 //didn't disconnect properly for some reason (e.g. crash)
255 $this->logger->debug("Ignored unconnected packet from $address due to session already opened (0x" . bin2hex($buffer[0]) . ")");
256 return true;
257 }
258 }
259
260 if(!$this->shutdown){
261 if(!($handled = $this->unconnectedMessageHandler->handleRaw($buffer, $address))){
262 foreach($this->rawPacketFilters as $pattern){
263 if(preg_match($pattern, $buffer) > 0){
264 $handled = true;
265 $this->eventListener->onRawPacketReceive($address->getIp(), $address->getPort(), $buffer);
266 break;
267 }
268 }
269 }
270
271 if(!$handled){
272 $this->logger->debug("Ignored packet from $address due to no session opened (0x" . bin2hex($buffer[0]) . ")");
273 }
274 }
275 }catch(BinaryDataException $e){
276 $logFn = function() use ($address, $e, $buffer) : void{
277 $this->logger->debug("Packet from $address (" . strlen($buffer) . " bytes): 0x" . bin2hex($buffer));
278 $this->logger->debug(get_class($e) . ": " . $e->getMessage() . " in " . $e->getFile() . " on line " . $e->getLine());
279 foreach($this->traceCleaner->getTrace(0, $e->getTrace()) as $line){
280 $this->logger->debug($line);
281 }
282 $this->logger->error("Bad packet from $address: " . $e->getMessage());
283 };
284 if($this->logger instanceof \BufferedLogger){
285 $this->logger->buffer($logFn);
286 }else{
287 $logFn();
288 }
289 $this->blockAddress($address->getIp(), 5);
290 }
291
292 return true;
293 }
294
295 public function sendPacket(Packet $packet, InternetAddress $address) : void{
296 $out = new PacketSerializer(); //TODO: reusable streams to reduce allocations
297 $packet->encode($out);
298 try{
299 $this->sendBytes += $this->socket->writePacket($out->getBuffer(), $address->getIp(), $address->getPort());
300 }catch(SocketException $e){
301 $this->logger->debug($e->getMessage());
302 }
303 }
304
305 public function getEventListener() : ServerEventListener{
306 return $this->eventListener;
307 }
308
309 public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void{
310 $session = $this->sessions[$sessionId] ?? null;
311 if($session !== null and $session->isConnected()){
312 $session->addEncapsulatedToQueue($packet, $immediate);
313 }
314 }
315
316 public function sendRaw(string $address, int $port, string $payload) : void{
317 try{
318 $this->socket->writePacket($payload, $address, $port);
319 }catch(SocketException $e){
320 $this->logger->debug($e->getMessage());
321 }
322 }
323
324 public function closeSession(int $sessionId) : void{
325 if(isset($this->sessions[$sessionId])){
326 $this->sessions[$sessionId]->initiateDisconnect(DisconnectReason::SERVER_DISCONNECT);
327 }
328 }
329
330 public function setName(string $name) : void{
331 $this->name = $name;
332 }
333
334 public function setPortCheck(bool $value) : void{
335 $this->portChecking = $value;
336 }
337
338 public function setPacketsPerTickLimit(int $limit) : void{
339 $this->packetLimit = $limit;
340 }
341
342 public function blockAddress(string $address, int $timeout = 300) : void{
343 $final = time() + $timeout;
344 if(!isset($this->block[$address]) or $timeout === -1){
345 if($timeout === -1){
346 $final = PHP_INT_MAX;
347 }else{
348 $this->logger->notice("Blocked $address for $timeout seconds");
349 }
350 $this->block[$address] = $final;
351 }elseif($this->block[$address] < $final){
352 $this->block[$address] = $final;
353 }
354 }
355
356 public function unblockAddress(string $address) : void{
357 unset($this->block[$address]);
358 $this->logger->debug("Unblocked $address");
359 }
360
361 public function addRawPacketFilter(string $regex) : void{
362 $this->rawPacketFilters[] = $regex;
363 }
364
365 public function getSessionByAddress(InternetAddress $address) : ?ServerSession{
366 return $this->sessionsByAddress[$address->toString()] ?? null;
367 }
368
369 public function sessionExists(InternetAddress $address) : bool{
370 return isset($this->sessionsByAddress[$address->toString()]);
371 }
372
373 public function createSession(InternetAddress $address, int $clientId, int $mtuSize) : ServerSession{
374 $existingSession = $this->sessionsByAddress[$address->toString()] ?? null;
375 if($existingSession !== null){
376 $existingSession->forciblyDisconnect(DisconnectReason::CLIENT_RECONNECT);
377 $this->removeSessionInternal($existingSession);
378 }
379
380 $this->checkSessions();
381
382 while(isset($this->sessions[$this->nextSessionId])){
383 $this->nextSessionId++;
384 $this->nextSessionId &= 0x7fffffff; //we don't expect more than 2 billion simultaneous connections, and this fits in 4 bytes
385 }
386
387 $session = new ServerSession($this, $this->logger, clone $address, $clientId, $mtuSize, $this->nextSessionId, $this->recvMaxSplitParts, $this->recvMaxConcurrentSplits);
388 $this->sessionsByAddress[$address->toString()] = $session;
389 $this->sessions[$this->nextSessionId] = $session;
390 $this->logger->debug("Created session for $address with MTU size $mtuSize");
391
392 return $session;
393 }
394
395 private function removeSessionInternal(ServerSession $session) : void{
396 unset($this->sessionsByAddress[$session->getAddress()->toString()], $this->sessions[$session->getInternalId()]);
397 }
398
399 public function openSession(ServerSession $session) : void{
400 $address = $session->getAddress();
401 $this->eventListener->onClientConnect($session->getInternalId(), $address->getIp(), $address->getPort(), $session->getID());
402 }
403
404 private function checkSessions() : void{
405 if(count($this->sessions) > 4096){
406 foreach($this->sessions as $sessionId => $session){
407 if($session->isTemporary()){
408 $this->removeSessionInternal($session);
409 if(count($this->sessions) <= 4096){
410 break;
411 }
412 }
413 }
414 }
415 }
416
417 public function getName() : string{
418 return $this->name;
419 }
420
421 public function getID() : int{
422 return $this->serverId;
423 }
424}
__construct(protected int $serverId, protected \Logger $logger, protected ServerSocket $socket, protected int $maxMtuSize, ProtocolAcceptor $protocolAcceptor, private ServerEventSource $eventSource, private ServerEventListener $eventListener, private ExceptionTraceCleaner $traceCleaner, private int $recvMaxSplitParts=ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT, private int $recvMaxConcurrentSplits=ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT)