diff --git a/src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java index 21ed48d..269ac3c 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/AbstractCommitPolicy.java @@ -14,7 +14,7 @@ import static java.util.function.BinaryOperator.maxBy; import static java.util.stream.Collectors.toSet; -abstract class AbstractCommitPolicy implements CommitPolicy { +abstract class AbstractCommitPolicy implements CommitPolicy { protected final Consumer consumer; final Map topicOffsetHighWaterMark; final Map completedTopicOffsets; diff --git a/src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java index fd1dcc2..8952955 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicy.java @@ -5,40 +5,18 @@ import org.apache.kafka.common.TopicPartition; import java.time.Duration; -import java.util.HashMap; import java.util.Map; -abstract class AbstractPartialCommitPolicy extends AbstractCommitPolicy { - private final Duration forceWholeCommitInterval; - private long nextWholeCommitNanos; - - AbstractPartialCommitPolicy(Consumer consumer, Duration forceWholeCommitInterval) { - super(consumer); - this.forceWholeCommitInterval = forceWholeCommitInterval; - this.nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval); +abstract class AbstractPartialCommitPolicy extends AbstractRecommitAwareCommitPolicy { + AbstractPartialCommitPolicy(Consumer consumer, Duration RecommitInterval) { + super(consumer, RecommitInterval); } - Map offsetsToPartialCommit() { - if (needWholeCommit()) { - final Map ret = new HashMap<>(completedTopicOffsets); - for (TopicPartition partition : consumer.assignment()) { - final OffsetAndMetadata 
offset = consumer.committed(partition); - if (offset != null) { - ret.putIfAbsent(partition, offset); - } - } - nextWholeCommitNanos = nextForceWholeCommitTime(forceWholeCommitInterval); - return ret; + Map offsetsForPartialCommit() { + if (needRecommit()) { + return offsetsForRecommit(); } else { return completedTopicOffsets; } } - - private boolean needWholeCommit() { - return System.nanoTime() >= nextWholeCommitNanos; - } - - private long nextForceWholeCommitTime(Duration forceWholeCommitInterval) { - return System.nanoTime() + forceWholeCommitInterval.toNanos(); - } } diff --git a/src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java new file mode 100644 index 0000000..03dae70 --- /dev/null +++ b/src/main/java/cn/leancloud/kafka/consumer/AbstractRecommitAwareCommitPolicy.java @@ -0,0 +1,50 @@ +package cn.leancloud.kafka.consumer; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.common.TopicPartition; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +abstract class AbstractRecommitAwareCommitPolicy extends AbstractCommitPolicy { + private final Duration recommitInterval; + private long nextRecommitNanos; + + AbstractRecommitAwareCommitPolicy(Consumer consumer, Duration recommitInterval) { + super(consumer); + this.recommitInterval = recommitInterval; + updateNextRecommitTime(System.nanoTime()); + } + + Map offsetsForRecommit() { + assert needRecommit() : "current nanos: " + System.nanoTime() + " nextRecommitNanos:" + nextRecommitNanos; + + final Map ret = new HashMap<>(completedTopicOffsets); + for (TopicPartition partition : consumer.assignment()) { + final OffsetAndMetadata offset = consumer.committed(partition); + if (offset != null) { + ret.putIfAbsent(partition, offset); + } + } + + return ret; + } + + boolean needRecommit() { 
+ return System.nanoTime() >= nextRecommitNanos; + } + + void updateNextRecommitTime() { + updateNextRecommitTime(System.nanoTime()); + } + + long nextRecommitNanos() { + return nextRecommitNanos; + } + + private void updateNextRecommitTime(long currentNanos) { + nextRecommitNanos = currentNanos + recommitInterval.toNanos(); + } +} diff --git a/src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java index d6ab267..253c65b 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/AsyncCommitPolicy.java @@ -1,47 +1,42 @@ package cn.leancloud.kafka.consumer; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; +import java.util.Map; import java.util.Set; -final class AsyncCommitPolicy extends AbstractCommitPolicy { +final class AsyncCommitPolicy extends AbstractRecommitAwareCommitPolicy { private static final Logger logger = LoggerFactory.getLogger(AsyncCommitPolicy.class); private final int maxPendingAsyncCommits; + private final OffsetCommitCallback callback; private int pendingAsyncCommitCounter; private boolean forceSync; - AsyncCommitPolicy(Consumer consumer, int maxPendingAsyncCommits) { - super(consumer); + AsyncCommitPolicy(Consumer consumer, Duration recommitInterval, int maxPendingAsyncCommits) { + super(consumer, recommitInterval); this.maxPendingAsyncCommits = maxPendingAsyncCommits; + this.callback = new AsyncCommitCallback(); } @Override public Set tryCommit(boolean noPendingRecords) { if (!noPendingRecords || completedTopicOffsets.isEmpty()) { + if (needRecommit()) { + commit(offsetsForRecommit()); + } 
return Collections.emptySet(); } - if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) { - consumer.commitSync(); - pendingAsyncCommitCounter = 0; - forceSync = false; - } else { - ++pendingAsyncCommitCounter; - consumer.commitAsync((offsets, exception) -> { - --pendingAsyncCommitCounter; - assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter; - if (exception != null) { - logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception); - forceSync = true; - } - }); - } + commit(); final Set partitions = new HashSet<>(completedTopicOffsets.keySet()); // it's OK to clear these collections here and we will not left any complete offset without commit even @@ -50,4 +45,61 @@ public Set tryCommit(boolean noPendingRecords) { topicOffsetHighWaterMark.clear(); return partitions; } + + int pendingAsyncCommitCount() { + return pendingAsyncCommitCounter; + } + + boolean forceSync() { + return forceSync; + } + + void setForceSync(boolean forceSync) { + this.forceSync = forceSync; + } + + private void commit() { + commit(Collections.emptyMap()); + } + + private void commit(Map offsets) { + if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) { + syncCommit(offsets); + pendingAsyncCommitCounter = 0; + forceSync = false; + } else { + asyncCommit(offsets); + } + // update next recommit time even if async commit failed, we tolerate this situation + updateNextRecommitTime(); + } + + private void asyncCommit(Map offsets) { + ++pendingAsyncCommitCounter; + if (offsets.isEmpty()) { + consumer.commitAsync(callback); + } else { + consumer.commitAsync(offsets, callback); + } + } + + private void syncCommit(Map offsets) { + if (offsets.isEmpty()) { + consumer.commitSync(); + } else { + consumer.commitSync(offsets); + } + } + + private class AsyncCommitCallback implements OffsetCommitCallback { + @Override + public void onComplete(Map offsets, Exception exception) { + --pendingAsyncCommitCounter; + assert 
pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter; + if (exception != null) { + logger.warn("Failed to commit offsets: " + offsets + " asynchronously", exception); + forceSync = true; + } + } + } } diff --git a/src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java index fc8c714..518746f 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/AutoCommitPolicy.java @@ -3,6 +3,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import java.time.Duration; import java.util.Collections; import java.util.Set; diff --git a/src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java b/src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java index fe0415a..08cfc31 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java +++ b/src/main/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilder.java @@ -89,7 +89,7 @@ private static void requireArgument(boolean expression, String template, Object. @Nullable private CommitPolicy policy; @Nullable - private Duration forceWholeCommitInterval; + private Duration recommitInterval; private LcKafkaConsumerBuilder(Map kafkaConsumerConfigs, ConsumerRecordHandler consumerRecordHandler) { @@ -193,52 +193,56 @@ public LcKafkaConsumerBuilder maxPendingAsyncCommits(int maxPendingAsyncCo } /** - * The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer. + * The interval to commit all partitions and it's completed offsets to broker on a non-automatic commit consumer. *

- * This configuration is only valid and is required on partial commit consumer build with + * This configuration is only valid and is required on a non-automatic commit consumer built with + * {@link LcKafkaConsumerBuilder#buildSync()}, {@link LcKafkaConsumerBuilder#buildAsync()}, * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}. * For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from * that partition and all these consumed records was handled successfully. But we must periodically commit those - * subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout, - * Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the - * consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may - * consume a already consumed record again. So please make sure that {@code forceWholeCommitIntervalInMillis} - * is within log retention time set on Kafka broker. - *

- * The default {@code forceWholeCommitInterval} is 1 hour. + * subscribed partitions who have had records but no new records for a long time too. Otherwise, after commit offset + * retention timeout, Kafka broker may forget where the current commit offset of these partition for the consumer + * are. Then, when the consumer crashed and recovered, if the consumer set auto.offset.reset + * configuration to earliest, it may consume an already consumed record again. So please make sure + * that {@code recommitIntervalInMillis} is within the limit set by offsets.retention.minutes + * on Kafka broker or even within 1/3 of that limit to tolerate some commit failures on async commit consumer. + *

+ * The default {@code recommitInterval} is 1 hour. * - * @param forceWholeCommitIntervalInMillis the interval in millis seconds to do a whole commit + * @param recommitIntervalInMillis the interval in milliseconds to do a recommit * @return this */ - public LcKafkaConsumerBuilder forceWholeCommitIntervalInMillis(long forceWholeCommitIntervalInMillis) { - requireArgument(forceWholeCommitIntervalInMillis > 0, - "forceWholeCommitIntervalInMillis: %s (expected > 0)", forceWholeCommitIntervalInMillis); + public LcKafkaConsumerBuilder recommitIntervalInMillis(long recommitIntervalInMillis) { + requireArgument(recommitIntervalInMillis > 0, + "recommitIntervalInMillis: %s (expected > 0)", recommitIntervalInMillis); - this.forceWholeCommitInterval = Duration.ofMillis(forceWholeCommitIntervalInMillis); + this.recommitInterval = Duration.ofMillis(recommitIntervalInMillis); return this; } /** - * The interval to commit all partitions and it's completed offsets to broker on a partial commit consumer. + * The interval to commit all partitions and its completed offsets to broker on a non-automatic commit consumer. *

- * This configuration is only valid on partial commit consumer build with + * This configuration is only valid and is required on a non-automatic commit consumer built with + * {@link LcKafkaConsumerBuilder#buildSync()}, {@link LcKafkaConsumerBuilder#buildAsync()}, * {@link LcKafkaConsumerBuilder#buildPartialSync()} or {@link LcKafkaConsumerBuilder#buildPartialAsync()}. * For these kind of consumers, usually they only commit offsets of a partition when there was records consumed from * that partition and all these consumed records was handled successfully. But we must periodically commit those - * subscribed partitions who have not have any records too. Otherwise, after commit offset log retention timeout, - * Kafka broker may forget where the current commit offset of these partition for the consumer are. Then, when the - * consumer crashed and recovered, if the consumer set "auto.offset.reset" configuration to "earliest", it may - * consume a already consumed record again. So please make sure that {@code forceWholeCommitInterval} - * is within log retention time set on Kafka broker. - *

- * The default {@code forceWholeCommitInterval} is 1 hour. + * subscribed partitions who have had records but no new records for a long time too. Otherwise, after commit offset + * retention timeout, Kafka broker may forget where the current commit offset of these partition for the consumer + * are. Then, when the consumer crashed and recovered, if the consumer set auto.offset.reset + * configuration to earliest, it may consume an already consumed record again. So please make sure + * that {@code recommitInterval} is within the limit set by offsets.retention.minutes on + * Kafka broker or even within 1/3 of that limit to tolerate some commit failures on async commit consumer. + *

+ * The default {@code recommitInterval} is 1 hour. * - * @param forceWholeCommitInterval the interval to do a whole commit + * @param recommitInterval the interval to do a recommit * @return this */ - public LcKafkaConsumerBuilder forceWholeCommitInterval(Duration forceWholeCommitInterval) { - requireNonNull(forceWholeCommitInterval, "forceWholeCommitInterval"); - this.forceWholeCommitInterval = forceWholeCommitInterval; + public LcKafkaConsumerBuilder recommitInterval(Duration recommitInterval) { + requireNonNull(recommitInterval, "recommitInterval"); + this.recommitInterval = recommitInterval; return this; } @@ -342,7 +346,7 @@ public LcKafkaConsumer buildAuto() { */ public LcKafkaConsumer buildSync() { consumer = buildConsumer(false); - policy = new SyncCommitPolicy<>(consumer); + policy = new SyncCommitPolicy<>(consumer, getRecommitInterval()); return doBuild(); } @@ -367,15 +371,8 @@ public LcKafkaConsumer buildSync() { * @return this */ public LcKafkaConsumer buildPartialSync() { - if (forceWholeCommitInterval == null) { - logger.warn("Force whole commit interval is not set for a partial commit consumer, the default " + - "interval of 1 hour will be used."); - forceWholeCommitInterval = Duration.ofHours(1); - } - assert forceWholeCommitInterval != null; - consumer = buildConsumer(false); - policy = new PartialSyncCommitPolicy<>(consumer, forceWholeCommitInterval); + policy = new PartialSyncCommitPolicy<>(consumer, getRecommitInterval()); return doBuild(); } @@ -406,7 +403,7 @@ public LcKafkaConsumer buildPartialSync() { */ public LcKafkaConsumer buildAsync() { consumer = buildConsumer(false); - policy = new AsyncCommitPolicy<>(consumer, maxPendingAsyncCommits); + policy = new AsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits); return doBuild(); } @@ -435,15 +432,8 @@ public LcKafkaConsumer buildAsync() { * @return this */ public LcKafkaConsumer buildPartialAsync() { - if (forceWholeCommitInterval == null) { - logger.warn("Force 
whole commit interval is not set for a partial commit consumer, the default " + - "interval of 30 seconds will be used."); - forceWholeCommitInterval = Duration.ofSeconds(30); - } - assert forceWholeCommitInterval != null; - consumer = buildConsumer(false); - policy = new PartialAsyncCommitPolicy<>(consumer, forceWholeCommitInterval, maxPendingAsyncCommits); + policy = new PartialAsyncCommitPolicy<>(consumer, getRecommitInterval(), maxPendingAsyncCommits); return doBuild(); } @@ -494,6 +484,16 @@ private Consumer buildConsumer(boolean autoCommit) { return new KafkaConsumer<>(configs, keyDeserializer, valueDeserializer); } + Duration getRecommitInterval() { + if (recommitInterval == null) { + logger.warn("Recommit interval is not set for a non-automatic commit consumer, the default " + + "interval of 1 hour will be used."); + recommitInterval = Duration.ofHours(1); + } + + return recommitInterval; + } + private void checkConfigs(KafkaConfigsChecker[] checkers) { for (KafkaConfigsChecker check : checkers) { check.check(configs); diff --git a/src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java index 956b390..d9807cb 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicy.java @@ -2,6 +2,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetCommitCallback; import org.apache.kafka.common.TopicPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -13,46 +14,65 @@ final class PartialAsyncCommitPolicy extends AbstractPartialCommitPolicy consumer, Duration forceWholeCommitInterval, int maxPendingAsyncCommits) { super(consumer, forceWholeCommitInterval); this.maxPendingAsyncCommits = maxPendingAsyncCommits; + this.callback = new AsyncCommitCallback(); } 
@Override public Set tryCommit(boolean noPendingRecords) { - if (completedTopicOffsets.isEmpty()) { + final Map offsets = offsetsForPartialCommit(); + if (offsets.isEmpty()) { return Collections.emptySet(); + } else { + final Set partitions = getCompletedPartitions(noPendingRecords); + if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) { + consumer.commitSync(offsets); + pendingAsyncCommitCounter = 0; + forceSync = false; + clearCachedCompletedPartitionsRecords(partitions, noPendingRecords); + } else { + ++pendingAsyncCommitCounter; + consumer.commitAsync(offsets, callback); + } + + // update next recommit time even if async commit failed, we tolerate this situation + updateNextRecommitTime(); + return partitions; } + } - final Set partitions = getCompletedPartitions(noPendingRecords); - if (forceSync || pendingAsyncCommitCounter >= maxPendingAsyncCommits) { - consumer.commitSync(offsetsToPartialCommit()); - pendingAsyncCommitCounter = 0; - forceSync = false; - clearCachedCompletedPartitionsRecords(partitions, noPendingRecords); - } else { - ++pendingAsyncCommitCounter; - consumer.commitAsync(offsetsToPartialCommit(), (offsets, exception) -> { - --pendingAsyncCommitCounter; - assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter; - if (exception != null) { - // if last async commit is failed, we do not clean cached completed offsets and let next - // commit be a sync commit so all the complete offsets will be committed at that time - logger.warn("Failed to commit offset: " + offsets + " asynchronously", exception); - forceSync = true; - } else { - final Map completeOffsets = - offsets == completedTopicOffsets ? 
new HashMap<>(offsets) : offsets; - for (Map.Entry entry : completeOffsets.entrySet()) { - completedTopicOffsets.remove(entry.getKey(), entry.getValue()); - topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset()); - } + int pendingAsyncCommitCount() { + return pendingAsyncCommitCounter; + } + + boolean forceSync() { + return forceSync; + } + + private class AsyncCommitCallback implements OffsetCommitCallback { + @Override + public void onComplete(Map offsets, Exception exception) { + --pendingAsyncCommitCounter; + assert pendingAsyncCommitCounter >= 0 : "actual: " + pendingAsyncCommitCounter; + if (exception != null) { + // if last async commit is failed, we do not clean cached completed offsets and let next + // commit be a sync commit so all the complete offsets will be committed at that time + logger.warn("Failed to commit offset: " + offsets + " asynchronously", exception); + forceSync = true; + } else { + final Map completeOffsets = + offsets == completedTopicOffsets ? 
new HashMap<>(offsets) : offsets; + for (Map.Entry entry : completeOffsets.entrySet()) { + completedTopicOffsets.remove(entry.getKey(), entry.getValue()); + topicOffsetHighWaterMark.remove(entry.getKey(), entry.getValue().offset()); } - }); + } } - return partitions; } } diff --git a/src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java index a7707c5..61a4c58 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java +++ b/src/main/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicy.java @@ -1,10 +1,12 @@ package cn.leancloud.kafka.consumer; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Set; final class PartialSyncCommitPolicy extends AbstractPartialCommitPolicy { @@ -14,14 +16,18 @@ final class PartialSyncCommitPolicy extends AbstractPartialCommitPolicy tryCommit(boolean noPendingRecords) { + final Map offsets = offsetsForPartialCommit(); + if (!offsets.isEmpty()) { + consumer.commitSync(offsets); + updateNextRecommitTime(); + } + if (completedTopicOffsets.isEmpty()) { return Collections.emptySet(); + } else { + final Set partitions = getCompletedPartitions(noPendingRecords); + clearCachedCompletedPartitionsRecords(partitions, noPendingRecords); + return partitions; } - - consumer.commitSync(offsetsToPartialCommit()); - - final Set partitions = getCompletedPartitions(noPendingRecords); - clearCachedCompletedPartitionsRecords(partitions, noPendingRecords); - return partitions; } } diff --git a/src/main/java/cn/leancloud/kafka/consumer/SyncCommitPolicy.java b/src/main/java/cn/leancloud/kafka/consumer/SyncCommitPolicy.java index b173a8c..abb07ad 100644 --- a/src/main/java/cn/leancloud/kafka/consumer/SyncCommitPolicy.java +++ 
b/src/main/java/cn/leancloud/kafka/consumer/SyncCommitPolicy.java @@ -3,13 +3,14 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.common.TopicPartition; +import java.time.Duration; import java.util.Collections; import java.util.HashSet; import java.util.Set; -final class SyncCommitPolicy extends AbstractCommitPolicy { - SyncCommitPolicy(Consumer consumer) { - super(consumer); +final class SyncCommitPolicy extends AbstractRecommitAwareCommitPolicy { + SyncCommitPolicy(Consumer consumer, Duration recommitInterval) { + super(consumer, recommitInterval); } @Override @@ -19,7 +20,11 @@ public Set tryCommit(boolean noPendingRecords) { final Set completePartitions = new HashSet<>(completedTopicOffsets.keySet()); completedTopicOffsets.clear(); topicOffsetHighWaterMark.clear(); + updateNextRecommitTime(); return completePartitions; + } else if (needRecommit()) { + consumer.commitSync(offsetsForRecommit()); + updateNextRecommitTime(); } return Collections.emptySet(); } diff --git a/src/test/java/cn/leancloud/kafka/consumer/AbstractCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/AbstractCommitPolicyTest.java index eb3e43f..32fef9e 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/AbstractCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/AbstractCommitPolicyTest.java @@ -6,6 +6,7 @@ import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; diff --git a/src/test/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicyTest.java index dd59594..66b4c10 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/AbstractPartialCommitPolicyTest.java @@ -52,23 +52,23 @@ public void testPartialCommit() { final Map completeOffsets = 
buildCommitOffsets(completedRecords); - assertThat(policy.offsetsToPartialCommit()).isEqualTo(completeOffsets); + assertThat(policy.offsetsForPartialCommit()).isEqualTo(completeOffsets); } @Test - public void testWholeCommit() throws Exception { + public void testRecommit() throws Exception { policy = new TestingPartialCommitPolicy(consumer, Duration.ofMillis(200)); final Map previousCommitOffsets = commitRecords(completeRecords(prepareConsumerRecords(toPartitions(range(0, 10).boxed().collect(toList())), 1, 10))); - policy.offsetsToPartialCommit().clear(); + policy.offsetsForPartialCommit().clear(); policy.topicOffsetHighWaterMark().clear(); final Map newOffsetsToCommit = buildCommitOffsets(completeRecords(prepareConsumerRecords(toPartitions(range(10, 20).boxed().collect(toList())), 1, 10))); Thread.sleep(200); - assertThat(policy.offsetsToPartialCommit()) + assertThat(policy.offsetsForPartialCommit()) .hasSize(previousCommitOffsets.size() + newOffsetsToCommit.size()) .containsAllEntriesOf(previousCommitOffsets) .containsAllEntriesOf(newOffsetsToCommit); diff --git a/src/test/java/cn/leancloud/kafka/consumer/AsyncCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/AsyncCommitPolicyTest.java index 07b7a92..3df24b0 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/AsyncCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/AsyncCommitPolicyTest.java @@ -7,6 +7,7 @@ import org.junit.Test; import org.mockito.Mockito; +import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -15,6 +16,7 @@ import static cn.leancloud.kafka.consumer.TestingUtils.*; import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -29,9 +31,10 @@ public class AsyncCommitPolicyTest { @Before public void setUp() { consumer = new 
MockConsumer<>(OffsetResetStrategy.LATEST); - policy = new AsyncCommitPolicy<>(consumer, defaultMaxPendingAsyncCommits); + policy = new AsyncCommitPolicy<>(consumer, Duration.ofHours(1), defaultMaxPendingAsyncCommits); partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - pendingRecords = prepareRecords(partitions); + assignPartitions(consumer, partitions, 0L); + pendingRecords = generateConsumedRecords(consumer, partitions); } @After @@ -41,37 +44,97 @@ public void tearDown() { @Test public void testHavePendingRecords() { - for (ConsumerRecord record : pendingRecords) { - completeRecord(record); - } + final long nextRecommitNanos = policy.nextRecommitNanos(); + addCompleteRecordsInPolicy(policy, pendingRecords); assertThat(policy.tryCommit(false)).isEmpty(); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isNull(); } assertThat(policy.completedTopicOffsets()).isNotEmpty(); assertThat(policy.topicOffsetHighWaterMark()).isNotEmpty(); + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); } @Test public void testNoCompleteRecords() { - prepareRecords(partitions); + final long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, partitions, 0L); + generateConsumedRecords(consumer, partitions); assertThat(policy.tryCommit(true)).isEmpty(); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isNull(); } + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); } @Test - public void testTryCommitAll() { - for (ConsumerRecord record : pendingRecords) { - completeRecord(record); + public void testSyncRecommit() throws Exception { + policy = new AsyncCommitPolicy<>(consumer, Duration.ofMillis(200), defaultMaxPendingAsyncCommits); + long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, toPartitions(range(0, 30).boxed().collect(toList())), 0L); + + final List> prevRecords = 
generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 10).boxed().collect(toList()))); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + + Thread.sleep(200); + nextRecommitNanos = policy.nextRecommitNanos(); + final List> newRecords = generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); + final Map newCommitOffsets = buildCommitOffsets(newRecords); + newCommitOffsets.putAll(previousCommitOffsets); + + addCompleteRecordsInPolicy(policy, newRecords); + policy.setForceSync(true); + assertThat(policy.tryCommit(false)).isEmpty(); + assertThat(policy.forceSync()).isFalse(); + for (Map.Entry entry : newCommitOffsets.entrySet()) { + assertThat(consumer.committed(entry.getKey())).isEqualTo(entry.getValue()); + } + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + } + + @Test + public void testAsyncRecommit() throws Exception { + policy = new AsyncCommitPolicy<>(consumer, Duration.ofMillis(200), defaultMaxPendingAsyncCommits); + long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, toPartitions(range(0, 30).boxed().collect(toList())), 0L); + + final List> prevRecords = generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 10).boxed().collect(toList()))); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + + Thread.sleep(200); + nextRecommitNanos = policy.nextRecommitNanos(); + final List> newRecords = 
generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); + final Map newCommitOffsets = buildCommitOffsets(newRecords); + newCommitOffsets.putAll(previousCommitOffsets); + + addCompleteRecordsInPolicy(policy, newRecords); + policy.setForceSync(false); + assertThat(policy.tryCommit(false)).isEmpty(); + for (Map.Entry entry : newCommitOffsets.entrySet()) { + assertThat(consumer.committed(entry.getKey())).isEqualTo(entry.getValue()); } + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + } + + @Test + public void testTryCommitAll() { + final long nextRecommitNanos = policy.nextRecommitNanos(); + addCompleteRecordsInPolicy(policy, pendingRecords); assertThat(policy.tryCommit(true)).containsExactlyInAnyOrderElementsOf(partitions); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isEqualTo(new OffsetAndMetadata(2)); } assertThat(policy.completedTopicOffsets()).isEmpty(); assertThat(policy.topicOffsetHighWaterMark()).isEmpty(); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } @Test @@ -80,25 +143,28 @@ public void testForceCommitAfterTooManyPendingAsyncCommits() { doNothing().when(mockConsumer).commitAsync(any()); int asyncCommitTimes = pendingRecords.size() - 1; - policy = new AsyncCommitPolicy<>(mockConsumer, asyncCommitTimes); + policy = new AsyncCommitPolicy<>(mockConsumer, Duration.ofHours(1), asyncCommitTimes); for (ConsumerRecord record : pendingRecords.subList(0, asyncCommitTimes)) { - completeRecord(record); + addCompleteRecordInPolicy(policy, record); assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(record.topic(), record.partition()))); + assertThat(policy.forceSync()).isFalse(); } verify(mockConsumer, times(asyncCommitTimes)).commitAsync(any()); verify(mockConsumer, never()).commitSync(); + assertThat(policy.pendingAsyncCommitCount()).isEqualTo(asyncCommitTimes); final ConsumerRecord 
synCommitRecord = pendingRecords.get(asyncCommitTimes); - completeRecord(synCommitRecord); + addCompleteRecordInPolicy(policy, synCommitRecord); assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(synCommitRecord.topic(), synCommitRecord.partition()))); verify(mockConsumer, times(asyncCommitTimes)).commitAsync(any()); verify(mockConsumer, times(1)).commitSync(); + assertThat(policy.pendingAsyncCommitCount()).isZero(); } @Test @@ -106,10 +172,10 @@ public void testAsyncCommitIntertwineWithSyncCommits() { final Consumer mockConsumer = Mockito.mock(Consumer.class); doNothing().when(mockConsumer).commitAsync(any()); - policy = new AsyncCommitPolicy<>(mockConsumer, 10); + policy = new AsyncCommitPolicy<>(mockConsumer, Duration.ofHours(1), 10); for (ConsumerRecord record : pendingRecords) { - completeRecord(record); + addCompleteRecordInPolicy(policy, record); assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(record.topic(), record.partition()))); @@ -131,39 +197,28 @@ public void testForceSyncAfterAsyncCommitFailed() { return null; }).when(mockConsumer).commitAsync(any()); - policy = new AsyncCommitPolicy<>(mockConsumer, 10); + policy = new AsyncCommitPolicy<>(mockConsumer, Duration.ofHours(1), 10); final ConsumerRecord triggerFailedRecord = pendingRecords.get(0); - completeRecord(triggerFailedRecord); + addCompleteRecordInPolicy(policy, triggerFailedRecord); assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(triggerFailedRecord.topic(), triggerFailedRecord.partition()))); verify(mockConsumer, times(1)).commitAsync(any()); verify(mockConsumer, never()).commitSync(); + assertThat(policy.pendingAsyncCommitCount()).isZero(); + assertThat(policy.forceSync()).isTrue(); final ConsumerRecord syncRecord = pendingRecords.get(1); - completeRecord(syncRecord); + addCompleteRecordInPolicy(policy, syncRecord); 
assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(syncRecord.topic(), syncRecord.partition()))); verify(mockConsumer, times(1)).commitAsync(any()); verify(mockConsumer, times(1)).commitSync(); + assertThat(policy.pendingAsyncCommitCount()).isZero(); + assertThat(policy.forceSync()).isFalse(); } - - private List> prepareRecords(List partitions) { - // one msg for each partitions - final List> pendingRecords = prepareConsumerRecords(partitions, 1, 1); - assignPartitions(consumer, partitions, 0L); - fireConsumerRecords(consumer, pendingRecords); - consumer.poll(0); - return pendingRecords; - } - - private void completeRecord(ConsumerRecord record) { - policy.addPendingRecord(record); - policy.completeRecord(record); - } - } \ No newline at end of file diff --git a/src/test/java/cn/leancloud/kafka/consumer/AutoCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/AutoCommitPolicyTest.java index 14e8de0..80dd691 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/AutoCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/AutoCommitPolicyTest.java @@ -8,6 +8,7 @@ import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.List; import java.util.stream.IntStream; diff --git a/src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java b/src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java index aeba1d7..f80c4ab 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/LcKafkaConsumerBuilderTest.java @@ -100,19 +100,19 @@ public void testNullShutdownTimeout() { } @Test - public void testNegativeForceWholeCommitInterval() { + public void testNegativeRecommitInterval() { assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer) - .forceWholeCommitIntervalInMillis(-1 * 
ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE))) + .recommitIntervalInMillis(-1 * ThreadLocalRandom.current().nextLong(1, Long.MAX_VALUE))) .isInstanceOf(IllegalArgumentException.class) - .hasMessageContaining("forceWholeCommitIntervalInMillis"); + .hasMessageContaining("recommitIntervalInMillis"); } @Test - public void testNullForceWholeCommitInterval() { + public void testNullRecommitInterval() { assertThatThrownBy(() -> LcKafkaConsumerBuilder.newBuilder(configs, testingHandler, keyDeserializer, valueDeserializer) - .forceWholeCommitInterval(null)) + .recommitInterval(null)) .isInstanceOf(NullPointerException.class) - .hasMessage("forceWholeCommitInterval"); + .hasMessage("recommitInterval"); } @Test @@ -185,6 +185,7 @@ public void testSyncConsumer() { .pollTimeout(Duration.ofMillis(1000)) .maxPendingAsyncCommits(100) .workerPool(workerPool, false) + .recommitInterval(Duration.ofMinutes(20)) .buildSync(); assertThat(consumer).isNotNull(); @@ -200,6 +201,7 @@ public void testSyncWithoutWorkerPoolConsumer() { .mockKafkaConsumer(new MockConsumer<>(OffsetResetStrategy.LATEST)) .pollTimeout(Duration.ofMillis(1000)) .maxPendingAsyncCommits(100) + .recommitInterval(Duration.ofMinutes(20)) .buildSync(); assertThat(consumer).isNotNull(); @@ -208,7 +210,7 @@ public void testSyncWithoutWorkerPoolConsumer() { } @Test - public void testASyncConsumer() { + public void testAsyncConsumer() { AUTO_OFFSET_RESET.set(configs, "latest"); MAX_POLL_RECORDS.set(configs, 10); final LcKafkaConsumer consumer = LcKafkaConsumerBuilder.newBuilder(configs, testingHandler) @@ -216,6 +218,7 @@ public void testASyncConsumer() { .pollTimeout(Duration.ofMillis(1000)) .maxPendingAsyncCommits(100) .workerPool(workerPool, false) + .recommitInterval(Duration.ofMinutes(20)) .buildAsync(); assertThat(consumer).isNotNull(); @@ -232,7 +235,7 @@ public void testPartialSyncConsumer() { .pollTimeout(Duration.ofMillis(1000)) .maxPendingAsyncCommits(100) .workerPool(workerPool, false) - 
.forceWholeCommitInterval(Duration.ofHours(1)) + .recommitInterval(Duration.ofHours(1)) .buildPartialSync(); assertThat(consumer).isNotNull(); @@ -249,7 +252,6 @@ public void testPartialAsyncConsumer() { .pollTimeout(Duration.ofMillis(1000)) .maxPendingAsyncCommits(100) .workerPool(workerPool, false) - .forceWholeCommitIntervalInMillis(1000) .buildPartialAsync(); assertThat(consumer).isNotNull(); diff --git a/src/test/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicyTest.java index 98e9198..a6d73ce 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/PartialAsyncCommitPolicyTest.java @@ -13,6 +13,7 @@ import static cn.leancloud.kafka.consumer.TestingUtils.*; import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -29,7 +30,8 @@ public void setUp() { consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); policy = new PartialAsyncCommitPolicy<>(consumer, Duration.ofSeconds(30), defaultMaxPendingAsyncCommits); partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - pendingRecords = preparePendingRecords(partitions, 1); + assignPartitions(consumer, partitions, 0); + pendingRecords = generateConsumedRecords(consumer, partitions, 1); } @After @@ -38,18 +40,65 @@ public void tearDown() { } @Test - public void testNoCompleteRecords() { - preparePendingRecords(partitions, 1); + public void testNoNewCompleteRecords() { + final List partitions = toPartitions(IntStream.range(0, 20).boxed().collect(toList())); + assignPartitions(consumer, partitions, 0L); + + final List> prevRecords = generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map 
previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 10).boxed().collect(toList()))); + + final long nextRecommitNanos = policy.nextRecommitNanos(); + // only generate consumed records, these records are not completed in policy + generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); assertThat(policy.tryCommit(true)).isEmpty(); for (TopicPartition partition : partitions) { - assertThat(consumer.committed(partition)).isNull(); + if (previousCommitOffsets.containsKey(partition)) { + assertThat(consumer.committed(partition)).isEqualTo(previousCommitOffsets.get(partition)); + } else { + assertThat(consumer.committed(partition)).isNull(); + } + } + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); + } + + @Test + public void testRecommit() throws Exception { + policy = new PartialAsyncCommitPolicy<>(consumer, Duration.ofMillis(200), defaultMaxPendingAsyncCommits); + long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, toPartitions(range(0, 30).boxed().collect(toList())), 0L); + + final List> prevRecords = generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 10).boxed().collect(toList()))); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + + Thread.sleep(200); + nextRecommitNanos = policy.nextRecommitNanos(); + final List> newRecords = generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); + final Map newCommitOffsets = buildCommitOffsets(newRecords); + newCommitOffsets.putAll(previousCommitOffsets); + + 
addCompleteRecordsInPolicy(policy, newRecords); + + assertThat(policy.tryCommit(false)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(10, 20).boxed().collect(toList()))); + for (Map.Entry entry : newCommitOffsets.entrySet()) { + assertThat(consumer.committed(entry.getKey())).isEqualTo(entry.getValue()); } + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } @Test public void testPartialAsyncCommit() { final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = preparePendingRecords(partitions, 2); + final long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, partitions, 0); + final List> pendingRecords = generateConsumedRecords(consumer, partitions, 2); // two records for each partitions for (ConsumerRecord record : pendingRecords) { policy.addPendingRecord(record); @@ -78,17 +127,16 @@ public void testPartialAsyncCommit() { } assertThat(policy.completedTopicOffsets()).isEmpty(); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } @Test public void testPartialAsyncCommitWithNoPendingFuturesLeft() { final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = preparePendingRecords(partitions, 2); + assignPartitions(consumer, partitions, 0); // two records for each partitions - for (ConsumerRecord record : pendingRecords) { - policy.addPendingRecord(record); - policy.completeRecord(record); - } + final List> pendingRecords = generateConsumedRecords(consumer, partitions, 2); + addCompleteRecordsInPolicy(policy, pendingRecords); assertThat(policy.tryCommit(true)) .hasSize(partitions.size()) @@ -112,23 +160,27 @@ public void testForceCommitAfterTooManyPendingAsyncCommits() { final Set partitionsToResume = new HashSet<>(); for (ConsumerRecord record : pendingRecords.subList(0, asyncCommitTimes)) { partitionsToResume.add(new TopicPartition(record.topic(), 
record.partition())); - completeRecord(record); + addCompleteRecordInPolicy(policy, record); assertThat(policy.tryCommit(true)) .hasSize(partitionsToResume.size()) .isEqualTo(partitionsToResume); + assertThat(policy.forceSync()).isFalse(); } verify(mockConsumer, times(asyncCommitTimes)).commitAsync(any(), any()); verify(mockConsumer, never()).commitSync(); + assertThat(policy.pendingAsyncCommitCount()).isEqualTo(asyncCommitTimes); final ConsumerRecord synCommitRecord = pendingRecords.get(asyncCommitTimes); partitionsToResume.add(new TopicPartition(synCommitRecord.topic(), synCommitRecord.partition())); - completeRecord(synCommitRecord); + addCompleteRecordInPolicy(policy, synCommitRecord); assertThat(policy.tryCommit(true)) .hasSize(partitionsToResume.size()) .isEqualTo(partitionsToResume); verify(mockConsumer, times(asyncCommitTimes)).commitAsync(any(), any()); verify(mockConsumer, times(1)).commitSync(any()); + assertThat(policy.pendingAsyncCommitCount()).isZero(); + assertThat(policy.forceSync()).isFalse(); } @Test @@ -145,7 +197,7 @@ public void testAsyncCommitIntertwineWithSyncCommits() { } partitionsToResume.add(new TopicPartition(record.topic(), record.partition())); - completeRecord(record); + addCompleteRecordInPolicy(policy, record); assertThat(policy.tryCommit(true)) .hasSize(partitionsToResume.size()) .isEqualTo(partitionsToResume); @@ -171,17 +223,19 @@ public void testForceSyncAfterAsyncCommitFailed() { // a failed async commit on the first time final ConsumerRecord triggerFailedRecord = pendingRecords.get(0); - completeRecord(triggerFailedRecord); + addCompleteRecordInPolicy(policy, triggerFailedRecord); assertThat(policy.tryCommit(true)) .hasSize(1) .isEqualTo(Collections.singleton(new TopicPartition(triggerFailedRecord.topic(), triggerFailedRecord.partition()))); verify(mockConsumer, times(1)).commitAsync(any(), any()); verify(mockConsumer, never()).commitSync(any()); + assertThat(policy.forceSync()).isTrue(); + 
assertThat(policy.pendingAsyncCommitCount()).isZero(); // sync commit after the failed async commit final ConsumerRecord syncRecord = pendingRecords.get(1); - completeRecord(syncRecord); + TestingUtils.addCompleteRecordInPolicy(policy, syncRecord); assertThat(policy.tryCommit(true)) .hasSize(2) .containsExactlyInAnyOrderElementsOf(Arrays.asList( @@ -191,18 +245,7 @@ public void testForceSyncAfterAsyncCommitFailed() { verify(mockConsumer, times(1)).commitAsync(any(), any()); verify(mockConsumer, times(1)).commitSync(any()); - } - - private void completeRecord(ConsumerRecord record) { - policy.addPendingRecord(record); - policy.completeRecord(record); - } - - private List> preparePendingRecords(List partitions, int size) { - final List> pendingRecords = prepareConsumerRecords(partitions, 1, size); - assignPartitions(consumer, partitions, 0L); - fireConsumerRecords(consumer, pendingRecords); - consumer.poll(0); - return pendingRecords; + assertThat(policy.forceSync()).isFalse(); + assertThat(policy.pendingAsyncCommitCount()).isZero(); } } \ No newline at end of file diff --git a/src/test/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicyTest.java index 290b77e..35b97ec 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/PartialSyncCommitPolicyTest.java @@ -11,10 +11,12 @@ import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.stream.IntStream; import static cn.leancloud.kafka.consumer.TestingUtils.*; import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; public class PartialSyncCommitPolicyTest { @@ -34,18 +36,23 @@ public void tearDown() { @Test public void testNoCompleteRecords() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = 
toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - preparePendingRecords(partitions, 1); + assignPartitions(consumer, partitions, 0L); + generateConsumedRecords(consumer, partitions, 1); assertThat(policy.tryCommit(true)).isEmpty(); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isNull(); } + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); } @Test public void testPartialCommit() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = preparePendingRecords(partitions, 2); + assignPartitions(consumer, partitions, 0L); + final List> pendingRecords = generateConsumedRecords(consumer, partitions, 2); // two records for each partitions for (ConsumerRecord record : pendingRecords) { policy.addPendingRecord(record); @@ -74,13 +81,16 @@ public void testPartialCommit() { } assertThat(policy.completedTopicOffsets()).isEmpty(); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } @Test public void testNoPendingFuturesLeft() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = preparePendingRecords(partitions, 2); + assignPartitions(consumer, partitions, 0L); // two records for each partitions + final List> pendingRecords = generateConsumedRecords(consumer, partitions,2); for (ConsumerRecord record : pendingRecords) { policy.addPendingRecord(record); policy.completeRecord(record); @@ -95,13 +105,35 @@ public void testNoPendingFuturesLeft() { assertThat(policy.topicOffsetHighWaterMark()).isEmpty(); assertThat(policy.completedTopicOffsets()).isEmpty(); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } - private List> preparePendingRecords(List partitions, int size) { - final List> pendingRecords = 
prepareConsumerRecords(partitions, 1, size); - assignPartitions(consumer, partitions, 0L); - fireConsumerRecords(consumer, pendingRecords); - consumer.poll(0); - return pendingRecords; + @Test + public void testRecommit() throws Exception{ + policy = new PartialSyncCommitPolicy<>(consumer, Duration.ofMillis(200)); + long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, toPartitions(range(0, 30).boxed().collect(toList())), 0L); + + final List> prevRecords = generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 10).boxed().collect(toList()))); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + + Thread.sleep(200); + nextRecommitNanos = policy.nextRecommitNanos(); + final List> newRecords = generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); + final Map newCommitOffsets = buildCommitOffsets(newRecords); + newCommitOffsets.putAll(previousCommitOffsets); + + addCompleteRecordsInPolicy(policy, newRecords); + assertThat(policy.tryCommit(false)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(10, 20).boxed().collect(toList()))); + + for (Map.Entry entry : newCommitOffsets.entrySet()) { + assertThat(consumer.committed(entry.getKey())).isEqualTo(entry.getValue()); + } + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } } \ No newline at end of file diff --git a/src/test/java/cn/leancloud/kafka/consumer/SyncCommitPolicyTest.java b/src/test/java/cn/leancloud/kafka/consumer/SyncCommitPolicyTest.java index ca6d24b..96a61bd 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/SyncCommitPolicyTest.java +++ b/src/test/java/cn/leancloud/kafka/consumer/SyncCommitPolicyTest.java @@ -9,11 +9,14 @@ 
import org.junit.Before; import org.junit.Test; +import java.time.Duration; import java.util.List; +import java.util.Map; import java.util.stream.IntStream; import static cn.leancloud.kafka.consumer.TestingUtils.*; import static java.util.stream.Collectors.toList; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; public class SyncCommitPolicyTest { @@ -23,7 +26,7 @@ public class SyncCommitPolicyTest { @Before public void setUp() { consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); - policy = new SyncCommitPolicy<>(consumer); + policy = new SyncCommitPolicy<>(consumer, Duration.ofHours(1)); } @After @@ -33,52 +36,74 @@ public void tearDown() { @Test public void testHavePendingRecords() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = prepareRecords(partitions); - for (ConsumerRecord record : pendingRecords) { - policy.addPendingRecord(record); - policy.completeRecord(record); - } + assignPartitions(consumer, partitions, 0L); + final List> pendingRecords = generateConsumedRecords(consumer, partitions); + addCompleteRecordsInPolicy(policy, pendingRecords); assertThat(policy.tryCommit(false)).isEmpty(); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isNull(); } assertThat(policy.completedTopicOffsets()).isNotEmpty(); assertThat(policy.topicOffsetHighWaterMark()).isNotEmpty(); + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); } @Test public void testNoCompleteRecords() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - prepareRecords(partitions); + assignPartitions(consumer, partitions, 0L); + generateConsumedRecords(consumer, partitions); assertThat(policy.tryCommit(true)).isEmpty(); for (TopicPartition partition 
: partitions) { assertThat(consumer.committed(partition)).isNull(); } + assertThat(policy.nextRecommitNanos()).isEqualTo(nextRecommitNanos); } @Test public void testTryCommitAll() { + final long nextRecommitNanos = policy.nextRecommitNanos(); final List partitions = toPartitions(IntStream.range(0, 30).boxed().collect(toList())); - final List> pendingRecords = prepareRecords(partitions); - for (ConsumerRecord record : pendingRecords) { - policy.addPendingRecord(record); - policy.completeRecord(record); - } + assignPartitions(consumer, partitions, 0L); + final List> pendingRecords = generateConsumedRecords(consumer, partitions); + addCompleteRecordsInPolicy(policy, pendingRecords); assertThat(policy.tryCommit(true)).containsExactlyInAnyOrderElementsOf(partitions); for (TopicPartition partition : partitions) { assertThat(consumer.committed(partition)).isEqualTo(new OffsetAndMetadata(2)); } assertThat(policy.completedTopicOffsets()).isEmpty(); assertThat(policy.topicOffsetHighWaterMark()).isEmpty(); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } - private List> prepareRecords(List partitions) { - // one msg for each partitions - final List> pendingRecords = prepareConsumerRecords(partitions, 1, 1); - assignPartitions(consumer, partitions, 0L); - fireConsumerRecords(consumer, pendingRecords); - consumer.poll(0); - return pendingRecords; + @Test + public void testRecommit() throws Exception{ + policy = new SyncCommitPolicy<>(consumer, Duration.ofMillis(200)); + long nextRecommitNanos = policy.nextRecommitNanos(); + assignPartitions(consumer, toPartitions(range(0, 30).boxed().collect(toList())), 0L); + + final List> prevRecords = generateConsumedRecords(consumer, toPartitions(range(0, 10).boxed().collect(toList())), 10); + final Map previousCommitOffsets = buildCommitOffsets(prevRecords); + addCompleteRecordsInPolicy(policy, prevRecords); + assertThat(policy.tryCommit(true)) + .containsExactlyInAnyOrderElementsOf(toPartitions(range(0, 
10).boxed().collect(toList()))); + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); + + Thread.sleep(200); + nextRecommitNanos = policy.nextRecommitNanos(); + final List> newRecords = generateConsumedRecords(consumer, toPartitions(range(10, 20).boxed().collect(toList())), 10); + final Map newCommitOffsets = buildCommitOffsets(newRecords); + newCommitOffsets.putAll(previousCommitOffsets); + + addCompleteRecordsInPolicy(policy, newRecords); + assertThat(policy.tryCommit(false)).isEmpty(); + + for (Map.Entry entry : newCommitOffsets.entrySet()) { + assertThat(consumer.committed(entry.getKey())).isEqualTo(entry.getValue()); + } + assertThat(policy.nextRecommitNanos()).isGreaterThan(nextRecommitNanos); } } \ No newline at end of file diff --git a/src/test/java/cn/leancloud/kafka/consumer/TestingUtils.java b/src/test/java/cn/leancloud/kafka/consumer/TestingUtils.java index 7e9b5a9..de68304 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/TestingUtils.java +++ b/src/test/java/cn/leancloud/kafka/consumer/TestingUtils.java @@ -2,15 +2,15 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Function; import java.util.stream.LongStream; +import static java.util.Comparator.comparing; +import static java.util.function.BinaryOperator.maxBy; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -61,4 +61,38 @@ static void fireConsumerRecords(MockConsumer consumer, Collection> generateConsumedRecords(MockConsumer consumer, List partitions) { + return generateConsumedRecords(consumer, partitions, 1); + } + + static List> generateConsumedRecords(MockConsumer consumer, List partitions, int 
size) { + // one msg for each partitions + final List> pendingRecords = prepareConsumerRecords(partitions, 1, size); + fireConsumerRecords(consumer, pendingRecords); + consumer.poll(0); + return pendingRecords; + } + + static Map buildCommitOffsets(List> records) { + final Map completeOffsets = new HashMap<>(); + for (ConsumerRecord record : records) { + completeOffsets.merge(new TopicPartition(testingTopic, record.partition()), + new OffsetAndMetadata(record.offset() + 1), + maxBy(comparing(OffsetAndMetadata::offset))); + } + return completeOffsets; + } + + static void addCompleteRecordInPolicy(CommitPolicy policy, ConsumerRecord record) { + policy.addPendingRecord(record); + policy.completeRecord(record); + } + + static List> addCompleteRecordsInPolicy(CommitPolicy policy, List> records) { + for (ConsumerRecord record : records) { + addCompleteRecordInPolicy(policy, record); + } + return records; + } } diff --git a/src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java b/src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java index fe6c32c..10e4b84 100644 --- a/src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java +++ b/src/test/java/cn/leancloud/kafka/consumer/integration/Bootstrap.java @@ -76,8 +76,10 @@ private static void cleanAllPartitions() { if (!consumer.assignment().isEmpty() && records.isEmpty()) { for (Map.Entry offsetsEntry : consumer.endOffsets(consumer.assignment()).entrySet()) { final OffsetAndMetadata committedOffset = consumer.committed(offsetsEntry.getKey()); - if (committedOffset.offset() != offsetsEntry.getValue()) { - continue outer; + if (committedOffset != null) { + if (committedOffset.offset() != offsetsEntry.getValue()) { + continue outer; + } } } @@ -226,7 +228,7 @@ public LcKafkaConsumer buildConsumer(String consumerName, TestS new IntegerDeserializer(), new StringDeserializer()) .workerPool(workerPool, false) - .forceWholeCommitInterval(Duration.ofSeconds(1)) + 
.recommitInterval(Duration.ofSeconds(1)) .buildPartialSync(); } } @@ -249,7 +251,7 @@ public LcKafkaConsumer buildConsumer(String consumerName, TestS handler, new IntegerDeserializer(), new StringDeserializer()) - .forceWholeCommitInterval(Duration.ofSeconds(1)) + .recommitInterval(Duration.ofSeconds(1)) .buildPartialSync(); } } @@ -318,7 +320,7 @@ public LcKafkaConsumer buildConsumer(String consumerName, TestS new IntegerDeserializer(), new StringDeserializer()) .workerPool(workerPool, false) - .forceWholeCommitInterval(Duration.ofSeconds(1)) + .recommitInterval(Duration.ofSeconds(1)) .buildAsync(); } } @@ -341,7 +343,7 @@ public LcKafkaConsumer buildConsumer(String consumerName, TestS handler, new IntegerDeserializer(), new StringDeserializer()) - .forceWholeCommitInterval(Duration.ofSeconds(1)) + .recommitInterval(Duration.ofSeconds(1)) .buildAsync(); } }