130 changes: 119 additions & 11 deletions backend/btrixcloud/crawlconfigs.py
@@ -4,7 +4,15 @@

# pylint: disable=too-many-lines

from typing import List, Optional, TYPE_CHECKING, cast, Dict, Tuple, Annotated, Union
from typing import (
List,
Optional,
TYPE_CHECKING,
cast,
Dict,
Tuple,
Annotated,
)

import asyncio
import json
@@ -17,10 +25,14 @@

import aiohttp
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
import pymongo

from .pagination import DEFAULT_PAGE_SIZE, paginated_format
from .models import (
BatchCrawlRunOut,
BatchFilter,
BatchTotalOut,
CrawlConfigIn,
ConfigRevision,
CrawlConfig,
@@ -687,8 +699,8 @@ async def update_usernames(self, userid: UUID, updated_name: str) -> None:
async def get_crawl_configs(
self,
org: Organization,
page_size: int = DEFAULT_PAGE_SIZE,
page: int = 1,
page_size: int | None = DEFAULT_PAGE_SIZE,
page: int | None = 1,
created_by: Optional[UUID] = None,
modified_by: Optional[UUID] = None,
profile_ids: Optional[List[UUID]] = None,
@@ -705,10 +717,12 @@ async def get_crawl_configs(
"""Get all crawl configs for an organization is a member of"""
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
# Zero-index page for query
page = page - 1
skip = page * page_size
page = page - 1 if page is not None else 0
skip = page * page_size if page_size is not None else 0

match_query = {"oid": org.id, "inactive": {"$ne": True}}
match_query: dict[
str, object | str | int | list[dict[str, object | str | int]]
] = {"oid": org.id, "inactive": {"$ne": True}}

if tags:
query_type = "$all" if tag_match == ListFilterType.AND else "$in"
@@ -739,7 +753,7 @@ async def get_crawl_configs(
match_query["isCrawlRunning"] = is_crawl_running

# pylint: disable=duplicate-code
aggregate: List[Dict[str, Union[object, str, int]]] = [
aggregate: list[dict[str, object | str | int]] = [
{"$match": match_query},
{"$unset": ["config"]},
]
@@ -770,14 +784,15 @@ async def get_crawl_configs(

aggregate.extend([{"$sort": sort_query}])

items_stages = [{"$skip": skip}]
if page_size is not None:
items_stages.append({"$limit": page_size})

aggregate.extend(
[
{
"$facet": {
"items": [
{"$skip": skip},
{"$limit": page_size},
],
"items": items_stages,
"total": [{"$count": "count"}],
}
},
@@ -804,6 +819,64 @@

return configs, total

async def run_crawls_by_filters(
self,
org: Organization,
user: User,
created_by: UUID | None = None,
modified_by: UUID | None = None,
profile_ids: list[UUID] | None = None,
first_seed: str | None = None,
name: str | None = None,
description: str | None = None,
tags: list[str] | None = None,
tag_match: ListFilterType | None = ListFilterType.AND,
schedule: bool | None = None,
is_crawl_running: bool | None = None,
):
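"""Run all workflows matching the given filters, streaming the total and then per-crawl results as JSONL"""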
crawl_configs, total = await self.get_crawl_configs(
org,
created_by=created_by,
modified_by=modified_by,
profile_ids=profile_ids,
first_seed=first_seed,
name=name,
description=description,
tags=tags,
tag_match=tag_match,
schedule=schedule,
is_crawl_running=is_crawl_running,
page_size=None,
)
yield BatchTotalOut(total=total).model_dump_json(indent=None) + "\n"
print(
f"Got {total} crawl configs with filter params: {created_by}, {modified_by}, {profile_ids}, {first_seed}, {name}, {description}, {tags}, {tag_match}, {schedule}, {is_crawl_running}"
)

async def run_crawl_with_metadata(config: CrawlConfigOut):
"""Helper that runs crawl and returns metadata with result"""
try:
crawl_id = await self.run_now(config.id, org, user)
return BatchCrawlRunOut(crawl_id=crawl_id, success=True)
except Exception as e:
print(f"Error running crawl for config {config.id}: {e}")
return BatchCrawlRunOut(
crawl_id=str(config.id), success=False, error=str(e)
)

async with asyncio.TaskGroup() as tg:
tasks = [
tg.create_task(run_crawl_with_metadata(config))
for config in crawl_configs
]

completed = 0
for task in asyncio.as_completed(tasks):
result = await task
completed += 1
result.position = completed
yield result.model_dump_json(indent=None) + "\n"

async def is_profile_in_use(self, profileid: UUID, org: Organization) -> bool:
"""return true/false if any active workflows exist with given profile"""
res = await self.crawl_configs.find_one(
@@ -1674,6 +1747,41 @@ async def validate_custom_behavior(
):
return await ops.validate_custom_behavior(behavior.customBehavior)

# GROUP ACTIONS

@router.post("/batch/run")
async def run_multiple(
filter: BatchFilter,
org: Organization = Depends(org_crawl_dep),
user: User = Depends(user_dep),
):
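"""Run crawls for all workflows matching the given filters (streamed JSONL response)"""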
if filter.first_seed:
filter.first_seed = urllib.parse.unquote(filter.first_seed)

if filter.name:
filter.name = urllib.parse.unquote(filter.name)

if filter.description:
filter.description = urllib.parse.unquote(filter.description)

return StreamingResponse(
ops.run_crawls_by_filters(
org=org,
user=user,
created_by=filter.created_by,
modified_by=filter.modified_by,
profile_ids=filter.profile_ids,
first_seed=filter.first_seed,
name=filter.name,
description=filter.description,
tags=filter.tags,
tag_match=filter.tag_match,
schedule=filter.schedule,
is_crawl_running=filter.is_crawl_running,
),
media_type="application/jsonl",
)

org_ops.router.include_router(router)

return ops
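
For reference, the stream this endpoint produces is newline-delimited JSON: one BatchTotalOut line up front, then one BatchCrawlRunOut line per matched workflow as its crawl starts (or fails to start). A run matching two workflows would look roughly like the following; the IDs and error text are illustrative, not taken from this diff:

{"total": 2}
{"error": null, "crawl_id": "manual-20250101000000-abc12", "success": true, "position": 1}
{"error": "already running", "crawl_id": "b9a8c7d6-1e2f-4a3b-8c9d-0e1f2a3b4c5d", "success": false, "position": 2}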
51 changes: 51 additions & 0 deletions backend/btrixcloud/models.py
@@ -3096,3 +3096,54 @@ class ListFilterType(str, Enum):

OR = "or"
AND = "and"


# BATCH ACTIONS
# ============================================================================


class BatchFilter(BaseModel):
"""Base model for batch filters"""

created_by: Annotated[
UUID | None, Field(alias="createdBy", title="Created By User ID")
] = None
modified_by: Annotated[
UUID | None, Field(alias="modifiedBy", title="Modified By User ID")
] = None
profile_ids: Annotated[
list[UUID] | None, Field(alias="profileIds", title="Profile IDs")
] = None
first_seed: Annotated[str | None, Field(alias="firstSeed", title="First Seed")] = (
None
)
name: str | None = None
description: str | None = None
tags: list[str] | None = None
tag_match: Annotated[
ListFilterType | None,
Field(
alias="tagMatch",
title="Tag Match Type",
description='Defaults to `"and"` if omitted',
),
] = ListFilterType.AND
schedule: bool | None = None
is_crawl_running: Annotated[
bool | None, Field(alias="isCrawlRunning", title="Is Crawl Running")
] = None


class BatchTotalOut(BaseModel):
"""Response model for batch total"""

total: int


class BatchCrawlRunOut(BaseModel):
"""Response model for crawl runs"""

error: str | None = None
crawl_id: str
success: bool
position: int | None = None
48 changes: 48 additions & 0 deletions frontend/src/controllers/api.ts
@@ -20,6 +20,30 @@ export enum AbortReason {
RequestTimeout = "request-timeout",
}

function splitStream(splitOn: string) {
let buffer = "";

return new TransformStream<string, string>({
transform(chunk, controller) {
buffer += chunk;
const parts = buffer.split(splitOn);
parts.slice(0, -1).forEach((part) => controller.enqueue(part));
buffer = parts[parts.length - 1];
},
flush(controller) {
if (buffer) controller.enqueue(buffer);
},
});
}

function parseJSON<T>() {
return new TransformStream<string, T>({
transform(chunk, controller) {
controller.enqueue(JSON.parse(chunk) as T);
},
});
}

/**
* Utilities for interacting with the Browsertrix backend API
*
@@ -261,4 +285,28 @@ export class APIController implements ReactiveController {

this.onUploadProgress.cancel();
}

async fetchStream<
T = unknown,
Body extends RequestInit["body"] | object = undefined,
>(path: string, options?: Omit<RequestInit, "body"> & { body?: Body }) {
const mergedOptions: RequestInit | undefined = options as
| RequestInit
| undefined;
if (options?.body) {
mergedOptions!.body = JSON.stringify(options.body);
}
const response = await fetch(path, mergedOptions);
if (!response.ok) {
throw new APIError({
message: response.statusText,
status: response.status,
});
}
const body = response.body;
if (!body) {
throw new Error("Response body is not readable");
}
// response.body yields bytes; decode to text before splitting into lines
return body
.pipeThrough(new TextDecoderStream())
.pipeThrough(splitStream("\n"))
.pipeThrough(parseJSON<T>());
}
}
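
A usage sketch, not part of this diff: fetchStream returns a web ReadableStream of parsed objects, and because async iteration over ReadableStream is not available in every browser, a caller can drain it with an explicit reader. The path, generic arguments, and component context here are assumptions for illustration:

// Hypothetical caller inside a component that exposes an APIController at this.api.
// Each value is one parsed line of the JSONL response.
const stream = await this.api.fetchStream<{ total: number }, { tags: string[] }>(
`/orgs/${this.orgId}/crawlconfigs/batch/run`,
{ method: "POST", body: { tags: ["news"] } },
);
const reader = stream.getReader();
for (;;) {
const { done, value } = await reader.read();
if (done) break;
console.log(value); // e.g. { total: 3 }
}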
51 changes: 49 additions & 2 deletions frontend/src/pages/org/workflows-list.ts
@@ -53,6 +53,13 @@ type Sort = {
direction: SortDirection;
};

type FilterBy = {
[K in keyof ListWorkflow]: ListWorkflow[K] extends string ? string : boolean;
};
type BoolFilterKeys = {
[K in keyof FilterBy]: FilterBy[K] extends boolean ? K : never;
}[keyof FilterBy];

const FILTER_BY_CURRENT_USER_STORAGE_KEY =
"btrix.filterByCurrentUser.crawlConfigs";
const INITIAL_PAGE_SIZE = 10;
@@ -128,7 +135,7 @@ export class WorkflowsList extends BtrixElement {
private orderBy: Sort = DEFAULT_SORT;

@state()
private filterBy: Partial<{ [k in keyof ListWorkflow]: boolean }> = {};
private filterBy: Partial<FilterBy> = {};

@state()
private filterByCurrentUser = false;
@@ -236,7 +243,7 @@ export class WorkflowsList extends BtrixElement {

// Convert string bools to filter values
if (value === "true") {
filterBy[key as keyof typeof filterBy] = true;
filterBy[key as BoolFilterKeys] = true;
} else if (value === "false") {
filterBy[key as BoolFilterKeys] = false;
} else {
@@ -1158,4 +1165,44 @@
);
return data;
}

private async runBatchWorkflows() {
const data = await this.api.fetchStream<
BatchWorkflowUpdate,
BatchWorkflowFilter
>(`/orgs/${this.orgId}/crawlconfigs/batch/run`, {
method: "POST",
body: {
...this.filterBy,
createdBy: this.filterByCurrentUser ? this.userInfo?.id : undefined,
tags: this.filterByTags,
tagMatch: this.filterByTagsType,
profileIds: this.filterByProfiles || undefined,
},
});
return data;
}
}

type BatchWorkflowFilter = {
createdBy?: string;
modifiedBy?: string;
profileIds?: string[];
firstSeed?: string;
name?: string;
description?: string;
tags?: string[];
tagMatch?: "and" | "or";
schedule?: boolean;
isCrawlRunning?: boolean;
};
type BatchWorkflowStart = {
total: number;
};
type BatchWorkflowProgress = {
error?: string;
crawl_id: string;
success: boolean;
position: number;
};
type BatchWorkflowUpdate = BatchWorkflowStart | BatchWorkflowProgress;
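
runBatchWorkflows returns the parsed stream without consuming it. A sketch of how a caller could drive progress UI from it, narrowing each update on the total key; reportProgress is a hypothetical helper, not part of this diff:

// Sketch: drain the batch-run stream, distinguishing the leading
// BatchWorkflowStart line from per-crawl BatchWorkflowProgress lines.
const updates = await this.runBatchWorkflows();
const reader = updates.getReader();
let total = 0;
for (;;) {
const { done, value } = await reader.read();
if (done) break;
if ("total" in value) {
total = value.total; // BatchWorkflowStart
} else {
reportProgress(value.position, total, value.success); // hypothetical UI hook
}
}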