PySpark - SparkContext


SparkContext 是任何 Spark 功能的入口点。当我们运行任何 Spark 应用程序时,都会启动一个驱动程序,其中包含 main 函数,并且您的 SparkContext 会在此处启动。然后驱动程序在工作节点上的执行器内运行操作。

SparkContext 使用 Py4J 启动JVM并创建JavaSparkContext。默认情况下,PySpark 将 SparkContext 用作'sc',因此创建新的 SparkContext 不起作用。

SparkContext

以下代码块包含 PySpark 类和 SparkContext 可以采用的参数的详细信息。

class pyspark.SparkContext (
   master = None,
   appName = None, 
   sparkHome = None, 
   pyFiles = None, 
   environment = None, 
   batchSize = 0, 
   serializer = PickleSerializer(), 
   conf = None, 
   gateway = None, 
   jsc = None, 
   profiler_cls = <class 'pyspark.profiler.BasicProfiler'>
)

参数

以下是 SparkContext 的参数。

  • Master - 这是它连接到的集群的 URL。

  • appName - 您的工作名称。

  • SparkHome - Spark 安装目录。

  • pyFiles - 要发送到集群并添加到 PYTHONPATH 的 .zip 或 .py 文件。

  • 环境- 工作节点环境变量。

  • batchSize - 表示为单个 Java 对象的 Python 对象的数量。设置 1 以禁用批处理,设置 0 以根据对象大小自动选择批处理大小,或设置 -1 以使用无限的批处理大小。

  • 序列化器- RDD 序列化器。

  • Conf - L{SparkConf} 的对象,用于设置所有 Spark 属性。

  • 网关- 使用现有的网关和 JVM,否则初始化新的 JVM。

  • JSC - JavaSparkContext 实例。

  • profiler_cls - 用于进行分析的自定义分析器类(默认为 pyspark.profiler.BasicProfiler)。

上述参数中,使用最多的是masterappname 。任何 PySpark 程序的前两行如下所示 -

from pyspark import SparkContext
sc = SparkContext("local", "First App")

SparkContext 示例 – PySpark Shell

现在您已经足够了解 SparkContext,让我们在 PySpark shell 上运行一个简单的示例。在此示例中,我们将计算README.md文件中包含字符“a”或“b”的行数。因此,假设文件中有 5 行,其中 3 行包含字符“a”,则输出将为 → Line with a: 3。对于字符“b”也将执行相同的操作。

注意- 在以下示例中,我们不会创建任何 SparkContext 对象,因为默认情况下,当 PySpark shell 启动时,Spark 会自动创建名为 sc 的 SparkContext 对象。如果您尝试创建另一个 SparkContext 对象,您将收到以下错误 – “ValueError:无法同时运行多个 SparkContext”。

PySpark 外壳

<<< logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"
<<< logData = sc.textFile(logFile).cache()
<<< numAs = logData.filter(lambda s: 'a' in s).count()
<<< numBs = logData.filter(lambda s: 'b' in s).count()
<<< print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
Lines with a: 62, lines with b: 30

SparkContext 示例 - Python 程序

让我们使用 Python 程序运行相同的示例。创建一个名为firstapp.py的Python 文件,并在该文件中输入以下代码。

----------------------------------------firstapp.py---------------------------------------
from pyspark import SparkContext
logFile = "file:///home/hadoop/spark-2.1.0-bin-hadoop2.7/README.md"  
sc = SparkContext("local", "first app")
logData = sc.textFile(logFile).cache()
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a: %i, lines with b: %i" % (numAs, numBs)
----------------------------------------firstapp.py---------------------------------------

然后我们将在终端中执行以下命令来运行这个Python文件。我们将得到与上面相同的输出。

$SPARK_HOME/bin/spark-submit firstapp.py
Output: Lines with a: 62, lines with b: 30