Propósito

✔ Programação GLOBAL® - Quaisquer soluções e/ou desenvolvimento de aplicações pessoais, ou da empresa, que não constem neste Blog devem ser tratados como consultoria freelance. Queiram contatar-nos: brazilsalesforceeffectiveness@gmail.com | ESTE BLOG NÃO SE RESPONSABILIZA POR QUAISQUER DANOS PROVENIENTES DO USO DOS CÓDIGOS AQUI POSTADOS EM APLICAÇÕES PESSOAIS OU DE TERCEIROS.

IQVIA™ DDD® - Drugs Distribution Data - Automatizando a Atualização no BI

IQVIA™ DDD® - Drugs Distribution Data - Automatizando a atualização interna do seu BI
#ProgramaçãoGlobal #BI #Dados #DadosPúblicos #BasedeDados #PortalBrasileirodeDados #CatálogoNacionaldeDados #Data #BusinessIntelligence #DataDrivenDesign

 Compre OS LIVROS DESTA SÉRIE 


Todos nos deparamos com as complexidades de atualização de dados. Isso toma muito tempo e invariavelmente inicia por meio de planilhas.


Mas, com o passar do tempo, evoluimos para o uso de Bancos de Dados, os quais nos estendem mais opções.


Ainda na curva ascendente do aprendizado automatizamos o processo, ou parte dele para obtermos benefícios em termos de tempo, assertividade e praticidade.


A pergunta é: Como fazer isso?


Bem, estou disponibilizando abaixo parte do processo que poderá inspirar você a resolver os seus problemas. Espero que seja útil, aplicável às suas necessidades e adaptável ao seu processo e necessidade de atualização.


PROVAVELMENTE INICIE O DIA COM O SEGUINTE E-MAIL:

Assunto: 📊 Verificação e Acompanhamento do Processo de atualização dos Arquivos de FLATFILE IQVIA DDD Mensal

Olá BI SÊNIOR, tudo bem?

Com o intuito de identificarmos e mapearmos todo o processo, informo que o arquivo do DDD Mensal já está disponível. Segue o caminho para referência:

\OneDrive - COMPANY\DADOS\IQVIA\DDD\Mensal\DD.MM.AAAA\FF_DDDMIX_G_R_TXT_AAAAMM.ZIP

Para darmos sequência, preciso que me responda às seguintes perguntas:

1️⃣ Detecção automática:

O sistema de automação já consegue identificar que esse arquivo específico foi disponibilizado?

2️⃣ Carga no MySQL:

O sistema já efetuou a carga do arquivo no MySQL?

3️⃣ Confirmação por e-mail:

Quando o sistema de automação enviará para mim a confirmação da respectiva carga no MySQL 📬?

4️⃣ Nome da tabela de controle:

Qual é mesmo o nome da tabela interna no MySQL utilizada para controle e acompanhamento das atualizações?

Aguardo seu retorno para que possamos validar os próximos passos e otimizar o processo.

Abraços,
Seu chefe

E SUA RESPOSTA DEVE SER EM AÇÕES. MAS QUAIS?


Sugestões Tecnológicas para Melhorar o Acompanhamento do Processo 📌


Agendamento Automático de Verificação: Criar um script em Python ou PowerShell para monitorar a pasta e disparar alertas assim que um novo arquivo for detectado.


Carga Automática no MySQL com Log Detalhado: Integrar um pipeline automatizado (por exemplo, com Airflow ou cron jobs) que realize a carga e registre o status em tempo real.


Notificações Automatizadas por E-mail ou Teams: Implementar um serviço de notificação que envie atualizações instantâneas sobre a detecção e conclusão da carga (ex.: via SMTP, API do Teams ou API do Slack).


Dashboard de Acompanhamento: Criar um painel no Power BI ou Grafana para visualizar, em tempo real, a situação das cargas, últimas atualizações e possíveis erros.


Abaixo segue um modelo de script de monitoramento e notificação automática para esse processo, totalmente adaptado à essa estrutura de pastas e ao banco MySQL.


script de monitoramento e notificação automática que:


Detecta automaticamente quando o arquivo do DDD Mensal aparece na pasta indicada.


Registra o evento e faz a carga no MySQL.


Envia e-mail automático confirmando a operação.


Aqui está um exemplo funcional em Python, que você pode agendar no Windows Task Scheduler para rodar periodicamente:


import os

import time

import smtplib

import mysql.connector

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart


# ================= CONFIGURAÇÕES =================

# Caminho da pasta onde o arquivo será disponibilizado

PASTA_ARQUIVO = r"C:\Users\andre.bernardes\OneDrive - GEDEON RICHTER DO BRASIL\SFE\DADOS\DBA\IQVIA\DDD\Mensal\14.08.25"

NOME_ARQUIVO = "FF_DDDMIX_G_R_TXT_202507.ZIP"


# Configurações do MySQL

DB_HOST = "localhost"

DB_USER = "usuario_mysql"

DB_PASS = "senha_mysql"

DB_NAME = "nome_do_banco"


# Configurações de e-mail

SMTP_SERVER = "smtp.seu_email.com"

SMTP_PORT = 587

EMAIL_USER = "seu_email@empresa.com"

EMAIL_PASS = "sua_senha"

EMAIL_DESTINO = "destinatario@empresa.com"


# ================= FUNÇÕES =================

def arquivo_existe():

    return os.path.exists(os.path.join(PASTA_ARQUIVO, NOME_ARQUIVO))


def carregar_mysql():

    try:

        conn = mysql.connector.connect(

            host=DB_HOST,

            user=DB_USER,

            password=DB_PASS,

            database=DB_NAME

        )

        cursor = conn.cursor()

        

        # Exemplo: Inserindo log de carga (personalize conforme necessário)

        cursor.execute("""

            INSERT INTO log_cargas (arquivo, data_carga, status)

            VALUES (%s, NOW(), %s)

        """, (NOME_ARQUIVO, 'Concluído'))

        

        conn.commit()

        conn.close()

        return True

    except Exception as e:

        print(f"Erro ao carregar MySQL: {e}")

        return False


def enviar_email(status):

    msg = MIMEMultipart()

    msg['From'] = EMAIL_USER

    msg['To'] = EMAIL_DESTINO

    msg['Subject'] = f"Confirmação de Carga - {NOME_ARQUIVO}"


    corpo = f"""

    Olá,  

    O arquivo **{NOME_ARQUIVO}** foi detectado e a carga no MySQL foi {status}.  

    Data/Hora: {time.strftime('%d/%m/%Y %H:%M:%S')}  

    """

    msg.attach(MIMEText(corpo, 'plain'))


    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:

        server.starttls()

        server.login(EMAIL_USER, EMAIL_PASS)

        server.sendmail(EMAIL_USER, EMAIL_DESTINO, msg.as_string())


# ================= EXECUÇÃO =================

if arquivo_existe():

    print("📂 Arquivo detectado!")

    if carregar_mysql():

        enviar_email("CONCLUÍDA ✅")

    else:

        enviar_email("FALHOU ❌")

else:

    print("⏳ Arquivo ainda não disponível...")



🔹 Como usar:


1) Instalar dependências



pip install mysql-connector-python



2) Salvar o script como monitor_ddd.py.


3) Configurar no Agendador de Tarefas para rodar a cada 5 minutos, por exemplo.


4) Ajustar usuário, senha, servidor de e-mail e dados do MySQL no início do script.



AUTOMAÇÃO 100%


Também já preparamos a versão avançada que, além do e-mail, também envia mensagem automática no Microsoft Teams e atualiza um dashboard no Power BI em tempo real 📊. Isso deixaria o acompanhamento praticamente 100% automatizado.


Essa versão avançada com detecção do arquivo, carga no MySQL, notificação por e-mail + Teams e gatilho de refresh no Power BI. Mantém tudo num único script (fácil de agendar no Windows Task Scheduler) e separei a configuração em .env para não expor senhas.


1) Arquivos e dependências


Instale as libs:


pip install mysql-connector-python requests python-dotenv


Crie um .env ao lado do script com os parâmetros (ajuste tudo entre aspas conforme seu ambiente):


# ======= ARQUIVO / PROCESSO =======

WATCH_DIR="C:\Users\andre.bernardes\OneDrive - GEDEON RICHTER DO BRASIL\SFE\DADOS\DBA\IQVIA\DDD\Mensal\14.08.25"

WATCH_FILENAME="FF_DDDMIX_G_R_TXT_202507.ZIP"


# ======= MYSQL =======

MYSQL_HOST="localhost"

MYSQL_USER="usuario_mysql"

MYSQL_PASSWORD="senha_mysql"

MYSQL_DATABASE="nome_do_banco"


# ======= E-MAIL (SMTP) =======

SMTP_SERVER="smtp.seu_email.com"

SMTP_PORT="587"

SMTP_USER="seu_email@empresa.com"

SMTP_PASSWORD="sua_senha"

SMTP_TO="destinatario@empresa.com"


# ======= TEAMS WEBHOOK =======

TEAMS_WEBHOOK_URL="https://outlook.office.com/webhook/..."  # Incoming Webhook do canal


# ======= POWER BI (Service Principal recomendado) =======

TENANT_ID="xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"

CLIENT_ID="yyyyyyyy-yyyy-yyyy-yyyy-yyyyyyyyyyyy"

CLIENT_SECRET="sua_chave_secreta"

POWERBI_WORKSPACE_ID="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"

POWERBI_DATASET_ID="ffffffff-1111-2222-3333-444444444444"



Como obter:


Teams Webhook: no canal do Teams → Conectores → Incoming Webhook → dar um nome e copiar a URL.


Power BI REST API: crie um App Registration no Azure AD (Client ID/Secret), conceda permissão ao workspace (Admin do workspace → Allow service principals to use Power BI APIs e adicione o app como Admin/Member do workspace). Use escopo https://analysis.windows.net/powerbi/api/.default.



2) SQL sugerido (tabelas de controle/log)


Execute no MySQL para ter controle e independência:


CREATE TABLE IF NOT EXISTS log_cargas (

  id BIGINT AUTO_INCREMENT PRIMARY KEY,

  arquivo VARCHAR(255) NOT NULL,

  hash_arquivo CHAR(64) NOT NULL,

  data_detectada DATETIME NOT NULL,

  data_carga DATETIME NULL,

  status VARCHAR(30) NOT NULL,            -- DETECTADO, PROCESSANDO, CONCLUIDO, FALHA

  detalhes TEXT NULL,

  UNIQUE KEY uq_arquivo_hash (arquivo, hash_arquivo)

);


CREATE TABLE IF NOT EXISTS atualizacoes_mysql (

  id BIGINT AUTO_INCREMENT PRIMARY KEY,

  tabela VARCHAR(128) NOT NULL,

  acao VARCHAR(50) NOT NULL,              -- INSERT/UPDATE/DELETE/REFRESH

  data_execucao DATETIME NOT NULL,

  linhas_afetadas BIGINT NULL,

  origem VARCHAR(100) NULL                -- DDD_MENSAL, etc.

);


Se já existir uma “tabela interna de controle” no seu ambiente, substitua log_cargas/atualizacoes_mysql pelos nomes oficiais.



3) Script único: monitor_ddd_avancado.py


Este script:


Detecta o arquivo e calcula SHA-256 (para não processar duplicado).


Registra DETECTADO → PROCESSANDO → CONCLUIDO/FALHA.


(Lugar-marcado) Processa o ZIP e faz a carga no MySQL (adapte para sua regra de negócio).


Envia e-mail e Teams.


Dispara refresh do dataset do Power BI.


import os

import time

import zipfile

import hashlib

import smtplib

import requests

import mysql.connector

from datetime import datetime

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

from dotenv import load_dotenv


# ===================== CONFIG =====================

load_dotenv()


WATCH_DIR = os.getenv("WATCH_DIR")

WATCH_FILENAME = os.getenv("WATCH_FILENAME")

WATCH_PATH = os.path.join(WATCH_DIR, WATCH_FILENAME)


MYSQL_HOST = os.getenv("MYSQL_HOST")

MYSQL_USER = os.getenv("MYSQL_USER")

MYSQL_PASSWORD = os.getenv("MYSQL_PASSWORD")

MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")


SMTP_SERVER = os.getenv("SMTP_SERVER")

SMTP_PORT = int(os.getenv("SMTP_PORT", "587"))

SMTP_USER = os.getenv("SMTP_USER")

SMTP_PASSWORD = os.getenv("SMTP_PASSWORD")

SMTP_TO = os.getenv("SMTP_TO")


TEAMS_WEBHOOK_URL = os.getenv("TEAMS_WEBHOOK_URL")


TENANT_ID = os.getenv("TENANT_ID")

CLIENT_ID = os.getenv("CLIENT_ID")

CLIENT_SECRET = os.getenv("CLIENT_SECRET")

POWERBI_WORKSPACE_ID = os.getenv("POWERBI_WORKSPACE_ID")

POWERBI_DATASET_ID = os.getenv("POWERBI_DATASET_ID")


# ===================== UTILS ======================

def now():

    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")


def sha256_file(path):

    h = hashlib.sha256()

    with open(path, "rb") as f:

        for chunk in iter(lambda: f.read(8192), b""):

            h.update(chunk)

    return h.hexdigest()


def db_conn():

    return mysql.connector.connect(

        host=MYSQL_HOST,

        user=MYSQL_USER,

        password=MYSQL_PASSWORD,

        database=MYSQL_DATABASE,

        autocommit=False,

    )


def upsert_log_carga(cursor, arquivo, hash_arquivo, status, detalhes=None, as_detected=False):

    if as_detected:

        cursor.execute("""

            INSERT IGNORE INTO log_cargas (arquivo, hash_arquivo, data_detectada, status, detalhes)

            VALUES (%s, %s, NOW(), %s, %s)

        """, (arquivo, hash_arquivo, status, detalhes))

    else:

        cursor.execute("""

            UPDATE log_cargas

               SET status=%s,

                   detalhes=%s,

                   data_carga=CASE WHEN %s IN ('CONCLUIDO','FALHA') THEN NOW() ELSE data_carga END

             WHERE arquivo=%s AND hash_arquivo=%s

        """, (status, detalhes, status, arquivo, hash_arquivo))


def registrar_atualizacao_mysql(cursor, tabela, acao, linhas, origem):

    cursor.execute("""

        INSERT INTO atualizacoes_mysql (tabela, acao, data_execucao, linhas_afetadas, origem)

        VALUES (%s, %s, NOW(), %s, %s)

    """, (tabela, acao, linhas, origem))


def enviar_email(assunto, corpo):

    msg = MIMEMultipart()

    msg["From"] = SMTP_USER

    msg["To"] = SMTP_TO

    msg["Subject"] = assunto

    msg.attach(MIMEText(corpo, "html"))

    with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:

        server.starttls()

        server.login(SMTP_USER, SMTP_PASSWORD)

        server.sendmail(SMTP_USER, SMTP_TO, msg.as_string())


def enviar_teams(texto):

    if not TEAMS_WEBHOOK_URL:

        return

    payload = {"text": texto}

    try:

        requests.post(TEAMS_WEBHOOK_URL, json=payload, timeout=15).raise_for_status()

    except Exception as e:

        print(f"[{now()}] Falha ao enviar Teams: {e}")


def obter_token_powerbi():

    url = f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/v2.0/token"

    data = {

        "client_id": CLIENT_ID,

        "client_secret": CLIENT_SECRET,

        "grant_type": "client_credentials",

        "scope": "https://analysis.windows.net/powerbi/api/.default"

    }

    resp = requests.post(url, data=data, timeout=20)

    resp.raise_for_status()

    return resp.json()["access_token"]


def refresh_powerbi_dataset():

    token = obter_token_powerbi()

    url = f"https://api.powerbi.com/v1.0/myorg/groups/{POWERBI_WORKSPACE_ID}/datasets/{POWERBI_DATASET_ID}/refreshes"

    resp = requests.post(url, headers={"Authorization": f"Bearer {token}"}, json={}, timeout=20)

    resp.raise_for_status()

    return True


# ============ LUGAR-MARCADO: CARGA DE DADOS ============

def processar_e_carregar_zip_no_mysql(zip_path, cursor):

    """

    ADAPTE AQUI:

    - Descompacte o ZIP

    - Parseie os arquivos (CSV/TXT)

    - Upsert em tabelas finais

    - Registre linhas afetadas em atualizacoes_mysql

    """

    linhas_total = 0

    with zipfile.ZipFile(zip_path, "r") as zf:

        for member in zf.namelist():

            if member.lower().endswith((".csv", ".txt")):

                with zf.open(member) as f:

                    # Exemplo mínimo (conteúdo texto); substitua por parser real.

                    # for _ in f: linhas_total += 1

                    pass


    # EXEMPLO de registro (substitua tabela/acao/linhas):

    registrar_atualizacao_mysql(cursor, tabela="tabela_destino_ddd", acao="UPSERT", linhas=linhas_total, origem="DDD_MENSAL")

    return linhas_total


# ===================== MAIN ======================

def main():

    if not os.path.exists(WATCH_PATH):

        print(f"[{now()}] ⏳ Arquivo não encontrado: {WATCH_PATH}")

        return


    file_hash = sha256_file(WATCH_PATH)

    print(f"[{now()}] 📂 Arquivo detectado. SHA256={file_hash}")


    conn = db_conn()

    try:

        cur = conn.cursor()


        # 1) LOG: DETECTADO (idempotente)

        upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="DETECTADO", detalhes="Arquivo detectado pelo monitor.", as_detected=True)

        conn.commit()


        # 2) Verificar se já foi processado antes

        cur.execute("""

            SELECT status FROM log_cargas

             WHERE arquivo=%s AND hash_arquivo=%s

             LIMIT 1

        """, (WATCH_FILENAME, file_hash))

        row = cur.fetchone()

        if row and row[0] in ("CONCLUIDO"):

            print(f"[{now()}] ✅ Já processado anteriormente. Nada a fazer.")

            return


        # 3) LOG: PROCESSANDO

        upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="PROCESSANDO", detalhes="Iniciando carga no MySQL.")

        conn.commit()


        # 4) PROCESSO DE CARGA

        linhas = processar_e_carregar_zip_no_mysql(WATCH_PATH, cur)

        conn.commit()


        # 5) LOG: CONCLUIDO

        upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="CONCLUIDO", detalhes=f"Carga concluída. Linhas afetadas: {linhas}")

        conn.commit()


        # 6) NOTIFICAÇÕES

        assunto = f"[DDD] Carga concluída ✅ — {WATCH_FILENAME}"

        corpo = f"""

        <p>Olá,</p>

        <p>O arquivo <b>{WATCH_FILENAME}</b> foi processado com sucesso.</p>

        <ul>

          <li>Detectado: {now()}</li>

          <li>Linhas (estimativa): {linhas}</li>

          <li>Status: CONCLUÍDO</li>

          <li>Hash: <code>{file_hash}</code></li>

        </ul>

        <p>Atenciosamente,<br>Robô de Cargas DDD 🤖</p>

        """

        enviar_email(assunto, corpo)

        enviar_teams(f"✅ Carga concluída do arquivo **{WATCH_FILENAME}**.\nLinhas: {linhas}\nHash: {file_hash}")


        # 7) POWER BI REFRESH

        try:

            if refresh_powerbi_dataset():

                enviar_teams(f"🔄 Power BI: refresh do dataset iniciado para o arquivo **{WATCH_FILENAME}**.")

        except Exception as e:

            enviar_teams(f"⚠️ Power BI: falha ao iniciar refresh. Erro: {e}")


        print(f"[{now()}] ✅ Processo finalizado com sucesso.")


    except Exception as e:

        conn.rollback()

        # LOG: FALHA

        try:

            cur = conn.cursor()

            upsert_log_carga(cur, WATCH_FILENAME, file_hash, status="FALHA", detalhes=str(e))

            conn.commit()

        except Exception:

            pass

        # Notificações de erro

        try:

            enviar_email(f"[DDD] Falha na carga ❌ — {WATCH_FILENAME}",

                         f"<p>Falha ao processar <b>{WATCH_FILENAME}</b>:</p><pre>{e}</pre>")

            enviar_teams(f"❌ Falha ao processar **{WATCH_FILENAME}**.\nErro: {e}")

        except Exception:

            pass

        print(f"[{now()}] ❌ Erro: {e}")

    finally:

        try:

            conn.close()

        except Exception:

            pass


if __name__ == "__main__":

    main()


4) Agendamento no Windows (Task Scheduler)


Abrir Agendador de Tarefas → Criar Tarefa…


Gatilho: a cada 5 minutos (ou conforme necessário)


Ação:


Programa/script: python


Adicionar argumentos: "C:\caminho\monitor_ddd_avancado.py"


Iniciar em: C:\caminho\


Executar com privilégios mais altos e marcar “Executar mesmo que o usuário não esteja conectado” se precisar rodar sempre.


Alternativa: criar um .bat:


@echo off

cd /d "C:\caminho"

python monitor_ddd_avancado.py



5) Boas práticas (recomendado)


Segurança: mantenha .env fora de repositórios, use segredos do Windows ou Azure Key Vault quando possível.


Resiliência: se desejar, adicione tentativas com backoff nas chamadas ao SMTP, Teams e Power BI.


Observabilidade: exporte logs para o Windows Event Log ou arquivo rotativo (logging.handlers.RotatingFileHandler).


Idempotência real: além do hash, aplique checks por período (ex.: competência 2025-07) para evitar recargas indevidas.


Validação: após a carga, rode queries de sanidade (contagens, ranges de datas, chaves únicas).


DOE UM CAFÉ:

👉 Não se esqueça de seguir André Bernardes no Linkedin. Clique aqui e me contate via What's App. 

Comente e compartilhe este artigo!

brazilsalesforceeffectiveness@gmail.com


,

Clique na imagem acima para acessar 10 e-Books!


 PUDIM PROJECT 

eBook - PT - PUDIM PROJECT 2024 - Python Volume 01 - Funções Essenciais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 02 - Funções Essenciais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 03 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes


eBook - PT - PUDIM PROJECT 2024 - Python Volume 04 - Funções para Automatização - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 05 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 06 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes


eBook - PT - PUDIM PROJECT 2024 - Python Volume 07 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 08 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 09 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes


eBook - PT - PUDIM PROJECT 2024 - Python Volume 10 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes eBook - PT - PUDIM PROJECT 2024 - Python Volume 11 - Automatizando Postagens em Redes e Plataformas Sociais - Série PUDIM PROJECT — André Luiz Bernardes

diHITT - Notícias