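This article explains how Storm receives data. The explanation is short and easy to follow: it walks through the spout code that feeds data into a topology, first with simulated input and then with Redis as the source.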
A simple spout that simulates receiving data:
package com.cc.storm.spout;

import java.util.Map;
import java.util.Random;

import org.apache.log4j.Logger;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class RandomEmitSpout extends BaseRichSpout {

    private static final long serialVersionUID = 4092527421163270357L;
    static Logger LOG = Logger.getLogger(RandomEmitSpout.class);

    private Random _random;
    private SpoutOutputCollector _collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _random = new Random();
    }

    @Override
    public void nextTuple() {
        // Emit one simulated record per second
        Utils.sleep(1000);
        String[] userIds = { "1", "2", "3", "4" };
        String[] merchandiseIDS = { "1" };
        // Pick a random user ID and a random merchandise ID
        _collector.emit(new Values(userIds[_random.nextInt(userIds.length)],
                merchandiseIDS[_random.nextInt(merchandiseIDS.length)]));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("userIdS", "merchandiseIDS"));
    }

    @Override
    public void close() {
    }
}
Plus: if your data comes from Redis, a spout along these lines can subscribe to it:
package com.cc.storm.spout;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RedisPubSubSpout extends BaseRichSpout {

    private static final long serialVersionUID = 4092527421163270357L;
    static Logger LOG = Logger.getLogger(RedisPubSubSpout.class);

    private SpoutOutputCollector _collector;
    private final String host;
    private final int port;
    private final String pattern;
    LinkedBlockingQueue<String> queue;
    JedisPool pool;

    public RedisPubSubSpout(String host, int port, String pattern) {
        this.host = host;
        this.port = port;
        this.pattern = pattern;
    }
    // Listener thread: receives data from the Redis channels the spout subscribes to
    class ListenerThread extends Thread {
        private LinkedBlockingQueue<String> queue;
        JedisPool pool;
        String pattern;

        public ListenerThread(LinkedBlockingQueue<String> queue,
                JedisPool pool, String pattern) {
            this.queue = queue;
            this.pool = pool;
            this.pattern = pattern;
        }

        @Override
        public void run() {
            JedisPubSub listener = new JedisPubSub() {
                @Override
                public void onUnsubscribe(String arg0, int arg1) {
                }

                @Override
                public void onSubscribe(String arg0, int arg1) {
                }

                @Override
                public void onPUnsubscribe(String arg0, int arg1) {
                }

                @Override
                public void onPSubscribe(String arg0, int arg1) {
                }

                @Override
                public void onPMessage(String pattern, String channel,
                        String message) {
                    // Hand the message over to nextTuple() through the queue
                    queue.offer(message);
                }

                @Override
                public void onMessage(String channel, String message) {
                    queue.offer(message);
                }
            };
            Jedis jedis = pool.getResource();
            try {
                // Blocks this thread for as long as the subscription is active
                jedis.psubscribe(listener, pattern);
            } finally {
                pool.returnResource(jedis);
            }
        }
    }
    @SuppressWarnings("rawtypes")
    @Override
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
        _collector = collector;
        // The queue holds at most 1000 messages
        queue = new LinkedBlockingQueue<String>(1000);
        JedisPoolConfig config = new JedisPoolConfig();
        // Create the connection pool for the configured Redis host and port
        pool = new JedisPool(config, host, port);
        ListenerThread listener = new ListenerThread(queue, pool, pattern);
        // Start the listener thread
        listener.start();
    }
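    @Override
    public void nextTuple() {
        String ret = queue.poll();
        if (null == ret) {
            // Nothing in the queue yet: sleep for 500 ms
            Utils.sleep(500);
        } else {
            // The message format is "userID:merchandiseID"; adjust this to your needs
            String[] s = ret.split(":");
            if (s.length < 2) {
                // Skip messages that do not contain both parts
                LOG.warn("Skipping malformed message: " + ret);
                return;
            }
            _collector.emit(new Values(s[0], s[1]));
        }
    }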
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("userIdS", "merchandiseIDS"));
    }

    @Override
    public void close() {
        // Release the Redis connection pool
        pool.destroy();
    }
}

Thanks for reading. That covers how Storm receives data through a spout; how well these spouts fit your own setup still needs to be verified in practice.
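The Redis spout above relies on a bounded queue to pass messages from the listener thread to nextTuple(). The following is a minimal sketch of that hand-off using only the JDK, so it can be tried without Redis or Storm. The class name QueueHandoffSketch, the publish and emitted methods, and the small capacity are assumptions made for illustration; they are not part of the original code or of any library.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the listener -> queue -> nextTuple() path.
// No Redis or Storm classes are involved.
class QueueHandoffSketch {
    private final LinkedBlockingQueue<String> queue;
    private final List<String> emitted = new ArrayList<String>();

    QueueHandoffSketch(int capacity) {
        queue = new LinkedBlockingQueue<String>(capacity);
    }

    // Producer side, as in onMessage/onPMessage: offer() does not block
    // and returns false when the queue is already full.
    boolean publish(String message) {
        return queue.offer(message);
    }

    // Consumer side, as in nextTuple(): poll() returns null immediately
    // when nothing is buffered, so the caller backs off briefly.
    boolean nextTuple() {
        String ret = queue.poll();
        if (ret == null) {
            try {
                Thread.sleep(5); // the spout sleeps 500 ms; shortened here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return false;
        }
        emitted.add(ret); // stands in for _collector.emit(...)
        return true;
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) throws InterruptedException {
        final QueueHandoffSketch sketch = new QueueHandoffSketch(2);
        Thread listener = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 1; i <= 3; i++) {
                    boolean accepted = sketch.publish(i + ":1");
                    System.out.println("publish " + i + ":1 accepted=" + accepted);
                }
            }
        });
        listener.start();
        listener.join();
        while (sketch.nextTuple()) {
            // drain whatever was buffered
        }
        System.out.println(sketch.emitted());
    }
}
```

With a capacity of 2, the third publish is rejected and only the first two messages are emitted. The same applies to the spout: queue.offer(message) drops messages silently once 1000 are waiting, so it may be worth checking its return value if lost messages matter.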
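The Redis spout expects each message in the form "userID:merchandiseID" and splits it on the colon. A possible way to validate a message before emitting it is sketched below. PairParserSketch and parse are hypothetical names, and rejecting messages with empty parts or extra colons is an assumption that is stricter than the original split.

```java
import java.util.Arrays;

// Hypothetical helper for validating "userID:merchandiseID" messages.
class PairParserSketch {

    // Returns {userId, merchandiseId}, or null when the message does not
    // consist of exactly two non-empty parts separated by one colon.
    static String[] parse(String message) {
        if (message == null) {
            return null;
        }
        // Limit -1 keeps trailing empty strings, so "3:" yields two parts
        String[] parts = message.split(":", -1);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            return null;
        }
        return parts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(parse("3:1")));  // prints [3, 1]
        System.out.println(parse("no-separator") == null);  // prints true
        System.out.println(parse("3:") == null);            // prints true
    }
}
```

In nextTuple(), a null result would then mean "skip this message" rather than letting an ArrayIndexOutOfBoundsException escape from the spout.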