48    private const WORKER_START_OPTIONS = NativeThread::INHERIT_INI | NativeThread::INHERIT_COMMENTS;
 
   54    private array $workers = [];
 
   60    private array $workerStartHooks = [];
 
   62    public function __construct(
 
   64        private int $workerMemoryLimit,
 
   81        if($newSize > $this->size){
 
   82            $this->size = $newSize;
 
 
   95        Utils::validateCallableSignature(function(int $worker) : void{}, $hook);
 
   96        $this->workerStartHooks[spl_object_id($hook)] = $hook;
 
   97        foreach($this->workers as $i => $worker){
 
 
  108        unset($this->workerStartHooks[spl_object_id($hook)]);
 
 
  117        return array_keys($this->workers);
 
 
  125        if(!isset($this->workers[$workerId])){
 
  126            $sleeperEntry = $this->eventLoop->addNotifier(
function() use ($workerId) : 
void{
 
  127                $this->collectTasksFromWorker($workerId);
 
  129            $this->workers[$workerId] = 
new AsyncPoolWorkerEntry(
new AsyncWorker($this->logger, $workerId, $this->workerMemoryLimit, $sleeperEntry), $sleeperEntry->getNotifierId());
 
  130            $this->workers[$workerId]->worker->setClassLoaders([$this->classLoader]);
 
  131            $this->workers[$workerId]->worker->start(self::WORKER_START_OPTIONS);
 
  133            foreach($this->workerStartHooks as $hook){
 
  137            $this->checkCrashedWorker($workerId, 
null);
 
  140        return $this->workers[$workerId];
 
  147        if($worker < 0 || $worker >= $this->size){
 
  148            throw new \InvalidArgumentException(
"Invalid worker $worker");
 
  150        if($task->isSubmitted()){
 
  151            throw new \InvalidArgumentException(
"Cannot submit the same AsyncTask instance more than once");
 
  154        $task->setSubmitted();
 
  156        $this->getWorker($worker)->submit($task);
 
 
  168        $minUsage = PHP_INT_MAX;
 
  169        foreach($this->workers as $i => $entry){
 
  170            if(($usage = $entry->tasks->count()) < $minUsage){
 
  178        if($worker === 
null || ($minUsage > 0 && count($this->workers) < $this->size)){
 
  180            for($i = 0; $i < $this->size; ++$i){
 
  181                if(!isset($this->workers[$i])){
 
  188        assert($worker !== 
null);
 
 
  197        if($task->isSubmitted()){
 
  198            throw new \InvalidArgumentException(
"Cannot submit the same AsyncTask instance more than once");
 
  201        $worker = $this->selectWorker();
 
  202        $this->submitTaskToWorker($task, $worker);
 
 
  206    private function checkCrashedWorker(
int $workerId, ?AsyncTask $crashedTask) : void{
 
  207        $entry = $this->workers[$workerId];
 
  208        if($entry->worker->isTerminated()){
 
  209            if($crashedTask === 
null){
 
  210                foreach($entry->tasks as $task){
 
  211                    if($task->isTerminated()){
 
  212                        $crashedTask = $task;
 
  214                    }elseif(!$task->isFinished()){
 
  219            $info = $entry->worker->getCrashInfo();
 
  221                if($crashedTask !== 
null){
 
  222                    $message = 
"Worker $workerId crashed while running task " . get_class($crashedTask) . 
"#" . spl_object_id($crashedTask);
 
  224                    $message = 
"Worker $workerId crashed while doing unknown work";
 
  226                throw new ThreadCrashException($message, $info);
 
  228                throw new \RuntimeException(
"Worker $workerId crashed for unknown reason");
 
  240        foreach($this->workers as $workerId => $entry){
 
  241            $this->collectTasksFromWorker($workerId);
 
  245        foreach($this->workers as $entry){
 
  246            if(!$entry->tasks->isEmpty()){
 
 
  253    public function collectTasksFromWorker(
int $worker) : bool{
 
  254        if(!isset($this->workers[$worker])){
 
  255            throw new \InvalidArgumentException(
"No such worker $worker");
 
  257        $queue = $this->workers[$worker]->tasks;
 
  259        while(!$queue->isEmpty()){
 
  261            $task = $queue->bottom();
 
  262            if($task->isFinished()){ 
 
  265                if($task->isTerminated()){
 
  266                    $this->checkCrashedWorker($worker, $task);
 
  267                    throw new AssumptionFailedError(
"checkCrashedWorker() should have thrown an exception, making this unreachable");
 
  278                    $this->checkTaskProgressUpdates($task);
 
  279                    Timings::getAsyncTaskCompletionTimings($task)->time(
function() use ($task) : 
void{
 
  280                        $task->onCompletion();
 
  284                $this->checkTaskProgressUpdates($task);
 
  289        $this->workers[$worker]->worker->collect();
 
  300        return array_map(function(
AsyncPoolWorkerEntry $entry) : int{ return $entry->tasks->count(); }, $this->workers);
 
 
  303    public function shutdownUnusedWorkers() : int{
 
  306        foreach($this->workers as $i => $entry){
 
  307            if($entry->lastUsed + 300 < $time && $entry->tasks->isEmpty()){
 
  308                $entry->worker->quit();
 
  309                $this->eventLoop->removeNotifier($entry->sleeperNotifierId);
 
  310                unset($this->workers[$i]);
 
  322        while($this->collectTasks()){
 
  326        foreach($this->workers as $worker){
 
  327            $worker->worker->quit();
 
  328            $this->eventLoop->removeNotifier($worker->sleeperNotifierId);
 
 
  333    private function checkTaskProgressUpdates(AsyncTask $task) : void{
 
  334        Timings::getAsyncTaskProgressUpdateTimings($task)->time(function() use ($task) : void{
 
  335            $task->checkProgressUpdates();