Study notes based on https://github.com/xufengtt/recom_teach_code.
Environment setup (MaxCompute + DataWorks)
- Download the Tmall recommendation dataset.
- Enable Alibaba Cloud MaxCompute, DataWorks, and PAI.
- Use odpscmd to upload local data. Configure it by editing odps_config.ini in the conf folder, filling in the project_name (recom_maxcompute), access_id, access_key, and end_point parameters; then go to the bin directory and run odpscmd. A minimal config sketch follows.
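A minimal odps_config.ini might look like the following (a sketch; the endpoint shown is the public MaxCompute endpoint used later in this document, so substitute your own region's endpoint and real credentials):

# conf/odps_config.ini -- minimal sketch; values are placeholders
project_name=recom_maxcompute
access_id=<your-access-id>
access_key=<your-access-key>
end_point=http://service.cn-shanghai.maxcompute.aliyun.com/api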
Building data and features
Uploading data: operational data store (ODS) layer
- Create the table in DataWorks:
CREATE TABLE IF NOT EXISTS item_dim (item_id STRING,title STRING,pict_url STRING,category STRING,brand_id STRING,seller_id STRING
) LIFECYCLE 90;
- Upload the data:
odpscmd -e "tunnel upload tianchi_2014001_rec_tmall_product.txt item_dim -fd '\u0001';"
- Check the upload:
SELECT * FROM item_dim LIMIT 100;
- Upload the remaining files in the same way:
CREATE TABLE IF NOT EXISTS user_item_beh_log (item_id STRING ,user_id STRING ,action STRING ,vtime STRING
) LIFECYCLE 90;
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_parta.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partb.txt user_item_beh_log -fd '\u0001';"
odpscmd -e "tunnel upload tianchi_2014002_rec_tmall_log_partc.txt user_item_beh_log -fd '\u0001';"
Data warehouse (DW) layer
- First inspect a few metrics; the dataset's volume is reasonably close to what is seen in industry:
SELECT max(vtime), min(vtime), COUNT(DISTINCT item_id), COUNT(DISTINCT user_id), COUNT(*)
FROM user_item_beh_log;
Common layering models for a data warehouse:
- Three-layer model: operational data store (ODS), data warehouse layer (DW), application data service layer (ADS)
- Four-layer model: additionally adds a multidimensional detail layer (DWS)
Warehouse design for this project:
- ODS layer: the uploaded raw tables
- DW layer: the granularity to build is 1) time granularity: stored by day; 2) behavior type: click / collect / cart / purchase stored separately; 3) key granularity: user-side features user+brand, user+item, user+cate1_id, user+cate2_id, user; brand-side features brand+user, brand+item; user-brand interaction features user+brand, user+brand+item, user+brand+cate1_id, user+brand+cate2_id. The log is therefore split by behavior type and time into four tables (click / collect / cart / purchase), each partitioned by the day of the behavior. To support all the feature computations above, every table carries the columns user, brand, item, cate1_id, cate2_id.
- ADS layer: brand-level statistical features, user-level statistical features, user-brand cross statistical features
- Dimension (attribute) tables: item brand / category / title
CREATE TABLE IF NOT EXISTS dw_user_item_click_log (user_id STRING COMMENT '用户id',item_id STRING COMMENT '商品id',brand_id STRING COMMENT '品牌id',seller_id STRING COMMENT '商家id',cate1 STRING COMMENT '类目1id',cate2 STRING COMMENT '类目2id',op_time STRING COMMENT '点击时间'
) PARTITIONED BY (ds STRING COMMENT '日期分区') LIFECYCLE 90;
ds (date string) is the partition column declared when the partitioned table is created; it is not an ordinary column, so it does not need to be declared in the column list. Build all four behavior tables this way, partitioned by day, and populate them as follows:
INSERT OVERWRITE TABLE dw_user_item_click_log PARTITION (ds)
SELECT user_id, t2.item_id, brand_id, seller_id, cate1, cate2, vtime, ds
FROM (
    SELECT user_id, item_id, vtime,
           TO_CHAR(TO_DATE(vtime, 'yyyy-mm-dd hh:mi:ss'), 'yyyymmdd') AS ds
    FROM user_item_beh_log
    WHERE action = 'click'
) t1 JOIN (
    SELECT item_id, brand_id, seller_id,
           SPLIT_PART(category, '-', 1) AS cate1,
           SPLIT_PART(category, '-', 2) AS cate2
    FROM item_dim
) t2 ON t1.item_id = t2.item_id;
To verify: SELECT * FROM dw_user_item_click_log WHERE ds = ${bizdate} LIMIT 10;
Application data service (ADS) layer
- Brand-level features: click / collect / cart / pay. Create and populate a table of brand-level features based on how users interacted with each brand over a time window. The code below counts, over the past 60 days (including ${bizdate} itself), the number of users who clicked, collected, carted, and paid; DISTINCT ensures each user is counted only once.
CREATE TABLE IF NOT EXISTS brand_stat_feature_ads (
    brand_id STRING,
    click_num BIGINT,
    collect_num BIGINT,
    cart_num BIGINT,
    alipay_num BIGINT
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE brand_stat_feature_ads PARTITION (ds=${bizdate})
SELECT t1.brand_id, click_num,
       IF(collect_num IS NULL, 0, collect_num),
       IF(cart_num IS NULL, 0, cart_num),
       IF(alipay_num IS NULL, 0, alipay_num)
FROM (
    SELECT brand_id, COUNT(DISTINCT user_id) AS click_num
    FROM dw_user_item_click_log
    WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t1 LEFT JOIN (
    SELECT brand_id, COUNT(DISTINCT user_id) AS collect_num
    FROM dw_user_item_collect_log
    WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t2 ON t1.brand_id = t2.brand_id
LEFT JOIN (
    SELECT brand_id, COUNT(DISTINCT user_id) AS cart_num
    FROM dw_user_item_cart_log
    WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t3 ON t1.brand_id = t3.brand_id
LEFT JOIN (
    SELECT brand_id, COUNT(DISTINCT user_id) AS alipay_num
    FROM dw_user_item_alipay_log
    WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -60, 'dd'), 'yyyymmdd')
    GROUP BY brand_id
) t4 ON t1.brand_id = t4.brand_id;
- User-level features: create tables of per-user click / collect / cart / pay behavior features. The code below uses COUNT(DISTINCT ...) to compute, within a 3-day window, the number of distinct items, brands, sellers, level-1 categories, and level-2 categories the user clicked, plus the number of active days; the other windows and behavior types follow the same pattern.
CREATE TABLE IF NOT EXISTS user_click_beh_feature_ads (
    user_id STRING,
    item_num_3d BIGINT,
    brand_num_3d BIGINT,
    seller_num_3d BIGINT,
    cate1_num_3d BIGINT,
    cate2_num_3d BIGINT,
    cnt_days_3d BIGINT,
    ...  -- 3d,7d,15d,60d,90d
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_click_beh_feature_ads PARTITION (ds=${bizdate})  -- 用户点击行为特征
SELECT user_id,
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), item_id, NULL)),
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), brand_id, NULL)),
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), seller_id, NULL)),
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), cate1, NULL)),
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), cate2, NULL)),
       COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -3, 'dd'), 'yyyymmdd'), ds, NULL))
       ...  -- 3d,7d,15d,60d,90d
FROM dw_user_item_click_log
WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd')
GROUP BY user_id;
- User-brand cross features
In practice, out of, say, 200k brands, perhaps only a few thousand head brands can afford a recommendation service, so restricting to head brands cuts computation while still giving good experimental results. Build a dimension table that keeps the top-500 brands by paying-user count over the last 30 days:
CREATE TABLE IF NOT EXISTS brand_top500_alipay_dim (
    brand_id STRING,
    alipay_num BIGINT
) LIFECYCLE 60;

INSERT OVERWRITE TABLE brand_top500_alipay_dim
SELECT brand_id, alipay_num
FROM (
    SELECT brand_id, COUNT(DISTINCT user_id) AS alipay_num
    FROM dw_user_item_alipay_log
    WHERE ds <= ${bizdate}
      AND ds > TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -30, 'dd'), 'yyyymmdd')
      AND brand_id IS NOT NULL
    GROUP BY brand_id
) t1
ORDER BY alipay_num DESC
LIMIT 500;
Then compute, per user-brand pair, the click item diversity (clk_item), click frequency (clk_num), and number of days with clicks on the brand (clk_day), over 90/60/30/15/7/3/1-day windows. The 1-day window is added as a strong feature, because very recent behavior is highly predictive. For collects only the collect item diversity (clt_item) is kept, since total collect count and collect days carry little signal; cart and purchase get item diversity (cart_item), cart frequency (cart_num), pay item diversity (pay_item), and pay frequency (pay_num). All of these are restricted to the top-500 brands to reduce computation.
CREATE TABLE IF NOT EXISTS user_brand_cross_beh_feature_ads (
    user_id STRING,
    brand_id STRING,
    clk_item_90d BIGINT, ...  -- 60/30/15/7/3/1d
    clk_num_90d BIGINT, ...
    clk_day_90d BIGINT, ...
    clt_item_90d BIGINT, ...
    cart_item_90d BIGINT, ...
    cart_num_90d BIGINT, ...
    pay_item_90d BIGINT, ...
    pay_num_90d BIGINT, ...
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_brand_cross_beh_feature_ads PARTITION (ds=${bizdate})
SELECT t1.user_id, t1.brand_id,
       clk_item_90d, ...
       clk_num_90d, ...
       clk_day_90d, ...
       IF(clt_item_90d IS NULL, 0, clt_item_90d) AS clt_item_90d, ...
       IF(cart_item_90d IS NULL, 0, cart_item_90d) AS cart_item_90d, ...
       IF(cart_num_90d IS NULL, 0, cart_num_90d) AS cart_num_90d, ...
       IF(pay_item_90d IS NULL, 0, pay_item_90d) AS pay_item_90d, ...
       IF(pay_num_90d IS NULL, 0, pay_num_90d) AS pay_num_90d, ...
FROM (
    SELECT user_id, brand_id,
           COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd'), item_id, NULL)) AS clk_item_90d, ...
           COUNT(IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd'), item_id, NULL)) AS clk_num_90d, ...
           COUNT(DISTINCT IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd'), ds, NULL)) AS clk_day_90d, ...
    FROM (
        SELECT t1.user_id, t2.brand_id, t1.item_id, t1.ds
        FROM (
            SELECT user_id, brand_id, item_id, ds
            FROM dw_user_item_click_log
            WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd')
        ) t1 JOIN (
            SELECT brand_id FROM brand_top500_alipay_dim WHERE ds=${bizdate}
        ) t2 ON t1.brand_id = t2.brand_id
    ) t1
    GROUP BY user_id, brand_id
) t1 LEFT JOIN (
    SELECT user_id, brand_id,
           COUNT(IF(ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd'), item_id, NULL)) AS clt_item_90d, ...
    FROM (
        SELECT t1.user_id, t2.brand_id, t1.item_id, t1.ds
        FROM (
            SELECT user_id, brand_id, item_id, ds
            FROM dw_user_item_collect_log
            WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -90, 'dd'), 'yyyymmdd')
        ) t1 JOIN (
            SELECT brand_id FROM brand_top500_alipay_dim WHERE ds=${bizdate}
        ) t2 ON t1.brand_id = t2.brand_id
    ) t1
    GROUP BY user_id, brand_id
) t2 ON t1.user_id = t2.user_id AND t1.brand_id = t2.brand_id
...
In addition, the user-brand cross features include the user's interaction with the brand's level-2 categories. First build a brand-to-level-2-category dimension table (level-1 categories are ignored for now), keeping, for each of the top-500 brands, the top-2 level-2 categories by user count. ROW_NUMBER is a window function that assigns a unique sequence number to each row of the result set; the OVER clause defines the partitioning and ordering, here partitioning by brand_id so that every brand gets its own independent numbering.
CREATE TABLE IF NOT EXISTS brand_cate2_dim (
    brand_id STRING,
    cate2 STRING
) LIFECYCLE 60;

INSERT OVERWRITE TABLE brand_cate2_dim
SELECT brand_id, cate2
FROM (
    SELECT brand_id, cate2, ROW_NUMBER() OVER (PARTITION BY brand_id ORDER BY num DESC) AS number
    FROM (
        SELECT t1.brand_id, t1.cate2, COUNT(DISTINCT user_id) AS num
        FROM (
            SELECT user_id, ds, brand_id, cate2
            FROM dw_user_item_alipay_log
            WHERE ds < ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -30, 'dd'), 'yyyymmdd')
        ) t1 JOIN (
            SELECT brand_id FROM brand_top500_alipay_dim WHERE ds=${bizdate}
        ) t2 ON t1.brand_id = t2.brand_id
        GROUP BY t1.cate2, t1.brand_id
    ) t1
) t1
WHERE number <= 2;
Once the level-2 category dimension table exists, computing the user x level-2-category cross data is straightforward. Here cate2 is used as the join key between the behavior log and the level-2 category dimension table, linking user behavior to the brand through its categories; although the join condition is cate2, the final goal is to attribute the user's click behavior in those categories to the brand, so the query still selects t2.brand_id, tying user behavior back to brand information for more targeted analysis and decisions. The /*+mapjoin(t2)*/ hint tells the optimizer to load t2 into memory and finish the join with t1 in the map stage; provided t2 is small enough, this can significantly improve query performance.
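As a minimal illustration of the hint syntax only (small_dim here is a hypothetical small table, not one of the project's tables; the project's real query follows below):

SELECT /*+ MAPJOIN(t2) */ t1.user_id, t2.brand_id
FROM dw_user_item_click_log t1
JOIN small_dim t2 ON t1.cate2 = t2.cate2   -- small_dim: hypothetical small dimension table held in memory
WHERE t1.ds = ${bizdate};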
create table if not exists user_brand_cate2_cross_beh_feature_ads (user_id string,brand_id string,clk_item_90d bigint,...--60/30/15/7/3/1d...
) PARTITIONED BY (ds STRING) LIFECYCLE 60;
INSERT OVERWRITE TABLE user_brand_cate2_cross_beh_feature_ads PARTITION (ds=${bizdate})
select t1.user_id, t1.brand_id,clk_item_90d,...if(clt_item_90d is null, 0, clt_item_90d) as clk_item_90d,...from (select user_id, brand_id,count(distinct if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clk_item_90d...from (select /*+mapjoin(t2)*/t1.user_id, t2.brand_id, item_id, t1.dsfrom (select user_id, brand_id, item_id, dsfrom dw_user_item_click_logwhere ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'))t1 join (select brand_idfrom brand_top500_alipay_dim where ds=${bizdate})t2 on t1.brand_id=t2.brand_id)t1group by user_id, brand_id
)t1 left join (select user_id, brand_id,count(if(ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'), item_id, null)) as clt_item_90d...from (select t1.user_id, t2.brand_id, item_id, t1.dsfrom (select user_id, brand_id, item_id, dsfrom dw_user_item_collect_logwhere ds<=${bizdate} and ds>=to_char(dateadd(to_date(${bizdate}, 'yyyymmdd'),-90, 'dd'),'yyyymmdd'))t1 join (select brand_idfrom brand_top500_alipay_dim where ds=${bizdate})t2 on t1.brand_id=t2.brand_id)t1group by user_id, brand_id
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id
...
With that, the application-layer (ADS) features are complete. They are implemented in ODPS SQL; other platforms' SQL dialects differ, but the approach is the same.
Building samples
The purchase model predicts whether a user will or will not buy from a given brand, a binary classification problem: label 0 (negative) means no purchase and 1 (positive) means purchase. The user pool is all users who visited the platform in the past year. Samples are built per brand: positives are users who buy from the brand within the 7 days after the experiment date; negatives are users in the pool who do not buy.
For example, if the experiment date is 20130701, users who buy the brand during 20130701-20130707 are positives, and users in the pool who do not buy are negatives.
The user pool itself is defined by business requirements; the target recall should capture as many of the target users as possible.
Positive samples:
CREATE TABLE IF NOT EXISTS user_pay_sample_pos (
    user_id STRING,
    brand_id STRING
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_pay_sample_pos PARTITION (ds='${bizdate}')
SELECT t1.user_id, t1.brand_id
FROM (
    SELECT DISTINCT user_id, brand_id
    FROM dw_user_item_alipay_log
    WHERE ds > ${bizdate} AND ds <= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), 7, 'dd'), 'yyyymmdd')
) t1 JOIN (
    SELECT brand_id FROM brand_top500_alipay_dim WHERE ds=${bizdate}
) t2 ON t1.brand_id = t2.brand_id;
Negative samples:
Since positives and negatives are extremely imbalanced, the following query builds a user-id dimension table that acts as a random sampler: it generates a random number rnd for each distinct user_id, orders users by it, assigns a sequence number, and samples from that; samples identical to a positive are excluded later:
CREATE TABLE IF NOT EXISTS user_id_number (
    user_id STRING,
    number BIGINT
) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_id_number
SELECT user_id, ROW_NUMBER() OVER (ORDER BY rnd DESC) AS number
FROM (
    SELECT user_id, RAND() AS rnd
    FROM (SELECT DISTINCT user_id FROM user_item_beh_log WHERE ds=${bizdate}) t1
) t1;
The negative samples are then built as follows:
INSERT OVERWRITE TABLE user_pay_sample PARTITION(ds=${bizdate})
select t1.neg_user_id as user_id, t1.brand_id, 0 as label
FROM (
    -- 去重,防止重复随机数
    SELECT DISTINCT t1.brand_id, t2.user_id AS neg_user_id
    FROM (
        -- 转换数组为三元组格式
        SELECT TRANS_ARRAY(2, ',', user_id, brand_id, rand_neg) AS (user_id, brand_id, rand_neg)
        FROM (
            -- 生成10个随机数,用逗号连接成数组
            SELECT user_id, brand_id,
                   CONCAT(CAST(RAND()*10000000 AS BIGINT), ',', CAST(RAND()*10000000 AS BIGINT), ',',
                          CAST(RAND()*10000000 AS BIGINT), ',', CAST(RAND()*10000000 AS BIGINT), ',',
                          CAST(RAND()*10000000 AS BIGINT), ',', CAST(RAND()*10000000 AS BIGINT), ',',
                          CAST(RAND()*10000000 AS BIGINT), ',', CAST(RAND()*10000000 AS BIGINT), ',',
                          CAST(RAND()*10000000 AS BIGINT), ',', CAST(RAND()*10000000 AS BIGINT)) AS rand_neg
            FROM user_pay_sample_pos
            WHERE ds = '${bizdate}'
        ) t1
    ) t1 JOIN (
        SELECT user_id, number FROM user_id_number
    ) t2 ON t1.rand_neg = t2.number
) t1 LEFT ANTI JOIN (
    -- 使用 anti join 排除正样本,防止负样本和正样本是同一个样本
    SELECT user_id, brand_id
    FROM user_pay_sample_pos
    WHERE ds = '${bizdate}'
) t2 ON t1.neg_user_id = t2.user_id AND t1.brand_id = t2.brand_id
union all--负样本和正样本合并
select user_id, brand_id, 1 as label
from user_pay_sample_pos
where ds = '${bizdate}'
;
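To make the TRANS_ARRAY step above easier to follow, here is a minimal sketch of what it does (assuming the MaxCompute TRANS_ARRAY UDTF, whose first argument is the number of key columns and whose second is the separator; pos_with_rand below is a hypothetical input table): each comma-separated random number in rand_neg becomes its own row, so every positive (user, brand) pair fans out into 10 candidate rows that are then matched against user_id_number to pick 10 random negative users.

-- hypothetical toy input (one row):  user_id=u1, brand_id=b1, rand_neg='17,42,93'
SELECT TRANS_ARRAY(2, ',', user_id, brand_id, rand_neg) AS (user_id, brand_id, rand_neg)
FROM pos_with_rand;
-- output rows: (u1, b1, '17'), (u1, b1, '42'), (u1, b1, '93')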
Joining samples with features
The code below joins the samples with the features, anchored on a single base date.
CREATE TABLE IF NOT EXISTS user_pay_sample_feature_join (
    user_id STRING, brand_id STRING, label BIGINT,
    click_item_num_3d BIGINT, click_brand_num_3d BIGINT, click_seller_num_3d BIGINT, ...
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_pay_sample_feature_join PARTITION (ds=${bizdate})
SELECT t1.user_id, t1.brand_id, t1.label,
       IF(t2.item_num_3d IS NULL, 0, t2.item_num_3d),
       IF(t2.brand_num_3d IS NULL, 0, t2.brand_num_3d),
       IF(t2.seller_num_3d IS NULL, 0, t2.seller_num_3d),
       ...
FROM (SELECT user_id, brand_id, label FROM user_pay_sample WHERE ds=${bizdate}) t1
LEFT JOIN (SELECT * FROM user_click_beh_feature_ads WHERE ds=${bizdate}) t2 ON t1.user_id = t2.user_id
LEFT JOIN (SELECT * FROM user_collect_beh_feature_ads WHERE ds=${bizdate}) t3 ON t1.user_id = t3.user_id
LEFT JOIN (SELECT * FROM user_cart_beh_feature_ads WHERE ds=${bizdate}) t4 ON t1.user_id = t4.user_id
LEFT JOIN (SELECT * FROM user_alipay_beh_feature_ads WHERE ds=${bizdate}) t5 ON t1.user_id = t5.user_id
LEFT JOIN (SELECT * FROM brand_stat_feature_ads WHERE ds=${bizdate}) t6 ON t1.brand_id = t6.brand_id
LEFT JOIN (SELECT * FROM user_brand_cross_beh_feature_ads WHERE ds=${bizdate}) t7 ON t1.user_id = t7.user_id AND t1.brand_id = t7.brand_id
LEFT JOIN (SELECT * FROM user_brand_cate2_cross_beh_feature_ads WHERE ds=${bizdate}) t8 ON t1.user_id = t8.user_id AND t1.brand_id = t8.brand_id
WHERE (t2.cnt_days_90d IS NOT NULL OR t2.cate1_num_90d IS NOT NULL OR t3.item_num_90d IS NOT NULL
    OR t4.item_num_90d IS NOT NULL OR t5.item_num_90d IS NOT NULL OR t7.clk_item_90d IS NOT NULL OR t8.clk_item_90d IS NOT NULL)
;
A single day's samples are still fairly limited. The full data covers 20130401-20131001; to guarantee 90 days of history, feature extraction starts from 20130701, with a 7-day cycle (the next 7 days' purchases define the labels). That yields 12 weekly base dates through 0916 for the training set, with 0923 held out as the test date, roughly 10M+ samples in total (the base dates are enumerated in the sketch below).
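For reference, a small standard-library Python sketch (not part of the original repo) that enumerates these weekly base dates, assuming the first base date is 20130701 as the later SQL filters suggest:

from datetime import date, timedelta

# Enumerate the weekly base dates used as ds=${bizdate} for the training set.
start, end = date(2013, 7, 1), date(2013, 9, 16)
train_dates = []
d = start
while d <= end:
    train_dates.append(d.strftime("%Y%m%d"))
    d += timedelta(days=7)

print(train_dates)       # ['20130701', '20130708', ..., '20130916']
print(len(train_dates))  # 12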
To backfill the data, the pipeline above has to run once per base date. One could run every script 12 times, manually setting ds=${bizdate} each time, but that is inefficient, so the DataWorks scheduling configuration is used instead.
First, look at the dependencies between tables. So far the structure is:
- Raw table: user_item_beh_log
- The four tables split out of the raw log by behavior: dw_user_item_click_log, dw_user_item_collect_log, dw_user_item_cart_log, dw_user_item_alipay_log
- ODS layer script: user_item_beh_log.sql
- DW layer script: user_beh_log_dw.sql
- ADS layer (the four SQL scripts here have no dependencies on each other, so they can run concurrently):
  - Brand-level data: brand_stat_feature_ads
  - User-level data: user_beh_feature_ads
  - User-brand interaction data: user_brand_cate2_cross_beh_feature_ads, user_brand_cross_beh_feature_ads
  - Dimension tables: brand_cate_dim (the top-500 paying brands with their top-2 level-2 categories), user_id_dim (the table that numbers each user_id)
- GBDT samples:
  - user_pay_sample: the positive/negative sample split; it can run concurrently with the four scripts above, so it also depends on the virtual node
  - user_pay_sample_feature_join: the sample-feature join; it depends on the feature tables and the sample table above, so it runs last
After reviewing the dependencies, create a virtual node virtual_depend containing a no-cost SQL statement such as select 1, adjust its scheduling configuration, and submit it.
- Make the five parallel SQL nodes depend on this node. Taking brand_stat_feature_ads.sql as an example: in its scheduling configuration, set the scheduling parameter bizdate with value $[yyyymmdd-1] and set the upstream node to virtual_depend. Be careful to delete the auto-detected input dependencies, otherwise parsing the code would add redundant dependencies; then save and submit.
- Every table referenced in user_pay_sample_feature_join does have an input dependency, so nothing needs deleting there; the dependencies can be parsed directly from the code.
- The final recurring (scheduled) tasks look like this:
With that, the training-set feature data is fully backfilled. Let's see what it looks like:
SELECT * FROM user_pay_sample_feature_join where ds='20130909' limit 100;
Evaluation set: building samples and joining with features
- Build the samples (user_pay_sample_eval)
The 9.23-10.1 cycle is used as the evaluation set. There are roughly 9M negative-sample users; to save resources, 3M of them are randomly sampled as the test set. Four brands are chosen, b47686 Handuyishe (韩都衣舍), b56508 Samsung phones, b62063 Nokia, and b78739 LILY, to observe how brands of different kinds behave. mapjoin is used to take the Cartesian product of users and brands, and the negatives (randomly sampled) are then merged with the positives (paying users taken from dw_user_item_alipay_log):
CREATE TABLE IF NOT EXISTS user_pay_sample_eval (
    user_id STRING,
    brand_id STRING,
    label BIGINT
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_pay_sample_eval PARTITION (ds=${bizdate})
SELECT user_id, brand_id, MAX(label) AS label
FROM (
    SELECT /*+mapjoin(t2)*/ t1.user_id, t2.brand_id, 0 AS label
    FROM (
        SELECT user_id FROM user_id_number WHERE number <= 3000000
    ) t1 JOIN (
        -- b47686 韩都衣舍  b56508 三星手机  b62063 诺基亚  b78739 LILY
        SELECT 'b47686' AS brand_id
        UNION ALL SELECT 'b56508' AS brand_id
        UNION ALL SELECT 'b62063' AS brand_id
        UNION ALL SELECT 'b78739' AS brand_id
    ) t2
    UNION ALL
    SELECT user_id, brand_id, 1 AS label
    FROM dw_user_item_alipay_log
    WHERE ds > '${bizdate}'
      AND ds <= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), 7, 'dd'), 'yyyymmdd')
      AND brand_id IN ('b47686','b56508','b62063','b78739')
    GROUP BY user_id, brand_id
) t1
GROUP BY user_id, brand_id
- Join the samples with the features (user_pay_sample_feature_join_eval)
CREATE TABLE IF NOT EXISTS user_pay_sample_feature_join_eval (
    user_id STRING, brand_id STRING, label BIGINT,
    click_item_num_3d BIGINT, click_brand_num_3d BIGINT, click_seller_num_3d BIGINT, ...
) PARTITIONED BY (ds STRING) LIFECYCLE 60;

INSERT OVERWRITE TABLE user_pay_sample_feature_join_eval PARTITION (ds=${bizdate})
SELECT t1.user_id, t1.brand_id, t1.label,
       IF(t2.item_num_3d IS NULL, 0, t2.item_num_3d),
       IF(t2.brand_num_3d IS NULL, 0, t2.brand_num_3d),
       IF(t2.seller_num_3d IS NULL, 0, t2.seller_num_3d),
       ...
FROM (SELECT user_id, brand_id, label FROM user_pay_sample_eval WHERE ds=${bizdate}
) t1 left join (select *from user_click_beh_feature_adswhere ds=${bizdate}
) t2 on t1.user_id=t2.user_id
left join (select *from user_collect_beh_feature_adswhere ds=${bizdate}
) t3 on t1.user_id=t3.user_id
left join (select *from user_cart_beh_feature_adswhere ds=${bizdate}
) t4 on t1.user_id=t4.user_id
left join (select *from user_alipay_beh_feature_adswhere ds=${bizdate}
) t5 on t1.user_id=t5.user_id
left join (select *from brand_stat_feature_adswhere ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t6 on t1.brand_id=t6.brand_id
left join (select *from user_brand_cross_beh_feature_ads where ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t7 on t1.user_id=t7.user_id and t1.brand_id=t7.brand_id
left join (select *from user_brand_cate2_cross_beh_feature_adswhere ds=${bizdate} and brand_id in ('b47686','b56508','b62063','b78739')
) t8 on t1.user_id=t8.user_id and t1.brand_id=t8.brand_id
where (t2.cnt_days_90d is not null or t2.cate1_num_90d is not null or t3.item_num_90d is not nullor t4.item_num_90d is not null or t5.item_num_90d is not null or t7.clk_item_90d is not null or t8.clk_item_90d is not null)
The brand features and the user-brand cross features are restricted with brand_id in ('b47686','b56508','b62063','b78739') to reduce the number of brands. The test-set data looks like this:
SELECT * FROM user_pay_sample_feature_join_eval where ds='20130923' limit 100
Now check how many training-set samples the four brands have:
SELECT t1.brand_id, COUNT(*)
FROM (
    SELECT * FROM user_pay_sample_feature_join WHERE ds >= '20130701' AND ds <= '20130916'
) t1 JOIN (
    -- b47686 韩都衣舍  b56508 三星手机  b62063 诺基亚  b78739 LILY
    SELECT 'b47686' AS brand_id
    UNION ALL SELECT 'b56508' AS brand_id
    UNION ALL SELECT 'b62063' AS brand_id
    UNION ALL SELECT 'b78739' AS brand_id
) t2 ON t1.brand_id = t2.brand_id
GROUP BY t1.brand_id
The overall sample size is not large; Handuyishe and Samsung have far more samples than the other two brands.
Training the model: GBDT
Approach
- Baseline: train a separate model per brand on that brand's data and predict with it.
- Mixed training: Handuyishe + Samsung phones, i.e. brands from different industries mixed; Handuyishe + LILY, i.e. brands from the same industry mixed.
- All brands trained together.
Reading the data
PyODPS is used to read and write MaxCompute table data: see the PyODPS documentation.
Training
- Install the required libraries:
pip install lightgbm pandas scikit-learn pyodps
- Train the models: a GBDT model is trained separately for each of the four brands.
import lightgbm as lgb
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
# 建立链接。
access_id =
access_key =
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'
o = ODPS(access_id, access_key, project, endpoint)
# 读取数据。
brand_id='b78739'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id='{brand}'
;
'''.format(brand=brand_id)
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=4) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
print('read data finish')
# 假设您的数据已经加载到DataFrame中,名为df
# df = pd.read_csv('your_data.csv')  # 如果数据来自CSV文件

# 分离特征和标签
X = df.drop(['label', 'user_id', 'brand_id', 'ds'], axis=1)
y = df['label']

# 分割数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# 创建LightGBM数据结构
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# 设置参数
params = {
    'boosting_type': 'gbdt',
    'objective': 'binary',
    'metric': 'auc',
    'num_leaves': 31,
    'learning_rate': 0.08,
    'feature_fraction': 0.9,
    'bagging_fraction': 0.8,
    'bagging_freq': 5,
    'verbose': -1  # 设置为-1以减少输出
}

# 存储AUC值的列表
train_aucs = []
test_aucs = []

# 开始训练:不同的树棵数各训练一个模型,记录训练/测试 AUC
with open(f'./models_{brand_id}/auc_scores.txt', 'w') as f:
    for i in range(20, 70, 1):
        print(f"Training model with {i} trees...")
        # 训练模型
        gbm = lgb.train(params, lgb_train, num_boost_round=i,
                        valid_sets=[lgb_train, lgb_eval], valid_names=['train', 'valid'])
        # 使用最后一棵树进行预测并计算 AUC
        y_train_pred = gbm.predict(X_train)
        y_test_pred = gbm.predict(X_test)
        train_auc = roc_auc_score(y_train, y_train_pred)
        test_auc = roc_auc_score(y_test, y_test_pred)
        # 打印 AUC
        print(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")
        # 保存模型
        gbm.save_model(f'./models_{brand_id}/model_{i}.txt')
        # 保存 AUC
        f.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
- Training log
- Train the mixed models. With the previous, smaller learning rate the model had not converged (test AUC kept increasing without turning down), so the learning rate is raised to 0.13; train the Handuyishe + Samsung and Handuyishe + LILY mixed models.
# 读取数据。
brand_id='b47686'
brand_id2='b56508'
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916' and brand_id in ('{brand}', '{brand2}')
;
'''.format(brand=brand_id, brand2=brand_id2)
with open(f'./models_{brand_id}_{brand_id2}/auc_scores.txt', 'w') as f:for i in range(20, 70, 1):print(f"Training model with {i} trees...")# 训练模型gbm = lgb.train(params,lgb_train,num_boost_round=i,valid_sets=[lgb_train, lgb_eval],valid_names=['train', 'valid']) # 关闭详细日志# 使用最后一棵树进行预测并计算 AUCy_train_pred = gbm.predict(X_train)y_test_pred = gbm.predict(X_test)train_auc = roc_auc_score(y_train, y_train_pred)test_auc = roc_auc_score(y_test, y_test_pred)# 打印 AUCprint(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")# 保存模型gbm.save_model(f'./models_{brand_id}_{brand_id2}/model_{i}.txt')# 保存 AUCf.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
- Training on all brands together
o = ODPS(access_id, access_key, project, endpoint)
# 读取数据。
sql = '''
SELECT *
FROM recom_maxcompute_dev.user_pay_sample_feature_join
WHERE ds>='20130701' and ds<='20130916'
;
'''
print(sql)
query_job = o.execute_sql(sql)
result = query_job.open_reader(tunnel=True)
df = result.to_pandas(n_process=52) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。
print('read data finish')
# 删除非特征列
df = df.drop(columns=['user_id', 'brand_id', 'ds'])# 分离特征和标签
X = df.drop(['label'], axis=1)
y = df['label']# 分割数据集为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# 创建LightGBM数据结构
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_eval = lgb.Dataset(X_test, label=y_test, reference=lgb_train)
print('post process data finish')
# 设置参数
params = {'boosting_type': 'gbdt','objective': 'binary','metric': 'auc','num_leaves': 31,'learning_rate': 0.1,'feature_fraction': 0.9,'bagging_fraction': 0.8,'bagging_freq': 5,'verbose': -1 # 设置为-1以减少输出
}# 存储AUC值的列表
train_aucs = []
test_aucs = []# 开始训练,从5棵树开始,直到100棵树,每5棵树为一个间隔
with open(f'./models_all/auc_scores.txt', 'w') as f:for i in range(5, 100, 5):print(f"Training model with {i} trees...")# 训练模型gbm = lgb.train(params,lgb_train,num_boost_round=i,valid_sets=[lgb_train, lgb_eval],valid_names=['train', 'valid']) # 关闭详细日志# 使用最后一棵树进行预测并计算 AUCy_train_pred = gbm.predict(X_train)y_test_pred = gbm.predict(X_test)train_auc = roc_auc_score(y_train, y_train_pred)test_auc = roc_auc_score(y_test, y_test_pred)# 打印 AUCprint(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}")# 保存模型gbm.save_model(f'./models_all/model_{i}.txt')# 保存 AUCf.write(f"Iteration {i}: Train AUC={train_auc:.4f}, Test AUC={test_auc:.4f}\n")
Evaluation: GBDT
- Metrics during training
The metric tracked during training is AUC; comparing the training-set and validation-set AUC indicates whether the model is overfitting. When a full picture of a classifier's performance is needed, the ROC curve is an important tool: it shows the model's true positive rate (TPR) against its false positive rate (FPR) across all possible classification thresholds, giving a threshold-independent view of performance.
- Offline business metric
The offline business metric is top-N recall. Suppose the campaign targets the top n users; in the evaluation set M users actually buy the brand, and among the model's top n predicted users m actually buy. The recall is then:

$$recall_{topn} = m / M$$

In recommendation or retrieval scenarios, top-N recall reflects the business goal more directly: the ability to capture the users of interest within the top N of the list, which mirrors what users (or campaigns) actually see. AUC, by contrast, summarizes ranking quality over all thresholds and may not directly reflect the experience at a specific cutoff.
There are 3M users in total; recall is reported at top 3k, 5k, 10k, 50k, and 100k. A tiny sketch of the metric follows.
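A minimal self-contained sketch of the metric (hypothetical scores and labels, not project data; the project's own implementation, calculate_top_k_ratio, appears below):

import numpy as np

def top_n_recall(scores, labels, n):
    # labels: 1 = bought the brand, 0 = did not
    order = np.argsort(-np.asarray(scores))        # rank users by predicted score, descending
    hits = np.asarray(labels)[order][:n].sum()     # positives inside the top-n
    return hits / np.asarray(labels).sum()         # m / M

print(top_n_recall([0.9, 0.8, 0.3, 0.2], [1, 0, 1, 0], n=2))  # 1 of 2 positives in top-2 -> 0.5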
To save compute, inference is done in batches: a random-number column is added to user_pay_sample_feature_join_eval and inference runs in ten batches.
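The statement that adds the random column is not shown in the original notes; one way to do it (a sketch, not the original script) is simply to emit an extra RAND() column when writing the eval join, and then have each inference batch read one slice of it:

-- When populating user_pay_sample_feature_join_eval, append a random column, e.g.:
--   SELECT t1.user_id, t1.brand_id, t1.label, ..., RAND() AS rnd
-- Each inference batch then reads one slice (this matches the filter used by the Python code below):
SELECT *
FROM user_pay_sample_feature_join_eval
WHERE ds = '20130923' AND brand_id = 'b78739' AND rnd > 0.0 AND rnd <= 0.1;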
Batched inference
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame

access_id =
access_key =
project = 'recom_maxcompute_dev'
endpoint = 'http://service.cn-shanghai.maxcompute.aliyun.com/api'
o = ODPS(access_id, access_key, project, endpoint)

# 选取根据品牌数据和树个数的模型
brand_id = 'b78739'
trees = '28'
# 读取数据。
def load_and_predict(model_path):# 初始化预测和标签列表predictions = []labels = []# 加载模型gbm = lgb.Booster(model_file=model_path)# 分批读取数据for num in range(10):# 读取数据sql = '''SELECT *FROM recom_maxcompute_dev.user_pay_sample_feature_join_evalWHERE ds='20130923' and brand_id='{brand}' and rnd>{start} and rnd<={end};'''.format(brand=brand_id, start=num/10.0, end=(num+1)/10.0)print(sql)query_job = o.execute_sql(sql)result = query_job.open_reader(tunnel=True)print('read data finish')df = result.to_pandas(n_process=4) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。# 删除非特征列chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])# 删除非特征列 'label'X_test = chunk.drop('label', axis=1)y_test = chunk['label']y_pred = gbm.predict(X_test)# 预测当前批次数据predictions.extend(y_pred)labels.extend(y_test)return predictions, labelsdef calculate_top_k_ratio(predictions, labels, top_k_list):# 将预测和标签转换成DataFrameresults_df = pd.DataFrame({'prediction': predictions, 'label': labels})# 按预测分数降序排序results_df = results_df.sort_values(by='prediction', ascending=False)# 计算总正例数total_positives = (results_df['label'] == 1).sum()# 计算不同 top 数量下的正例比例ratios = {}for k in top_k_list:top_k_df = results_df.head(k)top_k_positives = (top_k_df['label'] == 1).sum()ratio = top_k_positives / total_positivesratios[k] = ratioreturn ratios# 模型文件路径
model_path = f'models_{brand_id}/model_{trees}.txt'

# 需要计算top数量
top_k_list = [1000, 3000, 5000, 10000, 50000]

# 分批加载数据并预测
predictions, labels = load_and_predict(model_path)

# 计算topk的正例比例
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)

# 输出结果
for k, ratio in ratios.items():
    print(f"Top {k} ratio of positive labels: {ratio:4f}")

# 如果需要保存结果到文件
with open(f'models_{brand_id}/top_results.txt', 'w') as f:
    for k, ratio in ratios.items():
        f.write(f"Top {k} ratio of positive labels: {ratio:4f}\n")
Results: single-brand and pairwise mixed models
- b78739 LILY model predicting itself, 28 trees, 10877 samples
- b47686 Handuyishe model predicting itself, 50 trees, 90550 samples
- b62063 Nokia model predicting itself, 29 trees, 10430 samples
- b56508 Samsung model predicting itself, 32 trees, 71389 samples
Models trained on the phone brands perform worse than those trained on the clothing brands, possibly because clothing purchase signals are more pronounced while phone-related features are harder to mine.
- Below are the predictions of the mixed-training models:
- Model trained on Handuyishe + Samsung features, 52 trees, used to predict Handuyishe:
brand_id='b47686'
brand_model='b47686_b56508'
trees='52'
# 模型文件路径
model_path = f'models_{brand_model}/model_{trees}.txt'
Raw results
Mixing brands from different industries, with similar data proportions, lifts recall at every top-K cutoff, probably because the features are exploited more fully.
- Model trained on Handuyishe + LILY features, 22 trees, used to predict Handuyishe:
brand_id='b47686'
brand_model='b47686_b78739'
trees='22'
Raw results
Top-10000 and top-5000 recall improve while the other cutoffs drop. Because LILY's sample size is far smaller than Handuyishe's, the change is limited; and although the two brands are in the same industry, the feature interaction is mediocre, so the results move in both directions.
- Model trained on Handuyishe + LILY features, 22 trees, used to predict LILY:
brand_id='b78739'
brand_model='b47686_b78739'
trees='22'
Raw results
Mixing Handuyishe's large sample set into LILY's training data gives a clear improvement when predicting LILY itself.
Results: model trained on all samples
o = ODPS(access_id, access_key, project, endpoint)

# 选取根据品牌数据和树个数的模型
brand_id = 'b47686'
# 78739,62063,56508
# 读取数据。
def load_and_predict(model_path):# 初始化预测和标签列表predictions = []labels = []# 加载模型gbm = lgb.Booster(model_file=model_path)sql = '''SELECT *FROM recom_maxcompute_dev.user_pay_sample_feature_join_evalWHERE ds='20130923' and brand_id='{brand}' ;'''.format(brand=brand_id)print(sql)query_job = o.execute_sql(sql)result = query_job.open_reader(tunnel=True)print('read data finish')df = result.to_pandas(n_process=52) #n_process配置可参考机器配置,取值大于1时可以开启多线程加速。# 删除非特征列chunk = df.drop(columns=['user_id', 'brand_id', 'ds', 'rnd'])# 删除非特征列 'label'X_test = chunk.drop('label', axis=1)y_test = chunk['label']y_pred = gbm.predict(X_test)# 预测当前批次数据predictions.extend(y_pred)labels.extend(y_test)return predictions, labelsdef calculate_top_k_ratio(predictions, labels, top_k_list):# 将预测和标签转换成DataFrameresults_df = pd.DataFrame({'prediction': predictions, 'label': labels})# 按预测分数降序排序results_df = results_df.sort_values(by='prediction', ascending=False)# 计算总正例数total_positives = (results_df['label'] == 1).sum()# 计算不同 top 数量下的正例比例ratios = {}for k in top_k_list:top_k_df = results_df.head(k)top_k_positives = (top_k_df['label'] == 1).sum()ratio = top_k_positives / total_positivesratios[k] = ratioreturn ratiostrees='95'model_path=f'./models_all/model_{trees}.txt'
# 需要计算top数量
top_k_list = [1000, 3000, 5000, 10000, 50000]# 分批加载数据并预测
predictions, labels = load_and_predict(model_path)# 计算topk的正例比例
ratios = calculate_top_k_ratio(predictions, labels, top_k_list)# 输出结果
for k, ratio in ratios.items():print(f"Top {k} ratio of positive labels: {ratio:4f}")# 如果需要保存结果到文件
with open(f'top_results_{brand_id}.txt', 'w') as f:for k, ratio in ratios.items():f.write(f"Top {k} ratio of positive labels: {ratio: 4f}\n")
Training on all samples yields a clear improvement (each cell below lists the recall at top 1000 / 3000 / 5000 / 10000 / 50000):
b47686 韩都衣舍 | b56508 三星手机 | b62063 诺基亚 | b78739 LILY |
---|---|---|---|
0.114842 0.209537 0.265950 0.349899 0.533915 | 0.068056 0.104861 0.129861 0.164583 0.284722 | 0.102041 0.190476 0.217687 0.258503 0.353741 | 0.257225 0.361272 0.427746 0.488439 0.624277 |
0.141706 0.230356 0.288113 0.366017 0.537273 | 0.073611 0.103472 0.125000 0.177778 0.289583 | 0.149660 0.217687 0.231293 0.278912 0.394558 | 0.300578 0.416185 0.462428 0.528902 0.627168 |
However, GBDT's limitations with large, sparse (id-type) features mean a sizable share of users can never be recalled, so the next experiments use a DNN.
DNN basics
From linear regression to binary classification
Loss function
Suppose we have features $X = [x_1, x_2, \ldots, x_n]$; linear regression is:

$$\hat{y} = w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0$$

For binary classification the target is 0 or 1, so $\hat{y}$ must be constrained to $[0, 1]$ with the sigmoid function:

$$h_W(X) = \text{sigmoid}(x) = \frac{1}{1 + e^{-(w_1 x_1 + w_2 x_2 + \ldots + w_n x_n + b_0)}}$$

$h_W(X)$ has a concrete meaning: the probability that the label is 1. The probabilities of labels 1 and 0 are:

$$P(Y=1 \mid X; W) = h_W(X)$$
$$P(Y=0 \mid X; W) = 1 - h_W(X)$$

For a target value $y$, the probability of predicting $y$ (assuming the target follows a Bernoulli distribution) is:

$$P(Y=y \mid X; W) = \left(h_W(X)\right)^y \left(1 - h_W(X)\right)^{1-y}$$

With $N$ samples, the likelihood is:

$$J(W) = \prod_{i=1}^N \left(h_W(X_i)\right)^{y_i} \left(1 - h_W(X_i)\right)^{1-y_i}$$

The core objective is to find parameters $W$ that maximize $J(W)$: a large $J(W)$ means most samples are assigned high probability for their true label $y$. Taking the logarithm and negating turns the maximization into a minimization; the result is the cross-entropy loss:

$$L(W) = -\log(J(W)) = -\frac{1}{N} \sum_{i=1}^N \left[ y_i \log(h_W(X_i)) + (1 - y_i) \log(1 - h_W(X_i)) \right]$$
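As a quick sanity check of this formula (a sketch only; PyTorch is what the later training code uses), the hand-written expression matches PyTorch's built-in binary cross-entropy on logits:

import torch
import torch.nn.functional as F

logits = torch.tensor([0.3, -1.2, 2.0])          # w.x + b for three samples
y = torch.tensor([1.0, 0.0, 1.0])                # true labels

p = torch.sigmoid(logits)                        # h_W(X)
manual = -(y * torch.log(p) + (1 - y) * torch.log(1 - p)).mean()
builtin = F.binary_cross_entropy_with_logits(logits, y)

print(manual.item(), builtin.item())             # the two values agree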
Solved with gradient descent
Detailed derivation of the gradient of $L(W)$:
$$L(W) = -\log(J(W)) = -\frac{1}{N} \sum_{i=1}^N \left[ y_i \log(h_W(X_i)) + (1 - y_i) \log(1 - h_W(X_i)) \right]$$

$$h_W(X_i) = \frac{1}{1 + e^{-W^T X_i}}$$

Then,

$$\begin{aligned} \log(h_W(X_i)) &= \log\left(\frac{1}{1+e^{-W^T X_i}}\right) = -\log(1+e^{-W^T X_i}) \\ \log(1-h_W(X_i)) &= \log\left(1-\frac{1}{1+e^{-W^T X_i}}\right) = \log\left(\frac{e^{-W^T X_i}}{1+e^{-W^T X_i}}\right) \\ &= \log(e^{-W^T X_i}) - \log(1+e^{-W^T X_i}) = -W^T X_i - \log(1+e^{-W^T X_i}) \end{aligned}$$

$$\begin{aligned} L(W) &= -\frac{1}{N} \sum_{i=1}^{N} \left[ -y_i \log(1+e^{-W^T X_i}) + (1-y_i)\left(-W^T X_i - \log(1+e^{-W^T X_i})\right) \right] \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - W^T X_i - \log(1+e^{-W^T X_i}) \right] \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - \left(\log(e^{W^T X_i}) + \log(1+e^{-W^T X_i})\right) \right] \\ &= -\frac{1}{N} \sum_{i=1}^{N} \left[ y_i W^T X_i - \log(1+e^{W^T X_i}) \right] \end{aligned}$$

Then,

$$\begin{aligned} \frac{\partial}{\partial w_j} L(W) &= \frac{\partial}{\partial w_j}\left(-\frac{1}{N}\sum_{i=1}^N\left[y_i W^T X_i-\log(1+e^{W^T X_i})\right]\right) \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i\frac{\partial}{\partial w_j}W^T X_i-\frac{\partial}{\partial w_j}\log(1+e^{W^T X_i})\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i x_i^j-\frac{x_i^j e^{W^T X_i}}{1+e^{W^T X_i}}\right] = -\frac{1}{N}\sum_{i=1}^N\left[y_i x_i^j-\frac{x_i^j}{1+e^{-W^T X_i}}\right] \\ &= -\frac{1}{N}\sum_{i=1}^N\left[y_i-\frac{1}{1+e^{-W^T X_i}}\right] x_i^j = \frac{1}{N}\sum_{i=1}^N\left(h_W(X_i) - y_i\right) x_i^j \end{aligned}$$
Note: when the prediction is close to the true value $y_i$, the gradient approaches 0; when it deviates, the gradient is proportional to both the error and the feature component $x_i^j$.
How the gradient updates the weights
Given $N$ samples, we have derived the partial derivative of $L(W)$ with respect to $w_j$:

$$\frac{\partial}{\partial w_j} L(W) = \frac{1}{N} \sum_{i=1}^N \left( h_W(X_i) - y_i \right) x_i^j$$

How should $w_j$ be updated? Starting from an initial value $w_j^0$:

$$w_j^1 = w_j^0 - lr \cdot \frac{\partial}{\partial w_j} L(W)$$

Here $lr$ is the learning rate, i.e. the step size of each update, and the minus sign means moving against the gradient. In practice the training set may contain millions or even hundreds of millions of samples, so how should the parameters be updated?
1) Full-batch updates
Compute the partial derivatives over all samples at once and evaluate the gradient directly:

$$\frac{\partial}{\partial w_j} L(W) = \frac{1}{N} \sum_{i=1}^N \left( h_W(X_i) - y_i \right) x_i^j$$
Pros: the gradient is stable.
Cons: prone to settling in local optima, and extremely memory-hungry.
2) Per-sample updates
Compute the gradient of a single sample at a time and update immediately.
Cons: because individual samples are noisy, the gradient is very unstable and training may not converge.
3) Mini-batch updates
Say there are 100k samples; each step randomly samples 512 of them, computes the gradient on that mini-batch, and updates the parameters.
Pros: strikes a balance between the stability of full-batch gradients and the efficiency of per-sample updates.
Sometimes the model converges after seeing only part of the data, in which case there is no need to use all of it; training can stop early as circumstances dictate. Without random sampling, many consecutive steps might train on samples of the same brand, which is unacceptable, so the samples must be shuffled before training (a minimal sketch follows).
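A minimal PyTorch sketch of shuffled mini-batch training (illustrative only, on toy random data; the project's real dataset and model appear later in the DNN section):

import torch
from torch.utils.data import TensorDataset, DataLoader

# Toy data: 100k samples, 16 dense features, binary labels.
X = torch.randn(100_000, 16)
y = (torch.rand(100_000) > 0.5).float()

# shuffle=True reshuffles the sample order every epoch, so a mini-batch
# is not dominated by consecutive samples of the same brand.
loader = DataLoader(TensorDataset(X, y), batch_size=512, shuffle=True)

model = torch.nn.Linear(16, 1)
opt = torch.optim.SGD(model.parameters(), lr=0.01)
loss_fn = torch.nn.BCEWithLogitsLoss()

for xb, yb in loader:                      # one pass over the data in mini-batches
    opt.zero_grad()
    loss = loss_fn(model(xb).squeeze(1), yb)
    loss.backward()                        # gradient of the mini-batch
    opt.step()                             # parameter update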
Why not use mean squared error as the loss

$$L(W) = \frac{1}{2} \sum_{i=0}^N \left( y_i - h_W(X_i) \right)^2$$

For the sigmoid $h(x) = \frac{1}{1 + e^{-x}}$, first rewrite it before differentiating:

$$h(x) = \frac{1}{1 + e^{-x}} = \frac{e^x}{1 + e^x} = 1 - (e^x + 1)^{-1}$$

Applying the differentiation rules:

$$\begin{aligned} h'(x) &= \frac{\partial}{\partial x} h(x) = (-1)(-1)(e^x + 1)^{-2} e^x \\ &= (1 + e^{-x})^{-2} e^{-2x} e^x \\ &= (1 + e^{-x})^{-1} \cdot \frac{e^{-x}}{1 + e^{-x}} \\ &= \frac{1}{1 + e^{-x}} \cdot \left(1 - \frac{1}{1 + e^{-x}}\right) = h(x)\,(1 - h(x)) \end{aligned}$$

The derivative peaks at 0.25 and tends to 0 as $x$ becomes very large or very small, so the gradient can vanish outright.
For the squared-error loss $L(W) = \frac{1}{2} \sum_{i=0}^N (y_i - h_W(X_i))^2$, the partial derivative is:

$$\frac{\partial}{\partial w_j} L(W) = -\sum_{i=0}^N \left( y_i - h_W(X_i) \right) h_W'(X_i)\, x_i^j$$

while the cross-entropy partial derivative is:

$$\frac{\partial}{\partial w_j} L(W) = \sum_{i=0}^N \left( h_W(X_i) - y_i \right) x_i^j$$

which does not carry the extra $h_W'(X_i)$ factor and therefore does not suffer from vanishing gradients as severely as MSE.
Neurons
Multi-layer neural networks
Suppose the input is $X = [x_1, x_2, x_3, \ldots, x_n]$, the first hidden layer is $H^1 = [h_1^1, h_1^2, \ldots, h_1^{n_1}]$, the second hidden layer is $H^2 = [h_2^1, h_2^2, \ldots, h_2^{n_2}]$, and the activation function is ReLU. Then

$$h_1^1 = \text{ReLU}(w_1^1 x_1 + w_1^2 x_2 + \ldots + w_1^n x_n + b_0) = \text{ReLU}(w_1 X), \quad w_1 \in \mathbb{R}^{1 \times n},\ X \in \mathbb{R}^{n \times 1}$$

Stacking $w_1, w_2, \ldots, w_{n_1}$ into a matrix,

$$W_1 = [w_1, w_2, \ldots, w_{n_1}], \quad W_1 \in \mathbb{R}^{n_1 \times n}$$

we get

$$H_1 = \text{ReLU}(W_1 X)$$

$$H_2 = \text{ReLU}(W_2\, \text{ReLU}(W_1 X))$$
Activation functions
Without an activation function the model collapses into linear regression:

$$H_2 = \text{ReLU}(W_2\, \text{ReLU}(W_1 X))$$

$$\hat{H}_2 = W_2 (W_1 X) = W_2 W_1 X = W X, \quad W = W_2 W_1$$

With activations, the model gains non-linear capacity, i.e. the ability to select and combine features.
As the (omitted) figure suggests: with activations the model can select and combine features however it needs. Taking a single node as an example, through this selection and combination it effectively builds a tree (choosing which incoming weights matter and which do not); by the final layer it has built many such trees as needed.
Output layer
For linear regression or binary classification the model usually outputs a single value; for multi-class classification it outputs several values as needed. The final fully connected layer normally has no activation and outputs directly. For binary classification with a single output:

$$\text{logits} = W_3\, \text{ReLU}(W_2\, \text{ReLU}(W_1 X))$$
Embedding layer
Suppose the input features are discretized and numbered, e.g. ['male', '5 purchases in the last 30 records', 'Chongqing'] becomes [1, 2, 3]; the input is an array of integer ids from 1 to N.
How are these integer id arrays turned into embeddings?
Suppose there are $N$ features in total ($N$ is the largest id), and each feature is represented by a real-valued vector of dimension $M$, i.e. $d_1 = [d_1^1, d_1^2, \ldots, d_1^M] \in \mathbb{R}^{1 \times M}$; the $N$ vectors form the matrix $D = [d_1, d_2, \ldots, d_N] \in \mathbb{R}^{N \times M}$.
When the input is [1, 2, 3], the model picks rows 1, 2, 3 of $D$ to form a new matrix $D' = [d_1, d_2, d_3] \in \mathbb{R}^{3 \times M}$, and $D'$ is what gets fed into the network.
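A minimal PyTorch sketch of this lookup (illustrative; the project's actual model, listed later, sums the looked-up rows of each feature sequence):

import torch
import torch.nn as nn

N, M = 10, 4                       # N ids, M-dimensional vectors
emb = nn.Embedding(num_embeddings=N + 1, embedding_dim=M, padding_idx=0)  # row 0 reserved for padding

ids = torch.tensor([1, 2, 3])      # discretized feature ids
D_prime = emb(ids)                 # selects rows 1, 2, 3 of the embedding matrix
print(D_prime.shape)               # torch.Size([3, 4])
print(D_prime.sum(dim=0).shape)    # torch.Size([4]) -- pooled into a fixed-length vector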
Loss
Cross-entropy loss:

$$L(W) = -\log(J(W)) = -\frac{1}{N} \sum_{i=1}^N \left[ y_i \log(h_W(X_i)) + (1 - y_i) \log(1 - h_W(X_i)) \right]$$
Optimizers
- Stochastic gradient descent (SGD)
An optimizer is the algorithm that turns the mini-batch gradients into parameter updates during iteration.
Let $g_t = \frac{\partial}{\partial w_j} L(W)$; the simplest optimizer is stochastic gradient descent:

$$w_j^t = w_j^{t-1} - lr \cdot \frac{\partial}{\partial w_j} L(W) = w_j^{t-1} - lr \cdot g_t$$

Its drawbacks:
- Because of sampling randomness, $g_t$ is uncertain in both magnitude and, especially, direction, which makes the weight updates fluctuate heavily.
- The learning rate $lr$ is fixed:
frequently updated parameters and sparse, rarely updated parameters get the same step size;
and the step size near the optimum is the same as at the start of training. Too large an $lr$ causes oscillation around, or overshooting of, the optimum; too small an $lr$ slows convergence and may trap the model in a local optimum.
- Adam
A key ingredient of Adam is that it uses exponentially weighted moving averages to estimate the momentum and second moment of the gradient, i.e. it keeps the state variables:

$$v_t \leftarrow \beta_1 v_{t-1} + (1 - \beta_1)\, g_t, \qquad s_t \leftarrow \beta_2 s_{t-1} + (1 - \beta_2)\, g_t^2$$

Typically $\beta_1 = 0.9$ and $\beta_2 = 0.999$, so $s_t$ adapts far more slowly than $v_t$. The bias-corrected estimates are:

$$\hat{v}_t = \frac{v_t}{1 - \beta_1^t}, \qquad \hat{s}_t = \frac{s_t}{1 - \beta_2^t}$$

The rescaled update is then:

$$g_t' = \frac{lr \cdot \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon}$$

and the update rule becomes:

$$w_j^t = w_j^{t-1} - \frac{lr \cdot \hat{v}_t}{\sqrt{\hat{s}_t} + \epsilon}$$

- Momentum estimate $v_t$: a moving average of past gradients that preserves the dominant direction and its magnitude, damping the fluctuations caused by noisy per-batch gradients.
- Second-moment estimate $s_t$: an accumulation of the squared gradients.
For frequently updated parameters, such as the embeddings of common Chinese characters like "我" or "的", $s_t$ is large; for sparse ones such as "舴", which occurs rarely, $s_t$ is small. Since it sits in the denominator, frequent parameters effectively get a smaller learning rate and rare parameters a larger one.
- The effective learning rate shrinks as training proceeds, preventing oscillation around the minimum. A small sketch of the update rule follows.
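A minimal NumPy sketch of these update equations (illustrative only, not the project's training code; torch.optim.Adam, used later, implements the same idea):

import numpy as np

def adam_step(w, g, state, lr=0.001, beta1=0.9, beta2=0.999, eps=1e-8):
    """One Adam update for parameter vector w given gradient g."""
    state["t"] += 1
    state["v"] = beta1 * state["v"] + (1 - beta1) * g       # first moment (momentum)
    state["s"] = beta2 * state["s"] + (1 - beta2) * g * g   # second moment
    v_hat = state["v"] / (1 - beta1 ** state["t"])           # bias correction
    s_hat = state["s"] / (1 - beta2 ** state["t"])
    return w - lr * v_hat / (np.sqrt(s_hat) + eps)

w = np.zeros(3)
state = {"t": 0, "v": np.zeros(3), "s": np.zeros(3)}
g = np.array([0.1, -0.2, 0.05])   # a made-up gradient
w = adam_step(w, g, state)
print(w)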
DNN modeling
The GBDT brand-purchase model has the following problems:
- It only uses statistical features, which are too coarse; it cannot learn relationships between brands from a user's behavior sequence.
Example: the goal is to predict buyers of the Li-Ning brand. User A's behavior sequence is [安踏, 贵人鸟, 特步] and user B's is [阿迪达斯, 耐克, 小脏鞋]; which of A and B is more likely to buy Li-Ning?
To the GBDT model, users A and B look the same: both have click behavior on the brand's level-2 category, with no distinguishing signal at all, so the model needs the user behavior sequences to tell their purchase interests apart.
- GBDT cannot model large id-type features; there are 80k+ brand ids, which GBDT cannot handle.
GBDT can only encode id features as yes/no splits. For example, a user's gender [male, female] has to become two features, is-male and is-female; with 80k id values that would mean building 80k yes/no features, which GBDT cannot realistically model.
To address these problems, this solution introduces deep learning: it models id-type features well and learns the relationships between ids through the user's behavior sequences.
For comparability with GBDT, the same samples are used; the features additionally include the user's behavior-sequence features and the target brand id, and the evaluation protocol stays the same.
Feature development
There are about 80k brands in total and most have sparse behavior; to limit noise, behavior features are built only for the top 10k brands:
CREATE TABLE IF NOT EXISTS brand_top1w_alipay_dim (
    brand_id STRING,
    alipay_num BIGINT
) LIFECYCLE 60;

INSERT OVERWRITE TABLE brand_top1w_alipay_dim
SELECT brand_id, alipay_num
FROM (
    SELECT brand_id, COUNT(DISTINCT user_id) AS alipay_num
    FROM dw_user_item_alipay_log
    WHERE ds <= ${bizdate}
      AND ds > TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -30, 'dd'), 'yyyymmdd')
      AND brand_id IS NOT NULL
    GROUP BY brand_id
) t1
ORDER BY alipay_num DESC
LIMIT 10000;
user_brand_seq_feature:
The inner query selects user_id and brand_id from dw_user_item_click_log where ds falls within the 30 days before ${bizdate}. ROW_NUMBER() then ranks each user's brand clicks by most recent day and keeps only the top 50 brands per user. These brand ids are joined against brand_top1w_alipay_dim so that only head brands remain, and finally WM_CONCAT concatenates the brand ids into one string per user, grouped by user_id, and the result is written into the corresponding partition of user_click_brand_seq_feature.
Collect and purchase sequence tables are built the same way; since add-to-cart behaves much like purchase, no cart table is built.
CREATE TABLE IF NOT EXISTS user_click_brand_seq_feature (
    user_id STRING,
    brand_id_seq STRING
) PARTITIONED BY (ds STRING) LIFECYCLE 90;

INSERT OVERWRITE TABLE user_click_brand_seq_feature PARTITION (ds=${bizdate})
SELECT user_id, WM_CONCAT(',', CONCAT('b_', brand_id)) AS brand_id_seq
FROM (
    SELECT t2.user_id, t2.brand_id
    FROM (
        SELECT brand_id FROM brand_top1w_alipay_dim
    ) t1 JOIN (
        SELECT user_id, brand_id, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ds DESC) AS number
        FROM (
            SELECT user_id, brand_id, MAX(ds) AS ds
            FROM dw_user_item_click_log
            WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -30, 'dd'), 'yyyymmdd')
              AND brand_id IS NOT NULL
            GROUP BY user_id, brand_id
        ) t1
    ) t2 ON t1.brand_id = t2.brand_id
    WHERE number <= 50
) t1
GROUP BY user_id;
user_cate_seq_feature:
Stores the user's clicked-category sequence feature and writes the last 30 days of clicked level-2 categories into the table. Its design mirrors user_click_brand_seq_feature, except that it tracks categories (cate) instead of brands.
CREATE TABLE IF NOT EXISTS user_click_cate_seq_feature (
    user_id STRING,
    cate_seq STRING
) PARTITIONED BY (ds STRING) LIFECYCLE 90;

INSERT OVERWRITE TABLE user_click_cate_seq_feature PARTITION (ds=${bizdate})
SELECT user_id, WM_CONCAT(',', CONCAT('c_', cate2)) AS cate2_seq
FROM (
    SELECT user_id, cate2, ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY ds DESC) AS number
    FROM (
        SELECT user_id, cate2, MAX(ds) AS ds
        FROM dw_user_item_click_log
        WHERE ds <= ${bizdate} AND ds >= TO_CHAR(DATEADD(TO_DATE(${bizdate}, 'yyyymmdd'), -30, 'dd'), 'yyyymmdd')
          AND cate2 IS NOT NULL
        GROUP BY user_id, cate2
    ) t1
) t1
WHERE number <= 50
GROUP BY user_id;
Feature concatenation, discretization, and serialization
- Following the GBDT sample-feature join, add the newly built brand-sequence and category-sequence behavior features.
- Discretize the raw real-valued features. Fields such as item_num_3d or brand_num_3d go through a log(2, value + 1) transform, which turns count features into a smoother representation and dampens extreme values.
- Use concat to build string tokens, e.g. joining 'b_' with the brand_id field to form a new feature token that encodes the brand.
create table if not exists user_pay_sample_feature_join_dnn(user_id string,brand_id string,label bigint,target_brand_id string,clk_brand_seq string,clt_brand_seq string,pay_brand_seq string,clk_cate_seq string,clt_cate_seq string,pay_cate_seq string,user_click_feature string,user_clt_feature string,user_cart_feature string,user_pay_feature string,brand_stat_feature string,user_cate2_cross_feature string,user_brand_cross_feature string
)PARTITIONED BY (ds STRING ) LIFECYCLE 90;insert OVERWRITE TABLE user_pay_sample_feature_join_dnn partition (ds=${bizdate})
select t1.user_id, t1.brand_id, t1.label, concat('b_',t1.brand_id),if(t2.brand_id_seq is null, 'clkb_seq_null', t2.brand_id_seq),if(t3.brand_id_seq is null, 'cltb_seq_null', t3.brand_id_seq),if(t4.brand_id_seq is null, 'payb_seq_null', t4.brand_id_seq),if(t5.cate_seq is null, 'clkc_seq_null', t5.cate_seq),if(t6.cate_seq is null, 'cltc_seq_null', t6.cate_seq),if(t7.cate_seq is null, 'payc_seq_null', t7.cate_seq),if(t8.user_id is null, 'user_click_null', concat(concat('uclk_item_num_3d','_',if(t8.item_num_3d is null, 'null', cast(log(2,t8.item_num_3d+1) as bigint))),',',concat('uclk_brand_num_3d','_',if(t8.brand_num_3d is null, 'null', cast(log(2,t8.brand_num_3d+1) as bigint))),',',concat('uclk_seller_num_3d','_',if(t8.seller_num_3d is null, 'null', cast(log(2,t8.seller_num_3d+1) as bigint))),',',concat('uclk_cate1_num_3d','_',if(t8.cate1_num_3d is null, 'null', cast(log(2,t8.cate1_num_3d+1) as bigint))),',',concat('uclk_cate2_num_3d','_',if(t8.cate2_num_3d is null, 'null', cast(log(2,t8.cate2_num_3d+1) as bigint))),',',concat('uclk_cnt_days_3d','_',if(t8.cnt_days_3d is null, 'null', cast(log(2,t8.cnt_days_3d+1) as bigint))),',',
...)) as user_click_beh_feature...
from (select *from user_pay_samplewhere ds=${bizdate}
)t1 left join (select user_id, brand_id_seqfrom user_click_brand_seq_featurewhere ds=${bizdate}
)t2 on t1.user_id=t2.user_id
...
where (t2.brand_id_seq is not null or t3.brand_id_seq is not null or t4.brand_id_seq is not null ort5.cate_seq is not null or t6.cate_seq is not null or t7.cate_seq is not null ort8.cnt_days_90d is not null or t9.cate1_num_90d is not null or t10.item_num_90d is not nullor t11.item_num_90d is not null or t12.click_num is not null or t13.clk_item_90d is not null ort14.clk_item_90d is not null)
;
4. Feature serialization
Serialize all features, i.e. assign a numeric id to every feature token such as b_alipay_num_6:
CREATE TABLE IF NOT EXISTS user_pay_sample_feature_seq (
    feature STRING,
    number BIGINT
) LIFECYCLE 90;

INSERT OVERWRITE TABLE user_pay_sample_feature_seq
SELECT feature, ROW_NUMBER() OVER (ORDER BY feature) AS number
FROM (
    SELECT DISTINCT feature
    FROM (
        SELECT target_brand_id AS feature
        FROM user_pay_sample_feature_join_dnn
        WHERE ds >= '20130701' AND ds <= '20130916'
        UNION ALL
        SELECT TRANS_ARRAY(0, ',', clk_brand_seq) AS (feature)
        FROM user_pay_sample_feature_join_dnn
        WHERE ds >= '20130701' AND ds <= '20130916'
        UNION ALL
        SELECT TRANS_ARRAY(0, ',', clt_brand_seq) AS (feature)
        FROM user_pay_sample_feature_join_dnn
        WHERE ds >= '20130701' AND ds <= '20130916'
        ...
    ) t1
) t1;
Multiple JOINs then map each feature column of user_pay_sample_feature_join_dnn onto its numeric ids:
CREATE TABLE IF NOT EXISTS user_pay_sample_feature_join_dnn_seq (
    user_id STRING, brand_id STRING, label BIGINT, bizdate STRING,
    target_brand_id STRING, clk_brand_seq STRING, clt_brand_seq STRING, pay_brand_seq STRING,
    clk_cate_seq STRING, clt_cate_seq STRING, pay_cate_seq STRING,
    user_click_feature STRING, user_clt_feature STRING, user_cart_feature STRING, user_pay_feature STRING,
    brand_stat_feature STRING, user_cate2_cross_feature STRING, user_brand_cross_feature STRING
) LIFECYCLE 90;

INSERT OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq
SELECT t1.user_id, t1.brand_id, t1.label, t1.ds, t1.number,
       t2.feature, t3.feature, t5.feature, t6.feature, t7.feature, t8.feature,
       t9.feature, t10.feature, t11.feature, t12.feature, t13.feature, t14.feature, t15.feature
from (select t1.user_id, t1.brand_id, t1.label, t1.ds, t2.numberfrom (select user_id, brand_id, label, ds, target_brand_idfrom user_pay_sample_feature_join_dnnwhere ds>='20130701' and ds<='20130916')t1 join (select feature, numberfrom user_pay_sample_feature_seq)t2 on t1.target_brand_id=t2.feature
)t1 join (select user_id, brand_id, label, ds, WM_CONCAT(',', number) as featurefrom (select user_id, brand_id, label, ds, numberfrom (select trans_array(4, ',', user_id, brand_id, label, ds, clk_brand_seq) as (user_id, brand_id, label, ds, feature)from user_pay_sample_feature_join_dnnwhere ds>='20130701' and ds<='20130916')t1 join (select feature, numberfrom user_pay_sample_feature_seq)t2 on t1.feature=t2.feature)t1group by user_id, brand_id, label, ds
)t2 on t1.user_id=t2.user_id and t1.brand_id=t2.brand_id and t1.label=t2.label and t1.ds=t2.ds
...
;
Select all columns from user_pay_sample_feature_join_dnn_seq and generate a unique key_all for every row by concatenating random numbers with some of the original columns (user_id, brand_id, label, bizdate). Grouping by key_all and taking MAX() of each column then carries one record through per key, while the random prefix reorders the rows; the result is a globally shuffled table ready for model training.
CREATE TABLE IF NOT EXISTS user_pay_sample_feature_join_dnn_seq_shuffle (
    key_all STRING, label BIGINT,
    target_brand_id STRING, clk_brand_seq STRING, clt_brand_seq STRING, pay_brand_seq STRING,
    clk_cate_seq STRING, clt_cate_seq STRING, pay_cate_seq STRING,
    user_click_feature STRING, user_clt_feature STRING, user_cart_feature STRING, user_pay_feature STRING,
    brand_stat_feature STRING, user_cate2_cross_feature STRING, user_brand_cross_feature STRING
) LIFECYCLE 90;

INSERT OVERWRITE TABLE user_pay_sample_feature_join_dnn_seq_shuffle
SELECT key_all, MAX(label), MAX(target_brand_id), MAX(clk_brand_seq), MAX(clt_brand_seq), MAX(pay_brand_seq),
       MAX(clk_cate_seq), MAX(clt_cate_seq), MAX(pay_cate_seq), MAX(user_click_feature),
       MAX(user_clt_feature), MAX(user_cart_feature), MAX(user_pay_feature), MAX(brand_stat_feature),
       MAX(user_cate2_cross_feature), MAX(user_brand_cross_feature)
FROM (
    SELECT *, CONCAT(RAND(), '_', RAND(), '_', user_id, '_', brand_id, '_', label, '_', bizdate) AS key_all
    FROM user_pay_sample_feature_join_dnn_seq
) t1
GROUP BY key_all;
Training the model: DNN
Approach
- Read the dataset with PyODPS.
- dataset: load the data and convert it into PyTorch tensors.
- DataLoader: batch the data, with optional transforms.
- model: embedding, dense layers, ReLU.
- Loss: cross-entropy.
- Optimizer: Adam.
- Training loop: a for loop over batches.
- Log the loss with TensorBoard.
- Evaluate AUC (area under the ROC curve).
- Save the model. (A minimal end-to-end sketch of this flow follows; the project's actual code is listed afterwards.)
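A compact sketch wiring the pieces above together (toy random data and a toy model purely to show the flow, not the project's code; the real dataset, model, and training script follow below):

import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import roc_auc_score

# Toy data standing in for the serialized feature ids (the real code reads them via PyODPS).
X = torch.randint(1, 1000, (4096, 20))             # 20 feature ids per sample
y = (torch.rand(4096) > 0.9).float()

model = nn.Sequential(nn.EmbeddingBag(1000, 32, mode="sum"),  # embedding + sum pooling
                      nn.Linear(32, 128), nn.ReLU(),
                      nn.Linear(128, 1))
loss_fn = nn.BCEWithLogitsLoss()                    # binary cross-entropy on logits
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

loader = DataLoader(TensorDataset(X, y), batch_size=512, shuffle=True)
for epoch in range(2):
    for xb, yb in loader:
        opt.zero_grad()
        loss = loss_fn(model(xb).squeeze(1), yb)
        loss.backward()
        opt.step()
    with torch.no_grad():
        auc = roc_auc_score(y.numpy(), model(X).squeeze(1).numpy())
    print(f"epoch {epoch}: loss={loss.item():.4f}, auc={auc:.4f}")

torch.save(model.state_dict(), "toy_model.pt")      # save the model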
Code structure
.
├── config
│ ├── ak_config.py
│ ├── dnn_config.py
├── dataset
│ ├── dnn_dataset.py
├── din_model_train.py
├── dnn_focal_loss_model_train.py
├── dnn_model_test.py
├── dnn_model_train.py
├── loss
│ ├── focal_loss.py
├── model
│ ├── din_model2.py
│ ├── din_model.py
│ ├── dnn_model.py
│ ├── moe_model.py
├── moe_focal_loss_model_train.py
└── utils
    ├── get_data.py
Code listings
dnn_config
config = {
    "embedding_dim": 32,
    "num_embedding": 20000,
    "lr": 0.001,
    "batch_size": 512,
    "num_experts": 3,
    "feature_col": ["target_brand_id", "clk_brand_seq", "clt_brand_seq", "pay_brand_seq",
                    "clk_cate_seq", "clt_cate_seq", "pay_cate_seq",
                    "user_click_feature", "user_clt_feature", "user_cart_feature", "user_pay_feature",
                    "brand_stat_feature", "user_cate2_cross_feature", "user_brand_cross_feature"],
    "features_gate_col": ["target_brand_id", "brand_stat_feature", "clk_brand_seq",
                          "user_cate2_cross_feature", "user_brand_cross_feature"]
}
dnn_dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, IterableDataset

# 基础数据集类,用于处理特征和标签数据
class MyDataset(Dataset):"""自定义数据集类,继承自PyTorch的Dataset用于处理已经预处理好的特征和标签数据"""def __init__(self, features, labels, config):"""初始化数据集参数:features: 特征字典,包含多个特征列labels: 标签数据config: 配置字典,包含特征列名等信息"""self.config = configself.features = {}# 将每个特征列转换为张量格式for ff in self.config["feature_col"]:self.features[ff] = [torch.tensor([int(id) for id in seq.split(',')], dtype=torch.long) for seq in features[ff]]self.labels = torch.tensor(labels, dtype=torch.float32)def __len__(self):"""返回数据集的样本数量"""return len(self.labels)def __getitem__(self, idx):"""获取指定索引的样本参数:idx: 样本索引返回:包含特征和标签的字典"""res_features = {}for ff in self.config["feature_col"]:res_features[ff] = self.features[ff][idx]res_features['labels'] = self.labels[idx]return res_features# 优化版数据集类,用于处理大规模数据
class MyPriorDataset(Dataset):"""优化的数据集类,用于更高效地处理特征数据相比MyDataset,减少了内存使用,采用即时转换策略"""def __init__(self, features, labels, config):"""初始化数据集参数:features: 原始特征数据labels: 标签数据config: 配置字典"""self.config = configself.features = featuresself.labels = labelsdef __len__(self):"""返回数据集的样本数量"""return len(self.labels)def __getitem__(self, idx):"""获取指定索引的样本,即时进行数据转换参数:idx: 样本索引返回:包含特征和标签的字典"""res_features = {}for ff in self.config["feature_col"]:res_features[ff] = torch.tensor([int(id) for id in self.features[ff][idx].split(',')], dtype=torch.long)res_features['labels'] = torch.tensor(self.labels[idx], dtype=torch.float32)return res_features
# 可迭代数据集类,用于流式处理大规模数据
class MyIterDataset(IterableDataset):"""可迭代数据集类,适用于大规模数据的流式处理继承自PyTorch的IterableDataset,支持数据流式加载"""def __init__(self, df):"""初始化可迭代数据集参数:df: pandas DataFrame对象,包含特征和标签数据"""super().__init__()self.df = dfdef __iter__(self):"""返回数据迭代器生成器函数,逐行产出处理后的数据样本返回:字典,包含处理后的特征和标签"""for index, row in self.df.iterrows():yield {'features': {col: list(map(int, row[col].split(','))) for col in feature_columns if col != 'label'},'label': int(row['label'])}
dnn_model
import torch
import torch.nn as nn
import torch.nn.functional as F

# 深度神经网络模型类
class MyModel(nn.Module):"""自定义DNN模型实现了一个包含embedding层和三层全连接层的深度神经网络用于特征学习和二分类预测"""def __init__(self, config):"""初始化模型结构参数:config: 配置字典,包含以下关键参数:- num_embedding: embedding字典大小- embedding_dim: embedding向量维度- feature_col: 特征列名列表"""super(MyModel, self).__init__()self.config = config# 创建embedding层,用于将离散特征转换为稠密向量# padding_idx=0表示将0作为填充值,其embedding向量将始终为0self.embedding = nn.Embedding(num_embeddings=self.config["num_embedding"], embedding_dim=self.config["embedding_dim"], padding_idx=0)# 定义三层全连接网络# 第一层:将所有特征的embedding连接后映射到512维self.fc1 = nn.Linear(self.config["embedding_dim"]*len(self.config["feature_col"]), 512)# 第二层:将512维特征映射到128维self.fc2 = nn.Linear(512,128)# 输出层:将128维特征映射到1维输出self.fc3 = nn.Linear(128,1)def forward(self, features):"""前向传播函数参数:features: 字典,包含各个特征列的输入数据每个特征的形状为 [batch_size, sequence_length]返回:tensor: 模型预测输出,形状为 [batch_size, 1]"""# 对每个特征进行embedding操作embedding_dict = {}for ff in self.config["feature_col"]:# 对每个特征序列的embedding结果求和,得到定长表示embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)# 将所有特征的embedding结果在特征维度上拼接x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)# 通过三层全连接网络x = F.relu(self.fc1(x)) # 第一层激活x = F.relu(self.fc2(x)) # 第二层激活x = self.fc3(x) # 输出层(不使用激活函数)return x
get_data
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
import numpy as np
import os
from odps import ODPS
from torch.nn.utils.rnn import pad_sequence
from config.dnn_config import config as dnn_config
import torch

# 从MaxCompute获取训练和测试数据
def get_data(ak_config):"""从MaxCompute数据库获取训练数据并进行预处理参数:ak_config: 包含MaxCompute访问配置的字典返回:train_feature_numpy: 训练集特征test_feature_numpy: 测试集特征train_label: 训练集标签test_label: 测试集标签"""# 初始化ODPS连接o = ODPS(ak_config["access_id"],ak_config["access_key"],ak_config["project"],ak_config["endpoint"],)# 执行SQL查询获取数据sql = '''SELECT *FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle;'''print(sql)query_job = o.execute_sql(sql)result = query_job.open_reader(tunnel=True)df = result.to_pandas(n_process=10) # 使用多线程加速数据读取print('read data finish')# 数据预处理df = df.drop(columns=['key_all']) # 删除非特征列X = df.drop(columns='label') # 分离特征y = df['label'] # 分离标签# 划分训练集和测试集X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# 构建特征字典feature_col = dnn_config["feature_col"]train_feature_numpy = {}test_feature_numpy = {}for feature in feature_col:train_feature_numpy[feature] = X_train[feature].valuestest_feature_numpy[feature] = X_test[feature].valuestrain_label = y_train.valuestest_label = y_test.valuesreturn train_feature_numpy,test_feature_numpy,train_label,test_label# 计算Top-K正例比例
# 计算Top-K正例比例
def calculate_top_k_ratio(predictions, labels, top_k_list):
    """计算不同K值下的正例召回率
    参数:
        predictions: 模型预测分数
        labels: 真实标签
        top_k_list: 需要计算的K值列表
    返回:
        ratios: 不同K值对应的正例召回率字典
    """
    results_df = pd.DataFrame({'prediction': predictions, 'label': labels})
    results_df = results_df.sort_values(by='prediction', ascending=False)
    total_positives = (results_df['label'] == 1).sum()
    ratios = {}
    for k in top_k_list:
        top_k_df = results_df.head(k)
        top_k_positives = (top_k_df['label'] == 1).sum()
        ratio = top_k_positives / total_positives
        ratios[k] = ratio
    return ratios
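calculate_top_k_ratio 的行为可以用一组虚构的小数据快速验证(数值仅为演示):

# 5条预测、2个正例;按预测分排序后取 top-k,统计命中的正例占全部正例的比例
toy_preds  = [0.9, 0.8, 0.3, 0.2, 0.1]
toy_labels = [1,   0,   1,   0,   0]
print(calculate_top_k_ratio(toy_preds, toy_labels, [1, 3]))
# 期望 top1 ≈ 0.5(命中1/2个正例),top3 ≈ 1.0(命中2/2个正例)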
# 获取特定品牌的测试数据
def get_data_test(ak_config, brand_id):
    """从MaxCompute获取特定品牌的测试数据
    参数:
        ak_config: MaxCompute访问配置
        brand_id: 品牌ID
    返回:
        test_feature_numpy: 测试特征
        test_label: 测试标签
    """
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_eval_dnn_seq
    where keys_all = '{brand_id}';
    '''.format(brand_id=brand_id)
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)
    print('read data finish')

    df = df.drop(columns=['keys_all'])
    X = df.drop(columns='label')
    y = df['label']

    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy, test_label
# 获取MOE模型测试数据
def get_data_test_moe(ak_config):
    """获取MOE模型的测试数据(限制3000条)
    参数:
        ak_config: MaxCompute访问配置
    返回:
        test_feature_numpy: 测试特征
        test_label: 测试标签
    """
    o = ODPS(
        ak_config["access_id"],
        ak_config["access_key"],
        ak_config["project"],
        ak_config["endpoint"],
    )
    # 读取数据
    sql = '''
    SELECT *
    FROM recom_maxcompute_dev.user_pay_sample_feature_join_dnn_seq_shuffle limit 3000;
    '''
    print(sql)
    query_job = o.execute_sql(sql)
    result = query_job.open_reader(tunnel=True)
    df = result.to_pandas(n_process=10)  # n_process可参考机器配置,取值大于1时可以开启多线程加速
    print('read data finish')

    # 删除非特征列
    df = df.drop(columns=['key_all'])
    # 分离特征和标签
    X = df.drop(columns='label')
    y = df['label']

    # 构建特征字典(不划分训练/测试集,全部作为测试数据)
    feature_col = dnn_config["feature_col"]
    test_feature_numpy = {}
    for feature in feature_col:
        test_feature_numpy[feature] = X[feature].values
    test_label = y.values
    return test_feature_numpy, test_label
# 基础数据整理函数
def my_collate_fn(batch):
    """数据批次整理函数,用于DataLoader
    处理变长序列,不包含mask
    参数:
        batch: 数据批次
    返回:
        res_feature: 填充后的特征字典
        labels: 标签张量
    """
    res_features_tmp = {}
    labels = []
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    # 收集批次数据
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # 对序列进行填充
    res_feature = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
    return res_feature, torch.tensor(labels)
# 带序列mask的数据整理函数
def seq_collate_fn(batch):
    """带序列mask的数据批次整理函数
    用于需要attention mask的模型(如DIN)
    参数:
        batch: 数据批次
    返回:
        res_feature: 填充后的特征字典
        res_mask: 特征的mask字典
        labels: 标签张量
    """
    res_features_tmp = {}
    labels = []
    # 收集数据
    for ff in dnn_config["feature_col"]:
        res_features_tmp[ff] = []
    for sample in batch:
        for ff in dnn_config["feature_col"]:
            res_features_tmp[ff].append(sample[ff])
        labels.append(sample["labels"])
    # 生成特征和对应的mask
    res_feature = {}
    res_mask = {}
    for ff in dnn_config["feature_col"]:
        res_feature[ff] = pad_sequence(res_features_tmp[ff], batch_first=True, padding_value=0)
        res_mask[ff] = (res_feature[ff] != 0).type(torch.float32)  # 生成mask:非0位置为1,0位置为0
    return res_feature, res_mask, torch.tensor(labels)
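pad_sequence 与 mask 的效果可以用两个不等长的示意序列直观验证(张量内容为虚构的演示数据):

# 两个不等长的id序列,演示 padding 与 mask 的生成方式
seqs = [torch.tensor([3, 7, 9]), torch.tensor([5])]
padded = pad_sequence(seqs, batch_first=True, padding_value=0)
mask = (padded != 0).type(torch.float32)
print(padded)  # tensor([[3, 7, 9], [5, 0, 0]])
print(mask)    # tensor([[1., 1., 1.], [1., 0., 0.]])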
dnn_model_train
from sklearn.metrics import roc_auc_score
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data
from utils.get_data import my_collate_fn

# 获取训练和测试数据
train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
# 创建训练和测试数据集
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
print('dataset finish')
# 创建数据加载器
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
print('dataloader finish')

# 初始化模型、损失函数和优化器
model = MyModel(dnn_config)
criterion = nn.BCEWithLogitsLoss() # 二元交叉熵损失
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
writer = SummaryWriter()  # 用于TensorBoard可视化

# 创建模型保存目录
os.makedirs('models/dnn2', exist_ok=True)

def train_model(train_loader, test_loader, model, criterion, optimizer, num_epochs=2):
    """模型训练主函数
    参数:
        train_loader: 训练数据加载器
        test_loader: 测试数据加载器
        model: 神经网络模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features, labels in train_loader:
            # 前向传播
            optimizer.zero_grad()  # 清除梯度
            outputs = model(features)
            labels = torch.unsqueeze(labels, dim=1)
            loss = criterion(outputs, labels)
            # 反向传播和优化
            loss.backward()
            optimizer.step()
            total_step += 1
            # 记录训练损失
            if (total_step+1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # 打印训练进度
            if (total_step+1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # 定期评估模型性能
            if (total_step+1) % 2000 == 0:
                with torch.no_grad():
                    model.eval()  # 设置为评估模式
                    test_preds = []
                    test_targets = []
                    # 在测试集上进行预测
                    for data, target in test_loader:
                        output = model(data)
                        test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
                        test_targets.extend(target.squeeze().tolist())
                    # 计算AUC分数(在测试集上计算,记录到TensorBoard)
                    test_auc = roc_auc_score(test_targets, test_preds)
                    writer.add_scalar('AUC/test', test_auc, total_step)
                    # 保存模型检查点
                    torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/dnn2/model_epoch_{epoch}.pth')
    # 训练结束后的最终评估
    with torch.no_grad():
        model.eval()
        test_preds = []
        test_targets = []
        for data, target in test_loader:
            output = model(data)
            test_preds.extend(output.to('cpu').sigmoid().squeeze().tolist())
            test_targets.extend(target.squeeze().tolist())
        test_auc = roc_auc_score(test_targets, test_preds)
        writer.add_scalar('AUC/test', test_auc, total_step)

# 开始训练
train_model(dataloader_train, dataloader_test, model, criterion, optimizer)
writer.close() # 关闭TensorBoard写入器
dnn_model_test
from torch.utils.data import DataLoader
import torch
from dataset.dnn_dataset import MyPriorDataset
from model.dnn_model import MyModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data_test
from utils.get_data import calculate_top_k_ratio
from utils.get_data import my_collate_fn

# 定义要测试的品牌列表
brands = ['b47686','b56508','b62063','b78739']
for brand_id in brands:
    # 设置模型路径和评估参数
    model_path = './models/focal2/model_epoch_1_27999.pth'
    # 定义要评估的top-k值列表
    top_k_list = [1000, 3000, 5000, 10000, 50000]
    # 获取测试数据
    test_feature_numpy, test_label = get_data_test(ak_config, brand_id)
    dataset_test = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    # 创建测试数据加载器
    dataloader_test = DataLoader(dataset_test, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=my_collate_fn)
    # 加载模型
    model = MyModel(dnn_config).to('cpu')
    model.load_state_dict(torch.load(model_path))
    model.eval()  # 设置为评估模式
    # 进行预测
    test_preds = []
    test_targets = []
    with torch.no_grad():
        for data, target in dataloader_test:
            output = model(data)
            test_preds.extend(output.sigmoid().squeeze().tolist())
            test_targets.extend(target.squeeze().tolist())
    # 计算各个top-k的正例比例
    ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
    # 打印结果
    for k, ratio in ratios.items():
        print(f"Top {k} ratio of positive labels: {ratio:.4f}")
    # 将结果保存到文件
    with open(f'models_{brand_id}_top_results_focal.txt', 'w') as f:
        for k, ratio in ratios.items():
            f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
训练过程
从训练曲线可以看到 loss 在震荡,调高 TensorBoard 的 smoothing 参数后可以看出整体趋势是在下降的。
focal loss
在真实的样本中,有很多样本是非常容易区分的:比如对品牌有加购行为的用户,模型很容易把预测分打到 0.9 以上;对平台没有任何行为的用户,模型也很容易预测为 0.1 左右,这些都是"容易"样本。而预测分落在 0.4~0.6 之间的样本,或者正样本被预测为 0.1、负样本被预测为 0.9 的样本,则是"困难"样本。
假设 batch 大小为 5,各样本的 loss 为 [0.1, 0.01, 0.5, 0.1, 0.14],其中 0.5 对应困难样本,其余是容易样本。此时困难样本只在整体 loss 中贡献约 58%,模型仍有约 42% 的"注意力"放在容易样本上,而容易样本已经学得很好,不需要再学习。因此需要让模型专注学习困难样本,提升困难样本上的效果。
本方案采用focal loss优化模型loss函数,让模型自动加大对困难样本的专注度,提升模型效果。
Focal loss 出自视觉领域的论文《Focal Loss for Dense Object Detection》,主要解决样本不均衡以及困难样本学习的问题。
交叉熵损失函数为,
$$L(W) = -y \log(h_W(X)) - (1 - y) \log(1 - h_W(X))$$
其中 $h_W(X)$ 表示预测结果为 1 的概率,则,
$$L(W) = \begin{cases} -\log(h_W(X)), & y = 1 \\ -\log(1 - h_W(X)), & y = 0 \end{cases}$$
定义 focal loss 为,
$$L(W) = \begin{cases} -(1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ -(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \end{cases}$$
其中 $\gamma$ 一般取 2。
对于背景中的例子,各样本的 loss 为 [0.1, 0.01, 0.5, 0.1, 0.14]。若只看调制(加权)项,这里直接把每个样本的 loss 当作调制项的底数、取 $\gamma = 2$ 平方,得到 [0.01, 0.0001, 0.25, 0.01, 0.0196]。
- 0.5 变为 0.25,降低了 50%;而 0.1 变为 0.01,降低了 90%:"容易"样本降低的比例比"困难"样本更大。
- 困难样本 0.5 转换前占整体误差的比例约为 58%,转换后约为 86%。由于"容易"样本降低幅度更大,变相提升了"困难"样本在总 loss 中的占比(下面用一小段代码复算这两个比例)。
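一个最小的复算片段(数值取自上面的示例):

# 复算示例:困难样本(0.5)在加权前后占总 loss 的比例
losses = [0.1, 0.01, 0.5, 0.1, 0.14]
weighted = [l ** 2 for l in losses]          # 以 loss 近似 (1 - p_t),gamma = 2
print(round(losses[2] / sum(losses), 3))     # ≈ 0.588,即约 58%
print(round(weighted[2] / sum(weighted), 3)) # ≈ 0.863,即约 86%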
由于在实际案例中,负样本里"容易"样本太多,而正样本里"困难"样本占比高,经过上面的转换后,正样本里"困难"样本的权重提升幅度过大,因此还需要加入一个平衡因子:
$$L(W) = \begin{cases} -\alpha (1 - h_W(X))^\gamma \log(h_W(X)), & y = 1 \\ -(1 - \alpha)(h_W(X))^\gamma \log(1 - h_W(X)), & y = 0 \end{cases}$$
其中 $\alpha$ 一般取 0.25。
通过 focal loss,可以根据训练过程中每个样本的实际误差对样本动态加权,让模型更关注"困难"样本。
focal_loss
import torch
import torch.nn as nn
from torch.nn.functional import binary_cross_entropy_with_logits

# Focal Loss实现类,用于解决类别不平衡问题
class FocalLoss(nn.Module):
    """Focal Loss损失函数实现
    用于解决分类任务中的类别不平衡问题
    通过降低易分样本的权重,提高难分样本的权重,使模型更关注难分样本
    """
    def __init__(self, alpha=0.25, gamma=2.0):
        """初始化Focal Loss
        参数:
            alpha: float, 类别权重因子
                用于平衡正负样本的重要性,当正样本较少时,可以增大alpha值
            gamma: float, 调制因子
                用于调节易分样本的权重,gamma越大,对易分样本的惩罚越大
        """
        super(FocalLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

    def forward(self, logits, targets):
        """计算Focal Loss值
        参数:
            logits: tensor, 模型输出的原始预测值(未经过sigmoid)
            targets: tensor, 真实标签值
        返回:
            tensor, 计算得到的focal loss均值
        """
        # 计算sigmoid后的预测概率
        probs = torch.sigmoid(logits)
        # 计算二元交叉熵损失(不进行reduction)
        ce_loss = binary_cross_entropy_with_logits(logits, targets, reduction='none')
        # 计算调制因子
        # pt表示预测正确的概率:当target=1时,pt = prob;当target=0时,pt = 1 - prob
        pt = torch.where(targets == 1, probs, 1 - probs)
        # 计算focal weight:(1-pt)^gamma,预测越准确,权重越小
        focal_weight = (1 - pt) ** self.gamma
        # 应用alpha权重:当target=1时权重为alpha,当target=0时权重为1-alpha
        alpha_t = torch.where(targets == 1, self.alpha, 1 - self.alpha)
        # 计算最终的focal loss
        focal_loss_value = alpha_t * focal_weight * ce_loss
        # 返回batch的平均loss
        return focal_loss_value.mean()
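基于上面的 FocalLoss,在训练脚本里只需把 criterion 从 BCEWithLogitsLoss 换成 FocalLoss,训练循环其余部分不变。下面是一个最小的调用示例(logits 与 labels 为虚构的演示数据):

# 替换损失函数,其余训练代码保持不变
criterion = FocalLoss(alpha=0.25, gamma=2.0)

# 数值自检:前两个样本易分(预测方向正确且置信高),第三个样本较难
logits = torch.tensor([2.0, -2.0, 0.1])
labels = torch.tensor([1.0, 0.0, 1.0])
print(criterion(logits, labels))  # 标量 loss,易分样本的贡献被明显压低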
评估结果
下表中每个单元格内的 5 个数字,依次为 top 1000 / 3000 / 5000 / 10000 / 50000 的正例召回率。

| 模型 | b47686 韩都衣舍 | b56508 三星手机 | b62063 诺基亚 | b78739 LILY | 平均 |
| --- | --- | --- | --- | --- | --- |
| gbdt 品牌样本 | 0.114842 / 0.209537 / 0.265950 / 0.349899 / 0.533915 | 0.068056 / 0.104861 / 0.129861 / 0.164583 / 0.284722 | 0.102041 / 0.190476 / 0.217687 / 0.258503 / 0.353741 | 0.257225 / 0.361272 / 0.427746 / 0.488439 / 0.624277 | 0.2569 |
| gbdt 全样本 | 0.141706 / 0.230356 / 0.288113 / 0.366017 / 0.537273 | 0.073611 / 0.103472 / 0.125000 / 0.177778 / 0.289583 | 0.149660 / 0.217687 / 0.231293 / 0.278912 / 0.394558 | 0.300578 / 0.416185 / 0.462428 / 0.528902 / 0.627168 | 0.2970 |
| dnn bce loss | 0.1209 / 0.2317 / 0.2821 / 0.3627 / 0.5346 | 0.0521 / 0.1007 / 0.1222 / 0.1667 / 0.2938 | 0.1293 / 0.1837 / 0.2109 / 0.2517 / 0.3741 | 0.2746 / 0.4017 / 0.4653 / 0.5145 / 0.6561 | 0.2865 |
| dnn focal loss | 0.1330 / 0.2250 / 0.2807 / 0.3627 / 0.5319 | 0.0639 / 0.1076 / 0.1306 / 0.1736 / 0.3021 | 0.1361 / 0.1973 / 0.2041 / 0.2721 / 0.3810 | 0.2832 / 0.3960 / 0.4624 / 0.5145 / 0.6676 | 0.2913 |
可以看到 focal loss 相比交叉熵效果略好一点;整体来看,dnn 在 top 5 万的召回上优于 gbdt。
DIN 训练
模型架构
在之前的 DNN 模型中,用户行为序列只是用 sum pooling 直接叠加在一起,不利于学习序列内各品牌与目标品牌之间的关系。为了让模型更好地学习品牌之间的关联,本方案引入 Deep Interest Network(DIN),采用 target attention 的模式,让模型通过 attention 自动学习行为序列中各品牌与目标品牌的关联权重,从而提升模型效果。两种序列聚合方式的差异见下面的示意代码。
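下面用一小段示意代码对比 sum pooling 与 target attention(张量维度与打分函数均为简化假设,完整实现见后文 din_model 中的 LocalActivationUnit):

import torch
import torch.nn.functional as F

# 示意:batch=2,行为序列长度=4,embedding维度=8
seq_emb = torch.randn(2, 4, 8)    # 用户历史品牌序列的embedding
target_emb = torch.randn(2, 8)    # 目标品牌的embedding

# 方式一:sum pooling,直接把序列叠加,与目标品牌无关
sum_pool = seq_emb.sum(dim=1)                              # (2, 8)

# 方式二:target attention,按与目标品牌的相关性加权求和
scores = torch.einsum('bld,bd->bl', seq_emb, target_emb)   # 以内积作为简化的打分函数
weights = F.softmax(scores, dim=1).unsqueeze(-1)           # (2, 4, 1)
att_pool = (weights * seq_emb).sum(dim=1)                  # (2, 8)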
代码内容
din_model
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

# DIN模型基础实现
# 主要处理用户的购买行为序列
class LocalActivationUnit(nn.Module):
    """局部激活单元
    用于计算用户行为序列与目标商品之间的注意力权重
    仅处理购买行为序列
    """
    def __init__(self, hidden_units):
        """初始化局部激活单元
        Args:
            hidden_units: 隐藏层单元数,用于特征交互和注意力计算
        """
        super(LocalActivationUnit, self).__init__()
        # 第一个全连接层:输入维度是hidden_units的4倍(包含了多种特征交互)
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # 第二个全连接层:将hidden_units维压缩到1维,用于计算注意力分数
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """计算注意力权重
        参数:
            user_behaviors: 用户购买行为序列
            target_item: 目标商品
            mask: 序列填充掩码
        返回:
            user_interests: 加权后的用户兴趣表示
        """
        # 获取序列长度
        seq_len = user_behaviors.size(1)
        # 将目标商品扩展到与行为序列相同的维度,便于后续计算
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # 构建特征交互:拼接原始特征、目标商品特征、差异特征、乘积特征
        interactions = torch.cat([
            user_behaviors,                 # 原始用户行为
            target_item,                    # 目标商品
            user_behaviors - target_item,   # 行为与目标的差异
            user_behaviors * target_item    # 行为与目标的逐元素乘积
        ], dim=-1)
        # 通过两层全连接网络计算注意力分数
        x = torch.relu(self.fc1(interactions))        # 第一层激活
        attention_logits = self.fc2(x).squeeze(-1)    # 第二层得到注意力分数
        # 使用掩码处理填充位置,将填充位置的注意力分数设为负无穷
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        # 使用softmax将注意力分数转换为权重
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # 计算加权后的用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests


class DinModel(nn.Module):
    """基础DIN模型实现
    只考虑用户的购买行为序列
    """
    def __init__(self, config):
        """初始化DIN模型
        Args:
            config: 配置字典,包含模型参数设置
        """
        super(DinModel, self).__init__()
        self.config = config
        # 创建embedding层,用于将离散特征转换为稠密向量
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],   # embedding字典大小
            embedding_dim=self.config["embedding_dim"],    # embedding向量维度
            padding_idx=0                                  # 填充token的索引
        )
        # 创建多层全连接网络
        self.fc1 = nn.Linear(self.config["embedding_dim"] * len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)
        # 创建局部激活单元,用于计算注意力
        self.att = LocalActivationUnit(self.config["embedding_dim"])

    def forward(self, features, mask):
        """模型前向传播
        特点:
        - 只处理购买行为序列(pay_brand_seq)
        - 其他特征直接进行embedding sum
        """
        # 存储各特征的embedding结果
        embedding_dict = {}
        # 对每个特征进行embedding处理
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':   # 非序列特征直接sum
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # 使用注意力机制处理行为序列
        att_emb = self.att(
            self.embedding(features['pay_brand_seq']),   # 用户历史行为序列
            embedding_dict['target_brand_id'],           # 目标商品
            mask['pay_brand_seq']                        # 序列掩码
        )
        # 拼接所有特征(除了行为序列)
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"] if ff != 'pay_brand_seq'], dim=1)
        # 拼接注意力处理后的序列特征
        x = torch.cat([x, att_emb], dim=1)
        # 通过多层全连接网络
        x = F.relu(self.fc1(x))   # 第一层
        x = F.relu(self.fc2(x))   # 第二层
        x = self.fc3(x)           # 输出层
        return x
din_model_train
import os
from torch.utils.data import DataLoader
import torch
import torch.nn as nn
from torch.utils.tensorboard import SummaryWriter

from dataset.dnn_dataset import MyPriorDataset
from model.din_model import DinModel
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data, calculate_top_k_ratio, get_data_test
from utils.get_data import seq_collate_fn

train_feature_numpy, test_feature_numpy, train_label, test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
print('train dataset finish')
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('train dataloader finish')

brands = ['b47686', 'b56508', 'b62063', 'b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands:
    test_feature_numpy, test_label = get_data_test(ak_config, brand_id)
    dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DinModel(dnn_config).to(device)
criterion = nn.BCEWithLogitsLoss()
#criterion = FocalLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir = './runs_din4'
writer = SummaryWriter(log_dir)

os.makedirs('models/din4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)

def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """DIN模型训练主函数
    参数:
        train_loader: 训练数据加载器
        test_loader_dict: 测试数据加载器字典(按品牌分类)
        model: DIN模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features, mask, labels in train_loader:
            # 将数据移到指定设备
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device)
            # 前向传播和优化
            optimizer.zero_grad()
            outputs = model(features, mask)
            labels = torch.unsqueeze(labels, dim=1)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            total_step += 1
            # 记录训练损失
            if (total_step+1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # 打印训练进度
            if (total_step+1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # 定期评估模型性能
            if (total_step+1) % 7000 == 0:
                with torch.no_grad():
                    model.eval()
                    # 对每个品牌进行评估
                    for brand_id in brands:
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        # 在测试集上进行预测
                        for data, mask, target in test_loader_dict[brand_id]:
                            output = model(data, mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())
                        # 计算并保存评估结果
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                    # 保存模型检查点
                    torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}_{total_step}.pth')
                model.train()
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/din4/model_epoch_{epoch}.pth')
    # 训练结束后的最终评估
    with torch.no_grad():
        model.eval()
        for brand_id in brands:
            top_k_list = [1000, 3000, 5000, 10000, 50000]
            test_preds = []
            test_targets = []
            for data, mask, target in test_loader_dict[brand_id]:
                output = model(data, mask)
                test_preds.extend(output.sigmoid().squeeze().tolist())
                test_targets.extend(target.squeeze().tolist())
            ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
            for k, ratio in ratios.items():
                print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
            with open(f'{log_dir}/models_{brand_id}_top_results_din_{total_step}.txt', 'w') as f:
                for k, ratio in ratios.items():
                    f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")

# 开始训练
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close() # 关闭TensorBoard写入器
din_model2
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

# DIN模型增强版实现
# 同时处理用户的点击和购买行为序列
class LocalActivationUnit(nn.Module):
    """局部激活单元
    用于计算用户行为序列与目标商品之间的注意力权重
    可同时处理点击和购买行为序列
    """
    def __init__(self, hidden_units):
        """初始化局部激活单元
        Args:
            hidden_units: 隐藏层单元数,用于特征交互和注意力计算
        """
        super(LocalActivationUnit, self).__init__()
        # 第一个全连接层:将4倍hidden_units(包含四种特征交互)映射到hidden_units
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)
        # 第二个全连接层:将hidden_units映射到1维注意力分数
        self.fc2 = nn.Linear(hidden_units, 1)

    def forward(self, user_behaviors, target_item, mask):
        """前向传播函数
        Args:
            user_behaviors: 用户历史行为序列 shape: (batch_size, seq_len, hidden_units)
            target_item: 目标商品 shape: (batch_size, hidden_units)
            mask: 序列填充掩码 shape: (batch_size, seq_len)
        """
        # 获取序列长度
        seq_len = user_behaviors.size(1)
        # 扩展目标商品维度以匹配序列长度
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # 构建特征交互向量,包含四种交互方式:
        # 1. 原始用户行为  2. 目标商品  3. 用户行为与目标商品的差异  4. 用户行为与目标商品的元素积
        interactions = torch.cat([
            user_behaviors,
            target_item,
            user_behaviors - target_item,
            user_behaviors * target_item
        ], dim=-1)
        # 通过两层全连接网络计算注意力分数
        x = torch.relu(self.fc1(interactions))        # 第一层带ReLU激活
        attention_logits = self.fc2(x).squeeze(-1)    # 第二层得到注意力分数
        # 使用掩码处理填充位置,将填充位置的注意力分数设为负无穷
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        # 使用softmax将注意力分数归一化为权重
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # 计算加权后的用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests


class DinModel(nn.Module):
    """增强版DIN模型实现
    主要改进:
    1. 增加了对用户点击行为序列(clk_brand_seq)的处理
    2. 分别对点击序列和购买序列使用注意力机制
    3. 融合两种行为序列的用户兴趣表示
    """
    def __init__(self, config):
        """初始化DIN模型
        Args:
            config: 配置字典,包含模型参数设置
        """
        super(DinModel, self).__init__()
        self.config = config
        # 创建embedding层,用于特征的稠密表示
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],   # embedding字典大小
            embedding_dim=self.config["embedding_dim"],    # embedding向量维度
            padding_idx=0                                  # 填充token的索引
        )
        # 构建三层全连接网络
        self.fc1 = nn.Linear(self.config["embedding_dim"] * len(self.config["feature_col"]), 512)
        self.fc2 = nn.Linear(512, 128)
        self.fc3 = nn.Linear(128, 1)
        # 创建局部激活单元,用于计算注意力
        self.att = LocalActivationUnit(self.config["embedding_dim"])

    def forward(self, features, mask):
        """模型前向传播
        改进之处:
        1. 分别处理点击序列(clk_brand_seq)和购买序列(pay_brand_seq)
        2. 对两个序列分别计算注意力权重
        3. 将两个序列的注意力结果与其他特征一起拼接
        参数:
            features: 输入特征字典,包含点击序列和购买序列
            mask: 两个序列的填充掩码
        """
        embedding_dict = {}
        # 处理非序列特征
        for ff in self.config["feature_col"]:
            if ff != 'clk_brand_seq' and ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # 新增:处理点击序列
        embedding_dict['clk_brand_seq'] = self.att(
            self.embedding(features['clk_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['clk_brand_seq']
        )
        # 处理购买序列
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        # 拼接所有特征
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # 通过三层全连接网络
        x = F.relu(self.fc1(x))   # 第一层,使用ReLU激活
        x = F.relu(self.fc2(x))   # 第二层,使用ReLU激活
        x = self.fc3(x)           # 输出层,无激活函数
        return x
MoE 训练
代码内容
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

# 局部激活单元:用于计算用户行为序列的注意力权重
class LocalActivationUnit(nn.Module):
    """局部激活单元
    与DIN模型中的注意力机制相同,用于计算用户行为序列与目标商品的相关性
    """
    def __init__(self, hidden_units):
        """初始化局部激活单元
        参数:
            hidden_units: 隐藏层维度
        """
        super(LocalActivationUnit, self).__init__()
        self.fc1 = nn.Linear(hidden_units * 4, hidden_units)  # 第一层全连接
        self.fc2 = nn.Linear(hidden_units, 1)                 # 第二层全连接

    def forward(self, user_behaviors, target_item, mask):
        """计算注意力权重
        参数:
            user_behaviors: 用户行为序列 [batch_size, seq_len, hidden_units]
            target_item: 目标商品 [batch_size, hidden_units]
            mask: 序列掩码 [batch_size, seq_len]
        """
        seq_len = user_behaviors.size(1)
        target_item = target_item.unsqueeze(1).expand(-1, seq_len, -1)
        # 特征交互
        interactions = torch.cat([
            user_behaviors,
            target_item,
            user_behaviors - target_item,
            user_behaviors * target_item
        ], dim=-1)
        # 计算注意力分数
        x = torch.relu(self.fc1(interactions))
        attention_logits = self.fc2(x).squeeze(-1)
        # 掩码处理
        attention_logits = attention_logits.masked_fill(mask == 0, float('-inf'))
        attention_weights = F.softmax(attention_logits, dim=1).unsqueeze(-1)
        # 加权求和得到用户兴趣表示
        user_interests = torch.sum(attention_weights * user_behaviors, dim=1)
        return user_interests
# 专家网络:每个专家独立学习特定的特征模式
class Expert(nn.Module):
    """专家网络
    实现了一个三层全连接网络,作为单个专家的计算单元
    """
    def __init__(self, input_size):
        """初始化专家网络
        参数:
            input_size: 输入特征维度
        """
        super(Expert, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 512),   # 第一层
            nn.ReLU(),
            nn.Linear(512, 128),          # 第二层
            nn.ReLU(),
            nn.Linear(128, 1)             # 输出层
        )

    def forward(self, x):
        """专家网络的前向计算"""
        return self.network(x)
# 门控网络:学习如何分配任务给不同的专家
class Gate(nn.Module):
    """门控网络
    负责为每个样本动态分配专家的权重
    """
    def __init__(self, input_size, num_experts):
        """初始化门控网络
        参数:
            input_size: 输入特征维度
            num_experts: 专家数量
        """
        super(Gate, self).__init__()
        self.network = nn.Sequential(
            nn.Linear(input_size, 64),    # 第一层
            nn.ReLU(),
            nn.Linear(64, 32),            # 第二层
            nn.ReLU(),
            nn.Linear(32, num_experts)    # 输出层,维度等于专家数量
        )

    def forward(self, x):
        """计算每个专家的权重"""
        return self.network(x)
# 混合专家模型:组合多个专家的预测结果
class MoeModel(nn.Module):
    """混合专家模型(Mixture of Experts)
    结合了DIN的注意力机制和MoE的专家混合机制
    """
    def __init__(self, config):
        """初始化MoE模型
        参数:
            config: 配置字典,包含模型参数
        """
        super(MoeModel, self).__init__()
        self.config = config
        # embedding层
        self.embedding = nn.Embedding(
            num_embeddings=self.config["num_embedding"],
            embedding_dim=self.config["embedding_dim"],
            padding_idx=0
        )
        # 注意力机制
        self.att = LocalActivationUnit(self.config["embedding_dim"])
        # 创建多个专家
        self.experts = nn.ModuleList([
            Expert(self.config["embedding_dim"] * len(self.config["feature_col"]))
            for _ in range(self.config["num_experts"])
        ])
        # 门控网络
        self.gate = Gate(
            self.config["embedding_dim"] * len(self.config["features_gate_col"]),
            self.config["num_experts"]
        )

    def forward(self, features, mask):
        """模型前向传播
        参数:
            features: 输入特征字典
            mask: 序列特征的掩码
        返回:
            x: 最终预测结果
            gating_weights: 专家权重分布
        """
        # 特征embedding
        embedding_dict = {}
        for ff in self.config["feature_col"]:
            if ff != 'pay_brand_seq':
                embedding_dict[ff] = torch.sum(self.embedding(features[ff]), dim=1)
        # 处理序列特征
        embedding_dict['pay_brand_seq'] = self.att(
            self.embedding(features['pay_brand_seq']),
            embedding_dict['target_brand_id'],
            mask['pay_brand_seq']
        )
        # 特征拼接
        x = torch.cat([embedding_dict[ff] for ff in self.config["feature_col"]], dim=1)
        # 计算门控特征
        gate_emb = torch.cat([embedding_dict[ff] for ff in self.config["features_gate_col"]], dim=1)
        # 计算专家权重
        gating_weights = F.softmax(self.gate(gate_emb), dim=1)
        # 获取所有专家的输出并加权组合
        expert_outputs = torch.stack([expert(x) for expert in self.experts], dim=-1)
        x = torch.sum(gating_weights * expert_outputs.squeeze(), dim=-1)
        return x, gating_weights
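MoeModel 中"门控加权融合专家输出"这一步,可以用一小段独立代码演示(专家数、batch 大小均为示意):

# 示意:batch=2,3个专家,每个专家输出1维分数
expert_outputs = torch.stack([torch.randn(2, 1) for _ in range(3)], dim=-1)  # (2, 1, 3)
gating_logits = torch.randn(2, 3)
gating_weights = F.softmax(gating_logits, dim=1)                             # 每行权重和为1
fused = torch.sum(gating_weights * expert_outputs.squeeze(), dim=-1)         # (2,)
print(gating_weights.sum(dim=1))  # ≈ tensor([1., 1.])
print(fused.shape)                # torch.Size([2])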
# 导入必要的库
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score
import numpy as np
import os
from odps import ODPS
from odps.df import DataFrame
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.tensorboard import SummaryWriter

# 导入自定义模块
from dataset.dnn_dataset import MyDataset
from dataset.dnn_dataset import MyPriorDataset
from model.moe_model import MoeModel # 使用混合专家模型(Mixture of Experts)
from config.ak_config import config as ak_config
from config.dnn_config import config as dnn_config
from utils.get_data import get_data,get_data_test,calculate_top_k_ratio
from utils.get_data import my_collate_fn, seq_collate_fn # 序列数据的特殊整理函数
from loss.focal_loss import FocalLoss

# 获取训练数据并创建数据集
train_feature_numpy,test_feature_numpy,train_label,test_label = get_data(ak_config)
dataset_train = MyPriorDataset(train_feature_numpy, train_label,dnn_config)
print('dataset finish')
# 创建训练数据加载器,使用序列整理函数
dataloader_train = DataLoader(dataset_train, batch_size=dnn_config["batch_size"], shuffle=False, collate_fn=seq_collate_fn)
print('dataloader finish')

# 为不同品牌创建测试数据集和数据加载器
brands = ['b47686','b56508','b62063','b78739']
dataset_test_dict = {}
dataloader_test_dict = {}
for brand_id in brands:
    test_feature_numpy, test_label = get_data_test(ak_config, brand_id)
    dataset_test_dict[brand_id] = MyPriorDataset(test_feature_numpy, test_label, dnn_config)
    dataloader_test_dict[brand_id] = DataLoader(dataset_test_dict[brand_id], batch_size=2048, shuffle=False, collate_fn=seq_collate_fn)
print('test data finish')

# 设置设备(GPU/CPU)并初始化模型、损失函数和优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MoeModel(dnn_config).to(device) # 初始化MoE模型
criterion = nn.BCEWithLogitsLoss() # 二元交叉熵损失函数
#criterion = FocalLoss() # Focal Loss损失函数(已注释)
optimizer = torch.optim.Adam(model.parameters(), lr=0.004)
log_dir='./runs_moe4' # TensorBoard日志目录
writer = SummaryWriter(log_dir=log_dir)

# 创建保存模型和日志的目录
os.makedirs('models/moe4', exist_ok=True)
os.makedirs(log_dir, exist_ok=True)

def train_model(train_loader, test_loader_dict, model, criterion, optimizer, num_epochs=3):
    """模型训练函数
    Args:
        train_loader: 训练数据加载器
        test_loader_dict: 测试数据加载器字典(按品牌分类)
        model: MoE神经网络模型
        criterion: 损失函数
        optimizer: 优化器
        num_epochs: 训练轮数
    """
    total_step = 0
    for epoch in range(num_epochs):
        model.train()  # 设置为训练模式
        for features, mask, labels in train_loader:
            # 将特征和掩码数据移到指定设备(GPU/CPU)
            for ff in dnn_config["feature_col"]:
                features[ff] = features[ff].to(device)
                mask[ff] = mask[ff].to(device)
            labels = labels.to(device)
            # 训练步骤
            optimizer.zero_grad()                       # 清空梯度
            outputs, _ = model(features, mask)          # 前向传播,返回预测值和门控权重
            outputs = torch.unsqueeze(outputs, dim=1)   # 调整输出维度
            labels = torch.unsqueeze(labels, dim=1)     # 调整标签维度
            loss = criterion(outputs, labels)           # 计算损失
            loss.backward()                             # 反向传播
            optimizer.step()                            # 更新参数
            total_step += 1
            # 记录训练损失到TensorBoard
            if (total_step+1) % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), total_step)
            # 打印训练进度
            if (total_step+1) % 100 == 0:
                print(f'Epoch {epoch}, Step {total_step}: Loss={loss.item(): .4f}')
            # 每7000步进行一次评估
            if (total_step+1) % 7000 == 0:
                with torch.no_grad():
                    model.eval()  # 设置为评估模式
                    for brand_id in brands:
                        # 设置需要评估的top-k值
                        top_k_list = [1000, 3000, 5000, 10000, 50000]
                        test_preds = []
                        test_targets = []
                        # 收集预测结果和真实标签
                        for data, mask, target in test_loader_dict[brand_id]:
                            output, _ = model(data, mask)
                            test_preds.extend(output.sigmoid().squeeze().tolist())
                            test_targets.extend(target.squeeze().tolist())
                        # 计算并输出top-k的正例比例
                        ratios = calculate_top_k_ratio(test_preds, test_targets, top_k_list)
                        for k, ratio in ratios.items():
                            print(f"{brand_id} Top {k} ratio of positive labels: {ratio:.4f}")
                        # 保存评估结果到文件
                        with open(f'{log_dir}/models_{brand_id}_top_results_moe_{total_step}.txt', 'w') as f:
                            for k, ratio in ratios.items():
                                f.write(f"Top {k} ratio of positive labels: {ratio:.4f}\n")
                    # 保存模型检查点
                    torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}_{total_step}.pth')
                model.train()  # 切回训练模式
        # 每个epoch结束后保存模型
        torch.save(model.state_dict(), f'models/moe4/model_epoch_{epoch}.pth')

# 开始训练并关闭TensorBoard写入器
train_model(dataloader_train, dataloader_test_dict, model, criterion, optimizer)
writer.close()
评估结果
下表中每个单元格内的 2 个数字,依次为 top 10000 / 50000 的正例召回率。

| 模型 | b47686 韩都衣舍 | b56508 三星手机 | b62063 诺基亚 | b78739 LILY | 平均 |
| --- | --- | --- | --- | --- | --- |
| gbdt 品牌样本 | 0.349899 / 0.533915 | 0.164583 / 0.284722 | 0.258503 / 0.353741 | 0.488439 / 0.624277 | 0.3823 |
| gbdt 全样本 | 0.366017 / 0.537273 | 0.177778 / 0.289583 | 0.278912 / 0.394558 | 0.528902 / 0.627168 | 0.4002 |
| dnn bce loss | 0.3627 / 0.5346 | 0.1667 / 0.2938 | 0.2517 / 0.3741 | 0.5145 / 0.6561 | 0.3943 |
| dnn focal loss | 0.3627 / 0.5319 | 0.1736 / 0.3021 | 0.2721 / 0.3810 | 0.5145 / 0.6676 | 0.4007 |
| din | 0.3620 / 0.5332 | 0.1757 / 0.3028 | 0.2585 / 0.4014 | 0.5087 / 0.6532 | 0.3994 |
| moe focal loss | 0.3553 / 0.5359 | 0.1722 / 0.2993 | 0.2789 / 0.3673 | 0.5231 / 0.6618 | 0.3992 |