Skip to content

Commit dfebdd4

Browse files
author
linty
committed
clean up for open source
0 parents  commit dfebdd4

File tree

4 files changed

+357
-0
lines changed

4 files changed

+357
-0
lines changed

Example.php

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?php
2+
3+
require 'vendor/autoload.php';
4+
5+
use Lmstfy\Client;
6+
7+
$addr = "127.0.0.1:7777";
8+
$namespace = "guest";
9+
$token = "01CG17XY4HA7CQCMHSS8GTQ";
10+
$queue = "test-queue";
11+
12+
$cli = new Client($addr, $namespace, $token);
13+
// publish new job
14+
$cli->Publish($queue, "bar", 0, 10, 0);
15+
// consume the job from queue
16+
$job = $cli->Consume($queue, 5, 3);
17+
// process job
18+
var_dump($job);
19+
// ack job
20+
$cli->Ack($queue, $job["job_id"]);
21+
$cli->Close();

README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# php-lmstfy-client
2+
3+
Impl php client for lmstfy(Let Me Schedule Task For You)
4+
5+
## Install
6+
7+
```
8+
> git clone [email protected]:platform/php-lmstfy-client.git
9+
> cd php-lmstfy-client
10+
> composer install
11+
```
12+
13+
## Example
14+
15+
```php
16+
<?php
17+
18+
require 'vendor/autoload.php';
19+
20+
use Lmstfy\Client;
21+
22+
$addr = "127.0.0.1:7777";
23+
$namespace = "guest";
24+
$token = "01CG17XY4HA7CQCMHSS8GTQ";
25+
$queue = "test-queue";
26+
27+
$cli = new Client($addr, $namespace, $token);
28+
// publish new job
29+
$cli->Publish($queue, "bar", 0, 10, 0);
30+
// consume the job from queue
31+
$job = $cli->Consume($queue, 5, 3);
32+
// process job
33+
var_dump($job);
34+
// ack job
35+
$cli->Ack($queue, $job["job_id"]);
36+
$cli->Close();
37+
```
38+
39+
## Test
40+
41+
```
42+
> make test
43+
```

composer.json

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"name": "platform/php-lmstfy-client",
3+
"description": "lmstfy php client",
4+
"license": "Apache-2.0",
5+
"authors": [
6+
{
7+
"name": "hulk",
8+
"email": "[email protected]"
9+
}
10+
],
11+
"require-dev" : {
12+
"phpunit/phpunit": "5.6.*"
13+
},
14+
"autoload" : {
15+
"psr-4" : {"Lmstfy\\" : "src/"}
16+
}
17+
}

src/Client.php

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
<?php
2+
3+
namespace Lmstfy;
4+
5+
class Client {
6+
private $addr;
7+
private $namespace;
8+
private $token;
9+
private $url;
10+
11+
12+
/**
13+
* constructor
14+
*
15+
* @var String $addr server address, eg: "127.0.0.1:9999"
16+
* @var String $namespace namespace of queue
17+
* @var String $token secret to access the namespace
18+
*/
19+
public function __construct($addr, $namespace, $token) {
20+
$this->addr = $addr;
21+
$this->namespace = $namespace;
22+
$this->token = $token;
23+
$this->url = join('/', array(trim($this->addr, '/'), 'api', $this->namespace));
24+
$this->ch = curl_init();
25+
}
26+
27+
/**
28+
* Publish a job to task queue
29+
*
30+
* @var String $queue publish to which queue
31+
* @var String $data job data to store
32+
* @var Int $ttl time to live(unit: Second). TTL should be always gt the delay, or the task would never be consumed
33+
* @var Int $tries max retry times of the job. If tires = 1, the job would be consumed at most once
34+
* @var Int $delay job could be consumed after delay seconds
35+
*/
36+
public function Publish($queue, $data, $ttl = 0, $tries = 1, $delay = 0) {
37+
if (empty($queue)) {
38+
throw new \Exception("queue name can't be empty");
39+
}
40+
if ($ttl < 0) {
41+
throw new \Exception("ttl(time to live) should be >= 0");
42+
}
43+
if ($tries <= 0) {
44+
throw new \Exception("retries should be > 0");
45+
}
46+
if ($delay < 0) {
47+
throw new \Exception("delay should be >= 0");
48+
}
49+
50+
$query = array('ttl'=>$ttl, 'tries'=>$tries, 'delay'=>$delay);
51+
$response = $this->doRequest($queue, 'PUT', http_build_query($query), $data);
52+
if ($response['code'] != 201) {
53+
throw new \Exception("failed to publish while got bad response code:".$response['code']);
54+
}
55+
$body = json_decode($response['body'], true);
56+
return $body['job_id'];
57+
}
58+
59+
/*
60+
* Consume job from task queue
61+
* @var String $queue queue name
62+
* @var Int $ttr time to run(unit: Second). If consumer didn't ack after exceed the ttr,
63+
* the job would be re-consumed if retry times wasn't reached, or fall into deadletter queue.
64+
* The job in deadletter queue would be disappeared til it's respwan by the user manually.
65+
* @var Int $timeout client blocking wait for new job(unit: Second), 0 would be blocking forever
66+
*/
67+
public function Consume($queue, $ttr = 120, $timeout = 0) {
68+
if (empty($queue)) {
69+
throw new \Exception("queue name can't be empty");
70+
}
71+
if ($ttr <= 0) {
72+
throw new \Exception("ttr(time to run) should be > 0");
73+
}
74+
if ($timeout < 0 || $timeout > 600) {
75+
throw new \Exception("timeout should be >= 0 && <= 600");
76+
}
77+
78+
$query = array('ttr'=>$ttr, 'timeout'=>$timeout);
79+
$response = $this->doRequest($queue, 'GET', http_build_query($query));
80+
if ($response['code'] == 404) {
81+
return NULL;
82+
}
83+
if ($response['code'] != 200) {
84+
throw new \Exception("failed to consume while got bad response code:".$response['code']);
85+
}
86+
$job = json_decode($response['body'], true);
87+
$job['data'] = base64_decode($job['data']);
88+
return $job;
89+
}
90+
91+
/*
92+
* Ack job, and server would delete the job
93+
* @var String $queue queue name
94+
* @var String $job_id job id
95+
*/
96+
public function Ack($queue, $job_id) {
97+
if (empty($queue)) {
98+
throw new \Exception("queue name can't be empty");
99+
}
100+
if (empty($job_id)) {
101+
throw new \Exception("job id can't be empty");
102+
}
103+
$response = $this->doRequest($queue.'/job/'.$job_id, 'DELETE');
104+
if ($response['code'] != 204) {
105+
throw new \Exception("failed to ack while got bad response code:".$response['code']);
106+
}
107+
return true;
108+
}
109+
110+
/*
111+
* Get job from queue
112+
* @var String $queue queue name
113+
* @var String $job_id job id
114+
*/
115+
public function GetJob($queue, $job_id) {
116+
if (empty($queue)) {
117+
throw new \Exception("queue name can't be empty");
118+
}
119+
if (empty($job_id)) {
120+
throw new \Exception("job id can't be empty");
121+
}
122+
$response = $this->doRequest($queue.'/job/'.$job_id, 'GET');
123+
if ($response['code'] != 200) {
124+
throw new \Exception("failed to consume while got bad response code:".$response['code']);
125+
}
126+
$job = json_decode($response['body'], true);
127+
$job['data'] = base64_decode($job['data']);
128+
return $job;
129+
}
130+
131+
/*
132+
* Queue return the size of queue
133+
* @var String $queue queue name
134+
*/
135+
public function QueueSize($queue) {
136+
if (empty($queue)) {
137+
throw new \Exception("queue name can't be empty");
138+
}
139+
$response = $this->doRequest($queue.'/size', 'GET');
140+
if ($response['code'] != 200) {
141+
throw new \Exception("failed to get queue size while got bad response code:".$response['code']);
142+
}
143+
$body = json_decode($response['body'], true);
144+
return $body['size'];
145+
}
146+
147+
/*
148+
* Queue return the size of queue
149+
* @var String $queue queue name
150+
*/
151+
public function PeekQueue($queue) {
152+
if (empty($queue)) {
153+
throw new \Exception("queue name can't be empty");
154+
}
155+
$response = $this->doRequest($queue.'/peek', 'GET');
156+
if ($response['code'] != 200) {
157+
throw new \Exception("failed to peek queue while got bad response code:".$response['code']);
158+
}
159+
$job = json_decode($response['body'], true);
160+
$job['data'] = base64_decode($job['data']);
161+
return $job;
162+
}
163+
164+
/*
165+
* PeekDeadLetter peek a job from dead letter
166+
* @var String $queue queue name
167+
*/
168+
public function PeekDeadLetter($queue) {
169+
if (empty($queue)) {
170+
throw new \Exception("queue name can't be empty");
171+
}
172+
$response = $this->doRequest($queue.'/deadletter', 'GET');
173+
if ($response['code'] != 200) {
174+
throw new \Exception("failed to peek dead letter while got bad response code:".$response['code']);
175+
}
176+
$job = json_decode($response['body'], true);
177+
$job['data'] = base64_decode($job['data']);
178+
return $job;
179+
}
180+
181+
/*
182+
* RespawnDeadLetter respawn job from dead letter
183+
* @var String $queue queue name
184+
* @var Int $limit max number of job to respawn
185+
* @var Int $ttl time to live
186+
*/
187+
public function RespawnDeadLetter($queue, $limit, $ttl) {
188+
if (empty($queue)) {
189+
throw new \Exception("queue name can't be empty");
190+
}
191+
if ($limit < 0) {
192+
throw new \Exception("limit should be > 0");
193+
}
194+
if ($ttl < 0) {
195+
throw new \Exception("ttl should be >= 0");
196+
}
197+
$query = array('limit'=>$limit, 'ttl'=>$ttl);
198+
$response = $this->doRequest($queue.'/deadletter', 'PUT');
199+
if ($response['code'] != 200) {
200+
throw new \Exception("failed to respawn dead letter while got bad response code:".$response['code']);
201+
}
202+
$body = json_decode($response['body'], true);
203+
return $body['count'];
204+
}
205+
206+
/*
207+
* ConsumeMultiQueues can be used to impl the prio queue,
208+
* server would fetch job from Q1, Q2, ... QN with order
209+
* @var Int $ttl time to live
210+
* @var Int $timeout client blocking wait for new job(unit: Second)
211+
*/
212+
public function ConsumeMultiQueues($ttr, $timeout, ...$queues) {
213+
if ($ttr <= 0) {
214+
throw new \Exception("ttr(time to run) should be > 0");
215+
}
216+
if ($timeout < 0 || $timeout > 600) {
217+
throw new \Exception("timeout should be >= 0 && <= 600");
218+
}
219+
if (count($queues) <= 0) {
220+
throw new \Exception("consume atleast one queue");
221+
}
222+
$query = array('ttr'=>$ttr, 'timeout'=>$timeout);
223+
$response = $this->doRequest(join(',', $queues), 'GET', $query);
224+
if ($response['code'] != 200) {
225+
throw new \Exception("failed to consume while got bad response code:".$response['code']);
226+
}
227+
$job = json_decode($response['body'], true);
228+
$job['data'] = base64_decode($job['data']);
229+
return $job;
230+
}
231+
232+
public function Close() {
233+
curl_close($this->ch);
234+
$this->ch = NULL;
235+
}
236+
237+
private function doRequest($relativePath, $method, $query='', $data='') {
238+
if ($this->ch == NULL) {
239+
$this->ch = curl_init();
240+
}
241+
$headers= array(
242+
"X-Token:".$this->token
243+
);
244+
$url = $this->url.'/'.$relativePath;
245+
if (!empty($query)) {
246+
$url = $url.'?'.$query;
247+
}
248+
$ch = $this->ch;
249+
curl_setopt($ch, CURLOPT_URL, $url);
250+
curl_setopt($ch, CURLOPT_HTTPHEADER, $headers);
251+
// connect timeout 1500ms
252+
curl_setopt($ch, CURLOPT_CONNECTTIMEOUT_MS, 1500);
253+
// socket timeout 300s
254+
curl_setopt($ch, CURLOPT_TIMEOUT_MS, 300000);
255+
curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $method);
256+
curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
257+
if (!empty($data)) {
258+
if (is_string($data)) {
259+
curl_setopt($ch, CURLOPT_POSTFIELDS,$data);
260+
} else {
261+
curl_setopt($ch, CURLOPT_POSTFIELDS,http_build_query($data));
262+
}
263+
}
264+
$body = curl_exec($ch);
265+
$errno = curl_errno($ch);
266+
if ($errno != 0 && $errno != CURLE_HTTP_NOT_FOUND) {
267+
$this->Close();
268+
throw new \Exception("failed to curl while error:".curl_strerror($errno));
269+
}
270+
$res = array(
271+
'code' => curl_getinfo($ch, CURLINFO_HTTP_CODE),
272+
'body' => $body
273+
);
274+
return $res;
275+
}
276+
}

0 commit comments

Comments
 (0)