Projeto ETL com Apache Airflow usando a API de Dados Abertos da Assembleia Legislativa de Minas
python
apache_airflow
sql_server
etl
datawarewouse
2. Requisitos Funcionais
Coleta de dados
O sistema vai coletar dados do endpoint:
- https://dadosabertos.almg.gov.br/ws/proposicoes/pesquisa/direcionada?tp=100&formato=json&ano=2024&ord=3&p=50000&ini=20250123&fim=20250126, utilizando os filtros de paginação, data de início e data de fim.
- https://dadosabertos.almg.gov.br/ws/proposicoes/pesquisa/direcionada?formato=json&ano=2024&num=11063: URL de exemplo para fazer o reprocessamento.
Processamento de dados
O sistema deverá realizar o tratamento de caracteres, como:
- Remover espaços desnecessários;
- Remover caracteres de quebra de linha, deixando o texto em uma única linha;
- Formatar datas corretamente.
Armazenamento de dados
Os dados serão armazenados no banco SQL Server.
3. Requisitos Não Funcionais
- Execução: O sistema deve rodar com um intervalo de um dia para envio de dados.
- Monitoramento: O sistema deve incluir logs de execução, registrando históricos e erros para reprocessamento.
4. Estrutura do Banco de Dados
Tabela proposicao
CREATE TABLE proposicao (
ID INTEGER IDENTITY(1,1) PRIMARY KEY,
AUTOR NVARCHAR(300),
DATA_PRESENTACAO DATETIME,
EMENTA NVARCHAR(MAX),
REGIME VARCHAR(50),
SITUACCAO VARCHAR(80),
TIPO_PROPOSICAO VARCHAR(100),
NUMERO VARCHAR(40) UNIQUE,
ANO INTEGER,
CIDADE VARCHAR(60),
ESTADO VARCHAR(80),
DATA_INSERSAO_REGISTRO DATETIME DEFAULT GETDATE(),
DATA_ATUALIZACAO_REGISTRO DATETIME
);
Dicionário de dados da tabela proposicao
| Campo |
Tipo |
Descrição |
| ID |
Incremental |
ID automático |
| AUTOR |
String |
Autor da proposição, ex. “Governador Romeu Zema Neto” |
| DATA_PRESENTACAO |
Timestamp |
Data de apresentação da proposição, ex. “2022-10-06T00:00:00Z” |
| EMENTA |
String |
Assunto da proposição, ex. “Encaminha o Projeto de Lei 4008 2022…” |
| REGIME |
String |
Regime de tramitação da proposição, ex. “Especial” |
| SITUACCAO |
String |
Situação atual da proposição, ex. “Publicado” |
| TIPO_PROPOSICAO |
String |
Tipo da proposição, ex. “MSG” |
| NUMERO |
String |
Número da proposição, ex. “300” |
| ANO |
Integer |
Ano da proposição, ex. 2022 |
| CIDADE |
String |
Cidade fixa “Belo Horizonte” |
| ESTADO |
String |
Estado fixo “Minas Gerais” |
| DATA_INSERSAO |
DATETIME |
Data de inserção do registro: Padrão: DATA ATUAL |
| DATA_ATUALIZACAO_REGISTRO |
DATETIME |
Data de atualização do registro |
Tabela tramitacao
CREATE TABLE tramitacao (
ID INTEGER IDENTITY(1,1) PRIMARY KEY,
DESCRICAO VARCHAR(MAX),
LOCAL_PROPOSICAO VARCHAR(60),
ID_PROPOSICAO VARCHAR(40),
DATA_CRIACAO_TRAMITACAO DATETIME,
DATA_CRIACAO_REGISTRO DATETIME DEFAULT GETDATE(),
DATA_ATUALIZACAO_REGISTRO DATETIME,
FOREIGN KEY (ID_PROPOSICAO) REFERENCES proposicao(NUMERO)
);
Dicionário de dados da tabela tramitação
| Campo |
Tipo |
Descrição |
| id |
Incremental |
ID automático |
| DATA_CRIACAO_TRAMITACAO |
Timestamp |
Data do registro da tramitação, ex. “2022-10-04T00:00:00Z” |
| DESCRICAO |
String |
Descrição do histórico da tramitação, ex. “Proposição lida em Plenário.\nPublicada no DL…” |
| LOCAL_PROPOSICAO |
String |
Local da tramitação, ex. “Plenário” |
| ID_PROPOSICAO |
ForeignKey |
Chave estrangeira que referencia o ID da proposição |
| DATA_CRIACAO_REGISTRO |
Timestamp |
Data da inserção do registro |
| DATA_ATUALIZACAO_REGISTRO |
Timestamp |
Data atualização registro |
Tabela log_dag
CREATE TABLE log_dag (
ID INTEGER IDENTITY(1,1) PRIMARY KEY,
TIPO_ERROR INTEGER,
TIPO_LOG VARCHAR(20),
URL_API VARCHAR(MAX),
MENSAGEM_LOG VARCHAR(300),
JSON_XML VARCHAR(MAX),
JSON_ENVIO VARCHAR(MAX),
DATA_REGISTRO DATETIME DEFAULT GETDATE()
);
Dicionário de dados da tabela log_dag
| CAMPO |
TIPO |
DESCRIÇÃO |
| ID |
INTEGER |
ID automático incremental |
| TIPO_ERROR |
INTEGER |
REGISTRA O CÓDIGO TIPO ERROR |
| TIPO_LOG |
VARCHAR(20) |
ERROR, INFO |
| URL_API |
VARCHAR(MAX) |
URL DA API |
| MENSAGEM_LOG |
VARCHAR(300) |
MENSAGEM DE ERRO |
| JSON_XML |
VARCHAR(MAX) |
JSON DA API |
| JSON_ENVIO |
VARCHAR(MAX) |
JSON QUE TENTOU SER GRAVADO NO BANCO |
| DATA_REGISTRO |
DATETIME |
REGISTRA A DATA DE INSERÇÃO DO REGISTRO |
Tabela dag_error
CREATE TABLE dag_error (
ID INTEGER IDENTITY(1,1) PRIMARY KEY,
TIPO_ERROR INTEGER,
NUMERO VARCHAR(40),
URL_API VARCHAR(MAX),
JSON_XML VARCHAR(MAX),
JSON_ENVIO VARCHAR(MAX),
MENSAGEM_ERRO VARCHAR(300),
DATA_REGISTRO DATETIME DEFAULT GETDATE(),
DATA_ATUALIZACAO DATETIME
);
Dicionário de dados da tabela dag_error
| CAMPO |
TIPO |
DESCRIÇÃO |
| ID |
INTEGER |
ID automático incremental |
| TIPO_ERROR |
INTEGER |
REGISTRA O CÓDIGO TIPO ERROR |
| NUMERO |
VARCHAR(40) |
NÚMERO DA PROPOSIÇÃO |
| URL_API |
VARCHAR(MAX) |
URL DA API |
| JSON_XML |
VARCHAR(MAX) |
JSON DA API |
| JSON_ENVIO |
VARCHAR(MAX) |
JSON QUE TENTOU SER GRAVADO NO BANCO |
| MENSAGEM_ERRO |
VARCHAR(300) |
MENSAGEM DE ERRO |
| DATA_REGISTRO |
DATETIME |
DATA DE REGISTRO |
| DATA_ATUALIZACAO |
DATETIME |
DATA DA ATUALIZAÇÃO DO ERRO |
5. Fluxo da DAG

A DAG inicia verificando a conexão com o banco de dados e a API. Se ambas estiverem ativas, inicia-se o processo ETL para extração e tratamento de dados. Caso algum registro falhe, ele é gravado na tabela dag_error e reprocessado antes da finalização do ETL.
6. Diagrama de Classe
A figura abaixo, mostra uma proposta de diagrama de classe, com destaque para a a conexão da API e banco de dados, podendo ser substituído sem prejuízo ao funcionamento do processo de ETL.

7 - Assistir ao vídeo
Link do reposítório