50 changes: 29 additions & 21 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
# Learning

This project is a companion repository to the [Apache Kafka Connect course on Udemy](https://links.datacumulus.com/kafka-connect-coupon).


# Kafka Connect Source GitHub

This connector allows you to get a stream of issues and pull requests from your GitHub repository, using the GitHub Api: https://developer.github.com/v3/issues/#list-issues-for-a-repository
This connector allows you to get a stream of issues and pull requests from GitHub repositories, using the GitHub API: https://developer.github.com/v3/issues/#list-issues-for-a-repository

Issues are pulled based on the `updated_at` field, meaning any update to an issue or pull request will appear in the stream.

The connector writes to a topic that is a great candidate for demonstrating *log compaction*. It's also a fun way to automate your GitHub workflow.

Finally, it is meant to be an educational example of how to write a Source Connector that is a little less trivial than the `FileStreamSourceConnector` provided with Kafka.

# Contributing

This connector is not perfect and can be improved; please feel free to submit any PR you deem useful.

# Configuration

```
name=GitHubSourceConnectorDemo
tasks.max=1
tasks.max=2
connector.class=com.simplesteph.kafka.GitHubSourceConnector
topic=github-issues
github.owner=kubernetes
github.repo=kubernetes
since.timestamp=2017-01-01T00:00:00Z
# I heavily recommend you set those two fields:
auth.username=your_username
auth.password=your_password
# Pattern for listing repositories: owner/repo:topic
github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka
# I strongly recommend you set the auth.accesstoken field:
#auth.username=
#auth.password=
auth.accesstoken=your_accesstoken
```
Note: **github.repos** must be set and must follow the pattern `owner1/repo1:topic1,owner2/repo2:topic2,...`.
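As a quick illustration, a `github.repos` value in that pattern can be split into `(owner, repo, topic)` triples as follows (a standalone sketch with hypothetical class and method names, not the connector's actual parsing code):

```java
import java.util.ArrayList;
import java.util.List;

public class ReposConfigSketch {
    // Parse "owner1/repo1:topic1,owner2/repo2:topic2" into {owner, repo, topic} triples.
    static List<String[]> parse(String reposConfig) {
        List<String[]> result = new ArrayList<>();
        for (String entry : reposConfig.split(",")) {
            String[] repoAndTopic = entry.split(":", 2);           // "owner/repo" + "topic"
            String[] ownerAndRepo = repoAndTopic[0].split("/", 2); // "owner" + "repo"
            result.add(new String[]{ownerAndRepo[0], ownerAndRepo[1], repoAndTopic[1]});
        }
        return result;
    }

    public static void main(String[] args) {
        String value = "kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka";
        for (String[] t : parse(value)) {
            System.out.println(t[0] + "/" + t[1] + " -> " + t[2]);
        }
    }
}
```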

You can control the number of tasks to run with **tasks.max**. This allows the work to be divided among tasks: each task is assigned a subset of the repositories and
fetches issues for those repositories.
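The balanced split described above can be sketched as follows: repositories are divided so that the largest group per task is as small as possible. This is an illustration of the strategy only, not the connector's actual code, and the repo names are made up:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RepoDistributionSketch {
    // Split the repositories across at most maxTasks tasks so that the
    // largest group is as small as possible.
    static List<List<String>> distribute(List<String> repos, int maxTasks) {
        int tasks = Math.min(maxTasks, repos.size()); // never more tasks than repos
        List<List<String>> groups = new ArrayList<>();
        int remaining = repos.size();
        int index = 0;
        for (int t = tasks; t > 0; t--) {
            int take = (remaining + t - 1) / t; // ceiling division over remaining tasks
            groups.add(new ArrayList<>(repos.subList(index, index + take)));
            index += take;
            remaining -= take;
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> repos = Arrays.asList("a/r1", "b/r2", "c/r3", "d/r4", "e/r5");
        System.out.println(distribute(repos, 2)); // [[a/r1, b/r2, c/r3], [d/r4, e/r5]]
    }
}
```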

Set **since.timestamp** to fetch only issues that have been updated after that timestamp.
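For reference, `since.timestamp` is an ISO-8601 UTC instant, which the Java standard library parses directly (the value here is the one from the example config):

```java
import java.time.Instant;

public class SinceTimestampSketch {
    public static void main(String[] args) {
        // Parse the example config value into an Instant.
        Instant since = Instant.parse("2017-01-01T00:00:00Z");
        System.out.println(since);                  // 2017-01-01T00:00:00Z
        System.out.println(since.getEpochSecond()); // 1483228800
    }
}
```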

Use either **auth.username** and **auth.password**, or only **auth.accesstoken**. Using **auth.accesstoken** is preferable
because authentication with *username* and *password* has been deprecated and will soon no longer be supported by the GitHub API.
To generate a *personal access token*, follow the steps at [https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line](https://help.github.com/en/github/authenticating-to-github/creating-a-personal-access-token-for-the-command-line)
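Under the hood, token authentication is just an `Authorization` header on each request. Here is a minimal, self-contained sketch using the JDK's `HttpURLConnection` rather than the Apache HttpClient the connector uses via `SetBearerAuthUtil`; the `token <PAT>` scheme is GitHub's documented header format, and the token value is a placeholder:

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

public class AuthHeaderSketch {
    // GitHub accepts a personal access token as "Authorization: token <PAT>".
    static void setTokenAuth(HttpURLConnection conn, String accessToken) {
        conn.setRequestProperty("Authorization", "token " + accessToken);
    }

    public static void main(String[] args) throws IOException {
        URL url = new URL("https://api.github.com/repos/apache/kafka/issues");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection(); // no network I/O yet
        setTokenAuth(conn, "your_access_token"); // placeholder, not a real token
        System.out.println(conn.getRequestProperty("Authorization"));
    }
}
```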

# Running in development

@@ -49,4 +47,14 @@ The simplest way to run `run.sh` is to have docker installed. It will pull a Doc

Note: Java 8 is required for this connector.

TODO
#### Distributed Mode

Build the project using `./build.sh`.

Copy the folder `target/kafka-connect-github-source-1.1-package/share/java/kafka-connect-github-source` (it contains all of the project's jars)
into a directory on the Connect workers' `plugin.path` (configured in the workers' properties files).
The Connect workers should then be able to detect `GitHubSourceConnector`.

# Contributing

This connector can still be improved in many ways; please feel free to submit any PR you deem useful.
14 changes: 7 additions & 7 deletions config/GitHubSourceConnectorExample.properties
@@ -1,10 +1,10 @@
name=GitHubSourceConnectorDemo
tasks.max=1
tasks.max=2
connector.class=com.simplesteph.kafka.GitHubSourceConnector
topic=github-issues
github.owner=kubernetes
github.repo=kubernetes
since.timestamp=2017-01-01T00:00:00Z
# I heavily recommend you set those two fields:
# auth.username=your_username
# auth.password=your_password
# Pattern for listing repositories: owner/repo:topic
github.repos=kubernetes/kubernetes:github-issues-kubernetes,apache/kafka:github-issues-kafka
# I strongly recommend you set the auth.accesstoken field:
#auth.username=
#auth.password=
auth.accesstoken=your_accesstoken
13 changes: 9 additions & 4 deletions pom.xml
@@ -33,11 +33,16 @@
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.mashape.unirest/unirest-java -->
<dependency>
<groupId>com.mashape.unirest</groupId>
<artifactId>unirest-java</artifactId>
<version>1.4.9</version>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20190722</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>

</dependencies>
109 changes: 72 additions & 37 deletions src/main/java/com/simplesteph/kafka/GitHubAPIHttpClient.java
@@ -1,25 +1,33 @@
package com.simplesteph.kafka;

import com.mashape.unirest.http.Headers;
import com.mashape.unirest.http.HttpResponse;
import com.mashape.unirest.http.JsonNode;
import com.mashape.unirest.http.Unirest;
import com.mashape.unirest.http.exceptions.UnirestException;
import com.mashape.unirest.request.GetRequest;
import com.simplesteph.kafka.utils.SetBasicAuthUtil;
import com.simplesteph.kafka.utils.SetBearerAuthUtil;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;

// GitHubAPIHttpClient is used to issue HTTP GET requests against the GitHub API
public class GitHubAPIHttpClient {
public class GitHubAPIHttpClient implements Closeable {

private static final Logger log = LoggerFactory.getLogger(GitHubAPIHttpClient.class);

private HttpClientProvider httpClientProvider;
private HttpClient httpClient;
// for efficient http requests
private Integer XRateLimit = 9999;
private Integer XRateRemaining = 9999;
@@ -29,69 +37,90 @@ public class GitHubAPIHttpClient {

public GitHubAPIHttpClient(GitHubSourceConnectorConfig config){
this.config = config;
this.httpClientProvider = new HttpClientProvider();
this.httpClient = this.httpClientProvider.getHttpClient();
}

protected JSONArray getNextIssues(Integer page, Instant since) throws InterruptedException {
protected JSONArray getNextIssues(RepositoryVariables repoVar) throws InterruptedException {

HttpResponse<JsonNode> jsonResponse;
HttpResponse httpResponse;
try {
jsonResponse = getNextIssuesAPI(page, since);

// deal with headers in any case
Headers headers = jsonResponse.getHeaders();
XRateLimit = Integer.valueOf(headers.getFirst("X-RateLimit-Limit"));
XRateRemaining = Integer.valueOf(headers.getFirst("X-RateLimit-Remaining"));
XRateReset = Integer.valueOf(headers.getFirst("X-RateLimit-Reset"));
switch (jsonResponse.getStatus()){
httpResponse = getNextIssuesAPI(repoVar);
// read the response headers (kept in a map for error logging below)
HashMap<String, String> headers = new HashMap<>();
for (Header h : httpResponse.getAllHeaders()) {
    headers.put(h.getName(), h.getValue());
}

XRateLimit = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Limit").getValue());
XRateRemaining = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Remaining").getValue());
XRateReset = Integer.valueOf(httpResponse.getFirstHeader("X-RateLimit-Reset").getValue());
// read the response body
String jsonResponse = EntityUtils.toString(httpResponse.getEntity());
JSONArray jsonArray = new JSONArray();
JSONObject jsonBody = new JSONObject();
try {
    // try to read the body as a JSON array (the issues list) if possible
    jsonArray = new JSONArray(jsonResponse);
} catch (JSONException ex) {
    // otherwise read it as a JSON object (e.g. an API error message)
    jsonBody = new JSONObject(jsonResponse);
}

switch (httpResponse.getStatusLine().getStatusCode()){
case 200:
return jsonResponse.getBody().getArray();
return jsonArray;
case 401:
throw new ConnectException("Bad GitHub credentials provided, please edit your config");
case 403:
// 403: we have sent too many requests and hit the rate limit
log.info(jsonResponse.getBody().getObject().getString("message"));
log.info(jsonBody.getString("message"));
log.info(String.format("Your rate limit is %s", XRateLimit));
log.info(String.format("Your remaining calls is %s", XRateRemaining));
log.info(String.format("The limit will reset at %s",
LocalDateTime.ofInstant(Instant.ofEpochSecond(XRateReset), ZoneOffset.systemDefault())));
long sleepTime = XRateReset - Instant.now().getEpochSecond();
log.info(String.format("Sleeping for %s seconds", sleepTime ));
Thread.sleep(1000 * sleepTime);
return getNextIssues(page, since);
return getNextIssues(repoVar);
default:
log.error(constructUrl(page, since));
log.error(String.valueOf(jsonResponse.getStatus()));
log.error(jsonResponse.getBody().toString());
log.error(jsonResponse.getHeaders().toString());
log.error(constructUrl(repoVar));
log.error(String.valueOf(httpResponse.getStatusLine().getStatusCode()));
log.error(jsonResponse);
log.error(headers.toString());
log.error("Unknown error: Sleeping 5 seconds " +
"before re-trying");
Thread.sleep(5000L);
return getNextIssues(page, since);
return getNextIssues(repoVar);
}
} catch (UnirestException e) {
} catch (IOException e) {
log.error("HTTP request to the GitHub API failed", e);
Thread.sleep(5000L);
return new JSONArray();
}
}

protected HttpResponse<JsonNode> getNextIssuesAPI(Integer page, Instant since) throws UnirestException {
GetRequest unirest = Unirest.get(constructUrl(page, since));
if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty() ){
unirest = unirest.basicAuth(config.getAuthUsername(), config.getAuthPassword());
protected HttpResponse getNextIssuesAPI(RepositoryVariables repoVar) throws IOException {
    HttpGet httpGet = new HttpGet(constructUrl(repoVar));
    if (!config.getAuthAccesstoken().isEmpty()) {
        SetBearerAuthUtil.SetBearerAuthUtil(httpGet, config.getAuthAccesstoken());
    }
log.debug(String.format("GET %s", unirest.getUrl()));
return unirest.asJson();

    else if (!config.getAuthUsername().isEmpty() && !config.getAuthPassword().isEmpty()) {
        SetBasicAuthUtil.SetBasicAuth(httpGet, config.getAuthUsername(), config.getAuthPassword());
    }
    return httpClient.execute(httpGet); // execute the GET and return the raw response
}

protected String constructUrl(Integer page, Instant since){
protected String constructUrl(RepositoryVariables repoVar){
return String.format(
"https://api.github.com/repos/%s/%s/issues?page=%s&per_page=%s&since=%s&state=all&direction=asc&sort=updated",
config.getOwnerConfig(),
config.getRepoConfig(),
page,
repoVar.getOwner(),
repoVar.getRepoName(),
repoVar.nextPageToVisit,
config.getBatchSize(),
since.toString());
repoVar.nextQuerySince.toString());
}

public void sleep() throws InterruptedException {
@@ -108,4 +137,10 @@ public void sleepIfNeed() throws InterruptedException {
sleep();
}
}

@Override
public void close() throws IOException {
if (httpClientProvider != null) {
    httpClientProvider.close();
}
}
}
35 changes: 33 additions & 2 deletions src/main/java/com/simplesteph/kafka/GitHubSourceConnector.java
@@ -1,9 +1,11 @@
package com.simplesteph.kafka;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.simplesteph.kafka.utils.RepoJoinUtil;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
@@ -13,6 +15,7 @@
public class GitHubSourceConnector extends SourceConnector {
private static Logger log = LoggerFactory.getLogger(GitHubSourceConnector.class);
private GitHubSourceConnectorConfig config;
private Map<String, String> settings;

@Override
public String version() {
@@ -22,6 +25,7 @@ public String version() {
@Override
public void start(Map<String, String> map) {
config = new GitHubSourceConnectorConfig(map);
settings=map;
}

@Override
@@ -32,8 +36,35 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int i) {
// Define the individual task configurations that will be executed.
ArrayList<Map<String, String>> configs = new ArrayList<>(1);
configs.add(config.originalsStrings());
String[] repos = config.getReposConfig();
if (repos.length < i) {
    log.info(String.format("The number of repositories to follow (%d) is less than max tasks (%d); " +
            "setting the effective task count to %d", repos.length, i, repos.length));
    i = repos.length; // cap the task count at the number of repositories
}

ArrayList<Map<String, String>> configs = new ArrayList<>(i);

// Distribute the repositories among the task configs so that the maximum number of repos per task is as small as possible
int remainingRepos = repos.length;
int currentRepo = 0;
while (remainingRepos > 0) {
    int numReposForTask = (remainingRepos / i) + ((remainingRepos % i == 0) ? 0 : 1); // ceiling of remainingRepos / i
    String reposForTask = "";
    for (int j = 0; j < numReposForTask; j++) {
        reposForTask = RepoJoinUtil.Join(reposForTask, repos[currentRepo + j]);
    }
    // create this task's settings from the connector settings, overriding the repos list
    Map<String, String> taskSettings = new HashMap<>(settings);
    taskSettings.put(GitHubSourceConnectorConfig.REPOS_CONFIG, reposForTask);
    configs.add(taskSettings);

    currentRepo += numReposForTask;
    remainingRepos -= numReposForTask;
    i--;
}

return configs;
}
