You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/energie/store_dailyprices.py

145 lines
4.8 KiB

import requests
import mysql.connector
from mysql.connector import Error
import datetime
import json
import os
from dotenv import load_dotenv
# Laad configuratie uit .env bestand
load_dotenv()
# Configuratievariabelen uit .env bestand
ENERVER_API_URL = os.getenv("ENERVER_API_URL_DAILY")
ENERVER_API_TOKEN = os.getenv("ENERVER_API_TOKEN")
# Verkrijg de Telegram configuraties uit het .env bestand
TELEGRAM_API_URL = (
f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
)
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST"),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}
def send_telegram_message(message):
"""Stuur een bericht naar een Telegram chat."""
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(TELEGRAM_API_URL, data=payload)
response.raise_for_status()
print("Telegram bericht succesvol verzonden.")
except requests.RequestException as e:
print(f"Fout bij het verzenden van Telegram bericht: {e}")
def fetch_enever_prices():
"""Haal stroomprijsgegevens op van de EneVer API."""
params = {"token": ENERVER_API_TOKEN}
try:
response = requests.get(ENERVER_API_URL, params=params)
response.raise_for_status()
return response.json() # Verwacht JSON als antwoord
except requests.RequestException as e:
print(f"Fout bij ophalen EneVer-prijzen: {e}")
return None
def ensure_table_and_columns_exist(cursor, table_name, columns):
"""Controleer of de tabel en kolommen bestaan, en voeg ontbrekende kolommen toe."""
# Zorg dat de tabel bestaat met een unieke constraint op 'timestamp'
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
timestamp DATETIME PRIMARY KEY
);
"""
)
# Controleer bestaande kolommen
cursor.execute(f"DESCRIBE {table_name};")
existing_columns = [row[0] for row in cursor.fetchall()]
# Voeg ontbrekende kolommen toe
for col_name, col_type in columns.items():
if col_name not in existing_columns:
cursor.execute(
f"""
ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type};
"""
)
def store_prices_to_db(prices):
"""Sla de relevante stroomprijzen op in een MySQL-database."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
if connection.is_connected():
cursor = connection.cursor()
table_name = "daillyprices"
# Specificeer de kolommen die nodig zijn
columns = {"timestamp": "DATETIME"}
for key in prices["data"][0].keys():
if key.startswith("prijs"):
col_name = f"price_{key[5:].lower()}"
columns[col_name] = "DECIMAL(10, 5)"
# Zorg dat de tabel en kolommen bestaan
ensure_table_and_columns_exist(cursor, table_name, columns)
# Voeg de prijzen toe aan de tabel
for price_entry in prices["data"]:
timestamp = datetime.datetime.strptime(
price_entry["datum"], "%Y-%m-%d %H:%M:%S"
)
insert_columns = ["timestamp"]
insert_values = [timestamp]
update_clause = []
for key, value in price_entry.items():
if key.startswith("prijs"):
col_name = f"price_{key[5:].lower()}"
insert_columns.append(col_name)
insert_values.append(float(value))
update_clause.append(
f"{col_name} = VALUES({col_name})")
insert_query = f"""
INSERT INTO {table_name} ({', '.join(insert_columns)})
VALUES ({', '.join(['%s'] * len(insert_values))})
ON DUPLICATE KEY UPDATE {', '.join(update_clause)};
"""
cursor.execute(insert_query, tuple(insert_values))
connection.commit()
print("Prijzen succesvol opgeslagen in de database.")
except Error as e:
print(f"Fout bij verbinden met de database: {e}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
if __name__ == "__main__":
# Prijzen ophalen van EneVer API
enever_prices = fetch_enever_prices()
if enever_prices:
# Prijzen opslaan in de database
store_prices_to_db(enever_prices)
send_telegram_message(
"De dagelijkse stroomprijzen zijn succesvol opgeslagen in de database."
)
else:
print("Geen prijzen beschikbaar om op te slaan.")