Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 90 additions & 7 deletions Sources/Realtime/Channel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public class Channel {
)

/// Handle when a response is received after join()
joinPush.delegateReceive("ok", to: self) { (self, _) in
joinPush.delegateReceive(.ok, to: self) { (self, _) in
// Mark the Channel as joined
self.state = ChannelState.joined

Expand All @@ -181,13 +181,13 @@ public class Channel {
}

// Perform if Channel errors while attempting to joi
joinPush.delegateReceive("error", to: self) { (self, _) in
joinPush.delegateReceive(.error, to: self) { (self, _) in
self.state = .errored
if self.socket?.isConnected == true { self.rejoinTimer.scheduleTimeout() }
}

// Handle when the join push times out when sending after join()
joinPush.delegateReceive("timeout", to: self) { (self, _) in
joinPush.delegateReceive(.timeout, to: self) { (self, _) in
// log that the channel timed out
self.socket?.logItems(
"channel", "timeout \(self.topic) \(self.joinRef ?? "") after \(self.timeout)s"
Expand Down Expand Up @@ -553,12 +553,12 @@ public class Channel {
// Perform the same behavior if successfully left the channel
// or if sending the event timed out
leavePush
.receive("ok", delegated: onCloseDelegate)
.receive("timeout", delegated: onCloseDelegate)
.receive(.ok, delegated: onCloseDelegate)
.receive(.timeout, delegated: onCloseDelegate)
leavePush.send()

// If the Channel cannot send push events, trigger a success locally
if !canPush { leavePush.trigger("ok", payload: [:]) }
if !canPush { leavePush.trigger(.ok, payload: [:]) }

// Return the push so it can be bound to
return leavePush
Expand Down Expand Up @@ -694,13 +694,53 @@ extension Channel {
return state == .leaving
}
}
// ----------------------------------------------------------------------

// MARK: - Codable Payload

// ----------------------------------------------------------------------

extension Payload {

/// Initializes a payload from a given value
/// - parameter value: The value to encode
/// - parameter encoder: The encoder to use to encode the payload
/// - throws: Throws an error if the payload cannot be encoded
init<T: Encodable>(_ value: T, encoder: JSONEncoder = Defaults.encoder) throws {
let data = try encoder.encode(value)
self = try JSONSerialization.jsonObject(with: data, options: .allowFragments) as! Payload
}

/// Decodes the payload to a given type
/// - parameter type: The type to decode to
/// - parameter decoder: The decoder to use to decode the payload
/// - returns: The decoded payload
/// - throws: Throws an error if the payload cannot be decoded
public func decode<T: Decodable>(to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder) throws -> T {
let data = try JSONSerialization.data(withJSONObject: self)
return try decoder.decode(type, from: data)
}

}


// ----------------------------------------------------------------------

// MARK: - Broadcast API

// ----------------------------------------------------------------------

/// Represents the payload of a broadcast message
public struct BroadcastPayload {
public let type: String
public let event: String
public let payload: Payload
}

extension Channel {
/// Broadcasts the payload to all other members of the channel
/// - parameter event: The event to broadcast
/// - parameter payload: The payload to broadcast
@discardableResult
public func broadcast(event: String, payload: Payload) -> Push {
self.push(.broadcast, payload: [
Expand All @@ -709,6 +749,38 @@ extension Channel {
"payload": payload
])
}

/// Broadcasts the encodable payload to all other members of the channel
/// - parameter event: The event to broadcast
/// - parameter payload: The payload to broadcast
/// - parameter encoder: The encoder to use to encode the payload
/// - throws: Throws an error if the payload cannot be encoded
@discardableResult
public func broadcast(event: String, payload: Encodable, encoder: JSONEncoder = Defaults.encoder) throws -> Push {
self.broadcast(event: event, payload: try Payload(payload))
}

/// Subscribes to broadcast events. Does not handle retain cycles.
///
/// Example:
///
/// let ref = channel.onBroadcast { [weak self] (message,broadcast) in
/// print(broadcast.event, broadcast.payload)
/// }
/// channel.off(.broadcast, ref1)
///
/// Subscription returns a ref counter, which can be used later to
/// unsubscribe the exact event listener
/// - parameter callback: Called with the broadcast payload
/// - returns: Ref counter of the subscription. See `func off()`
@discardableResult
public func onBroadcast(callback: @escaping (Message,BroadcastPayload) -> Void) -> Int {
self.on(.broadcast, callback: { message in
let payload = BroadcastPayload(type: message.payload["type"] as! String, event: message.payload["event"] as! String, payload: message.payload["payload"] as! Payload)
callback(message, payload)
})
}

}
// ----------------------------------------------------------------------

Expand All @@ -717,6 +789,8 @@ extension Channel {
// ----------------------------------------------------------------------

extension Channel {
/// Share presence state, available to all channel members via sync
/// - parameter payload: The payload to broadcast
@discardableResult
public func track(payload: Payload) -> Push {
self.push(.presence, payload: [
Expand All @@ -726,12 +800,21 @@ extension Channel {
])
}

/// Share presence state, available to all channel members via sync
/// - parameter payload: The payload to broadcast
/// - parameter encoder: The encoder to use to encode the payload
/// - throws: Throws an error if the payload cannot be encoded
@discardableResult
public func track(payload: Encodable, encoder: JSONEncoder = Defaults.encoder) throws -> Push {
self.track(payload: try Payload(payload))
}

/// Remove presence state for given channel
@discardableResult
public func untrack() -> Push {
self.push(.presence, payload: [
"type": "presence",
"event": "untrack"
])
}

}
15 changes: 14 additions & 1 deletion Sources/Realtime/Defaults.swift
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public enum Defaults {
}

public static let vsn = "2.0.0"

/// Default encoder
public static let encoder: JSONEncoder = JSONEncoder()

/// Default encode function, utilizing JSONSerialization.data
public static let encode: (Any) -> Data = { json in
Expand All @@ -53,7 +56,10 @@ public enum Defaults {
options: JSONSerialization.WritingOptions()
)
}


/// Default decoder
public static let decoder: JSONDecoder = JSONDecoder()

/// Default decode function, utilizing JSONSerialization.jsonObject
public static let decode: (Data) -> Any? = { data in
guard
Expand Down Expand Up @@ -228,3 +234,10 @@ public struct ChannelOptions {
self.broadcastAcknowledge = broadcastAcknowledge
}
}

/// Represents the different status of a push
public enum PushStatus: String {
case ok
case error
case timeout
}
7 changes: 5 additions & 2 deletions Sources/Realtime/Message.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ public class Message {
/// ```swift
/// message.payload["status"]
/// ```
public var status: String? {
return rawPayload["status"] as? String
public var status: PushStatus? {
guard let status = rawPayload["status"] as? String else {
return nil
}
return PushStatus(rawValue: status)
}

init(
Expand Down
31 changes: 30 additions & 1 deletion Sources/Realtime/Presence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public final class Presence {
/// phoenix events "presence_state" and "presence_diff"
public static let defaults = Options(events: [
.state: .presenceState,
.diff: .presenceState,
.diff: .presenceDiff,
])

public init(events: [Events: ChannelEvent]) {
Expand Down Expand Up @@ -409,3 +409,32 @@ public final class Presence {
return presences.map(transformer)
}
}


extension Presence.Map {

/// Decodes the presence metadata to an array of the specified type.
/// - parameter type: The type to decode to.
/// - parameter decoder: The decoder to use.
/// - returns: The decoded values.
/// - throws: Any error that occurs during decoding.
public func decode<T: Decodable>(to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder) throws -> [T] {
let metas: [Presence.Meta] = self["metas"]!
let data = try JSONSerialization.data(withJSONObject: metas)
return try decoder.decode([T].self, from: data)
}

}

extension Presence.State {

/// Decodes the presence metadata to a dictionary of arrays of the specified type.
/// - parameter type: The type to decode to.
/// - parameter decoder: The decoder to use.
/// - returns: The dictionary of decoded values.
/// - throws: Any error that occurs during decoding.
public func decode<T: Decodable>(to type: T.Type = T.self, decoder: JSONDecoder = Defaults.decoder) throws -> [String: [T]] {
return try mapValues { try $0.decode(decoder: decoder) }
}

}
18 changes: 9 additions & 9 deletions Sources/Realtime/Push.swift
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class Push {
var timeoutWorkItem: DispatchWorkItem?

/// Hooks into a Push. Where .receive("ok", callback(Payload)) are stored
var receiveHooks: [String: [Delegated<Message, Void>]]
var receiveHooks: [PushStatus: [Delegated<Message, Void>]]

/// True if the Push has been sent
var sent: Bool
Expand Down Expand Up @@ -89,7 +89,7 @@ public class Push {
/// Sends the Push. If it has already timed out, then the call will
/// be ignored and return early. Use `resend` in this case.
public func send() {
guard !hasReceived(status: "timeout") else { return }
guard !hasReceived(status: .timeout) else { return }

startTimeout()
sent = true
Expand Down Expand Up @@ -120,7 +120,7 @@ public class Push {
/// - parameter callback: Callback to fire when the status is recevied
@discardableResult
public func receive(
_ status: String,
_ status: PushStatus,
callback: @escaping ((Message) -> Void)
) -> Push {
var delegated = Delegated<Message, Void>()
Expand All @@ -146,7 +146,7 @@ public class Push {
/// - parameter callback: Callback to fire when the status is recevied
@discardableResult
public func delegateReceive<Target: AnyObject>(
_ status: String,
_ status: PushStatus,
to owner: Target,
callback: @escaping ((Target, Message) -> Void)
) -> Push {
Expand All @@ -158,7 +158,7 @@ public class Push {

/// Shared behavior between `receive` calls
@discardableResult
internal func receive(_ status: String, delegated: Delegated<Message, Void>) -> Push {
internal func receive(_ status: PushStatus, delegated: Delegated<Message, Void>) -> Push {
// If the message has already been received, pass it to the callback immediately
if hasReceived(status: status), let receivedMessage = receivedMessage {
delegated.call(receivedMessage)
Expand Down Expand Up @@ -188,7 +188,7 @@ public class Push {
///
/// - parameter status: Status which was received, e.g. "ok", "error", "timeout"
/// - parameter response: Response that was received
private func matchReceive(_ status: String, message: Message) {
private func matchReceive(_ status: PushStatus, message: Message) {
receiveHooks[status]?.forEach { $0.call(message) }
}

Expand Down Expand Up @@ -237,7 +237,7 @@ public class Push {

/// Setup and start the Timeout timer.
let workItem = DispatchWorkItem {
self.trigger("timeout", payload: [:])
self.trigger(.timeout, payload: [:])
}

timeoutWorkItem = workItem
Expand All @@ -248,12 +248,12 @@ public class Push {
///
/// - parameter status: Status to check
/// - return: True if given status has been received by the Push.
internal func hasReceived(status: String) -> Bool {
internal func hasReceived(status: PushStatus) -> Bool {
return receivedMessage?.status == status
}

/// Triggers an event to be sent though the Channel
internal func trigger(_ status: String, payload: Payload) {
internal func trigger(_ status: PushStatus, payload: Payload) {
/// If there is no ref event, then there is nothing to trigger on the channel
guard let refEvent = refEvent else { return }

Expand Down