(1) Writing data to MySQL with FlinkSQL: a demo

Published 2022-08-07 by NBI大資料視覺化

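FlinkSQL greatly lowers the barrier to programming with Flink and makes it easier to understand and use. I am sharing my notes here in the hope that they help anyone who needs them.

(1) First, add the POM dependencies: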

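```
<properties>
    <flink.version>1.13.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <!-- 1.7.30 (a third version property; its name was lost in extraction) -->
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.12.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.66</version>
    </dependency>
</dependencies>
```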

(2) Write the code

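```
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Class name is illustrative; the original shows only the main method.
public class FlinkSqlMysqlDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment ste = StreamTableEnvironment.create(env, settings);

        // Flink-side sink table mapped onto the MySQL table `flinksink`.
        // The 'connector.type' option keys are the legacy style, kept as in the original.
        String ddl = "CREATE TABLE flinksinksds(\r\n" +
                "componentname STRING,\r\n" +
                "componentcount INT,\r\n" +
                "componentsum INT\r\n" +
                ") WITH(\r\n" +
                "'connector.type'='jdbc',\r\n" +
                "'connector.driver' = 'com.mysql.cj.jdbc.Driver'," +
                "'connector.url'='jdbc:mysql://localhost:3306/testdb?characterEncoding=UTF-8&useUnicode=true&useSSL=false&tinyInt1isBit=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai',\r\n" +
                "'connector.table'='flinksink',\r\n" +
                "'connector.username'='root',\r\n" +
                "'connector.password'='root',\r\n" +
                "'connector.write.flush.max-rows'='1'\r\n" +
                ")";
        System.err.println(ddl);
        ste.executeSql(ddl);

        String insert = "insert into flinksinksds(componentname,componentcount,componentsum)" +
                " values('1024', 1 , 2 )";
        // executeSql() submits the INSERT job itself; await() blocks until it finishes.
        // A trailing env.execute() is not needed: with no DataStream operators defined
        // it fails with "No operators defined in streaming topology".
        ste.executeSql(insert).await();
    }
}
```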
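The DDL above uses the legacy `connector.type` option keys. As a rough sketch, the same sink table can also be declared with the newer JDBC connector option keys (`connector`, `url`, `table-name`, `username`, `password`, `sink.buffer-flush.max-rows`) that Flink documents from 1.11 onward. The helper below only builds the DDL string, so it runs without Flink on the classpath; the class name `JdbcSinkDdl` and the `build` method are hypothetical names chosen for illustration, and the shortened JDBC URL is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not part of Flink): builds a CREATE TABLE statement
// for a JDBC sink using the 'connector' = 'jdbc' option style.
final class JdbcSinkDdl {

    static String build(String flinkTable, String url, String mysqlTable,
                        String username, String password) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "jdbc");
        options.put("url", url);
        options.put("table-name", mysqlTable);
        options.put("username", username);
        options.put("password", password);
        // Flush after every row, mirroring 'connector.write.flush.max-rows'='1'.
        options.put("sink.buffer-flush.max-rows", "1");

        // Note: values are not escaped, so they must not contain single quotes.
        StringJoiner with = new StringJoiner(",\n  ", "WITH (\n  ", "\n)");
        options.forEach((k, v) -> with.add("'" + k + "' = '" + v + "'"));

        return "CREATE TABLE " + flinkTable + " (\n"
                + "  componentname STRING,\n"
                + "  componentcount INT,\n"
                + "  componentsum INT\n"
                + ") " + with;
    }

    public static void main(String[] args) {
        // Assumed connection settings, taken from the demo above.
        System.out.println(build("flinksinksds",
                "jdbc:mysql://localhost:3306/testdb?useSSL=false",
                "flinksink", "root", "root"));
    }
}
```

The resulting string can be passed to `ste.executeSql(...)` in place of the `ddl` variable. Keeping the options in a map makes it easy to switch databases or credentials without editing a long concatenated string.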

(3) Execution result:

[Image: (1) Writing data to MySQL with FlinkSQL: a demo]
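One way to check the result without a screenshot is to read the row back over plain JDBC. The sketch below assumes the MySQL table `flinksink` already exists in `testdb` with the three columns used in the demo (the JDBC connector writes into an existing table; it does not create it), and reuses the demo's `root`/`root` credentials. The class name `VerifyFlinkSink` and the `formatRow` helper are hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Hypothetical verification program: prints every row of the sink table.
final class VerifyFlinkSink {

    // Formats one row as tab-separated text.
    static String formatRow(String name, int count, int sum) {
        return name + "\t" + count + "\t" + sum;
    }

    public static void main(String[] args) throws SQLException {
        // Assumed connection settings, matching the demo's database and user.
        String url = "jdbc:mysql://localhost:3306/testdb"
                + "?useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai";
        try (Connection conn = DriverManager.getConnection(url, "root", "root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT componentname, componentcount, componentsum FROM flinksink")) {
            while (rs.next()) {
                System.out.println(formatRow(rs.getString(1), rs.getInt(2), rs.getInt(3)));
            }
        }
    }
}
```

If the Flink job completed, the output should include a row for the values inserted by the demo (`'1024'`, `1`, `2`), alongside any rows that were already in the table.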