########################################################################################################### UTILS::

# Database (schema) name handed to the catalog and CTAS calls below.
esquema_vpc = "disc_comercial"
# Default workgroup for the CTAS query issued by generate_table.
grupo_vpc = "ibk-discovery-comercial-work-group"
# Base S3 URI from which the per-table output paths are built.
path_ = (
    "s3://ibk-discovery-comercial-us-east-1-654654352211-data"
    "/discovery/comercial/"
)

 

 

########################################################################################################### NOTEBOOKS
def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo=grupo_vpc):
    """Drop and rebuild the ``<table>_SG`` table from a CTAS query.

    The output location is ``path`` with every occurrence of ``table``
    replaced by ``table + '_SG'``. The function deletes the objects stored
    under that location, drops the catalog entries for both ``<table>_SG``
    and ``table``, waits five seconds and then recreates ``<table>_SG`` with
    ``wr.athena.create_ctas_table``.

    Relies on names defined outside this function: ``bucket`` (string used
    to split the URI into bucket and key prefix), ``bucket_s3`` (object
    exposing ``objects.filter(Prefix=...).delete()``), ``wr`` (presumably
    awswrangler -- confirm against the file's imports), ``boto3`` and
    ``time``.

    Args:
        esquema_vpc: Database name used for the catalog and CTAS calls.
        query_: SQL text used as the CTAS source query.
        table: Base table name; the table created is ``table + '_SG'``.
        path: S3 URI of the table data; expected to contain ``table``.
        llave_: Column name passed as the only ``partitioning_info`` entry.
        formato: Storage format forwarded to the CTAS call.
        compresion: Write compression forwarded to the CTAS call.
        grupo: Workgroup forwarded to the CTAS call.

    Returns:
        ``(del_, result_)`` where ``del_`` is the value returned by the last
        catalog delete that did not raise (``({}, {})`` if both raised) and
        ``result_`` is whatever ``create_ctas_table`` returned. When the run
        is aborted by one of the path checks, an empty dict is returned and
        nothing is deleted or created.
    """
    table_ = table + '_SG'
    path_ = path.replace(table, table_)

    # Refuse to run on a path that is too shallow: it could be the shared
    # root location rather than a table-specific folder.
    depth = path_.count('/')
    if depth <= 6:
        print("ERROR!, ", 'path coincide con uri vpc - PROCESO DETENIDO')
        return {}

    if depth <= 8:
        print("WARNING!, ", 'puede que la ruta no esté completa')

    # Key prefix inside the bucket: the text after the bucket name, minus
    # the leading '/'.
    path_del = path_.split(bucket)[-1][1:]
    if not path_del:
        # An empty prefix matches every object in the bucket, so the delete
        # below would wipe all of it. Abort like the shallow-path check.
        print("ERROR!, ", 'prefijo de borrado vacío - PROCESO DETENIDO')
        return {}

    # Remove the physical data location.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)

    # Drop the catalog entries (best effort): first the _SG table, then the
    # base table. A failure is printed and does not stop the rebuild.
    del_ = {}, {}
    for nombre in (table_, table):
        try:
            del_ = wr.catalog.delete_table_if_exists(
                database=esquema_vpc, table=nombre
            )
        except Exception as e:
            print('e:::::: ', str(e))

    time.sleep(5)

    # Recreate the table from the query.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X'):
    """Rebuild a table in ``esquema_vpc`` and summarise the outcome.

    Thin wrapper around ``generate_table`` that uses the module-level
    ``esquema_vpc`` as the database.

    Args:
        table: Base table name.
        path: S3 URI of the table data.
        llave: Partition column name.
        query: SQL text used as the CTAS source query.

    Returns:
        ``(del_, status)`` where ``del_`` is the first element returned by
        ``generate_table`` and ``status`` is the ``'Status'`` entry of the
        CTAS query metadata's raw payload. If ``generate_table`` aborted and
        returned an empty result, that empty result is returned unchanged.
    """
    result = generate_table(esquema_vpc, query, table, path, llave)
    if not result:
        # generate_table signals an aborted run with an empty dict; indexing
        # it would only raise an unhelpful KeyError.
        return result
    return result[0], result[1]['ctas_query_metadata'].raw_payload['Status']
 

 

########################################################################################################### USO:
# Example run: rebuild the table for HM_VMVP_FEEDBACK_RUC, partitioned by
# p_periodo, under the shared base path.
table_fr = "HM_VMVP_FEEDBACK_RUC"
path_fr = '{}vpc/aceptacion/athena_nuevomes/{}/'.format(path_, table_fr)
llave_fr = 'p_periodo'
# Placeholder text: replace it with the actual SQL before running.
query_fr = """
       acá va el query
"""
result = generate_table(esquema_vpc, query_fr, table_fr, path_fr, llave_fr)
# Bare expression: the delete result and the CTAS status (presumably meant to
# be shown as notebook cell output -- it has no effect in a plain script).
result[0], result[1]['ctas_query_metadata'].raw_payload['Status']

# Add a code snippet to your website: www.paste.org