From 2ea75605ab8a78f9e3dc7e50b0b89a28779c4c1b Mon Sep 17 00:00:00 2001 From: zhongqiangwu960812 <17862328070@163.com> Date: Sun, 3 Apr 2022 13:35:30 +0800 Subject: [PATCH] upload sdm_mind demo --- codes/funrec/examples/preprocess.py | 239 +++++++++++++++++++++++- codes/funrec/examples/run_mind.py | 31 ++++ codes/funrec/examples/run_sdm.py | 38 ++++ codes/funrec/examples/utils.py | 151 ++++++++++++++++ codes/funrec/layers/core.py | 94 +++++++++- codes/funrec/layers/interaction.py | 260 ++++++++++++++++++++++++++- codes/funrec/layers/sequence.py | 91 +++++++++- codes/funrec/layers/utils.py | 107 ++++++++++- codes/funrec/metrics.py | 19 +- codes/funrec/models/matching/MIND.py | 118 ++++++++++++ codes/funrec/models/matching/SDM.py | 146 +++++++++++++++ codes/funrec/recall_methods.py | 121 +++++++++++++ 12 files changed, 1402 insertions(+), 13 deletions(-) create mode 100644 codes/funrec/examples/run_mind.py create mode 100644 codes/funrec/examples/run_sdm.py create mode 100644 codes/funrec/examples/utils.py create mode 100644 codes/funrec/models/matching/MIND.py create mode 100644 codes/funrec/models/matching/SDM.py create mode 100644 codes/funrec/recall_methods.py diff --git a/codes/funrec/examples/preprocess.py b/codes/funrec/examples/preprocess.py index 6d74c4bf..679e4359 100644 --- a/codes/funrec/examples/preprocess.py +++ b/codes/funrec/examples/preprocess.py @@ -1,14 +1,14 @@ from random import sample, seed import sys sys.path.append("..") -import os -import time +import random import numpy as np import pandas as pd from datetime import date, datetime from sklearn.preprocessing import LabelEncoder -from features import DenseFeat, SparseFeat, VarLenSparseFeat - +from tqdm import tqdm +import tensorflow as tf +from tensorflow.python.keras.preprocessing.sequence import pad_sequences def process_data(sample_num=5000000): train_data_path = "./data/train" @@ -59,3 +59,234 @@ def process_data(sample_num=5000000): train_label = np.array(train_df['click']) 
train_df.pop('click') return feature_max_index_dict, train_input_dict, train_label + + +def gen_sdm_data_set(click_data, seq_short_len=5, seq_prefer_len=50): + """ + :param: seq_short_len: length of the short-term session + :param: seq_prefer_len: maximum length of the long-term (preference) session + """ + click_data.sort_values("expo_time", inplace=True) + + train_set, test_set = [], [] + for user_id, hist_click in tqdm(click_data.groupby('user_id')): + pos_list = hist_click['article_id'].tolist() + cat1_list = hist_click['cat_1'].tolist() + cat2_list = hist_click['cat_2'].tolist() + + # split the click history with a sliding window + for i in range(1, len(pos_list)): + hist = pos_list[:i] + cat1_hist = cat1_list[:i] + cat2_hist = cat2_list[:i] + # the history is only long enough for the short-term part + if i <= seq_short_len and i != len(pos_list) - 1: + train_set.append(( + # user id, short-term history sequence, long-term history sequence, current article, label, + user_id, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1, + # short-term history length, long-term history length, + len(hist[::-1]), 0, + # cat_1 of the short-term history, cat_1 of the long-term history + cat1_hist[::-1], [0] * seq_prefer_len, + # cat_2 of the short-term history, cat_2 of the long-term history + cat2_hist[::-1], [0] * seq_prefer_len + )) + # the history is long enough to also fill the long-term part + elif i != len(pos_list) - 1: + train_set.append(( + # user id, short-term history sequence, long-term history sequence, current article, label + user_id, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:], pos_list[i], 1, + # short-term history length, long-term history length, + seq_short_len, len(hist[::-1]) - seq_short_len, + # cat_1 of the short-term history, cat_1 of the long-term history + cat1_hist[::-1][:seq_short_len], cat1_hist[::-1][seq_short_len:], + # cat_2 of the short-term history, cat_2 of the long-term history + cat2_hist[::-1][:seq_short_len], cat2_hist[::-1][seq_short_len:] + )) + # the test set keeps only the longest sequence of each user + elif i <= seq_short_len and i == len(pos_list) - 1: + test_set.append(( + user_id, hist[::-1], [0] * seq_prefer_len, pos_list[i], 1, + len(hist[::-1]), 0, + cat1_hist[::-1], [0] * seq_prefer_len, + cat2_hist[::-1], [0] * seq_prefer_len + )) + else: + test_set.append(( + user_id, hist[::-1][:seq_short_len], hist[::-1][seq_short_len:], pos_list[i], 1, + seq_short_len, len(hist[::-1]) - seq_short_len, + cat1_hist[::-1][:seq_short_len],
cat1_hist[::-1][seq_short_len:], + cat2_hist[::-1][:seq_short_len], cat2_hist[::-1][seq_short_len:] + )) + + random.shuffle(train_set) + random.shuffle(test_set) + + return train_set, test_set + + +def gen_sdm_model_input(train_set, user_profile, seq_short_len, seq_prefer_len): + """Build the model input.""" + # row: [user_id, short_train_seq, prefer_train_seq, item_id, label, short_len, prefer_len, cat_1_short, cat_1_prefer, cat_2_short, cat_2_prefer] + train_uid = np.array([row[0] for row in train_set]) + short_train_seq = [row[1] for row in train_set] + prefer_train_seq = [row[2] for row in train_set] + train_iid = np.array([row[3] for row in train_set]) + train_label = np.array([row[4] for row in train_set]) + train_short_len = np.array([row[5] for row in train_set]) + train_prefer_len = np.array([row[6] for row in train_set]) + short_train_seq_cat1 = [row[7] for row in train_set] + prefer_train_seq_cat1 = [row[8] for row in train_set] + short_train_seq_cat2 = [row[9] for row in train_set] + prefer_train_seq_cat2 = [row[10] for row in train_set] + + # pad / truncate every (ragged) sequence to its fixed length + train_short_item_pad = pad_sequences(short_train_seq, maxlen=seq_short_len, padding='post', truncating='post', + value=0) + train_prefer_item_pad = pad_sequences(prefer_train_seq, maxlen=seq_prefer_len, padding='post', truncating='post', + value=0) + train_short_cat1_pad = pad_sequences(short_train_seq_cat1, maxlen=seq_short_len, padding='post', truncating='post', + value=0) + train_prefer_cat1_pad = pad_sequences(prefer_train_seq_cat1, maxlen=seq_prefer_len, padding='post', + truncating='post', value=0) + train_short_cat2_pad = pad_sequences(short_train_seq_cat2, maxlen=seq_short_len, padding='post', truncating='post', + value=0) + train_prefer_cat2_pad = pad_sequences(prefer_train_seq_cat2, maxlen=seq_prefer_len, padding='post', + truncating='post', value=0) + + # assemble the input dict + train_model_input = { + "user_id": train_uid, + "doc_id": train_iid, + "short_doc_id":
train_short_item_pad, + "prefer_doc_id": train_prefer_item_pad, + "prefer_sess_length": train_prefer_len, + "short_sess_length": train_short_len, + "short_cat1": train_short_cat1_pad, + "prefer_cat1": train_prefer_cat1_pad, + "short_cat2": train_short_cat2_pad, + "prefer_cat2": train_prefer_cat2_pad + } + + # add the remaining user-profile features + for key in ["gender", "age", "city"]: + train_model_input[key] = user_profile.loc[train_model_input['user_id']][key].values + + return train_model_input, train_label + +def gen_neg_sample_candiate(pos_list, item_ids, doc_clicked_count_dict, negsample, methods='multinomial'): + # word2vec-style negative sampling: raise the probability that popular items are drawn as negatives + # see https://blog.csdn.net/weixin_42299244/article/details/112734531; the TF equivalent is used here + # tf.random.categorical down-samples irrelevant items; it expects logits, hence the np.log below + if methods == 'multinomial': + # the input is the click count of every item + items, item_counts = [], [] + # items the user did not click; this loop is slow, optimise later if possible + # for item_id in list(set(item_ids)-set(pos_list)): + # items.append(item_id) + # item_counts.append(doc_clicked_count_dict[item_id]*1.0) + + items = list(doc_clicked_count_dict.keys()) + item_counts = list(doc_clicked_count_dict.values()) + item_freqs = np.array(item_counts) / np.sum(item_counts) + item_freqs = item_freqs ** 0.75 + item_freqs = item_freqs / np.sum(item_freqs) + neg_item_index = tf.random.categorical(np.log(item_freqs).reshape(1, -1), len(pos_list) * negsample) + neg_list = np.array([items[i] for i in neg_item_index.numpy().tolist()[0] if items[i] not in pos_list]) + random.shuffle(neg_list) + # otherwise sample uniformly at random + else: + candidate_set = list(set(item_ids) - set(pos_list)) # items the user has not clicked + neg_list = np.random.choice(candidate_set, size=len(pos_list) * negsample, replace=True) # draw negsample negatives per positive + + return neg_list + +def gen_data_set(click_data, doc_clicked_count_dict, negsample, control_users=False): + """Build the YoutubeDNN data set.""" + # sort by exposure time + click_data.sort_values("expo_time", inplace=True) + item_ids = click_data['article_id'].unique() + + train_set, test_set = [], [] + for user_id, hist_click in
tqdm(click_data.groupby('user_id')): + # alternative: split by expo_date and slide the window within each day; relevance may be higher and sequences stay short (EDA found a user with 1111 clicks) + # for expo_date, hist_click in hist_date_click.groupby('expo_date'): + # ids of the user's click history + pos_list = hist_click['article_id'].tolist() + user_control_flag = True + + if control_users: + user_samples_cou = 0 + + # truncate over-long sequences + if len(pos_list) > 50: + pos_list = pos_list[-50:] + + if negsample > 0: + neg_list = gen_neg_sample_candiate(pos_list, item_ids, doc_clicked_count_dict, negsample, + methods='random') + + # drop users with a single click (already filtered upstream, so this should not happen here) + if len(pos_list) < 2: + continue + else: + # the sequence has at least 2 items + for i in range(1, len(pos_list)): + hist = pos_list[:i] + # optional popularity suppression: lower the probability that highly exposed items become positives + #freq_i = doc_clicked_count_dict[pos_list[i]] / (np.sum(list(doc_clicked_count_dict.values()))) + #p_posi = (np.sqrt(freq_i / 0.001) + 1) * (0.001 / freq_i) + + # p_posi=0.3 means item_i becomes a positive with probability 0.3, + if user_control_flag and i != len(pos_list) - 1: + #if random.random() > (1 - p_posi): + if True: + row = [user_id, hist[::-1], pos_list[i], hist_click.iloc[0]['city'], hist_click.iloc[0]['age'], + hist_click.iloc[0]['gender'], 1, len(hist[::-1])] + train_set.append(row) + + for negi in range(negsample): + row = [user_id, hist[::-1], neg_list[i * negsample + negi], hist_click.iloc[0]['city'], + hist_click.iloc[0]['age'], hist_click.iloc[0]['gender'], 0, len(hist[::-1])] + train_set.append(row) + + if control_users: + user_samples_cou += 1 + # each user has at most 50 positives; once more than 30 were taken, clear the flag so no further training rows are added for this user + if user_samples_cou > 30: + user_control_flag = False + + # the full sequence goes to test_set; each user contributes exactly one (longest) sequence, so len(test_set) equals the number of users + elif i == len(pos_list) - 1: + row = [user_id, hist[::-1], pos_list[i], hist_click.iloc[0]['city'], hist_click.iloc[0]['age'], + hist_click.iloc[0]['gender'], 0, len(hist[::-1])] + test_set.append(row) + + random.shuffle(train_set) + random.shuffle(test_set) + return train_set, test_set + +"""构造模型输入""" +def gen_model_input(train_set, his_seq_max_len): + """Build the model input.""" + #
row: [user_id, hist_list, cur_doc_id, city, age, gender, label, hist_len] + train_uid = np.array([row[0] for row in train_set]) + train_hist_seq = [row[1] for row in train_set] + train_iid = np.array([row[2] for row in train_set]) + train_u_city = np.array([row[3] for row in train_set]) + train_u_age = np.array([row[4] for row in train_set]) + train_u_gender = np.array([row[5] for row in train_set]) + train_label = np.array([row[6] for row in train_set]) + train_hist_len = np.array([row[7] for row in train_set]) + + train_seq_pad = pad_sequences(train_hist_seq, maxlen=his_seq_max_len, padding='post', truncating='post', value=0) + train_model_input = { + "user_id": train_uid, + "doc_id": train_iid, + "hist_doc_id": train_seq_pad, + "hist_len": train_hist_len, + "u_city": train_u_city, + "u_age": train_u_age, + "u_gender": train_u_gender, + } + return train_model_input, train_label \ No newline at end of file diff --git a/codes/funrec/examples/run_mind.py b/codes/funrec/examples/run_mind.py new file mode 100644 index 00000000..889b630e --- /dev/null +++ b/codes/funrec/examples/run_mind.py @@ -0,0 +1,31 @@ +import os +# 切换目录 +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +os.chdir(BASE_DIR) +print(os.getcwd()) + +import pandas as pd +from examples.utils import get_hist_and_last_click +from recall_methods import mind_recall +from metrics import H_Rate +import warnings +warnings.filterwarnings('ignore') + +if __name__ == "__main__": + data_path = 'data' + data = pd.read_csv(os.path.join(data_path, 'train_data.csv'), index_col=0, parse_dates=['expo_time']) + + # 选择出需要用到的列 + use_cols = ['user_id', 'article_id', 'expo_time', 'net_status', 'exop_position', 'duration', 'device', 'city', + 'age', 'gender', 'click'] + data_new = data[use_cols] + + # 划分训练集和测试集 + click_df = data_new[data_new['click'] == 1] + user_click_hist_df, user_click_last_df = get_hist_and_last_click(click_df) + + # mind 召回 + user_recall_doc_dict1, user_recall_doc_dict2 = 
mind_recall(user_click_hist_df) + + # 评估 + H_Rate(user_recall_doc_dict1, user_click_last_df, topk=200) \ No newline at end of file diff --git a/codes/funrec/examples/run_sdm.py b/codes/funrec/examples/run_sdm.py new file mode 100644 index 00000000..86bbb2ff --- /dev/null +++ b/codes/funrec/examples/run_sdm.py @@ -0,0 +1,38 @@ +import os +# 切换目录 +BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) +os.chdir(BASE_DIR) +print(os.getcwd()) + +import pandas as pd +from examples.utils import get_hist_and_last_click +from recall_methods import sdm_recall +from metrics import H_Rate +import warnings +warnings.filterwarnings('ignore') + +if __name__ == "__main__": + data_path = 'data' + data = pd.read_csv(os.path.join(data_path, 'train_data.csv'), index_col=0, parse_dates=['expo_time']) + + # 选择出需要用到的列 + use_cols = ['user_id', 'article_id', 'expo_time', 'net_status', 'exop_position', 'duration', 'city', 'age', + 'gender', 'click', 'cat_1', 'cat_2'] + data_new = data[use_cols] + + # 划分训练集和测试集 + click_df = data_new[data_new['click'] == 1] + user_click_hist_df, user_click_last_df = get_hist_and_last_click(click_df) + + # sdm 召回 + user_recall_doc_dict = sdm_recall(user_click_hist_df) + + # 评估 + H_Rate(user_recall_doc_dict, user_click_last_df, topk=200) + + + + + + + diff --git a/codes/funrec/examples/utils.py b/codes/funrec/examples/utils.py new file mode 100644 index 00000000..f89eaf35 --- /dev/null +++ b/codes/funrec/examples/utils.py @@ -0,0 +1,151 @@ +import pickle +import collections + +import numpy as np +from tensorflow.python.keras.models import Model + +from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat +from deepmatch.utils import sampledsoftmaxloss +from models.matching.MIND import MIND +from models.matching.SDM import SDM + +from annoy import AnnoyIndex + +def get_hist_and_last_click(all_click): + all_click = all_click.sort_values(by=['user_id', 'expo_time']) + click_last_df = all_click.groupby('user_id').tail(1) + + # 
如果用户只有一个点击,hist为空了,会导致训练的时候这个用户不可见,此时默认泄露一下 + def hist_func(user_df): + if len(user_df) == 1: + return user_df + else: + return user_df[:-1] + + click_hist_df = all_click.groupby('user_id').apply(hist_func).reset_index(drop=True) + + return click_hist_df, click_last_df + +def train_sdm_model(train_model_input, train_label, embedding_dim, feature_max_idx, SEQ_LEN_short, SEQ_LEN_prefer + ,batch_size, epochs, verbose, validation_split): + """构建sdm并完成训练""" + # 建立模型 + user_feature_columns = [ + SparseFeat('user_id', feature_max_idx['user_id'], 16), + SparseFeat('gender', feature_max_idx['gender'], 16), + SparseFeat('age', feature_max_idx['age'], 16), + SparseFeat('city', feature_max_idx['city'], 16), + + VarLenSparseFeat + (SparseFeat('short_doc_id', feature_max_idx['article_id'], embedding_dim, embedding_name="doc_id"), SEQ_LEN_short, 'mean', 'short_sess_length'), + VarLenSparseFeat + (SparseFeat('prefer_doc_id', feature_max_idx['article_id'], embedding_dim, embedding_name='doc_id'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'), + VarLenSparseFeat(SparseFeat('short_cat1', feature_max_idx['cat_1'], embedding_dim, embedding_name='cat_1'), SEQ_LEN_short, 'mean', 'short_sess_length'), + VarLenSparseFeat(SparseFeat('prefer_cat1', feature_max_idx['cat_1'], embedding_dim, embedding_name='cat_1'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'), + VarLenSparseFeat(SparseFeat('short_cat2', feature_max_idx['cat_2'], embedding_dim, embedding_name='cat_2'), SEQ_LEN_short, 'mean', 'short_sess_length'), + VarLenSparseFeat(SparseFeat('prefer_cat2', feature_max_idx['cat_2'], embedding_dim, embedding_name='cat_2'), SEQ_LEN_prefer, 'mean', 'prefer_sess_length'), + ] + + item_feature_columns = [SparseFeat('doc_id', feature_max_idx['article_id'], embedding_dim)] + + # 定义模型 + model = SDM(user_feature_columns, item_feature_columns, history_feature_list=['doc_id', 'cat1', 'cat2']) + + # 模型编译 + model.compile(optimizer="adam", loss=sampledsoftmaxloss) + + # 
模型训练,这里可以定义验证集的比例,如果设置为0的话就是全量数据直接进行训练 + history = model.fit(train_model_input, train_label, batch_size=batch_size, epochs=epochs, verbose=verbose, validation_split=validation_split) + + return model + +def train_mind_model(train_model_input, train_label, embedding_dim, feature_max_idx, his_seq_maxlen, batch_size, epochs, + verbose, validation_split): + """构建mind并完成训练""" + # 建立模型 + user_feature_columns = [ + SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim), + VarLenSparseFeat(SparseFeat('hist_doc_id', feature_max_idx['article_id'], embedding_dim, + embedding_name="click_doc_id"), his_seq_maxlen, 'mean', 'hist_len'), + DenseFeat('hist_len', 1), + SparseFeat('u_city', feature_max_idx['city'], embedding_dim), + SparseFeat('u_age', feature_max_idx['age'], embedding_dim), + SparseFeat('u_gender', feature_max_idx['gender'], embedding_dim), + ] + doc_feature_columns = [ + SparseFeat('doc_id', feature_max_idx['article_id'], embedding_dim) + # 这里后面也可以把文章的类别画像特征加入 + ] + + # 定义模型 + model = MIND(user_feature_columns, doc_feature_columns, num_sampled=5, user_dnn_hidden_units=(64, embedding_dim)) + + # 模型编译 + model.compile(optimizer="adam", loss=sampledsoftmaxloss) + + # 模型训练,这里可以定义验证集的比例,如果设置为0的话就是全量数据直接进行训练 + history = model.fit(train_model_input, train_label, batch_size=batch_size, epochs=epochs, verbose=verbose, + validation_split=validation_split) + + return model + + +"""获取用户embedding和文章embedding""" +def get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid, save_path='embedding/'): + doc_model_input = {'doc_id' :np.array(list(doc_idx_2_rawid.keys()))} + + user_embedding_model = Model(inputs=model.user_input, outputs=model.user_embedding) + doc_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding) + + # 保存当前的item_embedding 和 user_embedding 排序的时候可能能够用到,但是需要注意保存的时候需要和原始的id对应 + user_embs = user_embedding_model.predict(test_model_input, batch_size=2 ** 12) + doc_embs = doc_embedding_model.predict(doc_model_input, 
batch_size=2 ** 12) + # embedding保存之前归一化一下 + user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True) + doc_embs = doc_embs / np.linalg.norm(doc_embs, axis=1, keepdims=True) + + # 将Embedding转换成字典的形式方便查询 + raw_user_id_emb_dict = {user_idx_2_rawid[k]: \ + v for k, v in zip(user_idx_2_rawid.keys(), user_embs)} + raw_doc_id_emb_dict = {doc_idx_2_rawid[k]: \ + v for k, v in zip(doc_idx_2_rawid.keys(), doc_embs)} + # 将Embedding保存到本地 + pickle.dump(raw_user_id_emb_dict, open(save_path + 'user_emb.pkl', 'wb')) + pickle.dump(raw_doc_id_emb_dict, open(save_path + 'doc_emb.pkl', 'wb')) + + # 读取 + # user_embs_dict = pickle.load(open('embedding/user_youtube_emb.pkl', 'rb')) + # doc_embs_dict = pickle.load(open('embedding/doc_youtube_emb.pkl', 'rb')) + return user_embs, doc_embs + +"""最近邻检索得到召回结果""" +def get_recall_res(user_embs, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk): + """近邻检索,这里用annoy tree""" + # 把doc_embs构建成索引树 + f = user_embs.shape[1] + t = AnnoyIndex(f, 'angular') + for i, v in enumerate(doc_embs): + t.add_item(i, v) + t.build(10) + # 可以保存该索引树 t.save('annoy.ann') + + # 每个用户向量, 返回最近的TopK个item + user_recall_items_dict = collections.defaultdict(dict) + for i, u in enumerate(user_embs): + recall_doc_scores = t.get_nns_by_vector(u, topk, include_distances=True) + # recall_doc_scores是(([doc_idx], [scores])), 这里需要转成原始doc的id + raw_doc_scores = list(recall_doc_scores) + raw_doc_scores[0] = [doc_idx_2_rawid[i] for i in raw_doc_scores[0]] + # 转换成实际用户id + try: + user_recall_items_dict[user_idx_2_rawid[i]] = dict(zip(*raw_doc_scores)) + except: + continue + + # 默认是分数从小到大排的序, 这里要从大到小 + user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) for k, v in user_recall_items_dict.items()} + + # 保存一份 + pickle.dump(user_recall_items_dict, open('u2i_dict.pkl', 'wb')) + + return user_recall_items_dict \ No newline at end of file diff --git a/codes/funrec/layers/core.py b/codes/funrec/layers/core.py index e08291e2..ff72def3 100644 --- 
a/codes/funrec/layers/core.py +++ b/codes/funrec/layers/core.py @@ -1,7 +1,8 @@ import tensorflow as tf from tensorflow.keras.layers import Layer, Dense, Activation, Dropout, \ BatchNormalization -from tensorflow.keras.initializers import Zeros +from tensorflow.keras.layers import * +from tensorflow.python.keras.initializers import RandomNormal, Zeros, glorot_normal, TruncatedNormal, Ones class DNN(Layer): @@ -127,9 +128,98 @@ class PredictLayer(Layer): base_config = super(PredictLayer, self).get_config() return dict(list(base_config.items()) + list(config.items())) +class EmbeddingIndex(Layer): + def __init__(self, index, **kwargs): + self.index = index + super(EmbeddingIndex, self).__init__(**kwargs) + + def build(self, input_shape): + super(EmbeddingIndex, self).build(input_shape) # Be sure to call this somewhere! + + def call(self, x, **kwargs): + return tf.constant(self.index) # TODO Activation - +class SampledSoftmaxLayer(Layer): + def __init__(self, num_sampled=5, **kwargs): + self.num_sampled = num_sampled + super(SampledSoftmaxLayer, self).__init__(**kwargs) + def build(self, input_shape): + self.size = input_shape[0][0] # docs num + self.zero_bias = self.add_weight(shape=[self.size], initializer=Zeros, dtype=tf.float32, trainable=False, name='bias') + super(SampledSoftmaxLayer, self).build(input_shape) + def call(self, inputs_with_label_idx): + embeddings, inputs, label_idx = inputs_with_label_idx + # 这里盲猜下这个操作,应该是有了label_idx,就能拿到其embedding,这个是用户行为过多 + # 所以(user_embedding, embedding[label_idx])就是正样本 + # 然后根据num_sampled,再从传入的embeddings字典里面随机取num_sampled个负样本 + # 根据公式log(sigmoid(user_embedding, embedding[label_idx])) + 求和(log(sigmoid(-user_embedding, embedding[负样本])))得到损失 + loss = tf.nn.sampled_softmax_loss(weights=embeddings, # (numclass, dim) + biases=self.zero_bias, # (numclass) + labels=label_idx, # (batch, num_true) + inputs=inputs, # (batch, dim) + num_sampled=self.num_sampled, # 负采样个数 + num_classes=self.size # 类别数量 + ) + return tf.expand_dims(loss, 
axis=1) # TODO Normalization +class LayerNormalization(Layer): + def __init__(self, axis=-1, eps=1e-9, center=True, + scale=True, **kwargs): + self.axis = axis + self.eps = eps + self.center = center + self.scale = scale + super(LayerNormalization, self).__init__(**kwargs) + def build(self, input_shape): + self.gamma = self.add_weight(name='gamma', shape=input_shape[-1:], + initializer=Ones(), trainable=True) + self.beta = self.add_weight(name='beta', shape=input_shape[-1:], + initializer=Zeros(), trainable=True) + super(LayerNormalization, self).build(input_shape) + + def call(self, inputs): + mean = K.mean(inputs, axis=self.axis, keepdims=True) + variance = K.mean(K.square(inputs - mean), axis=self.axis, keepdims=True) + std = K.sqrt(variance + self.eps) + outputs = (inputs - mean) / std + if self.scale: + outputs *= self.gamma + if self.center: + outputs += self.beta + return outputs + +class PoolingLayer(Layer): + def __init__(self, mode='mean', supports_masking=False, **kwargs): + if mode not in ['sum', 'mean', 'max']: + raise ValueError("mode must be sum, mean or max") + self.mode = mode + self.eps = tf.constant(1e-8, tf.float32) + super(PoolingLayer, self).__init__(**kwargs) + self.supports_masking = supports_masking + + def build(self, input_shape): + super(PoolingLayer, self).build( + input_shape) # Be sure to call this somewhere!
+ + def call(self, seq_value_len_list, mask=None, **kwargs): + if not isinstance(seq_value_len_list, list): + seq_value_len_list = [seq_value_len_list] + if len(seq_value_len_list) == 1: + return seq_value_len_list[0] + expand_seq_value_len_list = list(map(lambda x: tf.expand_dims(x, axis=-1), seq_value_len_list)) + a = concat_func(expand_seq_value_len_list) + if self.mode == "mean": + hist = reduce_mean(a, axis=-1, ) + if self.mode == "sum": + hist = reduce_sum(a, axis=-1, ) + if self.mode == "max": + hist = reduce_max(a, axis=-1, ) + return hist + + def get_config(self, ): + config = {'mode': self.mode, 'supports_masking': self.supports_masking} + base_config = super(PoolingLayer, self).get_config() + return dict(list(base_config.items()) + list(config.items())) \ No newline at end of file diff --git a/codes/funrec/layers/interaction.py b/codes/funrec/layers/interaction.py index 79e4df3a..aac9fab0 100644 --- a/codes/funrec/layers/interaction.py +++ b/codes/funrec/layers/interaction.py @@ -1,8 +1,10 @@ import itertools from re import L import tensorflow as tf -from tensorflow.keras.layers import Layer, Concatenate -from tensorflow.keras.initializers import TruncatedNormal, glorot_normal +from tensorflow.keras.layers import * +from tensorflow.keras.initializers import TruncatedNormal, glorot_normal, RandomNormal + +from layers.utils import softmax, squash, reduce_sum class FM(Layer): @@ -352,4 +354,256 @@ class BilinearInteractionLayer(Layer): config = {"bilinear_type": self.bilinear_type, "seed": self.seed} base_config = super(BilinearInteractionLayer, self).get_config() base_config.update(config) - return base_config \ No newline at end of file + return base_config + + +class AttentionSequencePoolingLayer(Layer): + """ + :param query: [batch_szie, 1, c_q] + :param keys: [batch_size, T, c_k] + :param keys_length: [batch_size, 1] + :return [batch, 1, c_k] + """ + + def __init__(self, dropout_rate=0, scale=True, **kwargs): + self.dropout_rate = dropout_rate + 
self.scale = scale + super(AttentionSequencePoolingLayer, self).__init__(**kwargs) + + def build(self, input_shape): + self.projection_layer = Dense(units=1, activation='tanh') + super(AttentionSequencePoolingLayer, self).build(input_shape) + + def call(self, inputs, mask=None, **kwargs): + # queries[None, 1, 64], keys[None, 50, 32], keys_length[None, 1], 表示真实的会话长度, 后面mask会用到 + queries, keys, keys_length = inputs + hist_len = keys.get_shape()[1] + key_mask = tf.sequence_mask(keys_length, hist_len) # mask 矩阵 (None, 1, 50) + + queries = tf.tile(queries, [1, hist_len, 1]) # [None, 50, 64] 为每个key都配备一个query + # 后面,把queries与keys拼起来过全连接, 这里是这样的, 本身接下来是要求queryies与keys中每个item相关性,常规操作我们能想到的就是直接内积求得分数 + # 而这里的Attention实现,使用的是LuongAttention, 传统的Attention原来是有两种BahdanauAttention与LuongAttention, 这个在博客上整理下 + # 这里采用的LuongAttention,对其方式是q_k先拼接,然后过DNN的方式 + q_k = tf.concat([queries, keys], axis=-1) # [None, 50, 96] + output_scores = self.projection_layer(q_k) # [None, 50, 1] + + if self.scale: + output_scores = output_scores / (q_k.get_shape().as_list()[-1] ** 0.5) + attention_score = tf.transpose(output_scores, [0, 2, 1]) + + # 加权求和 需要把填充的那部分mask掉 + paddings = tf.ones_like(attention_score) * (-2 ** 32 + 1) + attention_score = tf.where(key_mask, attention_score, paddings) + attention_score = softmax(attention_score) # [None, 1, 50] + + outputs = tf.matmul(attention_score, keys) # [None, 1, 32] + return outputs + + def compute_output_shape(self, input_shape): + return (None, 1, input_shape[1][1]) + + +class MultiHeadAttention(Layer): + def __init__(self, num_units=8, head_num=4, scale=True, dropout_rate=0.2, use_layer_norm=True, use_res=True, + seed=2020, **kwargs): + self.num_units = num_units + self.head_num = head_num + self.scale = scale + self.dropout_rate = dropout_rate + self.use_layer_norm = use_layer_norm + self.use_res = use_res + self.seed = seed + super(MultiHeadAttention, self).__init__(**kwargs) + + def build(self, input_shape): + embedding_size = int(input_shape[0][-1]) 
+ # wq_wk_wv 放到一块 + self.W = self.add_weight(name='Q_K_V', shape=[embedding_size, self.num_units * 3], + dtype=tf.float32, + initializer=TruncatedNormal(seed=self.seed)) + self.W_output = self.add_weight(name='output_W', shape=[self.num_units, self.num_units], + dtype=tf.float32, + initializer=TruncatedNormal(seed=self.seed)) + self.layer_norm = LayerNormalization() + self.dropout = Dropout(self.dropout_rate, seed=self.seed) + self.seq_len_max = int(input_shape[0][1]) + super(MultiHeadAttention, self).build(input_shape) + + def call(self, inputs, training=None, **kwargs): + input_info, keys_length = inputs + hist_len = input_info.get_shape()[1] + key_masks = tf.sequence_mask(keys_length, hist_len) # (None, 1, 5) + key_masks = tf.squeeze(key_masks, axis=1) # (None, 5) + + Q_K_V = tf.tensordot(input_info, self.W, axes=(-1, 0)) # (None, seq_len, embed*3) + querys, keys, values = tf.split(Q_K_V, 3, -1) + + # 计算的时候,分开头计算 + querys = tf.concat(tf.split(querys, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/head_num) + keys = tf.concat(tf.split(keys, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/headnum) + values = tf.concat(tf.split(values, self.head_num, axis=2), axis=0) # (head_num*None, seq_len, embed/head_num) + + # 注意力分数 + att_score = tf.matmul(querys, tf.transpose(keys, [0, 2, 1])) # (head_num*None, seq_len, seq_len) + if self.scale: + att_score = att_score / (keys.get_shape().as_list()[-1] ** 0.5) + key_masks = tf.tile(key_masks, [self.head_num, 1]) # [head_num*None, seq_len] + key_masks = tf.tile(tf.expand_dims(key_masks, 1), + [1, tf.shape(input_info)[1], 1]) # [head_num*None, seq_len, seq_len] + + paddings = tf.ones_like(att_score) * (-2 ** 32 + 1) # [head_num*None, seq_len, seq_len] + align = tf.where(key_masks, att_score, paddings) # [head_num*None, seq_len, seq_len] + align = softmax(align) + output = tf.matmul(align, values) # [head_num*None, seq_len, emb/head_num] + + output = tf.concat(tf.split(output, self.head_num, 
axis=0), axis=2) # [None, seq_len, emb] + output = tf.tensordot(output, self.W_output, axes=(-1, 0)) # [None, seq_len, emb] + output = self.dropout(output, training=training) + if self.use_res: + output += input_info + if self.use_layer_norm: + output = self.layer_norm(output) + return output + + def compute_output_shape(self, input_shape): + return (None, input_shape[0][1], self.num_units) + + +class UserAttention(Layer): + def __init__(self, num_units=None, activation='tanh', use_res=True, dropout_rate=0, scale=True, seed=2020, + **kwargs): + self.scale = scale + self.num_units = num_units + self.activation = activation + self.seed = seed + self.dropout_rate = dropout_rate + self.use_res = use_res + super(UserAttention, self).__init__(**kwargs) + + def build(self, input_shape): + if self.num_units is None: + self.num_units = input_shape[0][-1] + self.dense = Dense(self.num_units, activation=self.activation) + super(UserAttention, self).build(input_shape) + + def call(self, inputs, mask=None, **kwargs): + # [None, 1, embed] [None, 5, embed], [None, 1] + user_query, keys, keys_length = inputs + hist_len = keys.get_shape()[1] + key_masks = tf.sequence_mask(keys_length, hist_len) # [None, 1, 5] + query = self.dense(user_query) # [None, 1, num_units] + + # 注意力分数 + att_score = tf.matmul(query, tf.transpose(keys, [0, 2, 1])) # (None, 1, seq_len) + if self.scale: + att_score = att_score / (keys.get_shape().as_list()[-1] ** 0.5) + + paddings = tf.ones_like(att_score) * (-2 ** 32 + 1) # [None, 1, seq_len] + align = tf.where(key_masks, att_score, paddings) # [None, 1, seq_len] + align = softmax(align) + + output = tf.matmul(align, keys) # [None, 1, embed] + return output + + def compute_output_shape(self, input_shape): + return (None, 1, input_shape[1][2]) + + def compute_mask(self, inputs, mask): + return mask + +class LabelAwareAttention(Layer): + def __init__(self, k_max, pow_p=1, **kwargs): + self.k_max = k_max + self.pow_p = pow_p + super(LabelAwareAttention, 
self).__init__(**kwargs) + + def build(self, input_shape): + self.embedding_size = input_shape[0][-1] + super(LabelAwareAttention, self).build(input_shape) + + def call(self, inputs): + keys, query = inputs[0], inputs[1] # keys (None, 2, 8) query (None, 1, 8) + weight = reduce_sum(keys * query, axis=-1, keep_dims=True) # (None, 2, 1) + weight = tf.pow(weight, self.pow_p) # (None, 2, 1) + + # when k is adjusted dynamically, mask by the per-user capsule count so masked capsules get a large negative score before the softmax + if len(inputs) == 3: + k_user = tf.cast(tf.maximum( + 1., + tf.minimum( + tf.cast(self.k_max, dtype="float32"), # k_max + tf.math.log1p(tf.cast(inputs[2], dtype="float32")) / tf.math.log(2.) # hist_len + ) + ), dtype="int64") + seq_mask = tf.transpose(tf.sequence_mask(k_user, self.k_max), [0, 2, 1]) + padding = tf.ones_like(seq_mask, dtype=tf.float32) * (-2 ** 32 + 1) # [x,k_max,1] + weight = tf.where(seq_mask, weight, padding) + + weight = softmax(weight, dim=1, name='weight') + output = reduce_sum(keys * weight, axis=1) # (None, 8) + return output + + +class CapsuleLayer(Layer): + def __init__(self, input_units, out_units, max_len, k_max, iteration_times=3, init_std=1.0, **kwargs): + self.input_units = input_units + self.out_units = out_units + self.max_len = max_len + self.k_max = k_max + self.iteration_times = iteration_times + self.init_std = init_std + super(CapsuleLayer, self).__init__(**kwargs) + + def build(self, input_shape): + # routing logits B with shape [1, k_max, max_len]: one logit per (output capsule, input capsule) pair + self.routing_logits = self.add_weight(shape=[1, self.k_max, self.max_len], + initializer=RandomNormal(stddev=self.init_std), + trainable=False, name='B', dtype=tf.float32) + # bilinear mapping matrix S with shape [input capsule dim, output capsule dim] + self.bilinear_mapping_matrix = self.add_weight(shape=[self.input_units, self.out_units], + initializer=RandomNormal(stddev=self.init_std), + name="S", dtype=tf.float32) + super(CapsuleLayer, self).build(input_shape) + + def call(self, inputs, **kwargs): + # input (hist_emb, hist_len) ,其中hist_emb是(None, seq_len, emb_dim),
hist_len是(none, 1) batch and sel_len + behavior_embeddings, seq_len = inputs + batch_size = tf.shape(behavior_embeddings)[0] + seq_len_tile = tf.tile(seq_len, [1, self.k_max]) # 在第二个维度上复制一份 k_max个输出胶囊嘛 (None, 2) 第一列和第二列都是序列真实长度 + + for i in range(self.iteration_times): + mask = tf.sequence_mask(seq_len_tile, + self.max_len) # (None, 2, 50) 第一个维度是样本,第二个维度是胶囊,第三个维度是[True, True, ..False, False, ..] + pad = tf.ones_like(mask, dtype=tf.float32) * ( + -2 ** 32 + 1) # (None, 2, 50) 被mask的位置是非常小的数,这样softmax的时候这个位置正好是0 + routing_logits_with_padding = tf.where(mask, tf.tile(self.routing_logits, [batch_size, 1, 1]), pad) + # (None, 2, 50) 沿着batch_size进行复制,每个样本都得有这样一套B,2个输出胶囊, 50个输入胶囊 + + weight = tf.nn.softmax(routing_logits_with_padding) # (none, 2, 50) # softmax得到权重 + + # (None, seq_len, emb_dim) * (emb_dim, out_emb) = (None, 50, 8) axes=1表示a的最后1个维度,与b进行张量乘法 + behavior_embedding_mapping = tf.tensordot(behavior_embeddings, self.bilinear_mapping_matrix, axes=1) + + Z = tf.matmul(weight, behavior_embedding_mapping) # (None, 2, 8)即 上面的B与behavior_embed_map加权求和 + interest_capsules = squash(Z) # (None, 2, 8) + + delta_routing_logits = reduce_sum( + # (None, 2, 8) * (None, 8, 50) = (None, 2, 50) + tf.matmul(interest_capsules, tf.transpose(behavior_embedding_mapping, perm=[0, 2, 1])), + axis=0, keep_dims=True + ) # (1, 2, 50) 样本维度这里相加 所有样本一块为聚类做贡献 + + self.routing_logits.assign_add(delta_routing_logits) # 原来的基础上加上这个东西 (1, 2, 50) + + interest_capsules = tf.reshape(interest_capsules, [-1, self.k_max, self.out_units]) # (None, 2, 8) + return interest_capsules + + def compute_output_shape(self, input_shape): + return (None, self.k_max, self.out_units) + + # 下面这个如果需要保存模型的时候会用到 + def get_config(self, ): + config = {'input_units': self.input_units, 'out_units': self.out_units, 'max_len': self.max_len, + 'k_max': self.k_max, 'iteration_times': self.iteration_times, "init_std": self.init_std} + base_config = super(CapsuleLayer, self).get_config() + return dict(list(base_config.items()) + 
list(config.items())) \ No newline at end of file diff --git a/codes/funrec/layers/sequence.py b/codes/funrec/layers/sequence.py index e3240824..a0d67a69 100644 --- a/codes/funrec/layers/sequence.py +++ b/codes/funrec/layers/sequence.py @@ -1,7 +1,94 @@ -from tensorflow.keras.layers import Layer +import tensorflow as tf +from tensorflow.keras.layers import * + +if tf.__version__ >= '2.0.0': + tf.compat.v1.disable_eager_execution() # TODO class PoolingLayer(Layer): def __init__(self, trainable=True, name=None, dtype=None, dynamic=False, **kwargs): super().__init__(trainable, name, dtype, dynamic, **kwargs) - pass \ No newline at end of file + pass + + +class DynamicMultiRNN(Layer): + def __init__(self, num_units=None, rnn_type='LSTM', return_sequence=True, num_layers=2, num_residual_layers=1, + dropout_rate=0.2, forget_bias=1.0, **kwargs): + self.num_units = num_units + self.return_sequence = return_sequence + self.rnn_type = rnn_type + self.num_layers = num_layers + self.num_residual_layers = num_residual_layers + self.dropout = dropout_rate + self.forget_bias = forget_bias + super(DynamicMultiRNN, self).__init__(**kwargs) + + def build(self, input_shape): + input_seq_shape = input_shape[0] + if self.num_units is None: + self.num_units = input_seq_shape.as_list()[-1] + # 这里先只用LSTM + if self.rnn_type == "LSTM": + try: + # AttributeError: module 'tensorflow_core._api.v2.nn' has no attribute 'rnn_cell' + single_cell = tf.nn.rnn_cell.BasicLSTMCell(self.num_units, forget_bias=self.forget_bias) + except AttributeError: + single_cell = tf.compat.v1.nn.rnn_cell.BasicLSTMCell(self.num_units, forget_bias=self.forget_bias) + + dropout = self.dropout if tf.keras.backend.learning_phase() == 1 else 0 # 训练的时候开启Dropout, 不训练的时候关闭 + try: + single_cell = tf.nn.rnn_cell.DropoutWrapper(cell=single_cell, input_keep_prob=(1.0 - dropout)) + except AttributeError: + single_cell = tf.compat.v1.nn.rnn_cell.DropoutWrapper(cell=single_cell, input_keep_prob=(1.0 - dropout)) + + cell_list = [] + 
for i in range(self.num_layers): + residual = (i >= self.num_layers - self.num_residual_layers) + if residual: + # 实现的功能, 把输入concat到输出上一起返回, 就跟ResNet一样 + try: + single_cell_residual = tf.nn.rnn_cell.ResidualWrapper(single_cell) + except AttributeError: + single_cell_residual = tf.compat.v1.nn.rnn_cell.ResidualWrapper(single_cell) + cell_list.append(single_cell_residual) + else: + cell_list.append(single_cell) + + if len(cell_list) == 1: + self.final_cell = cell_list[0] + else: + # 构建多隐层RNN + try: + self.final_cell = tf.nn.rnn_cell.MultiRNNCell(cell_list) + except AttributeError: + self.final_cell = tf.compat.v1.nn.rnn_cell.MultiRNNCell(cell_list) + + super(DynamicMultiRNN, self).build(input_shape) + + def call(self, input_list): + rnn_input, sequence_length = input_list + + try: + # AttributeError: module 'tensorflow' has no attribute 'variable_scope' + with tf.name_scope('rnn'), tf.variable_scope('rnn', reuse=tf.AUTO_REUSE): + # rnn_output是所有时间步的隐藏层状态, hidden_state是最后一个时间步的输出, 如果是LSTM这里是c和h两个 + rnn_output, hidden_state = tf.nn.dynamic_rnn(self.final_cell, inputs=rnn_input, + sequence_length=tf.squeeze(sequence_length), + dtype=tf.float32, scope=self.name) + except AttributeError: + with tf.name_scope("rnn"), tf.compat.v1.variable_scope("rnn", reuse=tf.compat.v1.AUTO_REUSE): + rnn_output, hidden_state = tf.compat.v1.nn.dynamic_rnn(self.final_cell, inputs=rnn_input, + sequence_length=tf.squeeze(sequence_length), + dtype=tf.float32, scope=self.name) + if self.return_sequence: + return rnn_output + else: + return tf.expand_dims(hidden_state, axis=1) + + def compute_output_shape(self, input_shape): + rnn_input_shape = input_shape[0] + if self.return_sequence: + return rnn_input_shape + else: + return (None, 1, rnn_input_shape[2]) + diff --git a/codes/funrec/layers/utils.py b/codes/funrec/layers/utils.py index 5a9cecfc..b9cedc0d 100644 --- a/codes/funrec/layers/utils.py +++ b/codes/funrec/layers/utils.py @@ -1,5 +1,7 @@ +from collections import OrderedDict import 
tensorflow as tf - +from tensorflow.keras.layers import * +from deepctr.feature_column import SparseFeat, DenseFeat, VarLenSparseFeat class NoMask(tf.keras.layers.Layer): def __init__(self, **kwargs): @@ -15,3 +17,106 @@ class NoMask(tf.keras.layers.Layer): def compute_mask(self, inputs, mask): return None +def softmax(logits, dim=-1, name=None): + try: + return tf.nn.softmax(logits, dim=dim, name=name) + except TypeError: + return tf.nn.softmax(logits, axis=dim, name=name) + +def concat_func(inputs, axis=-1, mask=False): + if not mask: + inputs = list(map(NoMask(), inputs)) + if len(inputs) == 1: + return inputs[0] + else: + return tf.keras.layers.Concatenate(axis=axis)(inputs) + +def build_input_layers(feature_columns, prefix=''): + input_features = OrderedDict() + for fc in feature_columns: + if isinstance(fc, SparseFeat): + input_features[fc.name] = Input( + shape=(1,), name=prefix + fc.name, dtype=fc.dtype) + elif isinstance(fc, DenseFeat): + input_features[fc.name] = Input( + shape=(fc.dimension,), name=prefix + fc.name, dtype=fc.dtype) + elif isinstance(fc, VarLenSparseFeat): + input_features[fc.name] = Input(shape=(fc.maxlen,), name=prefix + fc.name, + dtype=fc.dtype) + if fc.weight_name is not None: + input_features[fc.weight_name] = Input(shape=(fc.maxlen, 1), name=prefix + fc.weight_name, + dtype="float32") + if fc.length_name is not None: + input_features[fc.length_name] = Input((1,), name=prefix + fc.length_name, dtype='int32') + + else: + raise TypeError("Invalid feature column type,got", type(fc)) + + return input_features + + +def build_embedding_layers(feature_columns): + embedding_layer_dict = {} + + for fc in feature_columns: + if isinstance(fc, SparseFeat): + embedding_layer_dict[fc.name] = Embedding(fc.vocabulary_size, fc.embedding_dim, name='emb_' + fc.name) + elif isinstance(fc, VarLenSparseFeat): + # 这里加1是因为mask有个默认的embedding,占用了一个位置,比如mask为0, 而0这个位置的embedding是mask专用了 + embedding_layer_dict[fc.name] = Embedding(fc.vocabulary_size + 1, 
fc.embedding_dim, name='emb_' + fc.name, + mask_zero=True) + + return embedding_layer_dict + + +def embedding_lookup(feature_columns, input_layer_dict, embedding_layer_dict): + # 这个函数是Input层与embedding层给接起来 + embedding_list = [] + + for fc in feature_columns: + _input = input_layer_dict[fc] + _embed = embedding_layer_dict[fc] + embed = _embed(_input) + embedding_list.append(embed) + + return embedding_list + +def get_item_embedding(item_embedding, item_input_layer): + return Lambda(lambda x: tf.squeeze(tf.gather(item_embedding, x), axis=1))( + item_input_layer) + +def squash(inputs): + vec_squared_norm = reduce_sum(tf.square(inputs), axis=-1, keep_dims=True) # (None, 2, 1) + scalar_factor = vec_squared_norm / (1 + vec_squared_norm) / tf.sqrt(vec_squared_norm + 1e-8) + vec_squashed = scalar_factor * inputs + return vec_squashed + +def reduce_sum(input_tensor, + axis=None, + keep_dims=False, + name=None, + reduction_indices=None): + try: + return tf.reduce_sum(input_tensor, + axis=axis, + keep_dims=keep_dims, + name=name, + reduction_indices=reduction_indices) + except TypeError: + return tf.reduce_sum(input_tensor, + axis=axis, + keepdims=keep_dims, + name=name) + +def combined_dnn_input(sparse_embedding_list, dense_value_list): + if len(sparse_embedding_list) > 0 and len(dense_value_list) > 0: + sparse_dnn_input = Flatten()(concat_func(sparse_embedding_list)) + dense_dnn_input = Flatten()(concat_func(dense_value_list)) + return concat_func([sparse_dnn_input, dense_dnn_input]) + elif len(sparse_embedding_list) > 0: + return Flatten()(concat_func(sparse_embedding_list)) + elif len(dense_value_list) > 0: + return Flatten()(concat_func(dense_value_list)) + else: + raise NotImplementedError("dnn_feature_columns can not be empty list") + diff --git a/codes/funrec/metrics.py b/codes/funrec/metrics.py index b610f836..c8daab30 100644 --- a/codes/funrec/metrics.py +++ b/codes/funrec/metrics.py @@ -1 +1,18 @@ -# TODO 将召回排序常用的指标汇总 \ No newline at end of file +# TODO 
将召回排序常用的指标汇总 + +# 依次评估召回的前10, 20, 30, 40, 50个文章中的击中率 +def H_Rate(user_recall_items_dict, trn_last_click_df, topk=100): + last_click_item_dict = dict(zip(trn_last_click_df['user_id'], trn_last_click_df['article_id'])) + user_num = len(user_recall_items_dict) + + for k in range(50, topk + 1, 50): + hit_num = 0 + for user, item_list in user_recall_items_dict.items(): + if user in last_click_item_dict: + # 获取前k个召回的结果 + tmp_recall_items = [x[0] for x in user_recall_items_dict[user][:k]] + if last_click_item_dict[user] in set(tmp_recall_items): + hit_num += 1 + + hit_rate = round(hit_num * 1.0 / user_num, 5) + print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num) \ No newline at end of file diff --git a/codes/funrec/models/matching/MIND.py b/codes/funrec/models/matching/MIND.py new file mode 100644 index 00000000..da4635b4 --- /dev/null +++ b/codes/funrec/models/matching/MIND.py @@ -0,0 +1,118 @@ +import tensorflow as tf +from tensorflow.python.keras import backend as K +from tensorflow.keras.layers import * +from tensorflow.keras.models import * +from tensorflow.python.keras.models import Model + +# 不用deepctr的feature_column, 在filter选择的时候,怎么选择不出东西? 
+from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat + +from layers.utils import build_input_layers, build_embedding_layers, embedding_lookup, combined_dnn_input, NoMask, get_item_embedding +from layers.interaction import LabelAwareAttention, CapsuleLayer +from layers.core import SampledSoftmaxLayer, PoolingLayer, EmbeddingIndex, DNN + +def tile_user_otherfeat(user_other_feature, k_max): + return tf.tile(tf.expand_dims(user_other_feature, -2), [1, k_max, 1]) + + +def MIND(user_feature_columns, item_feature_columns, num_sampled=5, k_max=2, p=1.0, dynamic_k=False, user_dnn_hidden_units=(64, 32), + dnn_activation='relu', dnn_use_bn=False, l2_reg_dnn=0, l2_reg_embedding=1e-6, dnn_dropout=0, output_activation='linear', seed=1024): + """ + :param k_max: 用户兴趣胶囊的最大个数 + """ + # 目前这里只支持item_feature_columns为1的情况,即只能转入item_id + if len(item_feature_columns) > 1: + raise ValueError("Now MIND only support 1 item feature like item_id") + + # 获取item相关的配置参数 + item_feature_column = item_feature_columns[0] + item_feature_name = item_feature_column.name + item_vocabulary_size = item_feature_column.vocabulary_size + item_embedding_dim = item_feature_column.embedding_dim + + behavior_feature_list = [item_feature_name] + + # 为用户特征创建Input层 + user_input_layer_dict = build_input_layers(user_feature_columns) + item_input_layer_dict = build_input_layers(item_feature_columns) + # 将Input层转化成列表的形式作为model的输入 + user_input_layers = list(user_input_layer_dict.values()) + item_input_layers = list(item_input_layer_dict.values()) + + # 筛选出特征中的sparse特征和dense特征,方便单独处理 + sparse_feature_columns = list \ + (filter(lambda x: isinstance(x, SparseFeat), user_feature_columns)) if user_feature_columns else [] + dense_feature_columns = list \ + (filter(lambda x: isinstance(x, DenseFeat), user_feature_columns)) if user_feature_columns else [] + varlen_feature_columns = list \ + (filter(lambda x: isinstance(x, VarLenSparseFeat), user_feature_columns)) if user_feature_columns else [] + + # 
由于这个变长序列里面只有历史点击文章,没有类别啥的,所以这里直接可以用varlen_feature_columns + # deepctr这里单独把点击文章这个放到了history_feature_columns + seq_max_len = varlen_feature_columns[0].maxlen + + # 构建embedding字典 + embedding_layer_dict = build_embedding_layers(user_feature_columns +item_feature_columns) + + # 获取当前的行为特征(doc)的embedding,这里面可能又多个类别特征,所以需要pooling下 + query_embed_list = embedding_lookup(behavior_feature_list, item_input_layer_dict, embedding_layer_dict) # 长度为1 + # 获取行为序列(doc_id序列, hist_doc_id) 对应的embedding,这里有可能有多个行为产生了行为序列,所以需要使用列表将其放在一起 + keys_embed_list = embedding_lookup([varlen_feature_columns[0].name], user_input_layer_dict, embedding_layer_dict) # 长度为1 + + # 用户离散特征的输入层与embedding层拼接 + dnn_input_emb_list = embedding_lookup([col.name for col in sparse_feature_columns], user_input_layer_dict, embedding_layer_dict) + + # 获取dense + dnn_dense_input = [] + for fc in dense_feature_columns: + if fc.name != 'hist_len': # 连续特征不要这个 + dnn_dense_input.append(user_input_layer_dict[fc.name]) + + # 把keys_emb_list和query_emb_listpooling操作, 这是因为可能每个商品不仅有id,还可能用类别,品牌等多个embedding向量,这种需要pooling成一个 + history_emb = PoolingLayer()(NoMask()(keys_embed_list)) # (None, 50, 8) + target_emb = PoolingLayer()(NoMask()(query_embed_list)) # (None, 1, 8) + + hist_len = user_input_layer_dict['hist_len'] + # 胶囊网络 + # (None, 2, 8) 得到了两个兴趣胶囊 + high_capsule = CapsuleLayer(input_units=item_embedding_dim, out_units=item_embedding_dim, + max_len=seq_max_len, k_max=k_max)((history_emb, hist_len)) # 【5, 7, 8, 9】 + + # 把用户的其他特征拼接到胶囊网络上来 + if len(dnn_input_emb_list) > 0 or len(dnn_dense_input) > 0: + user_other_feature = combined_dnn_input(dnn_input_emb_list, dnn_dense_input) + # (None, 2, 32) 这里会发现其他的用户特征是每个胶囊复制了一份,然后拼接起来 + other_feature_tile = tf.keras.layers.Lambda(tile_user_otherfeat, arguments={'k_max': k_max})(user_other_feature) + user_deep_input = Concatenate()([NoMask()(other_feature_tile), high_capsule]) # (None, 2, 40) + else: + user_deep_input = high_capsule + + # 接下来过一个DNN层,获取最终的用户表示向量 如果是三维输入, 
那么最后一个维度与w相乘,所以这里如果不自己写,可以用Dense层的列表也可以 + user_embeddings = DNN(user_dnn_hidden_units, dnn_activation)(user_deep_input) # (None, 2, 8) + # 接下来,过Label-aware layer + if dynamic_k: + user_embedding_final = LabelAwareAttention(k_max=k_max, pow_p=p)((user_embeddings, target_emb, hist_len)) + else: + user_embedding_final = LabelAwareAttention(k_max=k_max, pow_p=p)((user_embeddings, target_emb)) + + # 接下来 + item_embedding_matrix = embedding_layer_dict[item_feature_name] # 获取doc_id的embedding层 + item_index = EmbeddingIndex(list(range(item_vocabulary_size))) \ + (item_input_layer_dict[item_feature_name]) # 所有doc_id的索引 + item_embedding_weight = NoMask()(item_embedding_matrix(item_index)) # 拿到所有item的embedding + pooling_item_embedding_weight = PoolingLayer() \ + ([item_embedding_weight]) # 这里依然是当可能不止item_id,或许还有brand_id, cat_id等,需要池化 + + # 这里传入的是整个doc_id的embedding, user_embedding, 以及用户点击的doc_id,然后去进行负采样计算损失操作 + output = SampledSoftmaxLayer(num_sampled) \ + ([pooling_item_embedding_weight, user_embedding_final, item_input_layer_dict[item_feature_name]]) + + model = Model(inputs=user_input_layers + item_input_layers, outputs=output) + + # 下面是等模型训练完了之后,获取用户和item的embedding + model.__setattr__("user_input", user_input_layers) + model.__setattr__("user_embedding", user_embeddings) + model.__setattr__("item_input", item_input_layers) + model.__setattr__("item_embedding", get_item_embedding(pooling_item_embedding_weight, item_input_layer_dict[item_feature_name])) + + return model \ No newline at end of file diff --git a/codes/funrec/models/matching/SDM.py b/codes/funrec/models/matching/SDM.py new file mode 100644 index 00000000..256e08a4 --- /dev/null +++ b/codes/funrec/models/matching/SDM.py @@ -0,0 +1,146 @@ +from collections import OrderedDict + +import tensorflow as tf +from tensorflow.python.keras import backend as K +from tensorflow.keras.layers import * +from tensorflow.keras.models import * +from tensorflow.python.keras.models import Model + +# 不用deepctr的feature_column, 
在filter选择的时候,怎么选择不出东西? +from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat + +from layers.utils import build_input_layers, build_embedding_layers, embedding_lookup, concat_func, NoMask, get_item_embedding +from layers.interaction import MultiHeadAttention, UserAttention, AttentionSequencePoolingLayer +from layers.core import SampledSoftmaxLayer, PoolingLayer, EmbeddingIndex +from layers.sequence import DynamicMultiRNN + +if tf.__version__ >= '2.0.0': + tf.compat.v1.disable_eager_execution() + + +def SDM(user_feature_columns, item_feature_columns, history_feature_list, num_sampled=5, units=32, rnn_layers=2, + dropout_rate=0.2, rnn_num_res=1, num_head=4, l2_reg_embedding=1e-6, dnn_activation='tanh', seed=1024): + """ + :param rnn_num_res: rnn的残差层个数 + :param history_feature_list: short和long sequence field + """ + # item_feature目前只支持doc_id, 再加别的就不行了,其实这里可以改造下 + if (len(item_feature_columns)) > 1: + raise ValueError("SDM only support 1 item feature like doc_id") + + # 获取item_feature的一些属性 + item_feature_column = item_feature_columns[0] + item_feature_name = item_feature_column.name + item_vocabulary_size = item_feature_column.vocabulary_size + + # 为用户特征创建Input层 + user_input_layer_dict = build_input_layers(user_feature_columns) + item_input_layer_dict = build_input_layers(item_feature_columns) + + # 将Input层转化成列表的形式作为model的输入 + user_input_layers = list(user_input_layer_dict.values()) + item_input_layers = list(item_input_layer_dict.values()) + + # 筛选出特征中的sparse特征和dense特征,方便单独处理 + sparse_feature_columns = list \ + (filter(lambda x: isinstance(x, SparseFeat), user_feature_columns)) if user_feature_columns else [] + dense_feature_columns = list \ + (filter(lambda x: isinstance(x, DenseFeat), user_feature_columns)) if user_feature_columns else [] + if len(dense_feature_columns) != 0: + raise ValueError("SDM dont support dense feature") # 目前不支持Dense feature + varlen_feature_columns = list \ + (filter(lambda x: isinstance(x, VarLenSparseFeat), 
user_feature_columns)) if user_feature_columns else [] + + # 构建embedding字典 + embedding_layer_dict = build_embedding_layers(user_feature_columns + item_feature_columns) + + # 拿到短期会话和长期会话列 之前的命名规则在这里起作用 + sparse_varlen_feature_columns = [] + prefer_history_columns = [] + short_history_columns = [] + + prefer_fc_names = list(map(lambda x: "prefer_" + x, history_feature_list)) + short_fc_names = list(map(lambda x: "short_" + x, history_feature_list)) + + for fc in varlen_feature_columns: + if fc.name in prefer_fc_names: + prefer_history_columns.append(fc) + elif fc.name in short_fc_names: + short_history_columns.append(fc) + else: + sparse_varlen_feature_columns.append(fc) + + # 获取用户的长期行为序列列表 L^u + # [, , ] + prefer_emb_list = embedding_lookup(prefer_fc_names, user_input_layer_dict, embedding_layer_dict) + # 获取用户的短期序列列表 S^u + # [, , ] + short_emb_list = embedding_lookup(short_fc_names, user_input_layer_dict, embedding_layer_dict) + + # 用户离散特征的输入层与embedding层拼接 e^u + user_emb_list = embedding_lookup([col.name for col in sparse_feature_columns], user_input_layer_dict, embedding_layer_dict) + user_emb = concat_func(user_emb_list) + user_emb_output = Dense(units, activation=dnn_activation, name='user_emb_output')(user_emb) # (None, 1, 32) + + # 长期序列行为编码 + # 过AttentionSequencePoolingLayer --> Concat --> DNN + prefer_sess_length = user_input_layer_dict['prefer_sess_length'] + prefer_att_outputs = [] + # 遍历长期行为序列 + for i, prefer_emb in enumerate(prefer_emb_list): + prefer_attention_output = AttentionSequencePoolingLayer(dropout_rate=0) \ + ([user_emb_output, prefer_emb, prefer_sess_length]) + prefer_att_outputs.append(prefer_attention_output) + prefer_att_concat = concat_func \ + (prefer_att_outputs) # (None, 1, 32) <== Concat(item_embedding,cat1_embedding,cat2_embedding) + prefer_output = Dense(units, activation=dnn_activation, name='prefer_output')(prefer_att_concat) + # print(prefer_output.shape) # (None, 1, 32) + + # 短期行为序列编码 + short_sess_length = 
user_input_layer_dict['short_sess_length'] + short_emb_concat = concat_func(short_emb_list) # (None, 5, 64) 这里注意下, 对于短期序列,描述item的side info信息进行了拼接 + short_emb_input = Dense(units, activation=dnn_activation, name='short_emb_input')(short_emb_concat) # (None, 5, 32) + # 过rnn 这里的return_sequence=True, 每个时间步都需要输出h + short_rnn_output = DynamicMultiRNN(num_units=units, return_sequence=True, num_layers=rnn_layers, + num_residual_layers=rnn_num_res, + dropout_rate=dropout_rate)([short_emb_input, short_sess_length]) + # print(short_rnn_output) # (None, 5, 32) + # 过MultiHeadAttention # (None, 5, 32) + short_att_output = MultiHeadAttention(num_units=units, head_num=num_head, dropout_rate=dropout_rate) \ + ([short_rnn_output, short_sess_length]) # (None, 5, 64) + # user_attention # (None, 1, 32) + short_output = UserAttention(num_units=units, activation=dnn_activation, use_res=True, dropout_rate=dropout_rate) \ + ([user_emb_output, short_att_output, short_sess_length]) + + # 门控融合 + gated_input = concat_func([prefer_output, short_output, user_emb_output]) + gate = Dense(units, activation='sigmoid')(gated_input) # (None, 1, 32) + + # temp = tf.multiply(gate, short_output) + tf.multiply(1-gate, prefer_output) 感觉这俩一样? 
+ gated_output = Lambda(lambda x: tf.multiply(x[0], x[1]) + tf.multiply( 1 -x[0], x[2])) \ + ([gate, short_output, prefer_output]) # [None, 1,32] + gated_output_reshape = Lambda(lambda x: tf.squeeze(x, 1)) \ + (gated_output) # (None, 32) 这个维度必须要和docembedding层的维度一样,否则后面没法sortmax_loss + + # 接下来 + item_embedding_matrix = embedding_layer_dict[item_feature_name] # 获取doc_id的embedding层 + item_index = EmbeddingIndex(list(range(item_vocabulary_size))) \ + (item_input_layer_dict[item_feature_name]) # 所有doc_id的索引 + item_embedding_weight = NoMask()(item_embedding_matrix(item_index)) # 拿到所有item的embedding + pooling_item_embedding_weight = PoolingLayer() \ + ([item_embedding_weight]) # 这里依然是当可能不止item_id,或许还有brand_id, cat_id等,需要池化 + + # 这里传入的是整个doc_id的embedding, user_embedding, 以及用户点击的doc_id,然后去进行负采样计算损失操作 + output = SampledSoftmaxLayer(num_sampled) \ + ([pooling_item_embedding_weight, gated_output_reshape, item_input_layer_dict[item_feature_name]]) + + model = Model(inputs=user_input_layers +item_input_layers, outputs=output) + + # 下面是等模型训练完了之后,获取用户和item的embedding + model.__setattr__("user_input", user_input_layers) + model.__setattr__("user_embedding", gated_output_reshape) # 用户embedding是取得门控融合的用户向量 + model.__setattr__("item_input", item_input_layers) + # item_embedding取得pooling_item_embedding_weight, 这个会发现是负采样操作训练的那个embedding矩阵 + model.__setattr__("item_embedding", get_item_embedding(pooling_item_embedding_weight, item_input_layer_dict[item_feature_name])) + + return model \ No newline at end of file diff --git a/codes/funrec/recall_methods.py b/codes/funrec/recall_methods.py new file mode 100644 index 00000000..9204f4c7 --- /dev/null +++ b/codes/funrec/recall_methods.py @@ -0,0 +1,121 @@ +import os +import numpy as np +import pickle +from sklearn.preprocessing import LabelEncoder + +# 从utils里面导入函数 +from examples.preprocess import gen_data_set, gen_model_input, gen_sdm_model_input, gen_sdm_data_set +from examples.utils import train_mind_model, get_embeddings, get_recall_res, 
train_sdm_model + + +def sdm_recall(data, topk=200, embedding_dim=32, SEQ_LEN_short=5, SEQ_LEN_prefer=50, + batch_size=64, epochs=1, verbose=1, validation_split=0.0): + """通过SDM模型,计算用户向量和文章向量 + param: data: 用户日志数据 + topk: 对于每个用户,召回多少篇文章 + """ + user_id_raw = data[['user_id']].drop_duplicates('user_id') + doc_id_raw = data[['article_id']].drop_duplicates('article_id') + + # 类别数据编码 + base_features = ['user_id', 'article_id', 'city', 'age', 'gender', 'cat_1', 'cat_2'] + feature_max_idx = {} + for f in base_features: + lbe = LabelEncoder() + data[f] = lbe.fit_transform(data[f]) + 1 + feature_max_idx[f] = data[f].max() + 1 + + # 构建用户id词典和doc的id词典,方便从用户idx找到原始的id + user_id_enc = data[['user_id']].drop_duplicates('user_id') + doc_id_enc = data[['article_id']].drop_duplicates('article_id') + user_idx_2_rawid = dict(zip(user_id_enc['user_id'], user_id_raw['user_id'])) + doc_idx_2_rawid = dict(zip(doc_id_enc['article_id'], doc_id_raw['article_id'])) + + user_profile = data[['user_id', 'gender', 'age', 'city']].drop_duplicates('user_id') + item_profile = data[['article_id']].drop_duplicates('article_id') + user_profile.set_index('user_id', inplace=True) + + train_set, test_set = gen_sdm_data_set(data, seq_short_len=SEQ_LEN_short, seq_prefer_len=SEQ_LEN_prefer) + + train_model_input, train_label = gen_sdm_model_input(train_set, user_profile, SEQ_LEN_short, SEQ_LEN_prefer) + test_model_input, test_label = gen_sdm_model_input(test_set, user_profile, SEQ_LEN_short, SEQ_LEN_prefer) + + # 构建模型并完成训练 + model = train_sdm_model(train_model_input, train_label, embedding_dim, feature_max_idx, SEQ_LEN_short, + SEQ_LEN_prefer, batch_size, epochs, verbose, validation_split) + + # 获得用户embedding和doc的embedding, 并进行保存 + user_embs, doc_embs = get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid) + + # 对每个用户,拿到召回结果并返回回来 + user_recall_items_dict = get_recall_res(user_embs, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk) + + return user_recall_items_dict + + +def 
mind_recall(data, topk=200, embedding_dim=8, his_seq_maxlen=50, negsample=0, + batch_size=64, epochs=1, verbose=1, validation_split=0.0): + """通过MIND模型,计算用户向量和文章向量 + param: data: 用户日志数据 + topk: 对于每个用户,召回多少篇文章 + """ + user_id_raw = data[['user_id']].drop_duplicates('user_id') + doc_id_raw = data[['article_id']].drop_duplicates('article_id') + + # 类别数据编码 + base_features = ['user_id', 'article_id', 'city', 'age', 'gender'] + feature_max_idx = {} + for f in base_features: + lbe = LabelEncoder() + data[f] = lbe.fit_transform(data[f]) + feature_max_idx[f] = data[f].max() + 1 + + # 构建用户id词典和doc的id词典,方便从用户idx找到原始的id + user_id_enc = data[['user_id']].drop_duplicates('user_id') + doc_id_enc = data[['article_id']].drop_duplicates('article_id') + user_idx_2_rawid = dict(zip(user_id_enc['user_id'], user_id_raw['user_id'])) + doc_idx_2_rawid = dict(zip(doc_id_enc['article_id'], doc_id_raw['article_id'])) + + # 保存下每篇文章的被点击数量, 方便后面高热文章的打压 + doc_clicked_count_df = data.groupby('article_id')['click'].apply(lambda x: x.count()).reset_index() + doc_clicked_count_dict = dict(zip(doc_clicked_count_df['article_id'], doc_clicked_count_df['click'])) + + if os.path.exists('data/train_model_input.pkl'): + train_model_input = pickle.load(open('data/train_model_input.pkl', 'rb')) + train_label = np.load('data/train_label.npy') + test_model_input = pickle.load(open('data/test_model_input.pkl', 'rb')) + test_label = np.load('data/test_label.npy') + else: + train_set, test_set = gen_data_set(data, doc_clicked_count_dict, negsample, control_users=False) + + # 构造MIND模型的输入 + train_model_input, train_label = gen_model_input(train_set, his_seq_maxlen) + test_model_input, test_label = gen_model_input(test_set, his_seq_maxlen) + + # 保存一份输入直接,要不然每次都得这么构造输入太慢了 + pickle.dump(train_model_input, open('data/train_model_input.pkl', 'wb')) + pickle.dump(test_model_input, open('data/test_model_input.pkl', 'wb')) + np.save('data/train_label.npy', train_label) + np.save('data/test_label.npy', test_label) + + # 
构建模型并完成训练 + model = train_mind_model(train_model_input, train_label, embedding_dim, feature_max_idx, his_seq_maxlen, batch_size, + epochs, verbose, validation_split) + + # 获得用户embedding和doc的embedding, 并进行保存 + user_embs, doc_embs = get_embeddings(model, test_model_input, user_idx_2_rawid, doc_idx_2_rawid) + + # MIND模型这里有k个兴趣向量,所以要分开进行召回 + user_embs1 = user_embs[:, 0, :] + user_embs2 = user_embs[:, 1, :] + + # 对每个用户,拿到召回结果并返回回来 + user_recall_items_dict1 = get_recall_res(user_embs1, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk) + user_recall_items_dict2 = get_recall_res(user_embs2, doc_embs, user_idx_2_rawid, doc_idx_2_rawid, topk) + + # 合并,当然我这里没有去重 + # user_recall_items_dict = defaultdict(list) + # for user in user_recall_items_dict1: + # user_recall_items_dict[user] = user_recall_items_dict1[user] + user_recall_items_dict2[user] + + return user_recall_items_dict1, user_recall_items_dict2