This document introduces how to develop Flink applications using the Flink DataStream API and Flink SQL. For more information, please refer to the Flink API documentation. It also explains how to use the Flink job management platform, StreamPark, to submit and run Flink applications.
Prerequisites:
The Flink DataStream API is an advanced API for processing unbounded data streams. It provides numerous operators for manipulating data streams, such as `map`, `filter`, `keyBy`, `reduce`, and `window`. Below is a simple example demonstrating how to develop a basic WordCount application using the Flink DataStream API.
```java
package org.apache.flink.streaming.examples.socket;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Implements a streaming windowed version of the "WordCount" program.
 *
 * <p>This program connects to a server socket and reads strings from the socket. The easiest way to
 * try this out is to open a text server (at port 12345) using the <i>netcat</i> tool via
 *
 * <pre>
 * nc -l 12345 on Linux or nc -l -p 12345 on Windows
 * </pre>
 *
 * <p>and run this example with the hostname and the port as arguments.
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println(
                    "No port specified. Please run 'SocketWindowWordCount "
                            + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                            + "and port is the address of the text server");
            System.err.println(
                    "To start a simple text server, run 'netcat -l <port>' and "
                            + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");

        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts =
                text.flatMap(
                                (FlatMapFunction<String, WordWithCount>)
                                        (value, out) -> {
                                            for (String word : value.split("\\s")) {
                                                out.collect(new WordWithCount(word, 1L));
                                            }
                                        },
                                Types.POJO(WordWithCount.class))
                        .keyBy(value -> value.word)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .reduce((a, b) -> new WordWithCount(a.word, a.count + b.count))
                        .returns(WordWithCount.class);

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /** Data type for words with count. */
    public static class WordWithCount {

        public String word;
        public long count;

        @SuppressWarnings("unused")
        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
```
This is a simple Apache Flink program that performs a windowed word count by reading data from a socket: it reads the text data stream from the specified socket, counts words in real time within each window, and prints the results to the console.
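As a plain-Java illustration (no Flink runtime required; the class and method names here are ours, not part of the example), the per-window tokenize-and-count logic of the pipeline can be sketched as:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordCountSketch {

    // Mirrors the flatMap (split on whitespace) + keyBy(word) + reduce
    // (sum the counts) steps of the Flink job, applied to the lines that
    // fall into one 5-second window.
    public static Map<String, Long> countWords(List<String> windowLines) {
        Map<String, Long> counts = new HashMap<>();
        for (String line : windowLines) {
            for (String word : line.split("\\s")) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // One simulated window of socket input
        System.out.println(countWords(List.of("hello world", "hello flink")));
    }
}
```

The Flink job applies exactly this computation, except that Flink partitions the words across parallel tasks by key and closes a window every five seconds of processing time.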
Source code address: https://github.com/apache/flink/tree/release-1.17/flink-examples/flink-examples-streaming
Program package download address: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.17.1/flink-examples-streaming_2.12-1.17.1-SocketWindowWordCount.jar
Two methods are introduced for submitting applications: one is through the Flink CLI, and the other is through the Flink job management platform StreamPark.
Enter the flink session cluster container:

```shell
# Change the pod name according to the actual situation
kubectl exec -it flink-session-cluster-xxxxx -n kdp-data -- bash
```
Execute the following command in the flink session cluster container:

```shell
./bin/flink run -d ./examples/streaming/SocketWindowWordCount.jar --hostname `(grep 'flink-session-cluster' /etc/hosts | head -n 1 | awk '{print $1}')` --port 9999 && nc -l 9999
```
Expected output:

```
ERROR StatusLogger Reconfiguration failed: No configuration found for '6438a396' at 'null' in 'null'
ERROR StatusLogger Reconfiguration failed: No configuration found for '510f3d34' at 'null' in 'null'
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/opt/flink/lib/flink-dist-1.17.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Job has been submitted with JobID ff3008609a49c364e7b27f5e94f4f57b
```
The error message `ERROR StatusLogger Reconfiguration failed` does not affect job submission and can be ignored.
At this point, the job has been successfully submitted. In the nc command interactive interface, enter any text, press Enter to send the text, and you can send text multiple times. You can view the output results on the flink session cluster console.
Clear the job: exit the nc interactive interface (Ctrl+C) and execute the following commands:

```shell
./bin/flink list
## expected output: the job id
./bin/flink cancel <job_id>
```
Preparation
Enter the flink session cluster container and execute the following commands:

```shell
## View the container IP address:
grep 'flink-session-cluster' /etc/hosts | head -n 1 | awk '{print $1}'
## Record the output Flink container IP address (e.g., 10.233.114.142), as the StreamPark job parameters will use it.

## Start the socket service:
nc -l 9999
## After the job is successfully published, input text data here and observe the output results on the flink session cluster console.
```
Logging in to StreamPark WebUI to Submit

Log in to the StreamPark WebUI, click Real-time Development - Job Management - Add in the left navigation bar, fill in the following information on the new page, and then click submit:

- Development Mode: Custom Code
- Execution Mode: remote
- Resource From: upload local job
- Flink Version: flink-1.17.1
- Flink Cluster: demo
- Program Main: org.apache.flink.streaming.examples.socket.SocketWindowWordCount
- Application Name: Socket Window WordCount
- Program Args: --port 9999 --hostname 10.233.114.142 (IP address to be filled in according to the actual situation)

After successful addition, you will be redirected to the job management page.
Running the job

- Click the publish job button for the Socket Window WordCount job; the publish status will change to Done and then Success.
- Click the start job button for the Socket Window WordCount job, turn off from savepoint in the pop-up window, and click apply. The job will be submitted to the Flink session cluster for running, and the running status will change to Starting and then Running in turn.
- Click the stop job button to stop the job.

Flink SQL is a high-level API for processing unbounded and bounded data streams. It provides a SQL-like query language that can be used to process data streams. Here is a simple example showing how to use Flink SQL to develop a simple application: it simulates generating order data, calculates the total amount for each user, and prints the results.
The Flink SQL is as follows:

```sql
-- Source table: the datagen connector generates random order rows
create table UserOrder (
    user_id int,
    money_amount int,
    ts AS localtimestamp,
    WATERMARK FOR ts AS ts
) with (
    'connector' = 'datagen',
    'rows-per-second' = '1',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '10',
    'fields.money_amount.min' = '1',
    'fields.money_amount.max' = '100'
);

-- Sink table: the print connector writes each result row to the console
create table UserMoneyAmount (
    user_id int,
    total_amount int,
    primary key (user_id) not enforced
) with ('connector' = 'print');

-- Continuous aggregation: a running total per user, emitted as updates to the sink
insert into
    UserMoneyAmount
select
    user_id,
    sum(money_amount) as total_amount
from
    UserOrder
group by
    user_id;
```
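The continuous `sum(money_amount) ... group by user_id` query above emits an updated total for a user every time a new order for that user arrives. A minimal plain-Java sketch of that upsert behavior (no Flink required; the class and method names are illustrative, and the order values are made up):

```java
import java.util.HashMap;
import java.util.Map;

public class RunningTotalSketch {

    private final Map<Integer, Integer> totals = new HashMap<>();

    // Each incoming (user_id, money_amount) row updates that user's running
    // total; the returned value is what the print sink would emit for the row.
    public int onOrder(int userId, int moneyAmount) {
        return totals.merge(userId, moneyAmount, Integer::sum);
    }

    public static void main(String[] args) {
        RunningTotalSketch agg = new RunningTotalSketch();
        System.out.println(agg.onOrder(1, 40)); // 40
        System.out.println(agg.onOrder(2, 10)); // 10
        System.out.println(agg.onOrder(1, 25)); // 65 (updated total for user 1)
    }
}
```

Because the sink declares `primary key (user_id) not enforced`, each emitted row is an update keyed by user, which is why the print connector shows update records rather than one row per user.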
Two methods are introduced for submitting applications: one is through the Flink CLI, and the other is through the Flink job management platform StreamPark.
Enter the flink session cluster container and execute the following commands:

```shell
## Start the sql-client
./bin/sql-client.sh

## In the Flink SQL interactive terminal, enter the three SQL statements above one at a time, pressing Enter after each; multiple statements cannot be executed simultaneously.
## After the three statements execute successfully, the job is submitted and its job id is printed; record the id for cancelling the job later.
## Access the Flink WebUI to check the job status.

## Exit the Flink SQL interactive terminal and cancel the job:
./bin/flink cancel <job_id>
```
Click Real-time Development - Job Management - Add in the left navigation bar, fill in the following information on the new page, and then click submit:
- Execution Mode: remote
- Flink Version: flink-1.17.1
- Flink Cluster: demo
- Flink SQL: the three SQL statements shown above (the UserOrder and UserMoneyAmount table definitions and the insert statement)
- Application Name: user-order-total-amount
After successful addition, you will be redirected to the job management page.
Running the job

- Click the publish job button for the job; the publish status will change to Done and then Success.
- Click the start job button for the job, turn off from savepoint in the pop-up window, and click apply. The job will be submitted to the Flink session cluster for running, and the running status will change to Starting and then Running in turn.
- Click the stop job button to stop the job.