|

楼主 |
发表于 2025-4-8 12:33:41
|
显示全部楼层
本帖最后由 陈先森 于 2025-4-8 19:58 编辑
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import TimeSeriesSplit
from sklearn.feature_selection import RFE
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import top_k_accuracy_score
from xgboost import XGBClassifier
import lightgbm as lgb
import warnings
import logging
from datetime import datetime
import os
from collections import defaultdict
# Logging setup: every record goes both to a log file in the working
# directory and to a default stream handler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('lottery_prediction.log'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)
# Suppress all warnings for the whole process.
warnings.filterwarnings('ignore')
class LotteryPredictor:
    """Per-position XGBoost/LightGBM ensemble over historical lottery draws.

    The pipeline is: ``load_data`` -> ``generate_features`` ->
    ``initialize_models`` -> ``train_models`` -> ``predict`` ->
    ``save_results``; ``run`` chains all of them.

    Each draw consists of six "normal" numbers and one "special" number,
    all in 1..49. One model pair is trained per normal-number position.
    """

    NUM_POSITIONS = 6          # normal numbers per draw
    NUM_CLASSES = 49           # numbers are 1..49, encoded as classes 0..48
    N_SELECTED_FEATURES = 12   # features kept by recursive feature elimination

    def __init__(self):
        self.data = []          # list of {'period', 'normal', 'special'} dicts
        self.next_period = 1    # period number the prediction is made for
        self.models = {
            'xgb': None,
            'lgb': None
        }
        # Fitted RFE selector per position. predict() must reuse exactly the
        # selector the final models were trained with; None means "position
        # not trained".
        self.selectors = [None] * self.NUM_POSITIONS
        self.feature_names = [
            'period', 'prev_normal1', 'prev_normal2', 'prev_normal3',
            'prev_normal4', 'prev_normal5', 'prev_normal6', 'prev_special',
            'parity', 'size', 'prime', 'freq_window', 'freq_historical',
            'is_consecutive', 'diff_mean', 'diff_max', 'same_last_digit'
        ]
        self.output_dir = 'output'

    def ensure_output_dir(self):
        """Create the output directory if it does not exist yet."""
        if not os.path.exists(self.output_dir):
            # exist_ok guards against the directory appearing between the
            # check and the creation.
            os.makedirs(self.output_dir, exist_ok=True)
            logger.info(f"创建输出目录: {self.output_dir}")

    def get_output_path(self, filename):
        """Return ``<output_dir>/<next_period>期_<filename>``."""
        return os.path.join(self.output_dir, f"{self.next_period}期_{filename}")

    def load_data(self, file_path):
        """Append draws parsed from a whitespace-separated text file.

        Each usable line has exactly 8 integer tokens: period, six normal
        numbers, special number. Lines with a different token count are
        ignored; 8-token lines containing a non-integer token are skipped
        with a warning instead of aborting the whole load.

        Raises:
            FileNotFoundError: if ``file_path`` does not exist.
        """
        try:
            logger.info(f"正在从 {file_path} 加载数据...")
            if not os.path.exists(file_path):
                raise FileNotFoundError(f"数据文件 {file_path} 不存在")
            # utf-8-sig strips a leading BOM; errors='replace' keeps a
            # non-UTF-8 header line from aborting the load (data lines are
            # plain ASCII digits).
            with open(file_path, 'r', encoding='utf-8-sig', errors='replace') as f:
                for line_no, line in enumerate(f, 1):
                    parts = line.split()
                    if len(parts) != 8:  # not a complete data row
                        continue
                    try:
                        values = [int(p) for p in parts]
                    except ValueError:
                        logger.warning(f"第 {line_no} 行包含非数字内容,已跳过")
                        continue
                    period = values[0]
                    self.data.append({
                        'period': period,
                        'normal': values[1:7],
                        'special': values[7]
                    })
                    self.next_period = period + 1
            logger.info(f"成功加载 {len(self.data)} 条历史数据")
            if len(self.data) < 50:
                logger.warning("数据量较少,预测准确性可能受影响")
        except Exception as e:
            logger.error(f"加载数据时出错: {str(e)}")
            raise

    def is_prime(self, n):
        """Return 1 if ``n`` is prime, else 0 (trial division up to sqrt(n))."""
        if n < 2:
            return 0
        for i in range(2, int(np.sqrt(n)) + 1):
            if n % i == 0:
                return 0
        return 1

    def _draw_feature_rows(self, i, window_size):
        """Build the six feature rows and targets for draw ``self.data[i]``.

        Args:
            i: index (>= 1) of the draw in ``self.data``; draw ``i - 1`` is
                used as the "previous" draw.
            window_size: number of preceding draws used for the windowed
                frequency feature.

        Returns:
            ``(rows, targets)``: ``rows`` holds one list per position laid
            out like ``self.feature_names``; ``targets`` holds
            ``(position_index, number - 1)`` tuples.
        """
        current = self.data[i]
        prev = self.data[i - 1]
        period = current['period']
        prev_normal = prev['normal']
        prev_special = prev['special']
        window = self.data[max(0, i - window_size):i]
        history = self.data[:i]

        rows = []
        targets = []
        for num_idx in range(self.NUM_POSITIONS):
            current_num = current['normal'][num_idx]
            # NOTE(review): every feature below is computed from current_num,
            # which is also the label. This leaks the target into the feature
            # vector, so accuracies measured on it are not meaningful.
            parity = current_num % 2
            size = 1 if current_num <= 24 else 0
            prime = self.is_prime(current_num)
            # Frequency of this number at the same position in earlier draws.
            window_numbers = [d['normal'][num_idx] for d in window]
            freq_in_window = window_numbers.count(current_num) / len(window_numbers) if window_numbers else 0
            all_numbers = [d['normal'][num_idx] for d in history]
            freq_historical = all_numbers.count(current_num) / len(all_numbers) if all_numbers else 0
            is_consecutive = 1 if abs(current_num - prev_normal[num_idx]) == 1 else 0
            # Relation to the previous draw's normal numbers.
            diffs = [current_num - n for n in prev_normal]
            diff_mean = np.mean(diffs)
            diff_max = max(diffs)
            same_last_digit = sum(1 for n in prev_normal if n % 10 == current_num % 10)
            rows.append([
                period, *prev_normal, prev_special, parity, size, prime,
                freq_in_window, freq_historical, is_consecutive,
                diff_mean, diff_max, same_last_digit
            ])
            targets.append((num_idx, current_num - 1))  # (position, class 0..48)
        return rows, targets

    def generate_features(self):
        """Build the feature matrix from all loaded draws.

        Returns:
            ``(features, targets, periods)``. ``features`` has one row per
            (draw, position) with the period column removed; ``targets`` is
            an ``(n, 2)`` array of ``(position, class)``; ``periods`` is the
            period of each row. ``(None, None, None)`` if fewer than two
            draws are loaded.
        """
        if len(self.data) < 2:
            logger.error("至少需要2期数据才能生成特征")
            return None, None, None
        features = []
        targets = []
        window_size = min(50, len(self.data))
        logger.info("正在生成特征...")
        for i in tqdm(range(1, len(self.data))):
            rows, labels = self._draw_feature_rows(i, window_size)
            features.extend(rows)
            targets.extend(labels)
        features = np.array(features)
        targets = np.array(targets)
        # Make sure the matrix is two-dimensional.
        if features.ndim == 1:
            features = features.reshape(-1, len(self.feature_names))
        # Column 0 is the period; it is returned separately, not as a feature.
        periods = features[:, 0].astype(int)
        train_features = features[:, 1:]
        return train_features, targets, periods

    def _make_xgb(self, use_gpu=None, **overrides):
        """Create an unfitted XGBoost classifier; ``overrides`` replace defaults."""
        if use_gpu is None:
            use_gpu = self.check_gpu()
        params = dict(
            n_estimators=1200,
            learning_rate=0.05,
            max_depth=6,
            subsample=0.75,
            colsample_bytree=0.82,
            reg_alpha=0.3,
            reg_lambda=1,
            num_class=self.NUM_CLASSES,
            # softprob, not softmax: only predict_proba() is ever used.
            objective='multi:softprob',
            n_jobs=-1,
            eval_metric='mlogloss',
            # NOTE(review): newer XGBoost releases replace gpu_hist/predictor
            # with device='cuda' - confirm against the installed version.
            tree_method='gpu_hist' if use_gpu else 'auto',
            predictor='gpu_predictor' if use_gpu else 'cpu_predictor'
        )
        params.update(overrides)
        return XGBClassifier(**params)

    def _make_selector(self):
        """Create an unfitted RFE selector backed by a small XGBoost model."""
        return RFE(
            estimator=XGBClassifier(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=3,
                num_class=self.NUM_CLASSES,
                objective='multi:softmax'
            ),
            n_features_to_select=self.N_SELECTED_FEATURES,
            step=0.1
        )

    def initialize_models(self):
        """Create one unfitted XGBoost and one LightGBM model per position."""
        logger.info("正在初始化模型...")
        use_gpu = self.check_gpu()  # probe once instead of once per model
        self.models['xgb'] = [self._make_xgb(use_gpu) for _ in range(self.NUM_POSITIONS)]
        self.models['lgb'] = [lgb.LGBMClassifier(
            n_estimators=2150,
            learning_rate=0.041,
            num_leaves=71,
            subsample=0.78,
            colsample_bytree=0.85,
            reg_alpha=0.12,
            reg_lambda=0.68,
            num_class=self.NUM_CLASSES,
            objective='multiclass',
            n_jobs=-1,
            # The original expression chose 'cpu' in both branches.
            device='cpu',
            # Silence training output here; fit() no longer takes `verbose`
            # in recent LightGBM releases.
            verbose=-1
        ) for _ in range(self.NUM_POSITIONS)]

    def check_gpu(self):
        """Return True if numba reports a usable CUDA device, else False."""
        try:
            from numba import cuda
            return cuda.is_available()
        except Exception:
            # numba missing or CUDA probing failed: treat as "no GPU".
            return False

    def time_series_cv(self, model, X, y):
        """Return the mean Top-16 accuracy of ``model`` over time-ordered folds.

        A clone of ``model`` is fitted in every fold, so the estimator passed
        in is left untouched. Returns the neutral score 0.5 when there is
        too little data for at least two folds.
        """
        # Local import: `clone` is not among the file-level imports.
        from sklearn.base import clone

        if len(X) < 20:
            logger.warning("数据量不足,跳过交叉验证")
            return 0.5
        n_splits = 5
        test_size = max(5, int(len(X) * 0.2))  # at least 5 test samples
        if len(X) < n_splits * test_size + 20:
            n_splits = max(1, (len(X) - 20) // test_size)
            logger.info(f"调整交叉验证分割数为 {n_splits}")
        if n_splits < 2:
            # TimeSeriesSplit rejects fewer than 2 splits.
            logger.warning("数据量不足,跳过交叉验证")
            return 0.5
        tscv = TimeSeriesSplit(n_splits=n_splits, test_size=test_size)
        all_classes = np.arange(self.NUM_CLASSES)
        accuracies = []
        for train_idx, test_idx in tscv.split(X):
            X_train, X_test = X[train_idx], X[test_idx]
            y_train, y_test = y[train_idx], y[test_idx]
            # One all-zero dummy sample per class so every class is present.
            X_train = np.vstack([X_train, np.zeros((self.NUM_CLASSES, X_train.shape[1]))])
            y_train = np.hstack([y_train, all_classes])
            scaler = StandardScaler()
            X_train_scaled = scaler.fit_transform(X_train)
            X_test_scaled = scaler.transform(X_test)
            selector = self._make_selector()
            selector.fit(X_train_scaled, y_train)
            X_train_selected = selector.transform(X_train_scaled)
            X_test_selected = selector.transform(X_test_scaled)
            fold_model = clone(model)
            fit_kwargs = {'eval_set': [(X_test_selected, y_test)]}
            if isinstance(fold_model, XGBClassifier):
                fit_kwargs['verbose'] = False  # only XGBoost's fit() takes it
            fold_model.fit(X_train_selected, y_train, **fit_kwargs)
            y_pred_proba = fold_model.predict_proba(X_test_selected)
            accuracies.append(
                top_k_accuracy_score(y_test, y_pred_proba, k=16, labels=all_classes)
            )
        return np.mean(accuracies)

    def train_models(self, X, y):
        """Fit the per-position models and score them by cross-validation.

        Args:
            X: feature matrix from ``generate_features``.
            y: ``(n, 2)`` array of ``(position, class)`` targets.

        Returns:
            ``(xgb_scores, lgb_scores)``, one score per position (0.5 where
            cross-validation was skipped), or ``(None, None)`` if ``X`` is
            empty.
        """
        if len(X) == 0:
            logger.error("没有训练数据可用")
            return None, None
        self.selectors = [None] * self.NUM_POSITIONS
        # Hold out the last 20% as a validation set.
        split_idx = int(len(X) * 0.8)
        X_train, X_val = X[:split_idx], X[split_idx:]
        y_train, y_val = y[:split_idx], y[split_idx:]
        all_classes = np.arange(self.NUM_CLASSES)
        xgb_scores = []
        lgb_scores = []
        for num_idx in range(self.NUM_POSITIONS):
            mask = y_train[:, 0] == num_idx
            X_train_pos = X_train[mask]
            y_train_pos = y_train[mask, 1]
            mask_val = y_val[:, 0] == num_idx
            X_val_pos = X_val[mask_val]
            y_val_pos = y_val[mask_val, 1]
            if len(X_train_pos) == 0:
                logger.warning(f"平码{num_idx+1}训练数据不足,使用默认模型")
                xgb_scores.append(0.5)
                lgb_scores.append(0.5)
                continue
            # One all-zero dummy sample per class so every class is present.
            X_train_full = np.vstack([X_train_pos, np.zeros((self.NUM_CLASSES, X_train_pos.shape[1]))])
            y_train_full = np.hstack([y_train_pos, all_classes])
            selector = self._make_selector()
            selector.fit(X_train_full, y_train_full)
            X_train_selected = selector.transform(X_train_full)
            has_val = len(X_val_pos) > 0
            try:
                xgb_model = self.models['xgb'][num_idx]
                if has_val:
                    xgb_model.fit(
                        X_train_selected, y_train_full,
                        eval_set=[(selector.transform(X_val_pos), y_val_pos)],
                        verbose=False
                    )
                else:
                    # No validation data: early stopping cannot be used.
                    xgb_model.set_params(early_stopping_rounds=None)
                    xgb_model.fit(X_train_selected, y_train_full, verbose=False)
            except Exception as e:
                logger.error(f"训练XGBoost模型(平码{num_idx+1})时出错: {str(e)}")
                # Fallback: fresh model with more conservative settings.
                fallback = self._make_xgb(learning_rate=0.037, reg_alpha=0.15, reg_lambda=0.75)
                self.models['xgb'][num_idx] = fallback
                fallback.fit(X_train_selected, y_train_full, verbose=False)
            self.models['lgb'][num_idx].fit(X_train_selected, y_train_full)
            # Publish the selector only after both models are fitted, so
            # predict() never pairs it with an unfitted model.
            self.selectors[num_idx] = selector
            xgb_score = 0.5
            lgb_score = 0.5
            if len(X_train_pos) > 20:
                logger.info(f"正在进行平码{num_idx+1}交叉验证...")
                xgb_score = self.time_series_cv(self.models['xgb'][num_idx], X_train_pos, y_train_pos)
                lgb_score = self.time_series_cv(self.models['lgb'][num_idx], X_train_pos, y_train_pos)
                logger.info(f"平码{num_idx+1} XGBoost 平均 Top-16 准确率: {xgb_score:.4f}")
                logger.info(f"平码{num_idx+1} LightGBM 平均 Top-16 准确率: {lgb_score:.4f}")
            else:
                logger.warning(f"平码{num_idx+1}数据量不足,跳过交叉验证,使用默认评分0.5")
            xgb_scores.append(xgb_score)
            lgb_scores.append(lgb_score)
        return xgb_scores, lgb_scores

    @staticmethod
    def _ensemble_weights(xgb_score, lgb_score):
        """Return score-proportional weights; equal weights if the sum is not positive."""
        total = xgb_score + lgb_score
        if not total > 0:  # also catches NaN
            return 0.5, 0.5
        return xgb_score / total, lgb_score / total

    @staticmethod
    def _temperature_scale(probs, temperature):
        """Sharpen (T < 1) or flatten (T > 1) each row of ``probs`` and renormalize.

        ``p ** (1 / T)`` equals ``exp(log(p) / T)`` but does not take the
        logarithm of zero probabilities.
        """
        scaled = np.power(probs, 1.0 / temperature)
        return scaled / scaled.sum(axis=1, keepdims=True)

    def predict(self, X, periods, xgb_scores, lgb_scores):
        """Return six length-49 probability vectors, one per position.

        Rows of ``X`` whose period equals ``self.next_period - 1`` are used
        as input. A position without an input row or without a trained
        selector/model gets a uniform distribution.
        """
        if len(X) == 0:
            logger.warning("没有测试数据,将生成随机预测")
            return self.generate_random_prediction()
        X_last = X[periods == self.next_period - 1]
        # generate_features emits a draw's rows in position order, so the
        # matching rows are the tail of that draw. `missing` shifts the index
        # when the head of the draw is not part of X.
        missing = self.NUM_POSITIONS - len(X_last)
        all_probs = []
        for num_idx in range(self.NUM_POSITIONS):
            row = num_idx - missing
            if row < 0:
                logger.warning(f"没有平码{num_idx+1}的测试数据,使用随机预测")
                all_probs.append(np.ones(self.NUM_CLASSES) / self.NUM_CLASSES)
                continue
            selector = self.selectors[num_idx]
            if selector is None:
                logger.warning(f"平码{num_idx+1}模型未训练,使用随机预测")
                all_probs.append(np.ones(self.NUM_CLASSES) / self.NUM_CLASSES)
                continue
            # Reuse the selector fitted in train_models so the columns match
            # what the final models were trained on.
            X_selected = selector.transform(X_last[row:row + 1])
            w_xgb, w_lgb = self._ensemble_weights(xgb_scores[num_idx], lgb_scores[num_idx])
            xgb_probs = self.models['xgb'][num_idx].predict_proba(X_selected)
            lgb_probs = self.models['lgb'][num_idx].predict_proba(X_selected)
            final_probs = xgb_probs * w_xgb + lgb_probs * w_lgb
            all_probs.append(self._temperature_scale(final_probs, 0.8)[0])
        return all_probs

    def generate_random_prediction(self):
        """Return six uniform distributions over the 49 numbers."""
        return [np.ones(self.NUM_CLASSES) / self.NUM_CLASSES for _ in range(self.NUM_POSITIONS)]

    def generate_statistics_files(self):
        """Write occurrence-count and empirical-probability reports for normal numbers."""
        if not self.data:
            logger.warning("没有数据,无法生成统计文件")
            return
        self.ensure_output_dir()
        # Count how often each number appears among the normal numbers.
        number_counts = defaultdict(int)
        for record in self.data:
            for num in record['normal']:
                number_counts[num] += 1
        # Group numbers by occurrence count.
        count_groups = defaultdict(list)
        for num, count in number_counts.items():
            count_groups[count].append(num)
        last_period = self.data[-1]['period']
        rule = "=" * 60 + "\n"

        stats_parts = [f"{last_period}期 平码统计次数\n", rule]
        for count in sorted(count_groups, reverse=True):
            numbers = sorted(count_groups[count])
            stats_parts.append(
                f"〖{count}次〗:{','.join(map(str, numbers))}(共{len(numbers)}个)\n\n"
            )
        stats_parts.append(rule)
        stats_file = self.get_output_path("平码统计次数.txt")
        with open(stats_file, 'w', encoding='utf-8') as f:
            f.write("".join(stats_parts))

        # Empirical probability: share of all drawn normal numbers.
        total_slots = len(self.data) * 6
        number_probs = [(num, number_counts.get(num, 0) / total_slots) for num in range(1, 50)]
        number_probs.sort(key=lambda x: x[1], reverse=True)
        prob_parts = [f"{last_period}期 平码统计平均概率\n", rule]
        cumulative_prob = 0.0
        for idx, (num, prob) in enumerate(number_probs[:16], 1):  # top 16 only
            cumulative_prob += prob
            prob_parts.append(
                f"{idx:2d}. 号码: {num:02d} \t 概率: {prob:.2%} \t 累积概率: {cumulative_prob:.2%}\n"
            )
        prob_parts.append(rule)
        prob_file = self.get_output_path("平码统计平均概率.txt")
        with open(prob_file, 'w', encoding='utf-8') as f:
            f.write("".join(prob_parts))
        logger.info("已生成平码统计文件:")
        logger.info(f"- {stats_file}")
        logger.info(f"- {prob_file}")

    def save_results(self, all_probs, next_period):
        """Write the three prediction reports, then the statistics files.

        Args:
            all_probs: six length-49 probability vectors (index 0 = number 1).
            next_period: period number printed in the report headers.
        """
        numbers = np.arange(1, 50)
        self.ensure_output_dir()
        # Numbers of every position, most probable first.
        rankings = [np.argsort(all_probs[k])[::-1] for k in range(self.NUM_POSITIONS)]

        # Detailed Top-16 report.
        parts = [f"预测期号: {next_period}\n", "=" * 60 + "\n"]
        for num_idx, ranking in enumerate(rankings):
            probs = all_probs[num_idx]
            parts.append(f"平码{num_idx+1}预测 (Top-16):\n")
            cumulative_prob = 0.0
            for idx, pos in enumerate(ranking[:16], 1):
                num = numbers[pos]
                prob = probs[pos]
                cumulative_prob += prob
                parts.append(
                    f"{idx:2d}. 号码: {num:02d} \t 概率: {prob:.2%} \t 累积概率: {cumulative_prob:.2%}\n"
                )
            parts.append("-" * 60 + "\n")
        parts.append("=" * 60 + "\n")
        parts.append(f"生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        predictions_file = self.get_output_path("predictions.txt")
        with open(predictions_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        # Compact report: numbers only.
        parts = [
            f"\n\n预测期号: {next_period}\n",
            "以下为本次预测的16个最可能号码(每个平码位置):\n",
            "=" * 65 + "\n\n",
        ]
        for num_idx, ranking in enumerate(rankings):
            parts.append(f"平码{num_idx+1}: ")
            parts.extend(f"{numbers[pos]:02d}, " for pos in ranking[:16])
            parts.append("\n\n")
        parts.append("=" * 65 + "\n")
        parts.append("祝您好运!天天中大奖!\n")
        parts.append(f"\n生成时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        pure_file = self.get_output_path("纯平码.txt")
        with open(pure_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        # Full report: all 49 numbers per position.
        parts = [
            f"\n\n预测期号: {next_period}\n",
            "所有号码预测概率:\n",
            "=" * 60 + "\n",
        ]
        for num_idx, ranking in enumerate(rankings):
            probs = all_probs[num_idx]
            parts.append(f"\n平码{num_idx+1}预测概率:\n")
            for idx, pos in enumerate(ranking, 1):
                parts.append(f"{idx:2d}. 号码: {numbers[pos]:02d} \t 概率: {probs[pos]:.2%}\n")
            parts.append("-" * 60 + "\n")
        parts.append("=" * 60 + "\n")
        all_predictions_file = self.get_output_path("49_predictions.txt")
        with open(all_predictions_file, 'w', encoding='utf-8') as f:
            f.write("".join(parts))

        self.generate_statistics_files()
        logger.info("\n预测结果已保存到:")
        logger.info(f"- {predictions_file}")
        logger.info(f"- {pure_file}")
        logger.info(f"- {all_predictions_file}")

    def run(self, data_file):
        """Run the whole pipeline on ``data_file`` and write all reports.

        Training failures are logged and degrade to default scores (and,
        through ``predict``, to uniform predictions for untrained
        positions). Any other error is logged and re-raised.
        """
        try:
            print("\n\n 平码预测系统高级版\n 本系统测试16个号码理论准确率为85% \n\n")
            self.load_data(data_file)
            X, y, periods = self.generate_features()
            if X is None:
                logger.error("无法生成特征,程序终止")
                return
            # Chronological 80/20 split.
            split_idx = int(len(X) * 0.8)
            X_train, X_test = X[:split_idx], X[split_idx:]
            y_train = y[:split_idx]
            periods_test = periods[split_idx:]
            self.initialize_models()
            try:
                xgb_scores, lgb_scores = self.train_models(X_train, y_train)
            except Exception as e:
                logger.error(f"模型训练失败: {str(e)}")
                xgb_scores, lgb_scores = None, None
            if xgb_scores is None or lgb_scores is None:
                # Training failed or had no data: continue with neutral scores.
                xgb_scores, lgb_scores = [0.5] * 6, [0.5] * 6
            all_probs = self.predict(X_test, periods_test, xgb_scores, lgb_scores)
            self.save_results(all_probs, self.next_period)
            logger.info("\n祝您好运!天天中大奖!\n")
        except Exception as e:
            logger.error(f"运行过程中出错: {str(e)}", exc_info=True)
            raise
if __name__ == "__main__":
    # Script entry point: run the full pipeline on the default history file.
    LotteryPredictor().run('history_data.txt')
推荐电脑配置
CPU:八核3.0GHz+
内存:16GB
GPU:NVIDIA RTX 2080及以上(8GB显存+)
SSD存储
历史数据量越大,对配置的要求越高
|
|