-
Notifications
You must be signed in to change notification settings - Fork 112
Stream emulator data into workflow visualizer app #1374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Streamed data takes in a list of nodes at a time, but a trace file has lists of lists of nodes, so we use generics to differentiate them
Supplying generics don't seem to work and runs into error with nesting. This solution supplies a type for moshi and forces a cast to the desired type.
Extend TraceMode logic by using it to store the file/socket being used in the specific mode. The logic of rendering will depend on the type of TraceMode
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
Previously, the use of types and manually casting was due to type erasure. But using kotlin's typeOf allows us to still supply the nested types.
This will always show the most recent workflow
Since reader.readLine() is a blocking call, it's difficult to pause/end when socket.close() is called. Instead we just try-catch for when the error occurs after closing.
| // Default to File mode, and can be toggled to be in Live mode. | ||
| var traceMode by remember { mutableStateOf<TraceMode>(TraceMode.File(null)) } | ||
| var selectedTraceFile by remember { mutableStateOf<PlatformFile?>(null) } | ||
| val socket = remember { SocketClient() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Create this lazily so it only happens if we switch to streaming mode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking at it lazily, it looks liek this is pretty lightweight, so not a big deal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does seem like a bit of a smell to create a network client in composition though, especially since this is a single-use object.
| Thread { | ||
| socket.close() | ||
| } | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a JVM runtime API that @tcmulcahy was showing you? Not familiar with it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this need to launch a Thread to do this so that it does not block shutdown? It would be good to have a comment why we need a new Thread for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my understanding, shutdown hooks are registered by JVM and only ran once it starts the shutdown sequence. So it sort of makes sense that it needs to be on its own Thread. I'll make a comment on this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the runtime is already shutting down, why do we care about blocking? And for that matter, why do we care about closing the socket at all in that case? The JVM/OS will close it for us once the process dies – you can't leak a socket outside a process boundary if you tried.
Also, it looks like this code is running directly in composition, which is extremely bad – it would add a new shutdown hook every time this composable recomposes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
About the point for having threads, it seem like addShutdownHook just takes a Thread and will run all registered hooks in a random order. I wasn't able to find anything that would allow you to add hooks without it being on a different thread. Do update me know, though, if there are ways to do that (so I know for the future too)!
- Good point, I am not knowledgeable in this area so I didn't think about this. I had imagined that we ought to free the resources when provided an opportunity to do so, but this makes sense.
- It's removed now.
| fun reset() { | ||
| offset = Offset.Zero | ||
| scale = 1f | ||
| // scale = 1f |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
| import java.net.SocketException | ||
|
|
||
| /** | ||
| * This is a client that connects to the `ActionLogger` Unix Domain Socket and listens for any new |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: not necessarily ActionLogger socket (though it is that in your branch). More importantly, just a publishing socket from an app. (Remember this is in Workflow library which can be used anywhere).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
| * To better separate the responsibility of reading from the socket, we use a channel for the caller | ||
| * to handle parsing and amalgamating the render passes. | ||
| */ | ||
| fun beginListen(scope: CoroutineScope) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a suspend fun so we don't have to pass the CoroutineScope on the stack.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes the client handle the job/coroutine for this. Otherwise we are giving too much responsibility/power to this function with the CoroutineScope
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
| * This will always be called within an asynchronous call, so we do not need to block/launch a | ||
| * new coroutine here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But, it does launch a new coroutine?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Turns out I dont understand coroutines at all, fixed the comment now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You aren't the only one!
| * to handle parsing and amalgamating the render passes. | ||
| */ | ||
| fun beginListen(scope: CoroutineScope) { | ||
| scope.launch(Dispatchers.IO) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you make this a suspend fun, this can just be
| scope.launch(Dispatchers.IO) { | |
| withContext(Dispatchers.IO) { |
Then make the client handle launching a separate coroutine for this to run in the background.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
➕
Fully together
suspend fun beginListen() = withContext(Dispatchers.IO) {
// do all the stuff, now on an IO Thread pool backed coroutines dispatcher.
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But then beginListen would be a misnomer, since it doesn't just "begin" the listen, it is the whole listen. So a better name might be something like "consumeSocket" or something more descriptive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, changed name to. pollSocket as well.
| } | ||
| } catch (e: SocketException) { | ||
| println("Exiting socket listener due to: ${e.message}") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to close the input stream in a finally here? Is the BufferedReader handling that behind the scense? I don't see how.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just use the Closable.use {} extension.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, I'm beginning to see how useful Closeable is
| TraceModeToggleSwitch( | ||
| onToggle = { | ||
| resetStates() | ||
| traceMode = if (traceMode is TraceMode.Live) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit, consider calling the traceMode instance under evaluation as "currentTraceMode"
if(currentTraceMode is TraceMode.Live {
} else {
}
IMO, it's a bit clearer that the left side is the new state and the right side is existing state.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
| fun handleParseResult( | ||
| parseResult: ParseResult, | ||
| onNewFrame: (() -> Unit)? = null | ||
| ): Boolean { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit, what's teh intent of this boolean? It seems like it always just returns whatever the value was of parseResult, and the result doesn't seem to be used elsewhere in your code anyway. Thoughts on just making it Unit? (no return type)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good catch. Removed
| LaunchedEffect(traceSource) { | ||
| when (traceSource) { | ||
| is TraceMode.File -> { | ||
| // We guarantee the file is null since this composable can only be called when a file is selected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean you guarantee that it's "not" null?
One nit.. maybe you should do some sort of assertion/error here?
Like
checkNotNull(traceSource.file) {
"traceSource.file should never be null here since this composable can only be called when a file is selected."
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops that's a typo, changed to a check now.
Is a check like this for readability? As in we can avoid having a comment here by stating explicitly (with code) that the file cannot be null? Or is it also useful to prevent us from having to use !!
| if (!initialized) { | ||
| return | ||
| } | ||
| socket.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you check for isClosed first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think in general, the start/close contract is unclear. Presumably calling start and close multiple times is not meaningful, so that should be documented in the kdoc, and enforced either by making them idempotent (i.e. calling start() again after calling it once does nothing, same for close()) or by throwing when multiple calls are detected.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming @ericmaxwell2003 is referring to socket.isClosed, which I don't think you need to check, I would think socket.close() would be idempotent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, kdoc and the readme all clarifies this now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this class is intended to be single-use, you probably want to also close the channel here so that anything that tries to read from it sees there's no more data coming.
| renderPassChannel.trySend(input) | ||
| } | ||
| } catch (e: SocketException) { | ||
| println("Exiting socket listener due to: ${e.message}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a log with the stack trace.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
util.Log isn't available for KMP apps, should I look into 3rd party libraries for this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwable.printStackTrace() is available in KMP.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| * `localabstract:` to connect to it. | ||
| */ | ||
| fun start() { | ||
| initialized = true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we threadsafe here? just double checking, you may be
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be, start() and close() are all called on the main thread. Speaking of which I should change the name to be open()
...flow-trace-viewer/src/jvmMain/kotlin/com/squareup/workflow1/traceviewer/util/SocketClient.kt
Show resolved
Hide resolved
| withContext(Dispatchers.IO) { | ||
| // Since channel implements ChannelIterator, we can for-loop through on the receiver end. | ||
| for (renderPass in socket.renderPassChannel) { | ||
| val currentTree = if (fullTree.isEmpty()) null else fullTree.last() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Stylistic nit: fullTree.lastOrNull() could replace this if/else
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| Thread { | ||
| socket.close() | ||
| } | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the runtime is already shutting down, why do we care about blocking? And for that matter, why do we care about closing the socket at all in that case? The JVM/OS will close it for us once the process dies – you can't leak a socket outside a process boundary if you tried.
Also, it looks like this code is running directly in composition, which is extremely bad – it would add a new shutdown hook every time this composable recomposes.
| Box( | ||
| modifier = modifier | ||
| ) { | ||
| fun resetStates() = run { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is run for here? Can't this just be
| fun resetStates() = run { | |
| fun resetStates() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not even sure what I was thinking, changed.
| Runtime.getRuntime().addShutdownHook( | ||
| Thread { | ||
| ProcessBuilder("adb", "forward", "--remove-all") | ||
| .start().waitFor() | ||
| } | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why use a shutdown hook instead of just normal control flow, i.e. wrapping singleWindowApplication in a try/finally and doing the cleanup in finally? You'd need to pass exitProcessOnExit = false. Imo that would make this code more straightforward.
| socket.beginListen(this) | ||
| val adapter: JsonAdapter<List<Node>> = createMoshiAdapter<Node>() | ||
|
|
||
| withContext(Dispatchers.IO) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this code needs to be explicitly ran on Dispatchers.IO. I don't think any of it is doing system IO, it's all computation, so it should be on Dispatchers.Default.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
| internal suspend fun parseFileTrace( | ||
| file: PlatformFile, | ||
| ): ParseResult { | ||
| val jsonString = file.readString() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this readString do system IO directly? If so, it should be ran on Dispatchers.IO.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does system IO mean something specific here? If not, PlatformFile.readString() is already dispatched onto the IO thread, so this should be okay.
| renderPassChannel.trySend(input) | ||
| } | ||
| } catch (e: SocketException) { | ||
| println("Exiting socket listener due to: ${e.message}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwable.printStackTrace() is available in KMP.
| } | ||
| } catch (e: SocketException) { | ||
| println("Exiting socket listener due to: ${e.message}") | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just use the Closable.use {} extension.
| if (!initialized) { | ||
| return | ||
| } | ||
| socket.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm assuming @ericmaxwell2003 is referring to socket.isClosed, which I don't think you need to check, I would think socket.close() would be idempotent.
| @Composable | ||
| internal fun RenderDiagram( | ||
| traceFile: PlatformFile, | ||
| internal fun RenderTrace( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This composable contains lots of logic related to parsing, not views. Might be worth considering factoring it out into a separate class, would be a lot more testable and keep the code in this file to actual view concerns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But I don't mean you need to do it in this PR – just something to think about for later.
| LaunchedEffect(currentIndex) { | ||
| if (currentIndex < 0) return@LaunchedEffect | ||
| lazyListState.animateScrollToItem(currentIndex) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is quite wasteful, launching a noop effect. Instead check currentIndex before even calling LaunchedEffect.
| LaunchedEffect(currentIndex) { | |
| if (currentIndex < 0) return@LaunchedEffect | |
| lazyListState.animateScrollToItem(currentIndex) | |
| } | |
| if (currentIndex >= 0) { | |
| LaunchedEffect(currentIndex) { | |
| lazyListState.animateScrollToItem(currentIndex) | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| } | ||
| if (error != null) { | ||
| return@LaunchedEffect | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can there be a situation where the if check happens before the launched coroutine is able to return an error? Manually testing it doesn't seem like it's the case, but I'm sure there's a better way of writing this bit of error handling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea this is definitely a race, the if check is pretty much guaranteed to run before the coroutine even starts since compose's dispatcher will dispatch the coroutine. And pollSocket is long-running – an error could happen at any time over the lifetime of the socket.
I think what you want to happen is that if the socket errors out, we just give up, so the loop that's reading from the socket below is cancelled. There are a few ways to do this, but if SocketClient is intended to be single-use, then probably the way that makes the most sense is to have pollSocket() close the channel after it returns. Then the for/in loop on line 119 will detect the socket is closed and complete. You can make SocketClient implement Closeable, close the channel in SocketClient.close(), and then use the use extension.
Probably worth rethinking the API of SocketClient and how it's used a bit to make something that's a bit more self-contained and safe. Maybe we should pair on it? Feel free to throw something on my calendar.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Implementing Closeable seems like a good idea, that would mean the responsibility for opening and closing the socket is more clear, right?
| if (!initialized) { | ||
| return | ||
| } | ||
| socket.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If this class is intended to be single-use, you probably want to also close the channel here so that anything that tries to read from it sees there's no more data coming.
| frames get populated, so we avoid off by one when indexing into the frames. | ||
| */ | ||
| frameIndex = -1 | ||
| socket.open() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would not open the socket here, since it's not clear who's responsible for closing it.
| } | ||
| if (error != null) { | ||
| return@LaunchedEffect | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yea this is definitely a race, the if check is pretty much guaranteed to run before the coroutine even starts since compose's dispatcher will dispatch the coroutine. And pollSocket is long-running – an error could happen at any time over the lifetime of the socket.
I think what you want to happen is that if the socket errors out, we just give up, so the loop that's reading from the socket below is cancelled. There are a few ways to do this, but if SocketClient is intended to be single-use, then probably the way that makes the most sense is to have pollSocket() close the channel after it returns. Then the for/in loop on line 119 will detect the socket is closed and complete. You can make SocketClient implement Closeable, close the channel in SocketClient.close(), and then use the use extension.
Probably worth rethinking the API of SocketClient and how it's used a bit to make something that's a bit more self-contained and safe. Maybe we should pair on it? Feel free to throw something on my calendar.
| // Default to File mode, and can be toggled to be in Live mode. | ||
| var traceMode by remember { mutableStateOf<TraceMode>(TraceMode.File(null)) } | ||
| var selectedTraceFile by remember { mutableStateOf<PlatformFile?>(null) } | ||
| val socket = remember { SocketClient() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does seem like a bit of a smell to create a network client in composition though, especially since this is a single-use object.
Since the SocketClient was not complex to begin with, combining the start-poll-stop lifecycle within one function is much easier to maintain cleaner, rather than having the start/stop contract be unclear.
93730e6 to
2ae9051
Compare
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
d46ff02 to
be3fbd1
Compare
Signed-off-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last Compose UI thing to clean up, unrelated to the network stuff we discussed.
| checked = it | ||
| onToggle() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an anti-pattern, since you now have two sources of truth for traceMode and violates UDF. You don't need the checked property at all, just check the traceMode parameter every time. Or do something like
val isLive = traceMode is TraceMode.Livesince you check it below as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| is TraceMode.Live -> { | ||
| val adapter: JsonAdapter<List<Node>> = createMoshiAdapter<Node>() | ||
| streamRenderPassesFromDevice { renderPass -> | ||
| val currentTree = fullTree.lastOrNull() | ||
| val parseResult = parseLiveTrace(renderPass, adapter, currentTree) | ||
| handleParseResult(parseResult, onNewFrame) | ||
| } | ||
| error = "Socket has already been closed or is not available." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Beautiful!
By using Unix Domain Sockets and adb forwarding, the visualizer is able to retrieve logged data directly from the emulator to display the workflow render passes.
The app has also been changed to allow for both File and Live tracing mode through a toggle.