import asyncio import datetime import json import logging import os import sqlite3 import subprocess from sqlite3 import Connection from typing import Optional, Callable import requests import telegram.constants from bs4 import BeautifulSoup from telegram import Update, InlineKeyboardMarkup, InlineKeyboardButton from telegram.ext import ApplicationBuilder, CommandHandler, ContextTypes, CallbackQueryHandler, Application log_dir = os.getenv("BEEBOT_LOGS") if not log_dir: log_dir = "/mnt/logs" if not os.path.exists(log_dir): os.mkdir(log_dir) logging.basicConfig( format="[%(asctime)s] %(name)s/%(levelname)s - %(message)s", level=logging.INFO, handlers=[ logging.FileHandler( os.path.join( log_dir, datetime.datetime.today().strftime("%Y%m%d-%H%M%S.log") ) ), logging.StreamHandler() ] ) logging.getLogger("httpx").setLevel(logging.WARNING) logger = logging.getLogger(__name__) def flag(code): offset = 127462 - ord('A') code = code.upper() return chr(ord(code[0]) + offset) + chr(ord(code[1]) + offset) class BeeBot: MENU_URL = "https://www.epfl.ch/campus/restaurants-shops-hotels/fr/industrie21-epfl-valais-wallis/?date={date}" RESTO_ID = 545 BASE_DIR = os.path.dirname(__file__) CACHE_PATH = os.path.join(BASE_DIR, "cache.json") DB_PATH = os.path.join(BASE_DIR, "database.db") TEMPLATE_PATH = os.path.join(BASE_DIR, "menu.typ") IMG_DIR = "/tmp/menu_images" LANGUAGES = { "en": { "emoji": flag("gb"), "name": "English" }, "fr": { "emoji": flag("fr"), "name": "Français" }, "de": { "emoji": flag("de"), "name": "Deutsch" } } CATEGORIES = { "E": { "name": "category.student" }, "D": { "name": "category.phd_student" }, "C": { "name": "category.campus" }, "V": { "name": "category.visitor" } } def __init__(self, token: str): self.tg_token: str = token self.tg_app: Optional[Application] = None self.cache: dict[str, str] = {} self.db_con: Optional[Connection] = None self.langs: Optional[dict[str, dict[str, str]]] = None self.fetch_lock: asyncio.Lock = asyncio.Lock() self.gen_lock: asyncio.Lock = asyncio.Lock() def mainloop(self): logger.info("Initialization") self.db_con = sqlite3.connect(self.DB_PATH) self.check_database() self.load_cache() self.load_i18n() self.tg_app = ApplicationBuilder().token(self.tg_token).build() self.tg_app.add_handler(CommandHandler("week", self.cmd_week)) self.tg_app.add_handler(CommandHandler("today", self.cmd_today)) self.tg_app.add_handler(CommandHandler("settings", self.cmd_settings)) self.tg_app.add_handler(CallbackQueryHandler(self.cb_handler)) logger.info("Starting bot") self.tg_app.run_polling() def load_i18n(self) -> None: with open(os.path.join(self.BASE_DIR, "lang.json"), "r") as f: self.langs = json.load(f) def i18n(self, lang: str, key: str) -> str: if lang not in self.langs: lang = "en" if key not in self.langs[lang]: return f"[{key}]" return self.langs[lang][key] async def cmd_week(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.debug("Received /week") await self.request_menu(update, context, False) async def cmd_today(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.debug("Received /today") await self.request_menu(update, context, True) async def cmd_settings(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.debug("Received /settings") lang = self.get_user_pref(update, context)["lang"] menu = self.get_settings_menu(context) reply_markup = InlineKeyboardMarkup(menu) await update.effective_chat.send_message(text=self.i18n(lang, "menu.settings"), reply_markup=reply_markup) async def cb_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: query = update.callback_query if query.data == "change_language": await self.cb_change_language(update, context) elif query.data.startswith("set_language"): await self.cb_set_language(update, context) elif query.data == "change_categories": await self.cb_change_categories(update, context) elif query.data.startswith("toggle_category"): await self.cb_toggle_category(update, context) elif query.data == "back_to_settings": await self.show_menu(update, context, "menu.settings", self.get_settings_menu) async def show_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE, text_key: str, menu_func: Callable[[ContextTypes.DEFAULT_TYPE], list[list[InlineKeyboardButton]]]) -> None: reply_markup = InlineKeyboardMarkup(menu_func(context)) await update.effective_message.edit_text( text=self.i18n(context.user_data["lang"], text_key), reply_markup=reply_markup ) async def cb_change_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.debug("Clicked 'Change language'") await self.show_menu(update, context, "menu.languages", self.get_language_menu) async def cb_set_language(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: lang = update.callback_query.data.split(":")[1] logger.debug(f"Clicked 'Set language to {lang}'") cur = self.db_con.cursor() cur.execute( "UPDATE user SET lang=? WHERE telegram_id=?", (lang, update.effective_user.id) ) self.db_con.commit() cur.close() self.get_user_pref(update, context) await self.show_menu(update, context, "menu.languages", self.get_language_menu) async def cb_change_categories(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: logger.debug("Clicked 'Change categories'") await self.show_menu(update, context, "menu.categories", self.get_categories_menu) async def cb_toggle_category(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: category = update.callback_query.data.split(":")[1] logger.debug(f"Clicked 'Toggle category {category}'") categories: set[str] = context.user_data["categories"] if category in categories: categories.remove(category) else: categories.add(category) cur = self.db_con.cursor() cur.execute( "UPDATE user SET categories=? WHERE telegram_id=?", (",".join(categories), update.effective_user.id) ) self.db_con.commit() cur.close() self.get_user_pref(update, context) await self.show_menu(update, context, "menu.categories", self.get_categories_menu) async def request_menu(self, update: Update, context: ContextTypes.DEFAULT_TYPE, today_only: bool) -> None: chat_id = update.effective_chat.id await context.bot.send_chat_action(chat_id=chat_id, action=telegram.constants.ChatAction.TYPING) prefs = self.get_user_pref(update) logger.debug(f"User prefs: {prefs}") categories = prefs["categories"] img_id = self.get_img_id(today_only, categories) filename = img_id + ".png" img_path = os.path.join(self.IMG_DIR, filename) menu_id = "today_menu" if today_only else "week_menu" menu_path = os.path.join(self.BASE_DIR, "menus_today.json" if today_only else "menus_week.json") # If menu needs to be fetched if not os.path.exists(menu_path) or self.is_outdated(menu_id, today_only): # Notify user msg = await update.message.reply_text(self.i18n(prefs["lang"], "notif.wait_updating")) async with self.fetch_lock: if not os.path.exists(menu_path) or self.is_outdated(menu_id, today_only): if today_only: await self.fetch_today_menu() else: await self.fetch_week_menu() await msg.delete() # If image needs to be (re)generated if not os.path.exists(img_path) or self.is_outdated(img_id, today_only): async with self.gen_lock: if not os.path.exists(img_path) or self.is_outdated(img_id, today_only): await self.gen_image(today_only, categories, img_path, img_id) await context.bot.send_photo(chat_id=chat_id, photo=img_path) def check_database(self): cur = self.db_con.cursor() cur.execute(""" CREATE TABLE IF NOT EXISTS "user" ( "id" INTEGER, "telegram_id" INTEGER NOT NULL UNIQUE, "categories" TEXT, "lang" TEXT NOT NULL DEFAULT 'fr', "created_at" TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY("id" AUTOINCREMENT) ); """) self.db_con.commit() cur.close() def get_user_pref(self, update: Update, context: Optional[ContextTypes.DEFAULT_TYPE] = None) -> dict: user_id = update.effective_user.id cur = self.db_con.cursor() res = cur.execute("SELECT categories, lang FROM user WHERE telegram_id=?", (user_id,)) user = res.fetchone() cur.close() prefs = { "categories": {"E", "D", "C", "V"}, "lang": "fr" } if user is None: self.create_user(user_id) else: categories = set(user[0].split(",")) prefs = { "categories": categories, "lang": user[1] } if context is not None: context.user_data["categories"] = prefs["categories"] context.user_data["lang"] = prefs["lang"] return prefs def create_user(self, telegram_id: int) -> None: logger.debug(f"New user with id {telegram_id}") cur = self.db_con.cursor() cur.execute( "INSERT INTO user (telegram_id, categories, lang) VALUES (?, ?, ?)", (telegram_id, "E,D,C,V", "fr") ) self.db_con.commit() cur.close() def load_cache(self): if os.path.exists(self.CACHE_PATH): with open(self.CACHE_PATH, "r") as f: self.cache = json.load(f) def save_cache(self): with open(self.CACHE_PATH, "w") as f: json.dump(self.cache, f, indent=4) @staticmethod def get_img_id(today_only: bool, categories: set[str]) -> str: categs = "".join(sorted(categories)) layout = "today" if today_only else "week" return f"{layout}-{categs}" def is_outdated(self, obj_id: str, today_only: bool) -> bool: if obj_id not in self.cache: return True gen_date = datetime.datetime.strptime(self.cache[obj_id], "%Y-%m-%d") today = datetime.datetime.today() if today_only: if (today - gen_date).days > 0: return True else: delta1 = datetime.timedelta(days=gen_date.weekday()) delta2 = datetime.timedelta(days=today.weekday()) gen_monday = gen_date - delta1 cur_monday = today - delta2 if (cur_monday - gen_monday).days > 0: return True return False async def gen_image(self, today_only: bool, categories: set[str], outpath: str, img_id: str) -> None: if not os.path.exists(self.IMG_DIR): os.mkdir(self.IMG_DIR) logger.info(f"Generating {'today' if today_only else 'week'} menu image for categories {categories}") menu_path = "menus_today.json" if today_only else "menus_week.json" subprocess.call([ "typst", "compile", "--root", self.BASE_DIR, "--font-path", os.path.join(self.BASE_DIR, "fonts"), "--input", f"path={menu_path}", "--input", f"categories={','.join(categories)}", self.TEMPLATE_PATH, outpath ]) self.cache[img_id] = datetime.datetime.today().strftime("%Y-%m-%d") self.save_cache() def save_menu(self, days: list, menu_id: str, filename: str) -> None: with open(os.path.join(self.BASE_DIR, filename), "w") as f: json.dump(days, f) self.cache[menu_id] = datetime.datetime.today().strftime("%Y-%m-%d") self.save_cache() async def fetch_week_menu(self) -> None: logger.info("Fetching week menu") today = datetime.datetime.today() delta = datetime.timedelta(days=today.weekday()) monday = today - delta days = [] for i in range(5): dt = datetime.timedelta(days=i) date = monday + dt menus = await self.fetch_menu(date) days.append({ "date": date.strftime("%Y-%m-%d"), "menus": menus }) self.save_menu(days, "week_menu", "menus_week.json") async def fetch_today_menu(self) -> None: logger.info("Fetching today menu") today = datetime.datetime.today() menus = await self.fetch_menu(today) days = [{ "date": today.strftime("%Y-%m-%d"), "menus": menus }] self.save_menu(days, "today_menu", "menus_today.json") async def fetch_menu(self, date: datetime.date) -> list: url = self.MENU_URL.format(date=date.strftime("%Y-%m-%d")) headers = { "User-Agent": "BeeBot 1.0" } res = requests.get(url, headers=headers) if res.status_code != 200: return [] bs = BeautifulSoup(res.content, features="lxml") table = bs.find("table", {"id": "menuTable"}) lines = table.find("tbody").findAll("tr", {"data-restoid": str(self.RESTO_ID)}) menus = [] for line in lines: menu = line.find("td", class_="menu").find("div", class_="descr").contents[0].text menu_lines = menu.split("\n") price_cells = line.find("td", class_="prices").findAll("span", class_="price") prices = {} for cell in price_cells: category = cell.find("abbr").text price = cell.contents[-1].replace("CHF", "").strip() prices[category] = float(price) menus.append({ "name": menu_lines[0], "extra": menu_lines[1:], "prices": prices }) return menus def get_settings_menu(self, context: ContextTypes.DEFAULT_TYPE) -> list[list[InlineKeyboardButton]]: lang = context.user_data["lang"] menu = [ [ InlineKeyboardButton( self.i18n(lang, "setting.language").format(lang), callback_data="change_language" ) ], [ InlineKeyboardButton( self.i18n(lang, "setting.categories").format(" / ".join(context.user_data["categories"])), callback_data="change_categories" ) ] ] return menu def get_language_menu(self, context: ContextTypes.DEFAULT_TYPE) -> list[list[InlineKeyboardButton]]: user_lang = context.user_data["lang"] buttons = [] for lang, data in self.LANGUAGES.items(): extra = " ✅" if user_lang == lang else "" buttons.append( InlineKeyboardButton( data["emoji"] + extra, callback_data=f"set_language:{lang}" ) ) menu = [buttons[i:i+2] for i in range(0, len(buttons), 2)] menu.append([ InlineKeyboardButton( self.i18n(user_lang, "menu.back_to_settings"), callback_data="back_to_settings" ) ]) return menu def get_categories_menu(self, context: ContextTypes.DEFAULT_TYPE) -> list[list[InlineKeyboardButton]]: lang = context.user_data["lang"] buttons = [] for categ, data in self.CATEGORIES.items(): extra = " ✅" if categ in context.user_data["categories"] else " ❌" buttons.append( InlineKeyboardButton( self.i18n(lang, data["name"]) + extra, callback_data=f"toggle_category:{categ}" ) ) menu = [buttons[i:i+2] for i in range(0, len(buttons), 2)] menu.append([ InlineKeyboardButton( self.i18n(lang, "menu.back_to_settings"), callback_data="back_to_settings" ) ]) return menu if __name__ == '__main__': logger.info("Welcome to BeeBot !") bot = BeeBot(os.getenv("TELEGRAM_TOKEN")) bot.mainloop()