Todos nos deparamos com as complexidades de atualização de dados. Isso toma muito tempo e invariavelmente inicia por meio de planilhas.
Mas, com o passar do tempo, evoluimos para o uso de Bancos de Dados, os quais nos estendem mais opções.
Ainda na curva ascendente do aprendizado automatizamos o processo, ou parte dele para obtermos benefícios em termos de tempo, assertividade e praticidade.
A pergunta é: Como fazer isso?
Bem, estou disponibilizando abaixo parte do processo que poderá inspirar você a resolver os seus problemas. Espero que seja útil, aplicável às suas necessidades e adaptável ao seu processo e necessidade de atualização.
PROVAVELMENTE INICIE O DIA COM O SEGUINTE E-MAIL:
Assunto: 📊 Verificação e Acompanhamento do Processo de atualização dos Arquivos de FLATFILE IQVIA DDD Mensal
Olá BI SÊNIOR, tudo bem?
Com o intuito de identificarmos e mapearmos todo o processo, informo que o arquivo do DDD Mensal já está disponível. Segue o caminho para referência:
Para darmos sequência, preciso que me responda às seguintes perguntas:
1️⃣ Detecção automática:
O sistema de automação já consegue identificar que esse arquivo específico foi disponibilizado?
2️⃣ Carga no MySQL:
O sistema já efetuou a carga do arquivo no MySQL?
3️⃣ Confirmação por e-mail:
Quando o sistema de automação enviará para mim a confirmação da respectiva carga no MySQL 📬?
4️⃣ Nome da tabela de controle:
Qual é mesmo o nome da tabela interna no MySQL utilizada para controle e acompanhamento das atualizações?
Aguardo seu retorno para que possamos validar os próximos passos e otimizar o processo.
Abraços,
Seu chefe
E SUA RESPOSTA DEVE SER EM AÇÕES. MAS QUAIS?
Sugestões Tecnológicas para Melhorar o Acompanhamento do Processo 📌
Agendamento Automático de Verificação: Criar um script em Python ou PowerShell para monitorar a pasta e disparar alertas assim que um novo arquivo for detectado.
Carga Automática no MySQL com Log Detalhado: Integrar um pipeline automatizado (por exemplo, com Airflow ou cron jobs) que realize a carga e registre o status em tempo real.
Notificações Automatizadas por E-mail ou Teams: Implementar um serviço de notificação que envie atualizações instantâneas sobre a detecção e conclusão da carga (ex.: via SMTP, API do Teams ou API do Slack).
Dashboard de Acompanhamento: Criar um painel no Power BI ou Grafana para visualizar, em tempo real, a situação das cargas, últimas atualizações e possíveis erros.
Abaixo segue um modelo de script de monitoramento e notificação automática para esse processo, totalmente adaptado à essa estrutura de pastas e ao banco MySQL.
script de monitoramento e notificação automática que:
Detecta automaticamente quando o arquivo do DDD Mensal aparece na pasta indicada.
Registra o evento e faz a carga no MySQL.
Envia e-mail automático confirmando a operação.
Aqui está um exemplo funcional em Python, que você pode agendar no Windows Task Scheduler para rodar periodicamente:
import os
import time
import smtplib
import mysql.connector
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
# ================= CONFIGURAÇÕES =================
# Caminho da pasta onde o arquivo será disponibilizado
PASTA_ARQUIVO = r"C:\Users\andre.bernardes\OneDrive - GEDEON RICHTER DO BRASIL\SFE\DADOS\DBA\IQVIA\DDD\Mensal\14.08.25"
NOME_ARQUIVO = "FF_DDDMIX_G_R_TXT_202507.ZIP"
# Configurações do MySQL
DB_HOST = "localhost"
DB_USER = "usuario_mysql"
DB_PASS = "senha_mysql"
DB_NAME = "nome_do_banco"
# Configurações de e-mail
SMTP_SERVER = "smtp.seu_email.com"
SMTP_PORT = 587
EMAIL_USER = "seu_email@empresa.com"
EMAIL_PASS = "sua_senha"
EMAIL_DESTINO = "destinatario@empresa.com"
# ================= FUNÇÕES =================
def arquivo_existe():
return os.path.exists(os.path.join(PASTA_ARQUIVO, NOME_ARQUIVO))
def carregar_mysql():
try:
conn = mysql.connector.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASS,
database=DB_NAME
)
cursor = conn.cursor()
# Exemplo: Inserindo log de carga (personalize conforme necessário)
cursor.execute("""
INSERT INTO log_cargas (arquivo, data_carga, status)
VALUES (%s, NOW(), %s)
""", (NOME_ARQUIVO, 'Concluído'))
conn.commit()
conn.close()
return True
except Exception as e:
print(f"Erro ao carregar MySQL: {e}")
return False
def enviar_email(status):
msg = MIMEMultipart()
msg['From'] = EMAIL_USER
msg['To'] = EMAIL_DESTINO
msg['Subject'] = f"Confirmação de Carga - {NOME_ARQUIVO}"
corpo = f"""
Olá,
O arquivo **{NOME_ARQUIVO}** foi detectado e a carga no MySQL foi {status}.
Data/Hora: {time.strftime('%d/%m/%Y %H:%M:%S')}
"""
msg.attach(MIMEText(corpo, 'plain'))
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(EMAIL_USER, EMAIL_PASS)
server.sendmail(EMAIL_USER, EMAIL_DESTINO, msg.as_string())
# ================= EXECUÇÃO =================
if arquivo_existe():
print("📂 Arquivo detectado!")
if carregar_mysql():
enviar_email("CONCLUÍDA ✅")
else:
enviar_email("FALHOU ❌")
else:
print("⏳ Arquivo ainda não disponível...")
🔹 Como usar:
1) Instalar dependências
pip install mysql-connector-python
2) Salvar o script como monitor_ddd.py.
3) Configurar no Agendador de Tarefas para rodar a cada 5 minutos, por exemplo.
4) Ajustar usuário, senha, servidor de e-mail e dados do MySQL no início do script.
AUTOMAÇÃO 100%
Também já preparamos a versão avançada que, além do e-mail, também envia mensagem automática no Microsoft Teams e atualiza um dashboard no Power BI em tempo real 📊. Isso deixaria o acompanhamento praticamente 100% automatizado.
Essa versão avançada com detecção do arquivo, carga no MySQL, notificação por e-mail + Teams e gatilho de refresh no Power BI. Mantém tudo num único script (fácil de agendar no Windows Task Scheduler) e separei a configuração em .env para não expor senhas.
1) Arquivos e dependências
Instale as libs:
pip install mysql-connector-python requests python-dotenv
Crie um .env ao lado do script com os parâmetros (ajuste tudo entre aspas conforme seu ambiente):
# ======= ARQUIVO / PROCESSO =======
WATCH_DIR="C:\Users\andre.bernardes\OneDrive - GEDEON RICHTER DO BRASIL\SFE\DADOS\DBA\IQVIA\DDD\Mensal\14.08.25"
WATCH_FILENAME="FF_DDDMIX_G_R_TXT_202507.ZIP"
# ======= MYSQL =======
MYSQL_HOST="localhost"
MYSQL_USER="usuario_mysql"
MYSQL_PASSWORD="senha_mysql"
MYSQL_DATABASE="nome_do_banco"
# ======= E-MAIL (SMTP) =======
SMTP_SERVER="smtp.seu_email.com"
SMTP_PORT="587"
SMTP_USER="seu_email@empresa.com"
SMTP_PASSWORD="sua_senha"
SMTP_TO="destinatario@empresa.com"
# ======= TEAMS WEBHOOK =======
TEAMS_WEBHOOK_URL="https://outlook.office.com/webhook/..." # Incoming Webhook do canal
# ======= POWER BI (Service Principal recomendado) =======
TENANT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
CLIENT_ID="yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"
CLIENT_SECRET="sua_chave_secreta"
POWERBI_WORKSPACE_ID="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
POWERBI_DATASET_ID="ffffffff-1111-2222-3333-444444444444"
Como obter:
Teams Webhook: no canal do Teams → Conectores → Incoming Webhook → dar um nome e copiar a URL.
Power BI REST API: crie um App Registration no Azure AD (Client ID/Secret), conceda permissão ao workspace (Admin do workspace → Allow service principals to use Power BI APIs e adicione o app como Admin/Member do workspace). Use escopo https://analysis.windows.net/powerbi/api/.default.
2) SQL sugerido (tabelas de controle/log)
Execute no MySQL para ter controle e independência:
CREATE TABLE IF NOT EXISTS log_cargas (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
arquivo VARCHAR(255) NOT NULL,
hash_arquivo CHAR(64) NOT NULL,
data_detectada DATETIME NOT NULL,
data_carga DATETIME NULL,
status VARCHAR(30) NOT NULL, -- DETECTADO, PROCESSANDO, CONCLUIDO, FALHA
detalhes TEXT NULL,
UNIQUE KEY uq_arquivo_hash (arquivo, hash_arquivo)
);
CREATE TABLE IF NOT EXISTS atualizacoes_mysql (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tabela VARCHAR(128) NOT NULL,
acao VARCHAR(50) NOT NULL, -- INSERT/UPDATE/DELETE/REFRESH
data_execucao DATETIME NOT NULL,
linhas_afetadas BIGINT NULL,
origem VARCHAR(100) NULL -- DDD_MENSAL, etc.
);
Se já existir uma “tabela interna de controle” no seu ambiente, substitua log_cargas/atualizacoes_mysql pelos nomes oficiais.
3) Script único: monitor_ddd_avancado.py
Este script:
Detecta o arquivo e calcula SHA-256 (para não processar duplicado).
Registra DETECTADO → PROCESSANDO → CONCLUIDO/FALHA.
(Lugar-marcado) Processa o ZIP e faz a carga no MySQL (adapte para sua regra de negócio).
Envia e-mail e Teams.
Dispara refresh do dataset do Power BI.
import os
import time
import zipfile
import hashlib
import smtplib
import requests
import mysql.connector
from datetime import datetime
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from dotenv import load_dotenv
# ===================== CONFIG =====================
load_dotenv()
WATCH_DIR = os.getenv("WATCH_DIR")
WATCH_FILENAME = os.getenv("WATCH_FILENAME")
WATCH_PATH = os.path.join(WATCH_DIR, WATCH_FILENAME)
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")
MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
SMTP_SERVER = os.getenv("SMTP_SERVER")
SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")
SMTP_TO = os.getenv("SMTP_TO")
TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL")
TENANT_ID = os.getenv("TENANT_ID")
CLIENT_ID = os.getenv("CLIENT_ID")
CLIENT_SECRET = os.getenv("CLIENT_SECRET")
POWERBI_WORKSPACE_ID = os.getenv("POWERBI_WORKSPACE_ID")
POWERBI_DATASET_ID = os.getenv("POWERBI_DATASET_ID")
# ===================== UTILS ======================
def now():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def sha256_file(path):
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def db_conn():
return mysql.connector.connect(
host=MYSQL_HOST,
user=MYSQL_USER,
password=MYSQL_PASSWORD,
database=MYSQL_DATABASE,
autocommit=False,
)
def upsert_log_carga(cursor, arquivo, hash_arquivo, status, detalhes=None, as_detected=False):
if as_detected:
cursor.execute("""
INSERT IGNORE INTO log_cargas (arquivo, hash_arquivo, data_detectada, status, detalhes)
VALUES (%s, %s, NOW(), %s, %s)
""", (arquivo, hash_arquivo, status, detalhes))
else:
cursor.execute("""
UPDATE log_cargas
SET status=%s,
detalhes=%s,
data_carga=CASE WHEN %s IN ('CONCLUIDO','FALHA') THEN NOW() ELSE data_carga END
WHERE arquivo=%s AND hash_arquivo=%s
""", (status, detalhes, status, arquivo, hash_arquivo))
def registrar_atualizacao_mysql(cursor, tabela, acao, linhas, origem):
cursor.execute("""
INSERT INTO atualizacoes_mysql (tabela, acao, data_execucao, linhas_afetadas, origem)
VALUES (%s, %s, NOW(), %s, %s)
""", (tabela, acao, linhas, origem))
def enviar_email(assunto, corpo):
msg = MIMEMultipart()
msg["From"] = SMTP_USER
msg["To"] = SMTP_TO
msg["Subject"] = assunto
msg.attach(MIMEText(corpo, "html"))
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USER, SMTP_PASSWORD)
server.sendmail(SMTP_USER, SMTP_TO, msg.as_string())
def enviar_teams(texto):
if not TEAMS_WEBHOOK_URL:
return
payload = {"text": texto}
try:
requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=15).raise_for_status()
except Exception as e:
print(f"[{now()}] Falha ao enviar Teams: {e}")
def obter_token_powerbi():
url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"
data = {
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"grant_type": "client_credentials",
"scope": "https://analysis.windows.net/powerbi/api/.default"
}
resp = requests.post(url, data=data, timeout=20)
resp.raise_for_status()
return resp.json()["access_token"]
def refresh_powerbi_dataset():
token = obter_token_powerbi()
url = f"https://api.powerbi.com/v1.0/myorg/groups/{POWERBI_WORKSPACE_ID}/datasets/{POWERBI_DATASET_ID}/refreshes"
resp = requests.post(url, headers={"Authorization": f"Bearer {token}"}, json={}, timeout=20)
resp.raise_for_status()
return True
# ============ LUGAR-MARCADO: CARGA DE DADOS ============
def processar_e_carregar_zip_no_mysql(zip_path, cursor):
"""
ADAPTE AQUI:
- Descompacte o ZIP
- Parseie os arquivos (CSV/TXT)
- Upsert em tabelas finais
- Registre linhas afetadas em atualizacoes_mysql
"""
linhas_total = 0
with zipfile.ZipFile(zip_path, "r") as zf:
for member in zf.namelist():
if member.lower().endswith((".csv", ".txt")):
with zf.open(member) as f:
# Exemplo mínimo (conteúdo texto); substitua por parser real.
# for _ in f: linhas_total += 1
pass
# EXEMPLO de registro (substitua tabela/acao/linhas):
registrar_atualizacao_mysql(cursor, tabela="tabela_destino_ddd", acao="UPSERT", linhas=linhas_total, origem="DDD_MENSAL")
return linhas_total
# ===================== MAIN ======================
def main():
if not os.path.exists(WATCH_PATH):
print(f"[{now()}] ⏳ Arquivo não encontrado: {WATCH_PATH}")
return
file_hash = sha256_file(WATCH_PATH)
print(f"[{now()}] 📂 Arquivo detectado. SHA256={file_hash}")
conn = db_conn()
try:
cur = conn.cursor()
# 1) LOG: DETECTADO (idempotente)
upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="DETECTADO", detalhes="Arquivo detectado pelo monitor.", as_detected=True)
conn.commit()
# 2) Verificar se já foi processado antes
cur.execute("""
SELECT status FROM log_cargas
WHERE arquivo=%s AND hash_arquivo=%s
LIMIT 1
""", (WATCH_FILENAME, file_hash))
row = cur.fetchone()
if row and row[0] in ("CONCLUIDO"):
print(f"[{now()}] ✅ Já processado anteriormente. Nada a fazer.")
return
# 3) LOG: PROCESSANDO
upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="PROCESSANDO", detalhes="Iniciando carga no MySQL.")
conn.commit()
# 4) PROCESSO DE CARGA
linhas = processar_e_carregar_zip_no_mysql(WATCH_PATH, cur)
conn.commit()
# 5) LOG: CONCLUIDO
upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="CONCLUIDO", detalhes=f"Carga concluída. Linhas afetadas: {linhas}")
conn.commit()
# 6) NOTIFICAÇÕES
assunto = f"[DDD] Carga concluída ✅ — {WATCH_FILENAME}"
corpo = f"""
<p>Olá,</p>
<p>O arquivo <b>{WATCH_FILENAME}</b> foi processado com sucesso.</p>
<ul>
<li>Detectado: {now()}</li>
<li>Linhas (estimativa): {linhas}</li>
<li>Status: CONCLUÍDO</li>
<li>Hash: <code>{file_hash}</code></li>
</ul>
<p>Atenciosamente,<br>Robô de Cargas DDD 🤖</p>
"""
enviar_email(assunto, corpo)
enviar_teams(f"✅ Carga concluída do arquivo **{WATCH_FILENAME}**.\nLinhas: {linhas}\nHash: {file_hash}")
# 7) POWER BI REFRESH
try:
if refresh_powerbi_dataset():
enviar_teams(f"🔄 Power BI: refresh do dataset iniciado para o arquivo **{WATCH_FILENAME}**.")
except Exception as e:
enviar_teams(f"⚠️ Power BI: falha ao iniciar refresh. Erro: {e}")
print(f"[{now()}] ✅ Processo finalizado com sucesso.")
except Exception as e:
conn.rollback()
# LOG: FALHA
try:
cur = conn.cursor()
upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="FALHA", detalhes=str(e))
conn.commit()
except Exception:
pass
# Notificações de erro
try:
enviar_email(f"[DDD] Falha na carga ❌ — {WATCH_FILENAME}",
f"<p>Falha ao processar <b>{WATCH_FILENAME}</b>:</p><pre>{e}</pre>")
enviar_teams(f"❌ Falha ao processar **{WATCH_FILENAME}**.\nErro: {e}")
except Exception:
pass
print(f"[{now()}] ❌ Erro: {e}")
finally:
try:
conn.close()
except Exception:
pass
if __name__ == "__main__":
main()
4) Agendamento no Windows (Task Scheduler)
Abrir Agendador de Tarefas → Criar Tarefa…
Gatilho: a cada 5 minutos (ou conforme necessário)
Ação:
Programa/script: python
Adicionar argumentos: "C:\caminho\monitor_ddd_avancado.py"
Iniciar em: C:\caminho\
Executar com privilégios mais altos e marcar “Executar mesmo que o usuário não esteja conectado” se precisar rodar sempre.
Alternativa: criar um .bat:
@echo off
cd /d "C:\caminho"
python monitor_ddd_avancado.py
5) Boas práticas (recomendado)
Segurança: mantenha .env fora de repositórios, use segredos do Windows ou Azure Key Vault quando possível.
Resiliência: se desejar, adicione tentativas com backoff nas chamadas ao SMTP, Teams e Power BI.
Observabilidade: exporte logs para o Windows Event Log ou arquivo rotativo (logging.handlers.RotatingFileHandler).
Idempotência real: além do hash, aplique checks por período (ex.: competência 2025-07) para evitar recargas indevidas.
Validação: após a carga, rode queries de sanidade (contagens, ranges de datas, chaves únicas).
☕DOE UM CAFÉ:
👉 Não se esqueça de seguir André Bernardes no Linkedin. Clique aqui e me contate via What's App.