15declare(strict_types=1);
17namespace raklib\generic;
25use
function array_fill;
26use
function array_push;
29use
function microtime;
30use
function str_split;
34 private const DATAGRAM_MTU_OVERHEAD = 36 + Datagram::HEADER_SIZE;
35 private const MIN_POSSIBLE_PACKET_SIZE_LIMIT = Session::MIN_MTU_SIZE - self::DATAGRAM_MTU_OVERHEAD;
40 private const UNACKED_RETRANSMIT_DELAY = 2.0;
43 private array $sendQueue = [];
45 private int $splitID = 0;
47 private int $sendSeqNumber = 0;
49 private int $messageIndex = 0;
51 private int $reliableWindowStart;
52 private int $reliableWindowEnd;
57 private array $reliableWindow = [];
60 private array $sendOrderedIndex;
62 private array $sendSequencedIndex;
65 private array $reliableBacklog = [];
68 private array $resendQueue = [];
71 private array $reliableCache = [];
74 private array $needACK = [];
77 private int $maxDatagramPayloadSize;
86 private \Closure $sendDatagramCallback,
87 private \Closure $onACK,
88 private int $reliableWindowSize = 512,
90 $this->sendOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
91 $this->sendSequencedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
93 $this->maxDatagramPayloadSize = $this->mtuSize - self::DATAGRAM_MTU_OVERHEAD;
95 $this->reliableWindowStart = 0;
96 $this->reliableWindowEnd = $this->reliableWindowSize;
102 private function sendDatagram(array $packets) : void{
104 $datagram->seqNumber = $this->sendSeqNumber++;
105 $datagram->packets = $packets;
106 ($this->sendDatagramCallback)($datagram);
109 foreach($datagram->packets as $pk){
110 if(PacketReliability::isReliable($pk->reliability)){
114 if(count($resendable) !== 0){
115 $this->reliableCache[$datagram->seqNumber] =
new ReliableCacheEntry($resendable);
119 public function sendQueue() : void{
120 if(count($this->sendQueue) > 0){
121 $this->sendDatagram($this->sendQueue);
122 $this->sendQueue = [];
126 private function addToQueue(EncapsulatedPacket $pk,
bool $immediate) : void{
127 if(PacketReliability::isReliable($pk->reliability)){
128 if($pk->messageIndex ===
null || $pk->messageIndex < $this->reliableWindowStart){
129 throw new \InvalidArgumentException(
"Cannot send a reliable packet with message index less than the window start ($pk->messageIndex < $this->reliableWindowStart)");
131 if($pk->messageIndex >= $this->reliableWindowEnd){
133 $this->reliableBacklog[$pk->messageIndex] = $pk;
137 $this->reliableWindow[$pk->messageIndex] =
false;
140 if($pk->identifierACK !==
null and $pk->messageIndex !==
null){
141 $this->needACK[$pk->identifierACK][$pk->messageIndex] = $pk->messageIndex;
145 foreach($this->sendQueue as $queued){
146 $length += $queued->getTotalLength();
149 if($length + $pk->getTotalLength() > $this->maxDatagramPayloadSize){
153 if($pk->identifierACK !==
null){
154 $this->sendQueue[] = clone $pk;
155 $pk->identifierACK =
null;
157 $this->sendQueue[] = $pk;
166 public function addEncapsulatedToQueue(EncapsulatedPacket $packet,
bool $immediate =
false) : void{
167 if($packet->identifierACK !== null){
168 $this->needACK[$packet->identifierACK] = [];
171 if(PacketReliability::isOrdered($packet->reliability)){
172 $packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel]++;
173 }elseif(PacketReliability::isSequenced($packet->reliability)){
174 $packet->orderIndex = $this->sendOrderedIndex[$packet->orderChannel];
175 $packet->sequenceIndex = $this->sendSequencedIndex[$packet->orderChannel]++;
178 $maxBufferSize = $this->maxDatagramPayloadSize - $packet->getHeaderLength();
180 if(strlen($packet->buffer) > $maxBufferSize){
181 $buffers = str_split($packet->buffer, $maxBufferSize - EncapsulatedPacket::SPLIT_INFO_LENGTH);
182 $bufferCount = count($buffers);
184 $splitID = ++$this->splitID % 65536;
185 foreach($buffers as $count => $buffer){
186 $pk =
new EncapsulatedPacket();
187 $pk->splitInfo =
new SplitPacketInfo($splitID, $count, $bufferCount);
188 $pk->reliability = $packet->reliability;
189 $pk->buffer = $buffer;
190 $pk->identifierACK = $packet->identifierACK;
192 if(PacketReliability::isReliable($pk->reliability)){
193 $pk->messageIndex = $this->messageIndex++;
196 $pk->sequenceIndex = $packet->sequenceIndex;
197 $pk->orderChannel = $packet->orderChannel;
198 $pk->orderIndex = $packet->orderIndex;
200 $this->addToQueue($pk,
true);
203 if(PacketReliability::isReliable($packet->reliability)){
204 $packet->messageIndex = $this->messageIndex++;
206 $this->addToQueue($packet, $immediate);
210 private function updateReliableWindow() : void{
212 isset($this->reliableWindow[$this->reliableWindowStart]) &&
213 $this->reliableWindow[$this->reliableWindowStart] === true
215 unset($this->reliableWindow[$this->reliableWindowStart]);
216 $this->reliableWindowStart++;
217 $this->reliableWindowEnd++;
221 public function onACK(ACK $packet) : void{
222 foreach($packet->packets as $seq){
223 if(isset($this->reliableCache[$seq])){
224 foreach($this->reliableCache[$seq]->getPackets() as $pk){
225 assert($pk->messageIndex !==
null && $pk->messageIndex >= $this->reliableWindowStart && $pk->messageIndex < $this->reliableWindowEnd);
226 $this->reliableWindow[$pk->messageIndex] =
true;
227 $this->updateReliableWindow();
229 if($pk->identifierACK !==
null){
230 unset($this->needACK[$pk->identifierACK][$pk->messageIndex]);
231 if(count($this->needACK[$pk->identifierACK]) === 0){
232 unset($this->needACK[$pk->identifierACK]);
233 ($this->onACK)($pk->identifierACK);
237 unset($this->reliableCache[$seq]);
242 public function onNACK(NACK $packet) : void{
243 foreach($packet->packets as $seq){
244 if(isset($this->reliableCache[$seq])){
245 foreach($this->reliableCache[$seq]->getPackets() as $pk){
246 $this->resendQueue[] = $pk;
248 unset($this->reliableCache[$seq]);
253 public function needsUpdate() : bool{
255 count($this->sendQueue) !== 0 or
256 count($this->reliableBacklog) !== 0 or
257 count($this->resendQueue) !== 0 or
258 count($this->reliableCache) !== 0
262 public function update() : void{
263 $retransmitOlderThan = microtime(true) - self::UNACKED_RETRANSMIT_DELAY;
264 foreach($this->reliableCache as $seq => $pk){
265 if($pk->getTimestamp() < $retransmitOlderThan){
267 array_push($this->resendQueue, ...$pk->getPackets());
268 unset($this->reliableCache[$seq]);
274 if(count($this->resendQueue) > 0){
275 foreach($this->resendQueue as $pk){
277 $this->addToQueue($pk,
false);
279 $this->resendQueue = [];
282 if(count($this->reliableBacklog) > 0){
283 foreach($this->reliableBacklog as $k => $pk){
284 assert($pk->messageIndex !==
null && $pk->messageIndex >= $this->reliableWindowStart);
285 if($pk->messageIndex >= $this->reliableWindowEnd){
290 $this->addToQueue($pk,
false);
291 unset($this->reliableBacklog[$k]);
__construct(private int $mtuSize, private \Closure $sendDatagramCallback, private \Closure $onACK, private int $reliableWindowSize=512,)