学习[spark lda]https://spark.apache.org/docs/latest/mllib-clustering.html#latent-dirichlet-allocation-lda() 英文文档,为了学习方便顺便翻译了。

理论参数

狄利克雷分配(Latent Dirichlet allocation LDA)是一个从文本文献集合中推断主题的主题模型。

LDA可以被认为是如下的聚类算法:

  • 主题对应于聚类中心,文档对应于数据集中的示例(数据行)。
  • 主题和文档都存在于特征空间中,其中特征向量是词计数的向量(词袋)。
  • LDA不是使用传统距离来估计聚类,而是使用基于文本文档如何生成的统计模型的函数。

LDA通过setOptimizer函数支持不同的推理算法。 EMLDAOptimizer利用似然函数的期望最大化来学习聚类,并产生综合结果,而OnlineLDAOptimizer使用迭代小批量采样进行在线变分推理,并且通常是内存友好的。

LDA将文档集合作为词计数的向量和以下参数(使用构建器模式进行设置):

  • k:主题数(即集群中心)
  • optimizer:用于学习LDA模型的optimizer,可以是EMLDAOptimizer或OnlineLDAOptimizer
  • docConcentration:Dirichlet参数,用于事先分配文档在主题上的分布。 较大的值可以促进更平滑的推断分配。
  • topicConcentration:Dirichlet参数,用于关于术语(词)的主题分布。 较大的值可以促进更平滑的推断分配。
  • maxIterations:限制迭代次数。
  • checkpointInterval:如果使用点检查(在Spark配置中设置),则此参数指定检查点的创建频率。 如果maxIterations较大,则使用检查点可以帮助减少磁盘上的洗牌文件大小,并有助于故障恢复。

所有spark.mllib的LDA模型都支持:

  • describeTopics:将主题返回为最重要的术语和术语权重数组
  • topicsMatrix:返回一个vocabSize由k矩阵,其中每列是一个主题

注意:在积极开发下,LDA仍然是一个实验性的功能。 因此,某些功能仅在优化程序生成的两个优化程序/模型之一中可用。 目前,分布式模型可以被转换成本地模型,但是反之亦然。

以下讨论将分别描述每个optimizer/model 对。

期望最大化

在EMLDAOptimizer和DistributedLDAModel中实现。
对于提供给LDA的参数:

  • docConcentration:只支持对称先验,因此提供的k维向量中的所有值必须相同。 所有的值也必须是[Math Processing Error]> 1.0。 提供向量(-1)的默认行为(统一的k维向量值为[数学处理错误](50 / k)+1
  • topicConcentration:只支持对称的先验值,值必须为[Math Processing Error]> 1.0。 导致默认值为[数学处理错误] 0.1 + 1
  • maxIterations:EM迭代的最大次数。

注意:做足够的迭代很重要。 在早期迭代中,EM经常具有无用的主题,但是这些主题经过多次迭代后会显着提高。 根据你的数据集,使用至少20次和50-100次迭代通常是合理的。

EMLDAOptimizer生成一个DistributedLDAModel,它不仅存储推断的主题,而且存储训练语料库中每个文档的完整训练语料库和主题分布。 DistributedLDAModel支持:

  • topTopicsPerDocument:训练语料库中每个文档的热门主题及其权重
  • topDocumentsPerTopic:每个主题的顶端文档以及文档中主题的相应权重。
  • logPrior:给定超参数的估计主题和文档 - 主题分布的对数概率docConcentration和
    topicConcentration
  • logLikelihood:给定推断主题和文档 - 主题分布的训练语料库的对数似然性

在线变分贝叶斯

在OnlineLDAOptimizer和LocalLDAModel中实现。 对于提供给LDA的参数:

  • docConcentration:不对称的先验可以通过在每个k维中传入一个值等于Dirichlet参数的向量来使用。 值应该是[数学处理错误]> = 0。 提供向量(-1)导致默认行为(具有值[数学处理错误](1.0 / k)的统一k维向量)
  • topicConcentration:仅支持对称的先验。 值必须是[数学处理错误]> = 0。 提供-1结果的默认值为[数学处理错误](1.0 / k)。
  • maxIterations:要提交的最大小装数。

另外,OnlineLDAOptimizer接受以下参数:

  • miniBatchFraction:在每次迭代中采样和使用的语料库的分数
  • optimizeDocConcentration:如果设置为true,则在每个小批次之后执行超参数docConcentration(aka)的最大似然估计,并在返回的LocalLDAModel
  • tau0和kappa中设置优化docConcentration:用于学习 这是通过[数学处理误差](τ0+ iter)-κ来计算的,其中[数学处理误差] iter是当前的迭代次数。

OnlineLDAOptimizer生成一个LocalLDAModel,它只存储推断的主题。 LocalLDAModel支持:

  • logLikelihood(文档):根据推断的主题计算提供文档的下限。
  • logPerplexity(文档):根据推断的主题计算所提供文档的困惑度的上限。

示例

在下面的例子中,我们加载了表示文档语料库的字数统计向量。 然后,我们使用LDA从文档中推断出三个主题。 所需簇的数量传递给算法。 然后我们输出主题,表示为概率分布的单词。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package sparkLDA;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.clustering.DistributedLDAModel;
import org.apache.spark.mllib.clustering.LDA;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;

import java.io.File;
import java.io.IOException;

import org.apache.spark.SparkConf;

public class SparkLDA {

private static String HADOOP_HOME_DIR = checkHadoopHome();

// 检查spark 环境 是否设置成功
private static String checkHadoopHome() {
// first check the Dflag hadoop.home.dir with JVM scope
String home = System.getProperty("hadoop.home.dir");

// fall back to the system/user-global env variable
if (home == null) {
home = System.getenv("HADOOP_HOME");
}
try {
// couldn't find either setting for hadoop's home directory
if (home == null) {
throw new IOException("HADOOP_HOME or hadoop.home.dir are not set.");
}
if (home.startsWith("\"") && home.endsWith("\"")) {
home = home.substring(1, home.length() - 1);
}
// check that the home setting is actually a directory that exists
File homedir = new File(home);
if (!homedir.isAbsolute() || !homedir.exists() || !homedir.isDirectory()) {
throw new IOException("Hadoop home directory " + homedir
+ " does not exist, is not a directory, or is not an absolute path.");
}
home = homedir.getCanonicalPath();
} catch (IOException ioe) {
System.out.println("null");
home = null;
}
return home;
}

public static void main(String[] args) throws IOException {

System.out.println(HADOOP_HOME_DIR);

SparkConf conf = new SparkConf().setAppName("SOME APP NAME").setMaster("local[2]").set("spark.executor.memory",
"1g");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load and parse the data
String path = "./sample_lda_data.txt";
JavaRDD<String> data = sc.textFile(path);
JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
public Vector call(String s) {
String[] sarray = s.trim().split(" ");
double[] values = new double[sarray.length];
for (int i = 0; i < sarray.length; i++) {
values[i] = Double.parseDouble(sarray[i]);
}
return Vectors.dense(values);
}
});
// Index documents with unique IDs
JavaPairRDD<Long, Vector> corpus = JavaPairRDD
.fromJavaRDD(parsedData.zipWithIndex().map(new Function<Tuple2<Vector, Long>, Tuple2<Long, Vector>>() {
public Tuple2<Long, Vector> call(Tuple2<Vector, Long> doc_id) {
return doc_id.swap();
}
}));
corpus.cache();

// Cluster the documents into three topics using LDA
DistributedLDAModel ldaModel = (DistributedLDAModel) new LDA().setK(3).run(corpus);

// Output topics. Each is a distribution over words (matching word count
// vectors)
System.out.println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize() + " words):");
Matrix topics = ldaModel.topicsMatrix();
for (int topic = 0; topic < 3; topic++) {
System.out.print("Topic " + topic + ":");
for (int word = 0; word < ldaModel.vocabSize(); word++) {
System.out.print(" " + topics.apply(word, topic));
}
System.out.println();
}
sc.stop();
}
}

sample_lda_data.txt 内容:每行数据解析为Vector的结构。其中原始数据如下,每一行表示一个文档,每一列表示一个单词,每一个元素D(m,w)表示第m篇文档中单词w的词频

1
2
3
4
5
6
7
8
9
10
11
12
1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0

输出:

1
2
3
4
5
6
7
8
9
10
11
12
D:\ProgramFiles\hadoop-2.8.1
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
17/12/05 14:26:07 INFO SparkContext: Running Spark version 2.2.0
略....
Topic 0: 15.089683953506297 14.750709899385281 1.5312722542015011 1.9508857699006295 15.318235757476856
12.59129347897879 7.3969781938882 6.943873430629874 2.4051185864422506 7.614176613626162 2.248599788783381
Topic 1: 4.700089259409153 6.789874452485002 1.1186095797841795 9.247805430189871 2.9482721618042618
2.19614141542458 17.961253884900074 1.2321612842281264 4.657846439828521 10.10316570751245 26.544267285383096
Topic 2: 6.210226787084551 7.459415648129717 9.350118166014319 28.8013087999095 6.733492080718882
7.21256510559663 5.641767921211726 1.8239652851420005 0.9370349737292286 6.282657678861387 4.207132925833523
17/12/05 14:24:09 INFO SparkUI: Stopped Spark web UI at http://172.30.160.143:4040
略....

完结,希望能帮助你!