Psst.. new poll here.
[email protected] web/email now available. Want one? Go here.
Cannot use outlook/hotmail/live here to register as they blocking our mail servers. #microsoftdeez
Obey the Epel!
Paste
Pasted as Python by registered user vvillacorta ( 1 year ago )
def generate_table(esquema_vpc, query_, table, path, llave_,
formato='PARQUET', compresion='SNAPPY', grupo='athenav2'):
# seteado de variables
table_ = table + '_SG'
path_ = path.replace(table, table_)
path_del = path_.split(bucket)[-1][1:]
# se elimina la ubicacion fisica
bucket_s3.objects.filter(Prefix=path_del).delete()
print(table_, path_, path_del)
# se elimina la tabla en athena
#del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
time.sleep(2)
# se crea nuevamente la tabla
result_ = wr.athena.create_ctas_table(
sql=query_,
database=esquema_vpc,
ctas_table=table_,
wait=True,
s3_output=path_,
storage_format=formato,
write_compression=compresion,
partitioning_info=[llave_],
athena_query_wait_polling_delay=0.5,
boto3_session=boto3.Session(),
workgroup=grupo
)
return del_, result_
def apply_create(table='X', path='X', llave='X', query='X'):
result = generate_table(esquema_vpc, query, table, path, llave)
return result[0], result[1]['ctas_query_metadata'].raw_payload['Status']
table_ ='SUNAT_SOLO_BASE_RIESGOS'
apply_create(
table=table_,
path='s3://sagemaker-us-east-1-058528764918/vpc/contactabilidad/athena_2/{}/'.format(table_),
llave='p_periodo',
query="""
SELECT AA.*, AA.periodo_val p_periodo
FROM(
SELECT ROW_NUMBER() OVER(PARTITION by periodo_val, numruc_val ORDER BY tiempo_deuda_tributaria_amt DESC) ORDEN,
*
FROM e_perm_aws.t_fact_vpc_agg_sunat_reniec
WHERE numruc_val IN (select num_ruc from d_mdl_vpc_disc.HM_RIESGOS_DATA_BPE)
AND periodo_val >= '202201'
) AA
WHERE AA.ORDEN = 1
"""
)
Revise this Paste