Flink API #
Dependency #
Maven dependency:
<dependency>
<groupId>org.apache.paimon</groupId>
<artifactId>paimon-flink-1.17</artifactId>
<version>0.4.0-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.17.0</version>
<scope>provided</scope>
</dependency>
Or download the jar file: Paimon Flink.
Please choose your Flink version.
Paimon relies on the Hadoop environment; you should add the Hadoop classpath or a bundled Hadoop jar.
Paimon does not provide a DataStream API, but you can read from or write to Paimon tables by converting between DataStream and Table in Flink. See DataStream API Integration.
Write to Table #
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
public class WriteToTable {

    /** Writes a changelog DataStream into a Paimon table via Flink's Table API. */
    public static void writeTo() {
        // Set up the DataStream and Table API environments.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);

        // Build a changelog DataStream of (name, age) rows carrying explicit row kinds.
        DataStream<Row> changelog =
                streamEnv
                        .fromElements(
                                Row.ofKind(RowKind.INSERT, "Alice", 12),
                                Row.ofKind(RowKind.INSERT, "Bob", 5),
                                Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
                                Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
                        .returns(
                                Types.ROW_NAMED(
                                        new String[] {"name", "age"},
                                        Types.STRING, Types.INT));

        // Describe the stream's columns so it can be interpreted as a Table.
        Schema rowSchema =
                Schema.newBuilder()
                        .column("name", DataTypes.STRING())
                        .column("age", DataTypes.INT())
                        .build();
        Table changelogTable = tableEnvironment.fromChangelogStream(changelog, rowSchema);

        // Create a Paimon catalog and make it the current catalog.
        tableEnvironment.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
        tableEnvironment.executeSql("USE CATALOG paimon");

        // Register the table under a name so SQL statements can reference it.
        tableEnvironment.createTemporaryView("InputTable", changelogTable);

        // Insert the changelog data into the Paimon table.
        tableEnvironment.executeSql("INSERT INTO sink_paimon_table SELECT * FROM InputTable");
    }
}
Read from Table #
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class ReadFromTable {

    /** Reads a Paimon table as a changelog DataStream and prints each row. */
    public static void readFrom() throws Exception {
        // Set up the DataStream and Table API environments.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamEnv);

        // Create a Paimon catalog and make it the current catalog.
        tableEnvironment.executeSql("CREATE CATALOG paimon WITH ('type' = 'paimon', 'warehouse'='...')");
        tableEnvironment.executeSql("USE CATALOG paimon");

        // Query the Paimon table and convert the result into a changelog stream.
        Table resultTable = tableEnvironment.sqlQuery("SELECT * FROM my_paimon_table");
        DataStream<Row> changelog = tableEnvironment.toChangelogStream(resultTable);

        // Consume the stream; each element prints with its row kind, e.g.:
        // +I[Bob, 12]
        // +I[Alice, 12]
        // -U[Alice, 12]
        // +U[Alice, 14]
        changelog.executeAndCollect().forEachRemaining(System.out::println);
    }
}