import torch
import torch.nn as nn
import torch.optim as optim
import random

# -------------------
# Data generation
# -------------------
def generate_data(n=1000):
    """Pairs (a, b) labeled 1 if a > b, else 0."""
    data = []
    for _ in range(n):
        a = random.randint(0, 9)
        b = random.randint(0, 9)
        label = 1 if a > b else 0
        data.append(([a, b], label))
    return data

def generate_equal_data(n=500):
    """Pairs (a, b) labeled 1 if a == b, else 0."""
    data = []
    for _ in range(n):
        a = random.randint(0, 9)
        b = random.randint(0, 9)
        label = 1 if a == b else 0
        data.append(([a, b], label))
    return data

# -------------------
# Convert data to tensors
# -------------------
def to_tensor(data):
    # x: [N, 2] integer tokens; y: [N, 1] float labels for BCELoss
    x = torch.tensor([item[0] for item in data], dtype=torch.long)
    y = torch.tensor([item[1] for item in data], dtype=torch.float32).unsqueeze(1)
    return x, y

# -------------------
# Simple Transformer comparator model
# -------------------
class TransformerComparator(nn.Module):
    def __init__(self):
        super().__init__()
        vocab_size = 10       # digits 0-9
        embedding_dim = 32
        max_seq_len = 2
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        # Learnable positional embedding for the two input positions
        self.position = nn.Parameter(torch.randn(1, max_seq_len, embedding_dim))
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embedding_dim,
            nhead=2,
            dim_feedforward=64,
            batch_first=True,
            activation="relu"
        )
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=1)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(embedding_dim * max_seq_len, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        x = self.embedding(x) + self.position  # [B, 2, D]
        x = self.transformer(x)                # [B, 2, D]
        return self.classifier(x)              # [B, 1]

# -------------------
# Training loop
# -------------------
def train(model, optimizer, loss_fn, x_train, y_train, epochs=20):
    model.train()
    for epoch in range(epochs):
        pred = model(x_train)
        loss = loss_fn(pred, y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if epoch % 5 == 0 or epoch == epochs - 1:
            print(f"Epoch {epoch}: loss={loss.item():.4f}")

# -------------------
# Evaluation
# -------------------
def test(model, x_test, y_test):
    model.eval()
    with torch.no_grad():
        pred = model(x_test)
        # Threshold the sigmoid outputs at 0.5 and compare against the labels
        acc = ((pred > 0.5).float() == y_test).float().mean().item()
    print(f"Test Accuracy: {acc:.4f}")
    return acc

# -------------------
# Main program
# -------------------
if __name__ == "__main__":
    # 1. Train the initial model to predict whether a > b
    train_data = generate_data(1000)
    test_data = generate_data(200)
    x_train, y_train = to_tensor(train_data)
    x_test, y_test = to_tensor(test_data)

    model = TransformerComparator()
    loss_fn = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=0.005)

    print("Training the a > b model")
    train(model, optimizer, loss_fn, x_train, y_train, epochs=30)
    test(model, x_test, y_test)

    # Save the pretrained weights
    torch.save(model.state_dict(), "transformer_comparator.pt")

    # 2. Fine-tune the model to predict whether a == b
    equal_train_data = generate_equal_data(500)
    equal_test_data = generate_equal_data(100)
    x_eq_train, y_eq_train = to_tensor(equal_train_data)
    x_eq_test, y_eq_test = to_tensor(equal_test_data)

    finetune_model = TransformerComparator()
    finetune_model.load_state_dict(torch.load("transformer_comparator.pt"))
    optimizer_ft = optim.Adam(finetune_model.parameters(), lr=0.001)
    loss_fn_ft = nn.BCELoss()

    print("Fine-tuning the model for a == b")
    for epoch in range(10):
        finetune_model.train()
        pred = finetune_model(x_eq_train)
        loss = loss_fn_ft(pred, y_eq_train)
        optimizer_ft.zero_grad()
        loss.backward()
        optimizer_ft.step()
        if epoch % 2 == 0 or epoch == 9:
            finetune_model.eval()
            with torch.no_grad():
                test_pred = finetune_model(x_eq_test)
                acc = ((test_pred > 0.5).float() == y_eq_test).float().mean().item()
            print(f"[FT Epoch {epoch}] loss={loss.item():.4f}, accuracy={acc:.4f}")