The MERGE statement is used to select rows from one or more sources to update or insert into a target table. You can specify conditions to determine whether to update or insert into the target table. This statement is a convenient way to combine multiple operations. Allows you to avoid multiple INSERT, UPDATE, and DELETE DML statements. MERGE is a deterministic statement.
In Spark, the MERGE syntax is (For more information check Spark documentation):
MERGE INTO target_table_name [target_alias]
USING source_table_reference [source_alias]
ON merge_condition
{ WHEN MATCHED [ AND matched_condition ] THEN matched_action |
WHEN NOT MATCHED [BY TARGET] [ AND not_matched_condition ] THEN not_matched_action |
WHEN NOT MATCHED BY SOURCE [ AND not_matched_by_source_condition ] THEN not_matched_by_source_action } [...]
matched_action
{ DELETE |
UPDATE SET * |
UPDATE SET { column = { expr | DEFAULT } } [, ...] }
not_matched_action
{ INSERT * |
INSERT (column1 [, ...] ) VALUES ( expr | DEFAULT ] [, ...] )
not_matched_by_source_action
{ DELETE |
UPDATE SET { column = { expr | DEFAULT } } [, ...] }
MERGE INTO <target_table> USING <source> ON <join_expr> { matchedClause | notMatchedClause } [ ... ]
matchedClause ::=
WHEN MATCHED [ AND <case_predicate> ] THEN { UPDATE SET <col_name> = <expr> [ , <col_name2> = <expr2> ... ] | DELETE } [ ... ]
notMatchedClause ::=
WHEN NOT MATCHED [ AND <case_predicate> ] THEN INSERT [ ( <col_name> [ , ... ] ) ] VALUES ( <expr> [ , ... ] )
The main difference is that Snowflake does not have an equivalence for the WHEN NOT MATCHED BY SOURCE clause, one workaround is needed for its equivalence.
Sample Source Patterns
Sample auxiliary data
This code was executed for a better understanding of the examples:
CREATE OR REPLACE people_source (
person_id INTEGER NOT NULL PRIMARY KEY,
first_name STRING NOT NULL,
last_name STRING NOT NULL,
title STRING NOT NULL,
);
CREATE OR REPLACE people_target (
person_id INTEGER NOT NULL PRIMARY KEY,
first_name STRING NOT NULL,
last_name STRING NOT NULL,
title STRING NOT NULL DEFAULT 'NONE'
);
INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
CREATE OR REPLACE TABLE people_source (
person_id INTEGER NOT NULL PRIMARY KEY,
first_name VARCHAR(20) NOT NULL,
last_name VARCHAR(20) NOT NULL,
title VARCHAR(10) NOT NULL
);
CREATE OR REPLACE TABLE people_target (
person_id INTEGER NOT NULL PRIMARY KEY,
first_name VARCHAR(20) NOT NULL,
last_name VARCHAR(20) NOT NULL,
title VARCHAR(10) NOT NULL DEFAULT 'NONE'
);
INSERT INTO people_target VALUES (1, 'John', 'Smith', 'Mr');
INSERT INTO people_target VALUES (2, 'alice', 'jones', 'Mrs');
INSERT INTO people_source VALUES (2, 'Alice', 'Jones', 'Mrs.');
INSERT INTO people_source VALUES (3, 'Jane', 'Doe', 'Miss');
INSERT INTO people_source VALUES (4, 'Dave', 'Brown', 'Mr');
MERGE Statement - Insert and Update Case
Spark
MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id = ps.person_id)
WHEN MATCHED THEN UPDATE
SET pt.first_name = ps.first_name,
pt.last_name = ps.last_name,
pt.title = DEFAULT
WHEN NOT MATCHED THEN INSERT
(pt.person_id, pt.first_name, pt.last_name, pt.title)
VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);
SELECT * FROM people_target;
MERGE INTO people_target2 pt
USING people_source ps
ON (pt.person_id = ps.person_id)
WHEN MATCHED THEN UPDATE
SET pt.first_name = ps.first_name,
pt.last_name = ps.last_name,
pt.title = DEFAULT
WHEN NOT MATCHED THEN INSERT
(pt.person_id, pt.first_name, pt.last_name, pt.title)
VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);
SELECT * FROM PUBLIC.people_target ORDER BY person_id;
The INSERT and UPDATE actions are equivalently functional in Snowflake. Likewise in both languages, you can specify DEFAULT as expr to explicitly update the column to its default value.
Spark also has the option to insert and update without specifying the affected columns. In this case, the action is applied to all columns. This action assumes that the source table has the same columns as the destination table; otherwise, the query will return a parsing error.
UPDATE SET *
-- This is equivalent to UPDATE SET col1 = source.col1 [, col2 = source.col2 ...]
INSERT *
-- This is equivalent to INSERT (col1 [, col2 ...]) VALUES (source.col1 [, source.col2 ...]
Snowflake does not have these options, so for these scenarios, the equivalent of listing all the columns of the target table is applied.
MERGE Statement - Delete Case
MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED BY TARGET THEN INSERT *;
SELECT * FROM people_target;
MERGE INTO people_target pt
USING people_source ps
ON (pt.person_id = ps.person_id)
WHEN MATCHED AND pt.person_id < 3 THEN DELETE
WHEN NOT MATCHED THEN INSERT
(pt.person_id, pt.first_name, pt.last_name, pt.title)
VALUES (ps.person_id, ps.first_name, ps.last_name, ps.title);
SELECT * FROM people_target;
The DELETE action is equivalently functional in Snowflake, as is the option to include an extra condition for the MATCHED/NOT MATCHED.
WHEN NOT MATCHED BY TARGET can be used as an alias for WHEN NOT MATCHED.
MERGE Statement - WHEN NOT MATCHED BY SOURCE
WHEN NOT MATCHED BY SOURCE clauses are executed when a target row does not match any rows in the source table based on the merge_condition and not_match_by_source_condition (if applied) evaluates to true (Spark documentation).
Snowflake does not support this clause, so a workaround is necessary to support it. Below is the workaround recommended for this case with the DELETE action, the same can be used for the UPDATE action
MERGE INTO people_target pt
USING people_source ps
ON pt.person_id = ps.person_id
WHEN NOT MATCHED BY SOURCE THEN DELETE;
SELECT * FROM people_target;
MERGE INTO people_target pt
USING (
SELECT
pt.person_id
FROM
people_target pt LEFT
JOIN people_source ps ON pt.person_id = ps.person_id
WHERE
ps.person_id is null
) s_src
ON s_src.person_id = pt.person_id
WHEN MATCHED THEN DELETE;
SELECT * FROM people_target;