/
snowflake-restapi
/
snowflake_connector
/
Upload File
HOME
import json import pandas as pd import datetime import requests from django.shortcuts import get_object_or_404 from django.utils import timezone from django.utils.timezone import get_current_timezone from rest_framework.decorators import api_view, permission_classes, authentication_classes from rest_framework_simplejwt.authentication import JWTAuthentication from rest_framework.authentication import SessionAuthentication, BasicAuthentication from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response from rest_framework import status from .models import SnowflakeAuthentication, MappingDetail, SyncMappingRecord from .serializers import SnowflakeAuthenticationSerializer, SnowflakeDatabaseList, SnowflakeSchemaList, \ SnowflakeTableList, SnowflakeColumnList, SnowflakeMapping, SnowflakeError, \ SnowflakeDatabaseListReturn, SnowflakeSchemaListReturn, SnowflakeTableListReturn, SnowflakeColumnListReturn from snowflake.snowpark.session import Session as SFSession from snowflake.connector.pandas_tools import write_pandas @api_view(['POST']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_auth(request): if request.method == 'POST': print('sayyam') print(request.data) snowflake_data = SnowflakeAuthenticationSerializer(data=request.data) snowflake_data.is_valid(raise_exception=True) parameters = { "account": request.data['account'], "user": request.data['user'], "password": request.data['password'], "role": request.data['role'] } try: session = SFSession.builder.configs(parameters).create() print(session) snowflake_data.save() print(snowflake_data) return Response(snowflake_data.data, status=status.HTTP_201_CREATED) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) @api_view(['GET']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_database_list(request): if request.method == 'GET': database_serializer = SnowflakeDatabaseList(data=request.data) database_serializer.is_valid(raise_exception=True) queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) print(queryset) parameters = { "account": queryset.account, "user": queryset.user, "password": queryset.password, "role": queryset.role } try: session = SFSession.builder.configs(parameters).create() print(session) list_warehouse = session.sql('show warehouses').collect() df = pd.DataFrame(list_warehouse) warehouse_list = df['name'].values list_databases = session.sql('show databases').collect() df = pd.DataFrame(list_databases) database_list = df['name'].values data = { 'snowflake_id': request.data['snowflake_id'], 'warehouses': warehouse_list, 'databases': database_list, } snowflake_warehouse_return = SnowflakeDatabaseListReturn(data=data) snowflake_warehouse_return.is_valid(raise_exception=True) return Response(snowflake_warehouse_return.data, status=status.HTTP_200_OK) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) @api_view(['GET']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_schema_list(request): print(request.data) if request.method == 'GET': schema_serializer = SnowflakeSchemaList(data=request.data) schema_serializer.is_valid(raise_exception=True) queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) print(queryset) parameters = { "account": queryset.account, "user": queryset.user, "password": queryset.password, "role": queryset.role } try: session = SFSession.builder.configs(parameters).create() print(session) list_schema = session.sql('show schemas in database %s ' % request.data['database']).collect() df = pd.DataFrame(list_schema) list_schema = df['name'].values print(list_schema) data = {**request.data, 'schemas': list_schema} print(data) schema_return = SnowflakeSchemaListReturn(data=data) schema_return.is_valid(raise_exception=True) return Response(schema_return.data, status=status.HTTP_200_OK) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) @api_view(['GET']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_table_list(request): print(request.data) if request.method == 'GET': table_serializer = SnowflakeTableList(data=request.data) table_serializer.is_valid(raise_exception=True) queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) print(queryset) parameters = { "account": queryset.account, "user": queryset.user, "password": queryset.password, "role": queryset.role } try: session = SFSession.builder.configs(parameters).create() print(session) list_table = session.sql( 'show tables in schema %s.%s ' % (request.data['database'], request.data['schema'])).collect() df = pd.DataFrame(list_table) list_table = df['name'].values data = {**request.data, 'tables': list_table} schema_return = SnowflakeTableListReturn(data=data) schema_return.is_valid(raise_exception=True) return Response(schema_return.data, status=status.HTTP_200_OK) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) @api_view(['GET']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_column_list(request): print(request.data) if request.method == 'GET': column_serializer = SnowflakeColumnList(data=request.data) column_serializer.is_valid(raise_exception=True) queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) print(queryset) parameters = { "account": queryset.account, "user": queryset.user, "password": queryset.password, "role": queryset.role } try: session = SFSession.builder.configs(parameters).create() print(session) list_column = session.sql('show columns in table %s.%s.%s ' % ( request.data['database'], request.data['schema'], request.data['table'])).collect() df = pd.DataFrame(list_column) list_column = df['column_name'].values data = {**request.data, 'columns': list_column} schema_return = SnowflakeColumnListReturn(data=data) schema_return.is_valid(raise_exception=True) return Response(schema_return.data, status=status.HTTP_200_OK) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) @api_view(['POST']) @authentication_classes([JWTAuthentication]) @permission_classes([IsAuthenticated]) def snowflake_mapping(request): print(request.data) if request.method == 'POST': mapping_serializer = SnowflakeMapping(data=request.data) mapping_serializer.is_valid(raise_exception=True) queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) print(queryset) try: warehouse = request.data['warehouse'] retention_database = request.data['retention_database'] database = request.data['database'] schema = request.data['schema'] table = request.data['table'] directs = [ ['timestamp', 'e', 'customer'], [request.data['timestamp'], request.data['e'], request.data['customer']], ] map_directs = [ [request.data['metadata']], [request.data['metadata']] ] snowflake_static = {} queryset = get_object_or_404(SnowflakeAuthentication, pk=request.data['snowflake_id']) mapping_data_id = MappingDetail( warehouse=warehouse, retention_database=retention_database, database=database, schema=schema, table=table, direct_list=directs, direct_mapping_list=map_directs, snowflake_static=snowflake_static, max_id=0, snowflake_id=queryset, client_id=request.data['client_id'], subdomain=request.data['subdomain'] ) sync_signals(mapping_data_id, limit=1) mapping_data_id.save() return Response(mapping_serializer.data, status=status.HTTP_201_CREATED) except Exception as e: print(e) error_serializer = SnowflakeError(data={'error': str(e)}) error_serializer.is_valid(raise_exception=True) return Response(error_serializer.data, status=status.HTTP_404_NOT_FOUND) def sync_signals(mapping_data_id, limit): session = SFSession.builder.configs({ 'account': mapping_data_id.snowflake_id.account, 'user': mapping_data_id.snowflake_id.user, 'password': mapping_data_id.snowflake_id.password, 'role': mapping_data_id.snowflake_id.role, 'warehouse': mapping_data_id.warehouse, 'database': mapping_data_id.database, 'schema': mapping_data_id.schema }).create() # print('session', session) query = "%s.%s.%s" % (mapping_data_id.database, mapping_data_id.schema, mapping_data_id.table) # tables = session.table(query).limit(50,0) if limit == '': tables = session.sql("select * from {0} LIMIT '' offset {1}".format(query, mapping_data_id.max_id)) else: tables = session.sql("select * from {0} LIMIT 1 offset {1}".format(query, mapping_data_id.max_id)) direct_result = tables.select(mapping_data_id.direct_list[1]) direct_values = direct_result.to_pandas() print(len(direct_values)) if len(direct_values) > 0: retention_session = SFSession.builder.configs({ 'account': mapping_data_id.snowflake_id.account, 'user': mapping_data_id.snowflake_id.user, 'password': mapping_data_id.snowflake_id.password, 'role': mapping_data_id.snowflake_id.role, 'warehouse': mapping_data_id.warehouse, 'database': mapping_data_id.retention_database, 'schema': 'PUBLIC' }).create() if len(mapping_data_id.direct_mapping_list[1]) != 0: df2 = tables.select(mapping_data_id.direct_mapping_list[1]) mapping_values = df2.to_pandas() else: mapping_values = pd.DataFrame(range(0, len(direct_values.to_dict('records'))), columns=['ssr']) for key, value in mapping_data_id.snowflake_static.items(): if key in ["customer", "e"]: direct_values.loc[:, key] = value elif key == 'timestamp': direct_values.loc[:, key] = datetime.datetime.now(tz=get_current_timezone()).isoformat() else: mapping_values.loc[:, key] = value for col, val in zip(mapping_data_id.direct_list[0], mapping_data_id.direct_list[1]): direct_values.rename(columns={val.upper(): col}, inplace=True) if len(mapping_data_id.direct_mapping_list[1]) != 0: for col, val in zip(mapping_data_id.direct_mapping_list[0], mapping_data_id.direct_mapping_list[1]): mapping_values.rename(columns={val.upper(): col}, inplace=True) direct_values['meta'] = mapping_values.to_dict('records') else: mapping_values.drop(columns=['ssr'], inplace=True) if len(mapping_values.columns) == 0: df_temp = pd.DataFrame(["data_source"] * len(direct_values), columns=["Snowflake"]) direct_values.loc[:, 'meta'] = df_temp.to_dict('records') else: direct_values['meta'] = mapping_values.to_dict('records') retention_session.write_pandas(direct_values, 'RETENTIONFLOW', auto_create_table=True, table_type="transient") direct_values.loc[:, 'apiKey'] = mapping_data_id.client_id direct_values.loc[:, 'c_identity'] = 'id' direct_values = direct_values.to_dict('records') for vals in direct_values: url = 'https://%s/api/v1/retentionflow' % mapping_data_id.subdomain print(vals) response = requests.post(url, json=vals, headers={'Content-type': 'application/json'}) print(response.status_code) print(json.loads(response.text)) mapping_data_id.max_id += len(direct_values) mapping_data_id.save() sync_map = SyncMappingRecord( mapping_id=mapping_data_id, snowflake_id=mapping_data_id.snowflake_id, event_number=len(direct_values), created_on=datetime.datetime.now() ) sync_map.save() def send_pattern_to_sf(): print('reached') mapping_objects = MappingDetail.objects.all() for mapping_item in mapping_objects: sync_signals(mapping_item, '')