Aggregation
NOTE: Always set table.exec.sink.upsert-materialize to NONE in Flink SQL TableConfig.
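In the Flink SQL client, this option can usually be set per session with a SET statement. The following is a minimal sketch, assuming a Flink version that accepts the quoted key/value form of SET; in a Table API program the same key would go into the TableConfig instead.

```sql
-- Disable the upsert-materialize operator for the current session
-- (assumes the statement is run before any INSERT into the aggregation table).
SET 'table.exec.sink.upsert-materialize' = 'NONE';
```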
Sometimes users only care about aggregated results. The aggregation merge engine merges records that share the same primary key: for each value field, it applies that field's aggregate function to the existing value and each newly arrived value, one record at a time.
Each field that is not part of the primary key can be given an aggregate function through the fields.<field-name>.aggregate-function table property. A field without this property defaults to last_non_null_value. For example, consider the following table definition.
- Flink
CREATE TABLE my_table (
    product_id BIGINT,
    price DOUBLE,
    sales BIGINT,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.price.aggregate-function' = 'max',
    'fields.sales.aggregate-function' = 'sum'
);
Field price will be aggregated by the max function, and field sales will be aggregated by the sum function. Given two input records <1, 23.0, 15> and <1, 30.2, 20>, the final result will be <1, 30.2, 35>.
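The two input records above can be written with ordinary INSERT statements. This is a sketch only, assuming my_table has been created in a Paimon catalog as defined above and that the statements run one after the other.

```sql
-- First record for product_id = 1.
INSERT INTO my_table VALUES (1, 23.0, 15);

-- Second record with the same primary key: price is merged with max,
-- sales is merged with sum.
INSERT INTO my_table VALUES (1, 30.2, 20);

-- According to the description above, this should return <1, 30.2, 35>.
SELECT * FROM my_table;
```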
Aggregation Functions
The currently supported aggregate functions and their data types are:
sum
The sum function aggregates the values across multiple rows. It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.
product
The product function multiplies the values across multiple rows. It supports DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, and DOUBLE data types.
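As an illustration of product, the sketch below multiplies successive factors for the same key. The table name account_growth and its columns are hypothetical and chosen only for this example.

```sql
-- Hypothetical table: growth_factor is multiplied across records per account.
CREATE TABLE account_growth (
    account_id BIGINT,
    growth_factor DOUBLE,
    PRIMARY KEY (account_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.growth_factor.aggregate-function' = 'product'
);

INSERT INTO account_growth VALUES (1, 1.5);
INSERT INTO account_growth VALUES (1, 2.0);
-- With the product function, the merged growth_factor for account 1
-- should be 1.5 * 2.0 = 3.0.
```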
count
To count rows that match a specific condition, use the SUM function: express the condition as a Boolean value, convert TRUE to 1 and FALSE to 0, and sum the results.
For example, to count the rows of a table orders that meet a condition, use the following query, where <condition> stands for any Boolean expression:
SELECT SUM(CASE WHEN <condition> THEN 1 ELSE 0 END) AS cnt
FROM orders;
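The same idea can be carried into an aggregation table by giving a counter field the sum function and writing 1 or 0 per input row. The sketch below is an assumption-laden illustration: the table order_counts and the columns customer_id and status on orders are hypothetical.

```sql
-- Hypothetical counter table: paid_orders accumulates 1 for each matching row.
CREATE TABLE order_counts (
    customer_id BIGINT,
    paid_orders BIGINT,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.paid_orders.aggregate-function' = 'sum'
);

-- Each source row contributes 1 if it matches the condition, otherwise 0.
INSERT INTO order_counts
SELECT
    customer_id,
    CAST(CASE WHEN status = 'PAID' THEN 1 ELSE 0 END AS BIGINT)
FROM orders;
```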
max
The max function identifies and retains the maximum value. It supports CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.
min
The min function identifies and retains the minimum value. It supports CHAR, VARCHAR, DECIMAL, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, and TIMESTAMP_LTZ data types.
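Because min and max also accept temporal types, they can track the earliest and latest timestamps seen for a key. A minimal sketch, with a hypothetical table user_activity:

```sql
-- Hypothetical table: first_seen keeps the earliest timestamp,
-- last_seen keeps the latest one for each user.
CREATE TABLE user_activity (
    user_id BIGINT,
    first_seen TIMESTAMP(3),
    last_seen TIMESTAMP(3),
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.first_seen.aggregate-function' = 'min',
    'fields.last_seen.aggregate-function' = 'max'
);
```

Writing the same event time into both columns on every insert leaves the first and last occurrence per user after merging.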
last_value
The last_value function replaces the previous value with the most recently imported value. It supports all data types.
last_non_null_value
The last_non_null_value function replaces the previous value with the latest non-null value. It supports all data types.
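The difference between last_value and last_non_null_value shows up when a later record carries NULL. The sketch below uses a hypothetical table device_status; the expected outcome in the comments follows from the two descriptions above rather than from a verified run.

```sql
-- Hypothetical table contrasting the two functions.
CREATE TABLE device_status (
    device_id BIGINT,
    last_reading DOUBLE,
    last_known_location STRING,
    PRIMARY KEY (device_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.last_reading.aggregate-function' = 'last_value',
    'fields.last_known_location.aggregate-function' = 'last_non_null_value'
);

INSERT INTO device_status VALUES (1, 20.5, 'warehouse');
INSERT INTO device_status VALUES (1, CAST(NULL AS DOUBLE), CAST(NULL AS STRING));
-- Expected per the descriptions above:
--   last_reading        -> NULL        (most recent value, even if null)
--   last_known_location -> 'warehouse' (latest non-null value is kept)
```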
listagg
The listagg function concatenates multiple string values into a single string. It supports the STRING data type.
The delimiter can be set per field through the fields.<field-name>.list-agg-delimiter table property; if it is not set, "," is used.
Set fields.<field-name>.distinct to true to remove duplicate values, where values are the parts of the string separated by fields.<field-name>.list-agg-delimiter.
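The listagg options can be combined in one table definition. A minimal sketch, using only the properties named above and a hypothetical table product_tags:

```sql
-- Hypothetical table: tags are joined with ';' and duplicates are dropped.
CREATE TABLE product_tags (
    product_id BIGINT,
    tags STRING,
    PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
    'merge-engine' = 'aggregation',
    'fields.tags.aggregate-function' = 'listagg',
    'fields.tags.list-agg-delimiter' = ';',
    'fields.tags.distinct' = 'true'
);

INSERT INTO product_tags VALUES (1, 'new');
INSERT INTO product_tags VALUES (1, 'sale');
INSERT INTO product_tags VALUES (1, 'new');
-- With distinct enabled, the merged tags value should contain
-- 'new' and 'sale' once each, separated by ';'.
```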