diff --git a/py/main/get_info.py b/py/main/get_info.py index f009c41..b79368c 100644 --- a/py/main/get_info.py +++ b/py/main/get_info.py @@ -3,6 +3,12 @@ from datetime import timedelta, datetime import sys import requests +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +from datetime import datetime +import getpass +from get_bearer_token import get_bearer_token +import base64 logging.basicConfig(level=logging.INFO) logger = logging.getLogger("开始打印日志") @@ -128,6 +134,81 @@ def get_segment(build_id, nowday): sys.exit() +# 根据当前系统时间获取 key +def get_key(): + # 获取当前日期,并转换为字符串 + current_date = datetime.now().strftime("%Y%m%d") + + # 生成回文 + palindrome = current_date[::-1] + + # 使用当前日期和回文作为密钥 + key = current_date + palindrome + + # print("当前日期:", current_date) + # print("回文:", palindrome) + # print("密钥:", key) + + return key + + +# 读取授权码 +def get_auth_token(): + # token = "test" + # return token + try: + # 从命令行中获取用户名和密码 + username = input("请输入用户名(学号): \n") + password = getpass.getpass('请输入密码: \n') + name, token = get_bearer_token(username, password) + logger.info(f"你好,{name}同学") + new_token = "bearer" + str(token) + # logger.info(new_token) + return new_token + except Exception as e: + logger.error(f"获取授权码时发生异常: {str(e)}") + sys.exit() + + +# 加密函数 +def encrypt(text): + # 自动获取 key + key = get_key() + # 目前获取到的加密密钥 + iv = "ZZWBKJ_ZHIHUAWEI" + key_bytes = key.encode('utf-8') + iv_bytes = iv.encode('utf-8') + + cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) + ciphertext_bytes = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size)) + + return base64.b64encode(ciphertext_bytes).decode('utf-8') + + +# 定义解密函数 +def decrypt(ciphertext): + # 自动获取 key + key = get_key() + # 目前获取到的加密密钥 + iv = "ZZWBKJ_ZHIHUAWEI" + + # 将密钥和初始化向量转换为 bytes 格式 + key_bytes = key.encode('utf-8') + iv_bytes = iv.encode('utf-8') + + # 将密文进行 base64 解码 + ciphertext = base64.b64decode(ciphertext) + + # 使用 AES 进行解密 + cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) + decrypted_bytes = cipher.decrypt(ciphertext) + + # 去除解密后的填充 + decrypted_text = unpad(decrypted_bytes, AES.block_size).decode('utf-8') + + return decrypted_text + + def get_seat_info(build_id, segment, nowday): try: interrupted = False diff --git a/py/main/get_seat.py b/py/main/get_seat.py index 74c9077..d222637 100644 --- a/py/main/get_seat.py +++ b/py/main/get_seat.py @@ -1,17 +1,17 @@ -import base64 +import asyncio + import logging import multiprocessing import random import time -from datetime import datetime -import getpass + +from telegram import Bot import requests -from Crypto.Cipher import AES -from Crypto.Util.Padding import pad, unpad + import sys -from get_bearer_token import get_bearer_token -from get_info import get_date, get_seat_info, get_segment, get_build_id + +from get_info import get_date, get_seat_info, get_segment, get_build_id, get_auth_token, encrypt # 配置日志 logging.basicConfig(level=logging.INFO) @@ -22,26 +22,10 @@ # 在代码的顶部定义全局变量 global_exclude_ids = set() seat_result = {} - - -# 读取授权码 -def get_auth_token(): - # token = "test" - # return token - try: - # 从命令行中获取用户名和密码 - username = input("请输入用户名(学号): \n") - password = getpass.getpass('请输入密码: \n') - name, token = get_bearer_token(username, password) - logger.info(f"你好,{name}同学") - new_token = "bearer" + str(token) - # logger.info(new_token) - return new_token - except Exception as e: - logger.error(f"获取授权码时发生异常: {str(e)}") - sys.exit() - - +CHANNEL_ID = "Your Channel id" +TELEGRAM_BOT_TOKEN = 'Your telegram token' +TELEGRAM_URL = "https://telegram.sakurasep.workers.dev/bot" +message = "" MAX_RETRIES = 3 # 最大重试次数 RETRY_DELAY = 5 # 重试间隔时间(秒) @@ -66,61 +50,19 @@ def send_post_request_and_save_response(url, data, headers): sys.exit() -# 根据当前系统时间获取 key -def get_key(): - # 获取当前日期,并转换为字符串 - current_date = datetime.now().strftime("%Y%m%d") - - # 生成回文 - palindrome = current_date[::-1] - - # 使用当前日期和回文作为密钥 - key = current_date + palindrome - - # print("当前日期:", current_date) - # print("回文:", palindrome) - # print("密钥:", key) - - return key - - -# 加密函数 -def encrypt(text): - # 自动获取 key - key = get_key() - # 目前获取到的加密密钥 - iv = "ZZWBKJ_ZHIHUAWEI" - key_bytes = key.encode('utf-8') - iv_bytes = iv.encode('utf-8') +def add_message(text): + global message + message += text - cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) - ciphertext_bytes = cipher.encrypt(pad(text.encode('utf-8'), AES.block_size)) - return base64.b64encode(ciphertext_bytes).decode('utf-8') - - -# 定义解密函数 -def decrypt(ciphertext): - # 自动获取 key - key = get_key() - # 目前获取到的加密密钥 - iv = "ZZWBKJ_ZHIHUAWEI" - - # 将密钥和初始化向量转换为 bytes 格式 - key_bytes = key.encode('utf-8') - iv_bytes = iv.encode('utf-8') - - # 将密文进行 base64 解码 - ciphertext = base64.b64decode(ciphertext) - - # 使用 AES 进行解密 - cipher = AES.new(key_bytes, AES.MODE_CBC, iv_bytes) - decrypted_bytes = cipher.decrypt(ciphertext) - - # 去除解密后的填充 - decrypted_text = unpad(decrypted_bytes, AES.block_size).decode('utf-8') - - return decrypted_text +async def send_seat_result_to_channel(): + try: + # 使用 API 令牌初始化您的机器人 + bot = Bot(token=TELEGRAM_BOT_TOKEN) + logger.info(f"要发送的消息为: {message}\n") + await bot.send_message(chat_id=CHANNEL_ID, text=message) + except Exception as e: + logger.error(f"发送消息到 Telegram 失败: {e}") # 状态检测函数 @@ -138,7 +80,10 @@ def check_reservation_status(seat_id, m): logger.info("重复预约, 请检查选择的时间段或是否已经成功预约") return True elif status == "预约成功": + # elif "1" == "1": logger.info("成功预约") + add_message(f"预约状态为:{status}") + asyncio.run(send_seat_result_to_channel()) return True elif status == "开放预约时间19:20": logger.info("未到预约时间,程序将会 10s 查询一次") @@ -149,15 +94,19 @@ def check_reservation_status(seat_id, m): return True elif status == "该空间当前状态不可预约": logger.info("此位置已被预约") - if m == "2": - new_seat_id = input("请重新输入想要预约的相同自习室的 id:\n") - return False, new_seat_id - else: + if m == "3": logger.info(f"{seat_id} 已被预约,加入排除名单") global_exclude_ids.add(seat_id) time.sleep(1) # logger.info(global_exclude_ids) return False + elif m == "2": + seat_id = input("请重新输入想要预约的相同自习室的 id:\n") + return seat_id + else: + logger.info(f"{seat_id} 已被预约,重新刷新状态") + time.sleep(1) + return False else: logger.info("未知状态,程序退出") @@ -240,12 +189,15 @@ def prefer_get_select_id(build_id): def get_and_select_seat_default(auth, build_id, segment, nowday, m): # 初始化 interrupted = False + global message try: while not interrupted: if m == "1": # logger.info(build_id) select_id = prefer_get_select_id(build_id) logger.info(f"优选的座位是:{select_id}") + message = f"优选的座位是:{select_id}" + time.sleep(1) else: # logger.info(f"获取的 segment: {segment}") data = get_seat_info(build_id, segment, nowday) @@ -260,8 +212,8 @@ def get_and_select_seat_default(auth, build_id, segment, nowday, m): # 获取该字典中 'id' 键对应的值 select_id = random_dict['id'] seat_no = random_dict['no'] - logger.info(f"随机选择的座位为: {select_id} 真实位置: {seat_no}") + message = f"随机选择的座位为: {select_id} 真实位置: {seat_no} \n" if select_id is None: logger.info("选定座位为空, 程序继续运行") @@ -270,7 +222,7 @@ def get_and_select_seat_default(auth, build_id, segment, nowday, m): else: post_to_get_seat(select_id, segment, auth) # logger.info(seat_result) - interrupted = check_reservation_status(select_id, "1") + interrupted = check_reservation_status(select_id, m) except KeyboardInterrupt: logger.info(f"接收到中断信号,程序将退出。") @@ -286,7 +238,7 @@ def get_and_select_seat_selected(auth, segment, seat_id): logger.info(f"你选定的座位为: {seat_id}") post_to_get_seat(seat_id, segment, auth) # logger.info(seat_result) - interrupted, seat_id = check_reservation_status(seat_id, "2") + interrupted = check_reservation_status(seat_id, "2") except KeyboardInterrupt: logger.info(f"接收到中断信号,程序将退出。")