# snowflake-connector / snowflake_app
# (page navigation text captured with the paste: "Upload File", "HOME")
"""Django views for the Snowflake / SubscriptionFlow connector.

The views store per-user Snowflake and SubscriptionFlow credentials, let the
user map a Snowflake table, and push the mapped rows to SubscriptionFlow's
``retentionflow`` endpoint.
"""
import datetime
import json
import logging
import multiprocessing

import numpy as np
import pandas as pd
import requests
from django.contrib import messages
from django.contrib.auth import authenticate, login, logout
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.contrib.sessions.models import Session
from django.http import JsonResponse
from django.shortcuts import render, redirect
from snowflake.connector.pandas_tools import write_pandas
from snowflake.snowpark.session import Session as SFSession

from snowflake_app.models import (
    MappingDetail,
    SnowflakeAuthentication,
    SubscriptionflowAuthentication,
    SyncMappingRecord,
)

logger = logging.getLogger(__name__)

# Seconds to wait on any SubscriptionFlow HTTP call before giving up, so a
# stalled connection cannot block a request or the sync job indefinitely.
REQUEST_TIMEOUT = 30


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _single_or_none(queryset):
    """Return the only row of *queryset*, or None if it has zero or several rows."""
    rows = list(queryset[:2])  # two rows are enough to tell "exactly one" apart
    return rows[0] if len(rows) == 1 else None


def _snowflake_params(auth, **extra):
    """Build Snowpark connection parameters from a SnowflakeAuthentication row.

    ``extra`` adds or overrides keys (e.g. warehouse, database, schema).
    """
    params = {
        'account': auth.account,
        'user': auth.user,
        'password': auth.password,
        'role': auth.role,
    }
    params.update(extra)
    return params


def _show_column(session, statement, column):
    """Run *statement* on *session* and return one column of the result as a Series."""
    return pd.DataFrame(session.sql(statement).collect())[column]


def _token_error_detail(response):
    """Return the ``error`` field of a failed token response, or its HTTP reason."""
    try:
        return response.json()['error']
    except (ValueError, KeyError, TypeError):
        # Body was not JSON, or not a JSON object carrying an "error" key.
        return response.reason


def _show_names_for_user(request, statement, column):
    """Run a SHOW statement with the user's stored Snowflake credentials.

    Returns ``{row_index: name}`` taken from *column* of the result, or an
    empty list when the user is not connected, has no single stored
    credential row, or the statement fails (an error message is queued in
    that last case).
    """
    names = []
    if not request.session.get('snowflake_connected'):
        return names
    auth = _single_or_none(
        SnowflakeAuthentication.objects.filter(user_id=request.user.id)
    )
    if auth is None:
        return names
    try:
        session = SFSession.builder.configs(_snowflake_params(auth)).create()
        try:
            names = _show_column(session, statement, column).to_dict()
        finally:
            session.close()
    except Exception:
        logger.exception('Snowflake statement failed: %s', statement)
        messages.error(request, 'Schema not Found!')
    return names


# ---------------------------------------------------------------------------
# Views
# ---------------------------------------------------------------------------

def login_name(request):
    """Authenticate the posted credentials and initialise the session flags.

    On success the session receives ``username`` (upper-cased), ``user_id``
    and three booleans (``snowflake_connected``, ``sf_connected``,
    ``mapping``), each true when exactly one matching row exists for the
    user; the user is then redirected to ``dashboard``. Otherwise the login
    page is rendered, with an error message after a failed POST.
    """
    if request.method == 'POST':
        username = request.POST.get('username')
        password = request.POST.get('password')
        user = authenticate(request, username=username, password=password)
        if user is not None:
            login(request, user)
            request.session['username'] = username.upper()
            request.session['user_id'] = user.id
            request.session['snowflake_connected'] = (
                SnowflakeAuthentication.objects.filter(user_id=user.id).count() == 1
            )
            request.session['sf_connected'] = (
                SubscriptionflowAuthentication.objects.filter(user_id=user.id).count() == 1
            )
            request.session['mapping'] = (
                MappingDetail.objects.filter(user_id=user.id).count() == 1
            )
            return redirect('dashboard')
        messages.error(request, 'Credentials not matched!')
    return render(request, 'login.html', {})


@login_required
def index(request):
    """Render the dashboard with the time and event count of the latest sync."""
    context = {
        "last_update": '',
        "number": '',
        "values_found": False,
    }
    # Fetch only the newest record instead of loading every sync row.
    last_sync = (
        SyncMappingRecord.objects
        .filter(user_id=request.user.id)
        .order_by('-pk')
        .first()
    )
    if last_sync is not None:
        context = {
            "last_update": last_sync.created_on,
            "number": last_sync.event_number,
            "values_found": True,
        }
    return render(request, 'index.html', context)


# NOTE: an earlier, disabled draft of snowflake_connector ran the session
# creation in a multiprocessing.Process joined with a 10-second limit and
# raised TimeoutError("Snowflake account name not found!") when the process
# was still alive. It was commented out in the original file and is
# summarised here rather than kept as dead code.

@login_required
def snowflake_connector(request):
    """Validate and store Snowflake credentials, or show the stored ones.

    POST: opens a Snowpark session with the posted account/user/password/role
    to prove they work, stores them for the current user and redirects to
    ``dashboard``. Any failure is shown as an error message on the form.
    Other methods: pre-fills the form from the user's single stored row.
    """
    context = {
        "account": '',
        "user": '',
        "password": '',
        "role": '',
        "values_found": False,
    }
    if request.method == 'POST':
        credentials = {
            field: request.POST.get(field)
            for field in ('account', 'user', 'password', 'role')
        }
        try:
            # The session only proves the credentials work; close it right
            # away so the connection is not leaked.
            SFSession.builder.configs(credentials).create().close()
            # Update in place so reconnecting never leaves a second row
            # behind (the "exactly one row" checks would then fail).
            SnowflakeAuthentication.objects.update_or_create(
                user_id=request.user,
                defaults=credentials,
            )
            request.session['snowflake_connected'] = True
            messages.success(request, 'Connected to Snowflake!')
            return redirect('dashboard')
        except Exception as exc:
            # Credentials are deliberately not logged.
            logger.exception('Snowflake connection failed')
            messages.error(request, str(exc))
    else:
        stored = _single_or_none(
            SnowflakeAuthentication.objects.filter(user_id=request.user.id)
        )
        if stored is not None:
            context['account'] = stored.account
            context['user'] = stored.user
            context['password'] = stored.password
            context['role'] = stored.role
            context['values_found'] = True
    return render(request, 'snowflake-connector.html', context)


@login_required
def sf_connector(request):
    """Validate and store SubscriptionFlow OAuth client credentials.

    POST: requests a client-credentials token from
    ``https://<subdomain>.subscriptionflow.com/oauth/token``. A 200 response
    stores the credentials and redirects to ``dashboard``; anything else
    clears ``sf_connected`` and shows the error on the form.
    Other methods: pre-fills the form from the user's single stored row.
    """
    context = {
        "subdomain": '',
        "client_id": '',
        "client_secret": '',
        "values_found": False,
    }
    if request.method == 'POST':
        subdomain = request.POST.get('subdomain')
        client_id = request.POST.get('client_id')
        client_secret = request.POST.get('client_secret')
        token_url = "https://" + '%s.subscriptionflow.com/oauth/token' % subdomain
        try:
            # TLS verification stays on (the default): the client secret is
            # sent with this request.
            token_response = requests.post(
                token_url,
                data={'grant_type': 'client_credentials'},
                allow_redirects=False,
                auth=(client_id, client_secret),
                timeout=REQUEST_TIMEOUT,
            )
        except requests.RequestException as exc:
            logger.exception('SubscriptionFlow token request failed')
            request.session['sf_connected'] = False
            messages.error(request, str(exc))
        else:
            if token_response.status_code != 200:
                request.session['sf_connected'] = False
                messages.error(
                    request,
                    "(%s) - %s" % (
                        token_response.status_code,
                        _token_error_detail(token_response),
                    ),
                )
            else:
                # Update in place so reconnecting never creates a duplicate row.
                SubscriptionflowAuthentication.objects.update_or_create(
                    user_id=request.user,
                    defaults={
                        'subdomain': subdomain,
                        'client_id': client_id,
                        'client_secret': client_secret,
                    },
                )
                messages.success(request, 'Connected to SubscriptionFlow!')
                request.session['sf_connected'] = True
                return redirect('dashboard')
    else:
        stored = _single_or_none(
            SubscriptionflowAuthentication.objects.filter(user_id=request.user.id)
        )
        if stored is not None:
            context['subdomain'] = stored.subdomain
            context['client_id'] = stored.client_id
            context['client_secret'] = stored.client_secret
            context['values_found'] = True
    return render(request, 'sf-connector.html', context)


@login_required
def mapping(request):
    """Create the table/column mapping, or render the mapping page.

    POST: builds a MappingDetail from the posted form, runs a first
    ``sync_signals`` with it, saves it and redirects to ``dashboard``. Any
    failure is reported as an error message and the page is rendered again.

    Rendering: while the session's ``mapping`` flag is false, the warehouse
    and database lists are read from Snowflake; otherwise the stored mapping
    is placed in the context.
    """
    context = {'values_found': False}
    user_pk = request.user.id

    if request.method == 'POST':
        try:
            form = request.POST
            snowflake_auth = SnowflakeAuthentication.objects.filter(user_id=user_pk).first()
            sf_auth = SubscriptionflowAuthentication.objects.filter(user_id=user_pk).first()
            if snowflake_auth is None or sf_auth is None:
                # Previously surfaced as an opaque "list index out of range".
                raise ValueError(
                    'Connect Snowflake and SubscriptionFlow before mapping.'
                )
            metadata_column = form.get('metadata-col-mapping')
            new_mapping = MappingDetail(
                # Row 0: payload keys; row 1: the Snowflake columns feeding them.
                direct_list=[
                    ['timestamp', 'e', 'customer'],
                    [
                        form.get('timestamp-col-mapping'),
                        form.get('event-col-mapping'),
                        form.get('customer-col-mapping'),
                    ],
                ],
                direct_mapping_list=[[metadata_column], [metadata_column]],
                snowflake_static={},
                warehouse=form.get('warehouse-mapping'),
                database=form.get('database-mapping'),
                retention_database=form.get('retention-database-mapping'),
                schema=form.get('schema-mapping'),
                table=form.get('table-mapping'),
                user_id=request.user,
                max_id=0,
                snowflake_auth_id=snowflake_auth,
                sf_auth_id=sf_auth,
            )
            # The first sync doubles as validation: if it raises, the mapping
            # is not kept.
            sync_signals(new_mapping)
            new_mapping.save()
            request.session['mapping'] = True
            messages.success(request, 'Mapping completed!')
            return redirect('dashboard')
        except Exception as exc:
            logger.exception('Mapping failed')
            messages.error(request, str(exc))

    context['warehouse_list'] = []
    context['database_list'] = []
    # .get(): sessions without the flag are treated as "not mapped yet"
    # instead of raising KeyError.
    if request.session.get('mapping', False) is False:
        if request.session.get('snowflake_connected'):
            auth = _single_or_none(
                SnowflakeAuthentication.objects.filter(user_id=user_pk)
            )
            if auth is not None:
                try:
                    session = SFSession.builder.configs(_snowflake_params(auth)).create()
                    try:
                        context['warehouse_list'] = _show_column(
                            session, 'show warehouses', 'name').tolist()
                        context['database_list'] = _show_column(
                            session, 'show databases', 'name').tolist()
                    finally:
                        session.close()
                except Exception as exc:
                    logger.exception('Listing warehouses/databases failed')
                    messages.error(request, str(exc))
    else:
        stored = _single_or_none(MappingDetail.objects.filter(user_id=user_pk))
        if stored is not None:
            context['warehouse'] = stored.warehouse
            context['database'] = stored.database
            context['retention_database'] = stored.retention_database
            context['schema'] = stored.schema
            context['table'] = stored.table
            for key, column in zip(stored.direct_list[0], stored.direct_list[1]):
                if key in ('customer', 'timestamp', 'e'):
                    context[key] = column
            # Only one metadata column is exposed; the last one wins.
            for _, column in zip(stored.direct_mapping_list[0],
                                 stored.direct_mapping_list[1]):
                context['meta'] = column
            context['values_found'] = True
    return render(request, 'mapping.html', context)


def send_pattern_to_sf():
    """Run ``sync_signals`` for every stored mapping.

    A failure in one mapping is logged and does not prevent the remaining
    mappings from being synchronised.
    """
    for mapping_item in MappingDetail.objects.all():
        try:
            sync_signals(mapping_item)
        except Exception:
            logger.exception('Sync failed for mapping %s', mapping_item.pk)


def sync_signals(mapping_data_id):
    """Push the not-yet-synced rows of a mapped table to SubscriptionFlow.

    Reads the mapped table starting at offset ``max_id``, renames the mapped
    columns to their payload keys, attaches a ``meta`` dict per row, writes
    the batch to the ``RETENTIONFLOW`` table of the retention database and
    posts every row to the ``retentionflow`` endpoint. Afterwards ``max_id``
    is advanced, the mapping saved and a SyncMappingRecord created. Nothing
    is written when no rows are found.

    Args:
        mapping_data_id: a MappingDetail instance (may be unsaved; it is
            saved here once rows have been pushed).
    """
    auth = mapping_data_id.snowflake_auth_id
    sf_auth = mapping_data_id.sf_auth_id
    direct_keys, direct_columns = mapping_data_id.direct_list[0], mapping_data_id.direct_list[1]
    meta_keys, meta_columns = (
        mapping_data_id.direct_mapping_list[0],
        mapping_data_id.direct_mapping_list[1],
    )

    session = SFSession.builder.configs(_snowflake_params(
        auth,
        warehouse=mapping_data_id.warehouse,
        database=mapping_data_id.database,
        schema=mapping_data_id.schema,
    )).create()
    retention_session = None
    try:
        qualified_table = "%s.%s.%s" % (
            mapping_data_id.database, mapping_data_id.schema, mapping_data_id.table)
        # Snowflake treats LIMIT '' as "no limit", so this reads every row
        # from the offset onwards.
        # NOTE(review): there is no ORDER BY, so the offset (and the pairing
        # with the second select below) relies on a stable row order —
        # confirm this holds for the mapped tables.
        tables = session.sql("select * from {0} LIMIT '' offset {1}".format(
            qualified_table, mapping_data_id.max_id))
        direct_values = tables.select(direct_columns).to_pandas()
        if len(direct_values) == 0:
            return

        retention_session = SFSession.builder.configs(_snowflake_params(
            auth,
            warehouse=mapping_data_id.warehouse,
            database=mapping_data_id.retention_database,
            schema='PUBLIC',
        )).create()

        has_meta_columns = len(meta_columns) != 0
        if has_meta_columns:
            mapping_values = tables.select(meta_columns).to_pandas()
        else:
            # Placeholder column so static values can be attached row-wise;
            # dropped again below.
            mapping_values = pd.DataFrame({'ssr': range(len(direct_values))})

        for key, value in mapping_data_id.snowflake_static.items():
            if key in ["customer", "e"]:
                direct_values.loc[:, key] = value
            elif key == 'timestamp':
                direct_values.loc[:, key] = datetime.datetime.now().isoformat()
            else:
                mapping_values.loc[:, key] = value

        # to_pandas() yields upper-cased column names; rename to payload keys.
        for key, column in zip(direct_keys, direct_columns):
            direct_values.rename(columns={column.upper(): key}, inplace=True)

        if has_meta_columns:
            for key, column in zip(meta_keys, meta_columns):
                mapping_values.rename(columns={column.upper(): key}, inplace=True)
            direct_values['meta'] = mapping_values.to_dict('records')
        else:
            mapping_values.drop(columns=['ssr'], inplace=True)
            if len(mapping_values.columns) == 0:
                # NOTE(review): this produces {"Snowflake": "data_source"};
                # key and value look swapped — confirm before changing, the
                # payload is kept as it was.
                df_temp = pd.DataFrame(
                    ["data_source"] * len(direct_values), columns=["Snowflake"])
                direct_values.loc[:, 'meta'] = df_temp.to_dict('records')
            else:
                direct_values['meta'] = mapping_values.to_dict('records')

        retention_session.write_pandas(
            direct_values, 'RETENTIONFLOW',
            auto_create_table=True, table_type="transient")

        direct_values.loc[:, 'apiKey'] = sf_auth.client_id
        direct_values.loc[:, 'c_identity'] = 'id'
        records = direct_values.to_dict('records')

        url = 'https://%s.subscriptionflow.com/api/v1/retentionflow' % sf_auth.subdomain
        for record in records:
            # Payloads carry the API key, so only the status is logged. The
            # response body is not parsed: a non-JSON reply must not abort
            # the run before max_id is advanced.
            response = requests.post(
                url, json=record,
                headers={'Content-type': 'application/json'},
                timeout=REQUEST_TIMEOUT)
            logger.debug('retentionflow responded with %s', response.status_code)

        mapping_data_id.max_id += len(records)
        mapping_data_id.save()
        SyncMappingRecord(
            mapping_id=mapping_data_id,
            user_id=mapping_data_id.user_id,
            event_number=len(records),
            snowflake_auth_id=auth,
            sf_auth_id=sf_auth,
            created_on=datetime.datetime.now(),
        ).save()
    finally:
        # Always release both Snowflake connections.
        if retention_session is not None:
            retention_session.close()
        session.close()


@login_required
def logout_view(request):
    """Log the user out and redirect to the login page."""
    logout(request)
    return redirect('login')


# NOTE(review): the three endpoints below interpolate posted identifiers into
# SHOW statements. They run under the requesting user's own stored Snowflake
# credentials; quote/validate the identifiers if that ever changes.

@login_required
def change_schema(request):
    """Return, as JSON, the schemas of the posted ``database``.

    Non-POST requests get a 405 JSON response instead of failing on an
    unbound local.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'schema_list': []}, status=405)
    database = request.POST.get('database')
    list_schema = _show_names_for_user(
        request, 'show schemas in database %s ' % database, 'name')
    return JsonResponse({'success': True, 'schema_list': list_schema})


@login_required
def change_table(request):
    """Return, as JSON, the tables of the posted ``database``/``schema``.

    Non-POST requests get a 405 JSON response.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'table_list': []}, status=405)
    database = request.POST.get('database')
    schema = request.POST.get('schema')
    list_table = _show_names_for_user(
        request, 'show tables in schema %s.%s ' % (database, schema), 'name')
    return JsonResponse({'success': True, 'table_list': list_table})


@login_required
def change_column(request):
    """Return, as JSON, the columns of the posted ``database``/``schema``/``table``.

    Non-POST requests get a 405 JSON response.
    """
    if request.method != 'POST':
        return JsonResponse({'success': False, 'column_list': []}, status=405)
    database = request.POST.get('database')
    schema = request.POST.get('schema')
    table = request.POST.get('table')
    list_column = _show_names_for_user(
        request,
        'show columns in table %s.%s.%s ' % (database, schema, table),
        'column_name')
    return JsonResponse({'success': True, 'column_list': list_column})