Deep Knowledge Tracing: A PyTorch Version
知识追踪(Knowledge Tracing)是根据学生过去的答题情况对学生的知识掌握情况进行建模,从而得到学生当前知识状态表示的一种技术。将深度学习的方法引入知识追踪最早出现于发表在NeurIPS 2015上的一篇论文《Deep Knowledge Tracing》,作者来自斯坦福大学。在这篇论文中,作者提出了使用深度知识追踪(Deep Knowledge Tracing, DKT)的概念,利用RNN对学生的学习情况进行建模,之后引出了一系列工作,2019年已经有使用Transformer代替RNN和LSTM并且达到了SOTA的论文。DKT作为知识追踪模型深度化的开山之作,在几乎所有的深度知识追踪模型中都作为baseline,而DKT作者给出的模型实现是基于lua语言的,为了能够让更多的研究人员更方便的使用,这里给出一种python的实现,采用的是pytorch框架。
下载
模型代码已经发布在github上,可点击这里查看和下载具体代码。
或者可以直接通过如下命令直接下载到本地:
git clone https://github.com/pydaxing/DeepKnowledgeTracing-DKT-Pytorch.git
具体运行和使用方法参考GitHub项目上ReadMe。
项目结构-DKT
在DKT文件夹下包括两个文件夹:KTDataset和KnowledgeTracing。
KTDataset文件夹下有6个常用的知识追踪数据集,数据都已经处理成三行格式:
第一行:答题数 第二行:题目编号 第三行:答题结果,0表示错,1表示对
Note:可根据需要,按照数据格式自行添加新的数据集。
模型结构-KnowledgeTracing
模型的整个流程都在KnowledgeTracing目录下,包括模型、参数设置、数据处理、模型训练和评估,分别在四个子目录下:model, Constant,data,evaluation。
参数设置-Constant
Constant下主要设置一些参数和超参数,超参数也分为四大块:数据集存储路径、数据集、题目数、模型超参数。
数据集存储路径
Dpath = '../../KTDataset'
数据集:一共包括6个数据集
datasets = {
'assist2009' : 'assist2009',
'assist2015' : 'assist2015',
'assist2017' : 'assist2017',
'static2011' : 'static2011',
'kddcup2010' : 'kddcup2010',
'synthetic' : 'synthetic'
}
题目数:表示每个数据集里面题目的数量
numbers = {
'assist2009' : 124,
'assist2015' : 100,
'assist2017' : 102,
'static2011' : 1224,
'kddcup2010' : 661,
'synthetic' : 50
}
模型超参数:主要包括所用数据集、输入输出维度、学习率、最大步长、学习周期等。
DATASET = datasets['static2011']
NUM_OF_QUESTIONS = numbers['static2011']
# the max step of RNN model
MAX_STEP = 50
BATCH_SIZE = 64
LR = 0.002
EPOCH = 1000
#input dimension
INPUT = NUM_OF_QUESTIONS * 2
# embedding dimension
EMBED = NUM_OF_QUESTIONS
# hidden layer dimension
HIDDEN = 200
# nums of hidden layers
LAYERS = 1
# output dimension
OUTPUT = NUM_OF_QUESTIONS
模型实现-model
模型在model目录下的RNNModel.py文件中实现,模型实际上就是一个简单的LSTM网络,其结构跟DKT原文中所讲述的结构一致,在LSTM模型最后添加了一个线性层和一个sigmoid激活函数。
class DKT(nn.Module):
def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
super(DKT, self).__init__()
self.hidden_dim = hidden_dim
self.layer_dim = layer_dim
self.output_dim = output_dim
self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True,nonlinearity='tanh')
self.fc = nn.Linear(self.hidden_dim, self.output_dim)
self.sig = nn.Sigmoid()
def forward(self, x):
h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
out,hn = self.rnn(x, h0)
res = self.sig(self.fc(out))
return res
数据处理-data
在data目录下包括三个文件:readdata.py、DKTDataSet.py、dataloader.py。它们的作用分别是定义数据的读取、pytorch框架下的数据集定义、以及pytorch框架下的dataloader的构造。
readata: 在readata.py文件中,定义了一个类:DataReader,从名字可以看出这是一个用来读取数据的类。其中包含两个函数getTrainData()和getTestData(),分别是用来读取训练数据和测试数据。两个函数的定义其实一模一样,只是名字不一样用来区分训练和测试数据,这样的写法有些冗余,后面会再做一些优化。
class DataReader():
def __init__(self, path, maxstep, numofques):
self.path = path
self.maxstep = maxstep
self.numofques = numofques
def getTrainData(self):
...
def getTestData(self):
...
DataReader类有三个参数:
path: 数据文件存储路径 maxstep: 最大序列长度 numofques: 此数据集中所有题目的总个数(去重后)
获取与处理数据部分,以getTrainData()函数为例,getTestData()与其一样。
def getTrainData(self):
trainqus = np.array([])
trainans = np.array([])
with open(self.path, 'r') as train:
for len, ques, ans in tqdm.tqdm(itertools.zip_longest(*[train] * 3), desc='loading train data: ', mininterval=2):
len = int(len.strip().strip(','))
ques = np.array(ques.strip().strip(',').split(',')).astype(np.int)
ans = np.array(ans.strip().strip(',').split(',')).astype(np.int)
mod = 0 if len%self.maxstep == 0 else (self.maxstep - len%self.maxstep)
zero = np.zeros(mod) - 1
ques = np.append(ques, zero)
ans = np.append(ans, zero)
trainqus = np.append(trainqus, ques).astype(np.int)
trainans = np.append(trainans, ans).astype(np.int)
return trainqus.reshape([-1, self.maxstep]), trainans.reshape([-1, self.maxstep])
在getTrainData()中,首先定义两个numpy数组trainqus和trainans,前者存储题目编号,后者存储对应的答题结果。然后打开文件开始读取数据。
因为数据是三行格式的,所以每一次读取三行,每次读取三行的实现方式如下:
for len, ques, ans in tqdm.tqdm(itertools.zip_longest(*[train] * 3), desc='loading train data: ', mininterval=2)
其中tqdm是进度条展示,可忽略,简化来看每次读取三行的方法如下:
for len, ques, ans in itertools.zip_longest(*[train] * 3)
然后是对三行数据进行字符串处理,分别得到题目编号以及对应的答题结果:
ques = np.array(ques.strip().strip(',').split(',')).astype(np.int)
ans = np.array(ans.strip().strip(',').split(',')).astype(np.int)
然后是处理长度不一致的问题,将所有答题序列的长度都处理成maxstep的整数倍,长度不够的补0。
mod = 0 if len%self.maxstep == 0 else (self.maxstep - len%self.maxstep)
zero = np.zeros(mod) - 1
ques = np.append(ques, zero)
ans = np.append(ans, zero)
举例:ques长度为18,设置maxstep为5,那么ques补充成maxstep的整数倍应该是4倍为20,所以ques应该补充两个0变成长度为20的序列;如果ques长度为11,那么补充4个0,长度变成15;ques长度为10,则不补充。
每一个ques的长度处理成maxstep的整数倍之后,添加到trainques数组中去,这样每一次添加都保证了trainques的长度为maxstep的整数倍。ans以及trainans的处理过程一样。
trainqus = np.append(trainqus, ques).astype(np.int)
trainans = np.append(trainans, ans).astype(np.int)
最后对trainques和trainans进行reshape,处理成N*maxstep的矩阵形式,N即可看做学生个数。maxstep即为答题个数。
举例,数据形式的变化过程,比如设置maxstep为3,总题目数为5,现在有如下三个学生的原始答题记录: 学生1: 2 1 2 1 0 学生2: 4 2 4 1 3 0 1 1 0 学生3: 7 5 3 1 4 5 4 2 0 0 1 1 0 1 0
ques通过readata读取并处理之后会变成: 1 2 0 2 4 1 3 0 0 5 3 1 4 5 4 2 0 0
DKTDataSet:要定义pytorch框架下的数据集,需要继承torch的Dataset类,覆写__init__、__len__以及__getitem__三个函数。还可以根据需要自己添加数据处理的函数,在DKTDataSet中添加的one-hot处理函数。
class DKTDataSet(Dataset):
def __init__(self, ques, ans):
...
def __len__(self):
...
def __getitem__(self, index):
...
def onehot(self, questions, answers):
...
在readdata处理好数据之后,我们在DKTDataSet中对其进行封装处理,直接返回题目的one-hot形式而不再是题目编号。
在__init__中做一些初始化操作,比入读进数据ques和ans,前者是题目编号,后者是答题结果。
def __init__(self, ques, ans):
self.ques = ques
self.ans = ans
__len__返回数据集的长度(大小),这里直接返回ques或者ans的行数,也就是学生数。
def __len__(self):
return len(self.ques)
__getitem__返回需要获取的某条数据,这里根据index参数直接返回对应的数据即可,这里我们返回前将数据通过自定义的onehot函数处理成one-hot的形式,并且将数据类型转换为FloatTensor。
def __getitem__(self, index):
questions = self.ques[index]
answers = self.ans[index]
onehot = self.onehot(questions, answers)
return torch.FloatTensor(onehot.tolist())
__onehot__是自定义的将题目编号转变成one-hot形式的函数。
def onehot(self, questions, answers):
result = np.zeros(shape=[C.MAX_STEP, 2 * C.NUM_OF_QUESTIONS])
for i in range(C.MAX_STEP):
if answers[i] > 0:
result[i][questions[i]] = 1
elif answers[i] == 0:
result[i][questions[i] + C.NUM_OF_QUESTIONS] = 1
return result
与原文保持一致,one-hot的维度为两倍的总题目数,所以对于readata中处理好的每一条记录ques,将变成[C.MAX_STEP, 2 * C.NUM_OF_QUESTIONS]大小的矩阵,因为每条记录ques中包含C.MAX_STEP个题目,每个题目的onehot维度为2 * C.NUM_OF_QUESTIONS。
接着readata中的例子,ques在DKTDataSet中转变成onehot形式之后,数据的形式变成: [[1 0 0 0 0 0 0 0 0 0] -> 1 [0 0 0 0 0 0 1 0 0 0] -> 2 [0 0 0 0 0 0 0 0 0 0] -> 0 [0 0 0 0 0 0 1 0 0 0] -> 2 [0 0 0 1 0 0 0 0 0 0] -> 4 [1 0 0 0 0 0 0 0 0 0] -> 1 …]
dataloader:在dataloader.py中,包含一个训练数据的loader和一个测试数据的loader,分别是getTrainLoader和getTestLoader,实际上这两个loader的实现一模一样,只是去了两个不同的名字为了区分训练和测试数据,这样的方式比较冗余,后面的版本会进行优化。
def getTrainLoader(train_data_path):
handle = DataReader(train_data_path ,C.MAX_STEP, C.NUM_OF_QUESTIONS)
trainques, trainans = handle.getTrainData()
dtrain = DKTDataSet(trainques, trainans)
trainLoader = Data.DataLoader(dtrain, batch_size=C.BATCH_SIZE, shuffle=True)
return trainLoader
def getTestLoader(test_data_path):
handle = DataReader(test_data_path, C.MAX_STEP, C.NUM_OF_QUESTIONS)
testques, testans = handle.getTestData()
dtest = DKTDataSet(testques, testans)
testLoader = Data.DataLoader(dtest, batch_size=C.BATCH_SIZE, shuffle=False)
return testLoader
关于如何定义loader就不做过多介绍,关于pytorch的dataloader的相关文章有很多。
在dataloader.py中还有一个函数:getLoader,这个函数封装了getTrainLoader和getTestLoader,通过调用此函数直接获取训练和测试的loader。并且函数的参数是数据集的名称,根据数据集名称分别为不同的数据集构造loader。
def getLoader(dataset):
trainLoaders = []
testLoaders = []
if dataset == 'assist2009':
trainLoader = getTrainLoader(C.Dpath + '/assist2009/builder_train.csv')
trainLoaders.append(trainLoader)
testLoader = getTestLoader(C.Dpath + '/assist2009/builder_test.csv')
testLoaders.append(testLoader)
elif dataset == 'assist2015':
trainLoader = getTrainLoader(C.Dpath + '/assist2015/assist2015_train.txt')
trainLoaders.append(trainLoader)
testLoader = getTestLoader(C.Dpath + '/assist2015/assist2015_test.txt')
testLoaders.append(testLoader)
...
模型训练与测试-evaluation
在evaluation目录下,有两个文件,一个是eval.py文件,主要实现模型的训练和测试以及品谷的过程;另一个是run.py文件,是主程序入口。
eval:在eval.py文件中,定义了两个函数train和test分别实现模型的训练和测试:
def train(trainLoaders, model, optimizer, lossFunc):
for i in range(len(trainLoaders)):
model, optimizer = train_epoch(model, trainLoaders[i], optimizer, lossFunc)
return model, optimizer
def test(testLoaders, model):
ground_truth = torch.Tensor([])
prediction = torch.Tensor([])
for i in range(len(testLoaders)):
pred_epoch, gold_epoch = test_epoch(model, testLoaders[i])
prediction = torch.cat([prediction, pred_epoch])
ground_truth = torch.cat([ground_truth, gold_epoch])
performance(ground_truth, prediction)
而训练过程有分为很多epoch,每一个epoch的过程在train_epoch中实现。而对于测试过程,由于某些测试集可能会很大,导致内存一次存不下,所以将测试集分成多个loader,然后对于每一个loader都调用一次test_epoch,然后把所有的loader的结果合并起来。最后,所有的结果拼接起来后,通过performance函数计算模型的各个评价指标。
...
prediction = torch.cat([prediction, pred_epoch])
ground_truth = torch.cat([ground_truth, gold_epoch])
performance(ground_truth, prediction)
对于train_epoch,过程跟一般的pytorch模型训练过程一样,读取数据loader、预测、计算损失、反向传播等:
def train_epoch(model, trainLoader, optimizer, loss_func):
for batch in tqdm.tqdm(trainLoader, desc='Training: ', mininterval=2):
pred = model(batch)
loss = loss_func(pred, batch)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return model, optimizer
对于test_epoch,由于知识追踪任务比较特殊,每一个时刻的输出都是预测下一个时刻答对题目的概率,因此有一些额外的处理。先上代码:
def test_epoch(model, testLoader):
gold_epoch = torch.Tensor([])
pred_epoch = torch.Tensor([])
for batch in tqdm.tqdm(testLoader, desc='Testing: ', mininterval=2):
pred = model(batch)
for student in range(pred.shape[0]):
temp_pred = torch.Tensor([])
temp_gold = torch.Tensor([])
delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
p = temp.gather(0, index)[0]
a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]
for i in range(len(p)):
if p[i] > 0:
temp_pred = torch.cat([temp_pred,p[i:i+1]])
temp_gold = torch.cat([temp_gold, a[i:i+1]])
pred_epoch = torch.cat([pred_epoch, temp_pred])
gold_epoch = torch.cat([gold_epoch, temp_gold])
return pred_epoch, gold_epoch
在test_epoch函数中,先定义两个列表,分别用来存储真实结果ground truth 和预测的结果pred:
gold_epoch = torch.Tensor([])
pred_epoch = torch.Tensor([])
然后读取数据,分多个batch进行预测,因为一次预测可能数据量过大导致内存溢出而出错。Note:每一个batch中包含多个学生,每个学生有maxstep个题目,每个题目表示成了2*num_of_ques维的onehot向量。
for batch in tqdm.tqdm(testLoader, desc='Testing: ', mininterval=2):
pred = model(batch)
预测完之后,整理数据,把学生所有的题目的预测结果存储起来,方便后面的评估。对于每一个学生,先创建两个列表,分别存储真是答题结果ground truth和预测结果pred。然后再将每个学生的结果添加进开始定义的两个总结果列表gold_epoch和pred_epoch中去。
for student in range(pred.shape[0]):
temp_pred = torch.Tensor([])
temp_gold = torch.Tensor([])
然后是获取预测结果,这里先将2*num_of_ques维的题目onehot向量分成前后两个部分,每部分分别是num_of_ques维,然后相加,乘以预测结果,即可得到对应的题目的预测结果,这里的计算过程可自行推敲,等有机会再给出可视化的计算过程。因为每一个时刻都是预测的下一个时刻的结果,所以题目编号需要向后移一个,体现在delta[1:]这里:
delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
p = temp.gather(0, index)[0]
对于答题的真实结果,其实在onehot的向量中就已经体现了,答对则向量前半部分对应的位置为1,答错则向量后半部分对应的位置为1。根据这个特点,按照下面的方式就可以直接通过onehot向量推出真实答题结果:
a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]
到此处为止,预测结果和真实结果就已经都得到了。但是,这里还要在做一个筛选,别忘了我们之前在数据长度不够的时候是补0了的,这里需要把补0的结果全部都过滤掉。由于补零的题目的onehot向量为全零向量,那么全零向量经过神经网络之后预测结果肯定为0。而正常题目不是非零的,那么预测结果为0的可能性极小,因为神经网络参数为0的可能性极小。所以我们根据预测结果是否为0,直接把为0的全部去除掉(我们这里的处理方法似乎不是很合理,因为正常题目也是有可能出现预测结果为0的情况,但是这种可能性极小,对模型整体而言几乎没什么影响,所以这么做也是合理的,并且十分方便):
if p[i] > 0:
temp_pred = torch.cat([temp_pred,p[i:i+1]])
temp_gold = torch.cat([temp_gold, a[i:i+1]])
在每次处理完一个学生的数据之后,将其添加到总结果列表中去:
pred_epoch = torch.cat([pred_epoch, temp_pred])
gold_epoch = torch.cat([gold_epoch, temp_gold])
最后返回结果即可。
在eval.py文件中还定义了一个损失函数类lossFunc,基于pytorch框架的自定义的损失函数。其实这个损失函数就是分类问题中常用的交叉熵函数,只是知识追踪问题的数据是序列化的,所以这里不太方便直接调用pytorch框架中已有的交叉熵函数,自己按需实现了一下,里面涉及的一些过程和test_epoch中的部分过程类似:
class lossFunc(nn.Module):
def __init__(self):
super(lossFunc, self).__init__()
def forward(self, pred, batch):
loss = torch.Tensor([0.0])
for student in range(pred.shape[0]):
delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
p = temp.gather(0, index)[0]
a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]
for i in range(len(p)):
if p[i] > 0:
loss = loss - (a[i]*torch.log(p[i]) + (1-a[i])*torch.log(1-p[i]))
return loss
最后,eval.py文件中包含一个performance函数,从名字就可以看出这个函数用来评价模型的表现,也就是计算预测结果的各个指标,包括AUC、F1、Recall、Precision,可以根据需要自行添加,计算方式可自定义或者直接掉包:
def performance(ground_truth, prediction):
fpr, tpr, thresholds = metrics.roc_curve(ground_truth.detach().numpy(), prediction.detach().numpy())
auc = metrics.auc(fpr, tpr)
f1 = metrics.f1_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
recall = metrics.recall_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
precision = metrics.precision_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
print('auc:' + str(auc) + ' f1: ' + str(f1) + ' recall: ' + str(recall) + ' precision: ' + str(precision) + '\n')
到此处为止,DKT项目的所有部分都已介绍完毕。