冷眸

Deep Knowledge Tracing: A PyTorch Version

· 冷眸

知识追踪(Knowledge Tracing)是根据学生过去的答题情况对学生的知识掌握情况进行建模,从而得到学生当前知识状态表示的一种技术。将深度学习的方法引入知识追踪最早出现于发表在NeurIPS 2015上的一篇论文《Deep Knowledge Tracing》,作者来自斯坦福大学。在这篇论文中,作者提出了使用深度知识追踪(Deep Knowledge Tracing, DKT)的概念,利用RNN对学生的学习情况进行建模,之后引出了一系列工作,2019年已经有使用Transformer代替RNN和LSTM并且达到了SOTA的论文。DKT作为知识追踪模型深度化的开山之作,在几乎所有的深度知识追踪模型中都作为baseline,而DKT作者给出的模型实现是基于lua语言的,为了能够让更多的研究人员更方便的使用,这里给出一种python的实现,采用的是pytorch框架。

下载

模型代码已经发布在github上,可点击这里查看和下载具体代码。

或者可以直接通过如下命令直接下载到本地:

git clone https://github.com/pydaxing/DeepKnowledgeTracing-DKT-Pytorch.git

具体运行和使用方法参考GitHub项目上ReadMe。

项目结构-DKT

在DKT文件夹下包括两个文件夹:KTDataset和KnowledgeTracing。

KTDataset文件夹下有6个常用的知识追踪数据集,数据都已经处理成三行格式:

第一行:答题数 第二行:题目编号 第三行:答题结果,0表示错,1表示对

Note:可根据需要,按照数据格式自行添加新的数据集。

模型结构-KnowledgeTracing

模型的整个流程都在KnowledgeTracing目录下,包括模型、参数设置、数据处理、模型训练和评估,分别在四个子目录下:model, Constant,data,evaluation。

参数设置-Constant

Constant下主要设置一些参数和超参数,超参数也分为四大块:数据集存储路径、数据集、题目数、模型超参数。

数据集存储路径

Dpath = '../../KTDataset'

数据集:一共包括6个数据集

datasets = {
    'assist2009' : 'assist2009',
    'assist2015' : 'assist2015',
    'assist2017' : 'assist2017',
    'static2011' : 'static2011',
    'kddcup2010' : 'kddcup2010',
    'synthetic' : 'synthetic'
}

题目数:表示每个数据集里面题目的数量

numbers = {
    'assist2009' : 124,  
    'assist2015' : 100,
    'assist2017' : 102,
    'static2011' : 1224, 
    'kddcup2010' : 661,  
    'synthetic' : 50
}

模型超参数:主要包括所用数据集、输入输出维度、学习率、最大步长、学习周期等。

DATASET = datasets['static2011']
NUM_OF_QUESTIONS = numbers['static2011']
# the max step of RNN model
MAX_STEP = 50
BATCH_SIZE = 64
LR = 0.002
EPOCH = 1000
#input dimension
INPUT = NUM_OF_QUESTIONS * 2
# embedding dimension
EMBED = NUM_OF_QUESTIONS
# hidden layer dimension
HIDDEN = 200
# nums of hidden layers
LAYERS = 1
# output dimension
OUTPUT = NUM_OF_QUESTIONS

模型实现-model

模型在model目录下的RNNModel.py文件中实现,模型实际上就是一个简单的LSTM网络,其结构跟DKT原文中所讲述的结构一致,在LSTM模型最后添加了一个线性层和一个sigmoid激活函数。

class DKT(nn.Module):
    def __init__(self, input_dim, hidden_dim, layer_dim, output_dim):
        super(DKT, self).__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = layer_dim
        self.output_dim = output_dim
        self.rnn = nn.RNN(input_dim, hidden_dim, layer_dim, batch_first=True,nonlinearity='tanh')
        self.fc = nn.Linear(self.hidden_dim, self.output_dim)
        self.sig = nn.Sigmoid()

    def forward(self, x):
        h0 = Variable(torch.zeros(self.layer_dim, x.size(0), self.hidden_dim))
        out,hn = self.rnn(x, h0)
        res = self.sig(self.fc(out))
        return res

数据处理-data

在data目录下包括三个文件:readdata.py、DKTDataSet.py、dataloader.py。它们的作用分别是定义数据的读取、pytorch框架下的数据集定义、以及pytorch框架下的dataloader的构造。

readata: 在readata.py文件中,定义了一个类:DataReader,从名字可以看出这是一个用来读取数据的类。其中包含两个函数getTrainData()和getTestData(),分别是用来读取训练数据和测试数据。两个函数的定义其实一模一样,只是名字不一样用来区分训练和测试数据,这样的写法有些冗余,后面会再做一些优化。

class DataReader():
    def __init__(self, path, maxstep, numofques):
        self.path = path
        self.maxstep = maxstep
        self.numofques = numofques

    def getTrainData(self):
        ...

    def getTestData(self):
        ...

DataReader类有三个参数:

path: 数据文件存储路径 maxstep: 最大序列长度 numofques: 此数据集中所有题目的总个数(去重后)

获取与处理数据部分,以getTrainData()函数为例,getTestData()与其一样。

def getTrainData(self):
    trainqus = np.array([])
    trainans = np.array([])
    with open(self.path, 'r') as train:
        for len, ques, ans in tqdm.tqdm(itertools.zip_longest(*[train] * 3), desc='loading train data:    ', mininterval=2):
            len = int(len.strip().strip(','))
            ques = np.array(ques.strip().strip(',').split(',')).astype(np.int)
            ans = np.array(ans.strip().strip(',').split(',')).astype(np.int)
            mod = 0 if len%self.maxstep == 0 else (self.maxstep - len%self.maxstep)
            zero = np.zeros(mod) - 1
            ques = np.append(ques, zero)
            ans = np.append(ans, zero)
            trainqus = np.append(trainqus, ques).astype(np.int)
            trainans = np.append(trainans, ans).astype(np.int)
    return trainqus.reshape([-1, self.maxstep]), trainans.reshape([-1, self.maxstep])

在getTrainData()中,首先定义两个numpy数组trainqus和trainans,前者存储题目编号,后者存储对应的答题结果。然后打开文件开始读取数据。

因为数据是三行格式的,所以每一次读取三行,每次读取三行的实现方式如下:

for len, ques, ans in tqdm.tqdm(itertools.zip_longest(*[train] * 3), desc='loading train data:    ', mininterval=2)

其中tqdm是进度条展示,可忽略,简化来看每次读取三行的方法如下:

for len, ques, ans in itertools.zip_longest(*[train] * 3)

然后是对三行数据进行字符串处理,分别得到题目编号以及对应的答题结果:

ques = np.array(ques.strip().strip(',').split(',')).astype(np.int)
ans = np.array(ans.strip().strip(',').split(',')).astype(np.int)

然后是处理长度不一致的问题,将所有答题序列的长度都处理成maxstep的整数倍,长度不够的补0。

mod = 0 if len%self.maxstep == 0 else (self.maxstep - len%self.maxstep)
zero = np.zeros(mod) - 1
ques = np.append(ques, zero)
ans = np.append(ans, zero)

举例:ques长度为18,设置maxstep为5,那么ques补充成maxstep的整数倍应该是4倍为20,所以ques应该补充两个0变成长度为20的序列;如果ques长度为11,那么补充4个0,长度变成15;ques长度为10,则不补充。

每一个ques的长度处理成maxstep的整数倍之后,添加到trainques数组中去,这样每一次添加都保证了trainques的长度为maxstep的整数倍。ans以及trainans的处理过程一样。

trainqus = np.append(trainqus, ques).astype(np.int)
trainans = np.append(trainans, ans).astype(np.int)

最后对trainques和trainans进行reshape,处理成N*maxstep的矩阵形式,N即可看做学生个数。maxstep即为答题个数。

举例,数据形式的变化过程,比如设置maxstep为3,总题目数为5,现在有如下三个学生的原始答题记录: 学生1: 2 1 2 1 0 学生2: 4 2 4 1 3 0 1 1 0 学生3: 7 5 3 1 4 5 4 2 0 0 1 1 0 1 0

ques通过readata读取并处理之后会变成: 1 2 0 2 4 1 3 0 0 5 3 1 4 5 4 2 0 0

DKTDataSet:要定义pytorch框架下的数据集,需要继承torch的Dataset类,覆写__init__、__len__以及__getitem__三个函数。还可以根据需要自己添加数据处理的函数,在DKTDataSet中添加的one-hot处理函数。

class DKTDataSet(Dataset):
    def __init__(self, ques, ans):
        ...

    def __len__(self):
        ...

    def __getitem__(self, index):
        ...

    def onehot(self, questions, answers):
        ...

在readdata处理好数据之后,我们在DKTDataSet中对其进行封装处理,直接返回题目的one-hot形式而不再是题目编号。

在__init__中做一些初始化操作,比入读进数据ques和ans,前者是题目编号,后者是答题结果。

def __init__(self, ques, ans):
    self.ques = ques
    self.ans = ans

__len__返回数据集的长度(大小),这里直接返回ques或者ans的行数,也就是学生数。

def __len__(self):
    return len(self.ques)

__getitem__返回需要获取的某条数据,这里根据index参数直接返回对应的数据即可,这里我们返回前将数据通过自定义的onehot函数处理成one-hot的形式,并且将数据类型转换为FloatTensor。

def __getitem__(self, index):
    questions = self.ques[index]
    answers = self.ans[index]
    onehot = self.onehot(questions, answers)
    return torch.FloatTensor(onehot.tolist())

__onehot__是自定义的将题目编号转变成one-hot形式的函数。

def onehot(self, questions, answers):
    result = np.zeros(shape=[C.MAX_STEP, 2 * C.NUM_OF_QUESTIONS])
    for i in range(C.MAX_STEP):
        if answers[i] > 0:
            result[i][questions[i]] = 1
        elif answers[i] == 0:
            result[i][questions[i] + C.NUM_OF_QUESTIONS] = 1
    return result

与原文保持一致,one-hot的维度为两倍的总题目数,所以对于readata中处理好的每一条记录ques,将变成[C.MAX_STEP, 2 * C.NUM_OF_QUESTIONS]大小的矩阵,因为每条记录ques中包含C.MAX_STEP个题目,每个题目的onehot维度为2 * C.NUM_OF_QUESTIONS。

接着readata中的例子,ques在DKTDataSet中转变成onehot形式之后,数据的形式变成: [[1 0 0 0 0 0 0 0 0 0] -> 1  [0 0 0 0 0 0 1 0 0 0] -> 2  [0 0 0 0 0 0 0 0 0 0] -> 0  [0 0 0 0 0 0 1 0 0 0] -> 2  [0 0 0 1 0 0 0 0 0 0] -> 4  [1 0 0 0 0 0 0 0 0 0] -> 1  …]

dataloader:在dataloader.py中,包含一个训练数据的loader和一个测试数据的loader,分别是getTrainLoader和getTestLoader,实际上这两个loader的实现一模一样,只是去了两个不同的名字为了区分训练和测试数据,这样的方式比较冗余,后面的版本会进行优化。

def getTrainLoader(train_data_path):
    handle = DataReader(train_data_path ,C.MAX_STEP, C.NUM_OF_QUESTIONS)
    trainques, trainans = handle.getTrainData()
    dtrain = DKTDataSet(trainques, trainans)
    trainLoader = Data.DataLoader(dtrain, batch_size=C.BATCH_SIZE, shuffle=True)
    return trainLoader

def getTestLoader(test_data_path):
    handle = DataReader(test_data_path, C.MAX_STEP, C.NUM_OF_QUESTIONS)
    testques, testans = handle.getTestData()
    dtest = DKTDataSet(testques, testans)
    testLoader = Data.DataLoader(dtest, batch_size=C.BATCH_SIZE, shuffle=False)
    return testLoader

关于如何定义loader就不做过多介绍,关于pytorch的dataloader的相关文章有很多。

在dataloader.py中还有一个函数:getLoader,这个函数封装了getTrainLoader和getTestLoader,通过调用此函数直接获取训练和测试的loader。并且函数的参数是数据集的名称,根据数据集名称分别为不同的数据集构造loader。

def getLoader(dataset):
    trainLoaders = []
    testLoaders = []
    if dataset == 'assist2009':
        trainLoader = getTrainLoader(C.Dpath + '/assist2009/builder_train.csv')
        trainLoaders.append(trainLoader)
        testLoader = getTestLoader(C.Dpath + '/assist2009/builder_test.csv')
        testLoaders.append(testLoader)
    elif dataset == 'assist2015':
        trainLoader = getTrainLoader(C.Dpath + '/assist2015/assist2015_train.txt')
        trainLoaders.append(trainLoader)
        testLoader = getTestLoader(C.Dpath + '/assist2015/assist2015_test.txt')
        testLoaders.append(testLoader)
    ...

模型训练与测试-evaluation

在evaluation目录下,有两个文件,一个是eval.py文件,主要实现模型的训练和测试以及品谷的过程;另一个是run.py文件,是主程序入口。

eval:在eval.py文件中,定义了两个函数train和test分别实现模型的训练和测试:

def train(trainLoaders, model, optimizer, lossFunc):
    for i in range(len(trainLoaders)):
        model, optimizer = train_epoch(model, trainLoaders[i], optimizer, lossFunc)
    return model, optimizer

def test(testLoaders, model):
    ground_truth = torch.Tensor([])
    prediction = torch.Tensor([])
    for i in range(len(testLoaders)):
        pred_epoch, gold_epoch = test_epoch(model, testLoaders[i])
        prediction = torch.cat([prediction, pred_epoch])
        ground_truth = torch.cat([ground_truth, gold_epoch])
    performance(ground_truth, prediction)

而训练过程有分为很多epoch,每一个epoch的过程在train_epoch中实现。而对于测试过程,由于某些测试集可能会很大,导致内存一次存不下,所以将测试集分成多个loader,然后对于每一个loader都调用一次test_epoch,然后把所有的loader的结果合并起来。最后,所有的结果拼接起来后,通过performance函数计算模型的各个评价指标。

...
    prediction = torch.cat([prediction, pred_epoch])
    ground_truth = torch.cat([ground_truth, gold_epoch])
performance(ground_truth, prediction)

对于train_epoch,过程跟一般的pytorch模型训练过程一样,读取数据loader、预测、计算损失、反向传播等:

def train_epoch(model, trainLoader, optimizer, loss_func):
    for batch in tqdm.tqdm(trainLoader, desc='Training:    ', mininterval=2):
        pred = model(batch)
        loss = loss_func(pred, batch)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return model, optimizer

对于test_epoch,由于知识追踪任务比较特殊,每一个时刻的输出都是预测下一个时刻答对题目的概率,因此有一些额外的处理。先上代码:

def test_epoch(model, testLoader):
    gold_epoch = torch.Tensor([])
    pred_epoch = torch.Tensor([])
    for batch in tqdm.tqdm(testLoader, desc='Testing:    ', mininterval=2):
        pred = model(batch)
        for student in range(pred.shape[0]):
            temp_pred = torch.Tensor([])
            temp_gold = torch.Tensor([])
            delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
            temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
            index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
            p = temp.gather(0, index)[0]
            a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]
            for i in range(len(p)):
                if p[i] > 0:
                    temp_pred = torch.cat([temp_pred,p[i:i+1]])
                    temp_gold = torch.cat([temp_gold, a[i:i+1]])
            pred_epoch = torch.cat([pred_epoch, temp_pred])
            gold_epoch = torch.cat([gold_epoch, temp_gold])
    return pred_epoch, gold_epoch

在test_epoch函数中,先定义两个列表,分别用来存储真实结果ground truth 和预测的结果pred:

gold_epoch = torch.Tensor([])
pred_epoch = torch.Tensor([])

然后读取数据,分多个batch进行预测,因为一次预测可能数据量过大导致内存溢出而出错。Note:每一个batch中包含多个学生,每个学生有maxstep个题目,每个题目表示成了2*num_of_ques维的onehot向量。

for batch in tqdm.tqdm(testLoader, desc='Testing:    ', mininterval=2):
    pred = model(batch)

预测完之后,整理数据,把学生所有的题目的预测结果存储起来,方便后面的评估。对于每一个学生,先创建两个列表,分别存储真是答题结果ground truth和预测结果pred。然后再将每个学生的结果添加进开始定义的两个总结果列表gold_epoch和pred_epoch中去。

for student in range(pred.shape[0]):
    temp_pred = torch.Tensor([])
    temp_gold = torch.Tensor([])

然后是获取预测结果,这里先将2*num_of_ques维的题目onehot向量分成前后两个部分,每部分分别是num_of_ques维,然后相加,乘以预测结果,即可得到对应的题目的预测结果,这里的计算过程可自行推敲,等有机会再给出可视化的计算过程。因为每一个时刻都是预测的下一个时刻的结果,所以题目编号需要向后移一个,体现在delta[1:]这里:

delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
p = temp.gather(0, index)[0]

对于答题的真实结果,其实在onehot的向量中就已经体现了,答对则向量前半部分对应的位置为1,答错则向量后半部分对应的位置为1。根据这个特点,按照下面的方式就可以直接通过onehot向量推出真实答题结果:

a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]

到此处为止,预测结果和真实结果就已经都得到了。但是,这里还要在做一个筛选,别忘了我们之前在数据长度不够的时候是补0了的,这里需要把补0的结果全部都过滤掉。由于补零的题目的onehot向量为全零向量,那么全零向量经过神经网络之后预测结果肯定为0。而正常题目不是非零的,那么预测结果为0的可能性极小,因为神经网络参数为0的可能性极小。所以我们根据预测结果是否为0,直接把为0的全部去除掉(我们这里的处理方法似乎不是很合理,因为正常题目也是有可能出现预测结果为0的情况,但是这种可能性极小,对模型整体而言几乎没什么影响,所以这么做也是合理的,并且十分方便):

if p[i] > 0:
    temp_pred = torch.cat([temp_pred,p[i:i+1]])
    temp_gold = torch.cat([temp_gold, a[i:i+1]])

在每次处理完一个学生的数据之后,将其添加到总结果列表中去:

pred_epoch = torch.cat([pred_epoch, temp_pred])
gold_epoch = torch.cat([gold_epoch, temp_gold])

最后返回结果即可。

在eval.py文件中还定义了一个损失函数类lossFunc,基于pytorch框架的自定义的损失函数。其实这个损失函数就是分类问题中常用的交叉熵函数,只是知识追踪问题的数据是序列化的,所以这里不太方便直接调用pytorch框架中已有的交叉熵函数,自己按需实现了一下,里面涉及的一些过程和test_epoch中的部分过程类似:

class lossFunc(nn.Module):
    def __init__(self):
        super(lossFunc, self).__init__()

    def forward(self, pred, batch):
        loss = torch.Tensor([0.0])
        for student in range(pred.shape[0]):
            delta = batch[student][:,0:C.NUM_OF_QUESTIONS] + batch[student][:,C.NUM_OF_QUESTIONS:]
            temp = pred[student][:C.MAX_STEP - 1].mm(delta[1:].t())
            index = torch.LongTensor([[i for i in range(C.MAX_STEP - 1)]])
            p = temp.gather(0, index)[0]
            a = (((batch[student][:, 0:C.NUM_OF_QUESTIONS] - batch[student][:, C.NUM_OF_QUESTIONS:]).sum(1) + 1)//2)[1:]
            for i in range(len(p)):
                if p[i] > 0:
                    loss = loss - (a[i]*torch.log(p[i]) + (1-a[i])*torch.log(1-p[i]))
        return loss

最后,eval.py文件中包含一个performance函数,从名字就可以看出这个函数用来评价模型的表现,也就是计算预测结果的各个指标,包括AUC、F1、Recall、Precision,可以根据需要自行添加,计算方式可自定义或者直接掉包:

def performance(ground_truth, prediction):
    fpr, tpr, thresholds = metrics.roc_curve(ground_truth.detach().numpy(), prediction.detach().numpy())
    auc = metrics.auc(fpr, tpr)
    f1 = metrics.f1_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
    recall = metrics.recall_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
    precision = metrics.precision_score(ground_truth.detach().numpy(), torch.round(prediction).detach().numpy())
    print('auc:' + str(auc) + ' f1: ' + str(f1) + ' recall: ' + str(recall) + ' precision: ' + str(precision) + '\n')

到此处为止,DKT项目的所有部分都已介绍完毕。