Imports utilizados no desenvolvimento. Sobre a instalação dos imports, favor verificar o tópico requirements.
Caso tenha dificuldades em ler conteúdos em inglês, sugiro utilizar o navegador google chrome para visualização das documentações.
Imports
descrição
documentação
boto3
AWS SDK for Python (Boto3) para criar, configurar e gerenciar serviços da AWS
Ferramenta de análise e manipulação de dados de código aberto rápida, poderosa, flexível e fácil de usar, construída sobre a linguagem de programação Python
Requerimentos utilizados e que necessitam de instalação.
# Requerimentos
# Bibliotecas de desenvolvimento
pandas # Manipulação e análise de dados
boto3 # Integração aos serviços AWS
psycopg2 # Adaptador do Banco de dados PostgreSQL
# Sem pacotes com versões específicas
Como faço para instalar os requerimentos?
Para instalação de requirements utilize o seguinte comando:
# Conexão ao Redshift"""Accessing the S3 buckets using boto3 client"""session=boto3.Session(profile_name='ayltonaguiar')s3_client=session.client('s3')s3=session.resource('s3')
Fiz a leitura buscando em 2 lugares diferentes. A primeira parte é identificação válida dos arquivos (verificar se existem), então foram criadas 2 funções com buscas de caminhos específicos: m_tf.get_path_s3() e m_tf.get_path_redshift() . Ambas retornam os caminhos (path) dos arquivos.
A segunda parte é a leitura dos arquivos identificados. Foram criadas outras 2 funções com parâmetros específicos para cada uma: m_tf.read_s3_tfstate_backend() e m_tf.read_redshift_tfstate(). Ambas retornam informações, por exemplo: região do redshift, região do bucket backend (remote state), etc.
# 1- Pega o diretório dos arquivos, valida e guarda as informações básicas em novas variáveispath_s3=m_tf.get_path_s3()path_redshift=m_tf.get_path_redshift()## 1.1- s3backend_details=m_tf.read_s3_tfstate_backend(f"{path_s3}")backend_bucket,backend_region,backend_key=backend_details## 1.2- redshiftredshift_details=m_tf.read_redshift_tfstate(f"{path_redshift}")redshift_iam_arn,redshift_secrete_name,redshift_region_name=redshift_details
defget_path_s3():# captura do arquivo terraform.statepath_s3_tfstate=Path().absolute().parent.parent.joinpath('terraform','02_aws_s3_and_files','.terraform','terraform.tfstate')# validação do pathpath_validate(path_s3_tfstate,get_path_s3.__name__)returnpath_s3_tfstate
defget_path_redshift():# captura do arquivo terraform.statepath_redshift_tfstate=Path().absolute().parent.parent.joinpath('terraform','03_aws_redshift','terraform.tfstate')# validação do pathpath_validate(path_redshift_tfstate,get_path_redshift.__name__)returnpath_redshift_tfstate
defread_s3_tfstate_backend(path):try:# Captura as informações pelo backend do tfstatewithopen(path)asfile_name:s3_details_backend=json.load(file_name)backend_bucket=s3_details_backend['backend']['config'].get('bucket')backend_region=s3_details_backend['backend']['config'].get('region')backend_key=s3_details_backend['backend']['config'].get('key')print('',"############ S3 Backend ##########",s3_details_backend['backend']['config'],sep='\n')return[backend_bucket,backend_region,backend_key]exceptExceptionase:logging.error(e)returnFalse
defread_redshift_tfstate(path):# Captura dos outputs criados no arquivo terraform.tfstate da pasta aws_redshifttry:withopen(path)asrd_terraform:rd_terraform_json=json.load(rd_terraform)redshift_iam_arn=rd_terraform_json['outputs'].get('iam_role_arn').get('value')redshift_secrete_name=rd_terraform_json['outputs'].get('secrete_name').get('value')redshift_region_name=rd_terraform_json['outputs'].get('region_name').get('value')print('',"####### Outputs redshift.tfstate ##########",redshift_iam_arn,redshift_secrete_name,redshift_region_name,sep='\n')return[redshift_iam_arn,redshift_secrete_name,redshift_region_name]exceptExceptionase:logging.error(e)returnFalse
Para identificar objetos no S3, precisamos do nome do bucket e dos arquivos a serem lidos. Para facilitar esse processo, foi criada a função m_aws.get_bucket(). Essa função precisa de informações como o nome do bucket compartilhado, a chave do bucket backend e a conexão do cliente s3. Ela retornará o nome do bucket que armazena os objetos.
A função m_aws.get_csv_s3() verifica e lista os objetos com extensão '.csv' ou '.CSV', retornando a lista de objetos.
# 2- seleciona o objeto do bucket/key fornecido e pega o valor do output 'bucket-name'bucket_name=m_aws.get_bucket(s3_client,backend_bucket,backend_key)# 3- Listagem de objetos do bucket fornecidolist_objects=m_aws.get_csv_s3(s3_client,bucket_name)
defget_csv_s3(s3_client,bucket_name):# Lista os Objetos do bucket setado e armazena na variável s3_objectstry:s3_list_obj=s3_client.list_objects(Bucket=bucket_name)s3_objects=[]forobjins3_list_obj['Contents']:ifobj['Key'].endswith('csv')orobj['Key'].endswith('CSV'):s3_objects.append(obj['Key'])print('',"####### Objects S3 ##########",s3_objects[0:10],sep='\n')exceptExceptionase:logging.error(e)returnFalsereturns3_objects
Para dar prosseguimento precisamos recuperar alguns dados sensíveis adicionados no secrets manager, então foi criado a função m_aws.get_secrets_redshift() com objetivo de recuperar essas informações e armazená-las para o próximo passo.
A conexão com banco do redshift é feita, nesse projeto, pelo uso do psycopg2. Utilizando as informações recuperadas pela função m_aws.get_secrets_redshift().
# 4- Acessando e pegando os valores guardados do Redshift no Secrets Managerclient_secret=session.client('secretsmanager',region_name=redshift_region_name)secrets_manager_details=m_aws.get_secrets_redshift(client_secret,redshift_secrete_name)redshift_db_name,redshift_db_user,redshift_db_password,redshift_db_port,redshift_db_host=secrets_manager_details# 5- conexão ao bancord_con=psycopg2.connect(host=redshift_db_host,database=redshift_db_name,user=redshift_db_user,password=redshift_db_password,port=redshift_db_port)rd_con.autocommit=Truecur=rd_con.cursor()
defget_secrets_redshift(client_secret,redshift_secrete_name):try:# Captura dos segredos do redshift na AWS Secretsget_secret_value_response=client_secret.get_secret_value(SecretId=redshift_secrete_name)secret_json=json.loads(get_secret_value_response['SecretString'])exceptExceptionase:logging.error(e)returnFalsereturnsecret_json
A função m_aws.create_users_redshift() tem como objetivo criar os usuários, grupos e realizar vinculos entre as partes.
Por último a limpeza do arquivo com dados do secrets manager.
## 5.1 Criando os usuários dos grupos loaders, transformers e reportersm_aws.create_users_redshift(cur,secrets_manager_json)## 5.2 Apagando as informações do secretsdelsecrets_manager_json
defcreate_users_redshift(cur,secret_json):try:# Usuários dos grupos - Falta melhorar essa parte bem hardcode e.e Deus me perdoe, mas é isso aí.loaders=[]reporters=[]transformers=[]# Pegando usuários e senhas dos loaders, transformers e reportersforkey,valueinsecret_json.items():if'loaders'inkey:loaders.append(value)elif'reporters'inkey:reporters.append(value)elif'transformers'inkey:transformers.append(value)# Gerando novas listas a cada 2 items na lista originalloaders=[loaders[i:i+2]foriinrange(0,len(loaders),2)]reporters=[reporters[i:i+2]foriinrange(0,len(reporters),2)]transformers=[transformers[i:i+2]foriinrange(0,len(transformers),2)]groups=loaders,reporters,transformersgroup_names=["loaders","reporters","transformers"]# Criação dos gruposforgroupingroup_names:cur.execute(f"CREATE GROUP {group};")# Criando Usuários com senhas (0=nome do usuário, 1=senha)forgroupingroups:foruseringroup:cur.execute(f"create user {user[0]} with password '{user[1]}';")# Adição dos usuários aos gruposforidx,groupinenumerate(groups):foridx2,userinenumerate(group):cur.execute(f"alter group {group_names[idx]} add user {user[0]};")exceptExceptionase:logging.error(e)returnFalse
Lembra que criamos os usuários e grupos de usuários? Então, estaremos realizando a primeira parte das permissões para os usuários criados, após essa etapa será realizado a preparação para os nomes das pastas.
Para adição de permissões, a função m_aws.give_permissions_database() receberá a conexão, o nome do banco de dados e nome do grupo.
A função m_aws.get_folder() pega a primeira parte do diretório da lista de objetos e armazena em SCHEMAS_REDSHIFT.
# 6- Permissão Create para loaders, select para transformers no banco de dadosm_aws.give_permissions_database(cur,redshift_db_name,groups_redshift)# 7- pegar o primeiro diretório e adiciona-lo a uma lista de schema.schemas_redshift=[]schemas_redshift=m_aws.get_folder(list_objects)
defgive_permissions_database(cur,redshift_db_name,groups):try:group_exception=['reporters']groups_count=group_exception# Executando permissão por grupo não presente em 'groups_count'forgroupingroups.keys():ifgroupnotingroups_count:cur.execute(f''' grant create on database {redshift_db_name} to group {group}; grant select on all tables in schema information_schema to group {group}; grant select on all tables in schema pg_catalog to group {group}; ''')groups_count.append(group)exceptExceptionase:logging.error(e)returnFalse
defget_folder(list_objects):# Criação da primeira parte do diretório como schema no banco de dados redshifttry:folders=[]forfolderinlist_objects:folder=folder.split('/',1)[0]if(foldernotinfolders):# Adiciona apenas as pastas diferentesfolders.append(folder.split('/',1)[0])returnfoldersexceptExceptionase:logging.error(e)returnFalse#print(folders)
Nessa etapa criaremos os schemas e adicionaremos as permissões para os usuários do grupo 'transformers'.
A função m_aws.create_schema_redshift() cria os schemas com base nos diretórios armazenados em schemas_redshift.
Os Tranformadores precisam de acesso aos schemas criados, pois vão trabalhar com todas as tabelas existentes. A função m_aws.give_permission_schemas() visa permitir ao grupo selecionado tenha permissões, além de fazer com que o redshift_db_user
ganhe privilégios para as novas tabelas criadas.
# 8- Criação dos schemas caso não existam_aws.create_schema_redshift(cur,schemas_redshift,redshift_db_user)# 9- Permissão para o grupo transformers sobre os schemas criadosm_aws.give_permission_schemas(cur,schemas_redshift,redshift_db_user,group_transformers)
defcreate_schema_redshift(cur,folders,redshift_db_user):# Criação da primeira parte do diretório como schema no banco de dados redshifttry:forschemainfolders:print('',"####### Comando executado: ##########",sep='\n')print(f'CREATE SCHEMA IF NOT EXISTS "{schema}" AUTHORIZATION {redshift_db_user};')cur.execute(f'CREATE SCHEMA IF NOT EXISTS "{schema}" AUTHORIZATION {redshift_db_user};')# vai criar a pasta entre "", exemplo: "c&a"exceptExceptionase:logging.error(e)returnFalsereturnTrue
defgive_permission_schemas(cur,folders,redshift_db_user,group):try:forkey,valueingroup.items():ifvalue==2:# 'loaders':1, 'transformers':2, 'reporters':3# Adicionando permissões apenas usuários do grupo 'transformers' e privilégios para um determinado userforschemainfolders:cur.execute(f''' grant usage on schema "{schema}" to group {key}; grant select on all tables in schema "{schema}" to group {key}; alter default privileges for user {redshift_db_user} in schema "{schema}" grant select on tables to group {key}; ''')returnTrueexceptExceptionase:logging.error(e)returnFalse
A função m_aws.csv_to_redshift() é um pouco maior, ela contém outras funções em seu corpo. Em resumo ela pega o objeto, faz a leitura utilizando o io.BytesIO(), verifica quais são os delimitadores dos objetos .csv, qual o tipo de dado da coluna, qual o tamanho máximo nos registros da coluna e realiza o copy dos dados para o Redshift.
# 10- Criação das tabelas, colunas e copym_aws.csv_to_redshift(cur,s3,bucket_name,list_objects,redshift_details,redshift_db_name)# 11- fechando conexãord_con.close()
defcsv_to_redshift(cur,s3,bucket_name,list_objects,redshift_details,redshift_db_name):# Reading the individual files from the AWS S3 buckets and putting them in dataframesredshift_iam_arn,redshift_secrete_name,redshift_region_name=redshift_detailsforfileinlist_objects:obj=s3.Object(bucket_name,file)data=obj.get()['Body'].read()# Identificando o delimitadordelimiter_csv=csv_identify_delimiter(data.decode('utf-8'))# Leitura de csvcsv_df=pd.read_csv(io.BytesIO(data),header=0,delimiter=delimiter_csv,low_memory=False)columns_df=csv_df.columnsprint('','####### Iniciando tratamento csv ... ##########',sep='\n')# resevando nomes dos schemas, tabelas e colunasschema='"'+file.split('/')[0]+'"'table=file.split('/')[-1].lower()table=table.replace('.csv','').replace('.CSV','')print('','Tratamento de schema e tabela finalizado.',sep='\n')print('',"Pasta para schema: ",schema,'',"Arquivo para tabela: ",table,sep='\n')# Tratamento das colunas, identificação de datatype e lengthcolumns,columns_names=csv_column_dtype(csv_df,columns_df)# Criação das tabelas e colunascur.execute(f"CREATE TABLE IF NOT EXISTS {redshift_db_name}.{schema}.{table}{columns};")print('',"####### Comando executado: ##########",sep='\n')print(f"CREATE TABLE IF NOT EXISTS {redshift_db_name}.{schema}.{table}{columns};")# Copy CSV do S3 para o Redshiftcur.execute(f""" copy {schema}.{table}{columns_names} from 's3://{bucket_name}/{file}' iam_role '{redshift_iam_arn}' delimiter '{delimiter_csv}' region '{redshift_region_name}' IGNOREHEADER 1 DATEFORMAT AS 'YYYY-MM-DD HH:MI:SS' removequotes maxerror 3; """)print('',"Copy do Objeto Finalizado",sep='\n')