在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

Storm

133次阅读
没有评论

共计 4003 个字符,预计需要花费 11 分钟才能阅读完成。

Storm-kafka 中如何理解 ZkCoordinator 的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

梳理 ZkCoordinator 的过程

package com.mixbox.storm.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
import static com.mixbox.storm.kafka.KafkaUtils.taskId;
 * 
 * 
 * ZKCoordinator  协调器
 * 
 * @author Yin Shuai
 */
public class ZkCoordinator implements PartitionCoordinator {
 public static final Logger LOG = LoggerFactory
 .getLogger(ZkCoordinator.class);
 SpoutConfig _spoutConfig;
 int _taskIndex;
 int _totalTasks;
 String _topologyInstanceId;
 //  每一个分区对应着一个分区管理器
 Map Partition, PartitionManager  _managers = new HashMap();
 // 缓存的 List
 List PartitionManager  _cachedList;
 // 上次刷新的时间
 Long _lastRefreshTime = null;
 // 刷新频率   毫秒
 int _refreshFreqMs;
 // 动态分区连接
 DynamicPartitionConnections _connections;
 // 动态 BrokersReader
 DynamicBrokersReader _reader;

public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader;  * @param stormConf  * @param spoutConfig  * @return  */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); @Override public List PartitionManager  getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime)   _refreshFreqMs) {refresh(); _lastRefreshTime = System.currentTimeMillis(); return _cachedList;  *  简单的刷新的行为  *   */ void refresh() { try {LOG.info(taskId(_taskIndex, _totalTasks) +  Refreshing partition manager connections //  拿到所有的分区信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); //  拿到自己任务的所有分区 List Partition  mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex); //  拿到当前任务的分区 Set Partition  curr = _managers.keySet(); //  构造一个集合 Set Partition  newPartitions = new HashSet Partition (mine); //  在 new 分区中,移除掉所有   自己拥有的分区 newPartitions.removeAll(curr); //  要删除的分区 Set Partition  deletedPartitions = new HashSet Partition (curr); deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) +  Deleted partition managers:  + deletedPartitions.toString()); for (Partition id : deletedPartitions) {PartitionManager man = _managers.remove(id); man.close(); LOG.info(taskId(_taskIndex, _totalTasks) +  New partition managers:   + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } catch (Exception e) {throw new RuntimeException(e); _cachedList = new ArrayList PartitionManager (_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) +  Finished refreshing @Override public PartitionManager getManager(Partition partition) {return _managers.get(partition); }

   1:首先 ZKCoorDinator 实现  PartitionCoordinator 的接口

package com.mixbox.storm.kafka;
import java.util.List;
 * @author Yin Shuai
 */
public interface PartitionCoordinator { *  拿到我管理的分区列表  List{PartitionManager}
  * @return
  */
 List PartitionManager  getMyManagedPartitions();

PartitionManager getManager(Partition partition); }

          第一个方法拿到所有的   PartitionManager

          第二个方法依据特定的   Partition 去得到一个分区管理器

关于 Storm-kafka 中如何理解 ZkCoordinator 的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注丸趣 TV 行业资讯频道了解更多相关知识。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-17发表,共计4003字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 精品无码人妻一区二区三区不卡 | 香蕉成人啪国产精品视频综合网 | 国产午夜精品一区二区三区四区 | 亚洲欧美综合一区二区三区四区 | 久草成人在线视频 | 久草免费色站 | 亚洲二区在线播放 | 亚欧色视频在线观看免费 | 国产三级在线视频播放线 | 四虎影院视频在线观看 | 一区二区三区精品国产欧美 | 亚洲天天看 | 久草在线中文888 | 欧美性a爱片 | 亚洲 都市 校园 激情 另类 | 免费观看成人欧美1314www | 亚洲国产精品第一区二区 | 亚洲国产精品无码久久一线 | 久久精品丝袜高跟鞋 | 国产在线拍偷自揄观看视频网站 | 免费va国产高清大片在线 | 日韩精品欧美在线 | 精品日产 | 狼色精品人妻在线视频免费 | 久草综合在线视频 | 国产精品不卡高清在线观看 | 国内精品视频九九九九 | 国产欧美一区二区三区在线 | 欧美激情视频一区二区三区免费 | 美日韩精品 | 国产成人在线视频免费观看 | 亚洲 欧美 中文 日韩aⅴ | 国产91精品一区二区视色 | 久久九九精品一区二区 | 一级黄网 | 正在播放重口老熟女露脸 | 人妻系列无码专区无码中出 | 国产欧美视频在线观看 | 久久天天躁狠狠躁夜夜躁2014 | 久草日b视频一二三区 | 国产精品久久久国产盗摄 |