This article introduces how to develop Flink applications with the Flink DataStream API and Flink SQL; for the full API surface, see the Flink API documentation. It also shows how to submit and run these applications through a Flink job management platform.
Prerequisites: a running Flink session cluster (used by the CLI examples below) and access to the StreamPark WebUI (used by the job management platform examples below).
The Flink DataStream API is a high-level API for processing unbounded data streams. It provides a rich set of operators for transforming streams, for example `map`, `filter`, `keyBy`, `reduce`, and `window`. A minimal sketch of how such operators chain together is shown first, followed by a complete example: a simple WordCount application built with the DataStream API.
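The sketch below is illustrative only (the class name and input data are not from the original example); it chains `map`, `filter`, `keyBy`, and `reduce` on an in-memory sequence, while `window` is demonstrated in the full WordCount example that follows:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class OperatorSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // In-memory source: the numbers 1..10 as a bounded stream
        DataStream<Long> numbers = env.fromSequence(1, 10);

        numbers
                .map(n -> n * 10)    // map: transform each element
                .filter(n -> n > 20) // filter: keep only elements matching a predicate
                .keyBy(n -> n % 2)   // keyBy: partition the stream by key (even/odd here)
                .reduce(Long::sum)   // reduce: running aggregate per key
                .print();            // print each update to stdout

        env.execute("Operator sketch");
    }
}
```

The full WordCount example: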
```java
package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println(
                    "No port specified. Please run 'SocketWindowWordCount "
                            + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                            + "and port is the address of the text server");
            System.err.println(
                    "To start a simple text server, run 'netcat -l <port>' and "
                            + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                (FlatMapFunction<String, WordWithCount>)
                                        (value, out) -> {
                                            for (String word : value.split("\\s")) {
                                                out.collect(new WordWithCount(word, 1L));
                                            }
                                        },
                                Types.POJO(WordWithCount.class))
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                        .returns(WordWithCount.class);

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        @SuppressWarnings("unused")
        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```
This is a simple Apache Flink program that performs a windowed word count over data read from a socket. Its main parts:

- Argument parsing: `ParameterTool` reads `--hostname` (default `localhost`) and `--port` from the command line.
- Source: `env.socketTextStream` connects to the socket and emits each line as a `String`.
- Transformation: `flatMap` splits each line into words and emits a `WordWithCount(word, 1)` for each.
- Grouping and windowing: the stream is keyed by word and grouped into 5-second tumbling processing-time windows.
- Aggregation: `reduce` sums the counts per word within each window.
- Sink: the results are printed to the console with parallelism 1.

In short, the program reads a text stream from the given socket, counts the words within each window in real time, and prints the results to the console.
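To try the pipeline without a socket server, you can swap the source for an in-memory collection. The variant below is a minimal sketch, not part of the official example (the class name and sample sentences are made up); it also drops the processing-time window, since a bounded in-memory stream may finish before a 5-second window ever fires, and instead prints a running count per word:

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LocalWordCountSketch {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded in-memory source instead of env.socketTextStream(...)
        DataStream<String> text = env.fromElements("to be or not to be", "to be is to do");

        text.flatMap(
                        (FlatMapFunction<String, Tuple2<String, Long>>)
                                (value, out) -> {
                                    for (String word : value.split("\\s")) {
                                        out.collect(Tuple2.of(word, 1L));
                                    }
                                },
                        Types.TUPLE(Types.STRING, Types.LONG))
                .keyBy(t -> t.f0) // group by word
                .sum(1)           // running count per word
                .print()
                .setParallelism(1);

        env.execute("Local WordCount sketch");
    }
}
```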
Source code: https://github.com/apache/flink/tree/release-1.17/flink-examples/flink-examples-streaming
Two ways of submitting the application are covered: one through the Flink CLI, and one through the Flink job management platform StreamPark.
First, the Flink CLI approach. Enter the Flink session cluster container:

```shell
# replace the pod name according to your environment
kubectl exec -it flink-session-cluster-xxxxx -n kdp-data -- bash
```

Inside the container, run the following command, which submits the example job (the `--hostname` argument is resolved from `/etc/hosts`) and then starts a netcat server on port 9999:

```shell
./bin/flink run -d ./examples/streaming/SocketWindowWordCount.jar --hostname `(grep 'flink-session-cluster' /etc/hosts | head -n 1 | awk '{print $1}')` --port 9999 && nc -l 9999
```
The expected output is similar to:

```
ERROR StatusLogger Reconfiguration failed: No configuration found for '6438a396' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for '510f3d34' at 'null' in 'null'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.17.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID ff3008609a49c364e7b27f5e94f4f57b
```

The `ERROR StatusLogger Reconfiguration failed` messages and the reflective-access warnings do not affect job submission and can be ignored.
At this point the job has been submitted. In the interactive `nc` session, type any text and press Enter to send it (you can send lines repeatedly), then check the word counts in the Flink session cluster console output. You can also check the job status in the Flink WebUI.
To clean up, exit the `nc` session (Ctrl+C) and run:

```shell
./bin/flink list
## the job id is printed
./bin/flink cancel <job_id>
```
Next, the StreamPark approach. To prepare, enter the Flink session cluster container and look up its IP address:

```shell
grep 'flink-session-cluster' /etc/hosts | head -n 1 | awk '{print $1}'
## note the printed Flink container IP address (e.g. 10.233.114.142); it is needed in the StreamPark job arguments
## start the socket server
nc -l 9999
## after the job is published, type text here and watch the Flink session cluster console output
```
Submit via the StreamPark WebUI. Log in to the StreamPark WebUI, then in the left navigation click Real-time Development - Job Management - Add. On the new page, fill in the form as follows (the values map onto the StreamPark job form roughly as labeled here; `demo` is the registered Flink cluster) and click the Submit button:

- Development Mode: Custom Code
- Execution Mode: remote
- Resource From: upload local job
- Flink Version: flink-1.17.1
- Flink Cluster: demo
- Program Main: org.apache.flink.streaming.examples.socket.SocketWindowWordCount
- Application Name: Socket Window WordCount
- Program Args: --port 9999 --hostname 10.233.114.142 (use the IP address recorded earlier)

After the job is added successfully, you are redirected to the job management page.
Run the job: click the Release button for the Socket Window WordCount job; after a moment its release status changes to Done and Success. Then click the Start button for the job, switch off from savepoint in the dialog, and click Apply. The job is submitted to the Flink session cluster, and its state changes from Starting to Running. To stop it, click the Stop button.

Flink SQL is a high-level API for processing both unbounded and bounded data streams. It offers a SQL-like query language over streams. The following simple example shows how to build a Flink SQL application that generates mock order data, computes the running total amount per user, and prints the results.
The Flink SQL statements are as follows:
```sql
create table UserOrder(
user_id int,
money_amount int,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.money_amount.min' = '1',
'fields.money_amount.max' = '100'
);
create table UserMoneyAmount (
user_id int,
total_amount int,
primary key (user_id) not enforced
) with ('connector' = 'print');
insert into
UserMoneyAmount
select
user_id,
sum(money_amount) as total_amount
from
UserOrder
group by
user_id;
```
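The same three statements can also be embedded in a regular Java program through the Table API. The sketch below is illustrative (the class name is made up, and it assumes the Flink Table planner and the `datagen`/`print` connectors are on the classpath); `executeSql` runs each statement, and the `INSERT` submits the continuous job:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UserOrderSqlSketch {
    public static void main(String[] args) throws Exception {
        // Pure Table API program: no DataStream code needed
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Source table: datagen produces one mock order per second
        tEnv.executeSql(
                "create table UserOrder("
                        + " user_id int,"
                        + " money_amount int,"
                        + " ts AS localtimestamp,"
                        + " WATERMARK FOR ts AS ts"
                        + ") with ("
                        + " 'connector' = 'datagen',"
                        + " 'rows-per-second' = '1',"
                        + " 'fields.user_id.min' = '1',"
                        + " 'fields.user_id.max' = '10',"
                        + " 'fields.money_amount.min' = '1',"
                        + " 'fields.money_amount.max' = '100')");

        // Sink table: print writes every result update to the TaskManager logs
        tEnv.executeSql(
                "create table UserMoneyAmount ("
                        + " user_id int,"
                        + " total_amount int,"
                        + " primary key (user_id) not enforced"
                        + ") with ('connector' = 'print')");

        // Continuous aggregation; await() blocks so the job keeps running when executed locally
        tEnv.executeSql(
                        "insert into UserMoneyAmount "
                                + "select user_id, sum(money_amount) as total_amount "
                                + "from UserOrder group by user_id")
                .await();
    }
}
```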
As before, two submission methods are covered: the Flink CLI and the Flink job management platform StreamPark.
For the Flink CLI, enter the Flink session cluster container and start the SQL client:

```shell
## start the sql-client
./bin/sql-client.sh
```

In the Flink SQL interactive terminal, enter the three SQL statements above one at a time (multiple statements cannot be executed together). All three statements should succeed; the `INSERT` statement submits a job and prints a job id, which you should note for cancelling later. You can check the job status in the Flink WebUI. To cancel the job, exit the SQL terminal and run the following in the container shell:

```shell
./bin/flink cancel <job_id>
```
For StreamPark, in the left navigation click Real-time Development - Job Management - Add. On the new page, fill in the form as follows and click the Submit button:

- Development Mode: Flink SQL
- Execution Mode: remote
- Flink Version: flink-1.17.1
- Flink Cluster: demo
- Flink SQL:

```sql
create table UserOrder(
user_id int,
money_amount int,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) with (
'connector' = 'datagen',
'rows-per-second' = '1',
'fields.user_id.min' = '1',
'fields.user_id.max' = '10',
'fields.money_amount.min' = '1',
'fields.money_amount.max' = '100'
);

create table UserMoneyAmount (
user_id int,
total_amount int,
primary key (user_id) not enforced
) with ('connector' = 'print');

insert into
UserMoneyAmount
select
user_id,
sum(money_amount) as total_amount
from
UserOrder
group by
user_id;
```

- Application Name: user-order-total-amount
After the job is added successfully, you are redirected to the job management page.

Run the job: click the Release button; after a moment the release status changes to Done and Success. Then click the Start button, switch off from savepoint in the dialog, and click Apply. The job is submitted to the Flink session cluster, and its state changes from Starting to Running. To stop it, click the Stop button.