15declare(strict_types=1);
17namespace raklib\generic;
25use
function array_fill;
31 public static int $WINDOW_SIZE = 2048;
33 private int $windowStart;
34 private int $windowEnd;
35 private int $highestSeqNumber = -1;
38 private array $ACKQueue = [];
40 private array $NACKQueue = [];
42 private int $reliableWindowStart;
43 private int $reliableWindowEnd;
45 private array $reliableWindow = [];
48 private array $receiveOrderedIndex;
50 private array $receiveSequencedHighestIndex;
52 private array $receiveOrderedPackets;
55 private array $splitPackets = [];
64 private \Closure $onRecv,
65 private \Closure $sendPacket,
66 private int $maxSplitPacketPartCount = PHP_INT_MAX,
67 private int $maxConcurrentSplitPackets = PHP_INT_MAX
69 $this->windowStart = 0;
70 $this->windowEnd = self::$WINDOW_SIZE;
72 $this->reliableWindowStart = 0;
73 $this->reliableWindowEnd = self::$WINDOW_SIZE;
75 $this->receiveOrderedIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
76 $this->receiveSequencedHighestIndex = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, 0);
78 $this->receiveOrderedPackets = array_fill(0, PacketReliability::MAX_ORDER_CHANNELS, []);
90 private function handleSplit(EncapsulatedPacket $packet) : ?EncapsulatedPacket{
91 if($packet->splitInfo === null){
94 $totalParts = $packet->splitInfo->getTotalPartCount();
95 $partIndex = $packet->splitInfo->getPartIndex();
97 $totalParts >= $this->maxSplitPacketPartCount or $totalParts < 0 or
98 $partIndex >= $totalParts or $partIndex < 0
100 $this->logger->debug(
"Invalid split packet part, too many parts or invalid split index (part index $partIndex, part count $totalParts)");
104 $splitId = $packet->splitInfo->getId();
105 if(!isset($this->splitPackets[$splitId])){
106 if(count($this->splitPackets) >= $this->maxConcurrentSplitPackets){
107 $this->logger->debug(
"Ignored split packet part because reached concurrent split packet limit of $this->maxConcurrentSplitPackets");
110 $this->splitPackets[$splitId] = array_fill(0, $totalParts,
null);
111 }elseif(count($this->splitPackets[$splitId]) !== $totalParts){
112 $this->logger->debug(
"Wrong split count $totalParts for split packet $splitId, expected " . count($this->splitPackets[$splitId]));
116 $this->splitPackets[$splitId][$partIndex] = $packet;
119 foreach($this->splitPackets[$splitId] as $splitIndex => $part){
123 $parts[$splitIndex] = $part;
127 $pk =
new EncapsulatedPacket();
130 $pk->reliability = $packet->reliability;
131 $pk->messageIndex = $packet->messageIndex;
132 $pk->sequenceIndex = $packet->sequenceIndex;
133 $pk->orderIndex = $packet->orderIndex;
134 $pk->orderChannel = $packet->orderChannel;
136 for($i = 0; $i < $totalParts; ++$i){
137 $pk->buffer .= $parts[$i]->buffer;
140 unset($this->splitPackets[$splitId]);
145 private function handleEncapsulatedPacket(EncapsulatedPacket $packet) : void{
146 if($packet->messageIndex !== null){
148 if($packet->messageIndex < $this->reliableWindowStart or $packet->messageIndex > $this->reliableWindowEnd or isset($this->reliableWindow[$packet->messageIndex])){
152 $this->reliableWindow[$packet->messageIndex] =
true;
154 if($packet->messageIndex === $this->reliableWindowStart){
155 for(; isset($this->reliableWindow[$this->reliableWindowStart]); ++$this->reliableWindowStart){
156 unset($this->reliableWindow[$this->reliableWindowStart]);
157 ++$this->reliableWindowEnd;
162 if(($packet = $this->handleSplit($packet)) ===
null){
166 if(PacketReliability::isSequencedOrOrdered($packet->reliability) and ($packet->orderChannel < 0 or $packet->orderChannel >= PacketReliability::MAX_ORDER_CHANNELS)){
168 $this->logger->debug(
"Invalid packet, bad order channel ($packet->orderChannel)");
172 if(PacketReliability::isSequenced($packet->reliability)){
173 if($packet->sequenceIndex < $this->receiveSequencedHighestIndex[$packet->orderChannel] or $packet->orderIndex < $this->receiveOrderedIndex[$packet->orderChannel]){
178 $this->receiveSequencedHighestIndex[$packet->orderChannel] = $packet->sequenceIndex + 1;
179 $this->handleEncapsulatedPacketRoute($packet);
180 }elseif(PacketReliability::isOrdered($packet->reliability)){
181 if($packet->orderIndex === $this->receiveOrderedIndex[$packet->orderChannel]){
186 $this->receiveSequencedHighestIndex[$packet->orderChannel] = 0;
187 $this->receiveOrderedIndex[$packet->orderChannel] = $packet->orderIndex + 1;
189 $this->handleEncapsulatedPacketRoute($packet);
190 $i = $this->receiveOrderedIndex[$packet->orderChannel];
191 for(; isset($this->receiveOrderedPackets[$packet->orderChannel][$i]); ++$i){
192 $this->handleEncapsulatedPacketRoute($this->receiveOrderedPackets[$packet->orderChannel][$i]);
193 unset($this->receiveOrderedPackets[$packet->orderChannel][$i]);
196 $this->receiveOrderedIndex[$packet->orderChannel] = $i;
197 }elseif($packet->orderIndex > $this->receiveOrderedIndex[$packet->orderChannel]){
198 if(count($this->receiveOrderedPackets[$packet->orderChannel]) >= self::$WINDOW_SIZE){
202 $this->receiveOrderedPackets[$packet->orderChannel][$packet->orderIndex] = $packet;
208 $this->handleEncapsulatedPacketRoute($packet);
212 public function onDatagram(Datagram $packet) : void{
213 if($packet->seqNumber < $this->windowStart or $packet->seqNumber > $this->windowEnd or isset($this->ACKQueue[$packet->seqNumber])){
214 $this->logger->debug(
"Received duplicate or out-of-window packet (sequence number $packet->seqNumber, window " . $this->windowStart .
"-" . $this->windowEnd .
")");
218 unset($this->NACKQueue[$packet->seqNumber]);
219 $this->ACKQueue[$packet->seqNumber] = $packet->seqNumber;
220 if($this->highestSeqNumber < $packet->seqNumber){
221 $this->highestSeqNumber = $packet->seqNumber;
224 if($packet->seqNumber === $this->windowStart){
228 for(; isset($this->ACKQueue[$this->windowStart]); ++$this->windowStart){
231 }elseif($packet->seqNumber > $this->windowStart){
235 for($i = $this->windowStart; $i < $packet->seqNumber; ++$i){
236 if(!isset($this->ACKQueue[$i])){
237 $this->NACKQueue[$i] = $i;
241 assert(
false,
"received packet before window start");
244 foreach($packet->packets as $pk){
245 $this->handleEncapsulatedPacket($pk);
249 public function update() : void{
250 $diff = $this->highestSeqNumber - $this->windowStart + 1;
257 $this->windowStart += $diff;
258 $this->windowEnd += $diff;
261 if(count($this->ACKQueue) > 0){
263 $pk->packets = $this->ACKQueue;
264 ($this->sendPacket)($pk);
265 $this->ACKQueue = [];
268 if(count($this->NACKQueue) > 0){
270 $pk->packets = $this->NACKQueue;
271 ($this->sendPacket)($pk);
272 $this->NACKQueue = [];
276 public function needsUpdate() : bool{
277 return count($this->ACKQueue) !== 0 or count($this->NACKQueue) !== 0;
__construct(private \Logger $logger, private \Closure $onRecv, private \Closure $sendPacket, private int $maxSplitPacketPartCount=PHP_INT_MAX, private int $maxConcurrentSplitPackets=PHP_INT_MAX)