Kelly O 'Malley是Databricks的解决方案工程师,她帮助初创公司构建和实施大数据管道。在加入Databricks之前,她曾在国防工业担任软件工程师,编写网络代码。她在加州大学洛杉矶分校获得了计算机科学学士学位。在科技世界之外,凯利喜欢烹饪,diy项目,并在海滩上度过时光。
本研讨会是我们的“有抱负的数据科学家数据分析入门”系列研讨会的最后一部分。
本研讨会涵盖了最流行的大数据处理引擎Apache Spark的基础知识。在本课程中,您将学习如何使用Spark摄取数据,分析Spark UI,并更好地理解分布式计算。我们将使用《纽约时报》(https://github.com/nytimes/covid-19-data)发布的数据。不需要Spark的先验知识,但强烈建议有Python经验。
你需要:注册社区版在这里并获得研讨会演示材料和样本笔记本在这里.
虽然不需要任何准备工作,但我们建议您了解基本的python知识。请观看第一部分,Python介绍以了解Python。
这是我们四部分系列研讨会的第四部分,有抱负的数据科学家的数据分析入门。今天的研讨会是Apache Spark介绍。
我只是想提醒大家之前的三次会议,三次研讨会。所有的视频都可以在我们的YouTube频道上找到,这是观看视频的链接。我也可以在演讲结束后把它们放到聊天中。第一部分是Python入门,第二部分是pandas数据分析,第三部分是机器学习。所以我鼓励每个人,如果你还没有这样做,看看这些视频。这是一个很棒的四部分系列,我们很高兴今天开始第四部分。
我知道你们有些人通过Zoom收看我们的节目,有些人通过YouTube直播收看我们的节目,欢迎大家。这只是一个加入我们的数据+ AI在线聚会小组的号召,在那里你会收到我们所有即将到来的在线聚会的通知,我们的技术会谈和研讨会,就像今天这样。这就是连接的连杆。对于YouTube,请订阅并打开通知。如果你喜欢在YouTube上观看视频,这是一个很好的资源,可以获得即将到来的在线聚会和研讨会的通知。到目前为止,我们一直在YouTube上直播我们的所有内容。所以如果你打开通知,这是一个很好的方式来获得即将到来的会议通知。
这是一些链接,这是GitHub的回购,上面有笔记本上的所有内容,这是我们研讨会系列的四个部分,实际上,还有我们在这个小组中进行的所有在线聚会和技术会谈。如果你还没有注册社区版,这也是一个链接。我想说的另一件事是我们有两个正在使用的功能,聊天,然后我们还有问答。聊天很有用,如果你有任何音频问题,我在这里放了链接,所以监控资源和一般聊天,如果你愿意的话。然后请大家在问答环节中提出问题。今天有几位助教会来帮我们回答问题。所以把这些放在问答环节,这是我们尽可能回答你们问题的最好方式。
废话不多说,我想让我们的助教和我们的老师做一个简短的介绍。Denny和Brook,你们为什么不把我们踢出去呢?
-你好,我叫丹尼·李。我是Databricks的一名开发者拥护者。感谢各位参加本次会议。-大家好,我叫布鲁克·维尼格我是Databricks的机器学习实践主管。我们的团队专注于为客户提供机器学习解决方案,无论是教授机器学习的最佳实践还是实施解决方案。
-太棒了,大家好。我叫Kelly,是Databricks的解决方案工程师。我帮助的大多是初创公司,他们通常是Databricks的新手,开始实施他们的大数据管道。
我能接手然后走吗?-当然。——太棒了。-停止屏幕共享,冷却,继续。-所以我要分享我的屏幕。我将以一些幻灯片开始,然后我们将进入一个Databricks笔记本。因此,如果您还没有注册Databricks社区版,现在将是一个很好的时机。该链接在聊天中,但它是www.neidfyre.com/try-databricks。我们将在五到十分钟内讨论这个问题。所以如果你们已经准备好了就太好了。
好的,我们从一些介绍幻灯片开始,一些背后的动机,首先,什么是Apache Spark,然后是Spark架构,所以我们可以真正关注Spark基础,Spark架构,Spark幕后。
我想从对Apache Spark生态系统的全面了解开始。除了Spark Core(我们的核心Spark api)之外,Spark上还有很多模块,这些模块使Spark成为一个非常健壮的生态系统,适用于任何和所有大数据用例。所以我们有ETL流程、机器学习、图形处理,以及向各种真正流行的数据源输入和输出数据的能力,这里的想法是,当你有大数据和需求时,Spark将能够适应。
这是Spark背后的一些历史。我认为Spark是一个很好的定义,它是一个直接的提要,Apache Spark网站,它是一个大数据处理的统一分析引擎,内置了流媒体、SQL、机器学习和图形处理模块。bob体育亚洲版这就是我们在上一张幻灯片上看到的,它的理念是统一的分析。bob体育亚洲版所以你不仅在做ETL,你也有能力做机器学习和图形处理。2009年,加州大学伯克利分校的一群博士生创建了这个程序。实际上,这些人后来创建了Databricks,这就是我们的由来。
目前,它有Scala、Java、Python、R和SQL的api。我们稍后会在笔记本上用到一些Python和SQL。这些数字,实际上,是由200多家公司的1400名开发人员组成的,我认为这个数字有点保守,因为Spark是一个开源项目。bob下载地址你可以自己去GitHub上看代码。
好了,让我们稍微了解一下Spark的工作原理。在我讲的过程中,如果你们有问题,请在问答环节提问,这样助教们就能看到。同样,任何聊天都可以进入聊天。我也会尽量保持互动,所以我有问题要问你们,我希望你们在聊天中给出答案。这上面说的是如何加工m&m巧克力豆。是的,这不是拼写错误,我们将用m&m巧克力豆做类比。让我们从这个开始。你可能还记得小时候的一个游戏,老师给你一罐软糖或m&m豆或小糖果,然后进行比赛,谁能猜出最接近罐子里m&m豆的数量,谁就能赢得罐子。
我们要做一些类似的事情,但我们要稍微改变一下规则。
你不需要猜最接近的,你必须最快地得到确切的m&m巧克力豆数量。因为我们让事情变得更有约束,你会有机会得到这些m&m巧克力豆。假设我们有一大桶m&m豆,我们想要尽快数出来,这样我们就能赢。你们能不能在谈话中说说我们该怎么做,该怎么做。
然后,我看不见聊天内容,所以助教来的时候能不能给我一些答案?
-[布鲁克]是的,所以有些人建议把它们一个一个排起来,或者数行数列,然后把两行相乘。-好吧,这真的很有趣,这肯定会给我们一个解决方案。我担心的是你是一个接一个地计数,所以你有一个连续的计数,这可能需要一段时间。
- [Brooke]是的,所以有些人已经提到了Spark方面的事情,关于在我们之间分配计数任务。-太棒了,没错,这就是我想要的。我们很快就到了那里。就像布鲁克说的,有人说,“把计数任务分给我们。”我们这样想。不要自己一颗一颗地数,让我们叫上朋友,让我们所有的朋友聚在一起,每个人都可以数一堆一堆的m&m。
这就是我想要用来讨论Spark集群的东西。因此,这是物理Spark集群的整体解剖,因为从本质上讲,Spark本身就是一种进行分布式计算的方式。因此,我们将计算能力分布到多台计算机或节点上。如果我们回避一下M&M的类比,我们有一个司机,也就是老板,也就是你。所以你在这里扮演经理的角色。你可以控制巧克力豆的数量,你可以跟踪,你可以安排时间。然后你有一些朋友为你做实际的工作。这就是Spark中所谓的worker。一个司机对应N个工人。这取决于您的工作负载。 Within a worker, the actual work takes place on what’s called an executor. And this is just a Java virtual machine. In open-source Spark, there can be more than one executor per worker. Within Databricks, this is one-to-one. So for the case of this talk, we’re just going to use worker and executor interchangeably. But, essentially, what I want you to get out of this slide is that a Spark cluster consists of one driver, who’s doing the management, and then many workers who are actually doing the compute.
太棒了,所以,是的,我们要回到M&MS,而这样做的原因是我们遇到了一个小问题。
所以我们有很多朋友,我们告诉他们要做什么,他们数了一堆m&m巧克力豆。
这里有个问题。谁能在聊天中告诉我,每个朋友都有一堆m&m巧克力豆的潜在问题是什么?我们还要做一个额外的步骤。
-[布鲁克]有人说分配工作。-好吧,没错,这是绝对正确的,这是需要发生的事情。但是为了这个,我们假设我们已经分配了所有的功。-[布鲁克]好吧,然后其他人插话说:“在你的朋友中相加。”“收集结果并结合起来。”-太棒了,这是绝对正确的,我们的每个朋友都有自己的个人点数,但现在我们需要把这些都结合起来,因为我需要把我的结果还给他们,这样我就可以赢得m&m巧克力豆了。
这又回到了Spark作业是什么。
现在我们已经讨论了一些Spark物理集群的样子,其中我们有一个驱动程序和许多工作人员在工作,我们可以分解实际的逻辑Spark作业是什么。
让我们从我们的工作开始,数m&m巧克力豆。我们要数,我们已经知道这差不多是两组工作,第一组是每个朋友数他们的m&m豆,然后第二组是我们需要得到最终结果所以所有的个人计数需要汇总并返回给我。这就是火花阶段。因此,每个Spark作业可以有一个或多个stage。这些是进一步的功的单位。我们定义阶段开始和结束的方式是需要交换任何类型的数据。让我们回到m&m巧克力豆。一旦每个朋友都数完了他们的m&m,我们就会让一个朋友做最后的汇总,因为,同样,工作人员做了,我只是告诉他们该做什么。所以当我们让最后一个朋友做最后的计数时,他或她需要从其他朋友那里得到所有的个人计数,这意味着我们在交换数据。这叫做洗牌。 Anytime there’s a shuffle, we’re going to have a Stage boundary. In this case, we only have two Stages because we have individual counts and then aggregation.
在Spark中,我们还将stage进一步分解为所谓的任务。这些都是Spark可以做的最小的工作单元,这是一个单独的计算核心或计算能力,运行在数据的子集上。
我们不需要太多
具体到现在任务被划分的位置。我只是想确保我们对Spark工作有一些解剖。现在我们要进入一个笔记本。所以我将向你们展示所有这些在实践中的样子,因为我认为这是最好的学习方式。
好的,如果你有社区版,我们现在要去那里。在我们开始之前,我会给每个人一分钟左右的时间,以确保他们都完全设置好了,然后我会详细介绍所有内容,如何导入笔记本,如何启动集群。我不想把任何人留在这里。
让我们快速浏览一下,这是Databricks社区版主页。这个页面上有几个快捷键,我们可以看看。我们要做的是创建一个集群。我们讲了一点Spark星团的样子。这里需要知道的重要事情是,要运行Spark代码,我们需要一个集群来运行它。我们的笔记本包含Spark代码,让我们创建一个集群,这样我们就可以实际运行Spark代码。在这个集群页面,我要点击创建集群。这为我们的集群提供了一些配置选项。我们不需要做任何具体的事情,我只需要给它一个名字,然后点击创建,这样这个集群就可以旋转起来了。我会花一分钟左右因为我们要做一些基础设置,但如果我快速重载,它会弹出在我的页面上。 So while our cluster is starting up, I just want to go back in and do a quick review of our cluster in general. So what we’re doing is we are setting up a local-mode Spark cluster because we’re using Community Edition. So essentially what we’re doing is we are running Spark, we’re doing this driver work, or architecture, but we’re doing it on a single local machine. What this means is that Community Edition is really great for prototyping, it’s great for students who are learning Spark. If you want to take Spark into production for a production job, you’re going to want to use some version of Enterprise Spark, like for example Databricks.
当这个集群旋转起来的时候,对我来说,它实际上已经开始了,这个绿色的小点意味着我们可以开始了,我们要导入我们的笔记本,这样我们就可以运行了。我想助教们会把这个GitHub链接放到聊天里,但这是Spark笔记本介绍的链接。这个笔记本也会在演讲后提供给你。如果你想跟随,那么请继续,现在就去这个GitHub页面。你要做的就是复制这里的URL,然后回到Databricks,我们会点击这个Workspace选项卡。这是一种文件结构我们所有的笔记本都存储在这里。所以你可以把笔记本放到这个高度。我将遵循一些最佳实践,我将把它放在我自己的文件夹中即使我是这个工作区的唯一用户。我要进入用户,我的Kelly文件夹。这里已经有一个笔记本了,为了以防万一,我把这个笔记本备份一下,我要导入我的笔记本。 So again, if I click this little drop-down next to my email address, I can just click Import. And then there’s two options, Import from File, Import from URL. I’m going to paste in that GitHub link, it’s a iPython notebook and click Import. And here it is.
我给每个人一两秒钟时间,确保我们都在这一点上,所以我们可以继续,一旦我们看完笔记本。我还想补充一点,这是一个很长的笔记本,我不希望看完整个。当我们深入到笔记本的时候,有一些更详细的分析,一些更高级的Spark api的更详细的信息。我很好地记录了它,所以我的想法是,您可以在事实发生后返回并获得文档链接,以了解更多信息并进行自己的分析。BOB低频彩
好,当我们进入这个笔记本时,就像我说的,我们创建了一个Spark集群,我们有一个Spark代码,一些Spark代码,所以我们必须告诉我们的Spark代码要在哪个Spark集群上运行。我要做的就是点击这个下拉框这里写着Detached然后我要选择我刚刚创建的集群。这就是它的全部。现在我有了一个在Spark集群上运行Spark代码的笔记本。在我们讲这个之前,我想讲一下笔记本的基本原理这是一个交互式笔记本,每个单元格都有代码。我会讲到所有的特殊命令,比如百分号是什么意思,不用担心。但重要的是,当你有一个单元格的代码时,有两种方式可以运行它。你可以按Shift-Enter,它就会运行代码,或者还有这个运行按钮,你也可以点击它,它就会运行单元格。我还想指出一点,如果你想在笔记本中添加一个新单元格,如果你把鼠标悬停在一个已有单元格的底部,有一个加号按钮。你可以点击它,它会给你一个新的单元格来写一些代码。 All right, so high-level overview of what we’re going to go through in this notebook, we already did our Intro to Spark slides, we had an introduction to what a physical cluster looks like, the anatomy of a Spark job, then we’re going to talk about a little bit of data representation in Spark, ’cause it is different than other tools like pandas and I think it’s really important to know. And then we’re gonna talk about, again, our distributed account. We’re going to look at exactly what that looks like in Spark, a little bit about how Spark does operations, and then some Sparks SQL. If we have time at the end, there’s some more Python code, that’s some more interesting analysis we can go through.
我想说的第一件事是,我已经运行了第一个单元格,这实际上不是Spark代码。这只是与文件系统的交互。这就是%fs的意思,我说,文件系统,做LS我这样做的原因是为了展示我们将要操作的数据集。这就是COVID-19的数据。每个使用Community Edition的人都可以使用Databricks数据集文件夹。你可以获得很多非常有趣的COVID-19数据。在Spark介绍之后,如果你想对这些数据做一些更有趣的分析,你可以在Databricks数据集以及其他数据集中找到它,你可以随意摆弄。我们要使用的CSV是us-counties CSV。其中包含的是《纽约时报》的COVID-19数据,按县和县划分。 If you were at our previous Intro to Data Analysis sessions, this is actually the same dataset that we use last week with the scikit-learn analysis. You might already be familiar with it.
好了,让我们来看看Spark的一些数据表示。开始的这张图有点乱,但我只想指出一些东西,我们已经有的东西和我们还需要的东西。你可以看到左边的环境,你需要一个环境来运行Spark。数据库会帮你处理的。我们可以去掉这个环境。然后是数据源。我们需要一些数据来做操作。Spark通常用于大数据。我们也为您安排好了。Databricks数据集中有一些数据。 It’s really just sitting in an S3 bucket, so that’s available to you as well. So now all we need to focus on is this Workloads section, and that’s where our actual compute and the actual Spark engine comes in. So we have Spark Core at the core, and that is the core Spark engine, the core APIs. What we wanna do is get at a higher level and look into how we can really manipulate this data using Python and SQL. So there’s two APIs that I’m going to talk about right now, and that is the RDD API and the DataFrames API. And you can see they’re kind of stacked, as in the DataFrames API does exist on top of the RDD API. So let’s talk about RDDs first. They are something really important, they’re also something I don’t want to spend too much time on, because they are somewhat of an older, deprecated API. So when Spark was created, RDDs were the original form of data representation. RDD itself stands for a resilient distributed dataset, and even the terminology is really important, because it describes how Spark data is represented at a very high level. Let’s talk about each term individually. Dataset is pretty self-explanatory, it’s a collection of data. We have a distributed dataset in that, again, a Spark cluster is multiple nodes or multiple computers that work together to process data in parallel. So what that means is we have our dataset distributed amongst those nodes. So each node can operate on a chunk or partition of data. And then, I think most importantly, is this resilient term. So what resilient means in the context of an RDD is our data is fault tolerant. This is made possible because RDDs, once created, are immutable, you can’t change an already created RDD, you have to create a new one if you wanna make a change. RDDs also keep track of their lineage, so they keep track of any operations that were done to produce the RDD data that you’re working with. And what this means is, if we lose a node or one of our workers, we can reconstruct that data and our overall Spark job isn’t going to fail. This is really important because, again, with Spark, we’re operating on big data. It’s a very heavy compute workload a lot of the time, and we don’t want to have our entire job fail if just one of our pieces of hardware fails. Something I do wanna point out about that is I’m saying, yes, you can recover from a failure, and that’s a failure of a worker. There’s also the Spark driver. If the Spark driver fails, we’re in a little bit more trouble, ’cause then our Spark job will fail if we lose the driver.
好了,现在我们讲了一点rdd,我想谈谈数据框架。所以我相信dataframe是在2015年左右引入的,它们是我建议在使用Python和Spark时使用的数据表示。它们是我们在这个笔记本上要用到的数据表示。有很多原因,为什么DataFrames应该用在rdd上面,尽管我想说,在DataFrames下面,rdd在那里。它们是有价值的,用来做底层计算,我们只是抽象它,用Spark优化它。有了dataframe,它们有更高级的api,这意味着它们更用户友好,更容易学习,但它们也更强大。我认为很重要的一点是,我们不仅有这种用户友好的模式,而且它还经过了大量优化,性能更高。在后面的笔记本上,我们会讲到为什么这是可能的。我想留给你们的最后一件事是关于为什么数据框架很重要的一些动机,这张图实际上是一个以秒为单位的计时图,它聚集成一对。我们可以在这些红色图表的底部看到,用RDD API,这是用Python所花费的时间。 It’s slow, Scala’s a lot faster. When we look at DataFrames, all four of these languages are the same time and they’re all faster than RDDs. SQL and R aren’t listed under RDDs because they weren’t actually supported with RDDs. So again, another reason to use DataFrames, they’re faster, they’re easier to use, and they provide more language options.
太棒了,我们开始吧。让我们创建第一个数据框架。我要运行这个单元格,这里我们使用的是DataFrame读取器。通过这段代码,我所做的是,这不仅仅是我的变量,这是COVID数据帧,我告诉Spark读取我的数据。这和我们之前看到的路径是一样的。这是我在读取CSV,这是DataFrame CSV阅读器。不同数据源有不同的DataFrame读取器,如果你读取Parquet或JSON格式的数据,有DataFrame读取器,你可以用。Parquet或。JSON,而不是CSV。每个DataFrame阅读器都有它自己的选项,我们马上会看一下,因为我想给你们看的是我们刚刚盲读了CSV文件,然后我做了显示,它显示了我们刚刚读入的数据。你可以看到有一些问题。这里有两个问题我想解决。 The first one is, pretty clearly, we have our header of our data is not the header, it’s the first row. We have some weird column names going on, we wanna fix that. The other problem that we have is, if I click this little drop down here, we can see that all of my columns are string-type. So I just did a blind read of my data, it just auto-assigned string. I wanna be able to do maybe some integer addition on the integer columns or maybe some plotting with this date column, so I want a more defined schema, or types, for each of my columns.
为了做到这一点,我们要做的是让我们看一下实际的Spark文档。我想让你们看看它是什么样的这样我们就能知道我们必须传递给CSV阅读器的选项是什么。我要打开这个链接。
这是最新Spark文档的主页。我想从这里开始向你们展示如何进入API文档。如果我点击API文档,然后是Python,因为我们现在正在写Python代码,那么我有一大堆包选项。我不打算寻找CSV阅读器,而是直接搜索CSV。然后你可以看到它马上弹出,DataFrame读取器和DataFrame写入器。我知道我在读一个数据框架,所以我要点击这个。
现在我能看到的是DataFrame读取器能接收的所有参数。我想让第一行是头文件我想要一个比字符串更好的模式。这实际上是这里的两个选项,使用第一行作为新列,很好,然后推断一个模式。这意味着,“嘿,Spark,对这个CSV文件的模式进行最佳猜测。”我们将把这两个都设为真,看看会得到什么。
我只是在header = true和first schema = true时运行这个,很好。我们已经可以看到我有良好的列标题,日期,县,州,FIPS,病例,死亡。然后如果我看一下列中的类型,看起来模式被完美地推断出来了。我有一个日期列,一些字符串列,和一些int列。在我们继续之前,因为我们要用这些数据做更多的工作,我想快速地过一遍这些数据中的列,这样我们就熟悉了。
这是《纽约时报》对各个县的统计。我们所拥有的是,在每一行中,一个日期,这是这一行有效的日期-时间,我们有县和州。FIPS有点奇怪,这是人口普查做的事情,为了给美国的每个县一个唯一的数字标识符。我们不会马上使用它,但它会在后面用到因为它实际上是一种很好的方法来为列提供唯一标识符。如果我们想把它和其他数据连接起来,我们可以使用FIPS代码因为它是一个县唯一的。我们也有病例和死亡人数,关于病例和死亡人数这一栏需要注意的是,“纽约时报”每天都会为每个县增加一个新行。这是截至当天的累计病例和死亡人数。因此,如果我们想要最新的数据,并且只需要最新的数据,我们只需要每个县的最新数据。
好,让我们进入一个实际的分布式计数,我们可以看看一些Spark代码是什么样子,然后我还想给你们展示Spark UI是什么样子。在编写代码进行优化和调试时,这是一种非常重要的方法。因此,我们不只是计算m&m,而是继续计算DataFrame中的行数。这里是我们的DataFrame,我们可以看到。不过,我们只显示前20行。美国有很多县,我们知道我们每天都有一个新的条目,所以我猜会有很多争吵。让我们看看我们有多少数据。在我运行这个之前,这应该和我们的m&m豆计数非常相似,尽管我们计算的是真实数据。一旦我们完成了Spark工作,我们需要多少阶段?你们能把这个加进我们要找的东西里吗?
好吧,不幸的是我看不到聊天内容。布鲁克,里面有数字吗?-[布鲁克]四,二,三,几个三。-好吧,好吧,让我们想想这个问题。实际上,让我们回顾一下m&m巧克力豆。如果我们还记得,我们的M&M计数分两个阶段进行,原因是,每个朋友都数自己的M&M然后第二阶段是总和。实际上,这个计数,即使我们是在数据帧的行上做的,我们可以把DataFrame行等同于M&M,我们有两个stage,就像我们的M&M计数一样。因为我们要做的是每个工人都要数他们自己的行堆,然后第二阶段,其中一个工人要做最终的汇总。如果我在这里运行这个count,它会以run的形式运行,你可以看到我运行了一个Spark作业。你可以看到我们有89000行,这是相当多的。 And then we can actually look at our Spark job. And this is when we’re gonna get into the Spark UI. So I can see I ran one Spark job, that’s my count, awesome. If I hit this drop-down, we can see, yes, we had two stages. And, again, what this is is the Stage one count, each worker counts their own pile of rows, and then Stage two, we’re going to aggregate each of those individual counts. This View button here is actually a link, so we’re gonna click on it. And this is what’s called the Spark UI. Within a database notebook, we provide a host of Spark UIs. So you can just go ahead and look at any of your jobs. But what we can actually see for job five, which is our count, there’s a lot of information about it and there’s a lot of tabs in the Spark UI. The Spark UI can be very confusing because it does provide so much information, so I wanna focus now on just some really important things, like right now, looking at the two different Stages. So we can see Stage five happened, and then on top of that, Stage six happened. So this first Stage here, we just know because of our knowledge of how a count works, that this is each individual worker counting up their pile of rows, not M&Ms, rows. And you can see there is only one task. Normally, we would, for a count, expect this to be a lot higher, but because our dataset’s actually very small, we only have one task.
我们的第二阶段是最后的集合。这只是一个工人在计算每个人的计数。
这里我想指出的是Shuffle Read和Shuffle Write的概念。看,我之前告诉过你,阶段边界发生在洗牌或数据交换时。这实际上是一个两步的过程。具体来说是写,然后读。所以在我们的每个第一个worker完成他们的个人计数之后,他们会做一个Shuffle Write,它所做的是让他们的个人计数可以被将要进行最终聚合的worker使用,所以最后一个worker开始他们的Stage,并进行Shuffle Read。这就是他们如何得到所有的个人数据,这样他们就可以看到这些数据。读和写是完全一样的,就像我们期望的那样。
这只是对Spark UI的一点了解。我们可以点击其中一个链接,查看更多任务信息,比如时间。我们将在大约两分钟内更多地介绍Spark UI我将向你们展示查询计划是什么样的,但这里有很多信息可供你们使用。
好了,让我们来看看一些有趣的Spark代码。我们来分析一下。我住在南加州的洛杉矶。我碰巧知道洛杉矶会有很多新冠肺炎病例,所以我只想看看我所在县的数据。然后,就像我之前说的,最近的信息在上面,所以我想按日期排序。洛杉矶县的最新信息会在我创建的DataFrame的顶部。我将继续运行这段代码。我想指出的是,这里的Spark代码,如果你熟悉类似熊猫的东西,这在语法上看起来有点不同。它仍然是Python代码。它是Python的一个特定版本,叫做PySpark。 The syntax for how you reference columns, how you run your functions is slightly different, so it might take some getting used to. But, essentially, what I’m doing here is, I wanted to sort by the date column descending. So I have the most recent date on top, and then I wanna filter for only the County of Los Angeles. And I ran that, it took 0.15 seconds, and nothing happened. There was no Spark jobs created, unlike here. So this is something very interesting. I tried to do a sort, tried to do a filter, I got no results.
这实际上是Spark的核心思想之一,如果你在pandas DataFrame上写这种代码,例如,如果你熟悉pandas,只要你做一个过滤器,如果我有一个pandas DataFrame Los Angeles的过滤器,我总是会立即得到一个输出结果。对于Spark,什么都没有发生。这是因为,在Spark中,我们将操作分为两个不同的类别,转换和操作。Spark工作的基本原理是,我们要了解为什么会这样,转换是懒惰的,而动作是迫切的。这意味着,如果我运行一个转换或者如果我在DataFrame上定义一个转换,绝对不会发生任何事情。我必须运行一个动作和/或触发我的转换。
所以我可以看出,当我运行和之前一样的代码时,因为什么都没有发生,我知道排序和过滤器都是转换。我不会让任何排序或过滤发生直到我调用action。
好,让我们来谈谈为什么转换和动作实际上存在,以及为什么Spark被称为惰性求值。所以懒惰有几个好处。第一个问题是,使用Spark,我们有时会操作巨大的数据集。如果你想做一个像pandas一样的急切过滤器,那么我调用过滤器的第二次,我们过滤了所有的代码,我们进去,工作,我们可能会遇到问题因为我们必须读入所有的数据。这可能是不可能的,这取决于我们的数据集有多大,以及我们的Spark集群的大小和我们拥有的内存量。由于懒惰,我们实际上不需要读入任何数据,直到我们确切地知道我们要对转换步骤做什么。实际上使操作本身瘫痪也更容易。我想我一直在说,我们正在研究真正的大数据。但这是为什么这是必要的,你要做的事情有点不同的能够处理字节甚至pb的数据,是当你定义转换,就像在我的例子中,我做了一个比一个过滤器,我可以做一个,然后一个过滤器,然后一个过滤器,然后一个地图在我的数据或不同的转换像这里说结束。我可以在一个工作节点上的单个数据元素上进行流水线操作这样就可以运行而不需要过滤,得到结果,排序,得到结果。 It can just all happen, happen, happen on the same chunk of data. So we’ve sped things up considerably using this method. And then I think, most importantly, is what it give Spark as a framework the option to do. And we’re going to get a little bit in how Spark does its optimizations. So there’s something that’s called the catalyst optimizer, and that is the core optimizing engine for Spark, just some Spark software packaged in. I don’t wanna get too much into what the catalyst optimizer can do and how it works within the notebook. There’s a link to a blog that has a lot of really great information behind the scenes. But I do wanna talk a little bit on how it will help us here. So the first thing I wanna mention is you have to use DataFrames or another API that the catalyst optimizer can support in order to make use of this optimization. So that’s another reason why you wanna use DataFrames or RDDs, is this is what gives us the speed-up that we were looking for on that graph earlier. So what the catalyst optimizer can do is, when we have this chain of transformations like what I did before, it was only two, it was a sort than a filter, and then I call an action, which is what we’re going to do in a second, the optimizer can look at our chain of transformations and it can maybe say, “Hold on, this doesn’t look right,” and it can make some tweaks. It can do everything from rearrange the order that things have happened in to choose a more performant version of sort. And this just happens behind the scenes for you, so the actual code that executes might be a little bit different than what you write. So I wanna talk at a high level about what the optimizer does. It essentially it takes a look at your code, it parses it, says, “Okay,” in my case, “You did a sort and then a filter,” it tries to optimize that if it can, and then it comes up with a physical plan of how it’s actually gonna do that on the hardware. And then it just does that. So you can see here, it takes in the DataFrame and then it takes in whatever you’re doing to that DataFrame as input. I do wanna briefly mention the existence of a dataset. So what a dataset is, is it’s another high-level API that you have access to in Spark. The main difference behind the DataFrame and a dataset being is that a dataset is a typed digital representation, as in you need a typed language to actually use that. This is a bit of a generalization, but it’s essentially just a typed DataFrame. Because we’re using Python, Python is not a strongly typed language, we can’t use datasets. So as far as we’re concerned for this talk, DataFrames are our option. And when you’re using Python, DataFrames are the best option to use.
我说了很多,我做了排序,我做了筛选。当我做一个动作时,它会触发。让我们做一个动作。
这里我有和之前一样的代码,我做了排序,做了筛选,显示了一个动作。你可以看到我实际上运行了一个Spark作业,这很好,这正是我想要的输出。我把最近的日期放在上面,这些都是洛杉矶的,它起作用了。我看到了很多聊天的片段。我看不见聊天内容,所以我不知道人们在说什么。我想知道是否有人指出了我的坏代码,因为我这里确实有坏代码。这不是最优的方法,我想告诉你们优化器是如何修复一些错误的。所以我实际上做的是,我说,“好吧,我只想要洛杉矶。"我想把最近的日期放在最上面"但我线性写这段代码的方法是先排序,再过滤。如果我们想想这意味着什么,我让Spark做的是,“好吧,这是你所有的数据。 “Sort all of it, and then throw out a bunch of it.” If we’re working on big data, that would be really inefficient because I would probably wanna throw out a lot of my data and then do the sort so I’m sorting less data. So we can actually see the catalyst optimizer realize this and fix it for me. So I’m gonna show you how to do this. If, again, I click on this drop-down, I click on View next to my Spark job, there’s this associated SQL query. It’s SQL query 12. If I click on the link, I can see exactly what’s being done, and this is the final physical code that’s run.
在Details选项卡下有很多信息。我不想全部讲一遍,但我想给你们看的是优化器说“嘿,这不好”的确切点。所以如果我们从底部开始读我所做的是,“好的,排序,然后过滤。”
逻辑计划是,再次强调,一旦我们优化了逻辑计划,就会进行排序和筛选。一旦催化剂优化器执行优化步骤,你可以看到过滤和排序被翻转。所以实际上发生了什么,即使在我的数据上,我说,好,排序,然后筛选,Spark代码,先运行过滤,然后排序。所以即使我写的代码不是最优的,优化器也会帮我处理。
很好,现在让我们来谈谈Spark SQL。而且我认为我们完全按照计划进行。我想展示一些SQL代码,以及如何使用SQL来查询Spark dataframe。这是一种符合ANSI标准的SQL语言,所以如果您习惯使用SQL,那么对您来说应该非常熟悉。我们不会写复杂的SQL,所以不用担心。我要运行的第一个单元格是createOrReplaceTempView。这就是我们的coviddataframe,它是用Python创建的。这意味着我们只能使用Python来使用它。为了创建可以使用SQL查询的东西,我们将使用createOrReplaceTempView函数和我们希望能够查询的SQL表的名称。它所做的就是在Python创建的DataFrame之上创建一个临时视图,然后我们可以使用SQL与之交互。 The name I gave it is “covid.”
太棒了,我要指出的第一件事是顶部的%sql命令。我们在Python笔记本上运行,这并不意味着我们不能混合和匹配语言。我们要做的是%sql,它会告诉我们下面单元格中的代码是sql代码。现在我有了这个,我可以从COVID运行SELECT *。它所做的就是,给我COVID表中的所有东西,*就是所有东西。我可以看到这看起来和我们最初打印数据集时看到的很熟悉。同样的数据,只是格式略有不同,因为在Databricks Notebooks后台,我们会使用SQL查询,它会使用不同的方法来显示数据这实际上给了我们更多的功能。我们也可以用Python显式地做到这一点。我只是想让你先展示一下。这叫做显示,我之所以要用SELECT *来显示是因为我想创建一个图。 Because we have date data, we know we have updated counts, as dates go. It will be really great to have some sort of view over time on what our cases look like. This is all of our data, but next, we’re gonna filter down by county. And that’s actually why I have this little comment here, with keys, grouping, and values, because what we’re going to do is we’re going to use the UI to create a graph. So on this little drop-down, that’s where is this little bar chart and then a little down arrow. We’re gonna click Line. This is not a useful line chart to us, so we’re going to do some options so we can behind-the-scenes create our graph. So if I click on Plot options, I can see there’s some keys, some value, series groupings. This is not what I want. I’m just going to get rid of them. Then now what I’m going to do is drag in the keys, grouping, and value that I do want to create my graph. And what this is going to do behind the scenes, is this is actually going to kick off a Spark job with all of these groupings. And you could write a SQL query again to do the exact same thing and then plot it yourself. So it’s just kind of a shortcut for us. So like it said in the comment, you want date to be along the bottom, you want time series, you wanna group by county, and then I want my values to be cases. And then if I click Apply, it’s running a Spark job, running a couple of Spark jobs, actually. It could be doing quite a lot of groupings. And then once this is complete, what we’ll be able to see is a graph, a line chart, actually, by county. So if I expand this a little bit, because we have so many counties, we’re only showing the A names. But we can see over time, if we hover, Adams County, wherever that may be, seems to be trending up the most right now.
好吧,就像我之前说的,我只想过滤到洛杉矶,我住的地方。这是和之前一样的代码,从COVID表中选择所有内容,不同的是,我想要County, Los Angeles。请随意将其替换为你所居住的任何国家,同样的想法。太棒了,这是洛杉矶独有的。让我们把它画出来,这一次,我要把病例和死亡画在同一个图表上。点击直线,绘图选项,进行序列分组。同样,我们以洛杉矶为分组。我们来看病例和死亡,有点病态。
我们可以看到,随着时间的推移,它呈急剧上升趋势,这让我不太高兴。但我希望顶部的部分开始趋于平稳。但是这样你就可以看到你所在的县或者你住的地方,这是一个很好的分析开始,只是一些简单的绘图。
在回答任何悬而未决的问题之前,我想给你们展示的最后一个SQL查询,我会快速回顾一下我们笔记本上剩下的内容,这是一个有点复杂的查询。这是按顺序分组而不是主要依靠分组和绘图选项来为我们做。这里我要做的是,因为数据集中的每个日期,每次都有更新,我要为每个县获取最大值。这就是它的作用,我要按县分组。对于每个县,我想计算最大病例数和最大死亡人数。然后我要做的是,我只需要有最多病例的县。所以我要按县排序,不好意思,是按最大案件数递减排序,所以最多的在上面。所以我现在应该有一个数据框架,其中有一个县,最大病例数,以及该县的最大死亡人数。这是有序的,所以我可以看到纽约市实际上是受灾最严重的。这是约克郡的另一个县,库克县是芝加哥所在的县。 So if I wanna view this visually, all I really have to do is click this little bar chart option at the bottom, it’ll just automatically create, it makes a graph for us, and we can really visualize how many cases New York City has with this.
好了,我就不讲这个笔记本的其他部分了,因为它有很好的记录,我希望每个人都能进一步看一下。但本质上,它所做的是,它为您提供了一些如何开始使用Spark进行分析的想法。它做的是,从互联网上提取一些人口普查数据,使用我之前指出的FIPS代码进行连接,然后对人口发病率进行一些有趣的分析。所以如果你们有时间的话,这是非常棒的。还有,我试着用各种各样的Spark api来指出一点可以用Spark做的各种各样的事情以及你能想到的不同类型的分析。
太好了,我们还有五分钟的时间。布鲁克或丹尼,你觉得有什么有趣的问题能让大家都听听吗?-[布鲁克]是的,当然。其中一个是关于什么时候使用pandas和Spark的高级概念。-是的,这是个很好的问题。这是我从那些更熟悉熊猫的人那里听到的,那些更熟悉传统Python的人。pandas DataFrame和Spark DataFrame的主要区别在于Spark DataFrame是分布式的。所以当我们看熊猫和我们看到的所谓的单节点计算或者只是做所有的工作在自己的笔记本电脑,这是当你可以使用类似的熊猫可能是所有适合在内存的数据,实际上是告诉我有一天,如果你在做计算熊猫DataFrame,你要确保你的熊猫DataFrame大小明显小于计算机内存的大小,这样你就可以确保你可以做计算。但如果你正在处理大数据,而这些数据可能无法放入一台计算机的内存中,或者你的熊猫计算无法工作,那么Spark就可以派上用场了。这就是为什么你能够对大tb的数据甚至是几gb的数据进行计算。 Your computer’s running slow, there’s a lot of data, you Spark. I do wanna quickly shout out another open-source project, it’s called Koalas, named because it’s another cute, fuzzy animal. And what Koalas is, is it’s actually a way to write pandas code using the pandas syntax, because like I mentioned earlier, pandas syntax is a little bit different from Spark syntax. So Koalas is a bridge between pandas and Spark. Behind the scenes of Koalas, Spark code is running, but the syntax, it’s the same as pandas. So if you’re very familiar with pandas, you wanna move into Sparks, I would take a look at the Koalas project. And I also know, but I think maybe early June, there’s gonna be a workshop on Koalas.
- [Brooke]很好,还有另一个问题,Spark DataFrame API提供了这么多优化,还有空间让程序员进一步优化他们的代码吗?-当然,有很多事情可以做。我没有讨论分区或如何设置Spark配置,但是有很多关于shuffle如何完成的思考,当你想选择使用shuffle时,因为这是一个数据交换,你想如何分割数据是很昂贵的。当你深入Spark时,你就会更加熟悉,何时何地你想要优化,何时你想后退一步,让催化剂优化器完成它的工作。- [Brooke]还有一件事我想补充的是,很多优化也与如何在磁盘上布局数据有关。所以它不一定要改变你的Spark代码,改变你的Spark配置,但它会查看我如何在磁盘上物理分区我的数据。有些人会问Spark有什么不同,开源Spark和Databricks的Spark有什么不同?-是的,这是个很好的问题。这是Databricks看待事物的方式,任何可用性所需的API都是开源的。Databricks所做的就是添加一些额外的特性来提高性能,使其更易于使用。 So like you can see here with this notebook, we provide a fully-hosted Spark environment, and that’s why I didn’t have to get into behind-the-scenes exactly how we’re going to wrangle a cluster together or even talk about a resource manager, because that’s something that Databricks will do all for you. So even though the Spark running on Databricks is open-source Spark with some cherries on top, it’s really the hosted environment, these notebooks, all the other features that differentiates Databricks. – [Brooke] Great, and then there’s a question about machine learning integration with Databricks. Let me know if you want me to take this or if you wanna take it. – I’m actually gonna let Brooke take that, because Brooke is the machine learning practice lead. – [Brooke] (laughs) Sure, I’ll take this one. So the question was about what’s the road map from the machine learning modules and Spark? They’re referring to MLlib, and are there other type of integrations you can do with Spark, taking bench with tools like NLTK, TensorFlow, et cetera? So when we talk about MLlib, that’s the umbrella term we use to refer to Sparks distributed machine learning library. There’s two different machine learning packages, technically, there’s MLlib, that’s the old one, based off of RDDs, and Spark ML, which is the newer one based off of DataFrames. So Spark ML allows you to train models on massive amounts of data, but it does have a few drawbacks. Notably, it doesn’t have all of the same algorithms that are supported that scikit-learn does. It doesn’t have any deep learning modules like TensorFlow. But you can still take advantage of Spark for machine learning by being able to do things like model inference at scale. You can load your model onto each of your workers and then do inference across your data. You can also use Spark for things like distributed hyper perimeter search, so you’ll build a different model on each of your workers, and then you can use a tool like Hyperopt to then evolve your hyper parameter search for each subsequent models. So even if you are building single machine models or even if you are using pandas, you can still take advantage of Spark.
-[凯利]谢谢,布鲁克。- [Brooke]是的,然后其他人问我们能不能再开一个关于Spark优化的网络研讨会?这是个好建议,我们把它放到队列中。Spark峰会上也有很多关于这个问题的精彩演讲。-是的,这是一些东西,如果你只是搜索Spark Tuning,旧的Spark Summit演讲都可以在YouTube上找到,有一些非常棒,非常深入的Spark Tuning演讲。
我想我们现在可以回答其他问题了。凯伦,你能帮我们收拾一下吗?
-当然,实际上我只是在找参加Spark峰会的链接,我刚刚在聊天中放了这个链接。今年是免费的虚拟课程,所以我强烈鼓励每个人都能加入。
太好了,非常感谢Kelly,谢谢Brooke和Denny。感谢大家的收看。通过Zoom加入后,你会在24小时内收到一封带有录音的电子邮件,它链接到其他一些提到的资源。再说一遍,视频可以在YouTube上看到,所以我也会在邮件中提到这一点。谢谢大家的加入,