资源简介
网上很少能找到 Flink 的生产项目。这个项目是关于系统运维方面的流式处理,生产环境中的注意点在代码里基本都有体现,对初学者应该有一些帮助;其他内容可以到官方 demo 上学习。
代码片段和文件信息
package comm.teld.cn;
import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import comm.teld.cn.common.Utils;
import comm.teld.cn.common.config.ConfigBean;
import comm.teld.cn.common.config.LoadPropertiesFile;
import comm.teld.cn.event.baseDTO;
import comm.teld.cn.filter.CommMsg;
import comm.teld.cn.log.LoggerUtils;
import comm.teld.cn.map.AlarmRichFlatMap;
import comm.teld.cn.sink.ByteArrayDeserialization;
import comm.teld.cn.sink.EORMQSink;
import comm.teld.cn.sink.JsonDTOSerializing;
public class DRDCDeviceMonitor {
private static void startProcessMessage(ConfigBean configBean) throws Exception {
StreamExecutionEnvironment env=null;
if(configBean.environmentDataCenter.startsWith(“dev“)) {
Configuration localConfig = new Configuration();
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER true);
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setParallelism(1);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment();
// Configuration localConfig = new Configuration();
// localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER true);
// env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setParallelism(1);
}
env.enableCheckpointing(configBean.checkpointDurationCheckpointingMode.EXACTLY_ONCE);
// set mode to exactly-once (this is the default)
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//TODO
if(!configBean.environmentDataCenter.startsWith(“prod“)) {
env.disableOperatorChaining();
}
FlinkKafkaConsumer08 devMonitorConsumer = new FlinkKafkaConsumer08<>(
configBean.kafkaDeviceMonitorSourceTopic new ByteArrayDeserialization()
configBean.kafkaDeviceMonitorSourceProperties);
//devMonitorConsumer.setCommitOffsetsOnCheckpoints(true);
// devMonitorConsumer.setStartFromGroupOffsets();
DataStream devMonitorDataStream = env.addSource(devMonitorConsumer).name(“devMonitor“).rebalance()
.map(Utils.devMonitorMsgMapFunction).name(“devMonitorCommMsg“);
convertToEvent(devMonitorDataStream configBean);
env.execute(“MonitorDevice_“ + configBean.environmentDataCenter);
}
private static void convertToEvent(DataStream devMonit
属性 大小 日期 时间 名称
----------- --------- ---------- ----- ----
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\
文件 1510 2018-07-25 09:30 DRDCDeviceMonitor-master\.classpath
文件 66 2018-07-25 09:30 DRDCDeviceMonitor-master\.gitattributes
文件 546 2018-07-25 09:30 DRDCDeviceMonitor-master\.project
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.settings\
文件 342 2018-07-25 09:30 DRDCDeviceMonitor-master\.settings\org.eclipse.core.resources.prefs
文件 238 2018-07-25 09:30 DRDCDeviceMonitor-master\.settings\org.eclipse.jdt.core.prefs
文件 86 2018-07-25 09:30 DRDCDeviceMonitor-master\.settings\org.eclipse.m2e.core.prefs
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\
文件 3 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\entries
文件 3 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\format
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\19\
文件 18719 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\19\190e4d39993998a3eb264e3f4b09dd39d9f2df69.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\1c\
文件 717 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\1c\1c5cb7e3508e19c5c6e86e0cce978439d713e8d9.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\1d\
文件 1974 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\1d\1d1cea4de9230b899bde4d7ca2841f7aaf41e477.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\33\
文件 1033 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\33\33e4ac58a8520fb15f0f138968204fde188d0d3c.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\38\
文件 18875 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\38\38a760b9a8f43014275156ff470c00249af2e4df.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\3a\
文件 1710 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\3a\3abeb46e660e926c2bb8d6594b12a1061a95472d.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\42\
文件 4748 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\42\42d2764b3ddfd5a2ce1e7fa9d5cef8af403e2b73.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\4d\
文件 417 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\4d\4d442b67ec9a03b95da5ecad694dac4b2e1884a8.svn-ba
文件 1121 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\4d\4dec4ac6c9d62a53afd78c3e16ac3fefe36273ce.svn-ba
目录 0 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\64\
文件 20837 2018-07-25 09:30 DRDCDeviceMonitor-master\.svn\pristine\64\642b84e792d7982add4ec8109b4653ac574dba3f.svn-ba
............此处省略387个文件信息
- 上一篇:现代移动通信(PDF+WORD)
- 下一篇:汇编语言程序题库清华大学出版的
评论
共有 条评论