我的目标是使用 elasticsearch-hadoop 连接器,通过 pySpark 将数据直接加载到 ES。I'm quite new to Dataproc and pySpark and got stuck quite early. I run a single-node cluster (Image 1.3, Debian 9, Hadoop 2.9, Spark 2.3) and this is my code.
我运行的是一个单节点集群(Image 1.3、Debian 9、Hadoop 2.9、Spark 2.3),下面是我的代码。我想我需要安装 Java。
谢谢!
from pyspark.sql import SQLContext
from pyspark.sql.functions import lit
import os
from pyspark.sql import SparkSession
def install_java():
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null #install openjdk
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #set environment
variable
!java -version #check java version
# Run the Java setup defined above before touching Spark.
install_java()
# NOTE(review): SparkConf and SparkContext are not among the imports shown in
# this snippet — confirm they are imported elsewhere in the session.
# NOTE(review): 'ip-address' looks like a placeholder for the master URL.
conf = SparkConf().setAppName("testing").setMaster('ip-address')
# NOTE(review): conf is not passed to getOrCreate(), so the settings above are
# not applied by this call.
sc = SparkContext.getOrCreate()
Exception                                 Traceback (most recent call last)
<ipython-input-18-df37a24b7514> in <module>()
----> 1 sc = SparkContext.getOrCreate()
/usr/lib/spark/python/pyspark/context.pyc in getOrCreate(cls, conf)
361 with SparkContext._lock:
362 if SparkContext._active_spark_context is None:
--> 363 SparkContext(conf=conf or SparkConf())
364 return SparkContext._active_spark_context
365
/usr/lib/spark/python/pyspark/context.pyc in __init__(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer, conf, gateway, jsc, profiler_cls)
127 " note this option will be removed in Spark 3.0")
128
--> 129 SparkContext._ensure_initialized(self, gateway=gateway, conf=conf)
130 try:
131 self._do_init(master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
/usr/lib/spark/python/pyspark/context.pyc in _ensure_initialized(cls, instance, gateway, conf)
310 with SparkContext._lock:
311 if not SparkContext._gateway:
--> 312 SparkContext._gateway = gateway or launch_gateway(conf)
313 SparkContext._jvm = SparkContext._gateway.jvm
314
/usr/lib/spark/python/pyspark/java_gateway.pyc in launch_gateway(conf)
44 :return: a JVM gateway
45 """
---> 46 return _launch_gateway(conf)
47
48
/usr/lib/spark/python/pyspark/java_gateway.pyc in _launch_gateway(conf, insecure)
106
107 if not os.path.isfile(conn_info_file):
--> 108 raise Exception("Java gateway process exited before sending its port number")
109
110 with open(conn_info_file, "rb") as info:
Exception: Java gateway process exited before sending its port number
解决方案:
好了,解决了,我需要在创建新的SparkContext之前停止当前上下文。
# Stop the current SparkContext before creating a new one.
sc.stop()