
How to use Kafka's low-level consumer


This article explains how to use Kafka's low-level consumer (SimpleConsumer). Many people are unsure when and how to use this API in day-to-day work, so this post pulls together a simple, practical walkthrough. Follow along below.

1. When should you use this interface?

1) Read a message multiple times (a minimal sketch of this follows the list)

2) Consume only a subset of the partitions in a topic in a process

3) Manage transactions to make sure a message is processed once and only once
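
To make point 1 concrete: with the low-level API the fetch offset is yours to choose, so re-reading a message is simply fetching from the same offset again. Below is a minimal sketch, not part of the original article; the broker hadoop0:9092, topic test, partition 0, and offset 42 are placeholder assumptions (reuse your own values, and make sure the offset actually exists in the partition).

package kafkatest.kakfademo;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class ReReadSketch {
    public static void main(String[] args) {
        // Placeholder broker/topic/offset values; adjust to your cluster.
        SimpleConsumer consumer = new SimpleConsumer("hadoop0", 9092,
                100000, 64 * 1024, "rereadDemo");
        try {
            FetchRequest req = new FetchRequestBuilder().clientId("rereadDemo")
                    .addFetch("test", 0, 42L, 100000).build();
            // Issuing the same request twice returns the same messages twice:
            FetchResponse first = consumer.fetch(req);
            FetchResponse second = consumer.fetch(req);
            System.out.println("same bytes both times: "
                    + (first.messageSet("test", 0).sizeInBytes()
                            == second.messageSet("test", 0).sizeInBytes()));
        } finally {
            consumer.close();
        }
    }
}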

2. Steps for using SimpleConsumer:

1) Find an active broker and find out which broker is the leader for your topic and partition

2) Determine who the replica brokers are for your topic and partition

3) Build the request defining what data you are interested in

4) Fetch the data, then identify and recover from leader changes

First, you must know which partition of which topic you want to read.

Next, find the broker that is the leader for that partition; the same metadata lookup also tells you which brokers hold replicas of it.

Then, build the fetch request yourself and fetch the data.

Finally, be prepared to detect and handle a change of the broker leader. A condensed sketch of the metadata-lookup step follows; the complete program is in section 3.
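
The sketch below isolates steps 1 and 2 (leader lookup via TopicMetadataRequest), the same API the findLeader() method in the full example uses. It is a condensed illustration, not the article's original code; broker hadoop0:9092, topic test, and partition 0 are placeholder assumptions.

package kafkatest.kakfademo;

import java.util.Collections;
import java.util.List;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class LeaderLookup {
    public static void main(String[] args) {
        // Ask a seed broker (placeholder hadoop0:9092) for topic metadata.
        SimpleConsumer consumer = new SimpleConsumer("hadoop0", 9092,
                100000, 64 * 1024, "leaderLookup");
        try {
            List<String> topics = Collections.singletonList("test");
            TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(topics));
            // Walk the metadata to find the leader of partition 0.
            for (TopicMetadata topic : resp.topicsMetadata()) {
                for (PartitionMetadata part : topic.partitionsMetadata()) {
                    if (part.partitionId() == 0 && part.leader() != null) {
                        System.out.println("Leader for test/0: "
                                + part.leader().host());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }
}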

3. The code is as follows:

package kafkatest.kakfademo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.BrokerEndPoint;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class SimpleExample {

    public static void main(String args[]) {
        SimpleExample example = new SimpleExample();
        long maxReads = 10;
        String topicName = "test";
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        seeds.add("hadoop0");
        int port = 9092;
        try {
            example.run(maxReads, topicName, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops: " + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public SimpleExample() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
            List<String> a_seedBrokers, int a_port) throws Exception {
        // Find the metadata about the topic and partition we are interested in
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
                a_partition);
        if (metadata == null) {
            System.out
                    .println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out
                    .println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        // Start from the earliest available offset in the partition
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);
        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port, 100000,
                        64 * 1024, clientName);
            }
            // Note: this fetchSize of 100000 might need to be increased if
            // large batches are written to Kafka
            FetchRequest req = new FetchRequestBuilder().clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker: "
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5)
                    break;
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    // We asked for an invalid offset. For simple case ask for
                    // the last element to reset
                    readOffset = getLastOffset(consumer, a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
                        a_port);
                continue;
            }
            numErrors = 0;

            long numRead = 0;
            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
                    a_topic, a_partition)) {
                long currentOffset = messageAndOffset.offset();
                // Compressed message sets can return messages before the
                // requested offset; skip those
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                System.out.println(String.valueOf(messageAndOffset.offset())
                        + ": " + new String(bytes, "UTF-8"));
                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null)
            consumer.close();
    }

    // Ask the broker for an offset of the partition: the earliest or the
    // latest, depending on whichTime
    public static long getLastOffset(SimpleConsumer consumer, String topic,
            int partition, long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
                partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
                whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
                clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out
                    .println("Error fetching data Offset Data the Broker. Reason: "
                            + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    // After a fetch error, poll the known replica brokers until a new leader
    // shows up
    private String findNewLeader(String a_oldLeader, String a_topic,
            int a_partition, int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
                    a_topic, a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
                    && i == 0) {
                // first time through if the leader hasn't changed give
                // ZooKeeper a second to recover
                // second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out
                .println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception(
                "Unable to find new leader after Broker failure. Exiting");
    }

    // Query each seed broker for topic metadata until we find our partition's
    // leader; also record the replica brokers for failover
    private PartitionMetadata findLeader(List<String> a_seedBrokers,
            int a_port, String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop: for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
                        "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed
                        + "] to find Leader for [" + a_topic + ", "
                        + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null)
                    consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (BrokerEndPoint replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
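
Since getLastOffset() is public and static, it can also be used on its own, for example to check a partition's earliest and latest offsets before deciding where to start reading. The probe below is a usage sketch, not part of the original article; it assumes the same placeholder broker hadoop0:9092, topic test, and partition 0 as the example above.

package kafkatest.kakfademo;

import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetProbe {
    public static void main(String[] args) {
        // Placeholder broker/topic/partition values; adjust to your cluster.
        SimpleConsumer consumer = new SimpleConsumer("hadoop0", 9092,
                100000, 64 * 1024, "offsetProbe");
        try {
            long earliest = SimpleExample.getLastOffset(consumer, "test", 0,
                    kafka.api.OffsetRequest.EarliestTime(), "offsetProbe");
            long latest = SimpleExample.getLastOffset(consumer, "test", 0,
                    kafka.api.OffsetRequest.LatestTime(), "offsetProbe");
            System.out.println("earliest=" + earliest + " latest=" + latest);
        } finally {
            consumer.close();
        }
    }
}

Passing LatestTime() instead of EarliestTime() to the initial getLastOffset() call in run() would make the example start from the tail of the partition (only new messages) rather than replaying it from the beginning.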

4. Consumption results:

Running the example against a topic with data prints one line per message to the console, in the form "offset: payload". (The original post showed a screenshot of this output here; it is not reproduced.)

That concludes this look at how to use Kafka's low-level consumer. Hopefully it has cleared up any confusion; pairing the theory with hands-on practice is the best way to make it stick, so give it a try. For more articles like this one, keep an eye on the 丸趣 TV site.
