Prefect integrations for interacting with Snowflake.

prefect-snowflake

Welcome!

Prefect integrations for interacting with Snowflake.

Getting Started

Python setup

Requires an installation of Python 3.7+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.

Installation

Install prefect-snowflake with pip:

pip install prefect-snowflake

Then register the block types so that they appear in Prefect Cloud:

prefect block register -m prefect_snowflake.credentials

Note: the load method only works on a block document that has already been saved, either through code or through the UI.

Query from table

from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector, snowflake_query


@flow
def snowflake_query_flow():
    snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="user",
        password="password",
    )
    snowflake_connector = SnowflakeConnector(
        database="database",
        warehouse="warehouse",
        schema="schema",
        credentials=snowflake_credentials
    )
    result = snowflake_query(
        "SELECT * FROM table WHERE id=%(id_param)s LIMIT 8;",
        snowflake_connector,
        params={"id_param": 1}
    )
    return result

snowflake_query_flow()
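
The named placeholder in the query above relies on the Snowflake connector's default pyformat parameter style, which uses the same %(name)s syntax as Python's % string formatting. The following sketch uses plain Python only to show which placeholder spellings are accepted. The helper name preview_pyformat is hypothetical and not part of prefect-snowflake, and it assumes that snowflake_query hands params to the connector unchanged. Unlike the connector, it does no escaping or quoting, so it must never be used to build SQL that is actually executed.

```python
def preview_pyformat(query: str, params: dict) -> str:
    """Substitute named placeholders the way Python's % operator does.

    Illustration only: no escaping or quoting is applied here, so the
    returned string must never be sent to a database.
    """
    return query % params


# The parenthesised form is a valid named placeholder.
query = "SELECT * FROM table WHERE id=%(id_param)s LIMIT 8;"
print(preview_pyformat(query, {"id_param": 1}))

# The curly-brace form is not valid pyformat syntax and is rejected.
try:
    preview_pyformat("SELECT * FROM table WHERE id=%{id_param}s;", {"id_param": 1})
except (ValueError, TypeError) as exc:
    print(f"rejected: {exc}")
```

In a real flow, keep passing the values through params so that the connector performs the binding.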

Write pandas to table using block attributes

import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas

@flow
def snowflake_write_pandas_flow():
    snowflake_connector = SnowflakeConnector.load("my-block")
    with snowflake_connector.get_connection() as conn:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with conn.cursor() as cur:
            cur.execute(statement)

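        # Case sensitivity matters here: the DataFrame column names must match
        # the upper-case column names that Snowflake stored for the table.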
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=conn,
            df=df,
            table_name=table_name,
            database=snowflake_connector.database,
            schema=snowflake_connector.schema_  # note the "_" suffix
        )
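
The case-sensitivity comment in the flow above follows from two behaviours: Snowflake stores unquoted identifiers in upper case, and write_pandas quotes column names by default (its quote_identifiers parameter defaults to True), so DataFrame column names are matched exactly. A minimal sketch of a pre-flight check, using plain Python lists in place of a DataFrame; the helper names stored_identifier and mismatched_columns are hypothetical and not part of any library:

```python
from typing import Iterable, List


def stored_identifier(name: str) -> str:
    """Return the name Snowflake stores for an unquoted identifier."""
    return name.upper()


def mismatched_columns(
    ddl_columns: Iterable[str], frame_columns: Iterable[str]
) -> List[str]:
    """List the DataFrame columns that would not match the table's columns.

    Assumes the table was created with unquoted column names and that the
    DataFrame column names are quoted (and so compared exactly) on write.
    """
    stored = {stored_identifier(column) for column in ddl_columns}
    return [column for column in frame_columns if column not in stored]


# Columns from the DDL "NAME STRING, NUMBER INT" against two candidate frames.
print(mismatched_columns(["NAME", "NUMBER"], ["NAME", "NUMBER"]))
print(mismatched_columns(["NAME", "NUMBER"], ["name", "NUMBER"]))
```

With a real DataFrame, the same check could be run on list(df.columns) before calling write_pandas.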

Execute get and put statements

To execute get and put statements, use snowflake_query_sync.

from prefect import flow
from prefect_snowflake.database import SnowflakeConnector, snowflake_query_sync

@flow
def snowflake_put_file_to_snowflake_stage():
    snowflake_connector = SnowflakeConnector.load("my-block")
    
    snowflake_query_sync(
        "put file:///myfolder/myfile @mystage/mystagepath",
        snowflake_connector=snowflake_connector
    )
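
The statement passed to snowflake_query_sync is an ordinary string, so it can be assembled ahead of time. The sketch below builds PUT and GET statements in the same shape as the example above. The helper names build_put_statement and build_get_statement are hypothetical and not part of prefect-snowflake, and the sketch assumes absolute Unix-style local paths that contain no spaces or other characters that would need quoting.

```python
from pathlib import PurePosixPath


def _require_absolute(local_path: str) -> str:
    """Reject relative paths, since the file:// prefix expects an absolute one."""
    if not PurePosixPath(local_path).is_absolute():
        raise ValueError(f"expected an absolute path, got {local_path!r}")
    return local_path


def build_put_statement(local_file: str, stage_path: str) -> str:
    """Build a statement that uploads a local file to a stage path."""
    return f"put file://{_require_absolute(local_file)} @{stage_path}"


def build_get_statement(stage_path: str, local_dir: str) -> str:
    """Build a statement that downloads files from a stage path to a local directory."""
    return f"get @{stage_path} file://{_require_absolute(local_dir)}"


print(build_put_statement("/myfolder/myfile", "mystage/mystagepath"))
print(build_get_statement("mystage/mystagepath", "/myfolder/"))
```

Either string can then be passed as the first argument to snowflake_query_sync, as in the flow above.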
            

Resources

If you encounter any bugs while using prefect-snowflake, feel free to open an issue in the prefect-snowflake repository.

If you have any questions or issues while using prefect-snowflake, you can find help in either the Prefect Discourse forum or the Prefect Slack community.

Feel free to ⭐️ or watch prefect-snowflake for updates too!

Development

If you'd like to install a version of prefect-snowflake for development, clone the repository and perform an editable install with pip:

git clone https://github.com/PrefectHQ/prefect-snowflake.git

cd prefect-snowflake/

pip install -e ".[dev]"

# Install linting pre-commit hooks
pre-commit install
