Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file removed ChatExample/app/libs/JavaPhoenixClient-0.3.0.jar
Binary file not shown.
Binary file added ChatExample/app/libs/JavaPhoenixClient-0.3.4.jar
Binary file not shown.
36 changes: 19 additions & 17 deletions ChatExample/app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:tools="http://schemas.android.com/tools" package="com.github.dsrees.chatexample">
xmlns:tools="http://schemas.android.com/tools"
package="com.github.dsrees.chatexample">

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.INTERNET" />

<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:networkSecurityConfig="@xml/network_security_config"
android:theme="@style/AppTheme" tools:ignore="GoogleAppIndexingWarning">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN"/>
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/AppTheme"
android:usesCleartextTraffic="true"
tools:ignore="GoogleAppIndexingWarning">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />

<category android:name="android.intent.category.LAUNCHER"/>
</intent-filter>
</activity>
</application>
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
</application>

</manifest>
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ class MainActivity : AppCompatActivity() {


// Use when connecting to https://github.com/dwyl/phoenix-chat-example
private val socket = Socket("https://phxchat.herokuapp.com/socket/websocket")
private val topic = "room:lobby"
// private val socket = Socket("https://phxchat.herokuapp.com/socket/websocket")
// private val topic = "room:lobby"

// Use when connecting to local server
// private val socket = Socket("ws://10.0.2.2:4000/socket/websocket")
// private val topic = "rooms:lobby"
private val socket = Socket("ws://10.0.2.2:4000/socket/websocket")
private val topic = "rooms:lobby"

private var lobbyChannel: Channel? = null

Expand Down
19 changes: 17 additions & 2 deletions src/main/kotlin/org/phoenixframework/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@ class Channel(
/** Timer to attempt rejoins */
internal var rejoinTimer: TimeoutTimer

/** Refs if stateChange hooks */
internal var stateChangeRefs: MutableList<String>

/**
* Optional onMessage hook that can be provided. Receives all event messages for specialized
* handling before dispatching to the Channel event callbacks.
Expand All @@ -125,6 +128,7 @@ class Channel(
this.timeout = socket.timeout
this.joinedOnce = false
this.pushBuffer = mutableListOf()
this.stateChangeRefs = mutableListOf()
this.rejoinTimer = TimeoutTimer(
dispatchQueue = socket.dispatchQueue,
timerCalculation = socket.rejoinAfterMs,
Expand All @@ -133,10 +137,11 @@ class Channel(

// Respond to socket events
this.socket.onError { _, _-> this.rejoinTimer.reset() }
.apply { stateChangeRefs.add(this) }
this.socket.onOpen {
this.rejoinTimer.reset()
if (this.isErrored) { this.rejoin() }
}
}.apply { stateChangeRefs.add(this) }


// Setup Push to be sent when joining
Expand Down Expand Up @@ -203,7 +208,14 @@ class Channel(
this.socket.logItems("Channel: error $topic ${it.payload}")

// If error was received while joining, then reset the Push
if (isJoining) { this.joinPush.reset() }
if (isJoining) {
// Make sure that the "phx_join" isn't buffered to send once the socket
// reconnects. The channel will send a new join event when the socket connects.
this.joinRef?.let { this.socket.removeFromSendBuffer(it) }

// Reset the push to be used again later
this.joinPush.reset()
}

// Mark the channel as errored and attempt to rejoin if socket is currently connected
this.state = State.ERRORED
Expand Down Expand Up @@ -414,6 +426,9 @@ class Channel(
// Do not attempt to rejoin if the channel is in the process of leaving
if (isLeaving) return

// Leave potentially duplicated channels
this.socket.leaveOpenTopic(this.topic)

// Send the joinPush
this.sendJoin(timeout)
}
Expand Down
97 changes: 68 additions & 29 deletions src/main/kotlin/org/phoenixframework/Socket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,42 @@ typealias Payload = Map<String, Any?>
/** Data class that holds callbacks assigned to the socket */
internal class StateChangeCallbacks {

var open: List<() -> Unit> = ArrayList()
var open: List<Pair<String, () -> Unit>> = ArrayList()
private set
var close: List<() -> Unit> = ArrayList()
var close: List<Pair<String, () -> Unit>> = ArrayList()
private set
var error: List<(Throwable, Response?) -> Unit> = ArrayList()
var error: List<Pair<String, (Throwable, Response?) -> Unit>> = ArrayList()
private set
var message: List<(Message) -> Unit> = ArrayList()
var message: List<Pair<String, (Message) -> Unit>> = ArrayList()
private set

/** Safely adds an onOpen callback */
fun onOpen(callback: () -> Unit) {
this.open = this.open + callback
fun onOpen(ref: String, callback: () -> Unit) {
this.open = this.open + Pair(ref, callback)
}

/** Safely adds an onClose callback */
fun onClose(callback: () -> Unit) {
this.close = this.close + callback
fun onClose(ref: String, callback: () -> Unit) {
this.close = this.close + Pair(ref, callback)
}

/** Safely adds an onError callback */
fun onError(callback: (Throwable, Response?) -> Unit) {
this.error = this.error + callback
fun onError(ref: String, callback: (Throwable, Response?) -> Unit) {
this.error = this.error + Pair(ref, callback)
}

/** Safely adds an onMessage callback */
fun onMessage(callback: (Message) -> Unit) {
this.message = this.message + callback
fun onMessage(ref: String, callback: (Message) -> Unit) {
this.message = this.message + Pair(ref, callback)
}

/** Clears any callbacks with the matching refs */
fun release(refs: List<String>) {
open = open.filter { refs.contains(it.first) }
close = close.filter { refs.contains(it.first) }
error = error.filter { refs.contains(it.first) }
message = message.filter { refs.contains(it.first) }

}

/** Clears all stored callbacks */
Expand Down Expand Up @@ -151,8 +160,11 @@ class Socket(
/** Collection of unclosed channels created by the Socket */
internal var channels: List<Channel> = ArrayList()

/** Buffers messages that need to be sent once the socket has connected */
internal var sendBuffer: MutableList<() -> Unit> = ArrayList()
/**
* Buffers messages that need to be sent once the socket has connected. It is an array of Pairs
* that contain the ref of the message to send and the callback that will send the message.
*/
internal var sendBuffer: MutableList<Pair<String?, () -> Unit>> = ArrayList()

/** Ref counter for messages */
internal var ref: Int = 0
Expand Down Expand Up @@ -273,20 +285,20 @@ class Socket(

}

fun onOpen(callback: (() -> Unit)) {
this.stateChangeCallbacks.onOpen(callback)
fun onOpen(callback: (() -> Unit)): String {
return makeRef().apply { stateChangeCallbacks.onOpen(this, callback) }
}

fun onClose(callback: () -> Unit) {
this.stateChangeCallbacks.onClose(callback)
fun onClose(callback: () -> Unit): String {
return makeRef().apply { stateChangeCallbacks.onClose(this, callback) }
}

fun onError(callback: (Throwable, Response?) -> Unit) {
this.stateChangeCallbacks.onError(callback)
fun onError(callback: (Throwable, Response?) -> Unit): String {
return makeRef().apply { stateChangeCallbacks.onError(this, callback) }
}

fun onMessage(callback: (Message) -> Unit) {
this.stateChangeCallbacks.onMessage(callback)
fun onMessage(callback: (Message) -> Unit): String {
return makeRef().apply { stateChangeCallbacks.onMessage(this, callback) }
}

fun removeAllCallbacks() {
Expand All @@ -301,13 +313,24 @@ class Socket(
}

fun remove(channel: Channel) {
this.off(channel.stateChangeRefs)

// To avoid a ConcurrentModificationException, filter out the channels to be
// removed instead of calling .remove() on the list, thus returning a new list
// that does not contain the channel that was removed.
this.channels = channels
.filter { it.joinRef != channel.joinRef }
}

/**
* Removes [onOpen], [onClose], [onError], and [onMessage] registrations by their [ref] value.
*
* @param refs List of refs to remove
*/
fun off(refs: List<String>) {
this.stateChangeCallbacks.release(refs)
}

//------------------------------------------------------------------------------
// Internal
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -341,7 +364,7 @@ class Socket(
} else {
// If the socket is not connected, add the push to a buffer which will
// be sent immediately upon connection.
sendBuffer.add(callback)
sendBuffer.add(Pair(ref, callback))
}
}

Expand Down Expand Up @@ -374,7 +397,7 @@ class Socket(

// Since the connections onClose was null'd out, inform all state callbacks
// that the Socket has closed
this.stateChangeCallbacks.close.forEach { it.invoke() }
this.stateChangeCallbacks.close.forEach { it.second.invoke() }
callback?.invoke()
}

Expand All @@ -391,11 +414,27 @@ class Socket(
/** Send all messages that were buffered before the socket opened */
internal fun flushSendBuffer() {
if (isConnected && sendBuffer.isNotEmpty()) {
this.sendBuffer.forEach { it.invoke() }
this.sendBuffer.forEach { it.second.invoke() }
this.sendBuffer.clear()
}
}

/** Removes an item from the send buffer with the matching ref */
internal fun removeFromSendBuffer(ref: String) {
this.sendBuffer = this.sendBuffer
.filter { it.first != ref }
.toMutableList()
}

internal fun leaveOpenTopic(topic: String) {
this.channels
.firstOrNull { it.topic == topic && (it.isJoined || it.isJoining) }
?.let {
logItems("Transport: Leaving duplicate topic: [$topic]")
it.leave()
}
}

//------------------------------------------------------------------------------
// Heartbeat
//------------------------------------------------------------------------------
Expand Down Expand Up @@ -469,7 +508,7 @@ class Socket(
this.resetHeartbeat()

// Inform all onOpen callbacks that the Socket has opened
this.stateChangeCallbacks.open.forEach { it.invoke() }
this.stateChangeCallbacks.open.forEach { it.second.invoke() }
}

internal fun onConnectionClosed(code: Int) {
Expand All @@ -486,7 +525,7 @@ class Socket(
}

// Inform callbacks the socket closed
this.stateChangeCallbacks.close.forEach { it.invoke() }
this.stateChangeCallbacks.close.forEach { it.second.invoke() }
}

internal fun onConnectionMessage(rawMessage: String) {
Expand All @@ -504,7 +543,7 @@ class Socket(
.forEach { it.trigger(message) }

// Inform all onMessage callbacks of the message
this.stateChangeCallbacks.message.forEach { it.invoke(message) }
this.stateChangeCallbacks.message.forEach { it.second.invoke(message) }
}

internal fun onConnectionError(t: Throwable, response: Response?) {
Expand All @@ -514,7 +553,7 @@ class Socket(
this.triggerChannelError()

// Inform any state callbacks of the error
this.stateChangeCallbacks.error.forEach { it.invoke(t, response) }
this.stateChangeCallbacks.error.forEach { it.second.invoke(t, response) }
}

}
Loading