
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. #47

Closed
surajshukla1105 opened this issue May 3, 2017 · 9 comments

Comments

@surajshukla1105

surajshukla1105 commented May 3, 2017

Hi, I am getting an error while trying to run a model using pyspark:

from elephas.spark_model import SparkModel
import elephas.spark_model
import keras
from elephas import optimizers as elephas_optimizers

adagrad = elephas_optimizers.Adagrad()

spark_model = SparkModel(sc, model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)

Exception Traceback (most recent call last)
in <module>()
7
8 spark_model = SparkModel(sc,model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
----> 9 spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in train(self, rdd, nb_epoch, batch_size, verbose, validation_split)
186
187 if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 188 self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
189 else:
190 print("""Choose from one of the modes: asynchronous, synchronous or hogwild""")

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in _train(self, rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
197 self.master_network.compile(optimizer=self.master_optimizer, loss=self.master_loss, metrics=self.master_metrics)
198 if self.mode in ['asynchronous', 'hogwild']:
--> 199 self.start_server()
200 yaml = self.master_network.to_yaml()
201 train_config = self.get_train_config(nb_epoch, batch_size,

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in start_server(self)
122 ''' Start parameter server'''
123 self.server = Process(target=self.start_service)
--> 124 self.server.start()
125
126 def stop_server(self):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\process.py in start(self)
103 'daemonic processes are not allowed to have children'
104 _cleanup()
--> 105 self._popen = self._Popen(self)
106 self._sentinel = self._popen.sentinel
107 _children.add(self)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
210 @staticmethod
211 def _Popen(process_obj):
--> 212 return _default_context.get_context().Process._Popen(process_obj)
213
214 class DefaultContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
311 def _Popen(process_obj):
312 from .popen_spawn_win32 import Popen
--> 313 return Popen(process_obj)
314
315 class SpawnContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
64 try:
65 reduction.dump(prep_data, to_child)
---> 66 reduction.dump(process_obj, to_child)
67 finally:
68 context.set_spawning_popen(None)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
57 def dump(obj, file, protocol=None):
58 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59 ForkingPickler(file, protocol).dump(obj)
60
61 #

.\spark\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py in __getnewargs__(self)
277 # This method is called when attempting to pickle SparkContext, which is always an error:
278 raise Exception(
--> 279 "It appears that you are attempting to reference SparkContext from a broadcast "
280 "variable, action, or transformation. SparkContext can only be used on the driver, "
281 "not in code that it run on workers. For more information, see SPARK-5063."

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

Could anyone please help me with this?

@misi1987107

Hello, I have the same error as you. Did you solve this problem?
Please help me.

@taoyizhi68

Same problem here.

@fairyqiqi

I'm using Spark 2.2, and I managed to change some of the elephas code locally to get the example code running. But I got this issue as well when I ran ml_mlp.py.

Traceback (most recent call last):
File "D:/spark/initialTry/../GitHub/elephas/examples/ml_mlp.py", line 80, in
fitted_pipeline = pipeline.fit(df)
File "D:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\ml\base.py", line 64, in fit
File "D:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\ml\pipeline.py", line 108, in _fit
File "D:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\ml\base.py", line 64, in fit
File "D:\Program Files\Anaconda3\lib\site-packages\elephas\ml_model.py", line 67, in _fit
verbose=verbosity, validation_split=validation_split)
File "D:\Program Files\Anaconda3\lib\site-packages\elephas\spark_model.py", line 188, in train
self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
File "D:\Program Files\Anaconda3\lib\site-packages\elephas\spark_model.py", line 199, in _train
self.start_server()
File "D:\Program Files\Anaconda3\lib\site-packages\elephas\spark_model.py", line 124, in start_server
self.server.start()
File "D:\Program Files\Anaconda3\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "D:\Program Files\Anaconda3\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "D:\Program Files\Anaconda3\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "D:\Program Files\Anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 66, in init
reduction.dump(process_obj, to_child)
File "D:\Program Files\Anaconda3\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
File "D:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip\pyspark\context.py", line 306, in getnewargs
Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
Using TensorFlow backend.

Would you please shed some light on what went wrong?

@jacekad

jacekad commented May 8, 2018

I just got a similar issue. Can you advise me how I can fix it?

I want to run it on a local Spark instance (the context is established as below):

import os
import sys
os.environ["SPARK_HOME"]= "C:\....\spark-2.2.0-hadoop2.6"
import findspark
findspark.init()
import numpy as np
from pyspark.context import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext
conf = SparkConf().setAppName('Elephas_App').setMaster('local[8]')
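
For completeness, the SparkContext itself would then typically be created from that conf along these lines (a minimal sketch, reusing the SparkContext import from the snippet above; the variable name sc is an assumption, not part of the original comment):

# Create (or reuse) a local SparkContext from the conf defined above.
sc = SparkContext.getOrCreate(conf=conf)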

@maxpumperla
Owner

@jacekad this is a mistake on my end. I need to get back to this at some point. To resolve the problem, one has to do precisely as the error message suggests and not let the workers access sc. It must be in the _train method somewhere: training takes place on the workers, so the SparkContext has to be eliminated from there.
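
For anyone who needs a workaround before a fix lands, the general pattern is to copy everything a worker needs into plain, picklable local variables before the Spark closure is built, so that the function shipped to the workers never captures an object that holds the SparkContext. Below is a rough sketch of that pattern, not the actual elephas internals; the names fit_on_workers and worker_train are made up for illustration, while master_network and to_yaml come from the traceback above.

def fit_on_workers(spark_model, rdd, master_url):
    '''Illustrative only: keep SparkContext-bearing objects out of the closure.'''
    # Extract plain, picklable values on the driver.
    model_yaml = spark_model.master_network.to_yaml()   # just a string
    weights = spark_model.master_network.get_weights()  # list of numpy arrays

    def worker_train(partition):
        # Only the picklable locals (model_yaml, weights, master_url) are
        # captured here -- never spark_model or the SparkContext itself.
        from keras.models import model_from_yaml
        model = model_from_yaml(model_yaml)
        model.set_weights(weights)
        data = list(partition)
        # ... train on data, exchange weights with the server at master_url ...
        yield len(data)

    # The function shipped to the workers holds no reference to sc.
    return rdd.mapPartitions(worker_train).collect()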

@mohaimenz

Hi Max,
I see the same issue, and I have checked the repository. I do not see any fix yet.
Does that mean we can't use it until it is fixed?

Looking forward to your response.

@sashgorokhov

Had a similar issue, not with elephas but with a plain pyspark app. Figured out how to resolve it.

Here is my erroneous code sample:

import pyspark


class Foo:
    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def do_work(self):
        rdd = self._spark_context.range(0, 10)
        # The lambda references `self`, so pickling it drags in the whole
        # Foo object -- including self._spark_context.
        rdd = rdd.map(lambda i: self.just_plain_string + str(i))
        return rdd.collect()

So the problem here is that I was using an instance attribute in my lambda expression. Internally, pyspark tries to pickle everything it gets, so it pickles the whole Foo object, which contains a reference to the SparkContext.
To fix that, I had to remove the reference to the instance from the lambda:

import pyspark


class Foo:
    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def do_work(self):
        rdd = self._spark_context.range(0, 10)
        # Copy the value into a local variable so the lambda captures only
        # a plain string, not `self` (and thus not the SparkContext).
        just_plain_string = self.just_plain_string
        rdd = rdd.map(lambda i: just_plain_string + str(i))
        return rdd.collect()
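
For what it's worth, here is a quick way to exercise the fixed class end to end (a hedged usage sketch; the master URL and app name are arbitrary choices, not from this comment):

import pyspark

# Spin up a throwaway local context and run the fixed example.
sc = pyspark.SparkContext(master='local[2]', appName='sc-pickle-check')
try:
    print(Foo(sc).do_work())  # ['foobar0', 'foobar1', ..., 'foobar9']
finally:
    sc.stop()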

Hope this helps someone.

@maxpumperla
Owner

@surajshukla1105 thanks for reporting this!

@maxpumperla
Owner

@surajshukla1105 @sashgorokhov check out the latest version on master, I don't see this problem coming up anymore.
