Skip to content

Commit f783a56

Browse files
authored
Merge pull request #8 from remeike/create-scheduler-thread
Create separate scheduler thread
2 parents 453e4b2 + 88e0b2d commit f783a56

File tree

2 files changed

+24
-12
lines changed

2 files changed

+24
-12
lines changed

src/System/Hworker.hs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ module System.Hworker
6464
, createWith
6565
, destroy
6666
, worker
67+
, scheduler
6768
, monitor
6869
-- * Queuing Jobs
6970
, queue
@@ -837,15 +838,12 @@ worker hw =
837838
delayAndRun
838839

839840

840-
-- | Start a monitor. Like 'worker', this is blocking, so should be
841-
-- started in a thread. This is responsible for retrying jobs that
842-
-- time out (which can happen if the processing thread is killed, for
843-
-- example) and for pushing scheduled jobs to the queue at the expected time.
844-
-- You need to have at least one of these running to have
845-
-- the retry happen, but it is safe to have any number running.
841+
-- | Start a scheduler. Like 'worker', this is blocking, so should be
842+
-- started in a thread. This is responsible for pushing scheduled jobs
843+
-- to the queue at the expected time.
846844

847-
monitor :: Job s t => Hworker s t -> IO ()
848-
monitor hw =
845+
scheduler :: Job s t => Hworker s t -> IO ()
846+
scheduler hw =
849847
forever $ do
850848
now <- getCurrentTime
851849

@@ -868,6 +866,20 @@ monitor hw =
868866
Left err ->
869867
liftIO $ hwlog hw err
870868

869+
threadDelay 500000 >> scheduler hw
870+
871+
872+
-- | Start a monitor. Like 'worker', this is blocking, so should be
873+
-- started in a thread. This is responsible for retrying jobs that
874+
-- time out (which can happen if the processing thread is killed, for
875+
-- example). You need to have at least one of these running to have
876+
-- the retry happen, but it is safe to have any number running.
877+
878+
monitor :: Job s t => Hworker s t -> IO ()
879+
monitor hw =
880+
forever $ do
881+
now <- getCurrentTime
882+
871883
runWithList hw (R.hkeys (progressQueue hw)) $ \js ->
872884
forM_ js $ \j ->
873885
runWithMaybe hw (R.hget (progressQueue hw) j) $

test/Spec.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,7 @@ main = hspec $ do
475475
mvar <- newMVar 0
476476
hworker <- createWith (conf "simpleworker-1" (SimpleState mvar))
477477
wthread <- forkIO (worker hworker)
478-
mthread <- forkIO (monitor hworker)
478+
sthread <- forkIO (scheduler hworker)
479479
time <- getCurrentTime
480480
queueScheduled hworker SimpleJob (addUTCTime 1 time)
481481
queueScheduled hworker SimpleJob (addUTCTime 2 time)
@@ -485,14 +485,14 @@ main = hspec $ do
485485
threadDelay 1000000 >> readMVar mvar >>= shouldBe 2
486486
threadDelay 1000000 >> readMVar mvar >>= shouldBe 3
487487
killThread wthread
488-
killThread mthread
488+
killThread sthread
489489
destroy hworker
490490

491491
it "should execute a recurring job" $ do
492492
mvar <- newMVar 0
493493
hworker <- createWith (conf "recurringworker-1" (RecurringState mvar))
494494
wthread <- forkIO (worker hworker)
495-
mthread <- forkIO (monitor hworker)
495+
sthread <- forkIO (scheduler hworker)
496496
time <- getCurrentTime
497497
queueScheduled hworker RecurringJob (addUTCTime 2 time)
498498
threadDelay 3000000 >> readMVar mvar >>= shouldBe 1
@@ -501,7 +501,7 @@ main = hspec $ do
501501
threadDelay 2000000 >> readMVar mvar >>= shouldBe 4
502502
destroy hworker
503503
killThread wthread
504-
killThread mthread
504+
killThread sthread
505505

506506
describe "Broken jobs" $
507507
it "should store broken jobs" $ do

0 commit comments

Comments
 (0)