However, sometimes I end up in an infinite loop with IllegalStateExceptions being thrown: "No current assignment for partition". I thought it was safe to seek with consumer.seek(tp, consumer.position(tp)). Before you can seek() you first need to subscribe() to a topic or assign() partitions of a topic to the consumer. Also keep in mind that subscribe() and assign() are lazy; thus, you also need to do a "dummy call" to poll() before you can use seek(). If you use subscribe(), you use group management.

No Current Assignment For Partition

Yeah, don't mix multiple versions of kafka clients.  That's not 100%
certain to be the cause of your problem, but it can't be helping.

As for your comments about async commits, read

https://issues.apache.org/jira/browse/SPARK-22486

and if you think your use case is still relevant to others given those
constraints, then share it.

On Fri, Dec 1, 2017 at 4:11 AM, Qiao, Richard
<[hidden email]> wrote:


> In your case, it looks like it's trying to load two versions of Kafka in the
> same JVM at runtime. There is a version conflict.
>
>
>
> About "I don't find the Spark async commit useful for our needs", do you
> mean code like the line below?
>
> kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(ranges)
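For reference, here is a minimal Java sketch of how a commitAsync call like that is typically wired up with the spark-streaming-kafka-0-10 integration, following the pattern in the Spark Kafka integration guide. The broker address, group id and topic name below are placeholders, and the actual batch processing is elided.

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.CanCommitOffsets;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.HasOffsetRanges;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class CommitAsyncSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("commit-async-sketch");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");   // placeholder
    kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");               // placeholder
    kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

    JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(
            Collections.singletonList("genericEvents"), kafkaParams));

    stream.foreachRDD(rdd -> {
      // capture the offset ranges of this batch before any shuffle changes partitioning
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

      // ... process the batch here ...

      // commit this batch's offsets back to Kafka asynchronously once it has been handled
      ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });

    jssc.start();
    jssc.awaitTermination();
  }
}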
>
>
>
>
>
> Best Regards
>
> Richard
>
>
>
>
>
> From: venkat <[hidden email]>
> Date: Thursday, November 30, 2017 at 8:16 PM
> To: Cody Koeninger <[hidden email]>
> Cc: "[hidden email]" <[hidden email]>
> Subject: Re: [Spark streaming] No assigned partition error during seek
>
>
>
> I noticed 'Do not manually add dependencies on org.apache.kafka
> artifacts (e.g. kafka-clients). The spark-streaming-kafka-0-10 artifact has
> the appropriate transitive dependencies already, and different versions may
> be incompatible in hard to diagnose ways' after your query.
>
> Does this imply that we should not be adding kafka-clients to our jars?
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 06:45 venkat <[hidden email]> wrote:
>
> Yes, I use the latest Kafka clients (0.11) to determine beginning offsets without
> seek, and I also commit Kafka offsets externally.
>
> I don't find the Spark async commit useful for our needs.
>
> Thanks
>
> Venkat
>
>
>
> On Fri, 1 Dec 2017 at 02:39 Cody Koeninger <[hidden email]> wrote:
>
> You mentioned the 0.11 version; the latest version of the org.apache.kafka
> kafka-clients artifact supported by DStreams is 0.10.0.1, for which it
> has an appropriate dependency.
>
> Are you manually depending on a different version of the kafka-clients
> artifact?
>
> On Fri, Nov 24, 2017 at 7:39 PM, venks61176 <[hidden email]> wrote:
>> Version: 2.2 with Kafka010
>>
>> Hi,
>>
>> We are running Spark Streaming on AWS and trying to process incoming
>> messages on Kafka topics. All was well.
>> Recently we wanted to migrate from the 0.8 to the 0.11 version of the Spark
>> Kafka library and to the 0.11 version of the Kafka server.
>>
>> With this new version of the software we are intermittently facing a 'No
>> current assignment for partition' error for a topic. I construct four
>> DStreams with different group.ids as suggested.
>>
>> The main piece of code that's causing the issue is this one:
>>
>> if (!toSeek.isEmpty) {
>>   // work around KAFKA-3370 when reset is none
>>   // poll will throw if no position, i.e. auto offset reset none and no explicit position
>>   // but cant seek to a position before poll, because poll is what gets subscription partitions
>>   // So, poll, suppress the first exception, then seek
>>   val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
>>   val shouldSuppress = aor != null && aor.asInstanceOf[String].toUpperCase == "NONE"
>>   try {
>>     consumer.poll(0)
>>   } catch {
>>     case x: NoOffsetForPartitionException if shouldSuppress =>
>>       logWarning("Catching NoOffsetForPartitionException since " +
>>         ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
>>   }
>>   toSeek.asScala.foreach { case (topicPartition, offset) =>
>>     *consumer.seek(topicPartition, offset)*
>>   }
>> }
>>
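As an aside, the workaround quoted above mirrors what a plain KafkaConsumer requires: seek() only works for partitions that are currently assigned, and with subscribe() the assignment only materializes during a poll(). Below is a minimal standalone Java sketch of that ordering; the broker address, group id, topic name and target offset are placeholders.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollThenSeekSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "seek-demo");             // placeholder
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("genericEvents"));

      // Calling seek() here would throw IllegalStateException ("No current assignment for
      // partition ..."), because subscribe() is lazy and no partitions have been assigned yet.

      // A poll() triggers the group join and the actual partition assignment.
      consumer.poll(0);

      // Only partitions currently assigned to this consumer can be sought.
      for (TopicPartition tp : consumer.assignment()) {
        consumer.seek(tp, 0L);  // hypothetical target offset
      }

      ConsumerRecords<String, String> records = consumer.poll(1000);
      System.out.println("fetched " + records.count() + " records");
    }
  }
}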
>> At the start of the job, I also ensure we are supplying all required offsets correctly:
>>
>> private Map<TopicPartition, Long> getCommittedOffsets(String topic) {
>>   Map<TopicPartition, Long> offsets = new HashMap<>();
>>   List<TopicPartition> topicPartitions =
>>       consumer.partitionsFor(topic).stream().map(partitionInfo ->
>>           new TopicPartition(partitionInfo.topic(), partitionInfo.partition()))
>>           .collect(Collectors.toList());
>>   Map<TopicPartition, Long> earliestOffsets = consumer.beginningOffsets(topicPartitions);
>>   // pick committed offsets
>>   for (TopicPartition topicAndPartition : topicPartitions) {
>>     final OffsetAndMetadata committed = consumer.committed(topicAndPartition);
>>     Long earliestOffset = earliestOffsets.get(topicAndPartition);
>>     if (committed != null && committed.offset() > earliestOffset) {
>>       logger.warn(
>>           "Committed offset found for: {} offset:{} -> Hence adding committed offset",
>>           topicAndPartition, committed.offset());
>>       offsets.put(topicAndPartition, committed.offset());
>>     } else {
>>       logger.warn(
>>           "New partition/stale offset found for: {} offset:{} -> Hence adding earliest offset",
>>           topicAndPartition, earliestOffset);
>>       offsets.put(topicAndPartition, earliestOffset);
>>     }
>>   }
>>   return offsets;
>> }
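For context, a hedged sketch of how a map like the one returned above is typically handed to the 0-10 integration, via the ConsumerStrategies.Subscribe overload that takes explicit starting offsets; this is the code path in which Subscribe.onStart later calls consumer.seek(...) for each entry. The topic name is a placeholder, and jssc, kafkaParams and the helper producing the offsets map are assumed to exist as in the snippets above.

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class StartFromCommittedOffsets {

  // 'committedOffsets' would be the map produced by a helper like getCommittedOffsets(topic) above
  static JavaInputDStream<ConsumerRecord<String, String>> createStream(
      JavaStreamingContext jssc,
      Map<String, Object> kafkaParams,
      Map<TopicPartition, Long> committedOffsets) {

    return KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        // the Subscribe overload that takes explicit starting offsets is the variant
        // that drives the consumer.seek(...) calls inside Subscribe.onStart
        ConsumerStrategies.<String, String>Subscribe(
            Collections.singletonList("genericEvents"), kafkaParams, committedOffsets));
  }
}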
>>
>> The actual stack trace:
>>
>> Caused by: java.lang.IllegalStateException: No current assignment for partition genericEvents-1
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:269)
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:294)
>> 2017-11-23 10:35:24,677 -    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1249)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:107)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe$$anonfun$onStart$2.apply(ConsumerStrategy.scala:106)
>> 2017-11-23 10:35:24,678 -    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>> 2017-11-23 10:35:24,678 -    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> 2017-11-23 10:35:24,678 -    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> 2017-11-23 10:35:24,678 -    at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:106)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:72)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:242)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> 2017-11-23 10:35:24,679 -    at org.apache.spark.streaming.DStreamGraph$$anonfun$start$7.apply(DStreamGraph.scala:54)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
>> 2017-11-23 10:35:24,679 -    at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
>> 2017-11-23 10:35:24,680 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
>> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
>> 2017-11-23 10:35:24,681 -    at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>> 2017-11-23 10:35:24,681 -    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>
>>
>>
>>
>
>



In this blog post, I'd like to focus on how “automatic” and “manual” partition assignments can interfere with each other, and even break things. I'd like to give some advice on using them the right way: avoid mixing them in the same scenario, or at least be aware of what you are doing.

The Consumer Group Experience

In Apache Kafka, the consumer group concept is a way of achieving two things:

  1. Having consumers as part of the same consumer group means providing the “competing consumers” pattern, in which the messages from topic partitions are spread across the members of the group. Each consumer receives messages from one or more partitions (“automatically” assigned to it), and the same messages won't be received by the other consumers (assigned to different partitions). In this way, we can scale the number of consumers up to the number of partitions (having one consumer read only one partition); beyond that point, a new consumer joining the group will be in an idle state without being assigned any partition.
  2. Having consumers as part of different consumer groups means providing the “publish/subscribe” pattern, where the messages from topic partitions are sent to all the consumers across the different groups. Inside the same consumer group the rules explained above still apply, but across different groups the consumers will receive the same messages. This is useful when the messages inside a topic are of interest to different applications that will process them in different ways; we want all the interested applications to receive all the same messages from the topic. (A configuration sketch follows this list.)
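As a rough illustration of the two patterns above, the only API-level difference is the group.id each consumer is started with: consumers sharing a group id compete for partitions, while consumers with different group ids each receive the full stream. A minimal sketch, assuming a local broker and a topic named "test":

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class GroupConfigSketch {

  // Two consumers created with groupId "G1" compete for the partitions of "test";
  // a consumer created with groupId "G2" gets its own copy of every message (publish/subscribe).
  static KafkaConsumer<String, String> newConsumer(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList("test"));
    return consumer;
  }
}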

Another great advantage of consumers grouping is the rebalancing feature. When a consumer joins a group, if there are still enough partitions available (i.e. we haven’t reached the limit of one consumer per partition), a re-balancing starts and the partitions will be reassigned to the current consumers, plus the new one. In the same way, if a consumer leaves a group, the partitions will be reassigned to the remaining consumers.
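Rebalancing can be observed directly from the client by subscribing with a ConsumerRebalanceListener; the callbacks are invoked from within poll() whenever partitions are revoked and reassigned. A small sketch, assuming a consumer built as in the previous example:

import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLoggingSketch {

  // 'consumer' could be built as in the previous sketch; the listener is called from poll()
  // whenever the group coordinator revokes and reassigns partitions.
  static void subscribeWithListener(KafkaConsumer<String, String> consumer) {
    consumer.subscribe(Collections.singletonList("test"), new ConsumerRebalanceListener() {
      @Override
      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Revoked before rebalance: " + partitions);
      }

      @Override
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Assigned after rebalance: " + partitions);
      }
    });
  }
}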

Everything I have described so far applies when using the subscribe() method provided by the KafkaConsumer API. This method forces you to assign the consumer to a consumer group, setting the group.id property, because it's needed for re-balancing. In any case, it's not the consumer's choice which partitions it wants to read from. In general, the first consumer to join the group does the assignment, while the other consumers simply join the group.

How Things Can Be Broken

Other than using the subscribe() method, there is another way for a consumer to read from topic partitions: the assign() method. In this case, the consumer is able to specify exactly the topic partitions it wants to read from.

This type of approach can be useful when you know exactly where some specific messages will be written (the partition) and you want to read directly from there. Of course, you lose the re-balancing feature in this case, which is the first big difference compared to using the subscribe() method.
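A minimal sketch of this “manual” approach, assuming a local broker and the two-partition "test" topic used later in this post; there is no group coordination involved, and seek() can be called immediately because the assignment is known up front:

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ManualAssignSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    // no group.id is strictly required for assign(), though setting one is advisable
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // avoid auto-commits without a group
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      TopicPartition p0 = new TopicPartition("test", 0);
      TopicPartition p1 = new TopicPartition("test", 1);

      // the consumer picks its own partitions: no group coordination, no rebalancing
      consumer.assign(Arrays.asList(p0, p1));

      // with assign(), seek() can be called right away because the assignment is already set
      consumer.seek(p0, 42L);  // hypothetical starting offset

      System.out.println("Polled " + consumer.poll(1000).count() + " records");
    }
  }
}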

Another difference is that with “manual” assignment, you can avoid specifying a consumer group (i.e. the group.id property) for the consumer; it will just be empty. In any case, it's better to specify it.

Most people use the subscribe() method, leveraging the “automatic” assignment and re-balancing features. Mixing subscribe() and assign() in the same scenario can break things, as we're about to see.

Imagine having a single “test” topic with only two partitions (P0 and P1) and a consumer C1 that subscribes to the topic as part of the consumer group G1. This consumer will be assigned both partitions, receiving messages from them. Now, let's start a new consumer C2 that is configured to be part of the same consumer group G1 but uses the assign() method to ask for partitions P0 and P1 explicitly.

Now we have broken something! ...but what is it?

Both C1 and C2 will receive messages from the topic from both partitions P0 and P1, but they are part of the same consumer group G1! So we have “broken” what we said in the previous paragraph about “competing consumers” when they are part of the same consumer group. You experience a “publish/subscribe” pattern, but with consumers within the same consumer group.
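For concreteness, the broken scenario described above boils down to something like the following sketch; the broker address and topic name are placeholders:

import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MixedAssignmentSketch {

  static Properties propsForGroup(String groupId) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return props;
  }

  public static void main(String[] args) {
    // C1: "automatic" assignment inside group G1
    KafkaConsumer<String, String> c1 = new KafkaConsumer<>(propsForGroup("G1"));
    c1.subscribe(Collections.singletonList("test"));

    // C2: same group G1, but "manual" assignment of the very same partitions
    KafkaConsumer<String, String> c2 = new KafkaConsumer<>(propsForGroup("G1"));
    c2.assign(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));

    // Both consumers now receive the same messages from P0 and P1 even though they share
    // a group id: the "competing consumers" guarantee no longer holds.
    c1.poll(1000);
    c2.poll(1000);

    c1.close();
    c2.close();
  }
}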

What About Offsets Commits?

Generally, you should avoid a scenario like the one described above. Starting from version 0.8.2.0, the offsets committed by the consumers aren't saved in ZooKeeper but in a partitioned and replicated topic named __consumer_offsets, which is hosted on the Kafka brokers in the cluster.

When a consumer commits some offsets (for different partitions), it sends a message to the broker on the __consumer_offsets topic. The message has the following structure (a commit sketch follows the list):

  • key = [group, topic, partition]
  • value = offset
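This key/value layout maps directly onto the commit API: a consumer commits an offset for a (topic, partition) pair, and its group.id supplies the group part of the key. A minimal sketch using an explicit commitSync, with the topic name and offset value as placeholders:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ExplicitCommitSketch {

  // Committing offset X for partition 0 of topic "test": together with the consumer's
  // group.id, this is exactly the [group, topic, partition] -> offset record described above.
  // (By convention, the committed value is the next offset the group should read.)
  static void commitOffset(KafkaConsumer<String, String> consumer, long x) {
    Map<TopicPartition, OffsetAndMetadata> toCommit =
        Collections.singletonMap(new TopicPartition("test", 0), new OffsetAndMetadata(x));
    consumer.commitSync(toCommit);
  }
}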

Coming back to the previous scenario... what does it mean?

Having C1 and C2 as part of the same consumer group but being able to receive from the same partitions (both P0 and P1) would look something like the following:

  • C1 commits offset X for partition P0 writing a message like this:
    • key = [G1, “test”, P0], value = X
  • C2 commits offset Y for partition P0 writing a message like this:
    • key = [G1, “test”, P0], value = Y

C2 has overwritten the committed offset for the same partition P0 of consumer C1, and maybe X was less than Y. If C1 crashes and restarts, it will lose messages, starting to read from Y (remember Y > X).

Something like that can't happen with consumers that use only the subscribe() way of being assigned partitions, because as part of the same consumer group they'll receive different partitions, so the key for the offset commit message will always be different.

Update: As a confirmation that mixing subscribe and assign isn't a good thing to do, after a discussion with one of my colleagues, Henryk Konsek, it turned out that if you try to call both methods on the same consumer, the client library throws the following exception:

java.lang.IllegalStateException: Subscription to topics, partitions and pattern are mutually exclusive
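A small sketch that reproduces this behaviour by calling both methods on one consumer (broker address, group id and topic are placeholders); the assign() call is the one that fails:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class MixSubscribeAssignSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "G1");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("test"));
      // mixing the two subscription styles on the same consumer is rejected by the client
      consumer.assign(Collections.singletonList(new TopicPartition("test", 0)));
    } catch (IllegalStateException e) {
      // "Subscription to topics, partitions and pattern are mutually exclusive"
      System.out.println("Caught: " + e.getMessage());
    }
  }
}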

Conclusion

The consumer group mechanism in Apache Kafka works really well. Leveraging it for scaling consumers, with “automatic” partition assignment and rebalancing, is a great plus. There are cases in which you need to assign partitions “manually”, but in those cases pay attention to what could happen if you mix the two approaches.
