How to Performance-Tune Apache Spark Applications in Large Clusters

Download Slides

Omkar Joshi offers an overview of how performance challenges were addressed at Uber while rolling out its newly built flagship ingestion system, Marmaray (open source), which ingests data from a variety of sources such as Kafka, MySQL, Cassandra, and Hadoop. The system has been rolled out to production and running for over a year, with more ingestion systems onboarded on top of it. Omkar and team made heavy use of jvm-profiler during their analysis, which gave them valuable insights. This new system is built on the Spark framework for data ingestion. It is designed to ingest billions of Kafka messages every 30 minutes from thousands of topics; the amount of data handled by the pipeline is on the order of hundreds of terabytes. At this scale, every byte and every millisecond saved count. Omkar details how such problems were addressed and shares insights into the optimizations already made in production.

Some key highlights are:

  • How to understand the bottlenecks in your Spark application, and how to cache your Spark DAG to avoid re-reading your input data
  • How to effectively use accumulators to avoid unnecessary Spark actions
  • How to inspect the heap and off-heap memory usage of hundreds of executors
  • How to change the layout of your data to save long-term storage cost
  • How to effectively use serializers and compression to save on network and disk traffic
  • How to reduce the amortized cost of your application by multiplexing jobs

They used different techniques to reduce the memory footprint, runtime, and disk usage of the running application. In terms of savings, they were able to reduce memory footprint, runtime, and disk usage significantly (by roughly 10% to 40%).

Watch more Spark + AI sessions

Try Databricks for free

Video Transcript

Good afternoon, everyone. Welcome to today's talk on how to performance-tune Spark applications in large clusters. Before we begin, I want to set the context for what this talk is about. This is not a talk about how to set up Spark configurations for your Spark application so you can get it running; most of those Spark configurations you can find online. This talk is about what we did when we built Marmaray at Uber, which is a generic ingestion system that ingests data from any source. We did some things differently from what you get off the shelf when you Google. That's roughly what we're going to cover in this talk. I have divided the talk into four different improvements. Let's take a look at them.

Here's a short introduction to who I am. I'm currently a software engineer at Netflix. Before that, I was a software engineer at Uber, where I architected and authored Marmaray. Before that, I worked at a software-defined storage company called Hedvig, where I architected their object store and NFS solutions.

Here is today's agenda. I will briefly cover the JVM profiler, the Hadoop profiler, Spark listeners, and auto-tuning. For the rest of the talk I will focus on the four different improvements: mainly storage, CPU/runtime, memory, and efficiency improvements.

JVM Profiler: Distributed Profiling

Why is profiling Spark jobs very important, and why is it very challenging? Profiling a simple standalone Java application is a fairly straightforward problem: because the application is running the whole time, you can attach a remote debugger, or use different profiling techniques to profile a single API, a specific class, or a section of code. With Spark it becomes very challenging, first because it runs on thousands of executors, and second because, if you have watched a Spark application run in production, you know a Spark job is divided into multiple Spark stages, and each Spark stage executes a different part of the code. That means when you want to know whether your Spark job is running well or whether there is room for improvement, you can't easily tell: it is spread across so many Spark stages that you don't know which Spark stage to go after for improvement. So the Spark team at Uber built a JVM profiler, so that all Spark developers can debug and performance-tune their Spark applications. This JVM profiler is available online.

It is open source, and the team presented it at a previous Spark Summit. If you're interested, please go check out their video.

Hadoop Profiler (Recap)

At a high level, what is the Hadoop profiler? It's basically a Java agent. When you do a Spark launch, in the extra arguments you can provide some configurations that are listed on their website. Once you submit a Spark job and an executor comes up, this Java agent comes up with it. The Java agent lets you do several kinds of profiling. The first is CPU and memory profiling. When we say memory, it also gives you the breakdown: say you're using some X GB of heap memory and some X GB of off-heap memory. It also gives you further information, such as whether you're using direct byte buffers or memory-mapped buffers, with very detailed information about that memory. This was specifically useful when we were computing the memory usage of Marmaray jobs, which I'll come back to later in this talk. The second thing is method duration profiling. So let's say you have some API calls which you are making to NameNode or DataNode, and you want to profile them, the Hadoop profiler is the way to do it. And the third thing is method argument profiling. So let's say if you are making some remote API calls, and you are passing some arguments, the Hadoop profiler lets you profile them as well. Once the information is collected in the Java agent, it needs a way to report it. The default reporters are Kafka and InfluxDB. At Uber, we use Kafka. But if you want to add your own reporter, it's open source, so feel free to go and add your own.
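For illustration, here is roughly how such an agent can be attached to executors. The option names follow the open-source jvm-profiler README and may differ by version; the jar path, broker list, topic prefix, and profiled method below are placeholders:

```scala
import org.apache.spark.SparkConf

// Hedged sketch: attach the jvm-profiler Java agent to every executor and
// report CPU/memory metrics to Kafka. All concrete values are placeholders.
val conf = new SparkConf().set(
  "spark.executor.extraJavaOptions",
  "-javaagent:/opt/jvm-profiler-1.0.0.jar=" +
    "reporter=com.uber.profiling.reporters.KafkaOutputReporter," +
    "brokerList=kafka01:9092,topicPrefix=profiling_," +
    "sampleInterval=100,metricInterval=5000," +
    // method duration profiling for a hypothetical remote call
    "durationProfiling=com.example.MyDfsClient.readBlock"
)
```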

Spark Listeners

Spark listeners: we use them at Uber for various reasons. One use case is detecting whether a node is slow, say because it is facing a disk or network issue. We want to identify which tasks are running on that executor, and possibly kill those tasks or blacklist the executor. That's one use case. The second use case is task profiling. Say you have a Spark job running and you want to know what's happening: which task ran on which executor? How much time did it take? Is there a way for me to change the Spark scheduling logic so I can run this task somewhere else? All this information you can capture using Spark listeners. So, let me set the context for the auto-tuning part of what we did. The data science teams constantly run Hive on Spark, and that is what they use for running all their Hive queries. The typical pattern we saw at Uber is that they usually have a default Spark configuration. When I say default configuration, I'm talking about the memory and the number of executors they're using to run the query. It runs fine until they start accessing bigger Hive tables, and that's when it starts breaking. So usually they tune that Spark configuration by adding more executors or adding more memory. The problem is, once that bigger table query starts running, they check that configuration into their central repository. The downside is that it now gets used in all the Hive-on-Spark queries they run against our production Hive cluster. So let's say previously they were using only 500 executors to run all their Hive queries; now suddenly they need 2,000 or 2,500 executors. The problem we noticed is that they actually don't need that many executors to run all those queries; only a couple of queries do. So in order to save money for the company, what we did is basically look at the historical pattern of those queries and analyze: when they specify, say, 500 executors with 16 GB of memory, do they really end up using that? If not, then there is room for improvement. Looking at the historical pattern, we adjusted the number of executors and the memory to see if the query finishes; if not, we always fall back to the configuration provided by them. This helped us a lot by reducing the YARN queue usage footprint and thereby saving money for Uber. All of these Spark improvements were done while we were building Marmaray. Marmaray is an open-source project; it was open sourced in September 2018. I have added a GitHub link, so if you are interested, please go check it out. There's also a link to the blog post, so please check out the blog post.
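To make the task-profiling use case concrete, here is a minimal Spark listener sketch; the metric choice is illustrative, not necessarily what Uber tracked:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Minimal sketch: record task runtimes per executor, which is enough to
// spot a consistently slow node.
class TaskTimingListener extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val info = taskEnd.taskInfo
    // taskMetrics can be null for failed tasks, so guard the access
    val runTimeMs = Option(taskEnd.taskMetrics).map(_.executorRunTime).getOrElse(0L)
    println(s"task ${info.taskId} ran on executor ${info.executorId} in $runTimeMs ms")
  }
}

// Register via sc.addSparkListener(new TaskTimingListener), or through the
// spark.extraListeners configuration.
```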

High-Level Architecture

This is Marmaray's high-level architecture. I won't go into too much detail; there are many talks we have given that discuss it in depth. The only thing I want to highlight is that the entire Marmaray ingestion framework is designed to ingest data from any source, and it is built on Spark. So we use Spark as our execution engine.

Spark Job Improvements

Now let's talk about the Spark job improvements.

First, I will talk about storage improvements.

Efficient Data Layout (Parquet)

All analytical data at Uber is stored in the Parquet storage format. Parquet, as you all know, is a columnar data format. That means: say you have 100 rows you want to store in a bucket, and each row has four columns. Parquet stores the data such that the column-one data for all 100 rows comes first, followed by column two for all 100 rows, then columns three and four. So we looked at the different kinds of compression we get when writing data into Parquet. There are two types of compression. One is columnar compression, and the other is Snappy compression, or whatever compression codec you choose; it could be Gzip, Brotli, or something else.

What we observed is that, because it is a columnar storage format, you get much more benefit if you take advantage of columnar compression, rather than relying on the choice of compression codec.

First Name

Let's look at this example. Here we want to store user information in Parquet. As you can see, we have five columns in total; the user ID is a unique identifier, so it is always going to be unique. Now say we sort the records by user ID and store them in Parquet. You can see in the upper table that across all five columns the rows have nothing in common. If you write this to Parquet, you won't get any columnar compression; you'll only get whatever the codec gives you, say with Snappy or Gzip, maybe 5 or 10% compression. But if you sort or reorganize this data as I have shown in the lower table, let's see what happens if you sort these rows by city and state, so that all the users who are in the same city sit together. Now, say you want to store the bottom table into Parquet: column three and column four have identical values. With the upper table, you would have ended up storing roughly ten values, five for city and five for state, but if you store it in sorted order, since they are identical, you will only store two: one city, which is Atlanta, and one state, which is Georgia. So here you can see that you will get a lot of compression by sorting the records by city and state. One thing I would like to highlight here: whenever you want to decide what sorting criteria you should use, take a look at the cardinality of the individual columns. What has worked best for us is: if you know the cardinality of the columns, sort the records by increasing cardinality; that's where you are most likely to get better compression. But always try different combinations, because you can have a column which has, let's say, textual data of longer length, so it's taking more storage space. If you start with that, as opposed to sorting by a byte-sized column, you will most likely get better compression.
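In Spark terms, the layout change boils down to sorting before the Parquet write. A minimal sketch, with hypothetical paths and column names:

```scala
// Sketch: sort by increasing cardinality (state, then city, then userId)
// before writing, so identical neighboring values collapse under Parquet's
// columnar encodings. `spark` is a SparkSession; names are illustrative.
val users = spark.read.parquet("/data/users_raw")
users
  .sort("state", "city", "userId")
  .write
  .option("compression", "snappy")
  .parquet("/data/users_sorted")
```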

CPU / Runtime

Now let's talk about CPU and runtime. In this section we actually made a lot of improvements.

Custom Spark Accumulators

So let's first talk about Spark accumulators. Spark itself uses accumulators heavily: if you go to the Spark UI, all the metrics you see there are populated behind the scenes using various long accumulators.

Traditionally, that's how most people use them: as long accumulators. But the Spark framework provides something called AccumulatorV2, which you can see on the right side. AccumulatorV2 lets you define your own custom accumulator, and this can be a very powerful tool. Here is an example where you can use accumulators to solve a real business use case while reducing runtime and saving some resources. The use case we are trying to solve: say you have a set of ride records, and you want to deduplicate those records. While doing the dedup, you want to figure out how many duplicates you see per state. That's the goal, so let's get started. I'm focusing on the left part of the slide first. So first, we read the records from Parquet. Then, before we do the deduplication, we find out the per-state record counts; I'm talking about a very naive way here. Then we dedup the records, which is step three. Step four: after the deduplication, we find out how many records we have per state. Step five: the deduplicated records, we write to HDFS. Step six: since we have two counts, one before deduplication and one after, if we take a diff we'll know how many duplicates we have per state. Now, if you run this Spark job, you'll see that there are five Spark stages, and three of them come from countByKey. That is not required, and it's definitely something we can improve on. So let's look at the right side now. What we have done is define a custom accumulator called "rides per state accumulator". What it holds is basically a map of string to long: the string is the state ID, and the long is the number of rides for that state. We define two such accumulators, one for rides per state before dedup and one for rides per state after dedup. What we're doing here is, if you look at the argument passed to the dedup function in line three, we're just chaining a map operation. And as you know, map is not a Spark action; it's lazily evaluated code. So you're just chaining some extra functions to be executed for every record whenever the DAG gets executed. We add one such map function that says: for every record, increment the counter based on its state ID. After the deduplication is done, we chain one more map operation, where we increment the count in the rides-per-state-after-dedup accumulator. Finally, we send the records off to be written to HDFS. So in this entire Spark DAG, "write to HDFS" is the only action. Once it has executed and finished, on the driver you will have the two accumulators populated with the respective counts. Once the action has finished, you can take the diff, and you will know how many rides have duplicates per state. The good thing is, if you run this code, you will only have two Spark stages, and it does exactly the same as the left side. So you are able to save a large amount of executor runtime and thereby save cost. Let's look at the other example.
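A sketch of what such a custom accumulator and the chained map calls can look like; class, field, and helper names are illustrative, not Marmaray's actual code:

```scala
import org.apache.spark.util.AccumulatorV2
import scala.collection.mutable

// Sketch of a "rides per state" accumulator built on AccumulatorV2.
class RidesPerStateAccumulator extends AccumulatorV2[String, Map[String, Long]] {
  private val counts = mutable.Map.empty[String, Long]

  override def isZero: Boolean = counts.isEmpty
  override def copy(): RidesPerStateAccumulator = {
    val acc = new RidesPerStateAccumulator
    acc.counts ++= counts
    acc
  }
  override def reset(): Unit = counts.clear()
  override def add(stateId: String): Unit =
    counts.update(stateId, counts.getOrElse(stateId, 0L) + 1L)
  override def merge(other: AccumulatorV2[String, Map[String, Long]]): Unit =
    other.value.foreach { case (s, n) => counts.update(s, counts.getOrElse(s, 0L) + n) }
  override def value: Map[String, Long] = counts.toMap
}

// Usage sketch: `rides`, `dedup`, and the record's stateId field are assumed.
// The chained map() calls are lazy, so both counts get collected during the
// single write-to-HDFS action and no extra countByKey stages are needed.
// val before = new RidesPerStateAccumulator; sc.register(before, "ridesBeforeDedup")
// val after  = new RidesPerStateAccumulator; sc.register(after,  "ridesAfterDedup")
// val deduped = dedup(rides.map { r => before.add(r.stateId); r })
//                 .map { r => after.add(r.stateId); r }
// deduped.saveAsTextFile("hdfs://.../rides") // the only action in the DAG
// val dupsPerState =
//   before.value.map { case (s, n) => s -> (n - after.value.getOrElse(s, 0L)) }
```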

Increasing Kafka Read Parallelism

So, we have an ingestion system that ingests data from Kafka into Hadoop. For the case I'm talking about, we have a topic with 256 partitions, and for this hypothetical use case it gets around 3 million messages per run. The problem is that to read from Kafka we use the Spark streaming library, and Spark streaming has an inherent limitation: it pins a Kafka partition to one particular executor. That means if you want to read from the same Kafka partition from two executors, it won't let you do that; it basically serializes the operations when you want to read from the same Kafka partition. The problem is, we have a lot of operations we want to perform on the data between reading from Kafka and writing to HDFS, like sorting records and group-bys, a number of operations, right? For that, we can't make do with just 256 executors; we want to use many more. So typically, this is what is done in the industry: you read from Kafka and then you shuffle. You basically have a repartition stage right after the read, you send the data to as many executors as you have, and then you get more parallelism. The problem with this approach is that, as you can see, in order to do the repartition you do a shuffle write, which is around 1.3 terabytes, followed by a shuffle read, which is also around 1.3 terabytes. So there are two problems: one, CPU cycles are definitely wasted doing the shuffle write and shuffle read; and two, you are also wearing out the disks. If you do this frequently, you are basically reducing the lifetime of the disks, so you have to replace them very frequently. This was a problem, and what we did is remove this restriction from the Spark streaming library: we split one Kafka partition into 32 virtual partitions. We made this configurable, so you can define how many virtual partitions to create for every Kafka partition, but what that lets us do is read the same Kafka partition from more than one executor. Think of a virtual partition this way: say we were earlier reading 400 messages from one Kafka partition. Now we say the first virtual partition reads offsets 0 to 100 from Kafka partition one, the second virtual partition reads 101 to 200, and so on and so forth. To achieve that, this is what we did: if you look at the Spark streaming library code, there is something called KafkaRDD, and in it there is a getPreferredLocations method. That's how a Kafka partition actually gets pinned to an executor, and we overrode that behavior.
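The gist of the virtual-partition idea can be sketched against the spark-streaming-kafka OffsetRange API. The actual change at Uber also overrides getPreferredLocations inside KafkaRDD, which this sketch leaves out:

```scala
import org.apache.spark.streaming.kafka010.OffsetRange

// Sketch: split each Kafka partition's offset range into `splits` virtual
// partitions so the same Kafka partition can be read by several executors.
def virtualPartitions(ranges: Seq[OffsetRange], splits: Int): Seq[OffsetRange] =
  ranges.flatMap { r =>
    val total = r.untilOffset - r.fromOffset
    val step  = math.max(1L, (total + splits - 1) / splits) // ceiling division
    (r.fromOffset until r.untilOffset by step).map { start =>
      OffsetRange(r.topic, r.partition, start, math.min(start + step, r.untilOffset))
    }
  }

// e.g. virtualPartitions(Seq(OffsetRange("rides", 0, 0L, 300L)), 3)
//      => offsets [0,100), [100,200), [200,300) for partition 0
```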

Kryo Serialization

Another way to improve Spark performance is by using Kryo serialization. If you're not already using it, I would highly recommend it. There are many advantages to Kryo: it has a smaller memory footprint than the Java serializer, it's faster, and it supports custom serializers. There is one problem with Kryo, though, which we fixed in our internal Spark build. It isn't open sourced yet, but we will do that soon. The problem is that when you set some Kryo-specific Spark configurations on the driver, today they don't get propagated to all the executors. That means if you look at the Spark UI, you will see all these Kryo configurations, but they are actually not being used. That's something we observed when we were debugging Kryo serialization stream performance in Marmaray. There are some things I have highlighted here. One: if you are using Spark with Avro GenericRecords, it's recommended that you register your Avro schemas, because that really helps reduce your Kryo serialization stream size; I will be talking about that on the next slide. And if you are using some custom classes, it's highly recommended that you register those classes using Spark's Kryo classes-to-register setting. If you're not sure which classes you should be registering, just turn on the "registration required" configuration. The logs will start spitting out "this is the class I'm not able to find", so you can do a few iterations and add all the classes. So now let's look at the Kryo serialization stream without the schemas added to the config, and with the schemas added. On the left side we have not added the required schemas to the config, and this is how the Kryo serialization stream will look: Kryo writes the schema for every record written to the stream. So let's say you're writing it to disk or sending it over the network; this is how your serialization stream will be. If your schemas are big, which is most likely the case for us, then your record size will be around 4,248 bytes. But if you register the schema, then you are just writing a schema identifier, which is four bytes, and if your data stays the same 128 bytes, you are basically reducing your record size from over 4,000 bytes to around 132 bytes. That is a huge saving, and it really helps our disk and network.
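A sketch of the recommended registrations; the record class and schema below are placeholders:

```scala
import org.apache.avro.Schema
import org.apache.spark.SparkConf

// Placeholder record class and Avro schema, for illustration only.
case class RideRecord(id: String, stateId: String)
val rideSchema: Schema = new Schema.Parser().parse(
  """{"type":"record","name":"Ride","fields":[{"name":"id","type":"string"}]}""")

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Fail fast on unregistered classes so you can find and register them all.
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(classOf[RideRecord]))
  // Registered schemas are replaced by a small fingerprint in the stream
  // instead of being written out with every record.
  .registerAvroSchemas(rideSchema)
```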

Ser/Des Time Reduction by Restructuring the Payload

Another improvement we made was restructuring our payload. Let's look at the left side. We have a typical Spark payload that has some custom metadata information plus some extra payload. Line 6 is a custom payload, which is a map with a String key and GenericRecord values. The reason I call it out here is: assume this map has, say, 100 to 1,000 keys. This Spark payload is used across multiple Spark operations. We read it from somewhere, then we are doing group-by and sort operations on it, a number of operations. But we actually only need this data at the end, when we are writing to HDFS; that's the only use case here.

The problem with this payload is that whenever you use it in any Spark operation, Spark deserializes the entire Java class. Say this map has a few thousand entries, and say those GenericRecords have a really complex schema. What happens is Spark will spend a lot of CPU cycles serializing and deserializing this object. That's not required if, as in this use case, you only ever look at the sort key; then it makes more sense to keep this map serialized, which is what I have done on the right. If you have something like this, store it internally in a serialized format, as a byte array. The advantage is that whenever the Spark payload is deserialized by Spark, all it has to do is read the binary data from disk and hand it over. So the number of CPU cycles spent in this approach is far lower than on the left.
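Schematically, with illustrative names, the restructuring looks like this:

```scala
import org.apache.avro.generic.GenericRecord

// Left side: the serializer walks every GenericRecord in the map on each
// shuffle, even when only sortKey is ever consulted.
case class PayloadBefore(sortKey: String, extra: Map[String, GenericRecord])

// Right side: keep the map pre-serialized as bytes; shuffles just copy the
// array, and the records are decoded once, at HDFS write time.
case class PayloadAfter(sortKey: String, extraBytes: Array[Byte])
```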

Parallelizing Spark's Iterator

Another improvement we made was parallelizing Spark's iterator. Let's look at an example. I've added a very simple one: we have a mapPartitions function that gives you an iterator over records, and we just do "while (i.hasNext) parquetWriter.write(i.next)". Basically, once the records are done, we call parquetWriter.close. This is a simple way of writing records into Parquet. Now let's see what happens as part of this operation; I'm focusing on the left side. The color coding is: orange means everything happening on the Spark thread, and green means everything happening on the user thread. Whenever you call i.hasNext, Spark finds out whether there are any records: it does a disk read, the read stream says okay, and it comes back having created a new record. Whenever you call i.next, it basically returns you the next record. Once you get the record, you give it to the columnar Parquet writer. Once a row group is filled, the Parquet writer does the columnar compression, writes to the underlying output stream, and the data gets written to HDFS. So far, so good. The problem we were seeing is that, since we were writing a one GB file, this whole operation was taking around 45 minutes. That was not expected; we wanted this to run faster. So what we did is split the responsibilities: we let the Spark thread read from disk and populate an internal shared memory buffer, and we spawned a new thread that reads from the shared buffer and does the Parquet write operation. The advantage we got is that, since we now have two threads sharing a buffer, we can read very fast from Spark and also finish the whole operation quite fast. We were able to reduce the runtime from 45 to 25 minutes just by doing this optimization. So if you have a use case something like this, please think about this.
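A minimal sketch of the two-thread split, with a generic writeRecord sink standing in for the Parquet writer; this is an illustration, not Marmaray's actual code:

```scala
import java.util.concurrent.ArrayBlockingQueue

// Sketch (Scala 2.12+ lambda-as-Runnable): the Spark thread fills a bounded
// shared buffer while a spawned writer thread drains it, so disk reads and
// Parquet encoding overlap instead of alternating.
def writePartition[T](records: Iterator[T], writeRecord: T => Unit): Unit = {
  val queue = new ArrayBlockingQueue[AnyRef](1024)
  val done  = new Object // sentinel marking end of stream
  val writer = new Thread(() => {
    var item = queue.take()
    while (item ne done) {
      writeRecord(item.asInstanceOf[T])
      item = queue.take()
    }
  })
  writer.start()
  records.foreach(r => queue.put(r.asInstanceOf[AnyRef])) // Spark thread side
  queue.put(done)
  writer.join()
}
```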

Improving Utilization by Sharing the Same Spark Resources

Now I will talk about efficiency improvements. This is a very simple use case: I wrote a very simple Spark job, which I would definitely not run in production, but what I want to highlight is this. Say there is a Java Spark context and we are reading from Kafka, let's say one topic, and then we process it (indistinct), right? Now, if we look at what happens when we run this code, this is what you will see. The vertical axis is executor IDs, and the horizontal axis represents time. In between, we have a lot of red markers; those vertical lines basically mean stage boundaries. Now you can see there are a lot of places where you see only one horizontal line, and a horizontal line means only one or two Spark tasks are running while the other executors are idle. All the white space here indicates that, for a lot of the time this Spark job is running, executors are idle. So if you now look at the efficiency of this job, it is going to be very, very low. And that's a very big problem, because finally, when you are running an ingestion system, all you look at is the dollar value you're going to spend when moving one GB or one TB of data from the source system to the sink system. That dollar value is going to be very high for a system like this, because you're wasting a lot of resources, and there is room for improvement. So what we did is, instead of launching an ingestion for one Kafka topic in one Spark job, we clubbed a lot of Kafka topics together. At Uber we launched 32 together, but take a look at your own Spark DAG and decide your own configuration; 32 worked very well for us. Now you can see that there is hardly any white space. You will only see white right at the end as a vertical line, but that's expected, because when you are almost finishing the Spark job there is pretty much nothing left, and that's why some executors are going to be idle, which is okay. But if you look at the left half, it is completely filled. All these colors mean that Spark always has some task or other to work on; a sketch of the idea follows below.
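One way to realize this kind of multiplexing inside a single Spark application is to submit the per-topic work from parallel threads, so concurrent jobs can fill each other's idle executors. The talk doesn't spell out Marmaray's exact mechanism, so treat this as an assumption-laden sketch:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch: kick off per-topic ingestion DAGs concurrently in one SparkContext.
// `runIngestion` is a hypothetical per-topic driver function.
def ingestAll(topics: Seq[String], runIngestion: String => Unit): Unit = {
  val jobs = topics.map(topic => Future(runIngestion(topic)))
  Await.result(Future.sequence(jobs), Duration.Inf)
}

// e.g. ingestAll((1 to 32).map(i => s"topic-$i"), topic => ???)
// Setting spark.scheduler.mode=FAIR helps concurrent jobs share executors.
```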

Off-Heap Memory Improvements (Job)

So finally, I will talk about memory improvements; here I'm primarily focusing on the off-heap improvements we had to make. If you remember, at the beginning of my talk I mentioned the JVM profiler that helps us profile the memory in our jobs. This is where it helped us. What happened is, our job was running with 7 GB of memory and running fine. Our job would run for, say, 30 minutes; for 25 minutes everything was fine, and then suddenly we would see a few executors getting killed. When we looked at the driver logs, this is what we saw: "Container killed by YARN for exceeding memory limits." We had no idea what was happening, because we weren't using extra memory and we weren't doing any off-heap memory operations, so we weren't sure what was going on. So the JVM profiler helped here. What was happening is that we use persist very heavily in Marmaray; we cache lots of stuff so that we don't re-compute operations. Say executor two needs data from a previous stage, and that previous stage's task did not run on the same executor; it will ask for the data from some other executor. Now, when it does that, what Spark was doing up until Spark 2.1 is memory-map the entire file. So let's say you have some RDD blocks which are one or two GB in size, and suddenly some external executors request those RDD blocks: Spark will simply start memory-mapping those files. What happens is that this jumps up your physical memory usage, and YARN, which is monitoring the physical memory usage of every process, will go ahead and kill it. This is definitely not something we wanted. We had some ideas in mind for how to improve it. We were thinking about a rolling buffer, where instead of memory-mapping the entire file, we could use, say, four MB buffers and keep rolling, so that we reduce the physical memory usage. But in parallel we also had an ongoing task of moving from Spark 2.1 to 2.4. The good thing here is that if you have switched to Spark 2.4, you will not see this problem. The reason is that in Spark 2.4 they migrated from this memory-mapping process to using file regions. They did that for a different reason, but it actually solves this problem as well. So if you are hitting this issue and you are on Spark 2.1, I highly recommend just moving to Spark 2.4. That can save you a lot of memory and a lot of money for your company. If you want to fix it for 2.1, yeah, let's talk.
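The fix described in the talk is the Spark 2.4 upgrade. If you are stuck on 2.1, one common stopgap, not mentioned in the talk, is to give YARN containers more off-heap headroom:

```scala
import org.apache.spark.SparkConf

// Stopgap sketch for Spark 2.1 on YARN: raise the container headroom that
// YARN counts against the process, since memory-mapped shuffle files show up
// as physical memory. Values are illustrative; the default overhead is
// max(384 MB, 10% of executor memory).
val conf = new SparkConf()
  .set("spark.executor.memory", "7g")
  .set("spark.yarn.executor.memoryOverhead", "3072") // in MB
```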

That's all I have for today. We are hiring, so if you're interested and want to work on interesting problems, please reach out.

About Omkar Joshi

Omkar Joshi is a senior software engineer on Uber's Hadoop platform team, where he architected Marmaray. Omkar has a keen interest in solving large-scale distributed-systems problems. Previously, he architected object store and NFS solutions at Hedvig and was an initial contributor to Hadoop YARN's scheduler.