11import { readFileSync , writeFileSync , existsSync , mkdirSync } from "fs" ;
22import { createInterface } from "readline" ;
33
4- const readline = createInterface ( {
5- input : process . stdin ,
6- output : process . stdout ,
7- } ) ;
4+ // This function lets you write a question to the terminal
5+ // And returns you the response of the user
6+ function readLineAsync ( msg : string ) {
7+ const readline = createInterface ( {
8+ input : process . stdin ,
9+ output : process . stdout ,
10+ } ) ;
811
9- const readLineAsync = ( msg : string ) => {
1012 return new Promise ( ( resolve ) => {
1113 readline . question ( msg , ( userRes ) => {
1214 resolve ( userRes ) ;
1315 } ) ;
1416 } ) ;
15- } ;
16-
17- // constraints
18- // a pipeline can have any number of stages
19- // every stage has the same number of steps
20-
21- export type Step =
22- | ( (
23- stepId : string ,
24- // get the result of a past stage using it's id
25- // it will return the result for the same step id
26- // it can return undefined if the previous stage data has not been stored locally
27- // or if a future stage data is being asked
28- getResultOfPastStage : < Y > ( stageId : string ) => Y
29- ) => Promise < any > )
30- | ( ( stepId : string ) => Promise < any > ) ;
31-
32- // a stage is nothing but - stage id, step
33- type Stage = {
34- stageId : string ;
35- step : Step ;
36- } ;
37-
38- // class pipeline
39- // have stages
40- // a stage can be any function given by the caller
41- // or a stage can be created using processStage method for which the caller has to provide what one step should do.
42- // it should be given stepIds
43-
44- type StepResult < T = any > =
45- | {
46- status : "rejected" ;
47- stepId : string ;
48- reason : any ;
49- }
50- | {
51- status : "fulfilled" ;
52- stepId : string ;
53- result : T ;
54- } ;
55-
56- // each stage processes multiple steps
57- // it stores the result in a file named after the step
58- // if a file exists previously for this particular version
59- // it will read previous results. if the previous result is fulfilled
60- // it will do nothing. else it will reprocess it.
61- // a common set of stepIds should be used for the whole pipeline
62-
63- // check if previous stage was fulfilled
64- // and if result is complete
65- // it will throw an error if a step was rejected
66- // or if chains are not in order
67- // or if some chains are missing from the results
68- // a common set of stepIds should be used for the whole pipeline
17+ }
6918
7019type StateStore < T = any > = {
7120 [ stepId : string ] : T ;
7221} ;
22+ // StateManager helps in getting and setting the state locally
23+ // It manipulates data in-memory and once the consumer has finished manipulating it
24+ // They need to commit the data to permanent storage using the commit method
7325class StateManager < T > {
7426 private readonly fileExt = ".json" ;
7527 private readonly dirPath = "./tmp" ;
@@ -94,29 +46,66 @@ class StateManager<T> {
9446 this . store = JSON . parse ( readFileSync ( this . filePath ) . toString ( ) ) ;
9547 }
9648
97- // it gets the latest state for the given step
49+ // It gets the latest state for the given step
9850 // the state after the last operation
9951 getStepState ( stepId : string ) : T | undefined {
10052 return this . store [ stepId ] ;
10153 }
10254
55+ // It sets the latest state for the given step
10356 setSetState ( stepId : string , state : T ) {
10457 this . store [ stepId ] = state ;
10558 }
10659
107- // after all the in memory operations
108- // one can commit to the local file
60+ // After all the in memory operations one can commit to the local file
10961 // for permanent storage
11062 commit ( ) {
11163 writeFileSync ( this . filePath , JSON . stringify ( this . store , null , 4 ) ) ;
11264 }
11365}
11466
115- // can have many stages
116- // each stage will have some steps
117- // each stage will have same number of steps
118- // a step is a function to which the step id will be passed
119- // and it will proceed accordingly
67+ // A step is defined as a method
68+ // it takes in a step id as a parameter to execute that particular step
69+ // Optionally it can take in a method `getResultOfPastStage` as a parameter
70+ // if it wants to access the result of the previous stages
71+ export type Step =
72+ | ( (
73+ stepId : string ,
74+ // get the result of a past stage using it's id
75+ // It will return the result for the same step id
76+ // It will return undefined if the previous stage data has not been stored locally
77+ // or if a future stage data is being asked
78+ getResultOfPastStage : < Y > ( stageId : string ) => Y
79+ ) => Promise < any > )
80+ | ( ( stepId : string ) => Promise < any > ) ;
81+
82+ // A step can fail. If the error is not handled it will crash the pipeline
83+ // We would like to store the result or reason locally too
84+ // For that purpose the response from a step is being wrapped in this
85+ type StepResult < T = any > =
86+ | {
87+ status : "rejected" ;
88+ stepId : string ;
89+ reason : any ;
90+ }
91+ | {
92+ status : "fulfilled" ;
93+ stepId : string ;
94+ result : T ;
95+ } ;
96+
97+ // A stage will contain an identifier and a step method
98+ type Stage = {
99+ stageId : string ;
100+ step : Step ;
101+ } ;
102+
103+ // A pipeline is conists of multiple stages
104+ // A stage contains of multiple steps
105+ // Stages will be run synchronously i.e, in order
106+ // Steps will be run asychronously
107+ // CONSTRAINT: Each stage will have the same number of steps
108+ // See the type definition of `Stage` and `Step` above to know more about them
120109export class Pipeline {
121110 private readonly stages : Stage [ ] = [ ] ;
122111 constructor (
@@ -128,8 +117,12 @@ export class Pipeline {
128117 this . stages . push ( { stageId, step } ) ;
129118 }
130119
120+ // We want to wrap the step provided by the pipeline consumer
121+ // In order to wrap the response of the step in the StepResult
122+ // also in this method we inject the `getResultOfPastStage` to the step
131123 private stepWrapper < T , Y > ( step : Step ) {
132124 return async ( stepId : string ) : Promise < StepResult < T > > => {
125+ // method to inject
133126 const getResultOfPastStage = < Y > ( stageId : string ) : Y => {
134127 let stateManager = new StateManager < StepResult < Y > > (
135128 stageId ,
@@ -148,6 +141,7 @@ export class Pipeline {
148141 return result . result ;
149142 } ;
150143 try {
144+ // wrapping result
151145 const result = await step ( stepId , getResultOfPastStage ) ;
152146 return {
153147 status : "fulfilled" ,
@@ -167,7 +161,6 @@ export class Pipeline {
167161 async processStage ( stage : Stage ) {
168162 console . log ( "processing stage: " , stage . stageId ) ;
169163
170- // create a stage manager
171164 let stateManager = new StateManager < StepResult > (
172165 stage . stageId ,
173166 this . version
@@ -179,6 +172,7 @@ export class Pipeline {
179172 console . log ( `processing step: ${ stepId } of stage: ${ stage . stageId } ` ) ;
180173
181174 const prevResult = stateManager . getStepState ( stepId ) ;
175+ // We are only processing the step if the past result of it was not fulfilled
182176 if ( prevResult === undefined || prevResult . status === "rejected" ) {
183177 let stepResult = await this . stepWrapper ( stage . step ) ( stepId ) ;
184178
@@ -190,18 +184,19 @@ export class Pipeline {
190184 console . log ( stepResult . reason ) ;
191185 }
192186
193- // since javascript is a single threaded language
194- // only one thread will be executing this function at a time
187+ // Since javascript is a single threaded language
188+ // Only one thread will be executing this function at a time
195189 stateManager . setSetState ( stepId , stepResult ) ;
196190 }
197191 } )
198192 ) ;
199193
200- // commit
194+ // We need to commit after all the manipulations
195+ // so that the result is persisted locally
201196 stateManager . commit ( ) ;
202197
203- // check if each step is fulfilled
204- // re process
198+ // We are checking if some steps are rejected
199+ // If they are, we will try them process it again
205200 if ( areSomeRejected ) {
206201 const rerun =
207202 ( await readLineAsync (
0 commit comments