Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/gmail/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/gmail",
"version": "0.2.0",
"version": "0.2.1",
"description": "Pipedream Gmail Components",
"main": "gmail.app.mjs",
"keywords": [
Expand Down
51 changes: 33 additions & 18 deletions components/gmail/sources/common/polling-history.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ export default {
const params = {
maxResults: constants.HISTORICAL_EVENTS,
};
if (this.label) {
params.labelIds = this.label;
if (this.labels?.length) {
params.labelIds = this.labels;
}
let { messages } = await this.gmail.listMessages({
params,
...params,
});
if (!messages?.length) {
return;
Expand All @@ -48,30 +48,42 @@ export default {
startHistoryId,
historyTypes: this.getHistoryTypes(),
};
if (this.label) {
opts.labelId = this.label;
}

const {
history, historyId,
} = await this.gmail.listHistory(opts);
const length = this.labels?.length > 0
? this.labels.length
: 1;
let maxHistoryId = 0;

if (!history) {
return;
}
this._setLastHistoryId(historyId);
for (let i = 0; i < length; i++) {
if (this.labels) {
opts.labelId = this.labels[i];
}

const responseArray = this.filterHistory(history);
for (const item of responseArray) {
await this.emitFullMessage(item.messages[0].id);
const {
history, historyId,
} = await this.gmail.listHistory(opts);

if (!history) {
continue;
}

maxHistoryId = Math.max(maxHistoryId, historyId);
const responseArray = this.filterHistory(history);

for (const item of responseArray) {
await this.emitFullMessage(item.messages[0].id);
}
}
if (maxHistoryId > 0) {
this._setLastHistoryId(maxHistoryId);
}
},
async emitRecentMessages() {
const opts = {
maxResults: constants.HISTORICAL_EVENTS,
};
if (this.label) {
opts.labelIds = this.label;
if (this.labels?.length) {
opts.labelIds = this.labels;
}
let { messages } = await this.gmail.listMessages(opts);
if (!messages?.length) {
Expand All @@ -88,6 +100,9 @@ export default {
message = await this.gmail.getMessage({
id,
});
if (this.excludeLabels && message.labelIds.some((i) => this.excludeLabels.includes(i))) {
return;
}
} catch {
console.log(`Message ${id} not found`);
}
Expand Down
91 changes: 60 additions & 31 deletions components/gmail/sources/new-email-received/new-email-received.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
name: "New Email Received",
description: "Emit new event when a new email is received.",
type: "source",
version: "0.1.9",
version: "0.1.10",
dedupe: "unique",
props: {
gmail,
Expand All @@ -31,14 +31,14 @@
"Configuring this source as a `webhook` (instant) trigger requires a custom OAuth client. [Refer to the guide here to get started](https://pipedream.com/apps/gmail/#getting-started).",
reloadProps: true,
},
serviceAccountKeyJson: {

Check warning on line 34 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJson must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "string",
label: "Service Account Key JSON",
optional: true,
hidden: true,
reloadProps: true,
},
serviceAccountKeyJsonInstructions: {

Check warning on line 41 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJsonInstructions must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 41 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop serviceAccountKeyJsonInstructions must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "info",
content: `1) [Create a service account in GCP](https://cloud.google.com/iam/docs/creating-managing-service-accounts) and set the following permission: **Pub/Sub Admin**
Expand Down Expand Up @@ -72,16 +72,31 @@
hidden: true,
reloadProps: true,
},
label: {
labels: {
propDefinition: [
gmail,
"label",
],
default: "INBOX",
type: "string[]",
label: "Labels",
default: [
"INBOX",
],
optional: true,
hidden: true,
},
excludeLabels: {
propDefinition: [
gmail,
"label",
],
type: "string[]",
label: "Exclude Labels",
description: "Emails with the specified labels will be excluded from results",
optional: true,
hidden: true,
},
permissionAlert: {

Check warning on line 99 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop permissionAlert must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 99 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop permissionAlert must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "error",
content: `Unable to grant publish permission to Gmail API service account.
Expand All @@ -94,7 +109,7 @@
`,
hidden: true,
},
latencyWarningAlert: {

Check warning on line 112 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop latencyWarningAlert must have a label. See https://pipedream.com/docs/components/guidelines/#props

Check warning on line 112 in components/gmail/sources/new-email-received/new-email-received.mjs

View workflow job for this annotation

GitHub Actions / Lint Code Base

Component prop latencyWarningAlert must have a description. See https://pipedream.com/docs/components/guidelines/#props
type: "alert",
alertType: "warning",
content:
Expand Down Expand Up @@ -221,7 +236,8 @@
};
}
}
props.label.hidden = false;
props.labels.hidden = false;
props.excludeLabels.hidden = false;
return newProps;
},
hooks: {
Expand Down Expand Up @@ -354,8 +370,8 @@
path: "/users/me/watch",
data: {
topicName,
labelIds: [
this.label || "INBOX",
labelIds: this.labels || [
"INBOX",
],
},
});
Expand Down Expand Up @@ -396,14 +412,18 @@
};
},
filterHistory(history) {
return this.label
? history.filter(
(item) =>
item.messagesAdded?.length &&
item.messagesAdded[0].message.labelIds &&
item.messagesAdded[0].message.labelIds.includes(this.label),
)
: history.filter((item) => item.messagesAdded?.length);
let filteredHistory = history.filter((item) => item.messagesAdded?.length);
if (this.labels) {
filteredHistory = filteredHistory.filter((item) =>
item.messagesAdded[0].message.labelIds &&
item.messagesAdded[0].message.labelIds.some((i) => this.labels.includes(i)));
}
if (this.excludeLabels) {
filteredHistory = filteredHistory.filter((item) =>
item.messagesAdded[0].message.labelIds &&
!(item.messagesAdded[0].message.labelIds.some((i) => this.excludeLabels.includes(i))));
}
return filteredHistory;
},
async getMessageDetails(ids) {
const messages = await Promise.all(ids.map(async (id) => {
Expand All @@ -419,14 +439,19 @@
}));
return messages;
},
getHistoryResponse(startHistoryId) {
return this.gmail.listHistory({
startHistoryId,
historyTypes: [
"messageAdded",
],
labelId: this.label,
});
async getHistoryResponses(startHistoryId) {
const historyResponses = [];
for (const labelId of this.labels) {
const response = await this.gmail.listHistory({
startHistoryId,
historyTypes: [
"messageAdded",
],
labelId,
});
historyResponses.push(response);
}
return historyResponses;
},
},
async run(event) {
Expand Down Expand Up @@ -495,9 +520,9 @@
console.log("Using startHistoryId:", startHistoryId);

// Fetch the history
let historyResponse;
let historyResponses;
try {
historyResponse = await this.getHistoryResponse(startHistoryId);
historyResponses = await this.getHistoryResponses(startHistoryId);
} catch {
// catch error thrown if startHistoryId is invalid or expired

Expand All @@ -507,19 +532,20 @@
// set startHistoryId to the historyId received from the webhook
startHistoryId = parseInt(receivedHistoryId);
console.log("Using startHistoryId:", startHistoryId);
historyResponse = await this.getHistoryResponse(startHistoryId);
historyResponses = await this.getHistoryResponses(startHistoryId);
}

console.log(
"History response:",
JSON.stringify(historyResponse, null, 2),
"History responses:",
JSON.stringify(historyResponses, null, 2),
);

// Process history to find new messages
const newMessages = [];
if (historyResponse.history) {
for (const historyItem of historyResponse.history) {
if (historyItem.messagesAdded) {
for (const historyResponse of historyResponses) {
if (historyResponse.history) {
const historyResponseFiltered = this.filterHistory(historyResponse.history);
for (const historyItem of historyResponseFiltered) {
newMessages.push(
...historyItem.messagesAdded.map((msg) => msg.message),
);
Expand All @@ -540,7 +566,10 @@
console.log("Fetched message details count:", messageDetails.length);

// Store the latest historyId in the db
const latestHistoryId = historyResponse.historyId || receivedHistoryId;
let latestHistoryId = receivedHistoryId;
for (const historyResponse of historyResponses) {
latestHistoryId = Math.max(latestHistoryId, historyResponse.historyId);
}
this._setLastProcessedHistoryId(latestHistoryId);
console.log("Updated lastProcessedHistoryId:", latestHistoryId);

Expand Down
14 changes: 8 additions & 6 deletions components/gmail/sources/new-labeled-email/new-labeled-email.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ export default {
name: "New Labeled Email",
description: "Emit new event when a new email is labeled.",
type: "source",
version: "0.0.6",
version: "0.0.7",
dedupe: "unique",
props: {
...common.props,
gmail,
label: {
labels: {
propDefinition: [
gmail,
"label",
],
type: "string[]",
label: "Labels",
},
},
methods: {
Expand All @@ -30,17 +32,17 @@ export default {
},
generateMeta(message) {
return {
id: `${message.id}-${this.label}`,
summary: `A new message with ID: ${message.id} was labeled with "${this.label}"`,
id: `${message.id}-${message.historyId}`,
summary: `A new message with ID: ${message.id} was labeled"`,
ts: +message.internalDate,
};
},
filterHistory(history) {
return history.filter((item) =>
(item.labelsAdded && item.labelsAdded[0].labelIds.includes(this.label))
(item.labelsAdded && item.labelsAdded[0].labelIds.some((i) => this.labels.includes(i)))
|| (item.messagesAdded
&& item.messagesAdded[0].message.labelIds
&& item.messagesAdded[0].message.labelIds.includes(this.label)));
&& item.messagesAdded[0].message.labelIds.some((i) => this.labels.includes(i))));
},
},
sampleEmit,
Expand Down
3 changes: 1 addition & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading