Integration between Python/FastAPI and Snowflake
Requirements
Basic Python
Installation requirements: FastAPI and Uvicorn
pip install fastapi
pip install uvicorn
Snowflake Account
Steps
Log in to your Snowflake account
Now let us create a stage named first_stage:
create or replace stage first_stage
file_format = (type = 'CSV' field_delimiter = ',' skip_header = 1);
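In the statement above, file_format is set to type = 'CSV' because a CSV file can also be uploaded to Snowflake through this stage. field_delimiter = ',' marks the values as comma-separated, and skip_header = 1 skips the first line of the file.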
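As a rough sketch of how that CSV upload might look from Python, the helper below only builds the two statements involved: PUT to copy a local file into first_stage, and COPY INTO to load the staged file into a table. The file path and the helper name build_load_statements are hypothetical, and the target table is assumed to exist already (the Customers table is created in the next step).

```python
def build_load_statements(local_csv: str, stage: str, table: str) -> list[str]:
    """Return the PUT and COPY INTO statements for loading one CSV file.

    Hypothetical helper: it only builds SQL text and does not talk to Snowflake.
    """
    return [
        # PUT uploads the local file into the named internal stage.
        f"PUT file://{local_csv} @{stage}",
        # COPY INTO reads the staged file using the stage's file_format
        # (CSV, comma-delimited, first line skipped as the header).
        f"COPY INTO {table} FROM @{stage}",
    ]


# Assumed example path; replace it with the location of your own file.
statements = build_load_statements("/tmp/customers.csv", "first_stage", "Customers")
for statement in statements:
    print(statement)
```

With an open connector cursor, each statement could be passed to cursor.execute(...) in order.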
Now let us create a table named Customers with the columns CustomerID, LastName, FirstName, Products and Category
create table Customers(CustomerID int, LastName varchar(20) ,FirstName varchar(20),Products varchar(30),Category varchar(15));
Let us insert a record into the table
INSERT INTO Customers (CustomerID, LastName, FirstName, Products, Category) VALUES (2, 'Maheshwarappa', 'Abhishek', 'OnePlus', 'Mobile');
To check the entries
select * from Customers;
Output: a single row with the values 2, Maheshwarappa, Abhishek, OnePlus, Mobile.
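The same insert can be issued from Python once a connection exists (the connection code appears later in this article). This is a minimal sketch, assuming the connector's default pyformat parameter style with %s placeholders; insert_customer and the RecordingCursor stand-in are hypothetical names, used so the example runs without a Snowflake account.

```python
INSERT_SQL = (
    "INSERT INTO Customers (CustomerID, LastName, FirstName, Products, Category) "
    "VALUES (%s, %s, %s, %s, %s)"
)


def insert_customer(cursor, customer: tuple) -> None:
    """Insert one row, letting the connector bind the values."""
    cursor.execute(INSERT_SQL, customer)


class RecordingCursor:
    """Stand-in for a real cursor: it only records what would be executed."""

    def __init__(self):
        self.calls = []

    def execute(self, sql, params=None):
        self.calls.append((sql, params))


cursor = RecordingCursor()
insert_customer(cursor, (2, "Maheshwarappa", "Abhishek", "OnePlus", "Mobile"))
print(cursor.calls[0][1])
```

Passing the values separately, rather than formatting them into the SQL string, leaves quoting to the connector.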
Now let us create the procedure
create or replace procedure read_customer_proc()
returns String not null
language javascript
as
$$
  // Read every row of Customers and join its column values with spaces.
  var my_sql_command = "select * from Customers";
  var statement1 = snowflake.createStatement( {sqlText: my_sql_command} );
  var result_set1 = statement1.execute();
  var rows = [];
  while (result_set1.next()) {
    var column1 = result_set1.getColumnValue(1);
    var column2 = result_set1.getColumnValue(2);
    var column3 = result_set1.getColumnValue(3);
    var column4 = result_set1.getColumnValue(4);
    var column5 = result_set1.getColumnValue(5);
    rows.push(column1 + ' ' + column2 + ' ' + column3 + ' ' + column4 + ' ' + column5);
  }
  // One line per row; an empty table gives an empty string instead of undefined.
  return rows.join('\n');
$$
;
Command to call the procedure
call read_customer_proc();
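To make the string-building inside the procedure concrete, here is a plain-Python sketch of the same concatenation applied to the sample row inserted earlier. It does not call Snowflake, and format_row is a hypothetical name used only for illustration.

```python
def format_row(row) -> str:
    """Join the column values of one row with single spaces."""
    return " ".join(str(value) for value in row)


# The sample record from the INSERT statement earlier in the article.
row = (2, "Maheshwarappa", "Abhishek", "OnePlus", "Mobile")
print(format_row(row))  # 2 Maheshwarappa Abhishek OnePlus Mobile
```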
Next, we need to call that procedure from Python using FastAPI
Install Snowflake Connector
pip install --upgrade snowflake-connector-python
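from fastapi import FastAPI
import snowflake.connector as sf

# Replace the placeholders with your own Snowflake credentials.
username = '<username>'
password = '<password>'
account = '<account>'
warehouse = '<warehouse>'
database = '<database>'

# Open one connection when the application starts.
ctx = sf.connect(user=username, password=password, account=account,
                 warehouse=warehouse, database=database)

app = FastAPI()

@app.get('/fetchdata')
def fetchdata():
    # A plain def lets FastAPI run the blocking connector calls in its thread pool.
    cursor = ctx.cursor()
    try:
        cursor.execute("USE WAREHOUSE COMPUTE_WH")
        cursor.execute("USE DATABASE FIRST")
        cursor.execute("USE SCHEMA PUBLIC")
        cursor.execute("call read_customer_proc()")
        # The procedure returns a single row with one string column.
        return cursor.fetchone()
    finally:
        cursor.close()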
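Hard-coding credentials is fine for a local experiment, but one common alternative is to read them from environment variables. This is a minimal sketch under that assumption; the SNOWFLAKE_* variable names and the load_snowflake_config helper are made up for this example and are not something the connector requires.

```python
import os

# Hypothetical environment variable names, mapped to connect() keyword arguments.
ENV_TO_PARAM = {
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
    "SNOWFLAKE_DATABASE": "database",
}


def load_snowflake_config(env=None) -> dict:
    """Build connection keyword arguments, failing early on missing values."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_PARAM if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {param: env[name] for name, param in ENV_TO_PARAM.items()}


# Demo with a plain dict so the example runs without real credentials.
demo_env = {name: "<" + param + ">" for name, param in ENV_TO_PARAM.items()}
config = load_snowflake_config(demo_env)
print(sorted(config))
```

The resulting dictionary could then be passed to the connector as sf.connect(**config).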
Save this file with a .py extension, for example main.py
To run it, use the command
uvicorn main:app --reload
Open a web browser and go to
http://127.0.0.1:8000/fetchdata
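The browser shows the raw JSON returned by the endpoint. As an illustration of consuming it from Python, the sketch below turns such a body back into dictionaries. It assumes the body is a JSON array holding one string of space-separated column values (one line per row, if the procedure returns several rows). The sample body is hand-written for the example rather than captured from a real run, and values that themselves contain spaces would break this naive split.

```python
import json

COLUMNS = ["CustomerID", "LastName", "FirstName", "Products", "Category"]


def parse_fetchdata_body(body: str) -> list[dict]:
    """Split the space-separated text from /fetchdata into one dict per row."""
    payload = json.loads(body)
    # An empty or null payload is treated as "no rows".
    text = payload[0] if payload else ""
    customers = []
    for line in text.splitlines():
        values = line.split(" ")
        # Skip lines that do not have exactly one value per column.
        if len(values) == len(COLUMNS):
            customers.append(dict(zip(COLUMNS, values)))
    return customers


# Hand-written sample body (an assumption about the response shape).
sample_body = '["2 Maheshwarappa Abhishek OnePlus Mobile"]'
print(parse_fetchdata_body(sample_body))
```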
Conclusion
In this article, we learned how to connect Snowflake with FastAPI. Along the way, we wrote a stored procedure in Snowflake and Python code that calls that procedure from a FastAPI endpoint running on the local system.