Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"require": {
"php": "^7.4|^8.0",
"antidot-fw/framework": "^0.1.3",
"beberlei/assert": "^3.3",
"psr/container": "^1.0.0",
"ramsey/uuid": "^4.1",
"react/http": "^1.2"
Expand Down
45 changes: 45 additions & 0 deletions src/Child.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Antidot\React;

use RuntimeException;
use function pcntl_fork;
use function pcntl_waitpid;

class Child
{
private const DEFAULT_NUMBER_OF_FORKS = 0;

public static function fork(
int $numberOfWorkers,
callable $asyncServer,
int $numberOfFork = self::DEFAULT_NUMBER_OF_FORKS
): int {
if (strtoupper(substr(PHP_OS, 0, 3)) === 'WIN') {
throw new RuntimeException('The PHP pcntl extension is not available for Windows systems');
}

$pid = pcntl_fork();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just notice that the pcntl_fork function is only available on Unix-based system.

The reference is available here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am searching for an alternative for windows without luck, this feature will only be available on nix systems I think. We can protect it from usage in Windows systems;-S

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

something like this:
image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working o mutation tests now;- D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do you consider using the pthread?

Copy link
Member Author

@kpicaza kpicaza Jan 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter279k @mmoreram I've tried parallel extension with @xserrat a time ago, and we discard it for complexity issues(we make it run with a docker image created by @WyriHaximus) antidot-framework/antidot-react-psr15#1, both pthreads and parallel requires PHP compiled with ZTS and extra configuration(enable extension).
I propose to maintain this feature without the windows multi-thread support and opening a new issue to research and then add multi-thread support in windows systems.

What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check this. The Parallel extension looks good :).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kpicaza In the short term that's the easiest way to make this happen. Really interested to see how well this works with different event loops as this isn't something we've tried in the past.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@peter279k Yeah, I've build https://github.com/reactphp-parallel around it for a reason 😉 .

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @WyriHaximus, @mmoreram, and @peter279k for guidance;- D. I've just opened a new issue #14 to add support and document the usage by parallel ext.

if (-1 === $pid) {
throw new RuntimeException('Fork Failed');
}

if (0 === $pid) {
$asyncServer();
pcntl_waitpid($pid, $status);
return $pid;
}

// @parent
$numberOfWorkers--;
++$numberOfFork;
if (self::DEFAULT_NUMBER_OF_FORKS < $numberOfWorkers) {
self::fork($numberOfWorkers, $asyncServer, $numberOfFork);
}

pcntl_waitpid($pid, $status);
return $pid;
}
}
13 changes: 12 additions & 1 deletion src/Container/Config/ConfigProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

class ConfigProvider
{
private const DEFAULT_HOST = '0.0.0.0';
private const DEFAULT_PORT = 8080;
private const DEFAULT_CONCURRENCY = 100;
private const DEFAULT_BUFFER_SIZE = 4 * 1024 * 1024;

public function __invoke(): array
{
return [
Expand All @@ -24,7 +29,13 @@ public function __invoke(): array
Socket::class => SocketFactory::class,
],
],
'server' => []
'server' => [
'workers' => 1,
'host' => self::DEFAULT_HOST,
'port' => self::DEFAULT_PORT,
'max_concurrency' => self::DEFAULT_CONCURRENCY,
'buffer_size' => self::DEFAULT_BUFFER_SIZE,
]
];
}
}
2 changes: 0 additions & 2 deletions src/PromiseResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,8 @@

namespace Antidot\React;

use Psr\Http\Message\ResponseInterface;
use React\Promise\PromiseInterface;
use RingCentral\Psr7\Response;
use Throwable;

class PromiseResponse extends Response implements PromiseInterface
{
Expand Down
14 changes: 10 additions & 4 deletions src/ServerFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
namespace Antidot\React;

use Antidot\Application\Http\Application;
use Assert\Assertion;
use Psr\Container\ContainerInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\EventLoop\LoopInterface;
use React\Http\Server;
Expand All @@ -15,26 +15,32 @@
use React\Http\Middleware\RequestBodyParserMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise\PromiseInterface;
use function PHPUnit\Framework\assertArrayHasKey;
use function PHPUnit\Framework\assertIsInt;
use function React\Promise\resolve;

class ServerFactory
{
public function __invoke(ContainerInterface $container): Server
{
$application = $container->get(Application::class);
assert($application instanceof ReactApplication);
Assertion::isInstanceOf($application, ReactApplication::class);
/** @var LoopInterface $loop */
$loop = $container->get(LoopInterface::class);
/** @var array<string, array> $globalConfig */
$globalConfig = $container->get('config');
/** @var array<string, int|null> $config */
$config = $globalConfig['server'];
Assertion::keyExists($config, 'max_concurrency');
Assertion::keyExists($config, 'buffer_size');
Assertion::integer($config['max_concurrency']);
Assertion::integer($config['buffer_size']);

$server = new Server(
$loop,
new StreamingRequestMiddleware(),
new LimitConcurrentRequestsMiddleware(($config['max_concurrency']) ?? 100),
new RequestBodyBufferMiddleware($config['buffer_size'] ?? 4 * 1024 * 1024), // 4 MiB
new LimitConcurrentRequestsMiddleware($config['max_concurrency']),
new RequestBodyBufferMiddleware($config['buffer_size']),
new RequestBodyParserMiddleware(),
static fn (ServerRequestInterface $request): PromiseInterface => resolve($application->handle($request))
);
Expand Down
32 changes: 27 additions & 5 deletions src/SocketFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@

namespace Antidot\React;

use Assert\Assertion;
use Psr\Container\ContainerInterface;
use React\EventLoop\LoopInterface;
use React\Socket\Server as Socket;

class SocketFactory
{
private const DEFAULT_TCP_CONFIG = ['tcp' => ['so_reuseport' => false]];
private const REUSE_PORT = true;
private const DEFAULT_WORKERS_NUMBER = 1;

public function __invoke(ContainerInterface $container): Socket
{
/** @var LoopInterface $loop */
Expand All @@ -18,11 +23,28 @@ public function __invoke(ContainerInterface $container): Socket
$globalConfig = $container->get('config');
/** @var array<string, string|null> $config */
$config = $globalConfig['server'];
Assertion::notEmptyKey($config, 'host');
Assertion::notEmptyKey($config, 'port');
Assertion::keyExists($config, 'workers');
/** @var string $host */
$host = $config['host'];
Assertion::ipv4($host);
/** @var int $port */
$port = $config['port'];
Assertion::integer($port);
/** @var int $workersNumber */
$workersNumber = $config['workers'];
Assertion::integer($workersNumber);
$tcpConfig = self::DEFAULT_TCP_CONFIG;
if ($this->needMoreThanOne($workersNumber)) {
$tcpConfig['tcp']['so_reuseport'] = self::REUSE_PORT;
}

return new Socket(sprintf('%s:%s', $host, $port), $loop, $tcpConfig);
}

return new Socket(sprintf(
'%s:%s',
$config['host'] ?? '0.0.0.0',
$config['port'] ?? '8080'
), $loop);
private function needMoreThanOne(int $workersNumber): bool
{
return self::DEFAULT_WORKERS_NUMBER < $workersNumber;
}
}
23 changes: 23 additions & 0 deletions test/ChildTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace AntidotTest\React;

use PHPUnit\Framework\TestCase;
use Symfony\Component\Process\Process;

class ChildTest extends TestCase
{
public function testItShouldBeConstructedStatically(): void
{
$process = new Process([
'php',
'-r',
'include "src/Child.php";\Antidot\React\Child::fork(1, function() { echo "test passed"; }, 0);'
]);
$process->start();
$process->wait();
$this->assertSame('test passed', $process->getOutput());
}
}
8 changes: 7 additions & 1 deletion test/Container/Config/ConfigProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ public function testItShouldReturnTheConfigArray(): void
Socket::class => SocketFactory::class,
]
],
'server' => []
'server' => [
'workers' => 1,
'host' => '0.0.0.0',
'port' => 8080,
'max_concurrency' => 100,
'buffer_size' => 4194304,
]
],
$configProvider(),
);
Expand Down
53 changes: 50 additions & 3 deletions test/ServerFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,59 @@ public function testItShouldCreateReactServerInstances(): void
->willReturnOnConsecutiveCalls(
$this->createMock(ReactApplication::class),
$this->createMock(LoopInterface::class),
['server' => [

]]
['server' => ['max_concurrency' => 100, 'buffer_size' => 43242]]
);
$factory = new ServerFactory();
$server = $factory($container);
$this->assertInstanceOf(Server::class, $server);
}

public function testItShouldThrowExceptionWithNonReactApplicationInstance(): void
{
$this->expectException(\InvalidArgumentException::class);
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->once())
->method('get')
->with(Application::class)
->willReturn(
$this->createMock(Application::class)
);
$factory = new ServerFactory();
$factory($container);
}

/** @dataProvider getInvalidConfig */
public function testItShouldThrowExceptionWithInvalidConfig(array $config): void
{
$this->expectException(\InvalidArgumentException::class);
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->exactly(3))
->method('get')
->withConsecutive([Application::class], [LoopInterface::class], ['config'])
->willReturnOnConsecutiveCalls(
$this->createMock(ReactApplication::class),
$this->createMock(LoopInterface::class),
$config
);
$factory = new ServerFactory();
$factory($container);
}

public function getInvalidConfig()
{
return [
[
['server' => ['max_concurrency' => 100]]
],
[
['server' => ['buffer_size' => 43525]]
],
[
['server' => ['max_concurrency' => 'hello', 'buffer_size' => 43525]]
],
[
['server' => ['max_concurrency' => 100, 'buffer_size' => []]]
],
];
}
}
44 changes: 43 additions & 1 deletion test/SocketFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace AntidotTest\React;

use Antidot\React\SocketFactory;
use Assert\AssertionFailedException;
use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use React\EventLoop\LoopInterface;
Expand All @@ -20,11 +21,52 @@ public function testItShouldcreateReactSocketInstances(): void
->withConsecutive([LoopInterface::class], ['config'])
->willReturnOnConsecutiveCalls(
$this->createMock(LoopInterface::class),
['server' => []]
['server' => ['workers' => 1, 'host' => '0.0.0.0', 'port' => 8080]]
);

$factory = new SocketFactory();
$socket = $factory($container);
$this->assertInstanceOf(Socket::class, $socket);
}

/** @dataProvider getInvalidConfig */
public function testItShouldThrowExceptionWithInvalidConfig(array $config): void
{
$this->expectException(AssertionFailedException::class);
$container = $this->createMock(ContainerInterface::class);
$container->expects($this->exactly(2))
->method('get')
->withConsecutive([LoopInterface::class], ['config'])
->willReturnOnConsecutiveCalls(
$this->createMock(LoopInterface::class),
$config
);

$factory = new SocketFactory();
$factory($container);
}

public function getInvalidConfig()
{
return [
'Bad Host' => [
['server' => ['port' => 8080, 'workers' => 3, 'host' => '756875.67867.7668.787']]
],
[
['server' => ['host' => '0.0.0.0', 'port' => ['test'], 'workers' => 3]]
],
[
['server' => ['host' => '0.0.0.0', 'port' => 8888, 'workers' => 'some']]
],
[
['server' => ['port' => 43525, 'workers' => 3]]
],
[
['server' => ['port' => 43525, 'host' => '0.0.0.0']]
],
[
['server' => ['host' => '0.0.0.0', 'workers' => 3]]
],
];
}
}