在线精品99_中国九九盗摄偷拍偷看_91免费版在线观看_91.app_91高清视频在线_99热最新网站

IBatchSpout API怎么使用

150次阅读
没有评论

共计 2678 个字符,预计需要花费 7 分钟才能阅读完成。

这篇文章主要介绍“IBatchSpout API 怎么使用”,在日常操作中,相信很多人在 IBatchSpout API 怎么使用问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”IBatchSpout API 怎么使用”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!

IBatchSpout 是 storm trident 推出的一种可以批量发射的 Spout。非事务性,基本的 spout

1:Map getComponentConfiguration(); 定义配置,可以用 backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout 的初始化方法,参数 conf 即是 getComponentConfiguration 定义的配置

3:Fields getOutputFields(); 声明输出的 fields

4:void emitBatch(long batchId, TridentCollector collector); 批量发射 tuple,本次的批次号为 batchId

5:void ack(long batchId); 批次号为 batchId 的数据处理成功

6:  void close();

一个例子

package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
 /**
 * 
 */
 private static final long serialVersionUID = 1L;
 private long maxBatchSize;// 每批次最大的数量
 private BufferedReader br;// 源文件流
 HashMap Long, List List Object  batches = new HashMap Long, List List Object // 保存发送过的所有数据,以便于重复发送
 /**
 * @param conf  配置
 * @param context 
 */
 @Override
 public void open(Map conf, TopologyContext context) { String filePath = (String)conf.get( filePath 
 maxBatchSize = (Long)conf.get( maxBatchSize 
 try { br = new BufferedReader(new FileReader(filePath));
 } catch (FileNotFoundException e) { e.printStackTrace();
 }
 }
 /*** spout 的发送方法
 * @param batchId  批次 id
 * @param collector  批量发射器
 */
 @Override
 public void emitBatch(long batchId, TridentCollector collector) { List List Object  batch = batches.get(batchId);
 if (batch == null) { batch = new ArrayList List Object ();
 for (int i = 0; i   maxBatchSize; i++) {
 try { String line = br.readLine();
 if(line == null){
 break;
 }
 batch.add(new Values(line));
 } catch (IOException e) { e.printStackTrace();
 }
 }
 }
 for(List Object  list : batch){ collector.emit(list);
 }
 }
 @Override
 public void ack(long batchId) { batches.remove(batchId);
 }
 /**
 * close  方法
 */
 @Override
 public void close() { if(br!=null){
 try { br.close();
 } catch (IOException e) { e.printStackTrace();
 }
 }
 
 }
 @Override
 public Map getComponentConfiguration() { Config conf = new Config();
 // 最大并行度   本地模式设置为 1
 conf.setMaxTaskParallelism(1);
 conf.put( filePath ,  D:\\aaa.txt 
 conf.put(maxBatchSize , 2);
 return conf;
 }
 /**
 *  输出的 fileds
 */
 @Override
 public Fields getOutputFields() {
 return new Fields( sentence 
 }
}

到此,关于“IBatchSpout API 怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计2678字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)
主站蜘蛛池模板: 亚洲精品无码高潮喷水a片软 | 国产99视频精品免视看9 | 91免费视频网 | 成人免费视频网站www | 中文人妻熟妇乱又伦精品 | 欧美精品国产综合久久 | 欧美日韩色综合网站 | 一及黄色毛片 | 国产精品av免费观看 | 天天爽夜夜爽人人爽曰 | 亚洲入口 | 亚洲最大福利视频 | 99精品久久99久久久久久 | 国产精品毛片无遮挡高清 | 一级黄色片在线 | 99久久精品国产综合男同 | 草久在线| 久久国产精品99久久小说 | 国产精品123区 | 妺妺跟我一起洗澡没忍住 | 四虎永久在线观看免费网站网址 | 日本天堂视频在线观看 | 中老年熟妇激情啪啪大屁股 | 亚洲综合国产成人丁香五月激情 | 69精品丰满人妻无码视频a片 | 亚洲av永久无码精品无码流畅 | 欧美色黄| 一级做a爱片久久毛片 | 妺妺窝人体色www看美女 | 一级黄色欧美片 | 欧美色碰碰碰免费观看长视频 | 亚洲精品久久玖玖玖玖 | 国产成人福利免费观看 | 蜜桃bt天堂精品视频在线观看 | 国产免费福利网站 | 色一情一乱一伦 | 日韩欧美一区二区三区不卡 | 亚洲手机在线手机观看高清hd | 欧美精品观看 | 一级片aaaaaa| 国产精品毛片一区二区三区 |