Exploring Real-Time Data Ingestion Into Snowflake

Previous Articles on Snowflake

Previous Articles on CockroachDB CDC


This article builds upon the earlier discussion in “Tour of Snowflake ingestion using CockroachDB and Redpanda Connect,” where we investigated the process of streaming changefeeds from CockroachDB to Snowflake using Redpanda Connect and Snowpipe in batch mode. Here, we will shift our focus to Kafka Connect and demonstrate how both batch and streaming modes can be used for data ingestion into Snowflake.

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Snowflake
  • Deploy Kafka Connect
  • Verify
  • Conclusion

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start by either launching a CockroachDB instance or using a managed service.

  • To enable CDC, execute the following commands:
SET CLUSTER SETTING cluster.organization = '';
SET CLUSTER SETTING enterprise.license = '';
SET CLUSTER SETTING kv.rangefeed.enabled = true;
  • Verify that changefeeds are enabled:
SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, update it to true.

  • Create a sample table and insert some data:
CREATE TABLE cockroachdb (
     id INT PRIMARY KEY,
     value STRING DEFAULT md5(random()::text),
     created_at TIMESTAMPTZ DEFAULT now(),
     updated_at TIMESTAMPTZ DEFAULT NULL);
INSERT INTO cockroachdb SELECT
   (generate_series(1, 10000));
UPDATE cockroachdb SET value = 'UPDATED', updated_at = now() WHERE id = 1;
  • Create a changefeed job pointing to a local instance of Redpanda:
CREATE CHANGEFEED FOR TABLE cockroachdb INTO 'kafka://redpanda:29092';
SELECT * FROM cockroachdb LIMIT 5;
  id |              value               |          created_at           |          updated_at
-----+----------------------------------+-------------------------------+--------------------------------
   1 | UPDATED                          | 2024-09-09 13:17:57.837984+00 | 2024-09-09 13:17:57.917108+00
   2 | 27a41183599c44251506e2971ba78426 | 2024-09-09 13:17:57.837984+00 | NULL
   3 | 3bf8bc26a750a15691ec4d7ddbb7f5e5 | 2024-09-09 13:17:57.837984+00 | NULL
   4 | b8c5786e8651ddfb3a68eabeadb52f2e | 2024-09-09 13:17:57.837984+00 | NULL
   5 | 3a24df165773639ce89d0d877e7103b7 | 2024-09-09 13:17:57.837984+00 | NULL
(5 rows)
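
Before moving on, you can optionally confirm that the changefeed job is running; a quick check from the CockroachDB SQL shell, assuming your user is allowed to view jobs:

SHOW CHANGEFEED JOBS;

The job should report a running status and list kafka://redpanda:29092 as its sink URI.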

The next step is to set up the Snowflake Kafka connector.

  • Create a database and schema for outputting changefeed data:
USE ROLE SYSADMIN;
CREATE OR REPLACE DATABASE KAFKADB;
CREATE OR REPLACE SCHEMA kafka_schema;

Follow the Snowflake documentation to configure the Kafka connector.

  • Create the required tables:
create or replace table kafkatb_batch(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);

create or replace table kafkatb_streaming(
    RECORD_METADATA VARIANT,
    RECORD_CONTENT VARIANT
);
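
If you want to double-check that both landing tables exist before wiring up the connector, a quick look (a convenience check, not part of the original walkthrough):

SHOW TABLES LIKE 'kafkatb%' IN SCHEMA KAFKADB.KAFKA_SCHEMA;

Both kafkatb_batch and kafkatb_streaming should be listed, each still empty at this point.
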
  • Set up roles and permissions:
-- Use a role that can create and manage roles and privileges.
USE ROLE securityadmin;

-- Create a Snowflake role with the privileges to work with the connector.
CREATE OR REPLACE ROLE kafka_connector_role_1;

-- Grant privileges on the database.
GRANT USAGE ON DATABASE kafkadb TO ROLE kafka_connector_role_1;

-- Grant privileges on the schema.
GRANT USAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE TABLE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE STAGE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;
GRANT CREATE PIPE ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

-- Only required if the Kafka connector will load data into an existing table.
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_batch TO ROLE kafka_connector_role_1;
GRANT OWNERSHIP ON TABLE KAFKADB.KAFKA_SCHEMA.kafkatb_streaming TO ROLE kafka_connector_role_1;

-- Grant the custom role to an existing user.
GRANT ROLE kafka_connector_role_1 TO USER username;

-- Set the custom role as the default role for the user.
-- If you encounter an 'Insufficient privileges' error, verify which role has the OWNERSHIP privilege on the user.
ALTER USER username SET DEFAULT_ROLE = kafka_connector_role_1;
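
A quick sanity check of the grants can save debugging time later; for example, while still using the securityadmin role:

SHOW GRANTS TO ROLE kafka_connector_role_1;

The output should list the USAGE, CREATE TABLE, CREATE STAGE, and CREATE PIPE privileges granted above, plus OWNERSHIP on the two tables.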

Make sure you follow the documentation for setting up key pair authentication for the Snowflake Kafka connector.
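
For reference, once an RSA key pair has been generated per that documentation, the public key is attached to the connector user with a statement along these lines (the key value here is a placeholder, not a working key):

-- Run as a role that is allowed to modify the user, e.g. SECURITYADMIN.
ALTER USER username SET RSA_PUBLIC_KEY = 'MIIBIjANBgkqh...';

-- Confirm the key was registered by checking the RSA_PUBLIC_KEY_FP property.
DESC USER username;

The matching private key is what goes into the snowflake.private.key field of the connector configuration shown below.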

  • Start a Redpanda instance using Docker Compose:
docker compose -f compose-redpandadata.yaml up -d
  • Once it is up, navigate to the Redpanda Console.

  • Click into the cockroachdb topic:

  • Install the Snowflake Kafka connector:
confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest
  • Use the following configuration for Kafka Connect in distributed mode, saved as connect-distributed.properties:
bootstrap.servers=172.18.0.3:29092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/usr/share/confluent-hub-components,/usr/local/share/kafka/plugins,/usr/share/filestream-connectors
  • Deploy Kafka Connect in distributed mode:
./kafka-connect/bin/connect-distributed.sh connect-distributed.properties
  • Register the Snowflake connector with the following configuration, saved as snowflake-sink-batch.json:
{
    "name":"snowflake-sink-batch",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_batch",
      "buffer.count.records":"10000",
      "buffer.flush.time":"60",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":"account-name:443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"com.snowflake.kafka.connector.records.SnowflakeJsonConverter"
    }
  }
  • Publish the connector configuration:
curl -d @"snowflake-sink-batch.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
  • Verify the connector in the Kafka Connect UI and in the Kafka Connect section of the Redpanda Console.

If you click on the snowflake-sink-batch sink, you can see more details.

The complete steps needed to set this up are thoroughly outlined in the tutorial.

Data will now flow into Snowflake in batch mode, with updates occurring every 60 seconds, as determined by the buffer.flush.time parameter.

You can now query the data in Snowflake:

select * from kafkatb_batch limit 5;

If everything is configured correctly, the data from CockroachDB should be available in Snowflake in real time or in batches, depending on your configuration. A sample RECORD_METADATA and RECORD_CONTENT pair from the batch table looks like this:

{
  "CreateTime": 1725887877966,
  "key": "[3]",
  "offset": 30007,
  "partition": 0,
  "topic": "cockroachdb"
}
{
  "after": {
    "created_at": "2024-09-09T13:17:57.837984Z",
    "id": 1,
    "updated_at": "2024-09-09T13:17:57.917108Z",
    "value": "UPDATED"
  }
}
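
Because both columns are VARIANT, you can also reach into the changefeed payload directly with Snowflake's path syntax; a minimal example against the batch table, using the column names created above:

SELECT record_content:after:id::INT AS id,
       record_content:after:value::STRING AS value,
       record_content:after:updated_at::STRING AS updated_at
FROM kafkatb_batch
LIMIT 5;

This is the same idea as the flattened views shown later in this article, just written inline.
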
  • The next step is to configure the connector in streaming mode. First, stop the current connector with the following command:
curl -X DELETE http://localhost:8083/connectors/snowflake-sink-batch
  • The updated connector configuration, saved as snowflake-sink-streaming.json, will appear as follows:
{
    "name":"snowflake-sink-streaming",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":":443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false"

    }
  }

Pay attention to the snowflake.ingestion.method parameter. This feature removes the need to wait 60 seconds to push data to Snowflake, allowing us to reduce the buffer.flush.time to 10 seconds.

  • To deploy the connector, use the following command:
curl -d @"snowflake-sink-streaming.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors

Shortly after deployment, the data will be available in the Snowflake table.
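
One simple way to confirm that rows are arriving (assuming RECORD_METADATA is populated the same way as in the batch example above) is to watch the row count and the most recent Kafka CreateTime:

SELECT COUNT(*) AS row_count,
       TO_TIMESTAMP(MAX(record_metadata:CreateTime::NUMBER), 3) AS latest_create_time
FROM kafkatb_streaming;

Re-running the query should show the count growing roughly every 10 seconds, in line with the reduced buffer.flush.time.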

The previous examples demonstrated how data was ingested into predefined Snowflake tables. The following method will automatically infer the schema from the Kafka messages:

  {
    "name":"snowflake-sink-streaming-schematized",
    "config":{
      "connector.class":"com.snowflake.kafka.connector.SnowflakeSinkConnector",
      "tasks.max":"8",
      "topics":"cockroachdb",
      "snowflake.topic2table.map": "cockroachdb:kafkatb_streaming_schematized",
      "buffer.count.records":"10000",
      "buffer.flush.time":"10",
      "buffer.size.bytes":"5000000",
      "snowflake.url.name":":443",
      "snowflake.user.name":"username",
      "snowflake.private.key":"private-key",
      "snowflake.private.key.passphrase":"",
      "snowflake.database.name":"kafkadb",
      "snowflake.schema.name":"kafka_schema",
      "snowflake.role.name":"kafka_connector_role_1",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable":"false",
      "snowflake.ingestion.method": "SNOWPIPE_STREAMING",
      "errors.log.enable":"true",
      "schemas.enable":"false",
      "snowflake.enable.schematization": "TRUE"
    }
  }
  • Save this as snowflake-sink-streaming-schematized.json and deploy it using:
curl -d @"snowflake-sink-streaming-schematized.json" -H "Content-Type: application/json" -X POST http://kafka-connect:8083/connectors
  • Upon deployment, a new table will be created in Snowflake with the following schema:
create or replace TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED (
    RECORD_METADATA VARIANT COMMENT 'created by automatic table creation from Snowflake Kafka Connector',
    AFTER VARIANT COMMENT 'column created by schema evolution from Snowflake Kafka Connector'
);
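
Schematization also means the connector can evolve the table as new fields show up in the payload; to see which columns exist at any point, you can describe the table:

DESC TABLE KAFKADB.KAFKA_SCHEMA.KAFKATB_STREAMING_SCHEMATIZED;
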
  • To inspect the table, use the following query:
SELECT after AS record FROM kafkatb_streaming_schematized LIMIT 5;

Sample result:

{
  "created_at": "2024-09-09T16:39:34.993226Z",
  "id": 18712,
  "updated_at": null,
  "value": "0d6bd8a4a790aab95c97a084d17bd820"
}
  • We can flatten the data for easier manipulation using the following query:
USE ROLE securityadmin;
GRANT CREATE VIEW ON SCHEMA KAFKADB.kafka_schema TO ROLE kafka_connector_role_1;

USE ROLE kafka_connector_role_1;
USE DATABASE KAFKADB;
USE SCHEMA KAFKA_SCHEMA;
CREATE VIEW v_kafkatb_batch_flattened AS
    SELECT PARSE_JSON(record_content:after):id AS ID,
        PARSE_JSON(record_content:after):value AS VALUE,
        PARSE_JSON(record_content:after):created_at AS CREATED_AT,
        PARSE_JSON(record_content:after):updated_at AS UPDATED_AT
    FROM kafkatb_batch;

SELECT * FROM v_kafkatb_batch_flattened limit 1;
ID    VALUE        CREATED_AT                      UPDATED_AT
1   "UPDATED"    "2024-09-09T13:17:57.837984Z"    "2024-09-09T13:17:57.917108Z"
  • Alternatively, for the schematized table, the view creation statement would be:
CREATE VIEW v_kafkatb_streaming_schematized_flattened AS
    SELECT PARSE_JSON(after):id AS ID,
        PARSE_JSON(after):value AS VALUE,
        PARSE_JSON(after):created_at AS CREATED_AT,
        PARSE_JSON(after):updated_at AS UPDATED_AT
    FROM kafkatb_streaming_schematized;
  • To verify the data flow, make an update in CockroachDB and check for the changes in Snowflake:
UPDATE cockroachdb 
  SET value = 'UPDATED', updated_at = now() 
WHERE  
  id = 20000; 
  • In Snowflake, execute the following query to confirm the update:
SELECT * FROM v_kafkatb_streaming_schematized_flattened where VALUE = 'UPDATED';

Sample result:

ID    VALUE        CREATED_AT                      UPDATED_AT
20000    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:16:56.550778Z"
19999    "UPDATED"    "2024-09-09T18:15:13.460078Z"    "2024-09-09T18:15:27.365272Z"

The architectural diagram is below:

In this article, we explored Kafka Connect as a solution to stream changefeeds into Snowflake. This approach provides greater control over how messages are delivered to Snowflake, leveraging the Snowflake Kafka Connector with Snowpipe Streaming for real-time, reliable data ingestion.
