资源简介
flinkdemo, 里面分别介绍了流式计算的单词统计,聚合,从kafka的数据生产,到flink从kafka消费再写入mysql,源是mysql消费数据再写入目标数据等一系列的代码,经过测试,完成能运行的
代码片段和文件信息
package com.hy.flinktest;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
/**
* ClassName: BatchWordCountJava
* Description: 统计单词批量统计(java实现)
* Date: 2020/9/23 10:10
*
* @Author dengchangshi
*/
public class BatchWordCountJava {
public static void main(String[] args) {
String input = “d://tmpdata/test1.txt“;
String output = “d://tmpdata/result“;
//初始化环境
ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
environment.setParallelism(1);
//读取数据
DataSource data = environment.readTextFile(input);
AggregateOperator> res = data.flatMap(new SplitFunction()).groupBy(0).sum(1);
res.writeAsCsv(output FileSystem.WriteMode.OVERWRITE);
//res.writeAsText(output FileSystem.WriteMode.OVERWRITE);
try {
environment.execute(“BatchWordCountJava“);
} catch (Exception e) {
e.printStackTrace();
}
}
private static class SplitFunction implements FlatMapFunction>{
public void flatMap(String value Collector> collector) throws Exception {
String[] words = value.split(“ “);
for (String word : words) {
if(StringUtils.isNotBlank(word)){
collector.collect(new Tuple2(word1L));
}
}
}
}
}
属性 大小 日期 时间 名称
----------- --------- ---------- ----- ----
文件 1 2020-09-23 11:52 fli
文件 17 2020-09-23 11:52 fli
文件 0 2020-09-23 11:52 fli
文件 17 2020-09-23 11:52 fli
文件 53 2020-09-23 11:52 fli
文件 0 2020-09-23 11:52 fli
文件 1185 2020-09-23 11:47 fli
文件 153 2020-09-23 11:47 fli
文件 709 2020-09-25 14:50 fli
文件 170 2020-09-23 11:47 fli
文件 294 2020-09-24 15:02 fli
文件 816 2020-09-24 09:15 fli
文件 184 2020-09-23 11:50 fli
文件 531 2020-09-23 11:50 fli
文件 619 2020-09-23 11:50 fli
文件 506 2020-09-23 11:50 fli
文件 496 2020-09-24 09:58 fli
文件 517 2020-09-24 15:38 fli
文件 545 2020-09-23 11:50 fli
文件 546 2020-09-23 11:50 fli
文件 544 2020-09-23 11:50 fli
文件 548 2020-09-23 11:50 fli
文件 577 2020-09-24 09:53 fli
文件 524 2020-09-24 15:11 fli
文件 524 2020-09-24 15:11 fli
文件 590 2020-09-25 12:27 fli
文件 611 2020-09-25 12:27 fli
文件 590 2020-09-25 12:27 fli
文件 597 2020-09-25 12:27 fli
文件 500 2020-09-25 12:27 fli
............此处省略107个文件信息
- 上一篇:sp1地址.txt
- 下一篇:SQL作业学生成绩管理数据库
相关资源
- 6.SparkSQL下--Spark实战应用.pdf
- 电子图书管理系统
- Xposed提取微信数据库密码(微信6.6.
- netbeans个人通讯录
- 旅游业信息化服务平台
- SSH+Spring Security+MySQL
- 2019最新银行卡bin表单、包含excel表,
- springmvc+spring+mybatis+mysql数据库整合读
- 用户同一时段不能重复登录类似QQMy
- SSM+mysql购书商城2018届毕设
- 网上订票系统
- 企业工单管理系统
- 基于SpringBoot2+Jpa+SpringSecurity+redis+Vue的
- 基于vue全家桶nodejsexpressmysql实现的商
- oracle数据库 +springmvc框架 开发的增删
- 毕业设计体检预约系统-springmvc+mybat
- 医院预约挂号系统 数据库
- 数据库课程设计-学生信息管理系统
- 酒店客房信息管理系统+源码+设计报告
- 阿里云_大数据计算服务_MaxCompute原O
- 简单酒店管理系统 swing+mysql
- 2019年最全的银行开户行联行号sql数据
- 医院信息管理系统完整项目
- 学籍管理系统源码
- 考勤系统 net sql server.rar
- 社团管理系统
- oracle大作业236343
- mysql必知必会ppt教程
- [大数据] 大数据分析 (英文版)
- 传智播客内部数据库教学资料及上课
评论
共有 条评论