-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathScanRedisSpout.java
More file actions
144 lines (122 loc) · 4.52 KB
/
ScanRedisSpout.java
File metadata and controls
144 lines (122 loc) · 4.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package spout;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import java.util.*;
/**
* Created by on 2018/5/24.
* I designed this spout for this reason:prevent one key from being consumered by different bolt thread (the next bolt should * use shuffingfield pattern),i want one key consumered by on bolt thread.
*/
public class ScanRedisSpout extends BaseRichSpout {
private static final Logger LOGGER = LoggerFactory.getLogger(ScanRedisSpout.class);
private static final String REDIS_CONFIG_KEY = "redisConfigKey";
private static final String PREFIX_KEY = "prefixConfigKey";
private SpoutOutputCollector collector;
private JedisCluster jedisCluster;
private ScanParams scanParams;
private String STPRE;
private Set<String> allKeys;
private List<JedisScanBean> jedisList;
@Override
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
collector = spoutOutputCollector;
jedisCluster = JedisClusterConnection.createJedisCluster((Map<String, Object>) map.get(REDIS_CONFIG_KEY));
Map prefix = (Map) map.get(PREFIX_KEY);
STPRE = prefix.get(StormTpoConfig.SESSION_TIME_PREFIX_CONFIG).toString();
scanParams = new ScanParams().count(1000).match(STPRE + "*");
allKeys = new HashSet<>();
jedisList = getJedisScanBean(jedisCluster);
}
@Override
public void nextTuple() {
try {
if (allKeys.size() == 0) {
getKeys();
if (allKeys.size() != 0) {
allKeys.stream().forEach(key -> collector.emit(new Values(key), key));
}
} else {
}
} catch (Exception e) {
LOGGER.info("error:{}", e);
allKeys = new HashSet<>();
}
}
private void getKeys() {
for (JedisScanBean jedisScanBean : jedisList) {
ScanResult<String> scanResult = jedisScanBean.getJedis().scan(jedisScanBean.getCursor(), jedisScanBean.getScanParams());
allKeys.addAll(scanResult.getResult());
jedisScanBean.setCursor(scanResult.getStringCursor());
}
}
@Override
public void ack(Object msgId) {
super.ack(msgId);
LOGGER.info("ack:{}", msgId);
allKeys.remove(msgId);
}
@Override
public void fail(Object msgId) {
super.fail(msgId);
LOGGER.info("fail:{}", msgId);
allKeys.remove(msgId);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("key"));
}
/**
* 在redis集群的各个主节点查询key,并将结果集放入set中
* get key from every redis cluster nodes and put all keys in a SET
* @param jedisCluster
* @return
*/
private List<JedisScanBean> getJedisScanBean(JedisCluster jedisCluster) {
Map<String, JedisPool> nodes = jedisCluster.getClusterNodes();
Set<String> keySet = new HashSet<>();
List<JedisScanBean> list = new ArrayList<>();
String cursor = ScanParams.SCAN_POINTER_START;
for (Map.Entry<String, JedisPool> entry : nodes.entrySet()) {
Jedis jedis = entry.getValue().getResource();
JedisScanBean bean = new JedisScanBean(jedis, scanParams, cursor);
list.add(bean);
}
return list;
}
class JedisScanBean {
private Jedis jedis;
private ScanParams scanParams;
private String cursor;
JedisScanBean() {
}
JedisScanBean(Jedis jedis, ScanParams scanParams, String cursor) {
this.jedis = jedis;
this.scanParams = scanParams;
this.cursor = cursor;
}
public Jedis getJedis() {
return jedis;
}
public void setJedis(Jedis jedis) {
this.jedis = jedis;
}
public ScanParams getScanParams() {
return scanParams;
}
public void setScanParams(ScanParams scanParams) {
this.scanParams = scanParams;
}
public String getCursor() {
return cursor;
}
public void setCursor(String cursor) {
this.cursor = cursor;
}
}
}