Skip to content

implements sending concurrent requests with curl multi #24

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ engines:
enabled: true
config:
file_extensions: "php"
standard: "PSR1,PSR2"
ratings:
paths:
- "**.php"
Expand Down
197 changes: 171 additions & 26 deletions lib/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ class Client
protected $path;
/** @var array */
protected $curlOptions;
/** @var bool $isConcurrentRequest */
protected $isConcurrentRequest;
/** @var array $savedRequests */
protected $savedRequests;
/** @var bool */
protected $retryOnLimit;

Expand All @@ -49,10 +53,10 @@ class Client
/**
* Initialize the client
*
* @param string $host the base url (e.g. https://api.sendgrid.com)
* @param array $headers global request headers
* @param string $version api version (configurable)
* @param array $path holds the segments of the url path
* @param string $host the base url (e.g. https://api.sendgrid.com)
* @param array $headers global request headers
* @param string $version api version (configurable)
* @param array $path holds the segments of the url path
*/
public function __construct($host, $headers = [], $version = '/v3', $path = [])
{
Expand All @@ -63,6 +67,8 @@ public function __construct($host, $headers = [], $version = '/v3', $path = [])

$this->curlOptions = [];
$this->retryOnLimit = false;
$this->isConcurrentRequest = false;
$this->savedRequests = [];
}

/**
Expand Down Expand Up @@ -125,6 +131,20 @@ public function setRetryOnLimit($retry)
return $this;
}

/**
* set concurrent request flag
*
* @param bool $isConcurrent
*
* @return Client
*/
public function setIsConcurrentRequest($isConcurrent)
{
$this->isConcurrentRequest = $isConcurrent;

return $this;
}

/**
* @return array
*/
Expand All @@ -150,43 +170,93 @@ private function buildUrl($queryParams = null)
}

/**
* Make the API call and return the response. This is separated into
* it's own function, so we can mock it easily for testing.
*
* @param string $method the HTTP verb
* @param string $url the final url to call
* @param array $body request body
* @param array $headers any additional request headers
* @param bool $retryOnLimit should retry if rate limit is reach?
*
* @return Response object
*/
public function makeRequest($method, $url, $body = null, $headers = null, $retryOnLimit = false)
* Creates curl options for a request
* this function does not mutate any private variables
*
* @param string $method
* @param array $body
* @param array $headers
* @return array
*/
private function createCurlOptions($method, $body = null, $headers = null)
{
$curl = curl_init($url);

$options = array_merge(
[
CURLOPT_RETURNTRANSFER => true,
CURLOPT_HEADER => 1,
CURLOPT_CUSTOMREQUEST => strtoupper($method),
CURLOPT_SSL_VERIFYPEER => false,
CURLOPT_FAILONERROR => false,
CURLOPT_FAILONERROR => false
],
$this->curlOptions
);

curl_setopt_array($curl, $options);

if (isset($headers)) {
$this->headers = array_merge($this->headers, $headers);
$headers = array_merge($this->headers, $headers);
} else {
$headers = [];
}

if (isset($body)) {
$encodedBody = json_encode($body);
curl_setopt($curl, CURLOPT_POSTFIELDS, $encodedBody);
$this->headers = array_merge($this->headers, ['Content-Type: application/json']);
$options[CURLOPT_POSTFIELDS] = $encodedBody;
$headers = array_merge($headers, ['Content-Type: application/json']);
}
curl_setopt($curl, CURLOPT_HTTPHEADER, $this->headers);
$options[CURLOPT_HTTPHEADER] = $headers;

return $options;
}

/**
* @param array $requestData
* e.g. ['method' => 'POST', 'url' => 'www.example.com', 'body' => 'test body', 'headers' => []]
* @param bool $retryOnLimit
*
* @return array
*/
private function createSavedRequest($requestData, $retryOnLimit = false)
{
return array_merge($requestData, ['retryOnLimit' => $retryOnLimit]);
}

/**
* @param array $requests
*
* @return array
*/
private function createCurlMultiHandle($requests)
{
$channels = [];
$multiHandle = curl_multi_init();

foreach ($requests as $id => $data) {
$channels[$id] = curl_init($data['url']);
$curlOpts = $this->createCurlOptions($data['method'], $data['body'], $data['headers']);
curl_setopt_array($channels[$id], $curlOpts);
curl_multi_add_handle($multiHandle, $channels[$id]);
}

return [$channels, $multiHandle];
}

/**
* Make the API call and return the response. This is separated into
* it's own function, so we can mock it easily for testing.
*
* @param string $method the HTTP verb
* @param string $url the final url to call
* @param array $body request body
* @param array $headers any additional request headers
* @param bool $retryOnLimit should retry if rate limit is reach?
*
* @return Response object
*/
public function makeRequest($method, $url, $body = null, $headers = null, $retryOnLimit = false)
{
$curl = curl_init($url);

$curlOpts = $this->createCurlOptions($method, $body, $headers);
curl_setopt_array($curl, $curlOpts);

$response = curl_exec($curl);
$headerSize = curl_getinfo($curl, CURLINFO_HEADER_SIZE);
Expand All @@ -212,6 +282,66 @@ public function makeRequest($method, $url, $body = null, $headers = null, $retry
return $response;
}

/**
* Send all saved requests at once
*
* @param array $requests
* @return Response[]
*/
public function makeAllRequests($requests = [])
{
if (empty($requests)) {
$requests = $this->savedRequests;
}
list ($channels, $multiHandle) = $this->createCurlMultiHandle($requests);

// running all requests
$isRunning = null;
do {
curl_multi_exec($multiHandle, $isRunning);
} while ($isRunning);

// get response and close all handles
$retryRequests = [];
$responses = [];
$sleepDurations = 0;
foreach ($channels as $id => $ch) {
$response = curl_multi_getcontent($ch);
$headerSize = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
$statusCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
$responseBody = substr($response, $headerSize);

$responseHeaders = substr($response, 0, $headerSize);
$responseHeaders = explode("\n", $responseHeaders);
$responseHeaders = array_map('trim', $responseHeaders);

$response = new Response($statusCode, $responseBody, $responseHeaders);
if (($statusCode === 429) && $requests[$id]['retryOnLimit']) {
$headers = $response->headers(true);
$sleepDurations = max($sleepDurations, $headers['X-Ratelimit-Reset'] - time());
$requestData = [
'method' => $requests[$id]['method'],
'url' => $requests[$id]['url'],
'body' => $requests[$id]['body'],
'headers' =>$headers,
];
$retryRequests[] = $this->createSavedRequest($requestData, false);
} else {
$responses[] = $response;
}

curl_multi_remove_handle($multiHandle, $ch);
}
curl_multi_close($multiHandle);

// retry requests
if (!empty($retryRequests)) {
sleep($sleepDurations > 0 ? $sleepDurations : 0);
$responses = array_merge($responses, $this->makeAllRequests($retryRequests));
}
return $responses;
}

/**
* Add variable values to the url.
* (e.g. /your/api/{variable_value}/call)
Expand Down Expand Up @@ -242,7 +372,7 @@ public function _($name = null)
* @param string $name name of the dynamic method call or HTTP verb
* @param array $args parameters passed with the method call
*
* @return Client|Response object
* @return Client|Response|Response[]|null object
*/
public function __call($name, $args)
{
Expand All @@ -253,12 +383,27 @@ public function __call($name, $args)
return $this->_();
}

// send all saved requests
if (($name === 'send') && $this->isConcurrentRequest) {
return $this->makeAllRequests();
}

if (in_array($name, $this->methods, true)) {
$body = isset($args[0]) ? $args[0] : null;
$queryParams = isset($args[1]) ? $args[1] : null;
$url = $this->buildUrl($queryParams);
$headers = isset($args[2]) ? $args[2] : null;
$retryOnLimit = isset($args[3]) ? $args[3] : $this->retryOnLimit;

if ($this->isConcurrentRequest) {
// save request to be sent later
$this->savedRequests[] = $this->createSavedRequest(
['method' => $name, 'url' => $url, 'body' => $body, 'headers' => $headers],
$retryOnLimit
);
return null;
}

return $this->makeRequest($name, $url, $body, $headers, $retryOnLimit);
}

Expand Down
12 changes: 12 additions & 0 deletions test/unit/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,16 @@ public function testGetCurlOptions()
$client = new Client('https://localhost:4010');
$this->assertSame([], $client->getCurlOptions());
}

public function testCurlMulti()
{
$client = new Client('https://localhost:4010');
$client->setIsConcurrentRequest(true);
$client->get(['name' => 'A New Hope']);
$client->get(null, null, ['X-Mock: 200']);
$client->get(null, ['limit' => 100, 'offset' => 0]);

// returns 3 response object
$this->assertEquals(3, count($client->send()));
}
}