向用户推荐巧克力是一个协作过滤问题
在本文中,我将用Apache Beam替换原始解决方案中的Pandas - 这将允许解决方案更容易扩展到更大的数据集。由于解决方案中存在上下文,因此我将简单介绍一下技术细节。
第1步:提取原始数据
对于协作过滤,我们不需要知道关于用户或内容的任何属性。实质上,我们需要知道的仅仅是userId,itemId和特定用户给出特定项目的评级。在这种情况下,我们可以使用页面上的时间作为评级的代理。Google Analytics 360将网络流量信息导出到BigQuery,我从BigQuery中提取数据:
#standardSQL
WITH visitor_page_content AS (
SELECT
fullVisitorID,
(SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,
(LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration
FROM `cloud-training-demos.GA360_test.ga_sessions_sample`,
UNNEST(hits) AS hits
WHERE
# only include hits on pages
hits.type = "PAGE"
GROUP BY
fullVisitorId, latestContentId, hits.time
)
# aggregate web stats
SELECT
fullVisitorID as visitorId,
latestContentId as contentId,
SUM(session_duration) AS session_duration
FROM visitor_page_content
WHERE latestContentId IS NOT NULL
GROUP BY fullVisitorID, latestContentId
HAVING session_duration > 0
ORDER BY latestContentId
查询本身特定于设置Google Analytics的方式 - 具体而言,设置自定义维度的方式 - 您可能必须使用其他查询将数据提取到与此表类似的内容中:
这是进行协作过滤所需的原始数据集。很明显,您将用于访问ID,contentId和评分的内容取决于您的问题。除此之外,其他一切都很标准,你应该可以按原样使用它。
第2步:创建枚举的用户和项目标识
WALS算法要求枚举用户id和item id,即它们应该简单地是交互矩阵中的行号和列号。所以,我们需要获取上面的visitorId,它是一个字符串,并将它们映射到0,1,2,...。我们需要为item id做同样的事情。此外,评级必须是小数字,通常为0-1。所以,我们必须扩大session_duration。
为了做到这一点,我们将使用TensorFlow Transform(TFT) - 这是一个库,允许您使用Apache Beam创建预处理数据集进行训练,然后在推理期间将预处理作为TensorFlow图的一部分应用!
以下是使用TFT进行预处理功能的关键:
def preprocess_tft(rowdict):
median = 57937
result = {
'userId' : tft.string_to_int(rowdict['visitorId'], vocab_filename='vocab_users'),
'itemId' : tft.string_to_int(rowdict['contentId'], vocab_filename='vocab_items'),
'rating' : 0.3 * (1 + (rowdict['session_duration'] - median)/median)
}
# cap the rating at 1.0
result['rating'] = tf.where(tf.less(result['rating'], tf.ones(tf.shape(result['rating']))),
result['rating'], tf.ones(tf.shape(result['rating'])))
return result
预处理BigQuery中由visitorId,contentId和session_duration组成的行是一个名为result的Python字典,其中包含三列:userId,itemId和rating。
tft.string_to_int查看整个训练数据集并创建一个枚举访问者的映射,并将映射(“词汇表”)写入文件vocab_users。我为contentId做同样的事情,创建itemId。该评级是通过将session_duration缩放到0-1之内获得的。我的缩放比例基本上限制了很长的会话持续时间的长尾,这可能代表在报纸文章中关闭笔记本电脑的人。关键要注意的是,我使用纯TensorFlow函数(如tf.less和tf.ones)来进行剪裁。这很重要,因为这个预处理函数必须在推理(预测)过程中作为TensorFlow服务图的一部分应用。
使用Apache Beam将预处理函数应用于训练数据集:
transformed_dataset,transform_fn =(
raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))
第3步:写出WALS训练数据集
WALS的训练集由两个文件组成 - 一个提供特定用户评分的所有项目(交互矩阵逐行),另一个提供评估特定项目的所有用户(交互矩阵按列)。显然,这两个文件包含相同的数据,但需要分割数据集以便它们可以并行处理。我们也可以在执行枚举的同一个Apache Beam管道中执行此操作:
users_for_item =(transformed_data
|'map_items'>> beam.Map(lambda x:(x ['itemId'],x))
|'group_items'>> beam.GroupByKey()
|'totfr_items'>> beam.Map(lambda item_userlist:to_tfrecord(item_userlist,'userId')))
然后,我们可以在Cloud Dataflow上执行Apache Beam。
此时,我们将拥有以下文件:
items_for_user-00000-of-00003
...
users_for_item-00000-of-00004
...
transform_fn/transform_fn/saved_model.pb
transform_fn/transform_fn/assets/
transform_fn/transform_fn/assets/vocab_items
transform_fn/transform_fn/assets/vocab_users
```users_for_item```包含TFExample格式中每个项目的所有用户/评级。这里的项目和用户是整数(而不是字符串),即itemId不是contentId,userId不是visitorId。评分被缩放。
``items_for_user```包含TFExample格式中每个用户的所有项目/评级。这里的项目和用户是整数(而不是字符串),即itemId不是contentId,userId不是visitorId。评分被缩放。
```vocab_items```包含从contentId到枚举itemId的映射
`vocab_items``包含visitorId到枚举userId的映射
saved_model.pb包含我们在预处理期间所做的所有tensorflow转换,以便它们可以在预测期间应用。
第4步:编写TensorFlow代码
TensorFlow中有一个基于Estimator API的WALS实现。我们按照我们使用任何其他Estimator的方式使用它 - 请参阅GitHub仓库中的函数read_dataset()和train_and_evaluate()。
更有趣的是我们如何使用训练好的估计器进行批量预测。对于特定的用户,我们希望找到前K个项目。这可以在TensorFlow中使用:
def find_top_k(user,item_factors,k):
all_items = tf.matmul(tf.expand_dims(user,0),tf.transpose(item_factors))
topk = tf.nn.top_k(all_items,k = k)
return tf.cast (topk.indices,dtype = tf.int64)
批量预测包括为每个用户调用上述函数,但要确保在我们写出输出时,我们写出字符串visitorId而不是数字userId(对于contentId / userId类似):
def batch_predict(args):
import numpy as np
# read vocabulary into Python list for quick index-ed lookup
def create_lookup(filename):
from tensorflow.python.lib.io import file_io
dirname = os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/')
with file_io.FileIO(os.path.join(dirname, filename), mode='r') as ifp:
return [x.rstrip() for x in ifp]
originalItemIds = create_lookup('vocab_items')
originalUserIds = create_lookup('vocab_users')
with tf.Session() as sess:
estimator = tf.contrib.factorization.WALSMatrixFactorization(
num_rows=args['nusers'], num_cols=args['nitems'],
embedding_dimension=args['n_embeds'],
model_dir=args['output_dir'])
# but for in-vocab data, the row factors are already in the checkpoint
user_factors = tf.convert_to_tensor(estimator.get_row_factors()[0]) # (nusers, nembeds)
# in either case, we have to assume catalog doesn't change, so col_factors are read in
item_factors = tf.convert_to_tensor(estimator.get_col_factors()[0])# (nitems, nembeds)
# for each user, find the top K items
topk = tf.squeeze(tf.map_fn(lambda user: find_top_k(user, item_factors, args['topk']),
user_factors, dtype=tf.int64))
with file_io.FileIO(os.path.join(args['output_dir'], 'batch_pred.txt'), mode='w') as f:
for userId, best_items_for_user in enumerate(topk.eval()):
f.write(originalUserIds[userId] + '\t') # write userId \t item1,item2,item3...
f.write(','.join(originalItemIds[itemId] for itemId in best_items_for_user) + '\n')
要进行培训和批量预测,我们可以在Cloud ML Engine上运行TensorFlow模型,同时不需要考虑任何基础架构:
gcloud ml-engine jobs submit training $JOBNAME \
--region=$REGION \
--module-name=trainer.task \
--package-path=${PWD}/wals_tft/trainer \
--job-dir=$OUTDIR \
--staging-bucket=gs://$BUCKET \
--scale-tier=BASIC_GPU \
--runtime-version=1.5 \
-- \
--output_dir=$OUTDIR \
--input_path=gs://${BUCKET}/wals/preproc_tft \
--num_epochs=10 --nitems=5668 --nusers=82802
像这样硬编码nitems和nusers是有点丑陋的。因此,我们可以返回到Beam管道,并将文件和核心文件写入文件,然后简单地使用“gsutil cat”来获取适当的值--GitHub上的完整代码执行此操作。
以下是输出结果的例子:
6167894456739729438 298997422,262707977,263058146
3795498541234027150 296993188,97034003,298989783
基本上,你为每个visitorId获得3个项目。
第5步:行和列因素
虽然产品推荐是WALS的关键用例,但另一个用例是找到表示产品和用户的低维方式,例如,通过对产品因素和列因子进行聚类来进行产品或客户细分。所以,我们实现了一个服务函数来将这些提供给调用者:
def for_user_embeddings(originalUserId):
# convert the userId that the end-user provided to integer
originalUserIds = tf.contrib.lookup.index_table_from_file(
os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/vocab_users'))
userId = originalUserIds.lookup(originalUserId)
# all items for this user (for user_embeddings)
items = tf.range(args['nitems'], dtype=tf.int64)
users = userId * tf.ones([args['nitems']], dtype=tf.int64)
ratings = 0.1 * tf.ones_like(users, dtype=tf.float32)
return items, users, ratings, tf.constant(True)
Orchestration
请注意,本文仅用于替换原始解决方案的机器学习培训和批量预测部分。原始解决方案还解释了如何进行编排和过滤。它们在哪里适合?
在这一点上,我们现在有一个BigQuery查询,一个Beam / Dataflow管道和一个AppEngine应用程序(见下文)。你如何一个接一个地定期运行它们?按照解决方案中的建议使用Apache Airflow来执行此编排。
过滤
如果你向顾客推荐巧克力,那么推荐他们已经尝试过的巧克力是可以的,但是如果你向用户推荐新闻文章,那么重要的是你不要推荐他们已经阅读过的文章。
与原始解决方案不同,我的批量预测代码不会过滤掉用户已阅读的文章。如果重要的是建议不包括已经阅读/购买的项目,那么有两种方法可以做到这一点。
更简单的方法是在找到top_k之前将与已读项目相对应的条目(这里是评级<0.01的条目)
def find_top_k(user, item_factors, read_items, k):
all_items = tf.matmul(tf.expand_dims(user, 0),
tf.transpose(item_factors))
all_items = tf.where(tf.less(read_items,
0.01*tf.ones(tf.shape(read_items))),
all_items,
tf.zeros(tf.shape(all_items)))
topk = tf.nn.top_k(all_items, k=k)
return tf.cast(topk.indices, dtype=tf.int64)
问题在于滞后 - 您可能不推荐用户昨天阅读的内容(因为它在您的训练数据集中),但批量预测代码可以实时访问阅读文章流,因此您将推荐他们几分钟前阅读的文章。
如果这个滞后是你想避免的问题,那么你应该让批量预测中的k高得多(例如,即使你只推荐其中5个,你也可以从推荐人那里得到20篇文章),然后在AppEngine中进行第二级过滤,如原始解决方案中所建议的。