Psst.. new poll here.
[email protected] web/email now available. Want one? Go here.
You cannot use Outlook/Hotmail/Live addresses to register here, as they are blocking our mail servers. #microsoftdeez
Obey the Epel!
Paste
Pasted as Python by registered user vvillacorta ( 7 months ago )
import pip
# Install/upgrade the extra packages this notebook needs.
# `pip.main` is no longer part of pip's public API (removed in pip 10), so pip
# is run as a subprocess of the current interpreter instead, which is the
# approach recommended by the pip documentation.
import subprocess
import sys

package_names = ['awswrangler', 'jupyterlab_execute_time']
# check=False keeps the original best-effort behaviour: the exit status of the
# install was never inspected, so a failed upgrade does not abort the notebook.
subprocess.run(
    [sys.executable, '-m', 'pip', 'install'] + package_names + ['--upgrade'],
    check=False,
)
# Nativos
from dateutil.relativedelta import relativedelta
from time import gmtime, strftime
from datetime import datetime
import random as rn
import joblib
import time
import json
import sys
import os
import gc
#nube
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
import awswrangler as wr
import sagemaker
import boto3
#calculo
import pandas as pd
import numpy as np
import scipy
#grafico
from IPython.display import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
# Notebook output configuration: plot theme, warnings and pandas display limits.
sns.set(style="whitegrid")

# Interaction with cell output
import warnings

# Suppress every warning for the rest of the session.
warnings.filterwarnings("ignore")
# warnings.simplefilter(action='ignore', category=FutureWarning)

# Raise the pandas display limits (rows, columns, line width).
for _option, _limit in (
    ('display.max_rows', 500),
    ('display.max_columns', 500),
    ('display.width', 1000),
):
    pd.set_option(_option, _limit)

from IPython.core.interactiveshell import InteractiveShell

# "all" asks IPython to echo every top-level expression of a cell,
# not only the last one.
InteractiveShell.ast_node_interactivity = "all"
gc.collect()
# MODELS
##from lightgbm import LGBMClassifier
# Put the parent and the grandparent of the working directory on sys.path
# (each only once). After the loop BASE_DIR holds the grandparent directory.
for BASE_DIR in (
    os.path.dirname(os.getcwd()),
    os.path.dirname(os.path.dirname(os.getcwd())),
):
    if BASE_DIR not in sys.path:
        sys.path.append(BASE_DIR)
print("BASE_DIR::: ", BASE_DIR)
#import scorecardpy as sc
# Single seed shared by every random number generator used in the notebook.
SEED = 29082013
# NOTE: PYTHONHASHSEED is read at interpreter start-up, so setting it here
# only reaches processes spawned from this one.
os.environ['PYTHONHASHSEED'] = f"{SEED}"
# Seed the stdlib and NumPy global generators with the same value.
for _seed_rng in (np.random.seed, rn.seed):
    _seed_rng(SEED)
from utils_campania import *
# Bare expression: shown as cell output when run in a notebook.
# NOTE(review): `bucket` and `s3` are not defined in this file; presumably they
# come from the `utils_campania` star import - confirm.
bucket
# Handle on the bucket; generate_table() uses it to delete objects by prefix.
bucket_s3 = s3.Bucket(bucket)
bucket_s3
# Database name passed as `database=` to the awswrangler catalog/Athena calls
# made by generate_table() (via apply_create()).
esquema_vpc = 'd_mdl_vpc_disc'
def generate_table(esquema_vpc: str, query_: str, table: str, path: str, llave_: str,
                   formato: str = 'PARQUET', compresion: str = 'SNAPPY',
                   grupo: str = 'athenav2'):
    """Drop and rebuild the ``<table>_SG`` Athena table from a CTAS query.

    Relies on the module-level names ``bucket`` (bucket name) and
    ``bucket_s3`` (bucket resource).

    Steps, in order:
      1. Delete every S3 object under the prefix derived from ``path``.
      2. Drop both ``table`` and ``table + '_SG'`` from the catalog.
      3. Sleep 5 seconds, then run ``query_`` as a CTAS that writes
         ``table + '_SG'`` to the derived S3 location.

    Args:
        esquema_vpc: Database used for the catalog deletes and the CTAS.
        query_: SELECT statement used as the CTAS source.
        table: Base table name; the table created is ``table + '_SG'``.
        path: S3 output path; every occurrence of ``table`` in it is replaced
            by ``table + '_SG'``.
        llave_: Name of the single partition column.
        formato: Storage format forwarded to ``create_ctas_table``.
        compresion: Write compression forwarded to ``create_ctas_table``.
        grupo: Athena workgroup.

    Returns:
        A 2-tuple ``(del_, result_)``: the return value of the
        ``delete_table_if_exists`` call for ``table + '_SG'`` and the return
        value of ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: If the S3 prefix derived from ``path`` is empty. An empty
            prefix matches every object, so the delete would wipe the whole
            bucket.
    """
    # Derived names: suffixed table, its S3 location, and the key prefix
    # (text after the bucket name, minus the leading separator).
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]

    # Safety guard: never issue a bucket-wide delete.
    if not path_del:
        raise ValueError(
            "Refusing to delete with an empty S3 prefix derived from path "
            + repr(path_)
        )

    # Remove the physical location.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)

    # Drop the tables from the catalog; only the result for the suffixed
    # table is returned to the caller.
    wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    time.sleep(5)

    # Re-create the table.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_
def apply_create(table='X', path='X', llave='X', query='X'):
    """Rebuild a table through generate_table() in the module-level database.

    Passes ``esquema_vpc`` as the database and returns a 2-tuple: the first
    element of generate_table()'s result, and the ``'Status'`` entry of the
    CTAS query metadata's ``raw_payload``.
    """
    dropped, ctas = generate_table(esquema_vpc, query, table, path, llave)
    status = ctas['ctas_query_metadata'].raw_payload['Status']
    return dropped, status
# Build the feedback table keyed by (periodo_campania, num_ruc).
# generate_table() appends '_SG' to the name, so the table actually created is
# HM_FEEDBACK_RUC_UNICO_SG, partitioned by p_periodo.
# The query inner-joins the deployment base (DD) with the TLV feedback (FF) on
# period and RUC, then per group takes MAX of each flag (NULL -> 0) and SUM of
# three of them as totals.
# NOTE(review): `path_` is not defined at module level in this file; presumably
# it is a path template from the `utils_campania` star import - confirm.
table_ ='HM_FEEDBACK_RUC_UNICO'
apply_create(
table=table_,
path=path_.format(table_),
llave='p_periodo',
query="""
SELECT DD.periodo_campania, DD.num_ruc, --DD.PRODUCTO campania,
MAX(coalesce(FF.flg_nuevos_pre_feedback, 0)) flg_nuevos_pre,
MAX(coalesce(FF.flg_nuevos_ap_feedback, 0)) flg_nuevos_ap,
MAX(coalesce(FF.flg_gestionado_estricto, 0)) flg_gestionado_estricto,
MAX(coalesce(FF.flg_ce, 0)) flg_ce,
MAX(coalesce(FF.flg_cne, 0)) flg_cne,
MAX(coalesce(FF.flg_et, 0)) flg_et,
MAX(coalesce(FF.flg_no_acepta_campana, 0)) flg_no_acepta_campana,
MAX(coalesce(FF.flg_tasa_elevada, 0)) flg_tasa_elevada,
MAX(coalesce(FF.flg_lo_pensara, 0)) flg_lo_pensara,
MAX(coalesce(FF.flg_acepta_campana, 0)) flg_acepta_campana,
MAX(coalesce(FF.flg_no_califica, 0)) flg_no_califica,
SUM(FF.flg_gestionado_estricto) gestion_total,
SUM(FF.flg_ce) ce_total,
SUM(FF.flg_acepta_campana) ac_total,
DD.periodo_campania p_periodo
FROM d_mdl_vpc_disc.HM_BASE_DESPLIEGUE_FOR_INDICADORBPE_SG DD
INNER JOIN d_mdl_vpc_disc.HM_FEEDBACK_TLV_INDICADORBPE_SG FF
ON DD.periodo_campania = FF.gestion
--AND DD.PRODUCTO = FF.campania
AND DD.num_ruc = FF.num_ruc_autocompletado
GROUP BY DD.periodo_campania, DD.num_ruc--, DD.PRODUCTO
"""
)
# Check on the table built above: per campaign period, the row count, the
# distinct RUC count, the sum of flg_ce, and that sum as a percentage of
# distinct RUCs, newest period first.
# NOTE(review): the query names d_mdl_vpc_disc explicitly while `database` is
# "e_perm_aws" - confirm this mismatch is intended.
df = wr.athena.read_sql_query("""
SELECT DD.periodo_campania, count(1), count(distinct(num_ruc)), sum(flg_ce), sum(flg_ce)*100.0/count(distinct(num_ruc))
FROM d_mdl_vpc_disc.HM_FEEDBACK_RUC_UNICO_SG DD
GROUP BY DD.periodo_campania
ORDER BY DD.periodo_campania DESC
""",
database="e_perm_aws",
ctas_approach=False
)
print(df.shape)
# Bare expression: shown as cell output when run in a notebook.
df.head(20)
Revise this Paste