Prefect integrations for interacting with Snowflake.
prefect-snowflake
Visit the full docs here to see additional examples and the API reference.
Welcome!
Prefect integrations for interacting with Snowflake.
Getting Started
Python setup
Requires an installation of Python 3.7+.
We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.
These tasks are designed to work with Prefect 2.0. For more information about how to use Prefect, please refer to the Prefect documentation.
Installation
Install prefect-snowflake with pip:
pip install prefect-snowflake
A list of available blocks in prefect-snowflake and their setup instructions can be found here.
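To confirm that the interpreter and the package meet the requirements above, a quick check along these lines may help. This is a minimal sketch using only the standard library (`importlib.metadata` needs Python 3.8+); the helper name `installed_version` is hypothetical and not part of prefect-snowflake.

```python
import sys
from importlib import metadata  # standard library on Python 3.8+


def installed_version(package):
    """Return the installed version string of `package`, or None if it is absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # The README asks for Python 3.7 or newer.
    print("Python version OK:", sys.version_info >= (3, 7))
    print("prefect-snowflake:", installed_version("prefect-snowflake"))
```

A result of `None` for the second line suggests the package was installed into a different environment than the one running the script.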
Query from table
from prefect import flow
from prefect_snowflake.credentials import SnowflakeCredentials
from prefect_snowflake.database import SnowflakeConnector, snowflake_query

@flow
def snowflake_query_flow():
    # Placeholder values; replace with your own account details.
    snowflake_credentials = SnowflakeCredentials(
        account="account",
        user="user",
        password="password",
    )
    snowflake_connector = SnowflakeConnector(
        database="database",
        warehouse="warehouse",
        schema="schema",
        credentials=snowflake_credentials
    )
    # Named parameters use the pyformat style: %(name)s
    result = snowflake_query(
        "SELECT * FROM table WHERE id=%(id_param)s LIMIT 8;",
        snowflake_connector,
        params={"id_param": 1}
    )
    return result

snowflake_query_flow()
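The example above hard-codes its credentials for brevity. One way to keep them out of source code is to read them from environment variables, sketched below. The variable names (`SNOWFLAKE_ACCOUNT` and so on) and both helper functions are assumptions made for illustration; only the `SnowflakeCredentials` keyword arguments come from the example above.

```python
import os

# Hypothetical environment variable names; use whatever your deployment defines.
ENV_TO_FIELD = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
}


def credential_kwargs(environ=None):
    """Collect SnowflakeCredentials keyword arguments from environment variables."""
    environ = os.environ if environ is None else environ
    missing = [name for name in ENV_TO_FIELD if not environ.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {field: environ[name] for name, field in ENV_TO_FIELD.items()}


def build_credentials(environ=None):
    """Build the credentials block (hypothetical helper)."""
    # Imported lazily so credential_kwargs stays usable without the package.
    from prefect_snowflake.credentials import SnowflakeCredentials

    return SnowflakeCredentials(**credential_kwargs(environ))
```

Inside a flow, `build_credentials()` could then stand in for the literal `SnowflakeCredentials(...)` call.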
Write pandas to table using block attributes
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas

@flow
def snowflake_write_pandas_flow():
    snowflake_connector = SnowflakeConnector.load("my-block")
    with snowflake_connector.get_connection() as conn:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f'CREATE TABLE IF NOT EXISTS {table_name} ({ddl})'
        with conn.cursor() as cur:
            cur.execute(statement)

        # case sensitivity matters here!
        df = pd.DataFrame([('Marvin', 42), ('Ford', 88)], columns=['NAME', 'NUMBER'])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=conn,
            df=df,
            table_name=table_name,
            database=snowflake_connector.database,
            schema=snowflake_connector.schema_  # note the "_" suffix
        )
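Because the table name and column names must agree in case between the `CREATE TABLE` statement and the DataFrame, it can help to generate the statement from one column mapping. The following is a minimal sketch under that assumption; `create_table_statement` is a hypothetical helper, not part of prefect-snowflake, and it accepts only simple unquoted identifiers. Column types are passed through unchecked.

```python
import re

# Simple unquoted identifiers: a letter or underscore, then letters, digits, "_" or "$".
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def create_table_statement(table_name, columns):
    """Build CREATE TABLE IF NOT EXISTS from a {column_name: sql_type} mapping.

    Names are upper-cased so they match how unquoted identifiers are stored.
    """
    for name in (table_name, *columns):
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"not a simple identifier: {name!r}")
    ddl = ", ".join(f"{name.upper()} {sql_type}" for name, sql_type in columns.items())
    return f"CREATE TABLE IF NOT EXISTS {table_name.upper()} ({ddl})"


columns = {"name": "STRING", "number": "INT"}
statement = create_table_statement("table_name", columns)
# The same mapping can supply the DataFrame's column names in matching case.
df_columns = [name.upper() for name in columns]
```

Here `statement` reproduces the statement used in the flow above, and `df_columns` gives the `['NAME', 'NUMBER']` list passed to `pd.DataFrame`.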
Execute get and put statements
To execute get and put statements, use snowflake_query_sync.
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector, snowflake_query_sync

@flow
def snowflake_put_file_to_snowflake_stage():
    snowflake_connector = SnowflakeConnector.load("my-block")
    # Upload a local file to a named stage.
    snowflake_query_sync(
        "put file:///myfolder/myfile @mystage/mystagepath",
        snowflake_connector=snowflake_connector
    )
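If the file and stage paths vary between runs, the statement strings can be assembled by small helpers like the ones sketched here. Both functions are hypothetical, assume absolute POSIX paths, and do not quote paths that contain spaces or special characters.

```python
from pathlib import PurePosixPath


def _file_uri(local_path):
    """Return a file:// URI for an absolute POSIX path."""
    path = PurePosixPath(local_path)
    if not path.is_absolute():
        raise ValueError("local path must be absolute")
    return f"file://{path}"


def _stage(stage_path):
    """Prefix the stage location with "@" if it is missing."""
    return stage_path if stage_path.startswith("@") else f"@{stage_path}"


def put_statement(local_path, stage_path):
    """Build a put statement that uploads a local file to a stage."""
    return f"put {_file_uri(local_path)} {_stage(stage_path)}"


def get_statement(stage_path, local_dir):
    """Build a get statement that downloads staged files into a local directory."""
    return f"get {_stage(stage_path)} {_file_uri(local_dir)}/"
```

The resulting string can be passed as the first argument to `snowflake_query_sync`, in place of the literal used in the flow above.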
Use with_options to customize options on any existing task or flow:
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector, snowflake_query_sync

# Returns a copy of the task with a new name and retry settings.
custom_snowflake_query_sync = snowflake_query_sync.with_options(
    name="My custom task name",
    retries=2,
    retry_delay_seconds=10,
)

@flow
def example_with_options_flow():
    snowflake_connector = SnowflakeConnector.load("my-block")
    custom_snowflake_query_sync(
        "put file:///myfolder/myfile @mystage/mystagepath",
        snowflake_connector=snowflake_connector
    )

example_with_options_flow()
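To make the retry settings concrete: `retries=2` with `retry_delay_seconds=10` means up to three attempts in total, with a ten-second pause before each retry. The sketch below illustrates that behavior in plain Python. It is not how Prefect implements retries, and `run_with_retries` is a hypothetical name.

```python
import time


def run_with_retries(fn, retries=2, retry_delay_seconds=10, sleep=time.sleep):
    """Call fn; on failure, retry up to `retries` more times with a fixed delay."""
    attempts = retries + 1  # one initial attempt plus the retries
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            sleep(retry_delay_seconds)


# Usage: a function that fails twice and then succeeds.
calls = []
delays = []


def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient failure")
    return "ok"


# Record the delays instead of sleeping so the example finishes immediately.
result = run_with_retries(flaky, sleep=delays.append)
```

With these defaults the third attempt succeeds, and a fourth failure would have been raised to the caller.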
For more tips on how to use tasks and flows in a Collection, check out Using Collections!
Resources
If you encounter any bugs while using prefect-snowflake, feel free to open an issue in the prefect-snowflake repository.
If you have any questions or issues while using prefect-snowflake, you can find help in either the Prefect Discourse forum or the Prefect Slack community.
Feel free to star or watch prefect-snowflake for updates too!
Contributing
If you'd like to fix an issue or add a feature to prefect-snowflake, please propose changes through a pull request from a fork of the repository.
Here are the steps:
- Fork the repository
- Clone the forked repository
- Install the repository and its dependencies:
pip install -e ".[dev]"
- Make desired changes
- Add tests
- Insert an entry to CHANGELOG.md
- Install pre-commit to perform quality checks prior to commit:
pre-commit install
- git commit, git push, and create a pull request
Download files
Hashes for prefect_snowflake-0.26.0-py3-none-any.whl
Algorithm | Hash digest |
---|---|
SHA256 | e6506ddf8f3db44df015c00b6a12b07ae99f58d4bf85145379a4fb98d7d18d2d |
MD5 | b8f0bcf87811a1beea9d9e0dec4666a1 |
BLAKE2b-256 | b4a742c1060438bf20b546bf99e3a81ce34c4edfabaebb2cbb7e238247bb9a16 |
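A downloaded wheel can be checked against the SHA256 digest listed above with the standard library. This is a minimal sketch; the helper names and the local file path in the usage note are assumptions.

```python
import hashlib


def sha256_of(path, chunk_size=1 << 20):
    """Return the hex SHA256 digest of the file at `path`, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def matches_digest(path, expected_hex):
    """Compare a file's SHA256 digest with a published hex digest."""
    return sha256_of(path) == expected_hex.strip().lower()
```

For example, `matches_digest("prefect_snowflake-0.26.0-py3-none-any.whl", "e6506ddf8f3db44df015c00b6a12b07ae99f58d4bf85145379a4fb98d7d18d2d")` should return `True` for an intact download in the current directory.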