(1)透過FlinkSQL將資料寫入mysql demo
2022-08-07由 NBI大資料視覺化 發表于 漁業
string字元型變數什麼特點
FlinkSQL的出現,極大程度上降低了Flink的程式設計門檻,更加容易理解和掌握使用。今天將自己的筆記分享出來,希望能幫助在這方面有需要的朋友。
(1)首先引入POM依賴:
```
<!—— https://mvnrepository。com/artifact/org。apache。flink/flink-connector-jdbc ——>
<!——
<!—— https://mvnrepository。com/artifact/com。fasterxml。jackson。core/jackson-databind ——>
<!—— https://mvnrepository。com/artifact/mysql/mysql-connector-java ——>
```
(2)編寫程式碼
```
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings。newInstance()
。inStreamingMode()
//。useOldPlanner() // flink
。useBlinkPlanner() // blink
。build();
StreamTableEnvironment ste = StreamTableEnvironment。create(env, settings);
String ddl = “CREATE TABLE flinksinksds(\r\n” +
“componentname STRING,\r\n” +
“componentcount INT,\r\n” +
“componentsum INT\r\n” +
“) WITH(\r\n” +
“‘connector。type’=‘jdbc’,\r\n” +
“‘connector。driver’ = ‘com。mysql。cj。jdbc。Driver’,” +
“‘connector。url’=‘jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai’,\r\n” +
“‘connector。table’=‘flinksink’,\r\n” +
“‘connector。username’=‘root’,\r\n” +
“‘connector。password’=‘root’,\r\n” +
“‘connector。write。flush。max-rows’=‘1’\r\n” +
“)”;
System。err。println(ddl);
ste。executeSql(ddl);
String insert = “insert into flinksinksds(componentname,componentcount,componentsum)” +
“values(‘1024’, 1 , 2 )”;
ste。executeSql(insert);
env。execute();
System。exit(0);
}
```
(3)執行結果: