Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
[email protected] web/email now available. Want one? Go here.
Cannot use Outlook/Hotmail/Live addresses to register here, as they are blocking our mail servers. #microsoftdeez
Obey the Epel!

Paste

Pasted as SQL by registered user vvillacorta ( 4 months ago )
# ---------------------------------------------------------------------------
# Utilities: shared configuration used by the notebook helpers below.
# ---------------------------------------------------------------------------

# Athena/Glue database (schema) passed as `database=` to the catalog/CTAS calls.
esquema_vpc = "disc_comercial"

# Athena workgroup used by default when running the CTAS queries.
grupo_vpc = "ibk-discovery-comercial-work-group"

# Base S3 URI (ending in "/") under which table locations are built.
path_ = (
    "s3://ibk-discovery-comercial-us-east-1-654654352211-data"
    "/discovery/comercial/"
)

 

 

########################################################################################################### NOTEBOOKS
def generate_table(esquema_vpc, query_, table, path, llave_,
                   formato='PARQUET', compresion='SNAPPY', grupo=grupo_vpc):
    """Rebuild the Athena table ``<table>_SG`` from ``query_`` with a CTAS.

    Steps performed:
      1. Derive the target table name ``table + '_SG'`` and the target S3
         path by substituting that name for ``table`` inside ``path``.
      2. Abort (print an error and return ``{}``) when the target path has
         six or fewer "/" characters, to avoid wiping a shallow prefix;
         print a warning when it has eight or fewer.
      3. Delete every S3 object under the target prefix.
      4. Drop ``<table>_SG`` and ``table`` from the catalog. This is best
         effort: exceptions are printed, not raised. The S3 data of
         ``table`` itself is not deleted here.
      5. Run ``wr.athena.create_ctas_table`` to create ``<table>_SG``,
         partitioned by ``llave_``.

    Relies on names defined elsewhere in the notebook:
      - ``bucket``: the bucket name, used to strip ``s3://<bucket>/``.
      - ``bucket_s3``: presumably a boto3 S3 ``Bucket`` resource (verify).
      - ``wr`` (awswrangler), ``boto3`` and ``time``.

    Args:
        esquema_vpc: Athena/Glue database in which the tables live.
        query_: SELECT statement used as the CTAS body.
        table: Base table name; ``'_SG'`` is appended for the target.
        path: S3 URI containing ``table``; the substituted path is both the
            delete prefix and the CTAS ``s3_output``.
        llave_: Column name used as the single partition key.
        formato: CTAS storage format (default ``'PARQUET'``).
        compresion: CTAS write compression (default ``'SNAPPY'``).
        grupo: Athena workgroup (defaults to the module's ``grupo_vpc``).

    Returns:
        ``{}`` when the path safety check aborts. Otherwise the tuple
        ``(del_, result_)``:
          - ``del_`` is the value of the last
            ``wr.catalog.delete_table_if_exists`` call that did not raise,
            or ``({}, {})`` if both raised.
          - ``result_`` is the return value of
            ``wr.athena.create_ctas_table``.
    """
    # Target table name and location. Note: str.replace substitutes EVERY
    # occurrence of `table` in `path`, not only the last path segment.
    table_ = table + '_SG'
    path_ = path.replace(table, table_)

    # Object-key prefix relative to the bucket root (drops "s3://<bucket>/").
    path_del = path_.split(bucket)[-1][1:]
    # S3 prefixes are plain string prefixes: without a trailing "/", a
    # prefix like ".../T_SG" would also match sibling keys such as
    # ".../T_SG_OLD/...", and the delete below would remove them too.
    if path_del and not path_del.endswith('/'):
        path_del += '/'

    # Safety guard: a shallow path would make the delete below wipe a large
    # part of the bucket.
    if path_.count('/') <= 6:
        print("ERROR!, ", 'path coincide con uri vpc - PROCESO DETENIDO')
        return {}

    if path_.count('/') <= 8:
        print("WARNING!, ", 'puede que la ruta no esté completa')

    # Remove the physical data under the target prefix (presumably so the
    # CTAS location starts empty).
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)

    # Drop both catalog entries, best effort. `del_` keeps the result of the
    # last call that succeeded (same semantics as two sequential try blocks).
    del_ = {}, {}
    for nombre in (table_, table):
        try:
            del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=nombre)
        except Exception as e:
            print('e:::::: ', str(e))

    # Fixed pause before recreating; presumably lets the deletions settle.
    # The need for exactly 5 s is not established here.
    time.sleep(5)

    # Recreate the target table from the query.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_

def apply_create(table='X', path='X', llave='X', query='X'):
    """Run ``generate_table`` in the module schema and summarize the outcome.

    The ``'X'`` defaults are placeholders only. Real values for the table,
    S3 path, partition key and query must be supplied.

    Args:
        table: Base table name (``'_SG'`` is appended by ``generate_table``).
        path: S3 URI containing ``table``.
        llave: Partition column name.
        query: SELECT statement used as the CTAS body.

    Returns:
        ``(del_, status)``:
          - ``del_`` is the catalog-deletion result reported by
            ``generate_table``.
          - ``status`` is the ``'Status'`` entry of the CTAS query's raw
            payload.
        If ``generate_table`` aborts (returns an empty result),
        ``({}, None)`` is returned instead of failing with an opaque
        ``KeyError: 0``.
    """
    result = generate_table(esquema_vpc, query, table, path, llave)
    if not result:
        # generate_table refused to run; there is no CTAS metadata to read.
        return result, None
    del_, ctas = result
    return del_, ctas['ctas_query_metadata'].raw_payload['Status']
 

 

# ---------------------------------------------------------------------------
# Usage example: rebuild HM_VMVP_FEEDBACK_RUC_SG partitioned by p_periodo.
# ---------------------------------------------------------------------------
table_fr = "HM_VMVP_FEEDBACK_RUC"
llave_fr = 'p_periodo'
# Location under the base S3 path; generate_table will swap in the "_SG" name.
path_fr = f'{path_}vpc/aceptacion/athena_nuevomes/{table_fr}/'
# Placeholder: replace with the real SELECT statement.
query_fr = """
       acá va el query
"""
result = generate_table(
    esquema_vpc, query_fr, table_fr, path_fr, llave_fr
)
# Notebook display of (catalog delete result, CTAS query status).
result[0], result[1]['ctas_query_metadata'].raw_payload['Status']

 

Revise this Paste

Your Name: Code Language: