@tdas tdas commented Feb 8, 2017

This is a follow-up PR for merging #16758 into the Spark 2.1 branch.

What changes were proposed in this pull request?

mapGroupsWithState is a new API for arbitrary stateful operations in Structured Streaming, similar to DStream.mapWithState

Requirements

  • Users should be able to specify a function that can do the following:
      • Access the input row corresponding to a key
      • Access the previous state corresponding to a key
      • Optionally, update or remove the state
      • Output any number of new rows (or none at all)

Proposed API

// ------------ New methods on KeyValueGroupedDataset ------------
class KeyValueGroupedDataset[K, V] {
  // Scala friendly
  def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U): Dataset[U]
  def flatMapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U]): Dataset[U]
  // Java friendly
  def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
  def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, U], stateEncoder: Encoder[S], resultEncoder: Encoder[U]): Dataset[U]
}

// ------------------- New Java-friendly function classes ------------------- 
public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  R call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}
public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable {
  Iterator<R> call(K key, Iterator<V> values, KeyedState<S> state) throws Exception;
}

// ---------------------- Wrapper class for state data ---------------------- 
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S                    // throws Exception if state does not exist
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit              // exists() will be false after this
}

Key Semantics of the State class

  • The state can be null.
  • If state.remove() is called, then state.exists() will return false, and getOption will return None.
  • If state.update(newState) is called after that, then state.exists() will return true, and getOption will return Some(...).
  • None of the operations are thread-safe. This is to avoid memory barriers.
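
The semantics listed above can be illustrated with a small in-memory model. This is only a sketch: `InMemoryKeyedState` and `KeyedStateDemo` are hypothetical names introduced here, the trait is copied from the proposal, and the choice of `NoSuchElementException` for a missing state is an assumption (the proposal only says `get()` throws). It is not Spark's implementation.

```scala
// The trait as proposed in this PR description.
trait KeyedState[S] {
  def exists(): Boolean
  def get(): S
  def getOption(): Option[S]
  def update(newState: S): Unit
  def remove(): Unit
}

// Hypothetical single-threaded stand-in, used only to illustrate the semantics.
// Like the proposed API, it makes no attempt at thread safety.
final class InMemoryKeyedState[S](initial: Option[S]) extends KeyedState[S] {
  private var value: Option[S] = initial

  def exists(): Boolean = value.isDefined

  // Assumption: the exception type is chosen for this sketch only.
  def get(): S =
    value.getOrElse(throw new NoSuchElementException("state does not exist"))

  def getOption(): Option[S] = value

  def update(newState: S): Unit = { value = Some(newState) }

  def remove(): Unit = { value = None }
}

object KeyedStateDemo {
  def main(args: Array[String]): Unit = {
    val state = new InMemoryKeyedState[Long](None)
    println(state.exists())      // no state has been set yet
    state.update(5L)
    println(state.getOption())   // state is now defined
    state.remove()
    println(state.exists())      // state was removed
    state.update(7L)
    println(state.get())         // state is defined again after update
  }
}
```

In this model the state is simply an `Option[S]`, so `remove()` followed by `update(...)` moves it from `None` back to `Some(...)`, matching the bullets above.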

Usage

val stateFunc = (word: String, words: Iterator[String], runningCount: KeyedState[Long]) => {
  val newCount = words.size + runningCount.getOption.getOrElse(0L)
  runningCount.update(newCount)
  (word, newCount)
}

dataset                                                 // type is Dataset[String]
  .groupByKey[String](w => w)                           // generates KeyValueGroupedDataset[String, String]
  .mapGroupsWithState[Long, (String, Long)](stateFunc)  // returns Dataset[(String, Long)]
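
To see how the running count carries over between batches, the following plain-Scala sketch may help. It does not use Spark: `SimpleState`, `RunningCountSketch`, and `processBatch` are hypothetical names introduced here, and the mutable map merely stands in for whatever state store the engine would use. Only the body of `stateFunc` mirrors the usage example above.

```scala
import scala.collection.mutable

// Minimal hypothetical stand-in for KeyedState; only what this example needs.
final class SimpleState[S](private var value: Option[S]) {
  def getOption: Option[S] = value
  def update(newState: S): Unit = { value = Some(newState) }
}

object RunningCountSketch {
  // Same logic as stateFunc in the usage example, written against the stand-in type.
  def stateFunc(word: String, words: Iterator[String], runningCount: SimpleState[Long]): (String, Long) = {
    val newCount = words.size + runningCount.getOption.getOrElse(0L)
    runningCount.update(newCount)
    (word, newCount)
  }

  // Hypothetical driver: groups one batch by key, calls stateFunc once per key,
  // and writes the updated state back into the map.
  def processBatch(batch: Seq[String], store: mutable.Map[String, Long]): Map[String, Long] =
    batch.groupBy(identity).map { case (word, occurrences) =>
      val state = new SimpleState[Long](store.get(word))
      val (w, count) = stateFunc(word, occurrences.iterator, state)
      state.getOption.foreach(updated => store.update(w, updated))
      (w, count)
    }

  def main(args: Array[String]): Unit = {
    val store = mutable.Map.empty[String, Long]
    println(processBatch(Seq("a", "b", "a"), store))
    println(processBatch(Seq("a", "c"), store))
  }
}
```

One detail worth noting: `words.size` consumes the iterator, so a function that also needs the individual values would have to materialize them first (for example with `words.toSeq`).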

How was this patch tested?

New unit tests.

Author: Tathagata Das <[email protected]>

Closes apache#16758 from tdas/mapWithState.
@tdas tdas changed the title from "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations" to "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for branch-2.1" on Feb 8, 2017

SparkQA commented Feb 8, 2017

Test build #72560 has started for PR 16850 at commit 5025cb7.

tdas commented Feb 8, 2017

jenkins test this please

SparkQA commented Feb 8, 2017

Test build #3563 has finished for PR 16850 at commit 5025cb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 8, 2017

Test build #72586 has finished for PR 16850 at commit 5025cb7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

zsxwing commented Feb 8, 2017

LGTM. Merging to 2.1.

asfgit pushed a commit that referenced this pull request Feb 8, 2017
…s for branch-2.1

Author: Tathagata Das <[email protected]>

Closes #16850 from tdas/mapWithState-branch-2.1.

zsxwing commented Feb 8, 2017

Merged. Could you close this PR, please?

@tdas tdas closed this Feb 9, 2017
wobuxiangtong commented

Excuse me,
I am using mapWithState to store data in Spark Streaming. What confuses me is that rememberDuration must be greater than checkpointDuration. I read the code of MapWithStateRDD, changed it as described in the question linked below, and ran some tests. They indicated that holding only one RDD in storage works and that no data is lost. Doing it this way would save a lot of memory.
Maybe I am wrong. Could you help me? The question is here:
https://stackoverflow.com/questions/47784439/what-will-happen-if-i-change-the-rememberduration-in-dstream-less-than-checkpoin
