使用 .NET 5 体验大数据和机器学习 (3)

DocRepoParser 项目以递归方式遍历存储库中的子文件夹,以收集各文档有关的元数据。Common 项目包含几个帮助程序类。例如,FilesHelper 用于所有文件 I/O。它跟踪存储文件和文件名的位置,并提供诸如为其他项目读取文件的服务。构造函数需要一个标签(一个唯一标识工作流的数字)和包含文档的 repo 或顶级文件夹的路径。默认情况下,它在用户的本地应用程序数据文件夹下创建一个文件夹。如有必要,可以将其覆盖。

MarkdownParser利用 Microsoft.Toolkit.Parsers解析 Markdown 的库。该库有两个任务:首先,它必须提取标题和子标题;其次,它必须提取单词。Markdown 文件以 "块 "的形式暴露出来,代表标题、链接和其他 Markdown 特征。块又包含承载文本的“Inlines”。例如,这段代码通过迭代行和单元格来解析一个 TableBlock,以找到 Inlines。

case TableBlock table: table.Rows.SelectMany(r => r.Cells) .SelectMany(c => c.Inlines) .ForEach(i => candidate = RecurseInline(i, candidate, words, titles)); break;

此代码提取超链接的文本部分:

case HyperlinkInline hyper: if (!string.IsNullOrWhiteSpace(hyper.Text)) { words.Append(hyper.Text.ExtractWords()); } break;

结果是一个 CSV 文件,如下图所示:

第一步只是准备要处理的数据。下一步使用 Spark for .NET 作业确定每个文档的字数,阅读时间和前 20 个术语。

构建 Spark Job

SparkWordsProcessor项目用来运行 Spark 作业。虽然该应用程序是一个控制台项目,但它需要 Spark 来运行。runjob.cmd批处理命令将作业提交到正确配置的 Windows 计算机上运行。典型作业的模式是创建一个会话或“应用程序”,执行一些逻辑,然后停止会话。

var spark = SparkSession.Builder() .AppName(nameof(SparkWordsProcessor)) .GetOrCreate(); RunJob(); spark.Stop();

通过将其路径传递给 Spark 会话,可以轻松读取上一步的文件。

var docs = spark.Read().HasHeader().Csv(filesHelper.TempDataFile); docs.CreateOrReplaceTempView(nameof(docs)); var totalDocs = docs.Count();

docs变量解析为一个DataFrame。Data Frame 本质上是一个带有一组列和一个通用接口的表,用于与数据交互,而不管其底层来源是什么。可以从其他 data frame 中引用一个 data frame。SparkSQL 也可以用来查询 data frame。你必须创建一个临时视图,该视图为 data frame 提供别名,以便从 SQL 中引用它。通过CreateOrReplaceTempView方法,可以像这样从 data frame 中查询行:

SELECT * FROM docs

totalDocs变量检索文档中所有行的计数。Spark 提供了一个名为Split的将字符串分解为数组的函数。Explode函数将每个数组项变成一行:

var words = docs.Select(fileCol, Functions.Split(nameof(FileDataParse.Words) .AsColumn(), " ") .Alias(wordList)) .Select(fileCol, Functions.Explode(wordList.AsColumn()) .Alias(word));

该查询为每个单词或术语生成一行。这个 data frame 是生成术语频率(TF)或者说每个文档中每个词的计数的基础。

var termFrequency = words .GroupBy(fileCol, Functions.Lower(word.AsColumn()).Alias(word)) .Count() .OrderBy(fileCol, count.AsColumn().Desc());

Spark 有内置的模型,可以确定“术语频率/反向文档频率”。在这个例子中,你将手动确定术语频率来演示它是如何计算的。术语在每个文档中以特定的频率出现。一篇关于 wizard 的文档可能有很高的“wizard”一词计数。同一篇文档中,"the "和 "is "这两个词的出现次数可能也很高。对我们来说,很明显,“wizard”这个词更重要,也提供了更多的语境。另一方面,Spark 必须经过训练才能识别重要的术语。为了确定什么是真正重要的,我们将总结文档频率(document frequency),或者说一个词在 repo 中所有文档中出现的次数。这就是“按不同出现次数分组”:

var documentFrequency = words .GroupBy(Functions.Lower(word.AsColumn()) .Alias(word)) .Agg(Functions.CountDistinct(fileCol) .Alias(docFrequency));

现在是计算的时候了。一个特殊的方程式可以计算出所谓的反向文档频率(inverse document frequency),即 IDF。将总文档的自然对数(加一)输入方程,然后除以该词的文档频率(加一)。

static double CalculateIdf(int docFrequency, int totalDocuments) => Math.Log(totalDocuments + 1) / (docFrequency + 1);

在所有文档中出现的词比出现频率较低的词赋值低。例如,给定 1000 个文档,一个在每个文档中出现的词与一个只在少数文档中出现的词(约 1 个)相比,IDF 为 0.003。Spark 支持用户定义的函数,你可以这样注册。

spark.Udf().Register<int, int, double>(nameof(CalculateIdf), CalculateIdf);

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wspssj.html