Skip to content

Conversation

@tdas
Copy link
Contributor

@tdas tdas commented Jul 21, 2015

@tdas
Copy link
Contributor Author

tdas commented Jul 21, 2015

@zsxwing @koeninger Can you guys take a look

@SparkQA
Copy link

SparkQA commented Jul 21, 2015

Test build #37989 has finished for PR 7578 at commit 5da3995.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@SparkQA
Copy link

SparkQA commented Jul 21, 2015

Test build #37994 has finished for PR 7578 at commit 4a36096.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #38016 has finished for PR 7578 at commit 575bdbc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@SparkQA
Copy link

SparkQA commented Jul 22, 2015

Test build #38022 has finished for PR 7578 at commit 8874b70.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(
    • case class Data(topic: Vector, index: Int)
    • case class Data(globalTopicTotals: Vector)
    • case class VertexData(id: Long, topicWeights: Vector)
    • case class EdgeData(srcId: Long, dstId: Long, tokenCounts: Double)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgive my unfamiliarity with Kinesis, but are sequence numbers contiguous and increasing ? Because this seems to be the assumption here. In the doc I find the scary quote:

Sequence numbers cannot be used as indexes to sets of data within the same stream. To logically separate sets of data, use partition keys or create a separate stream for each data set.

Here I also see:

Sequence numbers generally increase over time. To guarantee strictly increasing ordering, use the SequenceNumberForOrdering parameter

If they aren't, you might find yourself with :

  • missing data, that may be with a sequence number fromSeqNumber < x < lastSeqNumber, but not iterated on here because you request here with ShardIteratorType.AT_SEQUENCE_NUMBER rather than ShardIteratorType.TRIM_HORIZON
  • superfluous data, because you're not checking that fromSeqNumber < nextRecord.getSequenceNumber() < lastSeqNumber before returning nextRecord.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good doubts!

Sequence numbers are increasing within each shard for the data being consumed. Thats how the Kinesis Client Library keeps track of the data read (

What the document refers to as SequenceNumberForOrdering is to ensure ordering guarantees at the time of pushing data into Kinesis. Once data have been pushed in and Kinesis internal have assigned sequence numbers to each record, the ordering is well-defined and guaranteed. On consumption, the ordering will always be the same.

However sequence numbers are not contiguous (unlike Kafka offsets), hence they cannot be used as an index like 0 being first record, 1 being second, 2 being third. And there is lies the problem of why we cannot use the direct approach for Kinesis. See linked design doc for more explanations and discussions.

@zsxwing
Copy link
Member

zsxwing commented Jul 23, 2015

Just some minor comments. Otherwise LGTM.

@tdas
Copy link
Contributor Author

tdas commented Jul 24, 2015

@zsxwing I made a few changes to add retry logic and timeouts, to make it more robust. Could you take a look, especially the retry logic.

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #38286 has finished for PR 7578 at commit c4f25d2.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: a redundant space after nextBytes

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #38290 has finished for PR 7578 at commit 5082a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@zsxwing
Copy link
Member

zsxwing commented Jul 24, 2015

LGTM except the minor style issue.

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #1193 has finished for PR 7578 at commit 5082a30.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@SparkQA
Copy link

SparkQA commented Jul 24, 2015

Test build #38291 has finished for PR 7578 at commit 543d208.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class SequenceNumberRange(
    • case class SequenceNumberRanges(ranges: Array[SequenceNumberRange])
    • class KinesisBackedBlockRDDPartition(
    • class KinesisBackedBlockRDD(
    • class KinesisSequenceRangeIterator(

@tdas
Copy link
Contributor Author

tdas commented Jul 24, 2015

Thanks @zsxwing for reviewing I am going to merge it in master.

@asfgit asfgit closed this in d249636 Jul 24, 2015
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants