diff --git a/codes/news_recsys/news_rec_server/conf/proj_path.py b/codes/news_recsys/news_rec_server/conf/proj_path.py index 9f4835ae..3e48887c 100644 --- a/codes/news_recsys/news_rec_server/conf/proj_path.py +++ b/codes/news_recsys/news_rec_server/conf/proj_path.py @@ -4,4 +4,9 @@ home_path = os.environ['HOME'] proj_path = home_path + "/fun-rec/codes/news_recsys/news_rec_server/" stop_words_path = proj_path + "conf/stop_words.txt" -bad_case_news_log_path = proj_path + "logs/news_bad_cases.log" \ No newline at end of file +bad_case_news_log_path = proj_path + "logs/news_bad_cases.log" + +root_data_path = "/home/recsys/news_data/5w_data/" +log_data_path = root_data_path + "train_data_5w.csv" +doc_info_path = root_data_path + "doc_info.txt" +user_info_path = root_data_path + "user_info_data_5w.csv" \ No newline at end of file diff --git a/codes/news_recsys/news_rec_server/recprocess/recall/region_recall.py b/codes/news_recsys/news_rec_server/recprocess/recall/region_recall.py new file mode 100644 index 00000000..50758f96 --- /dev/null +++ b/codes/news_recsys/news_rec_server/recprocess/recall/region_recall.py @@ -0,0 +1,101 @@ +import os +import sys +sys.path.append("../../") +from conf.proj_path import log_data_path, user_info_path, doc_info_path +import pandas as pd + + +class RegionRecall(object): + def __init__(self, log_data_path, user_info_path, doc_info_path): + super().__init__() + self.log_data_path = log_data_path + self.user_info_path = user_info_path + self.doc_info_path = doc_info_path + self.read_and_process_data() + + def read_and_process_data(self): + """读取并处理数据 + """ + log_columns = ['user_id', 'article_id', 'expo_time', 'net_status', 'flush_nums', 'expo_position', 'click', 'duration'] + user_columns = ['user_id', 'device', 'os', 'province', 'city', 'age','gender'] + doc_columns = ['article_id', 'title', 'ctime', 'img_num', 'cate','sub_cate', 'key_words'] + self.train_log_df = pd.read_csv(self.log_data_path, usecols=log_columns, sep='\t') + self.doc_info_df = pd.read_csv(self.doc_info_path, names=doc_columns, sep='\t') + self.user_info_df = pd.read_csv(self.user_info_path, usecols=user_columns, sep='\t') + # 删除重复数据 + self.train_log_df = self.train_log_df.drop_duplicates(keep='last') + self.doc_info_df = self.doc_info_df.drop_duplicates(keep='last') + self.user_info_df = self.user_info_df.drop_duplicates(keep='last') + # 转换成天的日期,可以用来筛选数据 + self.train_log_df['expo_time_day'] = pd.to_datetime(self.train_log_df['expo_time'], unit='ms').dt.strftime('%Y_%m_%d') + print("read_and_process_data success...") + + def get_article_stat_data(self, train_df, doc_info_df): + """统计所有文章的点击和曝光次数,以及点击率(点击次数 / 曝光次数) + """ + # 日志数据去重, 保留最后一条记录 + train_df = train_df.drop_duplicates(keep='last') + # 统计曝光 + expo_num_s = train_df.groupby('article_id')['user_id'].count() + expo_num_df = pd.DataFrame({'article_id': expo_num_s.index, 'expo_num': expo_num_s.values}) + # 统计点击 + click_num_s = train_df.groupby('article_id')['click'].sum() + click_num_df = pd.DataFrame({'article_id': click_num_s.index, 'click_num': click_num_s.values}) + article_df = pd.merge(expo_num_df, click_num_df, how='left', on='article_id') + # 拼接文章信息 + article_df = pd.merge(article_df, doc_info_df[['article_id', 'ctime', 'cate']], how='left', on='article_id') + article_df['click_rate'] = article_df['click_num'] / article_df['expo_num'] + return article_df + + def get_province_articles(self, df, topK, cur_time): + """筛选和过滤规则, 里面的超参都可以根据业务数据的具体分布进行修改 + """ + df['ctime_date'] = pd.to_datetime(df['ctime'], unit='ms') + # 时间差 + df['delta_time'] = pd.to_datetime([cur_time] * df.shape[0]) - df['ctime_date'] + # 保留最近三天的新闻,保证新闻的时效性 + df = df[df['delta_time'].dt.days >= 3].reset_index(drop=True) + # expo_num 过滤 + df = df[df['expo_num'] >= 1000].reset_index(drop=True) + # 点击率 + df = df[df['click_rate'] >= 0.1].reset_index(drop=True) + # 按照点击率倒排 + df = df.sort_values('click_rate', ascending=False).reset_index(drop=True) + # 数据格式:article_id:cate:click_rate + df['article_id_and_click_rate'] = df.apply(lambda x: str(\ + x['article_id']) + ':' + str(x['cate']) + ':' + str(round(x['click_rate'], 5)), axis=1) + article_list = df['article_id_and_click_rate'].values[:topK] + return article_list + + def province_recall(self, N, cur_time): + article_df = self.get_article_stat_data(self.train_log_df, self.doc_info_df) + region_articles_df = pd.merge(self.train_log_df, self.user_info_df, how='left', on='user_id') + region_articles_df = region_articles_df[['user_id', 'article_id', 'province', 'city']] + region_articles_df = pd.merge(region_articles_df, article_df, how='left', on='article_id') + # 去除时间为空, 以及一些异常数据 + region_articles_df['ctime'] = region_articles_df['ctime'].astype(str) + region_articles_df = region_articles_df[region_articles_df['ctime'].str.isnumeric()] + # 分组 + province_df_dict = {} + for name, df in region_articles_df.groupby('province'): + if df.shape[0] < 5000: + continue + # 分完组之后可以取出重复数据 + df = df[['article_id', 'expo_num', 'click_num', 'click_rate', 'ctime', 'cate']].\ + drop_duplicates(subset='article_id').reset_index(drop=True) + province_df_dict[name] = df + # 给每个省份筛选一部分优质物料 + province_results_dict = {} + for province, df in province_df_dict.items(): + province_results_dict[province] = self.get_province_articles(df, N, cur_time) + print("province_recall success...") + return province_results_dict + + +if __name__ == "__main__": + root_path = '/data1/ryluo/5w_data/' + region_recall = RegionRecall(log_data_path, user_info_path, doc_info_path) + province_results_dict = region_recall.province_recall(N=300, cur_time='2021-07-03') + # 这里的召回内容还没有落盘 + print(province_results_dict) + # TODO 召回结果落盘逻辑 \ No newline at end of file