Accessing Mocked S3 Bucket via Pyspark?


I’m trying to test a function that invokes pyspark to read a file from an S3 bucket. The test simply uploads a test file to the S3 bucket and sees if pyspark can read the file. The test works fine when I provide my actual S3 bucket, but I am trying to see if I can get it to work using moto.

While the upload of the test file works fine with moto, I get a permission denied error when pyspark tries to access the file via its S3 URI.

Here is a simplified version of what I am trying to do.

from moto import mock_s3
from io import BytesIO
from pyspark.sql import SparkSession
import pandas as pd
import boto3

# Start the boto3-level S3 mock.
mock = mock_s3()
mock.start()

# Create a bucket and upload a small CSV through the mocked boto3 resource.
conn = boto3.resource('s3', region_name='us-east-1')
conn.create_bucket(Bucket='mybucket')

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
file_path = 'test_folder/test.csv'
data_object = BytesIO(df.to_csv(index=False).encode('utf-8'))
conn.Bucket('mybucket').put_object(Key=file_path, Body=data_object)

# Try to read the uploaded file back through pyspark.
spark = SparkSession.builder.getOrCreate()
spark.read.csv('s3n://mybucket/test_folder/test.csv', sep=',', header=True)

I then get the following traceback:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-28-7123d975bed9> in <module>()
----> 1 spark.read.csv('s3n://mybucket/test_folder/test.csv', sep=',', header=True)

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine)
    408         if isinstance(path, basestring):
    409             path = [path]
--> 410         return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    411 
    412     @since(1.5)

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o264.csv.
: org.apache.hadoop.security.AccessControlException: Permission denied: s3n://mybucket/test_folder/test.csv
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:449)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.processException(Jets3tNativeFileSystemStore.java:427)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.handleException(Jets3tNativeFileSystemStore.java:411)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:181)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at org.apache.hadoop.fs.s3native.$Proxy10.retrieveMetadata(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:468)
	at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1426)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:359)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:348)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:344)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:348)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
	at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:533)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.jets3t.service.impl.rest.HttpException
	at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:423)
	at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRequest(RestStorageService.java:277)
	at org.jets3t.service.impl.rest.httpclient.RestStorageService.performRestHead(RestStorageService.java:1038)
	at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectImpl(RestStorageService.java:2250)
	at org.jets3t.service.impl.rest.httpclient.RestStorageService.getObjectDetailsImpl(RestStorageService.java:2179)
	at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:1120)
	at org.jets3t.service.StorageService.getObjectDetails(StorageService.java:575)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:174)
	... 30 more

I expected to be able to read the file into a pyspark dataframe, but it seems like I am being denied access. Is something like this possible to do using the moto library?

I am using Python 3.6.0 with boto3=1.5.9 and moto=1.2.0.

Issue Analytics

  • State: closed
  • Created 5 years ago
  • Comments: 18 (3 by maintainers)

Top GitHub Comments

7 reactions
dslaw commented, Oct 11, 2018

Using a separate moto server for S3 and following this and this, there doesn’t seem to be an immediate problem reading via pyspark.

from io import BytesIO
from pyspark.sql import SparkSession
import boto3
import os
import pandas as pd


# hadoop-aws must be on the classpath before the SparkSession is created.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
)
spark = SparkSession.builder.getOrCreate()

# Configure Spark to use S3A and point it at the moto server, which is
# assumed to be already running at http://127.0.0.1:5000.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("fs.s3a.access.key", "mock")
hadoop_conf.set("fs.s3a.secret.key", "mock")
hadoop_conf.set("fs.s3a.endpoint", "http://127.0.0.1:5000")

conn = boto3.resource("s3", endpoint_url="http://127.0.0.1:5000")
conn.create_bucket(Bucket="bucket")

data = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
with BytesIO(data.to_csv(index=False).encode()) as buffer:
    conn.Bucket("bucket").put_object(Key="test/test.csv", Body=buffer)

df = spark.read.csv("s3://bucket/test/test.csv", header=True)
df
# DataFrame[a: string, b: string]
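
In a test suite, the same setup can be wrapped in pytest fixtures. A minimal sketch, assuming the server is started via the moto_server CLI that ships with moto (the fixture names and the crude startup wait are illustrative, not part of moto's API):

import os
import subprocess
import time

import boto3
import pytest
from pyspark.sql import SparkSession

ENDPOINT = "http://127.0.0.1:5000"


@pytest.fixture(scope="session")
def moto_server():
    # Launch the standalone moto server as a child process. Older moto
    # releases expect a service name, e.g. `moto_server s3 -p 5000`.
    proc = subprocess.Popen(["moto_server", "-p", "5000"])
    time.sleep(2)  # crude wait for the server to start accepting connections
    yield ENDPOINT
    proc.terminate()


@pytest.fixture(scope="session")
def spark(moto_server):
    # hadoop-aws must be on the classpath before the SparkSession is created.
    os.environ["PYSPARK_SUBMIT_ARGS"] = (
        '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
    )
    session = SparkSession.builder.getOrCreate()
    conf = session.sparkContext._jsc.hadoopConfiguration()
    conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    conf.set("fs.s3a.access.key", "mock")
    conf.set("fs.s3a.secret.key", "mock")
    conf.set("fs.s3a.endpoint", moto_server)
    return session


def test_read_csv_from_mocked_s3(spark, moto_server):
    conn = boto3.resource("s3", endpoint_url=moto_server)
    conn.create_bucket(Bucket="bucket")
    conn.Bucket("bucket").put_object(Key="test/test.csv", Body=b"a,b\n1,4\n")
    df = spark.read.csv("s3://bucket/test/test.csv", header=True)
    assert df.columns == ["a", "b"]
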
0 reactions
bblommers commented, Oct 15, 2021

@jbvsmo The standard mock_s3 decorator specifically patches boto/boto3. It doesn’t look like PySpark uses those dependencies, so I don’t see how it could work.

@dslaw already posted a working solution using MotoServer as an external process, so as far as I’m concerned this can be closed.

Note that Moto has its own Docker image here: https://hub.docker.com/r/motoserver/moto/tags. Other Docker images (like the one posted above) are not supported.
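
For completeness, newer moto releases can also run the server in-process, which avoids managing a separate process or container. A minimal sketch, assuming a moto version recent enough to ship moto.server.ThreadedMotoServer (the Spark configuration stays the same as above):

from moto.server import ThreadedMotoServer

# Run moto's HTTP server in a background thread of the test process.
server = ThreadedMotoServer(port=5000)
server.start()

# ... create buckets with boto3 and read via Spark, pointing
# fs.s3a.endpoint at http://127.0.0.1:5000 as shown above ...

server.stop()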
