11defmodule Ecto.Adapters.SQL.Stage do
22 @ moduledoc """
33 A `GenStage` process that encapsulates a SQL transaction.
4+
5+ ### Options
6+
7+ * `:name` - A name to register the started process (see the `:name` option
8+ in `GenServer.start_link/3`)
9+
10+ See the "Shared options" section at the `Ecto.Repo` documentation. All options
11+ are passed to the `GenStage` on init.
412 """
513
614 @ doc """
7- Start link a `GenStage` process that will run a transaction for its duration.
15+ Start link a `GenStage` producer that will run a transaction for its duration.
816
9- The first argument is the pool, the second argument is the `GenStage` type,
10- the third argument is the start function, the fourth argument is the handle
11- function, the fifth argument is the stop function and the optional sixth
12- argument are the options.
17+ The first argument is the repo, the second argument is the start function,
18+ the third argument is the handle demand function, the fourth argument is the
19+ stop function and the optional fiftth argument are the options.
1320
14- The start function is a o -arity anonymous function. This is called after the
15- transaction begins but before `start_link/6 ` returns. It should return the
16- `state` or call `MyRepo .rollback/1` to stop the `GenStage`.
21+ The start function is a 0 -arity anonymous function. This is called after the
22+ transaction begins but before `producer/5 ` returns. It should return the
23+ accumulator or call `repo .rollback/1` to stop the `GenStage`.
1724
18- The handle function is a 2-arity anonymous function. If the `GenStage` type is
19- a `:producer`, then the first argument is the `demand` from a `GenStage`
20- `handle_demand` callback. Otherwise the first argument is the events from a
21- `GenStage` `handle_events` callback. The second argument is the state. This
22- function returns a 2-tuple, with first element as events (empty list for
23- `:consumer`) and second element as the `state`. This function can roll back
24- and stop the `GenStage` using `MyRepo.rollback/1`.
25+ The handle demand function is a 2-arity anonymous function. The first argument
26+ is the `demand`, and the second argument is the accumulator. This function
27+ returns a 2-tuple, with first element as list of events to fulfil the demand
28+ and second element as the accumulator. If the producer has emitted all events
29+ (and so not fulfilled demand) it should call
30+ `GenStage.async_notify(self(), {:producer, :done | :halted}` to signal to
31+ consumers that it has finished. Also this function can rollback and stop the
32+ `GenStage` using `repo.rollback/1`.
2533
2634 The stop function is a 2-arity anonymous function. The first argument is the
27- terminate reason and the second argument is the `state` . This function will
35+ terminate reason and the third argument is the accumulator . This function will
2836 only be called if connection is alive and the transaction has not been rolled
2937 back. If this function returns the transaction is committed. This function can
30- roll back and stop the `GenStage` using `MyRepo.rollback/1`.
38+ rollback and stop the `GenStage` using `repo.rollback/1`.
39+
40+ For options see "Options" in the module documentation.
41+
42+ The `GenStage` process will behave like a `Flow` stage:
43+
44+ * It will stop with reason `:normal` when the last consumer cancels
45+ """
46+ @ spec producer ( module , start :: ( ( ) -> acc ) ,
47+ handle_demand :: ( ( demand :: pos_integer , acc ) -> { [ any ] , acc } ) ,
48+ stop :: ( ( reason :: any , acc ) -> any ) , Keyword . t ) ::
49+ GenServer . on_start when acc: var
50+ def producer ( repo , start , handle_demand , stop , opts \\ [ ] ) do
51+ fun = & DBConnection.Stage . producer / 5
52+ Ecto.Adapters.SQL . stage ( fun , repo , start , handle_demand , stop , opts )
53+ end
54+
55+ @ doc """
56+ Start link a `GenStage` producer consumer that will run a transaction for its
57+ duration.
58+
59+ The first argument is the repo, the second argument is the start function,
60+ the third argument is the handle events function, the fourth argument is the
61+ stop function and the optional fiftth argument are the options.
62+
63+ The start function is a 0-arity anonymous function. This is called after the
64+ transaction begins but before `consumer_producer/5` returns. It should return
65+ the accumulator or call `repo.rollback/1` to stop the `GenStage`.
66+
67+ The handle events function is a 2-arity anonymous function. The first argument
68+ is a list of incoming events, and the second argument is the accumulator. This
69+ function returns a 2-tuple, with first element as list of outgoing events and
70+ second element as the accumulator. Also this function can rollback and stop
71+ the `GenStage` using `repo.rollback/1`.
72+
73+ The stop function is a 2-arity anonymous function. The first argument is the
74+ terminate reason and the second argument is the accumulator. This function
75+ will only be called if connection is alive and the transaction has not been
76+ rolled back. If this function returns the transaction is committed. This
77+ function can rollback and stop the `GenStage` using `repo.rollback/1`.
78+
79+ For options see "Options" in the module documentation.
3180
3281 The `GenStage` process will behave like a `Flow` stage:
3382
3483 * It will stop with reason `:normal` when the last consumer cancels
3584 * It will notify consumers that it is done when all producers have cancelled
3685 or notified that they are done or halted
37- * It will cancel new and remaining producers when all producers have
38- notified that they are done or halted and it is a `:consumer`
3986 * It will not send demand to new producers when all producers have notified
40- that they are done or halted and it is a `:consumer_producer`
87+ that they are done or halted
88+ """
89+ @ spec producer_consumer ( repo :: module , start :: ( ( ) -> acc ) ,
90+ handle_events :: ( ( events_in :: [ any ] , acc ) -> { events_out :: [ any ] , acc } ) ,
91+ stop :: ( ( reason :: any , acc ) -> any ) , Keyword . t ) ::
92+ GenServer . on_start when acc: var
93+ def producer_consumer ( repo , start , handle_events , stop , opts \\ [ ] ) do
94+ fun = & DBConnection.Stage . producer_consumer / 5
95+ Ecto.Adapters.SQL . stage ( fun , repo , start , handle_events , stop , opts )
96+ end
4197
42- ### Options
98+ @ doc """
99+ Start link a `GenStage` consumer that will run a transaction for its duration.
43100
44- * `:name` - A name to register the started process (see the `:name` option
45- in `GenServer.start_link/3`)
101+ The first argument is the repo, the second argument is the start function,
102+ the third argument is the handle events function, the fourth argument is the
103+ stop function and the optional fiftth argument are the options.
46104
47- See the "Shared options" section at the `Ecto.Repo` documentation. All options
48- are passed to the `GenStage` on init.
105+ The start function is a 0-arity anonymous function. This is called after the
106+ transaction begins but before `consumer/5` returns. It should return the
107+ accumulator or call `repo.rollback/1` to stop the `GenStage`.
49108
50- ### Example
51-
52- start = fn() -> Post end
53- handle =
54- fn(entries, schema) ->
55- MyRepo.insert_all(schema, entries)
56- {[], schema}
57- end
58- stop =
59- fn
60- :normal, _ -> :ok
61- reason, _ -> MyRepo.rollback(reason)
62- end
63- Ecto.Adapters.SQL.Stage.start_link(MyRepo, :consumer, start, handle, stop)
109+ The handle events function is a 2-arity anonymous function. The first argument
110+ is the list of events, and the second argument is the accumulator. This
111+ function returns a 2-tuple, with first element is an empty list (as no
112+ outgoing events) and second element as the accumulator. Also this function can
113+ rollback and stop the `GenStage` using `repo.rollback/1`.
114+
115+ The stop function is a 2-arity anonymous function. The first argument is the
116+ terminate reason and the second argument is the accumulator. This function
117+ will only be called if connection is alive and the transaction has not been
118+ rolled back. If this function returns the transaction is committed. This
119+ function can rollback and stop the `GenStage` using `repo.rollback/1`.
120+
121+ See the "Shared options" section at the `Ecto.Repo` documentation.
122+
123+ The `GenStage` process will behave like a `Flow` stage:
124+
125+ * It will cancel new and remaining producers when all producers have
126+ notified that they are done or halted and it is a `:consumer`
64127 """
65- @ spec start_link ( repo :: module , :producer ,
66- start :: ( ( ) -> state ) ,
67- handle_demand :: ( ( demand :: pos_integer , state ) -> { [ any ] , state } ) ,
68- stop :: ( ( reason :: any , state ) -> any ) , opts :: Keyword . t ) ::
69- GenServer . on_start when state: var
70- @ spec start_link ( repo :: module , :producer_consumer ,
71- start :: ( ( ) -> state ) ,
72- handle_events :: ( ( [ any ] , state ) -> { [ any ] , state } ) ,
73- stop :: ( ( reason :: any , state ) -> any ) , opts :: Keyword . t ) ::
74- GenServer . on_start when state: var
75- @ spec start_link ( repo :: module , :consumer ,
76- start :: ( ( ) -> state ) ,
77- handle_events :: ( ( [ any ] , state ) -> { [ ] , state } ) ,
78- stop :: ( ( reason :: any , state ) -> any ) , opts :: Keyword . t ) ::
79- GenServer . on_start when state: var
80- def start_link ( repo , type , start , handle , stop , opts \\ [ ] ) do
81- Ecto.Adapters.SQL . stage ( repo , type , start , handle , stop , opts )
128+ @ spec consumer ( repo :: module , start :: ( ( ) -> acc ) ,
129+ handle_events :: ( ( events_in :: [ any ] , acc ) -> { [ ] , acc } ) ,
130+ stop :: ( ( reason :: any , acc ) -> any ) , Keyword . t ) ::
131+ GenServer . on_start when acc: var
132+ def consumer ( pool , start , handle_events , stop , opts \\ [ ] ) do
133+ fun = & DBConnection.Stage . consumer / 5
134+ Ecto.Adapters.SQL . stage ( fun , pool , start , handle_events , stop , opts )
82135 end
83136
84137 @ doc """
@@ -97,7 +150,7 @@ defmodule Ecto.Adapters.SQL.Stage do
97150 * `:max_rows` - The number of rows to load from the database as we stream.
98151 It is supported at least by Postgres and MySQL and defaults to 500.
99152
100- See the "Shared options" section at the `Ecto.Repo` documentation.
153+ For more options see "Options" in the module documentation.
101154
102155 ## Example
103156
@@ -110,8 +163,7 @@ defmodule Ecto.Adapters.SQL.Stage do
110163 |> Flow.each(&IO.inspect/1)
111164 |> Flow.start_link()
112165 """
113-
114- @ callback stream ( repo :: module , queryable :: Ecto.Query . t , opts :: Keyword . t ) ::
166+ @ spec stream ( repo :: module , queryable :: Ecto.Query . t , opts :: Keyword . t ) ::
115167 GenServer . on_start ( )
116168 def stream ( repo , queryable , opts \\ [ ] ) do
117169 stream = apply ( repo , :stream , [ queryable , opts ] )
@@ -121,7 +173,7 @@ defmodule Ecto.Adapters.SQL.Stage do
121173 { :suspended , _ , cont } = Enumerable . reduce ( stream , acc , & stream_reduce / 2 )
122174 { repo , :cont , cont }
123175 end
124- start_link ( repo , :producer , start , & stream_handle / 2 , & stream_stop / 2 , opts )
176+ producer ( repo , start , & stream_handle / 2 , & stream_stop / 2 , opts )
125177 end
126178
127179 ## Helpers
0 commit comments