You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kmftools/energie/store_energy_usage.py

126 lines
3.5 KiB

import requests
import mysql.connector
import os
from datetime import datetime
from dotenv import load_dotenv
# Geef het absolute pad op naar je .env bestand
env_path = "./.env" # Pas dit pad aan!
# Laad de .env bestand expliciet
load_dotenv(env_path)
# Configuratievariabelen uit .env bestand
ENERGY_USAGE_URL = os.getenv("ENERGY_USAGE_URL")
# Verkrijg de Telegram configuraties uit het .env bestand
TELEGRAM_API_URL = (
f"https://api.telegram.org/bot{os.getenv('TELEGRAM_BOT_TOKEN')}/sendMessage"
)
TELEGRAM_CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MYSQL_CONFIG = {
"host": os.getenv("MYSQL_HOST"),
"user": os.getenv("MYSQL_USER"),
"password": os.getenv("MYSQL_PASSWORD"),
"database": os.getenv("MYSQL_DATABASE"),
}
# JSON URL (pas deze aan naar de werkelijke URL)
json_url = ENERGY_USAGE_URL # Vervang met de URL van de JSON-bron
def send_telegram_message(message):
"""Stuur een bericht naar een Telegram chat."""
payload = {"chat_id": TELEGRAM_CHAT_ID, "text": message}
try:
response = requests.post(TELEGRAM_API_URL, data=payload)
response.raise_for_status()
print("Telegram bericht succesvol verzonden.")
except requests.RequestException as e:
print(f"Fout bij het verzenden van Telegram bericht: {e}")
def create_table_if_not_exists():
"""Controleer of de tabel bestaat en maak deze indien nodig."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# SQL-query om tabel te maken als deze niet bestaat
create_table_query = """
CREATE TABLE IF NOT EXISTS energy_usage (
id INT AUTO_INCREMENT PRIMARY KEY,
utc DATETIME NOT NULL,
timestamp int,
net_value FLOAT NOT NULL
);
"""
cursor.execute(create_table_query)
connection.commit()
print("Tabel 'energy_usage' gecontroleerd of aangemaakt.")
except mysql.connector.Error as err:
print(f"Fout bij het maken van de tabel: {err}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
def fetch_json_data(url):
"""Fetch JSON data van de opgegeven URL."""
response = requests.get(url)
response.raise_for_status() # Raise een fout als de request mislukt
return response.json()
def save_to_mysql(data):
"""Opslaan van gegevens in MySQL."""
try:
connection = mysql.connector.connect(**MYSQL_CONFIG)
cursor = connection.cursor()
# SQL-query voor invoegen
query = "INSERT INTO energy_usage (timestamp,utc, net_value) VALUES (%s,%s, %s)"
print(query)
for record in data:
# tm omzetten naar datetime
timestamp = record["tm"]
tm_datetime = datetime.fromtimestamp(record["tm"])
net_value = record["net"]
cursor.execute(query, (timestamp, tm_datetime, net_value))
# Bevestig wijzigingen
connection.commit()
except mysql.connector.Error as err:
send_telegram_message(f"Fout bij MySQL: {err}")
finally:
if connection.is_connected():
cursor.close()
connection.close()
def main():
try:
# Controleer of tabel bestaat of maak deze aan
create_table_if_not_exists()
# JSON ophalen
json_data = fetch_json_data(json_url)
# print(json_data)
# Opslaan in MySQL
save_to_mysql(json_data)
except Exception as e:
print(f"Er is een fout opgetreden: {e}")
if __name__ == "__main__":
main()