使用 Docker Desktop 安装 Storm,并计算股票实时数据
1.编写storm.yml文件。2.在终端运行storm.yml文件。3.在浏览器访问localhost:8080,如果出现Storm UI页面,说明Storm搭建完成。
·
1.编写storm.yml文件
# Docker Compose definition of a small Storm cluster:
# three ZooKeeper containers plus one Storm UI, one Nimbus and one Supervisor container.
version: '2'
services:
  # --- ZooKeeper ensemble ---------------------------------------------------
  # Every node lists all three servers; in its own entry the host is 0.0.0.0
  # instead of its container name.
  zookeeper1:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk1.cloud
    environment:
      - SERVER_ID=1
      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper2:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk2.cloud
    environment:
      - SERVER_ID=2
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888
  zookeeper3:
    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8
    container_name: zk3.cloud
    environment:
      - SERVER_ID=3
      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888
      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888

  # --- Storm UI, published on host port 8080 --------------------------------
  ui:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: ui -c nimbus.host=nimbus
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    restart: always
    container_name: ui
    ports:
      # Quoted so the mapping is always read as a string, as the Compose docs recommend.
      - "8080:8080"
    depends_on:
      - nimbus

  # --- Nimbus, published on host port 6627 ----------------------------------
  nimbus:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: nimbus -c nimbus.host=nimbus
    restart: always
    environment:
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    container_name: nimbus
    ports:
      - "6627:6627"

  # --- Supervisor with four worker slots (ports 6700-6703) ------------------
  supervisor:
    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0
    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]
    restart: always
    environment:
      # NOTE(review): looks like a scheduler affinity hint; presumably it has no effect
      # on a single Docker Desktop host - confirm before relying on it.
      - affinity:role!=supervisor
      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud
    depends_on:
      - nimbus

# All services join the network "zk-net". It is declared external, so Compose
# does not create it: run `docker network create zk-net` once before `up`.
networks:
  default:
    external:
      name: zk-net
2.在终端运行storm.yml文件(storm.yml引用了外部网络zk-net,首次运行前需先执行 docker network create zk-net 创建该网络)
docker-compose -f storm.yml up -d
3.在浏览器访问localhost:8080
如果出现Storm UI页面,说明Storm搭建完成
4.编写spout
package com.storm.spout;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.io.*;
import java.util.Map;
/**
 * Spout that reads a GBK-encoded text file line by line and emits each line as a tuple.
 *
 * <p>Each line is trimmed and lower-cased, then emitted on the single output field
 * {@code "word"}. Tuples are emitted without a message id, which is why {@link #ack(Object)}
 * and {@link #fail(Object)} are intentionally empty. After the end of the file is reached the
 * spout sleeps briefly on every call instead of re-reading the file.
 */
@SuppressWarnings("serial")
public class FileDataReaderSpout extends BaseRichSpout {
    private SpoutOutputCollector spoutOutputCollector;
    private String filename;
    private BufferedReader bufferedReader;

    /**
     * Stores the collector and opens the input file.
     *
     * @param conf      topology configuration (unused)
     * @param context   topology context (unused)
     * @param collector collector used by {@link #nextTuple()} to emit lines
     * @throws RuntimeException if the file cannot be opened or the GBK charset is unavailable
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.spoutOutputCollector = collector;
        this.filename = "股票数据1.csv"; // replace with the path of the file to read
        FileInputStream rawStream = null;
        try {
            rawStream = new FileInputStream(filename);
            this.bufferedReader = new BufferedReader(new InputStreamReader(rawStream, "GBK"));
        } catch (IOException e) {
            // Do not leak the file handle when wrapping the stream fails.
            closeQuietly(rawStream);
            throw new RuntimeException("Error reading file: " + filename, e);
        }
    }

    /**
     * Emits the next line of the file, or sleeps for 100 ms when no line is available.
     *
     * @throws RuntimeException if reading from the file fails
     */
    @Override
    public void nextTuple() {
        try {
            String line = bufferedReader.readLine();
            if (line != null) {
                // Locale.ROOT keeps the lower-casing independent of the JVM's default locale.
                spoutOutputCollector.emit(
                        new Values(line.trim().toLowerCase(java.util.Locale.ROOT)));
            } else {
                Utils.sleep(100); // end of file: wait a little before checking again
            }
        } catch (IOException e) {
            throw new RuntimeException("Error reading tuple from file", e);
        }
    }

    /** No-op: part of the message-guarantee API, nothing to do for this spout. */
    @Override
    public void ack(Object id) {
        // intentionally empty
    }

    /** No-op: part of the message-guarantee API, nothing to do for this spout. */
    @Override
    public void fail(Object id) {
        // intentionally empty
    }

    /** Declares the single output field {@code "word"} that carries the line text. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    /**
     * Closes the underlying reader if one was opened.
     *
     * @throws RuntimeException if closing the file fails
     */
    @Override
    public void close() {
        if (bufferedReader == null) {
            return; // open() failed or was never called; nothing to release
        }
        try {
            bufferedReader.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing file: " + filename, e);
        } finally {
            bufferedReader = null;
        }
    }

    /** Closes {@code resource} if non-null, ignoring failures (used on an error path only). */
    private static void closeQuietly(Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // The original failure is the one reported to the caller.
        }
    }
}
5.编写bolt
package com.storm.bolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
/**
 * Terminal bolt that prints the running totals it receives to standard output.
 *
 * <p>Reads the fields {@code stockType}, {@code totalTradeVolume} and
 * {@code totalTradeAmount} from each incoming tuple and emits nothing downstream.
 */
public class OutputBolt extends BaseBasicBolt {

    /** Declares no output fields, because this bolt only prints results. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // nothing is emitted by this bolt
    }

    /**
     * Prints one summary line for the given tuple.
     *
     * @param input     tuple carrying the stock type and its totals
     * @param collector unused; this bolt does not emit
     */
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        final String type = input.getStringByField("stockType");
        final int volume = input.getIntegerByField("totalTradeVolume");
        final double amount = input.getDoubleByField("totalTradeAmount");

        StringBuilder report = new StringBuilder("stock Type: ");
        report.append(type);
        report.append(", Trade Volume: ").append(volume);
        report.append(", Trade Amount: ").append(amount);
        System.out.println(report.toString());
    }
}
package com.storm.bolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
/**
 * Bolt that aggregates trade volume and trade amount per stock type.
 *
 * <p>Each incoming tuple carries one comma-separated record in its first field. Records with
 * fewer than 9 columns, or whose price (column 3) or volume (column 4) cannot be parsed, are
 * dropped. For every accepted record the bolt updates its in-memory running totals and emits
 * {@code (stockType, totalTradeVolume, totalTradeAmount)} for that record's stock type.
 *
 * <p>NOTE(review): the volume total is an {@code Integer} and is summed with
 * {@code Integer::sum}, so it wraps around on overflow. Widening it would change the type of
 * the emitted {@code totalTradeVolume} field and must be coordinated with downstream bolts.
 */
public class StockTradeBolt extends BaseBasicBolt {
    /** Minimum number of columns a record must have to be processed. */
    private static final int MIN_FIELD_COUNT = 9;
    /** Column index of the stock type (assumed layout of the input file). */
    private static final int STOCK_TYPE_INDEX = 2;
    /** Column index of the trade price. */
    private static final int PRICE_INDEX = 3;
    /** Column index of the trade volume. */
    private static final int VOLUME_INDEX = 4;

    private Map<String, Integer> stockTypeTotalVolumeMap;
    private Map<String, Double> stockTypeTotalAmountMap;

    /** Creates the empty per-stock-type accumulators. */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        stockTypeTotalVolumeMap = new HashMap<>();
        stockTypeTotalAmountMap = new HashMap<>();
    }

    /**
     * Parses one record, folds it into the running totals and emits the updated totals.
     *
     * @param tuple     tuple whose first field is the raw comma-separated record
     * @param collector collector used to emit the updated totals
     */
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String line = tuple.getString(0);
        String[] fields = line.split(",");
        if (fields.length < MIN_FIELD_COUNT) {
            return; // too few columns: drop the record
        }

        double price;
        int tradeVolume;
        try {
            price = Double.parseDouble(fields[PRICE_INDEX].trim());
            // Parse the volume with the same parser that produces the value. Validating it
            // as a double would let "100.0" or "1e3" through and then fail in parseInt.
            tradeVolume = Integer.parseInt(fields[VOLUME_INDEX].trim());
        } catch (NumberFormatException e) {
            return; // header row or malformed number: drop the record
        }
        if (Double.isNaN(price) || Double.isInfinite(price)) {
            return; // "NaN"/"Infinity" parse successfully but would poison the totals
        }

        String stockType = fields[STOCK_TYPE_INDEX];
        double tradeAmount = price * tradeVolume;
        Integer totalVolume = stockTypeTotalVolumeMap.merge(stockType, tradeVolume, Integer::sum);
        Double totalAmount = stockTypeTotalAmountMap.merge(stockType, tradeAmount, Double::sum);
        collector.emit(new Values(stockType, totalVolume, totalAmount));
    }

    /** Declares the three output fields carrying a stock type and its running totals. */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("stockType", "totalTradeVolume", "totalTradeAmount"));
    }

    /** Prints the final totals of every stock type seen by this bolt instance. */
    @Override
    public void cleanup() {
        for (Map.Entry<String, Integer> entry : stockTypeTotalVolumeMap.entrySet()) {
            String stockType = entry.getKey();
            int totalVolume = entry.getValue();
            double totalAmount = stockTypeTotalAmountMap.get(stockType);
            System.out.println("Stock Type: " + stockType + ", Total Volume: " + totalVolume + ", Total Amount: " + totalAmount);
        }
    }
}
6.编写topology
package com.storm.wordcount;
import com.storm.bolt.OutputBolt;
import com.storm.bolt.StockTradeBolt;
import com.storm.spout.FileDataReaderSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
/**
 * Entry point that wires the file spout, the aggregating bolt and the printing bolt together.
 *
 * <p>With at least one argument the topology is submitted to a cluster under the name
 * {@code args[0]}. Without arguments it runs in an in-process {@link LocalCluster} for
 * 10 seconds, is then killed, and the elapsed wall-clock time is printed.
 */
public class WordCountTopology {

    /**
     * Builds and runs the topology.
     *
     * @param args optional; {@code args[0]} is the topology name for cluster submission
     * @throws IllegalStateException if submitting to the cluster fails
     */
    public static void main(String[] args) {
        Config config = new Config();
        config.setDebug(false);
        config.setNumWorkers(2); // run the topology in 2 worker processes

        TopologyBuilder builder = new TopologyBuilder();
        // One spout task only: every task opens and reads the whole file, so a higher
        // parallelism would feed each record into the topology more than once.
        builder.setSpout("RandomSentence", new FileDataReaderSpout(), 1);
        // One aggregator task only: totals are kept in the bolt's memory and the stream is
        // shuffle-grouped, so several tasks would each hold only part of a stock type's sum.
        builder.setBolt("WordNormalizer", new StockTradeBolt(), 1).shuffleGrouping(
                "RandomSentence");
        builder.setBolt("Print", new OutputBolt(), 1).shuffleGrouping(
                "WordNormalizer");

        // The presence of an argument selects cluster submission; otherwise run locally.
        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], config,
                        builder.createTopology());
            } catch (Exception e) {
                // Fail loudly so a rejected submission is not reported as success.
                throw new IllegalStateException("Failed to submit topology: " + args[0], e);
            }
        } else {
            config.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            // Record when the topology starts executing.
            long startTime = System.currentTimeMillis();
            boolean interrupted = false;
            try {
                cluster.submitTopology("wordcount", config, builder.createTopology());
                // Let the topology run for a while.
                try {
                    Thread.sleep(10000);
                } catch (InterruptedException e) {
                    // Remember the interrupt; it is restored after the cluster is shut down.
                    interrupted = true;
                }
                cluster.killTopology("wordcount");
            } finally {
                cluster.shutdown();
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
            // Record when the topology finished and report the elapsed time.
            long endTime = System.currentTimeMillis();
            long runtime = endTime - startTime;
            System.out.println("拓扑执行时间:" + runtime + " 毫秒");
        }
    }
}
7.运行结果
更多推荐
已为社区贡献2条内容
所有评论(0)