Accessing Mocked S3 Bucket via Pyspark?
I'm trying to test a function that invokes pyspark to read a file from an S3 bucket. The test uploads a test file to the bucket and checks whether pyspark can read it back. It works fine against my actual S3 bucket, but I am trying to see if I can get it to work using moto.
While the upload of the test file works fine with moto, I get a permission denied error when pyspark tries to access the file via its S3 URI.
Here is a simplified version of what I am trying to do.
from io import BytesIO

import boto3
import pandas as pd
from moto import mock_s3
from pyspark.sql import SparkSession

# Patch boto3 so S3 calls go to moto's in-memory backend.
mock = mock_s3()
mock.start()

conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')

# Upload a small test CSV to the mocked bucket.
df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
file_path = 'test_folder/test.csv'
data_object = BytesIO(df.to_csv(index=False).encode('utf-8'))
conn.Bucket('mybucket').put_object(Key=file_path, Body=data_object)

# Try to read the same file back through pyspark.
spark = SparkSession.builder.getOrCreate()
spark.read.csv('s3n://mybucket/test_folder/test.csv', sep=',', header=True)
I then get the following traceback:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-28-7123d975bed9> in <module>()
----> 1 spark.read.csv('s3n://mybucket/test_folder/test.csv', sep=',', header=True)
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
408 if isinstance(path, basestring):
409 path = [path]
--> 410 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
411
412 @since(1.5)
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o264.csv.
: org.apache.hadoop.security.AccessControlException: Permission denied: s3n://mybucket/test_folder/test.csv
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:468)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:344)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.impl.rest.HttpException
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:423)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
... 30 more
I expected to be able to read the file into a pyspark dataframe, but it seems I am being denied access. Is something like this possible with the moto library?
I am using Python 3.6.0 with boto3 1.5.9 and moto 1.2.0.
Using a separate moto server for S3 and following this and this, pyspark reads from the mocked bucket without any immediate problem.
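A minimal sketch of that server-based setup, under a few assumptions: moto's standalone server is started as moto_server s3 -p 5000, Spark has the hadoop-aws s3a connector on its classpath, and the endpoint, port, and "testing" credentials are arbitrary placeholders (moto's server accepts any credentials).

import subprocess
import time

import boto3
from pyspark.sql import SparkSession

ENDPOINT = 'http://127.0.0.1:5000'

# Run moto's standalone server as an external process so that the
# JVM-side S3 client can reach it over plain HTTP.
server = subprocess.Popen(['moto_server', 's3', '-p', '5000'])
time.sleep(2)  # crude wait for the server to come up

try:
    # Point boto3 at the mock endpoint and seed the test data.
    conn = boto3.resource('s3', region_name='us-east-1',
                          endpoint_url=ENDPOINT,
                          aws_access_key_id='testing',
                          aws_secret_access_key='testing')
    conn.create_bucket(Bucket='mybucket')
    conn.Bucket('mybucket').put_object(Key='test_folder/test.csv',
                                       Body=b'a,b\n1,3\n2,4\n')

    # Point the s3a filesystem at the same endpoint.
    spark = SparkSession.builder.getOrCreate()
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    hconf.set('fs.s3a.endpoint', ENDPOINT)
    hconf.set('fs.s3a.access.key', 'testing')
    hconf.set('fs.s3a.secret.key', 'testing')
    hconf.set('fs.s3a.path.style.access', 'true')
    # The endpoint is plain HTTP, so disable SSL for the connector.
    hconf.set('fs.s3a.connection.ssl.enabled', 'false')

    spark.read.csv('s3a://mybucket/test_folder/test.csv', header=True).show()
finally:
    server.terminate()

The key point is that the mock runs as a real HTTP server, so both the Python process and the JVM can talk to it.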
@jbvsmo The standard mock_s3 decorator specifically patches boto/boto3. PySpark doesn't use those libraries, so I don't see how it could work. @dslaw already posted a working solution using MotoServer as an external process, so as far as I'm concerned this can be closed.
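To illustrate the distinction, a small sketch (the test name and assertion are mine): the decorator intercepts boto3 calls inside the Python process only.

from moto import mock_s3
import boto3

@mock_s3
def test_boto3_is_patched():
    # Inside the decorator, boto3 requests are intercepted in the
    # Python process and served from moto's in-memory backend.
    conn = boto3.resource('s3', region_name='us-east-1')
    conn.create_bucket(Bucket='mybucket')
    assert [b.name for b in conn.buckets.all()] == ['mybucket']
    # PySpark's S3 reads, by contrast, are HTTP requests issued by the
    # JVM's Hadoop S3 connector, which this patching never sees.

test_boto3_is_patched()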
Note that Moto has its own Docker image here: https://hub.docker.com/r/motoserver/moto/tags Other Docker images (like the one posted above) are not supported.
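A hedged sketch of running that image in place of the moto_server process above; it assumes Docker is available and that the image serves on its default port 5000.

import subprocess

# Start moto's official server image; --rm cleans it up on stop.
subprocess.run(['docker', 'run', '--rm', '-d', '-p', '5000:5000',
                '--name', 'moto-s3', 'motoserver/moto'], check=True)
# The endpoint is then http://127.0.0.1:5000, so the fs.s3a settings
# from the earlier sketch apply unchanged.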