创建一个IP查找表的活动在一个SIEM架构
2021年10月18日 在工程的博客
在处理网络安全数据时,有一件事是肯定的:没有可用数据来源的短缺。如果有的话,有太多的数据源与重叠的数据。传统SIEM(安全信息和事件管理)工具并不适合处理复杂的转换和缝合多个数据源的一个有效方式。除此之外,是不划算的过程tb和pb级的事件数据每天SIEM系统。
一个常见的网络安全用例要求娶多个数据源的归因是一个IP地址到一个特定的用户。当使用传统SIEM工具,以确定网络上的恶意或可疑活动,SOC(安全操作中心)分析师推出多个查询不同的数据源(VPN, DHCP等等)和手工缝合在一起找出一个IP地址的时间表和行动。这可能需要15分钟/ IP地址。宝贵的时间在发生安全事故,可以拯救你的自动聚合和管理数据在着陆之前SIEM解决方案。
在这个博客中,我们将介绍一个简单的数据收集方法,结合多个数据源创建IP查找表和自动化。这个表是一个威胁情报的基本构建块创建一个整体的照片在你的网络活动。它将使您查询IP地址在一个给定的时间窗口,属性,用户/ MAC地址和跟踪发生的事件的顺序。
我们将使用思科伊势姿态和分析器事件作为off-VPN VPN日志和Infoblox DHCP日志活动。我们使用的是砖实验室数据生成器来模拟这些事件日志和把他们变成一个S3 bucket。在真实的场景中,您可以使用一个实时流媒体服务等运动消防带推动这种从本地服务器日志到S3。这些日志然后摄取使用砖Lakehouse平台bob体育客户端下载。我们已经建立了整个端到端管道使用三角洲生活表,它允许我们在每个阶段确保数据质量的增量数据管理,以及端到端管道血统。一旦数据策划成主IP查找表,如Splunk (SIEM工具使用砖Splunk附加)或BI工具或仪表板表等PowerBI或检查员可以用作表示层。
网络安全lakehouse架构
图1展示了一个典型的网络安全生态系统。这是一个纠缠的不同数据源和系统网络SIEM工具组合。SOC分析师必须查询不同来源和结果缝合到一起得到有意义的见解,变得更加复杂,增加大量的事件数据。平均梳理企业将pb级的数据,没有合适的工具来处理这些庞大的数据集,这个任务可能很乏味,如果不是不可能的。
我们提出一个替代网络安全lakehouse架构,如图2所示。实现这个架构的关键步骤是:
- 土地数据从所有来源到廉价的对象存储
- 转变成为一个高度优化大数据平台:过程和牧师和针数据使用三角洲湖bob体育客户端下载
- 只移动策划数据一旦准备好被消耗到SIEM解决方案
其他优势,Lakehouse架构提供了以下好处:
- 打破竖井:数据对每个人都可用在同一平台上,每个数据角色可以使用自己喜欢的工具(笔记本、DBSQL等)在三角洲湖访问单个源的bob体育客户端下载真理。此外,他们可以很容易地检查代码,对程序和协作在同一平台无需导出代码或数据在其他地方。bob体育客户端下载
- 结合批处理和流媒体:批处理和使用三角洲湖流媒体查询都是几乎相同的。你可以建立你的管道,让他们不会过时,以防将来处理方式的变化。
- 质量保证:你可以设置数据质量约束在不同阶段的处理管道使用预期。这将允许你选择如何应对意料之外的数据。
构建网络安全数据管道与达美住表
从三角洲住表文档:
三角洲生活表是一个框架为构建可靠、可维护、可测试的数据处理管道。你定义转换执行数据,和δ生活表管理任务编制、集群管理、监控、数据质量和错误处理。
下图显示了端到端管道创建IP VPN和DHCP日志查找表。我们选择使用三角洲生活表(DLT)建立管道,因为它的简单,它所提供的数据质量保证措施和跟踪整个管道的血统。与DLT,您可以轻松地构建ETL管道使用Python或SQL。下面显示了ETL的DAG管道我们建立了创建一个IP查找表。
代码走查
你可以找到的笔记本电脑在这里。正如图1中所示,我们的土地每个数据源生(青铜)层。然后我们原始日志使用适当的解析器解析成一个银层。最后,我们得到感兴趣的列(IpAddress MacAddress、用户名等)从解析表,组合成一个策划,准备使用IP查找表(黄金)。
思科伊势分析器解析事件,他们在半键值的风格,我们可以使用PySpark的from_json
方法。此外,我们已经从元数据提取事件时间戳(头)相关的日志。我们使用类似的方法来解析思科伊势姿态事件。
@dlt。表def ise_profiler_silver ():返回(dlt.read_stream (“ise_profiler_bronze”).withColumn(“价值”,F.concat (F.lit(”,元数据= "),F.col(“价值”))).withColumn(“解析”,parse_string_as_json (F.col(“价值”))).withColumn(“数据”,F.from_json (F.col(“解析”),profiler_schema))。选择(F.col(数据。*)).withColumn (“MetadataTimestamp”,F.concat_ws (”“F.array ([F.split (F.col(“元数据”)," ")(我)为我在(8,9]])),))
我们使用正则表达式映射(PySpark regexp_extract方法)来解析DHCP日志。每个消息类型(DHCPACK, DHCPREQUEST等)解析使用其预期的正则表达式模式。我们选择解析DHCP消息类型的ACK, NACK,报价,要求,释放,下降,为简单起见到期。
def parse_dhcp_message_with_regex (source_table_name=“dhcp_bronze”):返回(dlt.read_stream (source_table_name)。选择(“价值”,F。REGEX_DHCPOFFER regexp_extract(“价值”,0).alias (“EventType”))。在哪里(F.col (“EventType”) .isin ((“DHCPREQUEST”,“DHCPACK”,“DHCPNAK”,“DHCPOFFER”,“DHCPRELEASE”,“DHCPDECLINE”,“DHCPEXPIRE”,]))#解析不同类型的消息与不同的正则表达式模式成IP地址列.withColumn (“IpAddress”,F.when (F.col (EventType) .isin ((“DHCPREQUEST”)),F。regexp_extract(“价值”,“(?
为了确保解析表的数据质量,我们对传入的数据设定预期。@dlt表达的。expect_all_or_drop,我们预计EventTimestamp、IpAddress MacAddress没有任何缺失的值。
< pre > valid_dhcp_parsed_record = {:“valid_timestamp EventTimestamp是不零”,:“valid_IpAddress IpAddress是不零”,:“valid_MacAddress MacAddress是不零”,}
@dlt.table@dlt.expect_all_or_drop (valid_dhcp_parsed_record)def dhcp_silver ():返回parse_dhcp_message_with_regex ()
这是解析前DHCP记录是什么样子:
7月13 09:08:00 12.99.46.134了dhcpd [3761]: DHCPACK, 12.88.0.15 (f0:5e: 0 c: b2:35: bf)通过eth2
这是什么模式的解析三角洲表看起来像:
EventTimestamp:字符串IpAddress:字符串MacAddress:字符串EventType:字符串
最后,解析并结合三个数据源之后,我们获得知识产权活动和他们的时间表,如图3所示:
下一个步骤
在这篇文章中,我们介绍的步骤构建一个简单的知识产权归属表。这个表是第一个构建块构建一个整体活动的照片在你的网络威胁检测和事件反应。一个明智的下一步是在数据源和表添加更多的列。
当你拥有越来越多的IP地址信息在你的表,你可以提取“正常”的行为模式对于IP网络中使用机器学习算法。然后你可以检测异常并标记IP如果外面表现正常的界限。
您可以运行附带的笔记本通过遵循下面的链接直接发布:
更多的网络安全内容,查看建立一个网络安全Lakehouse CrowdStrike猎鹰事件和通过DNS分析检测罪犯和国家博客。