foreachRDD设计模式的使用

foreachRDD设计模式的使用

首先来看看这个算子的的功能
foreachRDD(func) :对从流中生成的每个 RDD 应用函数 func 的最通用的输出运算符. 此功能应将每个 RDD 中的数据推送到外部系统, 例如将 RDD 保存到文件, 或将其通过网络写入数据库. 请注意, 函数 func 在运行流应用程序的 driver 进程中执行, 通常会在其中具有 RDD 动作, 这将强制流式传输 RDD 的计算.

错误示例

dstream.foreachRDD 是一个强大的原语, 允许将数据发送到外部系统.但是, 了解如何正确有效地使用这个原语很重要. 避免一些常见的错误如下.
通常向外部系统写入数据需要创建连接对象(例如与远程服务器的 TCP 连接), 并使用它将数据发送到远程系统.为此, 开发人员可能会无意中尝试在Spark driver 中创建连接对象, 然后尝试在Spark工作人员中使用它来在RDD中保存记录.例如(在 Scala 中):

1
2
3
4
5
6
dstream.foreachRDD { rdd =>
val connection = createNewConnection() // executed at the driver
rdd.foreach { record =>
connection.send(record) // executed at the worker
}
}

上面这段这是不正确的, 因为这需要将连接对象序列化并从 driver 发送到 worker. 这种连接对象很少能跨机器转移. 此错误可能会显示为序列化错误(连接对象不可序列化), 初始化错误(连接对象需要在 worker 初始化)等.
正确的解决方案是在 worker 创建连接对象.

但是, 这可能会导致另一个常见的错误 - 为每个记录创建一个新的连接. 例如:

1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreach { record =>
val connection = createNewConnection()
connection.send(record)
connection.close()
}
}

通常, 创建连接对象具有时间和资源开销. 因此, 创建和销毁每个记录的连接对象可能会引起不必要的高开销, 并可显着降低系统的总体吞吐量.

最佳实践来了

  1. 更好的解决方案是使用 rdd.foreachPartition - 创建一个连接对象, 并使用该连接在 RDD 分区中发送所有记录.
1
2
3
4
5
6
7
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
}
}

这样可以在多个记录上分摊连接创建开销.

  1. 还可以通过跨多个RDD /批次重用连接对象来进一步优化. 可以维护连接对象的静态池, 而不是将多个批次的 RDD 推送到外部系统时重新使用, 从而进一步减少开销.
1
2
3
4
5
6
7
8
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}

请注意, 池中的连接应根据需要懒惰创建, 如果不使用一段时间, 则会超时. 这实现了最有效地将数据发送到外部系统.

其他要记住的要点:
DStreams 通过输出操作进行延迟执行, 就像 RDD 由 RDD 操作懒惰地执行. 具体来说, DStream 输出操作中的 RDD 动作强制处理接收到的数据.因此, 如果您的应用程序没有任何输出操作, 或者具有 dstream.foreachRDD() 等输出操作, 而在其中没有任何 RDD 操作, 则不会执行任何操作.系统将简单地接收数据并将其丢弃.

默认情况下, 输出操作是 one-at-a-time 执行的. 它们按照它们在应用程序中定义的顺序执行.

-------------End Of This ArticleThank You For Reading-------------