energyprices toegevoegd

celery-integration
peter.fong 11 months ago
parent 74223771e6
commit 6990183bee

@ -0,0 +1,109 @@
import requests
import mysql.connector
from mysql.connector import Error
import datetime
import json
import os
from dotenv import load_dotenv
load_dotenv()
# Configuratievariabelen uit .env bestand
ENERVER_API_URL = os.getenv("ENERVER_API_URL_DAILY")
ENERVER_API_TOKEN = os.getenv("ENERVER_API_TOKEN")
# Verkrijg de Telegram configuraties uit het .env bestand
TELEGRAM_API_URL = (
f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
)
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST"),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}
# SQL to create the table
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS energy_costs (
id INT AUTO_INCREMENT PRIMARY KEY,
kwh_usage FLOAT NOT NULL,
starttime DATETIME NOT NULL,
endtime DATETIME NOT NULL,
kwhprice FLOAT NOT NULL,
energy_hour DATETIME NOT NULL UNIQUE,
el_costs FLOAT NOT NULL
);
"""
# SQL to insert data
INSERT_DATA_SQL = """
INSERT INTO energy_costs (kwh_usage, starttime, endtime, kwhprice, energy_hour, el_costs)
VALUES (%s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
kwh_usage = VALUES(kwh_usage),
starttime = VALUES(starttime),
endtime = VALUES(endtime),
kwhprice = VALUES(kwhprice),
el_costs = VALUES(el_costs);
"""
# SQL to fetch query results
SELECT_QUERY = """
SELECT Z.kwh as kwh_usage,
Z.starttime, Z.endtime,
dp.price_fr as kwhprice,
dp.timestamp as energy_hour,
(Z.kwh * dp.price_fr) as el_costs
FROM (
SELECT DATE_FORMAT(FROM_UNIXTIME(timestamp), '%Y-%m-%d %H:00:00') AS hour_group,
MAX(net_value) - MIN(net_value) AS kwh,
FROM_UNIXTIME(MIN(timestamp)) as starttime,
FROM_UNIXTIME(MAX(timestamp)) as endtime,
MIN(net_value) as minnet_value,
MAX(net_value) as maxnet_value
FROM energy_usage
GROUP BY DATE_FORMAT(FROM_UNIXTIME(timestamp), '%Y-%m-%d %H:00:00')
) Z
INNER JOIN daillyprices dp
ON dp.timestamp = Z.hour_group;
"""
def main():
try:
# Connect to the database
connection = mysql.connector.connect(**MYSQL_CONFIG)
if connection.is_connected():
cursor = connection.cursor()
# Create the table if it doesn't exist
cursor.execute(CREATE_TABLE_SQL)
connection.commit()
# Fetch the query results
cursor.execute(SELECT_QUERY)
results = cursor.fetchall()
# Insert data into the energy_costs table
for row in results:
cursor.execute(INSERT_DATA_SQL, row)
connection.commit()
print("calculate_energycosts: Data inserted successfully.")
except Error as e:
print(f"Error: {e}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
print("MySQL connection is closed.")
if __name__ == "__main__":
main()

@ -0,0 +1,144 @@
import requests
import mysql.connector
from mysql.connector import Error
import datetime
import json
import os
from dotenv import load_dotenv
# Laad configuratie uit .env bestand
load_dotenv()
# Configuratievariabelen uit .env bestand
ENERVER_API_URL = os.getenv("ENERVER_API_URL_DAILY")
ENERVER_API_TOKEN = os.getenv("ENERVER_API_TOKEN")
# Verkrijg de Telegram configuraties uit het .env bestand
TELEGRAM_API_URL = (
f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
)
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST"),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}
def send_telegram_message(message):
"""Stuur een bericht naar een Telegram chat."""
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(TELEGRAM_API_URL, data=payload)
response.raise_for_status()
print("Telegram bericht succesvol verzonden.")
except requests.RequestException as e:
print(f"Fout bij het verzenden van Telegram bericht: {e}")
def fetch_enever_prices():
"""Haal stroomprijsgegevens op van de EneVer API."""
params = {"token": ENERVER_API_TOKEN}
try:
response = requests.get(ENERVER_API_URL, params=params)
response.raise_for_status()
return response.json() # Verwacht JSON als antwoord
except requests.RequestException as e:
print(f"Fout bij ophalen EneVer-prijzen: {e}")
return None
def ensure_table_and_columns_exist(cursor, table_name, columns):
"""Controleer of de tabel en kolommen bestaan, en voeg ontbrekende kolommen toe."""
# Zorg dat de tabel bestaat met een unieke constraint op 'timestamp'
cursor.execute(
f"""
CREATE TABLE IF NOT EXISTS {table_name} (
timestamp DATETIME PRIMARY KEY
);
"""
)
# Controleer bestaande kolommen
cursor.execute(f"DESCRIBE {table_name};")
existing_columns = [row[0] for row in cursor.fetchall()]
# Voeg ontbrekende kolommen toe
for col_name, col_type in columns.items():
if col_name not in existing_columns:
cursor.execute(
f"""
ALTER TABLE {table_name} ADD COLUMN {col_name} {col_type};
"""
)
def store_prices_to_db(prices):
"""Sla de relevante stroomprijzen op in een MySQL-database."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
if connection.is_connected():
cursor = connection.cursor()
table_name = "daillyprices"
# Specificeer de kolommen die nodig zijn
columns = {"timestamp": "DATETIME"}
for key in prices["data"][0].keys():
if key.startswith("prijs"):
col_name = f"price_{key[5:].lower()}"
columns[col_name] = "DECIMAL(10, 5)"
# Zorg dat de tabel en kolommen bestaan
ensure_table_and_columns_exist(cursor, table_name, columns)
# Voeg de prijzen toe aan de tabel
for price_entry in prices["data"]:
timestamp = datetime.datetime.strptime(
price_entry["datum"], "%Y-%m-%d %H:%M:%S"
)
insert_columns = ["timestamp"]
insert_values = [timestamp]
update_clause = []
for key, value in price_entry.items():
if key.startswith("prijs"):
col_name = f"price_{key[5:].lower()}"
insert_columns.append(col_name)
insert_values.append(float(value))
update_clause.append(
f"{col_name} = VALUES({col_name})")
insert_query = f"""
INSERT INTO {table_name} ({', '.join(insert_columns)})
VALUES ({', '.join(['%s'] * len(insert_values))})
ON DUPLICATE KEY UPDATE {', '.join(update_clause)};
"""
cursor.execute(insert_query, tuple(insert_values))
connection.commit()
print("Prijzen succesvol opgeslagen in de database.")
except Error as e:
print(f"Fout bij verbinden met de database: {e}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
if __name__ == "__main__":
# Prijzen ophalen van EneVer API
enever_prices = fetch_enever_prices()
if enever_prices:
# Prijzen opslaan in de database
store_prices_to_db(enever_prices)
send_telegram_message(
"De dagelijkse stroomprijzen zijn succesvol opgeslagen in de database."
)
else:
print("Geen prijzen beschikbaar om op te slaan.")

@ -0,0 +1,122 @@
import requests
import mysql.connector
import os
from datetime import datetime
from dotenv import load_dotenv
# Laad configuratie uit .env bestand
load_dotenv()
# Configuratievariabelen uit .env bestand
ENERGY_USAGE_URL = os.getenv("ENERGY_USAGE_URL")
# Verkrijg de Telegram configuraties uit het .env bestand
TELEGRAM_API_URL = (
f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
)
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST"),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}
# JSON URL (pas deze aan naar de werkelijke URL)
json_url = ENERGY_USAGE_URL # Vervang met de URL van de JSON-bron
def send_telegram_message(message):
"""Stuur een bericht naar een Telegram chat."""
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(TELEGRAM_API_URL, data=payload)
response.raise_for_status()
print("Telegram bericht succesvol verzonden.")
except requests.RequestException as e:
print(f"Fout bij het verzenden van Telegram bericht: {e}")
def create_table_if_not_exists():
"""Controleer of de tabel bestaat en maak deze indien nodig."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# SQL-query om tabel te maken als deze niet bestaat
create_table_query = """
CREATE TABLE IF NOT EXISTS energy_usage (
id INT AUTO_INCREMENT PRIMARY KEY,
utc DATETIME NOT NULL,
timestamp int,
net_value FLOAT NOT NULL
);
"""
cursor.execute(create_table_query)
connection.commit()
print("Tabel 'energy_usage' gecontroleerd of aangemaakt.")
except mysql.connector.Error as err:
print(f"Fout bij het maken van de tabel: {err}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
def fetch_json_data(url):
"""Fetch JSON data van de opgegeven URL."""
response = requests.get(url)
response.raise_for_status() # Raise een fout als de request mislukt
return response.json()
def save_to_mysql(data):
"""Opslaan van gegevens in MySQL."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# SQL-query voor invoegen
query = "INSERT INTO energy_usage (timestamp,utc, net_value) VALUES (%s,%s, %s)"
print(query)
for record in data:
# tm omzetten naar datetime
timestamp = record["tm"]
tm_datetime = datetime.fromtimestamp(record["tm"])
net_value = record["net"]
cursor.execute(query, (timestamp, tm_datetime, net_value))
# Bevestig wijzigingen
connection.commit()
except mysql.connector.Error as err:
send_telegram_message(f"Fout bij MySQL: {err}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
def main():
try:
# Controleer of tabel bestaat of maak deze aan
create_table_if_not_exists()
# JSON ophalen
json_data = fetch_json_data(json_url)
# print(json_data)
# Opslaan in MySQL
save_to_mysql(json_data)
except Exception as e:
print(f"Er is een fout opgetreden: {e}")
if __name__ == "__main__":
main()
Loading…
Cancel
Save