From bb380172ad8bd62e6a16c4206017d0fa99702f76 Mon Sep 17 00:00:00 2001 From: angelahuqing Date: Thu, 19 Jan 2023 00:40:17 -0800 Subject: [PATCH 1/3] # This is a combination of 8 commits. # This is the 1st commit message: SERV-505 Added jobsQueue to watcherVerticle # This is the commit message #2: [SERV-505] Addition of JobsQueue Class and Test # This is the commit message #3: [SERV-505] Consistency update to CsvItemTest.java # This is the commit message #4: [SERV-505] Cleaned up JobsQueue code # This is the commit message #5: [SERV-505] Completion of JobsQueue functionality and added to WatcherVerticle # This is the commit message #6: [SERV-505] Added in messages # This is the commit message #7: [SERV-505] Fix checkstyle and delete unnecessary files # This is the commit message #8: [SERV-505] Fixed checkstyle violations --- src/.DS_Store | Bin 0 -> 6148 bytes src/main/.DS_Store | Bin 0 -> 6148 bytes src/main/java/.DS_Store | Bin 0 -> 6148 bytes src/main/java/edu/.DS_Store | Bin 0 -> 6148 bytes src/main/java/edu/ucla/.DS_Store | Bin 0 -> 6148 bytes src/main/java/edu/ucla/library/.DS_Store | Bin 0 -> 6148 bytes .../edu/ucla/library/avpairtree/.DS_Store | Bin 0 -> 6148 bytes .../edu/ucla/library/avpairtree/CsvItem.java | 9 +- .../ucla/library/avpairtree/JobsQueue.java | 159 ++++++++++++++++++ .../avpairtree/verticles/MainVerticle.java | 1 + .../avpairtree/verticles/WatcherVerticle.java | 12 ++ src/main/resources/av-pairtree_messages.xml | 8 + src/main/tools/.DS_Store | Bin 0 -> 6148 bytes .../ucla/library/avpairtree/CsvItemTest.java | 5 +- .../library/avpairtree/JobsQueueTest.java | 137 +++++++++++++++ .../avpairtree/utils/TestConstants.java | 5 + 16 files changed, 334 insertions(+), 2 deletions(-) create mode 100644 src/.DS_Store create mode 100644 src/main/.DS_Store create mode 100644 src/main/java/.DS_Store create mode 100644 src/main/java/edu/.DS_Store create mode 100644 src/main/java/edu/ucla/.DS_Store create mode 100644 src/main/java/edu/ucla/library/.DS_Store create mode 100644 src/main/java/edu/ucla/library/avpairtree/.DS_Store create mode 100644 src/main/java/edu/ucla/library/avpairtree/JobsQueue.java create mode 100644 src/main/tools/.DS_Store create mode 100644 src/test/java/edu/ucla/library/avpairtree/JobsQueueTest.java diff --git a/src/.DS_Store b/src/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a5c80643ca7e44adbcbf20d60d5c839452fac541 GIT binary patch literal 6148 zcmeHK%}(4f5FRfnB&ra5Kq|*vdPCLnBPtF^>9Rd=LkfaJL6ER3h!kS9$+BvfMR|?( zCHfpZ0gpq!vCWF#D}ri9vcK_o#*RNnwugvRd(d4XY7&u;!q^Bsxjq# zn4a;2zz=4FgZNYr<0`2JWjpEr;MR6hou*FT(K?^#NxGZ$hKJ4hO;m zqs#HtZ@U)?KdO;Ahj;J?%4C5TalgoQaRM9rdsI=6N~#_a^cQ{i2|`mu7!U@8f%m|G zI|}`e@8M(R8-)R3;8hvm^C3iGj2zY$&C!9%qyRvEMjC-`p8mj)4Zz4@Z4nlTvY|j5 zs$7bpT;`x`IQk>!Ut6@{q|D41$IYxwku z7$hxWKp6P17*Jj(?zFKaceg%S9N)De$`J|+`_&dFAyCX!3}3#A7f_7A9)pC@Tv^_11}_I$^ZZW literal 0 HcmV?d00001 diff --git a/src/main/.DS_Store b/src/main/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a1af1687890a5bda93fb81588de3c98862c6e278 GIT binary patch literal 6148 zcmeHK%}(1u5S|U8v4yJUPzf%TeCZ8U;YUB$?lvM0Tv65TM6=)Q>^E!qTgU4qBGDbi8$@j)a^Q>&7tKA!pJdJ_aHctOzO@waH|2r?c*D zZNHwbdy_Yth_|8heYx?rcl7P-`{aCjaiwmR1RiZ=&kX*7YXmbPPJ>~d$@~~O z=DUzGp@Jlh5cN5pmrA?<>#n3Nu>0VqG{9e|&c3X%m%)+vWedz-H72^&tO8a6t3X`= z^I~&W>I1fK6|f5Y_X_a)!GkmU1}lx~)qzGG0f0@Ul_BO2My~M;`UWeF7=a003e=^- z95IA0N8U3$-(aOtmy%n3!Pt7Cpo(Mk9kZEF><3d}38q`Ph2|KC4+{+}G_SAuz z)-d0a+$4CS!=ldC)%m+kf+U8^@< zyxj79-}7Jl=h3B^MrB-1#=UrSN>88T@-kuji!u4qj+3w1a5`wdJu*cZXT@-2le2V) zkh7DlNX@ioCPg~7>qKtCzG%?gna%bO+pVB`@NV7;X7Bf5A9N4rbKl$E?R@B8{J6fE z-OhjAxt-AXNUgkKT);gHF7IbFDsoeNhi4JJh!PYAgaKh-V;JBA&U>{n6-gn&fH3e_ z4AA)yKnX*}%Aq|vFxVCV*nry#Z2R*M49Ect87qhIK(tE*x>U_AhUVTuyL8Nl9A7zf z=}Ox-V;z38<_<-3>!AZ1u5{>7Mqxl0SY?2EKS-1A|4;w#|80;A!hkTanGER0adg~6 zZG5+`RY`Yk0KI^cP+aA3nF7OJ#fYV=cn4|){(uZ%$XGdq2O>WLmIfJwfhT3)0sl@} A3IG5A literal 0 HcmV?d00001 diff --git a/src/main/java/edu/.DS_Store b/src/main/java/edu/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..b0cbd7c6d11f292569b510e669484c58eb16b782 GIT binary patch literal 6148 zcmeHK%}T>S5ZZ^Ldz2qEjeR#k|S5W;~H7F-P92+fmzk%IM10VS=`Ow>h` zs0h!rAj|(}fcCB^dg4MPBK)*}JD6L2)1nI*$3^|_jos&$=q8D~S*7wS3sc4EnOVnm z9e3V64X#u_$il4Gt%sdsdfE@OtH|01N+n}Ej1J>=|E#pQtI{lt({@KE$E`L(PLATV zrTTT%OIuyNPGBbVi_S{r!JxKLS@G7^H-{_UV5UQ(|`tE-4FnoM6 zdm-_mYFV&2f@c`x0&jy(nyB;=zES)rj*%H)2AF{fV89-2&fEml!X+>R%)sw5K<9%8 zCG;)k2KCW_LALtBDMM-HHGF*gVgM4D8fNtL$6khXJ>CLQy>jn565bRqSd zv5tMSv>l4HjfeDfxRAa<9+?4V;3EUn`$0-{|L^_W|LY(gm;q*BG8vG?M$o9EHhZ^z wsgmwm4|)zIp}5@OI0c5jiV;g!aT%%wevb^GZ!tFr4}^aNGz~m31AofEJK88(_W%F@ literal 0 HcmV?d00001 diff --git a/src/main/java/edu/ucla/.DS_Store b/src/main/java/edu/ucla/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..a66d1eaf16e4c13bc97647d4800bc5fb9d3943f3 GIT binary patch literal 6148 zcmeHK%}(1u5Z+Bfv4tvfs02q}xS=ZiD5!@@u1gQORWhQ7f`Z*djV0ra5~mTQsr(x4 z8}S4@PWfhbgW3j;AgE@f*>5~Mv+MoV+Vv=rcDQSMNPe)2+;#z)Jt?r;IWVEjYMw|G?Krud4UMb;u(P#6#fgn?(rfIG;% zwP*L{^1{M^F!0a}(D@KR2}6gKMLs&va0vixz|8`iei|t!a2PtQETRXZZ7R^FYFA=t zn+|{A;zEa&MVqd)D?YTFS-T2F^XwQu;Bciwi_!`M!oc4QQ0)h4(*6J8{`#MTq!9*$ zf&Y^M-8hU7BdpEctxKzmG2O(BHcoQOeP_c;$2{Ui9JF}a83A-5~gsk-%YeJNS5Dt{E;KF<%)KB_J3btheDD@bz=!gv3 z#KrO#7@&7IBSO&^ZP<73?;uf^QKBOB)5oy`U4794@0*-x&~4kCr>LF8-K<=GmW7Gp zNo(C;82(mEib!%bglivSkLCfsYJO?FT8*`M>{t{jY=AFaylM|71WG_kz6| z)@IMvPphP})`QleBovn$d`p3&k7C5qQM?4zf_9G_pl>lZh!zO{2xuDEFatlzz$->t BVh8{L literal 0 HcmV?d00001 diff --git a/src/main/java/edu/ucla/library/avpairtree/.DS_Store b/src/main/java/edu/ucla/library/avpairtree/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..03ed13a72b03b02660a3a6035c1018daa62f6d0c GIT binary patch literal 6148 zcmeHK%Wl&^6upxMYEvF5LZTZaZ`f5;`cM&zN^V*fSy9P67Jyni4vmp(N3oL#LXh`- z0WA0hzJo8|TUf!lGn1-uny^C%=1Mc?&dfb`GAEiD4-tt*Kinj$5s`($SiOeo660|$ zGgi?(%o8%l0i~2sMmhO2(Wb*FU=;Y<6yUvkoAzlyN7%I&_iuuBMjP}7dwD>9GfrDU zoCb*J!(tz0>KtguageF+|T=2!|%P|){&og0_{IZ z)A-!?gJ)5DcwD=4kS4hwCGDOhM`0TwFP}$Am<}6hKM6CL$5R7xqT|}yXtcFkUw5{5 z9*ozW(ZenHJKMYCv2ESGzq!{uIeYtV^nU!|6P5)NxJsnm(l~+7a4r#N;PsL?O<42^ zbHXsVPysEMwX!hE3>pQD0{^1|d_FiRjIPE)p*%WJ$tM7?if(D}`SXA|jz(8wp%5c5 zp{PJb75a%G6dnDJj&n5@3KgA%etZZ$v(PUTA!kQ^M~0Jd6`I;8U=)~EU`aJ=eE&b1 z|NfsQnU+z&DDY1yAgYJnVFOFjck9gJ_^x$OPEgo5u23i`sPuI#9lna!P^4kZVF&1H UEEJ*#WserialVersionUID for Job. + */ + private static final long serialVersionUID = -2436320678602312189L; + /** * The property mapped to the item ARK column in the CSV file. */ diff --git a/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java b/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java new file mode 100644 index 0000000..8ed5c0a --- /dev/null +++ b/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java @@ -0,0 +1,159 @@ +package edu.ucla.library.avpairtree; + +import java.util.Set; + +import info.freelibrary.util.Logger; +import info.freelibrary.util.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.core.Promise; +import io.vertx.core.Vertx; + +import io.vertx.core.shareddata.AsyncMap; + +/** + * A class that adds and removes jobs from a shared map + */ +public class JobsQueue { + /** + * The watcher verticle's logger. + */ + private static final Logger LOGGER = LoggerFactory.getLogger(JobsQueue.class, MessageCodes.BUNDLE); + + /** + * Copy of VertX instance + */ + private final Vertx myVertx; + + /** + * Name of shared map + */ + private final String myJobsQueue = "jobs-queue"; + + /** + * Accesses the jobs queue shared data map + * + * @param aVertx A Vert.x instance + */ + public JobsQueue(final Vertx aVertx) { + myVertx = aVertx; + } + + /** + * Add job to jobs queue + * + * @param aItem A string of the map name + * @return A future completing task with the map result + */ + public Future addJobToQueue(final CsvItem aItem) { + return getSharedMap(myJobsQueue).compose(jobsMap -> { + return putJobInMap(jobsMap, aItem); + }); + } + + /** + * Remove jobs from job queue + * + * @param aArk A string of the map name + * @return A future completing task with the map result + */ + public Future removeJobInQueue(final String aArk) { + return getSharedMap(myJobsQueue).compose(map -> { + return getKeys(map).compose(jobs -> { + return deleteKeyInMap(map, jobs, aArk); + }); + }); + } + + /** + * Retrieved shared local async data map. + * + * @param aMapName A string of the map name + * @return A future completing task with the map result + */ + private Future> getSharedMap(final String aMapName) { + final Promise> promise = Promise.promise(); + + myVertx.sharedData().getLocalAsyncMap(aMapName, getMap -> { + if (getMap.succeeded()) { + final AsyncMap jobsQueue = getMap.result(); + promise.complete(jobsQueue); + } else { + LOGGER.error(MessageCodes.AVPT_027); + promise.fail(getMap.cause()); + } + }); + + return promise.future(); + } + + /** + * + * @param aJobsQueue A string of the map name + * @param aItem A CsvItem + * @return A future completing task + */ + private Future putJobInMap(final AsyncMap aJobsQueue, final CsvItem aItem) { + final Promise promise = Promise.promise(); + + aJobsQueue.put(aItem.getItemARK(), aItem, put -> { + if (put.succeeded()) { + promise.complete(); + } else { + LOGGER.error(MessageCodes.AVPT_028); + promise.fail(put.cause()); + } + }); + + return promise.future(); + } + + /** + * + * @param aJobsQueue A string of the map name + * @return A future completing task with the Jobs + */ + private Future> getKeys(final AsyncMap aJobsQueue) { + final Promise> promise = Promise.promise(); + aJobsQueue.keys(keyCheck -> { + if (keyCheck.succeeded()) { + final Set jobs = keyCheck.result(); + promise.complete(jobs); + } else { + LOGGER.error(MessageCodes.AVPT_029); + promise.fail(keyCheck.cause()); + } + }); + + return promise.future(); + } + + /** + * + * @param aJobsQueue A string of the map name + * @param aJobs A set of the job being removed + * @param aArk A string of the ark being removed + * @return A future completing task + */ + private Future deleteKeyInMap(final AsyncMap aJobsQueue, + final Set aJobs, final String aArk) { + + final Promise promise = Promise.promise(); + + if (aJobs.contains(aArk)) { + aJobsQueue.remove(aArk, deleteJob -> { + if (deleteJob.succeeded()) { + promise.complete(); + } else { + LOGGER.error(MessageCodes.AVPT_031); + promise.fail(deleteJob.cause()); + } + }); + } else { + LOGGER.error(MessageCodes.AVPT_030); + promise.fail(MessageCodes.AVPT_030); + } + + return promise.future(); + } +} diff --git a/src/main/java/edu/ucla/library/avpairtree/verticles/MainVerticle.java b/src/main/java/edu/ucla/library/avpairtree/verticles/MainVerticle.java index a2f5c47..fac397f 100644 --- a/src/main/java/edu/ucla/library/avpairtree/verticles/MainVerticle.java +++ b/src/main/java/edu/ucla/library/avpairtree/verticles/MainVerticle.java @@ -123,6 +123,7 @@ private void configureServer(final JsonObject aConfig, final Promise aProm // Associate handlers with operation IDs from the application's OpenAPI specification routerBuilder.operation(Op.GET_STATUS).handler(new StatusHandler(getVertx())); + //add for JobsStatusHandler // Create the application server myServer = vertx.createHttpServer(serverOptions).requestHandler(routerBuilder.createRouter()); diff --git a/src/main/java/edu/ucla/library/avpairtree/verticles/WatcherVerticle.java b/src/main/java/edu/ucla/library/avpairtree/verticles/WatcherVerticle.java index 4abc59f..f3dd162 100644 --- a/src/main/java/edu/ucla/library/avpairtree/verticles/WatcherVerticle.java +++ b/src/main/java/edu/ucla/library/avpairtree/verticles/WatcherVerticle.java @@ -31,6 +31,7 @@ import edu.ucla.library.avpairtree.CsvItem; import edu.ucla.library.avpairtree.MessageCodes; import edu.ucla.library.avpairtree.Op; +import edu.ucla.library.avpairtree.JobsQueue; import io.vertx.core.AbstractVerticle; import io.vertx.core.CompositeFuture; @@ -57,10 +58,16 @@ public class WatcherVerticle extends AbstractVerticle { */ private static final String SUBSTITUTION_PATTERN = "{}"; + /** + * JobsQueue + */ + private JobsQueue myJobsQueue; + @Override public void start(final Promise aPromise) { final DeliveryOptions options = new DeliveryOptions().setSendTimeout(Integer.MAX_VALUE); final Vertx vertx = getVertx(); + myJobsQueue = new JobsQueue(vertx); final EventBus eventBus = vertx.eventBus(); // Consume messages containing a path location to an uploaded CSV file @@ -100,6 +107,8 @@ public void start(final Promise aPromise) { .filter(result -> result.body().getClass().equals(CsvItem.class)).map(msg -> { final CsvItem item = (CsvItem) msg.body(); + myJobsQueue.addJobToQueue(item); + LOGGER.info(MessageCodes.AVPT_009, item.getItemARK()); return item; }).collect(Collectors.toMap(CsvItem::getItemARK, item -> item)); @@ -204,6 +213,9 @@ private Future updateCSV(final String aCsvFilePath, final MapMust provide an object key for the data The environment variable AWS_ENDPOINT_URL must be set Configuring S3 bucket: {} [region: {}] + Failure to retrieve shared map + Put job in Shared Map failed + Failure to retrieve keys from Sharedmap + Job was not found in sharedMap + Failure to remove job from sharedmap + + + diff --git a/src/main/tools/.DS_Store b/src/main/tools/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..5008ddfcf53c02e82d7eee2e57c38e5672ef89f6 GIT binary patch literal 6148 zcmeH~Jr2S!425mzP>H1@V-^m;4Wg<&0T*E43hX&L&p$$qDprKhvt+--jT7}7np#A3 zem<@ulZcFPQ@L2!n>{z**++&mCkOWA81W14cNZlEfg7;MkzE(HCqgga^y>{tEnwC%0;vJ&^%eQ zLs35+`xjp>T0 future = JobsQueue.addJobToQueue(aItem); + + future.onSuccess(result -> { + vertx.sharedData().getLocalAsyncMap(MAP, getMap -> { + if (getMap.succeeded()) { + final AsyncMap jobsQueue = getMap.result(); + jobsQueue.get(ARK_ADD).onSuccess(res -> { + assertEquals(aItem, res); + asyncTask.complete(); + }); + } else { + aContext.fail(getMap.cause()); + } + }); + }).onFailure(failure -> { + aContext.fail(failure); + }); + } + + /** + * Test removing jobs from shared map + * @param aContext A test context + */ + @Test + public void TestRemoveJobInQueue(final TestContext aContext) { + final CsvItem aItem = new CsvItem(); + final Async asyncTask = aContext.async(); + final Vertx vertx = myContext.vertx(); + + + aItem.setItemARK(ARK_REM); + aItem.setFilePath(SOUL_WAV); + aItem.setPathRoot(PATH_ROOT); + + final JobsQueue JobsQueue = new JobsQueue(vertx); + + //Put item in shared map + vertx.sharedData().getLocalAsyncMap(MAP, getMap -> { + if (getMap.succeeded()) { + final AsyncMap jobsMap = getMap.result(); + + jobsMap.put(ARK_REM, aItem, result -> { + if (result.failed()) { + aContext.fail(result.cause()); + } + }); + + final Future future = JobsQueue.removeJobInQueue(ARK_REM); + jobsMap.get(ARK_REM).onFailure(failure -> { + aContext.fail(); + }).onSuccess(res -> { + assertEquals(res, null); + asyncTask.complete(); + }); + } else { + aContext.fail(getMap.cause()); + } + }); + } +} diff --git a/src/test/java/edu/ucla/library/avpairtree/utils/TestConstants.java b/src/test/java/edu/ucla/library/avpairtree/utils/TestConstants.java index 950c9b3..5ed6514 100644 --- a/src/test/java/edu/ucla/library/avpairtree/utils/TestConstants.java +++ b/src/test/java/edu/ucla/library/avpairtree/utils/TestConstants.java @@ -35,6 +35,11 @@ public final class TestConstants { */ public static final String SOUL = "soul_collection_2021_03_22.csv"; + /* + * A smaller sample audio CSV + */ + public static final String SOUL_SAMPLE = "job_map_test.csv"; + /** * The file name property from the JSON message format. */ From 8aba5e0181d27a024711fa09a0527ac4fe5c2ded Mon Sep 17 00:00:00 2001 From: angelahuqing Date: Fri, 17 Mar 2023 10:44:49 -0700 Subject: [PATCH 2/3] [SERV-505] Addition of JobsQueue and testing --- .../edu/ucla/library/avpairtree/JobsQueue.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java b/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java index 8ed5c0a..affcfac 100644 --- a/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java +++ b/src/main/java/edu/ucla/library/avpairtree/JobsQueue.java @@ -21,14 +21,14 @@ public class JobsQueue { private static final Logger LOGGER = LoggerFactory.getLogger(JobsQueue.class, MessageCodes.BUNDLE); /** - * Copy of VertX instance + * Name of shared map */ - private final Vertx myVertx; + private static final String MY_JOBS_QUEUE = "jobs-queue"; - /** - * Name of shared map + /** + * Copy of VertX instance */ - private final String myJobsQueue = "jobs-queue"; + private final Vertx myVertx; /** * Accesses the jobs queue shared data map @@ -46,7 +46,7 @@ public JobsQueue(final Vertx aVertx) { * @return A future completing task with the map result */ public Future addJobToQueue(final CsvItem aItem) { - return getSharedMap(myJobsQueue).compose(jobsMap -> { + return getSharedMap(MY_JOBS_QUEUE).compose(jobsMap -> { return putJobInMap(jobsMap, aItem); }); } @@ -58,7 +58,7 @@ public Future addJobToQueue(final CsvItem aItem) { * @return A future completing task with the map result */ public Future removeJobInQueue(final String aArk) { - return getSharedMap(myJobsQueue).compose(map -> { + return getSharedMap(MY_JOBS_QUEUE).compose(map -> { return getKeys(map).compose(jobs -> { return deleteKeyInMap(map, jobs, aArk); }); From 43bb1acf408d9b3503e41216aad3eb33a31ec80c Mon Sep 17 00:00:00 2001 From: "Kevin S. Clarke" Date: Thu, 15 Jun 2023 10:57:15 -0400 Subject: [PATCH 3/3] Update src/main/java/edu/ucla/library/avpairtree/CsvItem.java Co-authored-by: Mark A. Matney, Jr. --- src/main/java/edu/ucla/library/avpairtree/CsvItem.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/edu/ucla/library/avpairtree/CsvItem.java b/src/main/java/edu/ucla/library/avpairtree/CsvItem.java index 966c59b..38f93b7 100644 --- a/src/main/java/edu/ucla/library/avpairtree/CsvItem.java +++ b/src/main/java/edu/ucla/library/avpairtree/CsvItem.java @@ -74,7 +74,7 @@ public class CsvItem implements Serializable{ private static final String PROCESSING_RESULT = "Processed"; /** - * The serialVersionUID for Job. + * The serialVersionUID for CsvItem. */ private static final long serialVersionUID = -2436320678602312189L;