# Default Athena/Glue database; used as the default `esquema_vpc` argument of the table helpers.
esquema_vpc = "d_mdl_vpc_disc"
def only_del(table, path):
    """Delete the ``<table>_SG`` table: its S3 objects and its catalog entry.

    The S3 location is derived from ``path`` by replacing the last occurrence
    of ``table`` with ``table + '_SG'``. Every object under that prefix in the
    module-level ``bucket_s3`` is deleted. Then ``<table>_SG`` is dropped from
    database ``esquema_vpc`` if it exists.

    Args:
        table: Base table name; must occur in ``path``.
        path: S3 URI of the base table inside the module-level ``bucket``.

    Returns:
        The value returned by ``wr.catalog.delete_table_if_exists``.

    Raises:
        ValueError: If ``table`` is empty, does not occur in ``path``, or the
            derived location is not a non-root prefix inside ``bucket``.
            Raised before anything is deleted.
    """
    # Validate first: everything after this point is destructive.
    if not table:
        raise ValueError("table must be a non-empty table name")
    # Rename only the last occurrence (the table folder), so a table name that
    # also appears in the bucket name or a parent folder is left untouched.
    head, found_table, tail = path.rpartition(table)
    if not found_table:
        # Without this check the derived path equals `path` and the ORIGINAL
        # table's data would be deleted instead of the _SG copy.
        raise ValueError("table {!r} does not occur in path {!r}".format(table, path))
    table_ = table + '_SG'
    path_ = head + table_ + tail
    # Object-key prefix = everything after the first "<bucket>/" in the URI.
    _, found_bucket, key_part = path_.partition(bucket)
    path_del = key_part[1:]
    if not found_bucket or not key_part.startswith('/') or not path_del:
        # An empty Prefix would match (and delete) every object in the bucket.
        raise ValueError(
            "path {!r} is not a non-root location in bucket {!r}".format(path_, bucket)
        )
    # Trailing slash stops the prefix from also matching sibling folders
    # such as "<table>_SG2/".
    if not path_del.endswith('/'):
        path_del += '/'
    # Delete the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    # Drop the table definition from the catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    return del_
def generate_table(esquema_vpc=esquema_vpc, query_='', table='', path='', llave_='periodo',
                   formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Drop and rebuild the ``<table>_SG`` table with ``wr.athena.create_ctas_table``.

    The S3 location is derived from ``path`` by replacing the last occurrence
    of ``table`` with ``table + '_SG'``. Existing objects under that location
    and any existing catalog entry for ``<table>_SG`` are deleted first. Then
    the table is recreated from ``query_`` via awswrangler's CTAS helper.

    Args:
        esquema_vpc: Database that holds the table.
        query_: SELECT statement whose result populates the table.
        table: Base table name; must occur in ``path``.
        path: S3 URI of the base table inside the module-level ``bucket``.
        llave_: Partition column.
        formato: Value passed as ``storage_format``.
        compresion: Value passed as ``write_compression``.
        grupo: Athena workgroup.

    Returns:
        Tuple ``(del_, result_)``. ``del_`` is the return of
        ``wr.catalog.delete_table_if_exists``; ``result_`` is the return of
        ``wr.athena.create_ctas_table``.

    Raises:
        ValueError: If ``query_`` or ``table`` is empty, ``table`` does not
            occur in ``path``, or the derived location is not a non-root prefix
            inside ``bucket``. Raised before anything is deleted.
    """
    # Validate before touching S3 or the catalog: the steps below are destructive,
    # and an empty query would otherwise drop the table and then fail to rebuild it.
    if not (query_ and query_.strip()):
        raise ValueError("query_ must be a non-empty SQL statement")
    if not table:
        raise ValueError("table must be a non-empty table name")
    # Rename only the last occurrence (the table folder), so a table name that
    # also appears in the bucket name or a parent folder is left untouched.
    head, found_table, tail = path.rpartition(table)
    if not found_table:
        # Otherwise the derived path equals `path` and the ORIGINAL table's data
        # would be deleted and overwritten.
        raise ValueError("table {!r} does not occur in path {!r}".format(table, path))
    table_ = table + '_SG'
    path_ = head + table_ + tail
    # Object-key prefix = everything after the first "<bucket>/" in the URI.
    _, found_bucket, key_part = path_.partition(bucket)
    path_del = key_part[1:]
    if not found_bucket or not key_part.startswith('/') or not path_del:
        # An empty Prefix would match (and delete) every object in the bucket.
        raise ValueError(
            "path {!r} is not a non-root location in bucket {!r}".format(path_, bucket)
        )
    # Trailing slash stops the prefix from also matching sibling folders.
    if not path_del.endswith('/'):
        path_del += '/'
    print("tabla: ", table_)
    print("uri: ", path_)
    print("ruta: ", path_del)
    # Delete the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print(table_, path_, path_del)
    # Drop the table definition from the catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    # NOTE(review): fixed pause, presumably to let the deletions settle before CTAS.
    time.sleep(5)
    # Recreate the table from the query.
    result_ = wr.athena.create_ctas_table(
        sql=query_,
        database=esquema_vpc,
        ctas_table=table_,
        wait=True,
        s3_output=path_,
        storage_format=formato,
        write_compression=compresion,
        partitioning_info=[llave_],
        athena_query_wait_polling_delay=0.5,
        boto3_session=boto3.Session(),
        workgroup=grupo,
    )
    return del_, result_
def generate_table_new(query_='', table='', path='', llave_='periodo', esquema_vpc=esquema_vpc,
                       formato='Parquet', compresion='SNAPPY', grupo='athenav2'):
    """Drop and rebuild the ``<table>_SG`` table with a hand-written CTAS statement.

    The S3 location is derived from ``path`` by replacing the last occurrence
    of ``table`` with ``table + '_SG'``. Existing objects under that location
    and any existing catalog entry for ``<table>_SG`` are deleted first. Then a
    ``CREATE TABLE ... WITH (...) AS (query_)`` statement is executed through
    ``wr.athena.read_sql_query`` with ``ctas_approach=False``.

    Args:
        query_: SELECT statement whose result populates the table.
        table: Base table name; must occur in ``path``.
        path: S3 URI of the base table inside the module-level ``bucket``.
        llave_: Partition column.
        esquema_vpc: Database that holds the table.
        formato: Value of the CTAS ``format`` property.
        compresion: Compression codec. It is written under ``parquet_compression``
            for Parquet, ``orc_compression`` for ORC, and ``write_compression``
            for any other format.
        grupo: Athena workgroup.

    Returns:
        Tuple ``(del_, result_)``. ``del_`` is the return of
        ``wr.catalog.delete_table_if_exists``; ``result_`` is the return of
        ``wr.athena.read_sql_query``.

    Raises:
        ValueError: If ``query_`` or ``table`` is empty, ``table`` does not
            occur in ``path``, or the derived location is not a non-root prefix
            inside ``bucket``. Raised before anything is deleted.
    """
    # Validate before touching S3 or the catalog: the steps below are destructive,
    # and an empty query would otherwise drop the table and then fail to rebuild it.
    if not (query_ and query_.strip()):
        raise ValueError("query_ must be a non-empty SQL statement")
    if not table:
        raise ValueError("table must be a non-empty table name")
    # Rename only the last occurrence (the table folder), so a table name that
    # also appears in the bucket name or a parent folder is left untouched.
    head, found_table, tail = path.rpartition(table)
    if not found_table:
        # Otherwise the derived path equals `path` and the ORIGINAL table's data
        # would be deleted and overwritten.
        raise ValueError("table {!r} does not occur in path {!r}".format(table, path))
    table_ = table + '_SG'
    path_ = head + table_ + tail
    # Object-key prefix = everything after the first "<bucket>/" in the URI.
    _, found_bucket, key_part = path_.partition(bucket)
    path_del = key_part[1:]
    if not found_bucket or not key_part.startswith('/') or not path_del:
        # An empty Prefix would match (and delete) every object in the bucket.
        raise ValueError(
            "path {!r} is not a non-root location in bucket {!r}".format(path_, bucket)
        )
    # Trailing slash stops the prefix from also matching sibling folders.
    if not path_del.endswith('/'):
        path_del += '/'
    # Delete the physical data in S3.
    bucket_s3.objects.filter(Prefix=path_del).delete()
    print("DEL S3: ", table_, path_, path_del)
    # Drop the table definition from the catalog.
    del_ = wr.catalog.delete_table_if_exists(database=esquema_vpc, table=table_)
    print("DEL ATHENA")
    # NOTE(review): fixed pause, presumably to let the deletions settle before CTAS.
    time.sleep(5)
    # The compression property name depends on the storage format; the
    # Parquet-only key is rejected for other formats.
    formato_key = formato.lower()
    if formato_key == 'parquet':
        compression_prop = 'parquet_compression'
    elif formato_key == 'orc':
        compression_prop = 'orc_compression'
    else:
        compression_prop = 'write_compression'
    # Recreate the table. Identifiers are interpolated verbatim; callers must
    # pass trusted table/column names.
    query_create = """
CREATE TABLE {}.{}
WITH ( format = '{}',
{} = '{}',
partitioned_by = ARRAY['{}'],
external_location= '{}'
)
AS (
{}
)
""".format(esquema_vpc, table_, formato, compression_prop, compresion, llave_, path_, query_)
    print(query_create)
    result_ = wr.athena.read_sql_query(
        query_create,
        database=esquema_vpc,
        workgroup=grupo,
        ctas_approach=False,
    )
    return del_, result_
def apply_create_new(table='X', path='X', llave='X', query='X', esquema_vpc=esquema_vpc):
    """Rebuild ``<table>_SG`` through ``generate_table_new`` (explicit CTAS SQL).

    Args:
        table: Base table name; must occur in ``path``.
        path: S3 URI of the base table.
        llave: Partition column.
        query: SELECT statement that populates the table.
        esquema_vpc: Database to use. This is an optional parameter that
            defaults to the module-level ``esquema_vpc``, which was previously
            the implicit value.

    Returns:
        Tuple ``(del_, result_)`` as returned by ``generate_table_new``.
    """
    # Keyword arguments are required here. generate_table_new's positional
    # order is (query_, table, path, llave_, esquema_vpc), so the previous
    # positional call passed the schema as the query, the query as the table
    # name, and so on.
    result = generate_table_new(
        query_=query,
        table=table,
        path=path,
        llave_=llave,
        esquema_vpc=esquema_vpc,
    )
    return result[0], result[1]
def apply_create(table='X', path='X', llave='X', query='X', esquema_vpc=esquema_vpc):
    """Rebuild ``<table>_SG`` through ``generate_table`` and report the CTAS status.

    Args:
        table: Base table name.
        path: S3 URI of the base table.
        llave: Partition column.
        query: SELECT statement that populates the table.
        esquema_vpc: Database to use.

    Returns:
        Tuple of the catalog-delete result and the ``'Status'`` entry of the
        CTAS query metadata's ``raw_payload``.
    """
    deleted, ctas_info = generate_table(
        esquema_vpc=esquema_vpc,
        query_=query,
        table=table,
        path=path,
        llave_=llave,
    )
    ctas_status = ctas_info['ctas_query_metadata'].raw_payload['Status']
    return deleted, ctas_status
Add a code snippet to your website: www.paste.org