• 企业400电话
  • 微网小程序
  • AI电话机器人
  • 电商代运营
  • 全 部 栏 目

    企业400电话 网络优化推广 AI电话机器人 呼叫中心 网站建设 商标✡知产 微网小程序 电商运营 彩铃•短信 增值拓展业务
    大数据HelloWorld-Flink实现WordCount

    所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word。那就是Word Count。MR,Spark,Flink以来开篇第一个程序都是Word Count。那么今天Flink开始目标就是在本地调试出Word Count。

    单机安装Flink

    开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式。作者比较穷,机器配置太低开不了几个虚拟机。所以只能先演示个单机的安装。

    Apache Flink需要在Java1.8+以上的环境中运行 。

    所以,先确保自己的JDK版本是1.8包含以上的。

     

    Flink单机部署非常简单,只需安装下载安装即可。如果需要与Hadoop版本结合,那么下载相应的Hadoop关联版本即可。如果不与Hadoop结合就直接下载Scala版即可。我这里就直接下载了Scala2.11的相关版本。

     

    点击进入Apache页面进行下载,大小约有283MB。

    把下载下来的压缩包进行解压即可。

    打开命令行直接执行 

    /bin/start-cluster.bat

    进行启动。 

    浏览器打开 http://localhost:8081

    至此在Windows10环境下即完成Flink的启动。

    编写WordCount

    因为Flink是由Scala进行开发的,而Scala是基于JVM的一种语言。所以最终也会转换为JAVA字节码文件,所以Flink程序可以由Java、Scala两种语言都可以进行开发。也可以同时开发。比如Java写一部分代码,Scala写另一部分代码。可以参考Apache Flink利用Maven对Scala与Java进行混编>。

    Flink官方提供快速生成工程的两种工具:SBT与Maven。由于作者比较熟悉Maven,( 或者说没用过SBT )。所以直接使用Maven快速创建一个工程。

    Java版本

    mvn archetype:generate                \
    
       -DarchetypeGroupId=org.apache.flink       \
    
       -DarchetypeArtifactId=flink-quickstart-java   \
    
       -DarchetypeVersion=1.8.0

    Scala版本

    mvn archetype:generate                \
    
       -DarchetypeGroupId=org.apache.flink       \
    
       -DarchetypeArtifactId=flink-quickstart-scala   \
    
       -DarchetypeVersion=1.8.0

    按照提示输入相关信息,即可生成最终的项目。

    ├── pom.xml
    └── src
      └── main
        ├── resources
        │  └── log4j.properties
        └── scala/java
          └── org
            └── myorg
              └── quickstart
                ├── BatchJob.scala
                └── StreamingJob.scala

    把工程导入到IDEA中

    如果使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。

    不知道如何操作可以联系我 微信公号指尖数虫>。

    package jar;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class BatchJob {
    
    	public static void main(String[] args) throws Exception {
    		// set up the batch execution environment
    		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    		//读取目录下的文件
    		DataSourceString> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
    		//把文件中的内容按照空格进行拆分为 word,1  1 是为了能够在下面进行计算.
    		data.flatMap(new FlatMapFunctionString, Tuple2String, Integer>>() {
    			@Override
    			public void flatMap(String s, CollectorTuple2String, Integer>> collector) throws Exception {
    				for (String word : s.split(" ")){
    					collector.collect(new Tuple2>(word,1));
    				}
    			}
    		})
    		// 按照元组中的第1位进行分组
    		.groupBy(0)
    		// 分组的元组的计算方式为 value +value 也就是刚才的 同样的词 把 1+1
    		.reduce(new ReduceFunctionTuple2String, Integer>>() {
    			@Override
    			public Tuple2String, Integer> reduce(Tuple2String, Integer> t2, Tuple2String, Integer> t1) throws Exception {
    				return new Tuple2>(t1.f0,t1.f1+ t2.f1);
    			}
    		})
    		//输出结果
    		.print();
    	}
    }

    总结

    以上所述是小编给大家介绍的大数据HelloWorld-Flink实现WordCount,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对脚本之家网站的支持!
    如果你觉得本文对你有帮助,欢迎转载,烦请注明出处,谢谢!

    您可能感兴趣的文章:
    • 浅谈实时计算框架Flink集群搭建与运行机制
    • 详解大数据处理引擎Flink内存管理
    • Apache FlinkCEP 实现超时状态监控的步骤详解
    • Flink支持哪些数据类型?
    • Java lambda表达式实现Flink WordCount过程解析
    • 解析Flink内核原理与实现核心抽象
    上一篇:DedeCMS 5.7 sp1远程文件包含漏洞(CVE-2015-4553)
    下一篇:网站搜索框使用微信扫码功能
  • 相关文章
  • 

    © 2016-2020 巨人网络通讯 版权所有

    《增值电信业务经营许可证》 苏ICP备15040257号-8

    大数据HelloWorld-Flink实现WordCount 大,数据,HelloWorld-Flink,实现,