Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063. #47
Comments
Hello, I'm hitting the same error as you. Did you solve this problem?

Same problem here.
I'm using Spark 2.2 and managed to change some of the elephas code locally to get the example code running, but I hit this issue as well when I ran ml_mlp.py. Traceback (most recent call last): Would you please shed some light on what went wrong?

I just got a similar issue. Can you advise me how to fix it? I want to run it on a local Spark instance (the context is established as below). import os
@jacekad this is a mistake on my end. Need to get back to this at some point. To resolve the problem one has to do precisely as the error message suggests and not let workers access the SparkContext.

Hi Max, looking forward to your response.
Had a similar issue, not with elephas but with a plain pyspark app, and figured out how to resolve it. Here is my erroneous code sample:

```python
class Foo:
    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def do_work(self):
        rdd = self._spark_context.range(0, 10)
        # The lambda references self.just_plain_string, so it closes over `self`.
        rdd = rdd.map(lambda i: self.just_plain_string + str(i))
        return rdd.collect()
```

So the problem here is that I'm using an instance attribute in my lambda expression. Internally pyspark tries to pickle everything the lambda refers to, so it pickles the whole `Foo` object, which contains a reference to the SparkContext. The fix is to copy the attribute into a local variable before using it in the lambda:

```python
class Foo:
    def __init__(self, spark_context: pyspark.SparkContext):
        self._spark_context = spark_context
        self.just_plain_string = 'foobar'

    def do_work(self):
        rdd = self._spark_context.range(0, 10)
        # Local copy: the lambda now closes over a plain string, not `self`.
        just_plain_string = self.just_plain_string
        rdd = rdd.map(lambda i: just_plain_string + str(i))
        return rdd.collect()
```

Hope that will help someone.
@surajshukla1105 thanks for reporting this! |
@surajshukla1105 @sashgorokhov check out the latest version on master, I don't see this problem coming up anymore. |
Hi, I am getting an error while trying to run the model using pyspark:

```python
from elephas.spark_model import SparkModel
import elephas.spark_model
import keras
from elephas import optimizers as elephas_optimizers

adagrad = elephas_optimizers.Adagrad()
spark_model = SparkModel(sc, model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)
```
```
Exception                                 Traceback (most recent call last)
<ipython-input> in <module>()
      7
      8 spark_model = SparkModel(sc,model, optimizer=adagrad, frequency='epoch', mode='asynchronous')
----> 9 spark_model.train(rdd, nb_epoch=20, batch_size=32, verbose=0, validation_split=0.1)

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in train(self, rdd, nb_epoch, batch_size, verbose, validation_split)
    186
    187         if self.mode in ['asynchronous', 'synchronous', 'hogwild']:
--> 188             self._train(rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
    189         else:
    190             print("""Choose from one of the modes: asynchronous, synchronous or hogwild""")

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in _train(self, rdd, nb_epoch, batch_size, verbose, validation_split, master_url)
    197         self.master_network.compile(optimizer=self.master_optimizer, loss=self.master_loss, metrics=self.master_metrics)
    198         if self.mode in ['asynchronous', 'hogwild']:
--> 199             self.start_server()
    200         yaml = self.master_network.to_yaml()
    201         train_config = self.get_train_config(nb_epoch, batch_size,

.\Anaconda3\envs\tensorflow\lib\site-packages\elephas\spark_model.py in start_server(self)
    122         ''' Start parameter server'''
    123         self.server = Process(target=self.start_service)
--> 124         self.server.start()
    125
    126     def stop_server(self):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         _children.add(self)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
    210     @staticmethod
    211     def _Popen(process_obj):
--> 212         return _default_context.get_context().Process._Popen(process_obj)
    213
    214 class DefaultContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\context.py in _Popen(process_obj)
    311         def _Popen(process_obj):
    312             from .popen_spawn_win32 import Popen
--> 313             return Popen(process_obj)
    314
    315     class SpawnContext(BaseContext):

.\Anaconda3\envs\tensorflow\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     64         try:
     65             reduction.dump(prep_data, to_child)
---> 66             reduction.dump(process_obj, to_child)
     67         finally:
     68             context.set_spawning_popen(None)

.\Anaconda3\envs\tensorflow\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     57 def dump(obj, file, protocol=None):
     58     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 59     ForkingPickler(file, protocol).dump(obj)
     60
     61 #

.\spark\spark-2.1.0-bin-hadoop2.7\python\pyspark\context.py in __getnewargs__(self)
    277         # This method is called when attempting to pickle SparkContext, which is always an error:
    278         raise Exception(
--> 279             "It appears that you are attempting to reference SparkContext from a broadcast "
    280             "variable, action, or transformation. SparkContext can only be used on the driver, "
    281             "not in code that it run on workers. For more information, see SPARK-5063."

Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.
```
Could anyone please help me with this?
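For what it's worth, the traceback above shows the same pickling problem surfacing at a different spot: on Windows, `multiprocessing` spawns the parameter-server process by pickling the target object, which here holds the SparkContext `sc` passed to the `SparkModel` constructor, and pickling a SparkContext always raises this exception. One general workaround pattern (a sketch with hypothetical class names, not elephas's actual code) is to exclude the driver-only handle from pickled state via `__getstate__`:

```python
import pickle


class FakeSparkContext:
    """Stand-in that, like the real SparkContext, refuses to be pickled."""

    def __reduce__(self):
        raise Exception("It appears that you are attempting to reference SparkContext ...")


class ModelWrapper:
    """Hypothetical wrapper holding both plain config and a driver-only context."""

    def __init__(self, sc, config):
        self._sc = sc            # driver-only handle; must never travel to another process
        self.config = config     # plain data, safe to pickle

    def __getstate__(self):
        state = self.__dict__.copy()
        state.pop('_sc', None)   # drop the unpicklable handle before pickling
        return state


wrapper = ModelWrapper(FakeSparkContext(), {'epochs': 20})
restored = pickle.loads(pickle.dumps(wrapper))  # succeeds: _sc was excluded
print(restored.config)           # {'epochs': 20}
print(hasattr(restored, '_sc'))  # False
```

The restored object then has to reacquire its context on the driver side; the point is simply that the SparkContext itself never crosses a process boundary.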