Welcome, guest! Login / Register - Why register?
Psst.. new poll here.
[email protected] web/email now available. Want one? Go here.
Cannot use outlook/hotmail/live here to register as they blocking our mail servers. #microsoftdeez
Obey the Epel!

Paste

Pasted as SQL by registered user vvillacorta ( 1 year ago )
esquema_vpc = 'd_mdl_vpc_disc'

def only_del(table, path):
    # seteado de variables
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    
    # se elimina la ubicacion fisica
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    
    # se elimina la tabla en athena
    #del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    

def generate_table(esquema_vpc=esquema_vpc, query_='', table='', path='', llave_='periodo', 
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    # seteado de variables
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    print("tabla: ", table_)
    print("uri: ", path_)
    print("ruta: ", path_del)
    # se elimina la ubicacion fisica
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    
    # se elimina la tabla en athena
    #del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    
    time.sleep(5)
    # se crea nuevamente la tabla
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo
    )
    return del_, result_


def generate_table_new(query_='', table='', path='', llave_='periodo', esquema_vpc=esquema_vpc, 
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    # seteado de variables
    table_ = table + '_SG'
    path_ = path.replace(table, table_)
    path_del = path_.split(bucket)[-1][1:]
    
    # se elimina la ubicacion fisica
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print("DEL S3: ", table_, path_, path_del)
    
    # se elimina la tabla en athena
    #del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table)
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    print("DEL ATHENA")
    
    time.sleep(5)
    # se crea nuevamente la tabla
    
    qyery_create = """
            CREATE TABLE {}.{}
            WITH ( format = '{}', 
                     parquet_compression = '{}', 
                     partitioned_by = ARRAY['{}'],
                     external_location= '{}'
                   )
            AS (
                {}
            )
    """.format(esquema_vpc, table_, formato, compresion, llave_, path_, query_)
    print(qyery_create)
    
    result_ = wr.athena.read_sql_query(
        qyery_create,
        database=esquema_vpc,
        workgroup=grupo,         
        ctas_approach=False
    )
    return del_, result_




def apply_create_new(table='X', path='X', llave='X', query='X'):
    result = generate_table_new(esquema_vpc, query, table, path, llave)
    return result[0], result[1]

def apply_create(table='X', path='X', llave='X', query='X', esquema_vpc=esquema_vpc):
    result = generate_table(esquema_vpc, query, table, path, llave)
    return result[0], result[1]['ctas_query_metadata'].raw_payload['Status']

 

Revise this Paste

Your Name: Code Language: