• 大小: 1.01MB
    文件类型: .zip
    金币: 1
    下载: 0 次
    发布日期: 2023-08-12
  • 语言: 其他
  • 标签: flink  

资源简介

网上很少能找到 Flink 的生产项目。这个项目是关于系统运维方面的流式处理,生产环境中的注意点在代码里基本都有体现,对初学者应该有一些帮助;其他内容可以参考官方 demo 学习。

资源截图

代码片段和文件信息

package comm.teld.cn;

import java.io.InputStream;
import java.util.Arrays;
import java.util.Properties;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

import comm.teld.cn.common.Utils;
import comm.teld.cn.common.config.ConfigBean;
import comm.teld.cn.common.config.LoadPropertiesFile;
import comm.teld.cn.event.baseDTO;
import comm.teld.cn.filter.CommMsg;
import comm.teld.cn.log.LoggerUtils;
import comm.teld.cn.map.AlarmRichFlatMap;
import comm.teld.cn.sink.ByteArrayDeserialization;
import comm.teld.cn.sink.EORMQSink;
import comm.teld.cn.sink.JsonDTOSerializing;

public class DRDCDeviceMonitor {

private static void startProcessMessage(ConfigBean configBean) throws Exception {
StreamExecutionEnvironment env=null;

if(configBean.environmentDataCenter.startsWith(“dev“)) {
Configuration localConfig = new Configuration();
localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER true);
env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setParallelism(1);
} else {
env = StreamExecutionEnvironment.getExecutionEnvironment();
//         Configuration localConfig = new Configuration();
//            localConfig.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER true);
//         env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConfig);
env.setParallelism(1);
}
env.enableCheckpointing(configBean.checkpointDurationCheckpointingMode.EXACTLY_ONCE);
// set mode to exactly-once (this is the default)
//env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//TODO
if(!configBean.environmentDataCenter.startsWith(“prod“)) {
env.disableOperatorChaining();
}

FlinkKafkaConsumer08 devMonitorConsumer = new FlinkKafkaConsumer08<>(
configBean.kafkaDeviceMonitorSourceTopic new ByteArrayDeserialization()
configBean.kafkaDeviceMonitorSourceProperties);
//devMonitorConsumer.setCommitOffsetsOnCheckpoints(true);
// devMonitorConsumer.setStartFromGroupOffsets();
DataStream devMonitorDataStream = env.addSource(devMonitorConsumer).name(“devMonitor“).rebalance()
.map(Utils.devMonitorMsgMapFunction).name(“devMonitorCommMsg“);

convertToEvent(devMonitorDataStream configBean);



env.execute(“MonitorDevice_“ + configBean.environmentDataCenter);
}

private static void convertToEvent(DataStream devMonit

 属性            大小     日期    时间   名称
----------- ---------  ---------- -----  ----
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\
     文件        1510  2018-07-25 09:30  DRDCDeviceMonitor-master\.classpath
     文件          66  2018-07-25 09:30  DRDCDeviceMonitor-master\.gitattributes
     文件         546  2018-07-25 09:30  DRDCDeviceMonitor-master\.project
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.settings\
     文件         342  2018-07-25 09:30  DRDCDeviceMonitor-master\.settings\org.eclipse.core.resources.prefs
     文件         238  2018-07-25 09:30  DRDCDeviceMonitor-master\.settings\org.eclipse.jdt.core.prefs
     文件          86  2018-07-25 09:30  DRDCDeviceMonitor-master\.settings\org.eclipse.m2e.core.prefs
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\
     文件           3  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\entries
     文件           3  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\format
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\19\
     文件       18719  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\19\190e4d39993998a3eb264e3f4b09dd39d9f2df69.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\1c\
     文件         717  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\1c\1c5cb7e3508e19c5c6e86e0cce978439d713e8d9.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\1d\
     文件        1974  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\1d\1d1cea4de9230b899bde4d7ca2841f7aaf41e477.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\33\
     文件        1033  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\33\33e4ac58a8520fb15f0f138968204fde188d0d3c.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\38\
     文件       18875  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\38\38a760b9a8f43014275156ff470c00249af2e4df.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\3a\
     文件        1710  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\3a\3abeb46e660e926c2bb8d6594b12a1061a95472d.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\42\
     文件        4748  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\42\42d2764b3ddfd5a2ce1e7fa9d5cef8af403e2b73.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\4d\
     文件         417  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\4d\4d442b67ec9a03b95da5ecad694dac4b2e1884a8.svn-base
     文件        1121  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\4d\4dec4ac6c9d62a53afd78c3e16ac3fefe36273ce.svn-base
     目录           0  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\64\
     文件       20837  2018-07-25 09:30  DRDCDeviceMonitor-master\.svn\pristine\64\642b84e792d7982add4ec8109b4653ac574dba3f.svn-base
............此处省略387个文件信息

评论

共有 条评论