The detection of changes (often referred to as "delta detection") is essential for effectively managing updates in Snowflake. This approach uses Amazon S3 as an interim staging area into which the source system deposits updated CSV files. These CSV files serve as the foundation for delta detection and for the subsequent updates to Snowflake tables.

...

Implementation Steps

Step 1: EazyDI Pipeline Configuration

...

  1. CSV Files in S3:

    • CSV files updated by the source systems are deposited into the designated S3 bucket and folder.

    • These files serve as the staging area for delta detection.

Step 3:

...

Snowflake Staging Table (Optional)

If required, you can first load data from the S3 staging area into Snowflake staging tables using Snowflake's COPY INTO command. This step is optional and can be skipped if you prefer to process data directly from an external stage in Snowflake.

  1. Create Snowflake Tables:

    • Define Snowflake tables that mirror the structure of the CSV files stored in S3 (a minimal table sketch follows the COPY INTO example below).

    • Ensure that primary keys or unique identifiers are established to facilitate delta detection.

  2. Load Data from S3 to Snowflake:

Code Block
languagesql
COPY INTO snowflake_table
FROM 's3://your-bucket/path/'
FILE_FORMAT = (TYPE = 'CSV');
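
For item 1 above, a minimal sketch of such a staging table is shown below. The column names (primary_key, column1, column2) and their types are placeholders taken from the examples on this page; adjust them to match your actual CSV layout.

Code Block
languagesql
-- Minimal sketch of a staging table mirroring the CSV layout.
-- primary_key, column1, column2 and their types are placeholders;
-- adjust them to match your actual files.
CREATE TABLE IF NOT EXISTS snowflake_staging_table (
    primary_key VARCHAR NOT NULL,  -- unique identifier used for delta detection
    column1     VARCHAR,
    column2     VARCHAR
);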

Step 4: Delta Detection and Updates

  1. SQL for Delta Detection: Use a MERGE statement keyed on the primary key to detect and apply changes. A variation that updates only rows whose values actually differ is sketched after the examples below.

  2. Merging using a Snowflake staging table:

Code Block
languagesql
MERGE INTO target_table AS t
USING (
    SELECT * FROM snowflake_staging_table
) AS s
ON t.primary_key = s.primary_key
WHEN MATCHED THEN
    UPDATE SET t.column1 = s.column1, t.column2 = s.column2
WHEN NOT MATCHED THEN
    INSERT (primary_key, column1, column2)
    VALUES (s.primary_key, s.column1, s.column2);
  3. If you want to access the S3 files directly from a Snowflake external stage, follow these steps:

Code Block
languagesql
-- Step 1: Define an external stage pointing to S3.
-- The file format lets Snowflake parse the CSV columns; SKIP_HEADER = 1
-- assumes the files include a header row.
CREATE OR REPLACE STAGE s3_stage
  URL = 's3://your-bucket/path/'
  CREDENTIALS = (
    AWS_KEY_ID = 'your_access_key_id'
    AWS_SECRET_KEY = 'your_secret_access_key'
  )
  FILE_FORMAT = (TYPE = 'CSV' SKIP_HEADER = 1);

-- Step 2: Perform the MERGE operation using the external stage.
-- Staged CSV columns are addressed positionally ($1, $2, ...) and aliased
-- to the target column names so the MERGE can reference them.
MERGE INTO target_table AS t
USING (
    SELECT
        $1 AS primary_key,
        $2 AS column1,
        $3 AS column2
    FROM @s3_stage/csv_file
) AS s
ON t.primary_key = s.primary_key
WHEN MATCHED THEN
    UPDATE SET t.column1 = s.column1, t.column2 = s.column2
WHEN NOT MATCHED THEN
    INSERT (primary_key, column1, column2)
    VALUES (s.primary_key, s.column1, s.column2);
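
The MERGE statements above update every matched row, whether or not its values changed. As a variation (a sketch, not part of the original pipeline), the update can be restricted to rows whose values actually differ, using the same placeholder columns:

Code Block
languagesql
-- Variation: only update rows whose values actually changed.
-- primary_key, column1, column2 are the same placeholder columns as above.
MERGE INTO target_table AS t
USING (
    SELECT * FROM snowflake_staging_table
) AS s
ON t.primary_key = s.primary_key
WHEN MATCHED
    -- IS DISTINCT FROM also catches changes to or from NULL
    AND (t.column1 IS DISTINCT FROM s.column1
      OR t.column2 IS DISTINCT FROM s.column2) THEN
    UPDATE SET t.column1 = s.column1, t.column2 = s.column2
WHEN NOT MATCHED THEN
    INSERT (primary_key, column1, column2)
    VALUES (s.primary_key, s.column1, s.column2);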

Considerations

  • Automation: Set up scheduled pipelines in EazyDI and Snowflake tasks to run the pipeline and the delta detection process regularly (a task sketch is shown below).
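
On the Snowflake side, one way to automate the merge is a scheduled Snowflake task. The sketch below is illustrative only; the task name, warehouse, schedule, and statement body are assumptions to adapt to your environment.

Code Block
languagesql
-- Illustrative sketch: re-run the staging-table merge on a schedule.
-- Task name, warehouse, and schedule are assumptions; adjust as needed.
CREATE OR REPLACE TASK delta_merge_task
  WAREHOUSE = your_warehouse
  SCHEDULE = '60 MINUTE'
AS
  MERGE INTO target_table AS t
  USING (SELECT * FROM snowflake_staging_table) AS s
  ON t.primary_key = s.primary_key
  WHEN MATCHED THEN
      UPDATE SET t.column1 = s.column1, t.column2 = s.column2
  WHEN NOT MATCHED THEN
      INSERT (primary_key, column1, column2)
      VALUES (s.primary_key, s.column1, s.column2);

-- Tasks are created suspended and must be resumed explicitly.
ALTER TASK delta_merge_task RESUME;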

...