Snowflake Ingestion Using CockroachDB and Redpanda

Previous Articles on Snowflake

Previous Articles on CockroachDB CDC


Motivation

I work with financial services clients, and it's common to encounter a need for streaming changes in the operational data store into a data warehouse or a data lake. A former colleague recently reached out for advice on the quickest and easiest way to load trade data into Snowflake. I've come up with at least three methods, which I'll explore in a follow-up series of articles. However, I've decided to first explore Redpanda Connect, a solution that has recently caught my attention. This is by no means a conclusive guide on how changefeed data should be loaded into Snowflake; we're merely exploring the possibilities here and will discuss the pros and cons in later articles.

CockroachDB changefeeds are an enterprise feature and require a license. In this tutorial, I'm using a free-to-start version of CockroachDB Serverless, which has enterprise changefeeds enabled.

High-Level Steps

  • Deploy a CockroachDB cluster with enterprise changefeeds
  • Deploy Redpanda Connect
  • Deploy Snowflake
  • Verify
  • Conclusion

Step-by-Step Instructions

Deploy a CockroachDB Cluster With Enterprise Changefeeds

Start an instance of CockroachDB or use the managed service.

To enable CDC, we need to execute the following commands:

SET CLUSTER SETTING cluster.organization = '';

SET CLUSTER SETTING enterprise.license = '';

SET CLUSTER SETTING kv.rangefeed.enabled = true;

I'm using CockroachDB Serverless, so the above steps aren't necessary. You can confirm whether changefeeds are indeed enabled using the following command:

SHOW CLUSTER SETTING kv.rangefeed.enabled;

If the value is false, change it to true.

Generate sample data:

CREATE TABLE office_dogs (
     id INT PRIMARY KEY,
     name STRING);

INSERT INTO office_dogs VALUES
   (1, 'Petee'),
   (2, 'Carl');

UPDATE office_dogs SET name = 'Petee H' WHERE id = 1;

We've populated the table and then updated a record. Let's add more data to make it interesting:

INSERT INTO office_dogs SELECT generate_series(3, 10000), md5(random()::string);
SELECT * FROM office_dogs LIMIT 5;
id,name
1,Petee H
2,Carl
3,6e19280ae649efffa7a58584c7f46032
4,5e4e897f008bb752c8edfa64a3aed356
5,abc0d898318d27f23a43060f89d62e34
SELECT COUNT(*) FROM office_dogs;
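The count should come back as 10,000, since we inserted ids 1 through 10,000; shown below is the kind of output the CockroachDB SQL shell prints, included for orientation:

  count
---------
  10000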

Deploy Redpanda Connect

I'm running Redpanda Connect in a Docker Compose file.

docker compose -f compose-redpanda.yaml up -d

The contents of the file are:

services:
  redpanda:
    container_name: redpanda-connect
    hostname: redpanda-connect
    image: docker.redpanda.com/redpandadata/connect
    volumes:
      - ./redpanda/connect.yaml:/connect.yaml
      - /Users/aervits/.ssh/rsa_key.pem:/rsa_key.pem

I will be using the connect.yaml file as the foundation to connect all the components in this article. For more detailed information, you can refer to the documentation provided by Redpanda.

The most basic configuration looks like this:

input:
  stdin: {}

pipeline:
  processors: []

output:
  stdout: {}
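As a quick smoke test, you can pipe a line through this pass-through config. This assumes you've saved it as connect.yaml in the working directory; the run invocation and the /connect.yaml mount point mirror the docker run command shown at the end of this article:

echo 'hello world' | docker run --rm -i -v ./connect.yaml:/connect.yaml docker.redpanda.com/redpandadata/connect run

The message should be echoed straight back on stdout.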

Since I'm using the CockroachDB input, mine looks like this:

input:
  # CockroachDB Input
  label: ""
  cockroachdb_changefeed:
    dsn: postgresql://:@:/?sslmode=verify-full
    tls:
      skip_cert_verify: true
      #enable_renegotiation: false
      #root_cas: ""
      #root_cas_file: ""
      client_certs: []
    tables: [table_for_cdc] # No default (required)
    cursor_cache: "" # No default (optional)
    auto_replay_nacks: true

pipeline:
  processors: []

output:
  stdout: {}

Leave the pipeline and output as default.
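The dsn above is intentionally left blank. Filled in, it follows the standard PostgreSQL URL format; the user, password, host, and database below are hypothetical placeholders, not values from this setup:

# hypothetical example — substitute your own user, password, host, and database
dsn: postgresql://myuser:mypassword@myhost:26257/defaultdb?sslmode=verify-full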

For reference, I'm including the repo with my source code where you can look up the values.

If you've been following along, you'll have noticed that I haven't started a changefeed job in CockroachDB. The cockroachdb_changefeed input directly subscribes to the table, which can be observed by examining the logs with the command docker logs redpanda-connect --follow. If you look at the connect.yaml file, the output is sent to stdout:

{"primary_key":"[9998]","row":"{"after": {"id": 9998, "title": "0794a9d1c99e8e47ee4515be6e0d736f"}}","table":"office_dogs"}
{"primary_key":"[9999]","row":"{"after": {"id": 9999, "title": "c85a6b38154f7e3085d467d567141d45"}}","table":"office_dogs"}
{"primary_key":"[10000]","row":"{"after": {"id": 10000, "title": "aae9e0849fff8f47e0371a4c06fb255b"}}","table":"office_dogs"}

The next step is to configure Snowflake. We aren't going to look at the available processors today.

Deploy Snowflake

I'm using a Snowflake trial account. You get a generous credit, which should be sufficient to complete this tutorial.

We need to create a database and a table where we will output the changefeed data.

CREATE OR REPLACE DATABASE FROM_COCKROACH;
CREATE OR REPLACE TABLE OFFICE_DOGS (RECORD variant);

We also need to create a user with key-pair authentication, as we will be using the Snowpipe service.

openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 des3 -inform PEM -out rsa_key.p8

We must use an encrypted key, as Redpanda doesn't support unencrypted versions.

Generate a public key:

openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub 

Lastly, generate a pem file from the private key:

openssl pkcs8 -in rsa_key.p8 -out rsa_key.pem

In Snowflake, alter the user to use the key pair generated in the previous step. Paste the contents of the public key, excluding the PEM header and footer lines:

ALTER USER username SET rsa_public_key='MIIB...';
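To verify that the key was registered, you can describe the user and inspect the RSA_PUBLIC_KEY_FP property (username is a placeholder here):

DESC USER username;
-- the RSA_PUBLIC_KEY_FP property should now show the fingerprint of the key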

We can now populate the connect.yaml file with the required information for the snowflake_put output. This output type is for commercial use and requires a license, but since we're using it for demo purposes, we're able to proceed.

output:
  # Snowflake Output
  label: ""
  snowflake_put:
    account: 
    user: 
    private_key_file: rsa_key.pem
    role: ACCOUNTADMIN
    database: 
    warehouse: 
    schema: 
    stage: "@%implicit_table_stage_name"
    path: "path"
    upload_parallel_threads: 4
    compression: NONE
    batching:
      count: 10
      period: 3s
      processors:
        - archive:
            format: json_array
    max_in_flight: 1

With this batching policy, Redpanda Connect uploads a file every 10 records or every 3 seconds, whichever comes first, archived as a JSON array. If we restart the compose environment and tail the logs, we can see the following:

level=info msg="Running main config from specified file" @service=benthos benthos_version=v4.32.1 path=/connect.yaml
level=info msg="Listening for HTTP requests at: http://0.0.0.0:4195" @service=benthos
level=info msg="Launching a Redpanda Connect instance, use CTRL+C to close" @service=benthos
level=info msg="Output type snowflake_put is now active" @service=benthos label="" path=root.output
level=info msg="Input type cockroachdb_changefeed is now active" @service=benthos label="" path=root.input

Let's look at the implicit table stage and see whether anything has changed.
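The listing below was presumably produced with Snowflake's LIST command against the implicit table stage; the original run doesn't show the command itself, so treat this as the assumed way to reproduce it:

LIST @%OFFICE_DOGS;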

| dogs/f2f3cf47-d6bc-46f4-88f2-c82519b67481.json | 1312 | 30f709e4962bae9d10b48565d22e9f32 | Wed, 14 Aug 2024 18:58:43 GMT |
| dogs/f6adbf39-3955-4848-93c3-06f873a88078.json | 1312 | 28be7a619ef1e139599077e977ea130b | Wed, 14 Aug 2024 18:58:13 GMT |
| dogs/f8705606-eb07-400a-9ffe-da6834fa1a30.json | 1296 | 5afbdce0e8929fc38a2eb5e0f12b96d6 | Wed, 14 Aug 2024 18:57:29 GMT |
| dogs/f9e5c01a-7dda-4e76-840d-13b8a1e4946a.json | 1296 | 5480c01f1578f67afe2761c7619e9123 | Wed, 14 Aug 2024 18:57:32 GMT |
| dogs/fad4efe7-3f3f-48bc-bdb4-9f0310abcf4d.json | 1312 | 5942c6e2dbaef5ee257d4a9b8e68827d | Wed, 14 Aug 2024 18:58:04 GMT |

The files are ready to be copied into a table. Let's create a pipe:

CREATE OR REPLACE PIPE FROM_COCKROACH.PUBLIC.cockroach_pipe AUTO_INGEST = FALSE AS
  COPY INTO FROM_COCKROACH.PUBLIC.OFFICE_DOGS
  FROM (SELECT * FROM @%office_dogs)
  FILE_FORMAT = (TYPE = JSON COMPRESSION = AUTO STRIP_OUTER_ARRAY = TRUE);
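As an optional sanity check (not part of the original flow), you can confirm the pipe was created:

SHOW PIPES IN DATABASE FROM_COCKROACH;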

Note that STRIP_OUTER_ARRAY = TRUE pairs with the json_array archive format we configured in the batching policy: each staged file holds a JSON array of records, and the pipe unwraps it into individual rows. The last remaining step is to refresh the pipe.

ALTER PIPE cockroach_pipe REFRESH;
+------------------------------------------------+--------+
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/ff0871b1-6f49-43a4-a929-958d07f74046.json | SENT   |
| dogs/ff131d8d-3781-4cf6-8700-edd50dbb87de.json | SENT   |
| dogs/ff216da1-4f9d-4b37-9776-bcd559dd4a6f.json | SENT   |
| dogs/ff221430-4c3a-46be-bbc2-d335cc6cc9e3.json | SENT   |
| dogs/ffbd7d45-5084-4e36-8907-61874ac652b4.json | SENT   |
| dogs/fffb5fa6-23cc-4450-934a-29ccf01c67b9.json | SENT   |
+------------------------------------------------+--------+

Let's query the table in Snowflake:

SELECT * FROM OFFICE_DOGS LIMIT 5;
| {                                                                                       |
|   "primary_key": "[5241]",                                                              |
|   "row": "{"after": {"id": 5241, "title": "5e0360a0d10d849afbbfa319a50bccf2"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5242]",                                                              |
|   "row": "{"after": {"id": 5242, "title": "62be250249afe74bfbc5dd356e7b0ad9"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5243]",                                                              |
|   "row": "{"after": {"id": 5243, "title": "7f286800a8a03e74938d09fdba52f869"}}", |
|   "table": "office_dogs"                                                                |
| }                                                                                       |
| {                                                                                       |
|   "primary_key": "[5244]",                                                              |
|   "row": "{"after": {"id": 5244, "title": "16a330b8f09bcd314f9760ffe26d0ae2"}}", |
|   "table": "office_dogs"                                                                |
| }

We expect 10,000 rows:

SELECT COUNT(*) FROM OFFICE_DOGS;
+----------+                                                                    
| COUNT(*) |
|----------|
|    10000 |
+----------+

The data is in JSON format. Let's create a view and flatten the data out.

CREATE VIEW v_office_dogs AS
    SELECT PARSE_JSON(record:row):after:id::INTEGER AS id,
           PARSE_JSON(record:row):after:name::STRING AS name FROM OFFICE_DOGS;

Query the view:

SELECT * FROM v_office_dogs WHERE id <= 5;
+----+----------------------------------+                                       
| ID | NAME                             |
|----+----------------------------------|
|  1 | Petee H                          |
|  2 | Carl                             |
|  3 | 6e19280ae649efffa7a58584c7f46032 |
|  4 | 5e4e897f008bb752c8edfa64a3aed356 |
|  5 | abc0d898318d27f23a43060f89d62e34 |
+----+----------------------------------+

Let's make things a bit more interesting and delete data in CockroachDB.

DELETE FROM office_dogs WHERE name = 'Carl';
DELETE FROM office_dogs WHERE id = 1;

In Snowflake, let's refresh the pipe as of a few minutes ago:

ALTER PIPE cockroach_pipe REFRESH MODIFIED_AFTER='2024-08-14T12:10:00-07:00';

Notice there are a couple of new files.

+------------------------------------------------+--------+
| File                                           | Status |
|------------------------------------------------+--------|
| dogs/2a4ee400-6b37-4513-97cb-097764a340bc.json | SENT   |
| dogs/8f5b5b69-8a00-4dbf-979a-60c3814d96b4.json | SENT   |
+------------------------------------------------+--------+
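If you're curious what the pipe has been doing, Snowflake's SYSTEM$PIPE_STATUS function reports its state; this is an extra check, not shown in the original run:

SELECT SYSTEM$PIPE_STATUS('FROM_COCKROACH.PUBLIC.cockroach_pipe');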

I want to caution that if you run the REFRESH manually, you may cause duplicates in your Snowflake data. We will look at better approaches in a future article.
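As a stopgap, duplicates introduced by a manual REFRESH are byte-identical JSON documents, so they can be hidden at query time; this is a minimal sketch, not the approach we'll settle on:

-- duplicates from a re-ingested file are identical documents,
-- so a DISTINCT projection masks them at read time
SELECT DISTINCT
    record:primary_key::STRING AS primary_key,
    record:row::STRING         AS row_json
FROM OFFICE_DOGS;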

Let's look at the row count:

SELECT COUNT(*) FROM OFFICE_DOGS;

+----------+                                                                    
| COUNT(*) |
|----------|
|    10002 |
+----------+

The deletes didn't propagate the way you might expect: Snowflake received the delete events as new records, but the table no longer reflects the current state in CockroachDB. We need to incorporate additional logic to achieve this. That will be a task for another time.
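For context, a CockroachDB changefeed represents a delete as an envelope whose after field is null, so the two new files contain records along these lines (illustrative, not copied from the stage):

{"primary_key": "[1]", "row": "{\"after\": null}", "table": "office_dogs"}

Applying such events to the Snowflake table would take a MERGE or similar logic, which is the missing piece mentioned above.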

Finally, I'd like to note that running Redpanda Connect via a compose file is optional. You also have the option of running the Docker container directly by executing the following command:

docker run --rm -it -v ./redpanda/connect.yaml:/connect.yaml -v ./snowflake/rsa_key.pem:/rsa_key.pem docker.redpanda.com/redpandadata/connect run

Conclusion

Today, we explored Redpanda Connect as a means to deliver streaming changefeeds into Snowflake. We've only just begun to delve into this topic, and future articles will build upon the foundations laid today.
