四时宝库

程序员的知识宝库

使用TensorFlow和TensorFlow变换为个性化建议构建协作过滤模型

向用户推荐巧克力是一个协作过滤问题

在本文中,我将用Apache Beam替换原始解决方案中的Pandas - 这将允许解决方案更容易扩展到更大的数据集。由于解决方案中存在上下文,因此我将简单介绍一下技术细节。

第1步:提取原始数据

对于协作过滤,我们不需要知道关于用户或内容的任何属性。实质上,我们需要知道的仅仅是userId,itemId和特定用户给出特定项目的评级。在这种情况下,我们可以使用页面上的时间作为评级的代理。Google Analytics 360将网络流量信息导出到BigQuery,我从BigQuery中提取数据:

#standardSQL

WITH visitor_page_content AS (

SELECT

fullVisitorID,

(SELECT MAX(IF(index=10, value, NULL)) FROM UNNEST(hits.customDimensions)) AS latestContentId,

(LEAD(hits.time, 1) OVER (PARTITION BY fullVisitorId ORDER BY hits.time ASC) - hits.time) AS session_duration

FROM `cloud-training-demos.GA360_test.ga_sessions_sample`,

UNNEST(hits) AS hits

WHERE

# only include hits on pages

hits.type = "PAGE"

GROUP BY

fullVisitorId, latestContentId, hits.time

)

# aggregate web stats

SELECT

fullVisitorID as visitorId,

latestContentId as contentId,

SUM(session_duration) AS session_duration

FROM visitor_page_content

WHERE latestContentId IS NOT NULL

GROUP BY fullVisitorID, latestContentId

HAVING session_duration > 0

ORDER BY latestContentId

查询本身特定于设置Google Analytics的方式 - 具体而言,设置自定义维度的方式 - 您可能必须使用其他查询将数据提取到与此表类似的内容中:

这是进行协作过滤所需的原始数据集。很明显,您将用于访问ID,contentId和评分的内容取决于您的问题。除此之外,其他一切都很标准,你应该可以按原样使用它。

第2步:创建枚举的用户和项目标识

WALS算法要求枚举用户id和item id,即它们应该简单地是交互矩阵中的行号和列号。所以,我们需要获取上面的visitorId,它是一个字符串,并将它们映射到0,1,2,...。我们需要为item id做同样的事情。此外,评级必须是小数字,通常为0-1。所以,我们必须扩大session_duration。

为了做到这一点,我们将使用TensorFlow Transform(TFT) - 这是一个库,允许您使用Apache Beam创建预处理数据集进行训练,然后在推理期间将预处理作为TensorFlow图的一部分应用!

以下是使用TFT进行预处理功能的关键:

def preprocess_tft(rowdict):

median = 57937

result = {

'userId' : tft.string_to_int(rowdict['visitorId'], vocab_filename='vocab_users'),

'itemId' : tft.string_to_int(rowdict['contentId'], vocab_filename='vocab_items'),

'rating' : 0.3 * (1 + (rowdict['session_duration'] - median)/median)

}

# cap the rating at 1.0

result['rating'] = tf.where(tf.less(result['rating'], tf.ones(tf.shape(result['rating']))),

result['rating'], tf.ones(tf.shape(result['rating'])))

return result

预处理BigQuery中由visitorId,contentId和session_duration组成的行是一个名为result的Python字典,其中包含三列:userId,itemId和rating。

tft.string_to_int查看整个训练数据集并创建一个枚举访问者的映射,并将映射(“词汇表”)写入文件vocab_users。我为contentId做同样的事情,创建itemId。该评级是通过将session_duration缩放到0-1之内获得的。我的缩放比例基本上限制了很长的会话持续时间的长尾,这可能代表在报纸文章中关闭笔记本电脑的人。关键要注意的是,我使用纯TensorFlow函数(如tf.less和tf.ones)来进行剪裁。这很重要,因为这个预处理函数必须在推理(预测)过程中作为TensorFlow服务图的一部分应用。

使用Apache Beam将预处理函数应用于训练数据集:

transformed_dataset,transform_fn =(

raw_dataset | beam_impl.AnalyzeAndTransformDataset(preprocess_tft))

第3步:写出WALS训练数据集

WALS的训练集由两个文件组成 - 一个提供特定用户评分的所有项目(交互矩阵逐行),另一个提供评估特定项目的所有用户(交互矩阵按列)。显然,这两个文件包含相同的数据,但需要分割数据集以便它们可以并行处理。我们也可以在执行枚举的同一个Apache Beam管道中执行此操作:

users_for_item =(transformed_data

|'map_items'>> beam.Map(lambda x:(x ['itemId'],x))

|'group_items'>> beam.GroupByKey()

|'totfr_items'>> beam.Map(lambda item_userlist:to_tfrecord(item_userlist,'userId')))

然后,我们可以在Cloud Dataflow上执行Apache Beam。

此时,我们将拥有以下文件:

items_for_user-00000-of-00003

...

users_for_item-00000-of-00004

...

transform_fn/transform_fn/saved_model.pb

transform_fn/transform_fn/assets/

transform_fn/transform_fn/assets/vocab_items

transform_fn/transform_fn/assets/vocab_users

  1. ```users_for_item```包含TFExample格式中每个项目的所有用户/评级。这里的项目和用户是整数(而不是字符串),即itemId不是contentId,userId不是visitorId。评分被缩放。

  2. ``items_for_user```包含TFExample格式中每个用户的所有项目/评级。这里的项目和用户是整数(而不是字符串),即itemId不是contentId,userId不是visitorId。评分被缩放。

  3. ```vocab_items```包含从contentId到枚举itemId的映射

  4. `vocab_items``包含visitorId到枚举userId的映射

  5. saved_model.pb包含我们在预处理期间所做的所有tensorflow转换,以便它们可以在预测期间应用。

第4步:编写TensorFlow代码

TensorFlow中有一个基于Estimator API的WALS实现。我们按照我们使用任何其他Estimator的方式使用它 - 请参阅GitHub仓库中的函数read_dataset()和train_and_evaluate()。

更有趣的是我们如何使用训练好的估计器进行批量预测。对于特定的用户,我们希望找到前K个项目。这可以在TensorFlow中使用:

def find_top_k(user,item_factors,k):

all_items = tf.matmul(tf.expand_dims(user,0),tf.transpose(item_factors))

topk = tf.nn.top_k(all_items,k = k)

return tf.cast (topk.indices,dtype = tf.int64)

批量预测包括为每个用户调用上述函数,但要确保在我们写出输出时,我们写出字符串visitorId而不是数字userId(对于contentId / userId类似):

def batch_predict(args):

import numpy as np

# read vocabulary into Python list for quick index-ed lookup

def create_lookup(filename):

from tensorflow.python.lib.io import file_io

dirname = os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/')

with file_io.FileIO(os.path.join(dirname, filename), mode='r') as ifp:

return [x.rstrip() for x in ifp]

originalItemIds = create_lookup('vocab_items')

originalUserIds = create_lookup('vocab_users')

with tf.Session() as sess:

estimator = tf.contrib.factorization.WALSMatrixFactorization(

num_rows=args['nusers'], num_cols=args['nitems'],

embedding_dimension=args['n_embeds'],

model_dir=args['output_dir'])

# but for in-vocab data, the row factors are already in the checkpoint

user_factors = tf.convert_to_tensor(estimator.get_row_factors()[0]) # (nusers, nembeds)

# in either case, we have to assume catalog doesn't change, so col_factors are read in

item_factors = tf.convert_to_tensor(estimator.get_col_factors()[0])# (nitems, nembeds)

# for each user, find the top K items

topk = tf.squeeze(tf.map_fn(lambda user: find_top_k(user, item_factors, args['topk']),

user_factors, dtype=tf.int64))

with file_io.FileIO(os.path.join(args['output_dir'], 'batch_pred.txt'), mode='w') as f:

for userId, best_items_for_user in enumerate(topk.eval()):

f.write(originalUserIds[userId] + '\t') # write userId \t item1,item2,item3...

f.write(','.join(originalItemIds[itemId] for itemId in best_items_for_user) + '\n')

要进行培训和批量预测,我们可以在Cloud ML Engine上运行TensorFlow模型,同时不需要考虑任何基础架构:

gcloud ml-engine jobs submit training $JOBNAME \

--region=$REGION \

--module-name=trainer.task \

--package-path=${PWD}/wals_tft/trainer \

--job-dir=$OUTDIR \

--staging-bucket=gs://$BUCKET \

--scale-tier=BASIC_GPU \

--runtime-version=1.5 \

-- \

--output_dir=$OUTDIR \

--input_path=gs://${BUCKET}/wals/preproc_tft \

--num_epochs=10 --nitems=5668 --nusers=82802

像这样硬编码nitems和nusers是有点丑陋的。因此,我们可以返回到Beam管道,并将文件和核心文件写入文件,然后简单地使用“gsutil cat”来获取适当的值--GitHub上的完整代码执行此操作。

以下是输出结果的例子:

6167894456739729438 298997422,262707977,263058146

3795498541234027150 296993188,97034003,298989783

基本上,你为每个visitorId获得3个项目。

第5步:行和列因素

虽然产品推荐是WALS的关键用例,但另一个用例是找到表示产品和用户的低维方式,例如,通过对产品因素和列因子进行聚类来进行产品或客户细分。所以,我们实现了一个服务函数来将这些提供给调用者:

def for_user_embeddings(originalUserId):

# convert the userId that the end-user provided to integer

originalUserIds = tf.contrib.lookup.index_table_from_file(

os.path.join(args['input_path'], 'transform_fn/transform_fn/assets/vocab_users'))

userId = originalUserIds.lookup(originalUserId)

# all items for this user (for user_embeddings)

items = tf.range(args['nitems'], dtype=tf.int64)

users = userId * tf.ones([args['nitems']], dtype=tf.int64)

ratings = 0.1 * tf.ones_like(users, dtype=tf.float32)

return items, users, ratings, tf.constant(True)

Orchestration

请注意,本文仅用于替换原始解决方案的机器学习培训和批量预测部分。原始解决方案还解释了如何进行编排和过滤。它们在哪里适合?

在这一点上,我们现在有一个BigQuery查询,一个Beam / Dataflow管道和一个AppEngine应用程序(见下文)。你如何一个接一个地定期运行它们?按照解决方案中的建议使用Apache Airflow来执行此编排。

过滤

如果你向顾客推荐巧克力,那么推荐他们已经尝试过的巧克力是可以的,但是如果你向用户推荐新闻文章,那么重要的是你不要推荐他们已经阅读过的文章。

与原始解决方案不同,我的批量预测代码不会过滤掉用户已阅读的文章。如果重要的是建议不包括已经阅读/购买的项目,那么有两种方法可以做到这一点。

更简单的方法是在找到top_k之前将与已读项目相对应的条目(这里是评级<0.01的条目)

def find_top_k(user, item_factors, read_items, k):

all_items = tf.matmul(tf.expand_dims(user, 0),

tf.transpose(item_factors))

all_items = tf.where(tf.less(read_items,

0.01*tf.ones(tf.shape(read_items))),

all_items,

tf.zeros(tf.shape(all_items)))

topk = tf.nn.top_k(all_items, k=k)

return tf.cast(topk.indices, dtype=tf.int64)

问题在于滞后 - 您可能不推荐用户昨天阅读的内容(因为它在您的训练数据集中),但批量预测代码可以实时访问阅读文章流,因此您将推荐他们几分钟前阅读的文章。

如果这个滞后是你想避免的问题,那么你应该让批量预测中的k高得多(例如,即使你只推荐其中5个,你也可以从推荐人那里得到20篇文章),然后在AppEngine中进行第二级过滤,如原始解决方案中所建议的。

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接