1 Commits

Author SHA1 Message Date
zhongqiangwu960812
2ea75605ab upload sdm_mind demo 2022-04-03 13:35:30 +08:00
12 changed files with 1402 additions and 13 deletions

View File

@@ -1,14 +1,14 @@
from random import sample, seed
import sys
sys.path.append("..")
import os
import time
import random
import numpy as np
import pandas as pd
from datetime import date, datetime
from sklearn.preprocessing import LabelEncoder
from features import DenseFeat, SparseFeat, VarLenSparseFeat
from tqdm import tqdm
import tensorflow as tf
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
def process_data(sample_num=5000000):
train_data_path = "./data/train"
@@ -59,3 +59,234 @@ def process_data(sample_num=5000000):
train_label = np.array(train_df['click'])
train_df.pop('click')
return feature_max_index_dict, train_input_dict, train_label
def gen_sdm_data_set(click_data, seq_short_len=5, seq_prefer_len=50):
"""
:param seq_short_len: length of the short-term session
:param seq_prefer_len: maximum length of the long-term (prefer) session
"""
click_data.sort_values("expo_time", inplace=True)
train_set, test_set = [], []
for user_id, hist_click in tqdm(click_data.groupby('user_id')):
pos_list = hist_click['article_id'].tolist()
cat1_list = hist_click['cat_1'].tolist()
cat2_list = hist_click['cat_2'].tolist()
# split the data with a sliding window
for i in range(1, len(pos_list)):
hist = pos_list[:i]
cat1_hist = cat1_list[:i]
cat2_hist = cat2_list[:i]
# the sequence is only long enough for the short-term part
if i <= seq_short_len and i != len(pos_list) - 1:
train_set.append((
# user id, short-term behavior sequence, long-term behavior sequence, current article, label
user_id, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1,
# length of the short-term sequence, length of the long-term sequence,
len(hist[::-1]), 0,
# cat_1 of the short-term sequence, cat_1 of the long-term sequence
cat1_hist[::-1], [0] * seq_prefer_len,
# cat_2 of the short-term sequence, cat_2 of the long-term sequence
cat2_hist[::-1], [0] * seq_prefer_len
))
# the sequence is long enough to fill the long-term part as well
elif i != len(pos_list) - 1:
train_set.append((
# user id, short-term behavior sequence, long-term behavior sequence, current article, label
user_id, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:], pos_list[i], 1,
# length of the short-term sequence, length of the long-term sequence,
seq_short_len, len(hist[::-1]) - seq_short_len,
# cat_1 of the short-term sequence, cat_1 of the long-term sequence
cat1_hist[::-1][:seq_short_len], cat1_hist[::-1][seq_short_len:],
# cat_2 of the short-term sequence, cat_2 of the long-term sequence
cat2_hist[::-1][:seq_short_len], cat2_hist[::-1][seq_short_len:]
))
# the test set keeps each user's longest (i.e. last) sequence
elif i <= seq_short_len and i == len(pos_list) - 1:
test_set.append((
user_id, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1,
len(hist[::-1]), 0,
cat1_hist[::-1], [0] * seq_prefer_len,
cat2_hist[::-1], [0] * seq_prefer_len
))
else:
test_set.append((
user_id, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:], pos_list[i], 1,
seq_short_len, len(hist[::-1]) - seq_short_len,
cat1_hist[::-1][:seq_short_len], cat1_hist[::-1][seq_short_len:],
cat2_hist[::-1][:seq_short_len], cat2_hist[::-1][seq_short_len:]
))
random.shuffle(train_set)
random.shuffle(test_set)
return train_set, test_set
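# Illustrative walk-through (made-up data, not from the original code): with seq_short_len=2 and a user whose
# clicks in time order are [A, B, C, D], the sliding window above produces
#   i=1 -> train sample: short=[A], prefer zero-padded, target=B
#   i=2 -> train sample: short=[B, A], prefer zero-padded, target=C
#   i=3 -> last position: short=[C, B], prefer=[A], target=D, and this sample goes to test_set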
def gen_sdm_model_input(train_set, user_profile, seq_short_len, seq_prefer_len):
"""构造模型输入"""
# row: [user_id, short_train_seq, perfer_train_seq, item_id, label, short_len, perfer_len, cat_1_short, cat_1_perfer, cat_2_short, cat_2_prefer]
train_uid = np.array([row[0] for row in train_set])
short_train_seq = [row[1] for row in train_set]
prefer_train_seq = [row[2] for row in train_set]
train_iid = np.array([row[3] for row in train_set])
train_label = np.array([row[4] for row in train_set])
train_short_len = np.array([row[5] for row in train_set])
train_prefer_len = np.array([row[6] for row in train_set])
short_train_seq_cat1 = np.array([row[7] for row in train_set])
prefer_train_seq_cat1 = np.array([row[8] for row in train_set])
short_train_seq_cat2 = np.array([row[9] for row in train_set])
prefer_train_seq_cat2 = np.array([row[10] for row in train_set])
# pad the sequences
train_short_item_pad = pad_sequences(short_train_seq, maxlen=seq_short_len, padding='post', truncating='post',
value=0)
train_prefer_item_pad = pad_sequences(prefer_train_seq, maxlen=seq_prefer_len, padding='post', truncating='post',
value=0)
train_short_cat1_pad = pad_sequences(short_train_seq_cat1, maxlen=seq_short_len, padding='post', truncating='post',
value=0)
train_prefer_cat1_pad = pad_sequences(prefer_train_seq_cat1, maxlen=seq_prefer_len, padding='post',
truncating='post', value=0)
train_short_cat2_pad = pad_sequences(short_train_seq_cat2, maxlen=seq_short_len, padding='post', truncating='post',
value=0)
train_prefer_cat2_pad = pad_sequences(prefer_train_seq_cat2, maxlen=seq_prefer_len, padding='post',
truncating='post', value=0)
# assemble the model input dict
train_model_input = {
"user_id": train_uid,
"doc_id": train_iid,
"short_doc_id": train_short_item_pad,
"prefer_doc_id": train_prefer_item_pad,
"prefer_sess_length": train_prefer_len,
"short_sess_length": train_short_len,
"short_cat1": train_short_cat1_pad,
"prefer_cat1": train_prefer_cat1_pad,
"short_cat2": train_short_cat2_pad,
"prefer_cat2": train_prefer_cat2_pad
}
# add the remaining user profile features
for key in ["gender", "age", "city"]:
train_model_input[key] = user_profile.loc[train_model_input['user_id']][key].values
return train_model_input, train_label
def gen_neg_sample_candiate(pos_list, item_ids, doc_clicked_count_dict, negsample, methods='multinomial'):
# word2vec-style negative sampling: popular items get a higher chance of being drawn as negatives
# reference: https://blog.csdn.net/weixin_42299244/article/details/112734531, implemented here with the corresponding tf function
# tf.random.categorical is used to downsample the non-relevant items
if methods == 'multinomial':
# the sampling input is the number of times each item was clicked
items, item_counts = [], []
# items not clicked by the user; this loop is very slow and could be optimized later
# for item_id in list(set(item_ids)-set(pos_list)):
# items.append(item_id)
# item_counts.append(doc_clicked_count_dict[item_id]*1.0)
items = list(doc_clicked_count_dict.keys())
item_counts = list(doc_clicked_count_dict.values())
item_freqs = np.array(item_counts) / np.sum(item_counts)
item_freqs = item_freqs ** 0.75
item_freqs = item_freqs / np.sum(item_freqs)
# note: tf.random.categorical expects logits, so pass the log of the probabilities
neg_item_index = tf.random.categorical(np.log(item_freqs).reshape(1, -1), len(pos_list) * negsample)
neg_list = np.array([items[i] for i in neg_item_index.numpy().tolist()[0] if items[i] not in pos_list])
random.shuffle(neg_list)
# otherwise, sample negatives uniformly from the exposed items
else:
candidate_set = list(set(item_ids) - set(pos_list)) # candidates exclude the user's clicked items
neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True) # draw negsample negatives for every positive
return neg_list
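# Rough numeric illustration (made-up counts): click counts [90, 9, 1] give frequencies [0.9, 0.09, 0.01];
# raising them to the 0.75 power and renormalizing yields roughly [0.82, 0.15, 0.03], so tail items are drawn
# as negatives somewhat more often than their raw popularity would suggest, as in word2vec negative sampling.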
def gen_data_set(click_data, doc_clicked_count_dict, negsample, control_users=False):
"""构造youtubeDNN的数据集"""
# 按照曝光时间排序
click_data.sort_values("expo_time", inplace=True)
item_ids = click_data['article_id'].unique()
train_set, test_set = [], []
for user_id, hist_click in tqdm(click_data.groupby('user_id')):
# splitting by expo_date and sliding the window within each day might give more coherent samples, and the sequences would not grow too long (EDA found a user with 1111 clicks)
# for expo_date, hist_click in hist_date_click.groupby('expo_date'):
# the user's historical clicked article ids
pos_list = hist_click['article_id'].tolist()
user_control_flag = True
if control_users:
user_samples_cou = 0
# truncate overly long sequences
if len(pos_list) > 50:
pos_list = pos_list[-50:]
if negsample > 0:
neg_list = gen_neg_sample_candiate(pos_list, item_ids, doc_clicked_count_dict, negsample,
methods='random')
# users with a single click are dropped; this was already handled upstream, so it should not occur here
if len(pos_list) < 2:
continue
else:
# the sequence has at least 2 clicks
for i in range(1, len(pos_list)):
hist = pos_list[:i]
# a popularity-suppression strategy could be used here to lower the chance that high-exposure items become positives
#freq_i = doc_clicked_count_dict[pos_list[i]] / (np.sum(list(doc_clicked_count_dict.values())))
#p_posi = (np.sqrt(freq_i / 0.001) + 1) * (0.001 / freq_i)
# p_posi=0.3 means item_i becomes a positive sample with probability 0.3
if user_control_flag and i != len(pos_list) - 1:
#if random.random() > (1 - p_posi):
if True:
row = [user_id, hist[::-1], pos_list[i], hist_click.iloc[0]['city'], hist_click.iloc[0]['age'],
hist_click.iloc[0]['gender'], 1, len(hist[::-1])]
train_set.append(row)
for negi in range(negsample):
row = [user_id, hist[::-1], neg_list[i * negsample + negi], hist_click.iloc[0]['city'],
hist_click.iloc[0]['age'], hist_click.iloc[0]['gender'], 0, len(hist[::-1])]
train_set.append(row)
if control_users:
user_samples_cou += 1
# each sequence is capped at 50, so a user has at most 50 positives; once a user has contributed 30 training samples, stop adding samples for that user
if user_samples_cou > 30:
user_control_flag = False
# the full sequence goes into test_set; every user contributes exactly one (the longest) sequence, so the test set size equals the number of users
elif i == len(pos_list) - 1:
row = [user_id, hist[::-1], pos_list[i], hist_click.iloc[0]['city'], hist_click.iloc[0]['age'],
hist_click.iloc[0]['gender'], 0, len(hist[::-1])]
test_set.append(row)
random.shuffle(train_set)
random.shuffle(test_set)
return train_set, test_set
"""构造模型输入"""
def gen_model_input(train_set, his_seq_max_len):
"""构造模型的输入"""
# row: [user_id, hist_list, cur_doc_id, city, age, gender, label, hist_len]
train_uid = np.array([row[0] for row in train_set])
train_hist_seq = [row[1] for row in train_set]
train_iid = np.array([row[2] for row in train_set])
train_u_city = np.array([row[3] for row in train_set])
train_u_age = np.array([row[4] for row in train_set])
train_u_gender = np.array([row[5] for row in train_set])
train_label = np.array([row[6] for row in train_set])
train_hist_len = np.array([row[7] for row in train_set])
train_seq_pad = pad_sequences(train_hist_seq, maxlen=his_seq_max_len, padding='post', truncating='post', value=0)
train_model_input = {
"user_id": train_uid,
"doc_id": train_iid,
"hist_doc_id": train_seq_pad,
"hist_len": train_hist_len,
"u_city": train_u_city,
"u_age": train_u_age,
"u_gender": train_u_gender,
}
return train_model_input, train_label
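# Minimal usage sketch (argument values are illustrative): once gen_data_set has produced train_set,
#   train_model_input, train_label = gen_model_input(train_set, his_seq_max_len=50)
# returns a dict of numpy arrays keyed by the feature names above, with hist_doc_id padded/truncated to 50.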

View File

@@ -0,0 +1,31 @@
import os
# switch the working directory to the project root
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.chdir(BASE_DIR)
print(os.getcwd())
import pandas as pd
from examples.utils import get_hist_and_last_click
from recall_methods import mind_recall
from metrics import H_Rate
import warnings
warnings.filterwarnings('ignore')
if __name__ == "__main__":
data_path = 'data'
data = pd.read_csv(os.path.join(data_path, 'train_data.csv'), index_col=0, parse_dates=['expo_time'])
# select the columns we need
use_cols = ['user_id', 'article_id', 'expo_time', 'net_status', 'exop_position', 'duration', 'device', 'city',
'age', 'gender', 'click']
data_new = data[use_cols]
# split into training history and the last click for testing
click_df = data_new[data_new['click'] == 1]
user_click_hist_df, user_click_last_df = get_hist_and_last_click(click_df)
# MIND recall
user_recall_doc_dict1, user_recall_doc_dict2 = mind_recall(user_click_hist_df)
# evaluation
H_Rate(user_recall_doc_dict1, user_click_last_df, topk=200)

View File

@@ -0,0 +1,38 @@
import os
# switch the working directory to the project root
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
os.chdir(BASE_DIR)
print(os.getcwd())
import pandas as pd
from examples.utils import get_hist_and_last_click
from recall_methods import sdm_recall
from metrics import H_Rate
import warnings
warnings.filterwarnings('ignore')
if __name__ == "__main__":
data_path = 'data'
data = pd.read_csv(os.path.join(data_path, 'train_data.csv'), index_col=0, parse_dates=['expo_time'])
# select the columns we need
use_cols = ['user_id', 'article_id', 'expo_time', 'net_status', 'exop_position', 'duration', 'city', 'age',
'gender', 'click', 'cat_1', 'cat_2']
data_new = data[use_cols]
# split into training history and the last click for testing
click_df = data_new[data_new['click'] == 1]
user_click_hist_df, user_click_last_df = get_hist_and_last_click(click_df)
# SDM recall
user_recall_doc_dict = sdm_recall(user_click_hist_df)
# evaluation
H_Rate(user_recall_doc_dict, user_click_last_df, topk=200)

View File

@@ -0,0 +1,151 @@
import pickle
import collections
import numpy as np
from tensorflow.python.keras.models import Model
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat
from deepmatch.utils import sampledsoftmaxloss
from models.matching.MIND import MIND
from models.matching.SDM import SDM
from annoy import AnnoyIndex
def get_hist_and_last_click(all_click):
all_click = all_click.sort_values(by=['user_id', 'expo_time'])
click_last_df = all_click.groupby('user_id').tail(1)
# if a user has only one click the history would be empty and the user would be invisible during training, so we deliberately leak that single click
def hist_func(user_df):
if len(user_df) == 1:
return user_df
else:
return user_df[:-1]
click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True)
return click_hist_df, click_last_df
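# Illustrative example (made-up clicks): a user who clicked [A, B, C] in time order ends up with hist=[A, B]
# in click_hist_df and last=C in click_last_df; a user with the single click [A] keeps A in both frames
# because of the deliberate leak above.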
def train_sdm_model(train_model_input, train_label, embedding_dim, feature_max_idx, SEQ_LEN_short, SEQ_LEN_prefer
,batch_size, epochs, verbose, validation_split):
"""构建sdm并完成训练"""
# 建立模型
user_feature_columns = [
SparseFeat('user_id', feature_max_idx['user_id'], 16),
SparseFeat('gender', feature_max_idx['gender'], 16),
SparseFeat('age', feature_max_idx['age'], 16),
SparseFeat('city', feature_max_idx['city'], 16),
VarLenSparseFeat
(SparseFeat('short_doc_id', feature_max_idx['article_id'], embedding_dim, embedding_name="doc_id"), SEQ_LEN_short, 'mean', 'short_sess_length'),
VarLenSparseFeat
(SparseFeat('prefer_doc_id', feature_max_idx['article_id'], embedding_dim, embedding_name='doc_id'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'),
VarLenSparseFeat(SparseFeat('short_cat1', feature_max_idx['cat_1'], embedding_dim, embedding_name='cat_1'), SEQ_LEN_short, 'mean', 'short_sess_length'),
VarLenSparseFeat(SparseFeat('prefer_cat1', feature_max_idx['cat_1'], embedding_dim, embedding_name='cat_1'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'),
VarLenSparseFeat(SparseFeat('short_cat2', feature_max_idx['cat_2'], embedding_dim, embedding_name='cat_2'), SEQ_LEN_short, 'mean', 'short_sess_length'),
VarLenSparseFeat(SparseFeat('prefer_cat2', feature_max_idx['cat_2'], embedding_dim, embedding_name='cat_2'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'),
]
item_feature_columns = [SparseFeat('doc_id', feature_max_idx['article_id'], embedding_dim)]
# instantiate the model
model = SDM(user_feature_columns, item_feature_columns, history_feature_list=['doc_id', 'cat1', 'cat2'])
# compile the model
model.compile(optimizer="adam", loss=sampledsoftmaxloss)
# train; validation_split sets the validation ratio, 0 means all data is used for training
history = model.fit(train_model_input, train_label, batch_size=batch_size, epochs=epochs, verbose=verbose, validation_split=validation_split)
return model
def train_mind_model(train_model_input, train_label, embedding_dim, feature_max_idx, his_seq_maxlen, batch_size, epochs,
verbose, validation_split):
"""构建mind并完成训练"""
# 建立模型
user_feature_columns = [
SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
VarLenSparseFeat(SparseFeat('hist_doc_id', feature_max_idx['article_id'], embedding_dim,
embedding_name="click_doc_id"), his_seq_maxlen, 'mean', 'hist_len'),
DenseFeat('hist_len', 1),
SparseFeat('u_city', feature_max_idx['city'], embedding_dim),
SparseFeat('u_age', feature_max_idx['age'], embedding_dim),
SparseFeat('u_gender', feature_max_idx['gender'], embedding_dim),
]
doc_feature_columns = [
SparseFeat('doc_id', feature_max_idx['article_id'], embedding_dim)
# article category / profile features could also be added here later
]
# instantiate the model
model = MIND(user_feature_columns, doc_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim))
# compile the model
model.compile(optimizer="adam", loss=sampledsoftmaxloss)
# train; validation_split sets the validation ratio, 0 means all data is used for training
history = model.fit(train_model_input, train_label, batch_size=batch_size, epochs=epochs, verbose=verbose,
validation_split=validation_split)
return model
"""获取用户embedding和文章embedding"""
def get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid, save_path='embedding/'):
doc_model_input = {'doc_id' :np.array(list(doc_idx_2_rawid.keys()))}
user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding)
doc_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
# save the item and user embeddings; they may be useful in the ranking stage, but they must be stored against the original raw ids
user_embs = user_embedding_model.predict(test_model_input, batch_size=2 ** 12)
doc_embs = doc_embedding_model.predict(doc_model_input, batch_size=2 ** 12)
# L2-normalize the embeddings before saving
user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
doc_embs = doc_embs / np.linalg.norm(doc_embs, axis=1, keepdims=True)
# turn the embeddings into dicts keyed by raw id for easy lookup
raw_user_id_emb_dict = {user_idx_2_rawid[k]: \
v for k, v in zip(user_idx_2_rawid.keys(), user_embs)}
raw_doc_id_emb_dict = {doc_idx_2_rawid[k]: \
v for k, v in zip(doc_idx_2_rawid.keys(), doc_embs)}
# save the embeddings to disk
pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_emb.pkl', 'wb'))
pickle.dump(raw_doc_id_emb_dict, open(save_path + 'doc_emb.pkl', 'wb'))
# to read them back:
# user_embs_dict = pickle.load(open('embedding/user_youtube_emb.pkl', 'rb'))
# doc_embs_dict = pickle.load(open('embedding/doc_youtube_emb.pkl', 'rb'))
return user_embs, doc_embs
"""最近邻检索得到召回结果"""
def get_recall_res(user_embs, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk):
"""近邻检索这里用annoy tree"""
# 把doc_embs构建成索引树
f = user_embs.shape[1]
t = AnnoyIndex(f, 'angular')
for i, v in enumerate(doc_embs):
t.add_item(i, v)
t.build(10)
# the index can be persisted with t.save('annoy.ann')
# for every user vector, return the nearest topk items
user_recall_items_dict = collections.defaultdict(dict)
for i, u in enumerate(user_embs):
recall_doc_scores = t.get_nns_by_vector(u, topk, include_distances=True)
# recall_doc_scores is ([doc_idx], [scores]); convert the indices back to raw doc ids
raw_doc_scores = list(recall_doc_scores)
raw_doc_scores[0] = [doc_idx_2_rawid[i] for i in raw_doc_scores[0]]
# map back to the raw user id
try:
user_recall_items_dict[user_idx_2_rawid[i]] = dict(zip(*raw_doc_scores))
except:
continue
# annoy returns scores in ascending order; sort them in descending order here
user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()}
# save a copy
pickle.dump(user_recall_items_dict, open('u2i_dict.pkl', 'wb'))
return user_recall_items_dict

View File

@@ -1,7 +1,8 @@
import tensorflow as tf
from tensorflow.keras.layers import Layer, Dense, Activation, Dropout, \
BatchNormalization
from tensorflow.keras.initializers import Zeros
from tensorflow.keras.layers import *
# backend functions (K.mean, K.square, ...) are needed by LayerNormalization below
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.initializers import RandomNormal, Zeros, glorot_normal, TruncatedNormal, Ones
class DNN(Layer):
@@ -127,9 +128,98 @@ class PredictLayer(Layer):
base_config = super(PredictLayer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))
class EmbeddingIndex(Layer):
def __init__(self, index, **kwargs):
self.index = index
super(EmbeddingIndex, self).__init__(**kwargs)
def build(self, input_shape):
super(EmbeddingIndex, self).build(input_shape) # Be sure to call this somewhere!
def call(self, x, **kwargs):
return tf.constant(self.index)
# TODO Activation
class SampledSoftmaxLayer(Layer):
def __init__(self, num_sampled=5, **kwargs):
self.num_sampled = num_sampled
super(SampledSoftmaxLayer, self).__init__(**kwargs)
def build(self, input_shape):
self.size = input_shape[0][0] # docs num
self.zero_bias = self.add_weight(shape=[self.size], initializer=Zeros, dtype=tf.float32, trainable=False, name='bias')
super(SampledSoftmaxLayer, self).build(input_shape)
def call(self, inputs_with_label_idx):
embeddings, inputs, label_idx = inputs_with_label_idx
# with label_idx we can look up its embedding, so (user_embedding, embeddings[label_idx]) is the positive pair
# num_sampled negatives are then drawn from the embeddings table
# the loss roughly follows log(sigmoid(user_emb · emb[label_idx])) + sum(log(sigmoid(-user_emb · emb[neg])))
loss = tf.nn.sampled_softmax_loss(weights=embeddings, # (numclass, dim)
biases=self.zero_bias, # (numclass)
labels=label_idx, # (batch, num_true)
inputs=inputs, # (batch, dim)
num_sampled=self.num_sampled, # number of sampled negatives
num_classes=self.size # total number of classes (docs)
)
return tf.expand_dims(loss, axis=1)
# TODO Normalization
class LayerNormalization(Layer):
def __init__(self, axis=-1, eps=1e-9, center=True,
scale=True, **kwargs):
self.axis = axis
self.eps = eps
self.center = center
self.scale = scale
super(LayerNormalization, self).__init__(**kwargs)
def build(self, input_shape):
self.gamma = self.add_weight(name='gamma', shape=input_shape[-1:],
initializer=Ones(), trainable=True)
self.beta = self.add_weight(name='beta', shape=input_shape[-1:],
initializer=Zeros(), trainable=True)
super(LayerNormalization, self).build(input_shape)
def call(self, inputs):
mean = K.mean(inputs, axis=self.axis, keepdims=True)
variance = K.mean(K.square(inputs - mean), axis=-1, keepdims=True)
std = K.sqrt(variance + self.eps)
outputs = (inputs - mean) / std
if self.scale:
outputs *= self.gamma
if self.center:
outputs += self.beta
return outputs
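# For reference, with the default axis=-1 the layer computes y = gamma * (x - mean) / sqrt(var + eps) + beta,
# where mean and var are taken over the last axis and gamma/beta are the learned scale and shift.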
class PoolingLayer(Layer):
def __init__(self, mode='mean', supports_masking=False, **kwargs):
if mode not in ['sum', 'mean', 'max']:
raise ValueError("mode must be sum, mean or max")
self.mode = mode
self.eps = tf.constant(1e-8, tf.float32)
super(PoolingLayer, self).__init__(**kwargs)
self.supports_masking = supports_masking
def build(self, input_shape):
super(PoolingLayer, self).build(
input_shape) # Be sure to call this somewhere!
def call(self, seq_value_len_list, mask=None, **kwargs):
if not isinstance(seq_value_len_list, list):
seq_value_len_list = [seq_value_len_list]
if len(seq_value_len_list) == 1:
return seq_value_len_list[0]
expand_seq_value_len_list = list(map(lambda x: tf.expand_dims(x, axis=-1), seq_value_len_list))
a = concat_func(expand_seq_value_len_list)
if self.mode == "mean":
hist = reduce_mean(a, axis=-1, )
if self.mode == "sum":
hist = reduce_sum(a, axis=-1, )
if self.mode == "max":
hist = reduce_max(a, axis=-1, )
return hist
def get_config(self, ):
config = {'mode': self.mode, 'supports_masking': self.supports_masking}
base_config = super(PoolingLayer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))

View File

@@ -1,8 +1,10 @@
import itertools
from re import L
import tensorflow as tf
from tensorflow.keras.layers import Layer, Concatenate
from tensorflow.keras.initializers import TruncatedNormal, glorot_normal
from tensorflow.keras.layers import *
from tensorflow.keras.initializers import TruncatedNormal, glorot_normal, RandomNormal
from layers.utils import softmax, squash, reduce_sum
class FM(Layer):
@@ -352,4 +354,256 @@ class BilinearInteractionLayer(Layer):
config = {"bilinear_type": self.bilinear_type, "seed": self.seed}
base_config = super(BilinearInteractionLayer, self).get_config()
base_config.update(config)
return base_config
return base_config
class AttentionSequencePoolingLayer(Layer):
"""
:param query: [batch_size, 1, c_q]
:param keys: [batch_size, T, c_k]
:param keys_length: [batch_size, 1]
:return [batch, 1, c_k]
"""
def __init__(self, dropout_rate=0, scale=True, **kwargs):
self.dropout_rate = dropout_rate
self.scale = scale
super(AttentionSequencePoolingLayer, self).__init__(**kwargs)
def build(self, input_shape):
self.projection_layer = Dense(units=1, activation='tanh')
super(AttentionSequencePoolingLayer, self).build(input_shape)
def call(self, inputs, mask=None, **kwargs):
# queries [None, 1, 64], keys [None, 50, 32]; keys_length [None, 1] holds the true session length and is used for the mask below
queries, keys, keys_length = inputs
hist_len = keys.get_shape()[1]
key_mask = tf.sequence_mask(keys_length, hist_len) # mask matrix (None, 1, 50)
queries = tf.tile(queries, [1, hist_len, 1]) # [None, 50, 64], one copy of the query for every key
# next the queries and keys are concatenated and fed through a dense layer. The obvious way to score the relevance
# between the query and each key would be a plain dot product (Luong-style attention); this implementation instead
# concatenates q and k and passes them through a DNN, i.e. the additive (Bahdanau-style) alignment.
q_k = tf.concat([queries, keys], axis=-1) # [None, 50, 96]
output_scores = self.projection_layer(q_k) # [None, 50, 1]
if self.scale:
output_scores = output_scores / (q_k.get_shape().as_list()[-1] ** 0.5)
attention_score = tf.transpose(output_scores, [0, 2, 1])
# weighted sum; the padded positions must be masked out first
paddings = tf.ones_like(attention_score) * (-2 ** 32 + 1)
attention_score = tf.where(key_mask, attention_score, paddings)
attention_score = softmax(attention_score) # [None, 1, 50]
outputs = tf.matmul(attention_score, keys) # [None, 1, 32]
return outputs
def compute_output_shape(self, input_shape):
return (None, 1, input_shape[1][1])
class MultiHeadAttention(Layer):
def __init__(self, num_units=8, head_num=4, scale=True, dropout_rate=0.2, use_layer_norm=True, use_res=True,
seed=2020, **kwargs):
self.num_units = num_units
self.head_num = head_num
self.scale = scale
self.dropout_rate = dropout_rate
self.use_layer_norm = use_layer_norm
self.use_res = use_res
self.seed = seed
super(MultiHeadAttention, self).__init__(**kwargs)
def build(self, input_shape):
embedding_size = int(input_shape[0][-1])
# W_q, W_k and W_v packed into a single weight matrix
self.W = self.add_weight(name='Q_K_V', shape=[embedding_size, self.num_units * 3],
dtype=tf.float32,
initializer=TruncatedNormal(seed=self.seed))
self.W_output = self.add_weight(name='output_W', shape=[self.num_units, self.num_units],
dtype=tf.float32,
initializer=TruncatedNormal(seed=self.seed))
self.layer_norm = LayerNormalization()
self.dropout = Dropout(self.dropout_rate, seed=self.seed)
self.seq_len_max = int(input_shape[0][1])
super(MultiHeadAttention, self).build(input_shape)
def call(self, inputs, training=None, **kwargs):
input_info, keys_length = inputs
hist_len = input_info.get_shape()[1]
key_masks = tf.sequence_mask(keys_length, hist_len) # (None, 1, 5)
key_masks = tf.squeeze(key_masks, axis=1) # (None, 5)
Q_K_V = tf.tensordot(input_info, self.W, axes=(-1, 0)) # (None, seq_len, embed*3)
querys, keys, values = tf.split(Q_K_V, 3, -1)
# split into heads for the computation
querys = tf.concat(tf.split(querys, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/head_num)
keys = tf.concat(tf.split(keys, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/headnum)
values = tf.concat(tf.split(values, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/head_num)
# attention scores
att_score = tf.matmul(querys, tf.transpose(keys, [0, 2, 1])) # (head_num*None, seq_len, seq_len)
if self.scale:
att_score = att_score / (keys.get_shape().as_list()[-1] ** 0.5)
key_masks = tf.tile(key_masks, [self.head_num, 1]) # [head_num*None, seq_len]
key_masks = tf.tile(tf.expand_dims(key_masks, 1),
[1, tf.shape(input_info)[1], 1]) # [head_num*None, seq_len, seq_len]
paddings = tf.ones_like(att_score) * (-2 ** 32 + 1) # [head_num*None, seq_len, seq_len]
align = tf.where(key_masks, att_score, paddings) # [head_num*None, seq_len, seq_len]
align = softmax(align)
output = tf.matmul(align, values) # [head_num*None, seq_len, emb/head_num]
output = tf.concat(tf.split(output, self.head_num, axis=0), axis=2) # [None, seq_len, emb]
output = tf.tensordot(output, self.W_output, axes=(-1, 0)) # [None, seq_len, emb]
output = self.dropout(output, training=training)
if self.use_res:
output += input_info
if self.use_layer_norm:
output = self.layer_norm(output)
return output
def compute_output_shape(self, input_shape):
return (None, input_shape[0][1], self.num_units)
class UserAttention(Layer):
def __init__(self, num_units=None, activation='tanh', use_res=True, dropout_rate=0, scale=True, seed=2020,
**kwargs):
self.scale = scale
self.num_units = num_units
self.activation = activation
self.seed = seed
self.dropout_rate = dropout_rate
self.use_res = use_res
super(UserAttention, self).__init__(**kwargs)
def build(self, input_shape):
if self.num_units is None:
self.num_units = input_shape[0][-1]
self.dense = Dense(self.num_units, activation=self.activation)
super(UserAttention, self).build(input_shape)
def call(self, inputs, mask=None, **kwargs):
# [None, 1, embed] [None, 5, embed], [None, 1]
user_query, keys, keys_length = inputs
hist_len = keys.get_shape()[1]
key_masks = tf.sequence_mask(keys_length, hist_len) # [None, 1, 5]
query = self.dense(user_query) # [None, 1, num_units]
# attention scores
att_score = tf.matmul(query, tf.transpose(keys, [0, 2, 1])) # (None, 1, seq_len)
if self.scale:
att_score = att_score / (keys.get_shape().as_list()[-1] ** 0.5)
paddings = tf.ones_like(att_score) * (-2 ** 32 + 1) # [None, 1, seq_len]
align = tf.where(key_masks, att_score, paddings) # [None, 1, seq_len]
align = softmax(align)
output = tf.matmul(align, keys) # [None, 1, embed]
return output
def compute_output_shape(self, input_shape):
return (None, 1, input_shape[1][2])
def compute_mask(self, inputs, mask):
return mask
class LabelAwareAttention(Layer):
def __init__(self, k_max, pow_p=1, **kwargs):
self.k_max = k_max
self.pow_p = pow_p
super(LabelAwareAttention, self).__init__(**kwargs)
def build(self, input_shape):
self.embedding_size = input_shape[0][-1]
super(LabelAwareAttention, self).build(input_shape)
def call(self, inputs):
keys, query = inputs[0], inputs[1] # keys (None, 2, 8) query (None, 1, 8)
weight = reduce_sum(keys * query, axis=-1, keep_dims=True) # (None, 2, 1)
weight = tf.pow(weight, self.pow_p) # (None, 2, 1)
# if k is adjusted dynamically, mask by the true history length so the masked output capsules get zero weight and contribute nothing
if len(inputs) == 3:
k_user = tf.cast(tf.maximum(
1.,
tf.minimum(
tf.cast(self.k_max, dtype="float32"), # k_max
tf.math.log1p(tf.cast(inputs[2], dtype="float32")) / tf.math.log(2.) # hist_len
)
), dtype="int64")
seq_mask = tf.transpose(tf.sequence_mask(k_user, self.k_max), [0, 2, 1])
padding = tf.ones_like(seq_mask, dtype=tf.float32) * (-2 ** 32 + 1) # [x,k_max,1]
weight = tf.where(seq_mask, weight, padding)
weight = softmax(weight, dim=1, name='weight')
output = reduce_sum(keys * weight, axis=1) # (None, 8)
return output
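# In formula form, the code above weights interest capsule i for target item t by softmax_i((e_i · e_t)^pow_p)
# and returns the weighted sum of the capsules; a larger pow_p routes the user representation more sharply
# towards the capsule closest to the target item.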
class CapsuleLayer(Layer):
def __init__(self, input_units, out_units, max_len, k_max, iteration_times=3, init_std=1.0, **kwargs):
self.input_units = input_units
self.out_units = out_units
self.max_len = max_len
self.k_max = k_max
self.iteration_times = iteration_times
self.init_std = init_std
super(CapsuleLayer, self).__init__(**kwargs)
def build(self, input_shape):
# the routing logits B have shape (1, k_max, max_len), e.g. 1 x 2 x 50: one logit per (output capsule, input capsule) pair, so two output capsules need two rows of B
self.routing_logits = self.add_weight(shape=[1, self.k_max, self.max_len],
initializer=RandomNormal(stddev=self.init_std),
trainable=False, name='B', dtype=tf.float32)
# bilinear mapping matrix of shape [input capsule dim, output capsule dim], used to map the behavior embeddings into the capsule space
self.bilinear_mapping_matrix = self.add_weight(shape=[self.input_units, self.out_units],
initializer=RandomNormal(stddev=self.init_std),
name="S", dtype=tf.float32)
super(CapsuleLayer, self).build(input_shape)
def call(self, inputs, **kwargs):
# inputs are (hist_emb, hist_len): hist_emb is (None, seq_len, emb_dim), hist_len is (None, 1)
behavior_embeddings, seq_len = inputs
batch_size = tf.shape(behavior_embeddings)[0]
seq_len_tile = tf.tile(seq_len, [1, self.k_max]) # tile along dim 1, one copy per output capsule: (None, 2), both columns hold the true sequence length
for i in range(self.iteration_times):
mask = tf.sequence_mask(seq_len_tile,
self.max_len) # (None, 2, 50): dim 0 is the sample, dim 1 the capsule, dim 2 is [True, True, ..., False, False, ...]
pad = tf.ones_like(mask, dtype=tf.float32) * (
-2 ** 32 + 1) # (None, 2, 50): masked positions get a very negative number so they become 0 after the softmax
routing_logits_with_padding = tf.where(mask, tf.tile(self.routing_logits, [batch_size, 1, 1]), pad)
# (None, 2, 50): B is tiled along the batch so every sample has its own copy (2 output capsules, 50 input capsules)
weight = tf.nn.softmax(routing_logits_with_padding) # (None, 2, 50), softmax gives the coupling weights
# (None, seq_len, emb_dim) x (emb_dim, out_emb) = (None, 50, 8); axes=1 contracts the last dim of a with the first dim of b
behavior_embedding_mapping = tf.tensordot(behavior_embeddings, self.bilinear_mapping_matrix, axes=1)
Z = tf.matmul(weight, behavior_embedding_mapping) # (None, 2, 8): weighted sum of the mapped behavior embeddings using B
interest_capsules = squash(Z) # (None, 2, 8)
delta_routing_logits = reduce_sum(
# (None, 2, 8) * (None, 8, 50) = (None, 2, 50)
tf.matmul(interest_capsules, tf.transpose(behavior_embedding_mapping, perm=[0, 2, 1])),
axis=0, keep_dims=True
) # (1, 2, 50): summed over the batch dimension, so all samples jointly contribute to the routing update
self.routing_logits.assign_add(delta_routing_logits) # accumulate onto the existing routing logits (1, 2, 50)
interest_capsules = tf.reshape(interest_capsules, [-1, self.k_max, self.out_units]) # (None, 2, 8)
return interest_capsules
def compute_output_shape(self, input_shape):
return (None, self.k_max, self.out_units)
# get_config is needed if the model is to be saved
def get_config(self, ):
config = {'input_units': self.input_units, 'out_units': self.out_units, 'max_len': self.max_len,
'k_max': self.k_max, 'iteration_times': self.iteration_times, "init_std": self.init_std}
base_config = super(CapsuleLayer, self).get_config()
return dict(list(base_config.items()) + list(config.items()))

View File

@@ -1,7 +1,94 @@
from tensorflow.keras.layers import Layer
import tensorflow as tf
from tensorflow.keras.layers import *
if tf.__version__ >= '2.0.0':
tf.compat.v1.disable_eager_execution()
# TODO
class PoolingLayer(Layer):
def __init__(self, trainable=True, name=None, dtype=None, dynamic=False, **kwargs):
super().__init__(trainable, name, dtype, dynamic, **kwargs)
pass
pass
class DynamicMultiRNN(Layer):
def __init__(self, num_units=None, rnn_type='LSTM', return_sequence=True, num_layers=2, num_residual_layers=1,
dropout_rate=0.2, forget_bias=1.0, **kwargs):
self.num_units = num_units
self.return_sequence = return_sequence
self.rnn_type = rnn_type
self.num_layers = num_layers
self.num_residual_layers = num_residual_layers
self.dropout = dropout_rate
self.forget_bias = forget_bias
super(DynamicMultiRNN, self).__init__(**kwargs)
def build(self, input_shape):
input_seq_shape = input_shape[0]
if self.num_units is None:
self.num_units = input_seq_shape.as_list()[-1]
# only LSTM is supported for now
if self.rnn_type == "LSTM":
try:
# AttributeError: module 'tensorflow_core._api.v2.nn' has no attribute 'rnn_cell'
single_cell = tf.nn.rnn_cell.BasicLSTMCell(self.num_units, forget_bias=self.forget_bias)
except AttributeError:
single_cell = tf.compat.v1.nn.rnn_cell.BasicLSTMCell(self.num_units, forget_bias=self.forget_bias)
dropout = self.dropout if tf.keras.backend.learning_phase() == 1 else 0 # enable dropout in training, disable it at inference
try:
single_cell = tf.nn.rnn_cell.DropoutWrapper(cell=single_cell, input_keep_prob=(1.0 - dropout))
except AttributeError:
single_cell = tf.compat.v1.nn.rnn_cell.DropoutWrapper(cell=single_cell, input_keep_prob=(1.0 - dropout))
cell_list = []
for i in range(self.num_layers):
residual = (i >= self.num_layers - self.num_residual_layers)
if residual:
# ResidualWrapper adds the cell's input to its output, like a ResNet shortcut
try:
single_cell_residual = tf.nn.rnn_cell.ResidualWrapper(single_cell)
except AttributeError:
single_cell_residual = tf.compat.v1.nn.rnn_cell.ResidualWrapper(single_cell)
cell_list.append(single_cell_residual)
else:
cell_list.append(single_cell)
if len(cell_list) == 1:
self.final_cell = cell_list[0]
else:
# stack multiple RNN layers
try:
self.final_cell = tf.nn.rnn_cell.MultiRNNCell(cell_list)
except AttributeError:
self.final_cell = tf.compat.v1.nn.rnn_cell.MultiRNNCell(cell_list)
super(DynamicMultiRNN, self).build(input_shape)
def call(self, input_list):
rnn_input, sequence_length = input_list
try:
# AttributeError: module 'tensorflow' has no attribute 'variable_scope'
with tf.name_scope('rnn'), tf.variable_scope('rnn', reuse=tf.AUTO_REUSE):
# rnn_output holds the hidden state of every time step; hidden_state is the last step's state (for LSTM it is the (c, h) pair)
rnn_output, hidden_state = tf.nn.dynamic_rnn(self.final_cell, inputs=rnn_input,
sequence_length=tf.squeeze(sequence_length),
dtype=tf.float32, scope=self.name)
except AttributeError:
with tf.name_scope("rnn"), tf.compat.v1.variable_scope("rnn", reuse=tf.compat.v1.AUTO_REUSE):
rnn_output, hidden_state = tf.compat.v1.nn.dynamic_rnn(self.final_cell, inputs=rnn_input,
sequence_length=tf.squeeze(sequence_length),
dtype=tf.float32, scope=self.name)
if self.return_sequence:
return rnn_output
else:
return tf.expand_dims(hidden_state, axis=1)
def compute_output_shape(self, input_shape):
rnn_input_shape = input_shape[0]
if self.return_sequence:
return rnn_input_shape
else:
return (None, 1, rnn_input_shape[2])

View File

@@ -1,5 +1,7 @@
from collections import OrderedDict
import tensorflow as tf
from tensorflow.keras.layers import *
from deepctr.feature_column import SparseFeat, DenseFeat, VarLenSparseFeat
class NoMask(tf.keras.layers.Layer):
def __init__(self, **kwargs):
@@ -15,3 +17,106 @@ class NoMask(tf.keras.layers.Layer):
def compute_mask(self, inputs, mask):
return None
def softmax(logits, dim=-1, name=None):
try:
return tf.nn.softmax(logits, dim=dim, name=name)
except TypeError:
return tf.nn.softmax(logits, axis=dim, name=name)
def concat_func(inputs, axis=-1, mask=False):
if not mask:
inputs = list(map(NoMask(), inputs))
if len(inputs) == 1:
return inputs[0]
else:
return tf.keras.layers.Concatenate(axis=axis)(inputs)
def build_input_layers(feature_columns, prefix=''):
input_features = OrderedDict()
for fc in feature_columns:
if isinstance(fc, SparseFeat):
input_features[fc.name] = Input(
shape=(1,), name=prefix + fc.name, dtype=fc.dtype)
elif isinstance(fc, DenseFeat):
input_features[fc.name] = Input(
shape=(fc.dimension,), name=prefix + fc.name, dtype=fc.dtype)
elif isinstance(fc, VarLenSparseFeat):
input_features[fc.name] = Input(shape=(fc.maxlen,), name=prefix + fc.name,
dtype=fc.dtype)
if fc.weight_name is not None:
input_features[fc.weight_name] = Input(shape=(fc.maxlen, 1), name=prefix + fc.weight_name,
dtype="float32")
if fc.length_name is not None:
input_features[fc.length_name] = Input((1,), name=prefix + fc.length_name, dtype='int32')
else:
raise TypeError("Invalid feature column type,got", type(fc))
return input_features
def build_embedding_layers(feature_columns):
embedding_layer_dict = {}
for fc in feature_columns:
if isinstance(fc, SparseFeat):
embedding_layer_dict[fc.name] = Embedding(fc.vocabulary_size, fc.embedding_dim, name='emb_' + fc.name)
elif isinstance(fc, VarLenSparseFeat):
# +1 because masking reserves one slot: with mask value 0, the embedding at index 0 is dedicated to the mask
embedding_layer_dict[fc.name] = Embedding(fc.vocabulary_size + 1, fc.embedding_dim, name='emb_' + fc.name,
mask_zero=True)
return embedding_layer_dict
def embedding_lookup(feature_columns, input_layer_dict, embedding_layer_dict):
# this function wires the Input layers to their embedding layers
embedding_list = []
for fc in feature_columns:
_input = input_layer_dict[fc]
_embed = embedding_layer_dict[fc]
embed = _embed(_input)
embedding_list.append(embed)
return embedding_list
def get_item_embedding(item_embedding, item_input_layer):
return Lambda(lambda x: tf.squeeze(tf.gather(item_embedding, x), axis=1))(
item_input_layer)
def squash(inputs):
vec_squared_norm = reduce_sum(tf.square(inputs), axis=-1, keep_dims=True) # (None, 2, 1)
scalar_factor = vec_squared_norm / (1 + vec_squared_norm) / tf.sqrt(vec_squared_norm + 1e-8)
vec_squashed = scalar_factor * inputs
return vec_squashed
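# The squash above is the capsule non-linearity squash(z) = (||z||^2 / (1 + ||z||^2)) * z / ||z||;
# the 1e-8 inside the square root keeps the division stable when the norm is close to zero.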
def reduce_sum(input_tensor,
axis=None,
keep_dims=False,
name=None,
reduction_indices=None):
try:
return tf.reduce_sum(input_tensor,
axis=axis,
keep_dims=keep_dims,
name=name,
reduction_indices=reduction_indices)
except TypeError:
return tf.reduce_sum(input_tensor,
axis=axis,
keepdims=keep_dims,
name=name)
def combined_dnn_input(sparse_embedding_list, dense_value_list):
if len(sparse_embedding_list) > 0 and len(dense_value_list) > 0:
sparse_dnn_input = Flatten()(concat_func(sparse_embedding_list))
dense_dnn_input = Flatten()(concat_func(dense_value_list))
return concat_func([sparse_dnn_input, dense_dnn_input])
elif len(sparse_embedding_list) > 0:
return Flatten()(concat_func(sparse_embedding_list))
elif len(dense_value_list) > 0:
return Flatten()(concat_func(dense_value_list))
else:
raise NotImplementedError("dnn_feature_columns can not be empty list")

View File

@@ -1 +1,18 @@
# TODO: collect the common recall/ranking metrics here
# evaluate the hit rate within the top 50, 100, ..., topk recalled articles
def H_Rate(user_recall_items_dict, trn_last_click_df, topk=100):
last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['article_id']))
user_num = len(user_recall_items_dict)
for k in range(50, topk + 1, 50):
hit_num = 0
for user, item_list in user_recall_items_dict.items():
if user in last_click_item_dict:
# take this user's top-k recalled items
tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]]
if last_click_item_dict[user] in set(tmp_recall_items):
hit_num += 1
hit_rate = round(hit_num * 1.0 / user_num, 5)
print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
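# Minimal usage sketch (made-up data): with
#   user_recall_items_dict = {1: [(10, 0.9), (11, 0.8)], 2: [(12, 0.7)]}
#   trn_last_click_df = pd.DataFrame({'user_id': [1, 2], 'article_id': [11, 99]})
# H_Rate(user_recall_items_dict, trn_last_click_df, topk=50) would report hit_num=1 and hit_rate=0.5 at k=50.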

View File

@@ -0,0 +1,118 @@
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.python.keras.models import Model
# use deepctr's feature_column classes; with a home-made feature_column the isinstance filters below select nothing
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat
from layers.utils import build_input_layers, build_embedding_layers, embedding_lookup, combined_dnn_input, NoMask, get_item_embedding
from layers.interaction import LabelAwareAttention, CapsuleLayer
from layers.core import SampledSoftmaxLayer, PoolingLayer, EmbeddingIndex, DNN
def tile_user_otherfeat(user_other_feature, k_max):
return tf.tile(tf.expand_dims(user_other_feature, -2), [1, k_max, 1])
def MIND(user_feature_columns, item_feature_columns, num_sampled=5, k_max=2, p=1.0, dynamic_k=False, user_dnn_hidden_units=(64, 32),
dnn_activation='relu', dnn_use_bn=False, l2_reg_dnn=0, l2_reg_embedding=1e-6, dnn_dropout=0, output_activation='linear', seed=1024):
"""
:param k_max: maximum number of user interest capsules
"""
# only a single item feature column is supported for now, i.e. only item_id can be passed in
if len(item_feature_columns) > 1:
raise ValueError("Now MIND only support 1 item feature like item_id")
# read the item-side configuration
item_feature_column = item_feature_columns[0]
item_feature_name = item_feature_column.name
item_vocabulary_size = item_feature_column.vocabulary_size
item_embedding_dim = item_feature_column.embedding_dim
behavior_feature_list = [item_feature_name]
# build the Input layers for the user and item features
user_input_layer_dict = build_input_layers(user_feature_columns)
item_input_layer_dict = build_input_layers(item_feature_columns)
# collect the Input layers into lists to use as the model inputs
user_input_layers = list(user_input_layer_dict.values())
item_input_layers = list(item_input_layer_dict.values())
# separate the sparse, dense and varlen features so each group can be handled on its own
sparse_feature_columns = list \
(filter(lambda x: isinstance(x, SparseFeat), user_feature_columns)) if user_feature_columns else []
dense_feature_columns = list \
(filter(lambda x: isinstance(x, DenseFeat), user_feature_columns)) if user_feature_columns else []
varlen_feature_columns = list \
(filter(lambda x: isinstance(x, VarLenSparseFeat), user_feature_columns)) if user_feature_columns else []
# the only varlen feature here is the clicked-article history (no categories etc.), so varlen_feature_columns is used directly
# deepctr would put the clicked-article sequence into a separate history_feature_columns
seq_max_len = varlen_feature_columns[0].maxlen
# build the embedding layer dict
embedding_layer_dict = build_embedding_layers(user_feature_columns + item_feature_columns)
# embedding of the target behavior feature (the doc); an item may carry several id/category features, so pooling is applied
query_embed_list = embedding_lookup(behavior_feature_list, item_input_layer_dict, embedding_layer_dict) # list of length 1
# embeddings of the behavior sequence (the hist_doc_id sequence); several behaviors could produce sequences, so they are kept in a list
keys_embed_list = embedding_lookup([varlen_feature_columns[0].name], user_input_layer_dict, embedding_layer_dict) # list of length 1
# look up the embeddings of the user's sparse features
dnn_input_emb_list = embedding_lookup([col.name for col in sparse_feature_columns], user_input_layer_dict, embedding_layer_dict)
# collect the dense inputs
dnn_dense_input = []
for fc in dense_feature_columns:
if fc.name != 'hist_len': # hist_len is not fed to the DNN as a dense feature
dnn_dense_input.append(user_input_layer_dict[fc.name])
# pool keys_embed_list and query_embed_list: an item may have not only its id but also category/brand embeddings, which must be pooled into a single vector
history_emb = PoolingLayer()(NoMask()(keys_embed_list)) # (None, 50, 8)
target_emb = PoolingLayer()(NoMask()(query_embed_list)) # (None, 1, 8)
hist_len = user_input_layer_dict['hist_len']
# capsule network
# (None, 2, 8): two interest capsules
high_capsule = CapsuleLayer(input_units=item_embedding_dim, out_units=item_embedding_dim,
max_len=seq_max_len, k_max=k_max)((history_emb, hist_len)) # e.g. a history like [5, 7, 8, 9]
# concatenate the user's other features onto the interest capsules
if len(dnn_input_emb_list) > 0 or len(dnn_dense_input) > 0:
user_other_feature = combined_dnn_input(dnn_input_emb_list, dnn_dense_input)
# (None, 2, 32): the other user features are tiled once per capsule and then concatenated
other_feature_tile = tf.keras.layers.Lambda(tile_user_otherfeat, arguments={'k_max': k_max})(user_other_feature)
user_deep_input = Concatenate()([NoMask()(other_feature_tile), high_capsule]) # (None, 2, 40)
else:
user_deep_input = high_capsule
# a DNN produces the final user representation; with a 3-D input only the last dimension is multiplied by W, so a stack of Dense layers would work just as well
user_embeddings = DNN(user_dnn_hidden_units, dnn_activation)(user_deep_input) # (None, 2, 8)
# label-aware attention layer
if dynamic_k:
user_embedding_final = LabelAwareAttention(k_max=k_max, pow_p=p)((user_embeddings, target_emb, hist_len))
else:
user_embedding_final = LabelAwareAttention(k_max=k_max, pow_p=p)((user_embeddings, target_emb))
# item embedding table and sampled-softmax output
item_embedding_matrix = embedding_layer_dict[item_feature_name] # the embedding layer of doc_id
item_index = EmbeddingIndex(list(range(item_vocabulary_size))) \
(item_input_layer_dict[item_feature_name]) # indices of all doc_ids
item_embedding_weight = NoMask()(item_embedding_matrix(item_index)) # the full item embedding matrix
pooling_item_embedding_weight = PoolingLayer() \
([item_embedding_weight]) # pooled again in case there were more item features than item_id, e.g. brand_id or cat_id
# pass the full doc_id embedding table, the user embedding and the clicked doc_id to the sampled-softmax loss
output = SampledSoftmaxLayer(num_sampled) \
([pooling_item_embedding_weight, user_embedding_final, item_input_layer_dict[item_feature_name]])
model = Model(inputs=user_input_layers + item_input_layers, outputs=output)
# attributes used after training to extract the user and item embeddings
model.__setattr__("user_input", user_input_layers)
model.__setattr__("user_embedding", user_embeddings)
model.__setattr__("item_input", item_input_layers)
model.__setattr__("item_embedding", get_item_embedding(pooling_item_embedding_weight, item_input_layer_dict[item_feature_name]))
return model

View File

@@ -0,0 +1,146 @@
from collections import OrderedDict
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.python.keras.models import Model
# use deepctr's feature_column classes; with a home-made feature_column the isinstance filters below select nothing
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat
from layers.utils import build_input_layers, build_embedding_layers, embedding_lookup, concat_func, NoMask, get_item_embedding
from layers.interaction import MultiHeadAttention, UserAttention, AttentionSequencePoolingLayer
from layers.core import SampledSoftmaxLayer, PoolingLayer, EmbeddingIndex
from layers.sequence import DynamicMultiRNN
if tf.__version__ >= '2.0.0':
tf.compat.v1.disable_eager_execution()
def SDM(user_feature_columns, item_feature_columns, history_feature_list, num_sampled=5, units=32, rnn_layers=2,
dropout_rate=0.2, rnn_num_res=1, num_head=4, l2_reg_embedding=1e-6, dnn_activation='tanh', seed=1024):
"""
:param rnn_num_res: number of residual RNN layers
:param history_feature_list: the fields shared by the short- and long-term (prefer) sequences
"""
# only doc_id is supported as the item feature for now; supporting more would need some rework
if (len(item_feature_columns)) > 1:
raise ValueError("SDM only support 1 item feature like doc_id")
# read the item feature attributes
item_feature_column = item_feature_columns[0]
item_feature_name = item_feature_column.name
item_vocabulary_size = item_feature_column.vocabulary_size
# build the Input layers for the user and item features
user_input_layer_dict = build_input_layers(user_feature_columns)
item_input_layer_dict = build_input_layers(item_feature_columns)
# collect the Input layers into lists to use as the model inputs
user_input_layers = list(user_input_layer_dict.values())
item_input_layers = list(item_input_layer_dict.values())
# separate the sparse, dense and varlen features so each group can be handled on its own
sparse_feature_columns = list \
(filter(lambda x: isinstance(x, SparseFeat), user_feature_columns)) if user_feature_columns else []
dense_feature_columns = list \
(filter(lambda x: isinstance(x, DenseFeat), user_feature_columns)) if user_feature_columns else []
if len(dense_feature_columns) != 0:
raise ValueError("SDM does not support dense features") # dense features are not supported yet
varlen_feature_columns = list \
(filter(lambda x: isinstance(x, VarLenSparseFeat), user_feature_columns)) if user_feature_columns else []
# build the embedding layer dict
embedding_layer_dict = build_embedding_layers(user_feature_columns + item_feature_columns)
# split out the short-term and long-term (prefer) session columns; the naming convention used earlier pays off here
sparse_varlen_feature_columns = []
prefer_history_columns = []
short_history_columns = []
prefer_fc_names = list(map(lambda x: "prefer_" + x, history_feature_list))
short_fc_names = list(map(lambda x: "short_" + x, history_feature_list))
for fc in varlen_feature_columns:
if fc.name in prefer_fc_names:
prefer_history_columns.append(fc)
elif fc.name in short_fc_names:
short_history_columns.append(fc)
else:
sparse_varlen_feature_columns.append(fc)
# embeddings of the user's long-term behavior sequences, L^u
# [<tf.Tensor 'emb_prefer_doc_id_2/Identity:0' shape=(None, 50, 32) dtype=float32>, <tf.Tensor 'emb_prefer_cat1_2/Identity:0' shape=(None, 50, 32) dtype=float32>, <tf.Tensor 'emb_prefer_cat2_2/Identity:0' shape=(None, 50, 32) dtype=float32>]
prefer_emb_list = embedding_lookup(prefer_fc_names, user_input_layer_dict, embedding_layer_dict)
# embeddings of the user's short-term behavior sequences, S^u
# [<tf.Tensor 'emb_short_doc_id_2/Identity:0' shape=(None, 5, 32) dtype=float32>, <tf.Tensor 'emb_short_cat1_2/Identity:0' shape=(None, 5, 32) dtype=float32>, <tf.Tensor 'emb_short_cat2_2/Identity:0' shape=(None, 5, 32) dtype=float32>]
short_emb_list = embedding_lookup(short_fc_names, user_input_layer_dict, embedding_layer_dict)
# look up and concatenate the user's sparse feature embeddings, e^u
user_emb_list = embedding_lookup([col.name for col in sparse_feature_columns], user_input_layer_dict, embedding_layer_dict)
user_emb = concat_func(user_emb_list)
user_emb_output = Dense(units, activation=dnn_activation, name='user_emb_output')(user_emb) # (None, 1, 32)
# encode the long-term behavior sequences
# AttentionSequencePoolingLayer --> Concat --> Dense
prefer_sess_length = user_input_layer_dict['prefer_sess_length']
prefer_att_outputs = []
# loop over the long-term behavior sequences
for i, prefer_emb in enumerate(prefer_emb_list):
prefer_attention_output = AttentionSequencePoolingLayer(dropout_rate=0) \
([user_emb_output, prefer_emb, prefer_sess_length])
prefer_att_outputs.append(prefer_attention_output)
prefer_att_concat = concat_func \
(prefer_att_outputs) # (None, 1, 32) <== Concat(item_embedding, cat1_embedding, cat2_embedding)
prefer_output = Dense(units, activation=dnn_activation, name='prefer_output')(prefer_att_concat)
# print(prefer_output.shape) # (None, 1, 32)
# encode the short-term behavior sequence
short_sess_length = user_input_layer_dict['short_sess_length']
short_emb_concat = concat_func(short_emb_list) # (None, 5, 64); note that the side info describing each short-term item is concatenated
short_emb_input = Dense(units, activation=dnn_activation, name='short_emb_input')(short_emb_concat) # (None, 5, 32)
# LSTM; return_sequence=True so every time step emits its hidden state
short_rnn_output = DynamicMultiRNN(num_units=units, return_sequence=True, num_layers=rnn_layers,
num_residual_layers=rnn_num_res,
dropout_rate=dropout_rate)([short_emb_input, short_sess_length])
# print(short_rnn_output) # (None, 5, 32)
# multi-head self-attention # (None, 5, 32)
short_att_output = MultiHeadAttention(num_units=units, head_num=num_head, dropout_rate=dropout_rate) \
([short_rnn_output, short_sess_length]) # (None, 5, 64)
# user_attention # (None, 1, 32)
short_output = UserAttention(num_units=units, activation=dnn_activation, use_res=True, dropout_rate=dropout_rate) \
([user_emb_output, short_att_output, short_sess_length])
# gated fusion
gated_input = concat_func([prefer_output, short_output, user_emb_output])
gate = Dense(units, activation='sigmoid')(gated_input) # (None, 1, 32)
# equivalent to: temp = tf.multiply(gate, short_output) + tf.multiply(1 - gate, prefer_output)
gated_output = Lambda(lambda x: tf.multiply(x[0], x[1]) + tf.multiply(1 - x[0], x[2])) \
([gate, short_output, prefer_output]) # [None, 1,32]
gated_output_reshape = Lambda(lambda x: tf.squeeze(x, 1)) \
(gated_output) # (None, 32): this dimension must match the doc embedding dim, otherwise the sampled softmax loss below fails
# item embedding table and sampled-softmax output
item_embedding_matrix = embedding_layer_dict[item_feature_name] # the embedding layer of doc_id
item_index = EmbeddingIndex(list(range(item_vocabulary_size))) \
(item_input_layer_dict[item_feature_name]) # indices of all doc_ids
item_embedding_weight = NoMask()(item_embedding_matrix(item_index)) # the full item embedding matrix
pooling_item_embedding_weight = PoolingLayer() \
([item_embedding_weight]) # pooled again in case there were more item features than item_id, e.g. brand_id or cat_id
# pass the full doc_id embedding table, the user embedding and the clicked doc_id to the sampled-softmax loss
output = SampledSoftmaxLayer(num_sampled) \
([pooling_item_embedding_weight, gated_output_reshape, item_input_layer_dict[item_feature_name]])
model = Model(inputs=user_input_layers + item_input_layers, outputs=output)
# attributes used after training to extract the user and item embeddings
model.__setattr__("user_input", user_input_layers)
model.__setattr__("user_embedding", gated_output_reshape) # the user embedding is the gated, fused user vector
model.__setattr__("item_input", item_input_layers)
# the item embedding comes from pooling_item_embedding_weight, i.e. the matrix trained by the sampled softmax
model.__setattr__("item_embedding", get_item_embedding(pooling_item_embedding_weight, item_input_layer_dict[item_feature_name]))
return model

View File

@@ -0,0 +1,121 @@
import os
import numpy as np
import pickle
from sklearn.preprocessing import LabelEncoder
# import helper functions
from examples.preprocess import gen_data_set, gen_model_input, gen_sdm_model_input, gen_sdm_data_set
from examples.utils import train_mind_model, get_embeddings, get_recall_res, train_sdm_model
def sdm_recall(data, topk=200, embedding_dim=32, SEQ_LEN_short=5, SEQ_LEN_prefer=50,
batch_size=64, epochs=1, verbose=1, validation_split=0.0):
"""通过SDM模型计算用户向量和文章向量
param: data: 用户日志数据
topk: 对于每个用户,召回多少篇文章
"""
user_id_raw = data[['user_id']].drop_duplicates('user_id')
doc_id_raw = data[['article_id']].drop_duplicates('article_id')
# encode the categorical features
base_features = ['user_id', 'article_id', 'city', 'age', 'gender', 'cat_1', 'cat_2']
feature_max_idx = {}
for f in base_features:
lbe = LabelEncoder()
data[f] = lbe.fit_transform(data[f]) + 1
feature_max_idx[f] = data[f].max() + 1
# build user-id and doc-id dicts so an encoded idx can be mapped back to its raw id
user_id_enc = data[['user_id']].drop_duplicates('user_id')
doc_id_enc = data[['article_id']].drop_duplicates('article_id')
user_idx_2_rawid = dict(zip(user_id_enc['user_id'], user_id_raw['user_id']))
doc_idx_2_rawid = dict(zip(doc_id_enc['article_id'], doc_id_raw['article_id']))
user_profile = data[['user_id', 'gender', 'age', 'city']].drop_duplicates('user_id')
item_profile = data[['article_id']].drop_duplicates('article_id')
user_profile.set_index('user_id', inplace=True)
train_set, test_set = gen_sdm_data_set(data, seq_short_len=SEQ_LEN_short, seq_prefer_len=SEQ_LEN_prefer)
train_model_input, train_label = gen_sdm_model_input(train_set, user_profile, SEQ_LEN_short, SEQ_LEN_prefer)
test_model_input, test_label = gen_sdm_model_input(test_set, user_profile, SEQ_LEN_short, SEQ_LEN_prefer)
# build the model and train it
model = train_sdm_model(train_model_input, train_label, embedding_dim, feature_max_idx, SEQ_LEN_short,
SEQ_LEN_prefer, batch_size, epochs, verbose, validation_split)
# get the user and doc embeddings and save them
user_embs, doc_embs = get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid)
# get the recall results for every user and return them
user_recall_items_dict = get_recall_res(user_embs, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk)
return user_recall_items_dict
def mind_recall(data, topk=200, embedding_dim=8, his_seq_maxlen=50, negsample=0,
batch_size=64, epochs=1, verbose=1, validation_split=0.0):
"""通过MIND模型计算用户向量和文章向量
param: data: 用户日志数据
topk: 对于每个用户,召回多少篇文章
"""
user_id_raw = data[['user_id']].drop_duplicates('user_id')
doc_id_raw = data[['article_id']].drop_duplicates('article_id')
# encode the categorical features
base_features = ['user_id', 'article_id', 'city', 'age', 'gender']
feature_max_idx = {}
for f in base_features:
lbe = LabelEncoder()
data[f] = lbe.fit_transform(data[f])
feature_max_idx[f] = data[f].max() + 1
# build user-id and doc-id dicts so an encoded idx can be mapped back to its raw id
user_id_enc = data[['user_id']].drop_duplicates('user_id')
doc_id_enc = data[['article_id']].drop_duplicates('article_id')
user_idx_2_rawid = dict(zip(user_id_enc['user_id'], user_id_raw['user_id']))
doc_idx_2_rawid = dict(zip(doc_id_enc['article_id'], doc_id_raw['article_id']))
# record each article's click count, used later to down-weight popular articles
doc_clicked_count_df = data.groupby('article_id')['click'].apply(lambda x: x.count()).reset_index()
doc_clicked_count_dict = dict(zip(doc_clicked_count_df['article_id'], doc_clicked_count_df['click']))
if os.path.exists('data/train_model_input.pkl'):
train_model_input = pickle.load(open('data/train_model_input.pkl', 'rb'))
train_label = np.load('data/train_label.npy')
test_model_input = pickle.load(open('data/test_model_input.pkl', 'rb'))
test_label = np.load('data/test_label.npy')
else:
train_set, test_set = gen_data_set(data, doc_clicked_count_dict, negsample, control_users=False)
# build the MIND model input
train_model_input, train_label = gen_model_input(train_set, his_seq_maxlen)
test_model_input, test_label = gen_model_input(test_set, his_seq_maxlen)
# cache the inputs; rebuilding them from scratch every run is too slow
pickle.dump(train_model_input, open('data/train_model_input.pkl', 'wb'))
pickle.dump(test_model_input, open('data/test_model_input.pkl', 'wb'))
np.save('data/train_label.npy', train_label)
np.save('data/test_label.npy', test_label)
# build the model and train it
model = train_mind_model(train_model_input, train_label, embedding_dim, feature_max_idx, his_seq_maxlen, batch_size,
epochs, verbose, validation_split)
# get the user and doc embeddings and save them
user_embs, doc_embs = get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid)
# MIND yields k interest vectors per user, so recall is run separately for each
user_embs1 = user_embs[:, 0, :]
user_embs2 = user_embs[:, 1, :]
# get the recall results for every user and return them
user_recall_items_dict1 = get_recall_res(user_embs1, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk)
user_recall_items_dict2 = get_recall_res(user_embs2, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk)
# the two lists could be merged; note there is no deduplication here
# user_recall_items_dict = defaultdict(list)
# for user in user_recall_items_dict1:
# user_recall_items_dict[user] = user_recall_items_dict1[user] + user_recall_items_dict2[user]
return user_recall_items_dict1, user_recall_items_dict2