########################################################################################################### UTILS::
esquema_vpc = 'disc_comercial'
grupo_vpc = 'ibk-discovery-comercial-work-group'
path_ = 's3://ibk-discovery-comercial-us-east-1-654654352211-data/discovery/comercial/'
########################################################################################################### NOTEBOOKS
def generate_table(esquema_vpc, query_, table, path, llave_,
formato='PARQUET', compresion='SNAPPY', grupo=grupo_vpc):
# seteado de variables
table_ = table + '_SG'
path_ = path.replace(table, table_)
path_del = path_.split(bucket)[-1][1:]
if path_.count('/') <= 6:
print("ERROR!, ", 'path coincide con uri vpc - PROCESO DETENIDO')
return {}
if path_.count('/') <= 8:
print("WARNING!, ", 'puede que la ruta no esté completa')
# se elimina la ubicacion fisica
bucket_s3.objects.filter(Prefix=path_del).delete()
print(table_, path_, path_del)
# se elimina la tabla en athena
del_ = {}, {}
try:
del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
except Exception as e:
print('e:::::: ',str(e))
try:
del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
except Exception as e:
print('e:::::: ',str(e))
time.sleep(5)
# se crea nuevamente la tabla
result_ = wr.athena.create_ctas_table(
sql=query_,
database=esquema_vpc,
ctas_table=table_,
wait=True,
s3_output=path_,
storage_format=formato,
write_compression=compresion,
partitioning_info=[llave_],
athena_query_wait_polling_delay=0.5,
boto3_session=boto3.Session(),
workgroup=grupo
)
return del_, result_
def apply_create(table='X', path='X', llave='X', query='X'):
result = generate_table(esquema_vpc, query, table, path, llave)
return result[0], result[1]['ctas_query_metadata'].raw_payload['Status']
########################################################################################################### USO:
table_fr = "HM_VMVP_FEEDBACK_RUC"
path_fr = '{}vpc/aceptacion/athena_nuevomes/{}/'.format(path_, table_fr)
llave_fr = 'p_periodo'
query_fr = """
acá va el query
"""
result = generate_table(esquema_vpc, query_fr, table_fr, path_fr, llave_fr)
result[0], result[1]['ctas_query_metadata'].raw_payload['Status']
Add a code snippet to your website: www.paste.org