15declare(strict_types=1);
17namespace raklib\server;
34use
function get_class;
35use
function microtime;
37use
function preg_match;
40use
function time_sleep_until;
42use
const SOCKET_ECONNRESET;
46 private const RAKLIB_TPS = 100;
47 private const RAKLIB_TIME_PER_TICK = 1 / self::RAKLIB_TPS;
49 protected int $receiveBytes = 0;
50 protected int $sendBytes = 0;
53 protected array $sessionsByAddress = [];
55 protected array $sessions = [];
59 protected string $name =
"";
61 protected int $packetLimit = 200;
63 protected bool $shutdown =
false;
65 protected int $ticks = 0;
68 protected array $block = [];
70 protected array $ipSec = [];
73 protected array $rawPacketFilters = [];
75 public bool $portChecking =
false;
77 protected int $nextSessionId = 0;
84 protected int $serverId,
87 protected int $maxMtuSize,
92 private int $recvMaxSplitParts = ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT,
93 private int $recvMaxConcurrentSplits = ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT
95 if($maxMtuSize < Session::MIN_MTU_SIZE){
96 throw new \InvalidArgumentException(
"MTU size must be at least " . Session::MIN_MTU_SIZE .
", got $maxMtuSize");
98 $this->socket->setBlocking(
false);
103 public function getPort() : int{
104 return $this->socket->getBindAddress()->getPort();
107 public function getMaxMtuSize() : int{
108 return $this->maxMtuSize;
111 public function getLogger() : \
Logger{
112 return $this->logger;
115 public function tickProcessor() : void{
116 $start = microtime(true);
123 $stream = !$this->shutdown;
124 for($i = 0; $i < 100 && $stream && !$this->shutdown; ++$i){
125 $stream = $this->eventSource->process($this);
129 for($i = 0; $i < 100 && $socket; ++$i){
130 $socket = $this->receivePacket();
132 }
while($stream || $socket);
136 $time = microtime(
true) - $start;
137 if($time < self::RAKLIB_TIME_PER_TICK){
138 @time_sleep_until(microtime(
true) + self::RAKLIB_TIME_PER_TICK - $time);
146 $this->shutdown = true;
148 while($this->eventSource->process($this)){
154 foreach($this->sessions as $session){
155 $session->initiateDisconnect(DisconnectReason::SERVER_SHUTDOWN);
158 while(count($this->sessions) > 0){
159 $this->tickProcessor();
162 $this->socket->close();
163 $this->logger->debug(
"Graceful shutdown complete");
166 private function tick() : void{
167 $time = microtime(true);
168 foreach($this->sessions as $session){
169 $session->update($time);
170 if($session->isFullyDisconnected()){
171 $this->removeSessionInternal($session);
177 if(!$this->shutdown and ($this->ticks % self::RAKLIB_TPS) === 0){
178 if($this->sendBytes > 0 or $this->receiveBytes > 0){
179 $this->eventListener->onBandwidthStatsUpdate($this->sendBytes, $this->receiveBytes);
180 $this->sendBytes = 0;
181 $this->receiveBytes = 0;
184 if(count($this->block) > 0){
187 foreach($this->block as $address => $timeout){
188 if($timeout <= $now){
189 unset($this->block[$address]);
201 private function receivePacket() : bool{
203 $buffer = $this->socket->readPacket($addressIp, $addressPort);
204 }
catch(SocketException $e){
205 $error = $e->getCode();
206 if($error === SOCKET_ECONNRESET){
210 $this->logger->debug($e->getMessage());
213 if($buffer ===
null){
216 $len = strlen($buffer);
218 $this->receiveBytes += $len;
219 if(isset($this->block[$addressIp])){
223 if(isset($this->ipSec[$addressIp])){
224 if(++$this->ipSec[$addressIp] >= $this->packetLimit){
225 $this->blockAddress($addressIp);
229 $this->ipSec[$addressIp] = 1;
236 $address =
new InternetAddress($addressIp, $addressPort, $this->socket->getBindAddress()->getVersion());
238 $session = $this->getSessionByAddress($address);
239 if($session !==
null){
240 $header = ord($buffer[0]);
241 if(($header & Datagram::BITFLAG_VALID) !== 0){
242 if(($header & Datagram::BITFLAG_ACK) !== 0){
244 }elseif(($header & Datagram::BITFLAG_NAK) !== 0){
245 $packet =
new NACK();
247 $packet =
new Datagram();
249 $packet->decode(
new PacketSerializer($buffer));
250 $session->handlePacket($packet);
252 }elseif($session->isConnected()){
255 $this->logger->debug(
"Ignored unconnected packet from $address due to session already opened (0x" . bin2hex($buffer[0]) .
")");
260 if(!$this->shutdown){
261 if(!($handled = $this->unconnectedMessageHandler->handleRaw($buffer, $address))){
262 foreach($this->rawPacketFilters as $pattern){
263 if(preg_match($pattern, $buffer) > 0){
265 $this->eventListener->onRawPacketReceive($address->getIp(), $address->getPort(), $buffer);
272 $this->logger->debug(
"Ignored packet from $address due to no session opened (0x" . bin2hex($buffer[0]) .
")");
275 }
catch(BinaryDataException $e){
276 $logFn =
function() use ($address, $e, $buffer) :
void{
277 $this->logger->debug(
"Packet from $address (" . strlen($buffer) .
" bytes): 0x" . bin2hex($buffer));
278 $this->logger->debug(get_class($e) .
": " . $e->getMessage() .
" in " . $e->getFile() .
" on line " . $e->getLine());
279 foreach($this->traceCleaner->getTrace(0, $e->getTrace()) as $line){
280 $this->logger->debug($line);
282 $this->logger->error(
"Bad packet from $address: " . $e->getMessage());
285 $this->logger->buffer($logFn);
289 $this->blockAddress($address->getIp(), 5);
295 public function sendPacket(Packet $packet, InternetAddress $address) : void{
296 $out = new PacketSerializer();
297 $packet->encode($out);
299 $this->sendBytes += $this->socket->writePacket($out->getBuffer(), $address->getIp(), $address->getPort());
300 }
catch(SocketException $e){
301 $this->logger->debug($e->getMessage());
305 public function getEventListener() : ServerEventListener{
306 return $this->eventListener;
309 public function sendEncapsulated(
int $sessionId, EncapsulatedPacket $packet,
bool $immediate =
false) : void{
310 $session = $this->sessions[$sessionId] ?? null;
311 if($session !==
null and $session->isConnected()){
312 $session->addEncapsulatedToQueue($packet, $immediate);
316 public function sendRaw(
string $address,
int $port,
string $payload) : void{
318 $this->socket->writePacket($payload, $address, $port);
319 }
catch(SocketException $e){
320 $this->logger->debug($e->getMessage());
324 public function closeSession(
int $sessionId) : void{
325 if(isset($this->sessions[$sessionId])){
326 $this->sessions[$sessionId]->initiateDisconnect(DisconnectReason::SERVER_DISCONNECT);
330 public function setName(
string $name) : void{
334 public function setPortCheck(
bool $value) : void{
335 $this->portChecking = $value;
338 public function setPacketsPerTickLimit(
int $limit) : void{
339 $this->packetLimit = $limit;
342 public function blockAddress(
string $address,
int $timeout = 300) : void{
343 $final = time() + $timeout;
344 if(!isset($this->block[$address]) or $timeout === -1){
346 $final = PHP_INT_MAX;
348 $this->logger->notice(
"Blocked $address for $timeout seconds");
350 $this->block[$address] = $final;
351 }elseif($this->block[$address] < $final){
352 $this->block[$address] = $final;
356 public function unblockAddress(
string $address) : void{
357 unset($this->block[$address]);
358 $this->logger->debug(
"Unblocked $address");
361 public function addRawPacketFilter(
string $regex) : void{
362 $this->rawPacketFilters[] = $regex;
365 public function getSessionByAddress(InternetAddress $address) : ?ServerSession{
366 return $this->sessionsByAddress[$address->toString()] ?? null;
369 public function sessionExists(InternetAddress $address) : bool{
370 return isset($this->sessionsByAddress[$address->toString()]);
373 public function createSession(InternetAddress $address,
int $clientId,
int $mtuSize) : ServerSession{
374 $existingSession = $this->sessionsByAddress[$address->toString()] ?? null;
375 if($existingSession !==
null){
376 $existingSession->forciblyDisconnect(DisconnectReason::CLIENT_RECONNECT);
377 $this->removeSessionInternal($existingSession);
380 $this->checkSessions();
382 while(isset($this->sessions[$this->nextSessionId])){
383 $this->nextSessionId++;
384 $this->nextSessionId &= 0x7fffffff;
387 $session =
new ServerSession($this, $this->logger, clone $address, $clientId, $mtuSize, $this->nextSessionId, $this->recvMaxSplitParts, $this->recvMaxConcurrentSplits);
388 $this->sessionsByAddress[$address->toString()] = $session;
389 $this->sessions[$this->nextSessionId] = $session;
390 $this->logger->debug(
"Created session for $address with MTU size $mtuSize");
395 private function removeSessionInternal(ServerSession $session) : void{
396 unset($this->sessionsByAddress[$session->getAddress()->toString()], $this->sessions[$session->getInternalId()]);
399 public function openSession(ServerSession $session) : void{
400 $address = $session->getAddress();
401 $this->eventListener->onClientConnect($session->getInternalId(), $address->getIp(), $address->getPort(), $session->getID());
404 private function checkSessions() : void{
405 if(count($this->sessions) > 4096){
406 foreach($this->sessions as $sessionId => $session){
407 if($session->isTemporary()){
408 $this->removeSessionInternal($session);
409 if(count($this->sessions) <= 4096){
417 public function getName() : string{
421 public function getID() : int{
422 return $this->serverId;
__construct(protected int $serverId, protected \Logger $logger, protected ServerSocket $socket, protected int $maxMtuSize, ProtocolAcceptor $protocolAcceptor, private ServerEventSource $eventSource, private ServerEventListener $eventListener, private ExceptionTraceCleaner $traceCleaner, private int $recvMaxSplitParts=ServerSession::DEFAULT_MAX_SPLIT_PART_COUNT, private int $recvMaxConcurrentSplits=ServerSession::DEFAULT_MAX_CONCURRENT_SPLIT_COUNT)