- 
                Notifications
    You must be signed in to change notification settings 
- Fork 0
SERV-505 #26
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
SERV-505 #26
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
| @@ -0,0 +1,159 @@ | ||
| package edu.ucla.library.avpairtree; | ||
|  | ||
| import java.util.Set; | ||
|  | ||
| import info.freelibrary.util.Logger; | ||
| import info.freelibrary.util.LoggerFactory; | ||
|  | ||
| import io.vertx.core.Future; | ||
| import io.vertx.core.Promise; | ||
| import io.vertx.core.Vertx; | ||
|  | ||
| import io.vertx.core.shareddata.AsyncMap; | ||
|  | ||
| /** | ||
| * A class that adds and removes jobs from a shared map | ||
| */ | ||
| public class JobsQueue { | ||
| /** | ||
| * The watcher verticle's logger. | ||
| */ | ||
| private static final Logger LOGGER = LoggerFactory.getLogger(JobsQueue.class, MessageCodes.BUNDLE); | ||
|  | ||
| /** | ||
| * Name of shared map | ||
| */ | ||
| private static final String MY_JOBS_QUEUE = "jobs-queue"; | ||
|  | ||
| /** | ||
| * Copy of VertX instance | ||
| */ | ||
| private final Vertx myVertx; | ||
|  | ||
| /** | ||
| * Accesses the jobs queue shared data map | ||
| * | ||
| * @param aVertx A Vert.x instance | ||
| */ | ||
| public JobsQueue(final Vertx aVertx) { | ||
| myVertx = aVertx; | ||
| } | ||
|  | ||
| /** | ||
| * Add job to jobs queue | ||
| * | ||
| * @param aItem A string of the map name | ||
| * @return A future completing task with the map result | ||
| */ | ||
| public Future<Void> addJobToQueue(final CsvItem aItem) { | ||
| return getSharedMap(MY_JOBS_QUEUE).compose(jobsMap -> { | ||
| return putJobInMap(jobsMap, aItem); | ||
| }); | ||
| } | ||
|  | ||
| /** | ||
| * Remove jobs from job queue | ||
| * | ||
| * @param aArk A string of the map name | ||
| * @return A future completing task with the map result | ||
| */ | ||
| public Future<Void> removeJobInQueue(final String aArk) { | ||
| return getSharedMap(MY_JOBS_QUEUE).compose(map -> { | ||
| return getKeys(map).compose(jobs -> { | ||
| return deleteKeyInMap(map, jobs, aArk); | ||
| }); | ||
| }); | ||
| } | ||
|  | ||
| /** | ||
| * Retrieved shared local async data map. | ||
| * | ||
| * @param aMapName A string of the map name | ||
| * @return A future completing task with the map result | ||
| */ | ||
| private Future<AsyncMap<String, CsvItem>> getSharedMap(final String aMapName) { | ||
| final Promise<AsyncMap<String, CsvItem>> promise = Promise.promise(); | ||
|  | ||
| myVertx.sharedData().<String, CsvItem>getLocalAsyncMap(aMapName, getMap -> { | ||
| if (getMap.succeeded()) { | ||
| final AsyncMap<String, CsvItem> jobsQueue = getMap.result(); | ||
| promise.complete(jobsQueue); | ||
| } else { | ||
| LOGGER.error(MessageCodes.AVPT_027); | ||
| promise.fail(getMap.cause()); | ||
| } | ||
| }); | ||
|  | ||
| return promise.future(); | ||
|         
                  ksclarke marked this conversation as resolved.
              Show resolved
            Hide resolved | ||
| } | ||
|  | ||
| /** | ||
| * | ||
| * @param aJobsQueue A string of the map name | ||
| * @param aItem A CsvItem | ||
| * @return A future completing task | ||
| */ | ||
| private Future<Void> putJobInMap(final AsyncMap<String, CsvItem> aJobsQueue, final CsvItem aItem) { | ||
| final Promise<Void> promise = Promise.promise(); | ||
|  | ||
| aJobsQueue.put(aItem.getItemARK(), aItem, put -> { | ||
| if (put.succeeded()) { | ||
| promise.complete(); | ||
| } else { | ||
| LOGGER.error(MessageCodes.AVPT_028); | ||
| promise.fail(put.cause()); | ||
| } | ||
| }); | ||
|  | ||
| return promise.future(); | ||
| } | ||
|  | ||
| /** | ||
| * | ||
| * @param aJobsQueue A string of the map name | ||
| * @return A future completing task with the Jobs | ||
| */ | ||
| private Future<Set<String>> getKeys(final AsyncMap<String, CsvItem> aJobsQueue) { | ||
| final Promise<Set<String>> promise = Promise.promise(); | ||
| aJobsQueue.keys(keyCheck -> { | ||
| if (keyCheck.succeeded()) { | ||
| final Set<String> jobs = keyCheck.result(); | ||
| promise.complete(jobs); | ||
| } else { | ||
| LOGGER.error(MessageCodes.AVPT_029); | ||
| promise.fail(keyCheck.cause()); | ||
| } | ||
| }); | ||
|  | ||
| return promise.future(); | ||
| } | ||
|  | ||
| /** | ||
| * | ||
| * @param aJobsQueue A string of the map name | ||
| * @param aJobs A set of the job being removed | ||
| * @param aArk A string of the ark being removed | ||
| * @return A future completing task | ||
| */ | ||
| private Future<Void> deleteKeyInMap(final AsyncMap<String, CsvItem> aJobsQueue, | ||
| final Set<String> aJobs, final String aArk) { | ||
|  | ||
| final Promise<Void> promise = Promise.promise(); | ||
|  | ||
| if (aJobs.contains(aArk)) { | ||
| aJobsQueue.remove(aArk, deleteJob -> { | ||
| if (deleteJob.succeeded()) { | ||
| promise.complete(); | ||
| } else { | ||
| LOGGER.error(MessageCodes.AVPT_031); | ||
| promise.fail(deleteJob.cause()); | ||
| } | ||
| }); | ||
| } else { | ||
| LOGGER.error(MessageCodes.AVPT_030); | ||
| promise.fail(MessageCodes.AVPT_030); | ||
| } | ||
|  | ||
| return promise.future(); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -123,6 +123,7 @@ private void configureServer(final JsonObject aConfig, final Promise<Void> aProm | |
|  | ||
| // Associate handlers with operation IDs from the application's OpenAPI specification | ||
| routerBuilder.operation(Op.GET_STATUS).handler(new StatusHandler(getVertx())); | ||
| //add for JobsStatusHandler | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Assuming this is a "to-do", it would be good to explicitly mark it as such by prefixing with  | ||
|  | ||
| // Create the application server | ||
| myServer = vertx.createHttpServer(serverOptions).requestHandler(routerBuilder.createRouter()); | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -31,6 +31,7 @@ | |
| import edu.ucla.library.avpairtree.CsvItem; | ||
| import edu.ucla.library.avpairtree.MessageCodes; | ||
| import edu.ucla.library.avpairtree.Op; | ||
| import edu.ucla.library.avpairtree.JobsQueue; | ||
|  | ||
| import io.vertx.core.AbstractVerticle; | ||
| import io.vertx.core.CompositeFuture; | ||
|  | @@ -57,10 +58,16 @@ public class WatcherVerticle extends AbstractVerticle { | |
| */ | ||
| private static final String SUBSTITUTION_PATTERN = "{}"; | ||
|  | ||
| /** | ||
| * JobsQueue | ||
| */ | ||
| private JobsQueue myJobsQueue; | ||
|  | ||
| @Override | ||
| public void start(final Promise<Void> aPromise) { | ||
| final DeliveryOptions options = new DeliveryOptions().setSendTimeout(Integer.MAX_VALUE); | ||
| final Vertx vertx = getVertx(); | ||
| myJobsQueue = new JobsQueue(vertx); | ||
| final EventBus eventBus = vertx.eventBus(); | ||
|  | ||
| // Consume messages containing a path location to an uploaded CSV file | ||
|  | @@ -100,6 +107,8 @@ public void start(final Promise<Void> aPromise) { | |
| .filter(result -> result.body().getClass().equals(CsvItem.class)).map(msg -> { | ||
| final CsvItem item = (CsvItem) msg.body(); | ||
|  | ||
| myJobsQueue.addJobToQueue(item); | ||
|  | ||
| 
      Comment on lines
    
      +110
     to 
      +111
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this statement is misplaced. Here, it will be executed when all of the tasks in the  | ||
| LOGGER.info(MessageCodes.AVPT_009, item.getItemARK()); | ||
| return item; | ||
| }).collect(Collectors.toMap(CsvItem::getItemARK, item -> item)); | ||
|  | @@ -204,6 +213,9 @@ private Future<String> updateCSV(final String aCsvFilePath, final Map<String, Cs | |
| if (accessUrlIndex == index) { | ||
| if (aCsvItemMap.containsKey(ark)) { | ||
| row[index] = constructAccessURL(csvItem); | ||
|  | ||
| myJobsQueue.removeJobInQueue(ark); | ||
|  | ||
| 
      Comment on lines
    
      +216
     to 
      +218
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wondering if this one is also misplaced. Is there a reason not to run this for each record (i.e., unconditionally in each loop iteration)? | ||
| } else if (originalAccessUrlIndex != -1) { | ||
| // Don't overwrite what was already there (e.g. in the case of images) | ||
| row[index] = originalRow.get(index + 1); | ||
|  | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -32,5 +32,13 @@ | |
| <entry key="AVPT_024">Must provide an object key for the data</entry> | ||
| <entry key="AVPT_025">The environment variable AWS_ENDPOINT_URL must be set</entry> | ||
| <entry key="AVPT_026">Configuring S3 bucket: {} [region: {}]</entry> | ||
| <entry key="AVPT_027">Failure to retrieve shared map</entry> | ||
| <entry key="AVPT_028">Put job in Shared Map failed</entry> | ||
| <entry key="AVPT_029">Failure to retrieve keys from Sharedmap</entry> | ||
| <entry key="AVPT_030">Job was not found in sharedMap</entry> | ||
| <entry key="AVPT_031">Failure to remove job from sharedmap</entry> | ||
| 
      Comment on lines
    
      +35
     to 
      +39
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be good to pick one way of writing shared map (space vs. no space, casing) so that e.g. it's predictable for someone searching through the logs for these messages. | ||
|  | ||
|  | ||
|  | ||
|  | ||
| </properties> | ||
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -29,7 +29,10 @@ public class CsvItemTest { | |
| */ | ||
| @Test | ||
| public void testSetPathRoot() { | ||
| assertEquals(PATH_ROOT, new CsvItem().setPathRoot(SOUL_WAV).getPathRoot()); | ||
| final CsvItem csvItem = new CsvItem(); | ||
|  | ||
| csvItem.setPathRoot(PATH_ROOT); | ||
| assertEquals(PATH_ROOT, csvItem.getPathRoot()); | ||
| 
      Comment on lines
    
      -32
     to 
      +35
    
   There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why the change in argument passed to  | ||
| } | ||
|  | ||
| /** | ||
|  | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's fine if you like, but static field names don't need to start with "my" (only instance fields). Just FYI.
You could also change the visibility to public so that the test could directly reference it rather than redefine it 🤷🏻