Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
[email protected] web/email now available. Want one? Go here.
Cannot use outlook/hotmail/live here to register as they blocking our mail servers. #microsoftdeez
Obey the Epel!

Paste

Pasted as Python by registered user vvillacorta ( 7 months ago )
import pip

package_names = ['awswrangler', 'jupyterlab_execute_time']
pip.main(['install'] + package_names + ['--upgrade'])


# Nativos
from dateutil.relativedelta import relativedelta
from time import gmtime, strftime
from datetime import datetime
import random as rn
import joblib
import time
import json
import sys
import os
import gc

#nube
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker import get_execution_role
import awswrangler as wr
import sagemaker
import boto3


#calculo
import pandas as pd
import numpy as np
import scipy

#grafico
from IPython.display import display
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
sns.set(style="whitegrid")

#Interacciones con output
import warnings
warnings.filterwarnings("ignore")
# warnings.simplefilter(action='ignore', category=FutureWarning)
pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

gc.collect()
# MODELS
##from lightgbm import LGBMClassifier

BASE_DIR = os.path.dirname(os.getcwd())
if BASE_DIR not in sys.path: sys.path.append(BASE_DIR)
    
BASE_DIR = os.path.dirname(os.path.dirname(os.getcwd()))
if BASE_DIR not in sys.path: sys.path.append(BASE_DIR)
print("BASE_DIR::: ", BASE_DIR)
#import scorecardpy as sc

SEED = 29082013
os.environ['PYTHONHASHSEED'] = str(SEED)
np.random.seed(SEED)
rn.seed(SEED)


from utils_campania import *
bucket



bucket_s3 = s3.Bucket(bucket)
bucket_s3


esquema_vpc = 'd_mdl_vpc_disc'

def generate_table(esquema_vpc, query_, table, path, llave_, 
                   formato='PARQUET', compresion='SNAPPY', grupo='athenav2'):
    # seteado de variables
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    
    # se elimina la ubicacion fisica
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    
    # se elimina la tabla en athena
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    
    time.sleep(5)
    # se crea nuevamente la tabla
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X'):
    result = generate_table(esquema_vpc, query, table, path, llave)
    return result[0], result[1]['ctas_query_metadata'].raw_payload['Status']



table_ ='HM_FEEDBACK_RUC_UNICO'
apply_create(
    table=table_, 
    path=path_.format(table_), 
    llave='p_periodo', 
    query="""
        SELECT DD.periodo_campania, DD.num_ruc, --DD.PRODUCTO campania,
         MAX(coalesce(FF.flg_nuevos_pre_feedback, 0)) flg_nuevos_pre,
         MAX(coalesce(FF.flg_nuevos_ap_feedback, 0)) flg_nuevos_ap,
         MAX(coalesce(FF.flg_gestionado_estricto, 0)) flg_gestionado_estricto,
         MAX(coalesce(FF.flg_ce, 0)) flg_ce,
         MAX(coalesce(FF.flg_cne, 0)) flg_cne,
         MAX(coalesce(FF.flg_et, 0)) flg_et,
         MAX(coalesce(FF.flg_no_acepta_campana, 0)) flg_no_acepta_campana,
         MAX(coalesce(FF.flg_tasa_elevada, 0)) flg_tasa_elevada,
         MAX(coalesce(FF.flg_lo_pensara, 0)) flg_lo_pensara,
         MAX(coalesce(FF.flg_acepta_campana, 0)) flg_acepta_campana,
         MAX(coalesce(FF.flg_no_califica, 0)) flg_no_califica,
         SUM(FF.flg_gestionado_estricto) gestion_total,    
         SUM(FF.flg_ce) ce_total,
         SUM(FF.flg_acepta_campana) ac_total,
         DD.periodo_campania p_periodo
    FROM d_mdl_vpc_disc.HM_BASE_DESPLIEGUE_FOR_INDICADORBPE_SG DD
    INNER JOIN d_mdl_vpc_disc.HM_FEEDBACK_TLV_INDICADORBPE_SG FF
    ON DD.periodo_campania = FF.gestion
    --AND DD.PRODUCTO = FF.campania
    AND DD.num_ruc = FF.num_ruc_autocompletado
    GROUP BY DD.periodo_campania, DD.num_ruc--, DD.PRODUCTO
    """
)


df = wr.athena.read_sql_query("""
        SELECT DD.periodo_campania, count(1), count(distinct(num_ruc)), sum(flg_ce), sum(flg_ce)*100.0/count(distinct(num_ruc))
        FROM d_mdl_vpc_disc.HM_FEEDBACK_RUC_UNICO_SG DD
        GROUP BY DD.periodo_campania
        ORDER BY DD.periodo_campania DESC
    """,        
    database="e_perm_aws",
    ctas_approach=False
)
print(df.shape)
df.head(20)

 

Revise this Paste

Your Name: Code Language: