@fishcakez
Member

@fishcakez fishcakez commented Apr 9, 2017

Needs more tests, but this is a first attempt at a streaming GenStage producer targeting Flow usage. Note that there is a flat_map option so that we can preload in Ecto, and that it currently supports only a single stream.

@fishcakez fishcakez requested a review from josevalim April 9, 2017 17:06
@fishcakez fishcakez changed the title Introduce DBConnection.StreamStage Introduce DBConnection.Stage Apr 10, 2017
@fishcakez
Member Author

fishcakez commented Apr 10, 2017

I have updated this to use a documented and generic GenStage implementation that should work in any part of a Flow pipeline. flat_map has been removed, as Ecto could build its own implementation using Repo.Stream directly. Tests still need to be expanded.

@josevalim
Member

@fishcakez beautiful! ❤️

I don't see how it can be used as a producer_consumer or consumer, though. Is it somehow executing the query with the producer_consumer/consumer events?

@fishcakez
Member Author

fishcakez commented Apr 10, 2017

A :consumer might be:

start = fn(conn) ->
  # prepare query/stream to use when handling events (init stage)
  copy_query = Postgrex.prepare!(conn, "copy", "COPY table FROM STDIN", [])
  Postgrex.stream(conn, copy_query, [])
end
# this is a consumer, so it takes in events and returns `{[], state}`
handle = fn(conn, events, copy_stream) ->
  # encode/1 is a placeholder that turns an event into COPY data;
  # the comprehension is bound first because a no-parens `for` with
  # several arguments is ambiguous inside a tuple literal
  copy_stream = for event <- events, into: copy_stream, do: encode(event)
  {[], copy_stream}
end
# close the query (undo start function)
stop = fn(conn, _reason, %DBConnection.Stream{query: copy_query}) ->
  Postgrex.close!(conn, copy_query)
end
DBConnection.Stage.start_link(pool, :consumer, start, handle, stop)

A :producer_consumer is less obvious, but might be something like:

start = fn(conn) ->
  # prepare query to use when handling events (init stage)
  Postgrex.prepare!(conn, "insert", "INSERT INTO table (id, value) VALUES (DEFAULT, $1) RETURNING id", [])
end
# this is a producer_consumer, so it takes in events and returns `{events, state}`
handle = fn(conn, values, insert_query) ->
  ids = for value <- values do
    # rows is a list of rows and each row is a list of columns
    %{rows: [[id]]} = Postgrex.execute!(conn, insert_query, [value])
    id
  end
  {ids, insert_query}
end
# close the query (undo start function)
stop = fn(conn, _reason, insert_query) ->
  Postgrex.close!(conn, insert_query)
end
DBConnection.Stage.start_link(pool, :producer_consumer, start, handle, stop)

The start, handle and stop funs each run as the equivalent of a nested DBConnection.transaction call, so they can do anything that can be done in a transaction and return the result.
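
The callback contract above can be modelled without a database. This is a minimal sketch under stated assumptions: the `StageSketch` module and the fake `:fake_conn` term are hypothetical and are not part of DBConnection. It only shows the shape of the three funs: `start` builds the state once, `handle` is called per batch and returns `{events_to_emit, new_state}`, and `stop` undoes `start`.

```elixir
defmodule StageSketch do
  # Hypothetical stand-in for the stage lifecycle; NOT the DBConnection.Stage API.
  # `conn` can be any term here; in the real stage it would be a connection.
  def run(conn, batches, start, handle, stop) do
    state = start.(conn)

    {emitted, final_state} =
      Enum.flat_map_reduce(batches, state, fn events, acc ->
        # handle returns {events_to_emit, new_state}
        handle.(conn, events, acc)
      end)

    stop.(conn, :normal, final_state)
    emitted
  end
end

# A producer_consumer-like handle: one output id per input value.
start = fn _conn -> 0 end

handle = fn _conn, values, next_id ->
  ids = values |> Enum.with_index(next_id) |> Enum.map(fn {_value, id} -> id end)
  {ids, next_id + length(values)}
end

stop = fn _conn, _reason, _state -> :ok end

ids = StageSketch.run(:fake_conn, [["a", "b"], ["c"]], start, handle, stop)
IO.inspect(ids)
```

A consumer-like `handle` would return `{[], state}` for every batch, so nothing is emitted downstream.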

@josevalim
Member

@fishcakez perfect, thank you!

### Options

* `:stream_mapper` - A function to flat map the results of the query, either
a 2-arity fun, `{module, function, args}` with `DBConnection.t` and the
Member

@josevalim josevalim May 28, 2017

Four spaces (counting from the doc marker) for paragraphs that belong to the bullet.

@fishcakez
Member Author

With a "continuation" style transaction, we want transaction(conn, fun) not to raise when it is called after the connection has been closed or rolled back, so that terminate/2 or the after fun in Stream.resource/3 won't crash if an exception was raised earlier. This PR gets around this by returning {:error, :rollback} if the connection is no longer available. However, to be consistent, nested transactions would then also return {:error, :rollback} instead of raising if the connection is closed or the transaction is rolling back.
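
To illustrate why a non-raising result matters for the after fun in Stream.resource/3, here is a small sketch. Everything in `ContinuationSketch` is hypothetical: the connection is faked with an Agent and `transaction/2` only mimics the proposed return values, it is not the DBConnection implementation.

```elixir
defmodule ContinuationSketch do
  # Hypothetical fake connection: an Agent holding :open or :closed.
  def open, do: Agent.start_link(fn -> :open end)
  def close(conn), do: Agent.update(conn, fn _ -> :closed end)

  # Stand-in for transaction(conn, fun) with the behaviour proposed above:
  # return {:error, :rollback} instead of raising when the connection is gone.
  def transaction(conn, fun) do
    case Agent.get(conn, & &1) do
      :open -> {:ok, fun.(conn)}
      :closed -> {:error, :rollback}
    end
  end

  def stream(conn, rows) do
    Stream.resource(
      fn -> rows end,
      fn
        [] ->
          {:halt, []}

        [row | rest] ->
          case transaction(conn, fn _ -> row end) do
            {:ok, value} -> {[value], rest}
            {:error, :rollback} -> {:halt, rest}
          end
      end,
      # The after fun also runs when the consumer raised; it must not crash,
      # otherwise it would mask the original exception.
      fn _rest -> transaction(conn, fn _ -> :cleanup end) end
    )
  end
end

{:ok, conn} = ContinuationSketch.open()

outcome =
  try do
    conn
    |> ContinuationSketch.stream([1, 2, 3])
    |> Enum.each(fn _row ->
      # Simulate the connection dying and the consumer crashing.
      ContinuationSketch.close(conn)
      raise "boom"
    end)
  rescue
    e in RuntimeError -> {:raised, e.message}
  end

IO.inspect(outcome)
```

Because the cleanup call returns a tuple rather than raising, the original exception is the one that reaches the caller.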

@josevalim
Member

josevalim commented Jul 1, 2017 via email

@fishcakez
Member Author

> Does it mean we need to change Repo.transaction in Ecto or can we still
> raise there?

We wouldn't be able to change it there because we can't tell why {:error, :rollback} occurred. Currently it can occur if:

  • Repo.rollback(:rollback) is called
  • The transaction has failed but the transaction fun returns normally (e.g. an inner transaction rolled back and the outer one finishes as normal but is forced to roll back)
  • The connection is closed but the transaction fun returns normally (e.g. the connection closed, the exception was rescued, and the transaction finishes as normal)

> If we can't raise still, would it be a breaking change since we only return
> {:error, :rollback} if the user calls Repo.rollback? Although I guess
> anything could call Repo.rollback and return {:error, :rollback} so I would
> say that's ok?

While calling Repo.rollback is not the only way to get {:error, :rollback}, I think it is a breaking change because users may be rescuing the DBConnection.ConnectionError (current behaviour) and not handling {:error, :rollback}.

We can resolve this by requiring a continue/2,3 call at the top level that returns {:ok, _} | {:error, _} | :closed when using the "continuation" transaction, wdyt?

@fishcakez
Member Author

> We can resolve this by requiring a continue/2,3 call at the top level that returns {:ok, _} | {:error, _} | :closed when using the "continuation" transaction, wdyt?

I think we actually need to go with requiring a continue function so we can force a check-in on exceptions; otherwise we may end up in situations where we close connections because the caller crashes or does not check in.
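
A rough sketch of the three result shapes proposed for continue, assuming a hypothetical `ContinueSketch` module. The connection is a plain map and `rollback/1` is an illustrative helper; none of this is an existing DBConnection API, and the forced check-in on exceptions is not modelled.

```elixir
defmodule ContinueSketch do
  # Hypothetical: models only the three result shapes discussed above.
  # `conn` is a plain map standing in for a checked-out connection.
  def continue(%{status: :closed}, _fun), do: :closed

  def continue(%{status: :open} = conn, fun) do
    try do
      {:ok, fun.(conn)}
    catch
      # An explicit rollback request becomes {:error, reason}.
      :throw, {:rollback, reason} -> {:error, reason}
    end
  end

  # Illustrative helper playing the role of a user-requested rollback.
  def rollback(reason), do: throw({:rollback, reason})
end

open = %{status: :open}
closed = %{status: :closed}

results = [
  ContinueSketch.continue(open, fn _conn -> 42 end),
  ContinueSketch.continue(open, fn _conn -> ContinueSketch.rollback(:rollback) end),
  ContinueSketch.continue(closed, fn _conn -> 42 end)
]

IO.inspect(results)
```

With a separate `:closed` result, a caller can tell a user-requested rollback apart from a connection that is no longer available, which `{:error, :rollback}` alone cannot express.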

@fishcakez
Member Author

Closed in favour of #87 so this can be implemented in Ecto itself.

@fishcakez fishcakez closed this Jul 4, 2017

3 participants