Files
fun-rec/codes/base_models/xDeepFM.py
2021-12-04 10:58:42 +08:00

286 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pandas as pd
import numpy as np
from tqdm import tqdm
import warnings, random, math, os
from collections import namedtuple, OrderedDict
import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
import tensorflow.keras.backend as K
from tensorflow.python.keras.initializers import (Zeros, glorot_normal,
glorot_uniform)
from tensorflow.python.keras.regularizers import l2
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, MinMaxScaler, StandardScaler, LabelEncoder
from utils import DenseFeat, SparseFeat, VarLenSparseFeat
# 简单处理特征,包括填充缺失值,数值处理,类别编码
def data_process(data_df, dense_features, sparse_features):
data_df[dense_features] = data_df[dense_features].fillna(0.0)
for f in dense_features:
data_df[f] = data_df[f].apply(lambda x: np.log(x+1) if x > -1 else -1)
data_df[sparse_features] = data_df[sparse_features].fillna("-1")
for f in sparse_features:
lbe = LabelEncoder()
data_df[f] = lbe.fit_transform(data_df[f])
return data_df[dense_features + sparse_features]
# 构建输入层
# 将输入的数据转换成字典的形式定义输入层的时候让输入层的name和字典中特征的key一致就可以使得输入的数据和对应的Input层对应
def build_input_layers(feature_columns):
"""构建Input层字典并以dense和sparse两类字典的形式返回"""
dense_input_dict, sparse_input_dict = {}, {}
for fc in feature_columns:
if isinstance(fc, SparseFeat):
sparse_input_dict[fc.name] = Input(shape=(1, ), name=fc.name, dtype=fc.dtype)
elif isinstance(fc, DenseFeat):
dense_input_dict[fc.name] = Input(shape=(fc.dimension, ), name=fc.name, dtype=fc.dtype)
return dense_input_dict, sparse_input_dict
# 构建embedding层
def build_embedding_layers(feature_columns, input_layer_dict, is_linear):
# 定义一个embedding层对应的字典
embedding_layers_dict = dict()
# 将特征中的sparse特征筛选出来
sparse_features_columns = list(filter(lambda x: isinstance(x, SparseFeat), feature_columns)) if feature_columns else []
# 如果是用于线性部分的embedding层其维度是1否则维度是自己定义的embedding维度
if is_linear:
for fc in sparse_features_columns:
embedding_layers_dict[fc.name] = Embedding(fc.vocabulary_size, 1, name='1d_emb_'+fc.name)
else:
for fc in sparse_features_columns:
embedding_layers_dict[fc.name] = Embedding(fc.vocabulary_size, fc.embedding_dim, name='kd_emb_'+fc.name)
return embedding_layers_dict
# 将所有的sparse特征embedding拼接
def concat_embedding_list(feature_columns, input_layer_dict, embedding_layer_dict, flatten=False):
# 将sparse特征筛选出来
sparse_feature_columns = list(filter(lambda x: isinstance(x, SparseFeat), feature_columns))
embedding_list = []
for fc in sparse_feature_columns:
_input = input_layer_dict[fc.name] # 获取输入层
_embed = embedding_layer_dict[fc.name] # B x 1 x dim 获取对应的embedding层
embed = _embed(_input) # B x dim 将input层输入到embedding层中
# 是否需要flatten, 如果embedding列表最终是直接输入到Dense层中需要进行Flatten否则不需要
if flatten:
embed = Flatten()(embed)
embedding_list.append(embed)
return embedding_list
def get_dnn_output(dnn_input, hidden_units=[1024, 512, 256], dnn_dropout=0.3, activation='relu'):
# 建立dnn_network
dnn_network = [Dense(units=unit, activation=activation) for unit in hidden_units]
dropout = Dropout(dnn_dropout)
# 前向传播
x = dnn_input
for dnn in dnn_network:
x = dropout(dnn(x))
return x
# 得到线性部分的计算结果, 即线性部分计算的前向传播逻辑
def get_linear_logits(dense_input_dict, sparse_input_dict, linear_feature_columns):
"""
线性部分的计算所有特征的Input层然后经过一个全连接层线性计算结果logits
即FM线性部分的那块计算w1x1+w2x2+...wnxn + b,只不过,连续特征和离散特征这里的线性计算还不太一样
连续特征由于是数值,可以直接过全连接,得到线性这边的输出。
离散特征需要先embedding得到1维embedding然后直接把这个1维的embedding相加就得到离散这边的线性输出。
:param dense_input_dict: A dict. 连续特征构建的输入层字典 形式{'dense_name': Input(shape, name, dtype)}
:param sparse_input_dict: A dict. 离散特征构建的输入层字典 形式{'sparse_name': Input(shape, name, dtype)}
:param linear_feature_columns: A list. 里面的每个元素是namedtuple(元组的一种扩展类型,同时支持序号和属性名访问组件)类型表示的是linear数据的特征封装版
"""
# 把所有的dense特征合并起来,经过一个神经元的全连接,做的计算 w1x1 + w2x2 + w3x3....wnxn
concat_dense_inputs = Concatenate(axis=1)(list(dense_input_dict.values()))
dense_logits_output = Dense(1)(concat_dense_inputs)
# 获取linear部分sparse特征的embedding层这里使用embedding的原因
# 对于linear部分直接将特征进行OneHot然后通过一个全连接层当维度特别大的时候计算比较慢
# 使用embedding层的好处就是可以通过查表的方式获取到非零元素对应的权重然后将这些权重相加提升效率
linear_embedding_layers = build_embedding_layers(linear_feature_columns, sparse_input_dict, is_linear=True)
# 将一维的embedding拼接注意这里需要一个Flatten层 使维度对应
sparse_1d_embed = []
for fc in linear_feature_columns:
# 离散特征要进行embedding
if isinstance(fc, SparseFeat):
# 找到对应Input层然后后面接上embedding层
feat_input = sparse_input_dict[fc.name]
embed = Flatten()(linear_embedding_layers[fc.name](feat_input))
sparse_1d_embed.append(embed)
# embedding中查询得到的权重就是对应onehot向量中一个位置的权重所以后面不用再接一个全连接了本身一维的embedding就相当于全连接
# 只不过是这里的输入特征只有0和1所以直接向非零元素对应的权重相加就等同于进行了全连接操作(非零元素部分乘的是1)
sparse_logits_output = Add()(sparse_1d_embed)
# 最终将dense特征和sparse特征对应的logits相加得到最终linear的logits
linear_part = Add()([dense_logits_output, sparse_logits_output])
return linear_part
class CIN(Layer):
def __init__(self, cin_size, l2_reg=1e-4):
"""
:param: cin_size: A list. [H_1, H_2, ....H_T], a list of number of layers
"""
super(CIN, self).__init__()
self.cin_size = cin_size
self.l2_reg = l2_reg
def build(self, input_shape):
# input_shape [None, field_nums, embedding_dim]
self.field_nums = input_shape[1]
# CIN 的每一层大小这里加入第0层也就是输入层H_0
self.field_nums = [self.field_nums] + self.cin_size
# 过滤器
self.cin_W = {
'CIN_W_' + str(i): self.add_weight(
name='CIN_W_' + str(i),
shape = (1, self.field_nums[0] * self.field_nums[i], self.field_nums[i+1]), # 这个大小要理解
initializer='random_uniform',
regularizer=l2(self.l2_reg),
trainable=True
)
for i in range(len(self.field_nums)-1)
}
super(CIN, self).build(input_shape)
def call(self, inputs):
# inputs [None, field_num, embed_dim]
embed_dim = inputs.shape[-1]
hidden_layers_results = [inputs]
# 从embedding的维度把张量一个个的切开,这个为了后面逐通道进行卷积,算起来好算
# 这个结果是个list list长度是embed_dim, 每个元素维度是[None, field_nums[0], 1] field_nums[0]即输入的特征个数
# 即把输入的[None, field_num, embed_dim]切成了embed_dim个[None, field_nums[0], 1]的张量
split_X_0 = tf.split(hidden_layers_results[0], embed_dim, 2)
for idx, size in enumerate(self.cin_size):
# 这个操作和上面是同理的也是为了逐通道卷积的时候更加方便分割的是当一层的输入Xk-1
split_X_K = tf.split(hidden_layers_results[-1], embed_dim, 2) # embed_dim个[None, field_nums[i], 1] feild_nums[i] 当前隐藏层单元数量
# 外积的运算
out_product_res_m = tf.matmul(split_X_0, split_X_K, transpose_b=True) # [embed_dim, None, field_nums[0], field_nums[i]]
out_product_res_o = tf.reshape(out_product_res_m, shape=[embed_dim, -1, self.field_nums[0]*self.field_nums[idx]]) # 后两维合并起来
out_product_res = tf.transpose(out_product_res_o, perm=[1, 0, 2]) # [None, dim, field_nums[0]*field_nums[i]]
# 卷积运算
# 这个理解的时候每个样本相当于1张通道为1的照片 dim为宽度 field_nums[0]*field_nums[i]为长度
# 这时候的卷积核大小是field_nums[0]*field_nums[i]的, 这样一个卷积核的卷积操作相当于在dim上进行滑动每一次滑动会得到一个数
# 这样一个卷积核之后会得到dim个数即得到了[None, dim, 1]的张量, 这个即当前层某个神经元的输出
# 当前层一共有field_nums[i+1]个神经元, 也就是field_nums[i+1]个卷积核,最终的这个输出维度[None, dim, field_nums[i+1]]
cur_layer_out = tf.nn.conv1d(input=out_product_res, filters=self.cin_W['CIN_W_'+str(idx)], stride=1, padding='VALID')
cur_layer_out = tf.transpose(cur_layer_out, perm=[0, 2, 1]) # [None, field_num[i+1], dim]
hidden_layers_results.append(cur_layer_out)
# 最后CIN的结果要取每个中间层的输出这里不要第0层的了
final_result = hidden_layers_results[1:] # 这个的维度T个[None, field_num[i], dim] T 是CIN的网络层数
# 接下来在第一维度上拼起来
result = tf.concat(final_result, axis=1) # [None, H1+H2+...HT, dim]
# 接下来, dim维度上加和并把第三个维度1干掉
result = tf.reduce_sum(result, axis=-1, keepdims=False) # [None, H1+H2+..HT]
return result
def xDeepFM(linear_feature_columns, dnn_feature_columns, cin_size=[128, 128]):
# 构建输入层即所有特征对应的Input()层,这里使用字典的形式返回,方便后续构建模型
dense_input_dict, sparse_input_dict = build_input_layers(linear_feature_columns+dnn_feature_columns)
# 构建模型的输入层,模型的输入层不能是字典的形式,应该将字典的形式转换成列表的形式
# 注意这里实际的输入预Input层对应是通过模型输入时候的字典数据的key与对应name的Input层
input_layers = list(dense_input_dict.values()) + list(sparse_input_dict.values())
# 线性部分的计算逻辑 -- linear
linear_logits = get_linear_logits(dense_input_dict, sparse_input_dict, linear_feature_columns)
# 构建维度为k的embedding层这里使用字典的形式返回方便后面搭建模型
# 线性层和dnn层统一的embedding层
embedding_layer_dict = build_embedding_layers(linear_feature_columns+dnn_feature_columns, sparse_input_dict, is_linear=False)
# DNN侧的计算逻辑 -- Deep
# 将dnn_feature_columns里面的连续特征筛选出来并把相应的Input层拼接到一块
dnn_dense_feature_columns = list(filter(lambda x: isinstance(x, DenseFeat), dnn_feature_columns)) if dnn_feature_columns else []
dnn_dense_feature_columns = [fc.name for fc in dnn_dense_feature_columns]
dnn_concat_dense_inputs = Concatenate(axis=1)([dense_input_dict[col] for col in dnn_dense_feature_columns])
# 将dnn_feature_columns里面的离散特征筛选出来相应的embedding层拼接到一块
dnn_sparse_kd_embed = concat_embedding_list(dnn_feature_columns, sparse_input_dict, embedding_layer_dict, flatten=True)
dnn_concat_sparse_kd_embed = Concatenate(axis=1)(dnn_sparse_kd_embed)
# DNN层的输入和输出
dnn_input = Concatenate(axis=1)([dnn_concat_dense_inputs, dnn_concat_sparse_kd_embed])
dnn_out = get_dnn_output(dnn_input)
dnn_logits = Dense(1)(dnn_out)
# CIN侧的计算逻辑 这里使用的DNN feature里面的sparse部分,这里不要flatten
exFM_sparse_kd_embed = concat_embedding_list(dnn_feature_columns, sparse_input_dict, embedding_layer_dict, flatten=False)
exFM_input = Concatenate(axis=1)(exFM_sparse_kd_embed)
exFM_out = CIN(cin_size=cin_size)(exFM_input)
exFM_logits = Dense(1)(exFM_out)
# 三边的结果stack
stack_output = Add()([linear_logits, dnn_logits, exFM_logits])
# 输出层
output_layer = Dense(1, activation='sigmoid')(stack_output)
model = Model(input_layers, output_layer)
return model
if __name__ == "__main__":
# 读取数据
data = pd.read_csv('../data/criteo_sample.txt')
# 划分dense和sparse特征
columns = data.columns.values
dense_features = [feat for feat in columns if 'I' in feat]
sparse_features = [feat for feat in columns if 'C' in feat]
# 简单的数据预处理
train_data = data_process(data, dense_features, sparse_features)
train_data['label'] = data['label']
# 将特征分组分成linear部分和dnn部分(根据实际场景进行选择)并将分组之后的特征做标记使用DenseFeat, SparseFeat
linear_feature_columns = [SparseFeat(feat, vocabulary_size=data[feat].nunique(),embedding_dim=4)
for i,feat in enumerate(sparse_features)] + [DenseFeat(feat, 1,)
for feat in dense_features]
dnn_feature_columns = [SparseFeat(feat, vocabulary_size=data[feat].nunique(),embedding_dim=4)
for i,feat in enumerate(sparse_features)] + [DenseFeat(feat, 1,)
for feat in dense_features]
# 构建xDeepFM模型
model = xDeepFM(linear_feature_columns, dnn_feature_columns)
model.summary()
model.compile(optimizer="adam",
loss="binary_crossentropy",
metrics=["binary_crossentropy", tf.keras.metrics.AUC(name='auc')])
# 将输入数据转化成字典的形式输入
train_model_input = {name: data[name] for name in dense_features + sparse_features}
# 模型训练
model.fit(train_model_input, train_data['label'].values,
batch_size=64, epochs=5, validation_split=0.2, )