From 53661eb72bba55376bc6112b51c25489522d309c Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 20 Dec 2017 16:21:53 +0000 Subject: [PATCH 1/4] initial commit --- .../expressions/codegen/CodeGenerator.scala | 4 ++-- .../catalyst/expressions/regexpExpressions.scala | 6 ++---- .../sql/execution/WholeStageCodegenExec.scala | 2 +- .../execution/aggregate/HashAggregateExec.scala | 14 +++++++++----- .../sql/execution/joins/SortMergeJoinExec.scala | 3 ++- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 41a920ba3d677..c95c06af1f3d1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -178,7 +178,7 @@ class CodegenContext { /** * Returns the reference of next available slot in current compacted array. The size of each - * compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. + * compacted array is controlled by the constant `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. * Once reaching the threshold, new compacted array is created. */ def getNextSlot(): String = { @@ -299,7 +299,7 @@ class CodegenContext { def initMutableStates(): String = { // It's possible that we add same mutable state twice, e.g. the `mergeExpressions` in // `TypedAggregateExpression`, we should call `distinct` here to remove the duplicated ones. - val initCodes = mutableStateInitCode.distinct + val initCodes = mutableStateInitCode.distinct.map(_ + "\n") // The generated initialization code may exceed 64kb function size limit in JVM if there are too // many mutable states, so split it into multiple functions. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala index fa5425c77ebba..2b1d0ae1c029b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -118,9 +118,8 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi if (rVal != null) { val regexStr = StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString())) - // inline mutable state since not many Like operations in a task val pattern = ctx.addMutableState(patternClass, "patternLike", - v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true) + v => s"""$v = ${patternClass}.compile("$regexStr");""") // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. val eval = left.genCode(ctx) @@ -194,9 +193,8 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress if (rVal != null) { val regexStr = StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString()) - // inline mutable state since not many RLike operations in a task val pattern = ctx.addMutableState(patternClass, "patternRLike", - v => s"""$v = ${patternClass}.compile("$regexStr");""", forceInline = true) + v => s"""$v = ${patternClass}.compile("$regexStr");""") // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. val eval = left.genCode(ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 9e7008d1e0c31..d5b041ca84c93 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp override def doProduce(ctx: CodegenContext): String = { // Right now, InputAdapter is only used when there is one input RDD. - // inline mutable state since an inputAdaptor in a task + // inline mutable state since an InputAdapter in a task val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", forceInline = true) val row = ctx.freshName("row") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index b1af360d85095..acddf66b676c7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -587,20 +587,24 @@ case class HashAggregateExec( fastHashMapClassName, groupingKeySchema, bufferSchema).generate() ctx.addInnerClass(generatedMap) + // inline mutable state since not many aggregation operations in a task fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap", - v => s"$v = new $fastHashMapClassName();") - ctx.addMutableState(s"java.util.Iterator", "vectorizedFastHashMapIter") + v => s"$v = new $fastHashMapClassName();", forceInline = true) + ctx.addMutableState(s"java.util.Iterator", "vectorizedFastHashMapIter", + forceInline = true) } else { val generatedMap = new RowBasedHashMapGenerator(ctx, aggregateExpressions, fastHashMapClassName, groupingKeySchema, bufferSchema).generate() ctx.addInnerClass(generatedMap) + // inline mutable state since not many aggregation operations in a task fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap", v => s"$v = new $fastHashMapClassName(" + - s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());") + s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());", + forceInline = true) ctx.addMutableState( "org.apache.spark.unsafe.KVIterator", - "fastHashMapIter") + "fastHashMapIter", forceInline = true) } } @@ -611,7 +615,7 @@ case class HashAggregateExec( // create hashMap val hashMapClassName = classOf[UnsafeFixedWidthAggregationMap].getName hashMapTerm = ctx.addMutableState(hashMapClassName, "hashMap", - v => s"$v = $thisPlan.createHashMap();") + v => s"$v = $thisPlan.createHashMap();", forceInline = true) sorterTerm = ctx.addMutableState(classOf[UnsafeKVExternalSorter].getName, "sorter", forceInline = true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index 073730462a75f..ce1472835febf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -440,8 +440,9 @@ case class SortMergeJoinExec( val spillThreshold = getSpillThreshold val inMemoryThreshold = getInMemoryThreshold + // inline mutable state since not many join operations in a task val matches = ctx.addMutableState(clsName, "matches", - v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);") + v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true) // Copy the left keys as class members so they could be used in next function call. val matchedKeyVars = copyKeys(ctx, leftKeyVars) From 8d8b669d4eca9181420d71b5c69f6ced8924b6e3 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 22 Dec 2017 07:20:47 +0000 Subject: [PATCH 2/4] address review comments --- .../expressions/regexpExpressions.scala | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala index 2b1d0ae1c029b..f3e8f6de58975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala @@ -119,7 +119,7 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi val regexStr = StringEscapeUtils.escapeJava(escape(rVal.asInstanceOf[UTF8String].toString())) val pattern = ctx.addMutableState(patternClass, "patternLike", - v => s"""$v = ${patternClass}.compile("$regexStr");""") + v => s"""$v = $patternClass.compile("$regexStr");""") // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. val eval = left.genCode(ctx) @@ -142,9 +142,9 @@ case class Like(left: Expression, right: Expression) extends StringRegexExpressi val rightStr = ctx.freshName("rightStr") nullSafeCodeGen(ctx, ev, (eval1, eval2) => { s""" - String $rightStr = ${eval2}.toString(); - ${patternClass} $pattern = ${patternClass}.compile($escapeFunc($rightStr)); - ${ev.value} = $pattern.matcher(${eval1}.toString()).matches(); + String $rightStr = $eval2.toString(); + $patternClass $pattern = $patternClass.compile($escapeFunc($rightStr)); + ${ev.value} = $pattern.matcher($eval1.toString()).matches(); """ }) } @@ -194,7 +194,7 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress val regexStr = StringEscapeUtils.escapeJava(rVal.asInstanceOf[UTF8String].toString()) val pattern = ctx.addMutableState(patternClass, "patternRLike", - v => s"""$v = ${patternClass}.compile("$regexStr");""") + v => s"""$v = $patternClass.compile("$regexStr");""") // We don't use nullSafeCodeGen here because we don't want to re-evaluate right again. val eval = left.genCode(ctx) @@ -217,9 +217,9 @@ case class RLike(left: Expression, right: Expression) extends StringRegexExpress val pattern = ctx.freshName("pattern") nullSafeCodeGen(ctx, ev, (eval1, eval2) => { s""" - String $rightStr = ${eval2}.toString(); - ${patternClass} $pattern = ${patternClass}.compile($rightStr); - ${ev.value} = $pattern.matcher(${eval1}.toString()).find(0); + String $rightStr = $eval2.toString(); + $patternClass $pattern = $patternClass.compile($rightStr); + ${ev.value} = $pattern.matcher($eval1.toString()).find(0); """ }) } @@ -336,25 +336,25 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio nullSafeCodeGen(ctx, ev, (subject, regexp, rep) => { s""" - if (!$regexp.equals(${termLastRegex})) { + if (!$regexp.equals($termLastRegex)) { // regex value changed - ${termLastRegex} = $regexp.clone(); - ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); + $termLastRegex = $regexp.clone(); + $termPattern = $classNamePattern.compile($termLastRegex.toString()); } - if (!$rep.equals(${termLastReplacementInUTF8})) { + if (!$rep.equals($termLastReplacementInUTF8)) { // replacement string changed - ${termLastReplacementInUTF8} = $rep.clone(); - ${termLastReplacement} = ${termLastReplacementInUTF8}.toString(); + $termLastReplacementInUTF8 = $rep.clone(); + $termLastReplacement = $termLastReplacementInUTF8.toString(); } - $classNameStringBuffer ${termResult} = new $classNameStringBuffer(); - java.util.regex.Matcher ${matcher} = ${termPattern}.matcher($subject.toString()); + $classNameStringBuffer $termResult = new $classNameStringBuffer(); + java.util.regex.Matcher $matcher = $termPattern.matcher($subject.toString()); - while (${matcher}.find()) { - ${matcher}.appendReplacement(${termResult}, ${termLastReplacement}); + while ($matcher.find()) { + $matcher.appendReplacement($termResult, $termLastReplacement); } - ${matcher}.appendTail(${termResult}); - ${ev.value} = UTF8String.fromString(${termResult}.toString()); - ${termResult} = null; + $matcher.appendTail($termResult); + ${ev.value} = UTF8String.fromString($termResult.toString()); + $termResult = null; $setEvNotNull """ }) @@ -423,19 +423,19 @@ case class RegExpExtract(subject: Expression, regexp: Expression, idx: Expressio nullSafeCodeGen(ctx, ev, (subject, regexp, idx) => { s""" - if (!$regexp.equals(${termLastRegex})) { + if (!$regexp.equals($termLastRegex)) { // regex value changed - ${termLastRegex} = $regexp.clone(); - ${termPattern} = ${classNamePattern}.compile(${termLastRegex}.toString()); + $termLastRegex = $regexp.clone(); + $termPattern = $classNamePattern.compile($termLastRegex.toString()); } - java.util.regex.Matcher ${matcher} = - ${termPattern}.matcher($subject.toString()); - if (${matcher}.find()) { - java.util.regex.MatchResult ${matchResult} = ${matcher}.toMatchResult(); - if (${matchResult}.group($idx) == null) { + java.util.regex.Matcher $matcher = + $termPattern.matcher($subject.toString()); + if ($matcher.find()) { + java.util.regex.MatchResult $matchResult = $matcher.toMatchResult(); + if ($matchResult.group($idx) == null) { ${ev.value} = UTF8String.EMPTY_UTF8; } else { - ${ev.value} = UTF8String.fromString(${matchResult}.group($idx)); + ${ev.value} = UTF8String.fromString($matchResult.group($idx)); } $setEvNotNull } else { From 05da9d7dfa2aca359630e70eee96db5abf96c9e4 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 27 Dec 2017 04:46:05 +0000 Subject: [PATCH 3/4] address review comment --- .../org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index d5b041ca84c93..7f17743e4b009 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp override def doProduce(ctx: CodegenContext): String = { // Right now, InputAdapter is only used when there is one input RDD. - // inline mutable state since an InputAdapter in a task + // inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", forceInline = true) val row = ctx.freshName("row") From 4ef81c8e132d3b3a91ef81f5874d992706870aa9 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Wed, 27 Dec 2017 08:35:04 +0000 Subject: [PATCH 4/4] address review comment --- .../scala/org/apache/spark/sql/execution/SortExec.scala | 2 +- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 2 +- .../spark/sql/execution/aggregate/HashAggregateExec.scala | 6 +++--- .../apache/spark/sql/execution/basicPhysicalOperators.scala | 4 ++-- .../spark/sql/execution/joins/BroadcastHashJoinExec.scala | 2 +- .../spark/sql/execution/joins/SortMergeJoinExec.scala | 6 +++--- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index daff3c49e7517..ef1bb1c2a4468 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -138,7 +138,7 @@ case class SortExec( // Initialize the class member variables. This includes the instance of the Sorter and // the iterator to return sorted rows. val thisPlan = ctx.addReferenceObj("plan", this) - // inline mutable state since not many Sort operations in a task + // Inline mutable state since not many Sort operations in a task sorterVariable = ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, "sorter", v => s"$v = $thisPlan.createSorter();", forceInline = true) val metrics = ctx.addMutableState(classOf[TaskMetrics].getName, "metrics", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 7f17743e4b009..065954559e487 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -283,7 +283,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp override def doProduce(ctx: CodegenContext): String = { // Right now, InputAdapter is only used when there is one input RDD. - // inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen + // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", forceInline = true) val row = ctx.freshName("row") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index acddf66b676c7..9a6f1c6dfa6a9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -587,7 +587,7 @@ case class HashAggregateExec( fastHashMapClassName, groupingKeySchema, bufferSchema).generate() ctx.addInnerClass(generatedMap) - // inline mutable state since not many aggregation operations in a task + // Inline mutable state since not many aggregation operations in a task fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "vectorizedHastHashMap", v => s"$v = new $fastHashMapClassName();", forceInline = true) ctx.addMutableState(s"java.util.Iterator", "vectorizedFastHashMapIter", @@ -597,7 +597,7 @@ case class HashAggregateExec( fastHashMapClassName, groupingKeySchema, bufferSchema).generate() ctx.addInnerClass(generatedMap) - // inline mutable state since not many aggregation operations in a task + // Inline mutable state since not many aggregation operations in a task fastHashMapTerm = ctx.addMutableState(fastHashMapClassName, "fastHashMap", v => s"$v = new $fastHashMapClassName(" + s"$thisPlan.getTaskMemoryManager(), $thisPlan.getEmptyAggregationBuffer());", @@ -609,7 +609,7 @@ case class HashAggregateExec( } // Create a name for the iterator from the regular hash map. - // inline mutable state since not many aggregation operations in a task + // Inline mutable state since not many aggregation operations in a task val iterTerm = ctx.addMutableState(classOf[KVIterator[UnsafeRow, UnsafeRow]].getName, "mapIter", forceInline = true) // create hashMap diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 78137d3f97cfc..a15a8d11aa2a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -284,7 +284,7 @@ case class SampleExec( val samplerClass = classOf[PoissonSampler[UnsafeRow]].getName val initSampler = ctx.freshName("initSampler") - // inline mutable state since not many Sample operations in a task + // Inline mutable state since not many Sample operations in a task val sampler = ctx.addMutableState(s"$samplerClass", "sampleReplace", v => { val initSamplerFuncName = ctx.addNewFunction(initSampler, @@ -371,7 +371,7 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) val ev = ExprCode("", "false", value) val BigInt = classOf[java.math.BigInteger].getName - // inline mutable state since not many Range operations in a task + // Inline mutable state since not many Range operations in a task val taskContext = ctx.addMutableState("TaskContext", "taskContext", v => s"$v = TaskContext.get();", forceInline = true) val inputMetrics = ctx.addMutableState("InputMetrics", "inputMetrics", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala index ee763e23415cf..1918fcc5482db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala @@ -139,7 +139,7 @@ case class BroadcastHashJoinExec( // At the end of the task, we update the avg hash probe. val avgHashProbe = metricTerm(ctx, "avgHashProbe") - // inline mutable state since not many join operations in a task + // Inline mutable state since not many join operations in a task val relationTerm = ctx.addMutableState(clsName, "relation", v => s""" | $v = (($clsName) $broadcast.value()).asReadOnlyCopy(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala index ce1472835febf..94405410cce90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala @@ -422,7 +422,7 @@ case class SortMergeJoinExec( */ private def genScanner(ctx: CodegenContext): (String, String) = { // Create class member for next row from both sides. - // inline mutable state since not many join operations in a task + // Inline mutable state since not many join operations in a task val leftRow = ctx.addMutableState("InternalRow", "leftRow", forceInline = true) val rightRow = ctx.addMutableState("InternalRow", "rightRow", forceInline = true) @@ -440,7 +440,7 @@ case class SortMergeJoinExec( val spillThreshold = getSpillThreshold val inMemoryThreshold = getInMemoryThreshold - // inline mutable state since not many join operations in a task + // Inline mutable state since not many join operations in a task val matches = ctx.addMutableState(clsName, "matches", v => s"$v = new $clsName($inMemoryThreshold, $spillThreshold);", forceInline = true) // Copy the left keys as class members so they could be used in next function call. @@ -577,7 +577,7 @@ case class SortMergeJoinExec( override def needCopyResult: Boolean = true override def doProduce(ctx: CodegenContext): String = { - // inline mutable state since not many join operations in a task + // Inline mutable state since not many join operations in a task val leftInput = ctx.addMutableState("scala.collection.Iterator", "leftInput", v => s"$v = inputs[0];", forceInline = true) val rightInput = ctx.addMutableState("scala.collection.Iterator", "rightInput",