apache spark – Getting an error while writing a DataFrame to Ceph Storage

In my organisation I’m currently exploring whether we can replace HDFS with Ceph for our AI/ML workloads. As part of this initiative we set up a Ceph cluster and imported it into Kubernetes using Rook.

During my testing I was able to access Ceph storage using the s3cmd CLI, and I can also read data from Ceph using Spark on Kubernetes. However, I get an error when writing data back to Ceph storage.
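For reference, the checks that already succeed look roughly like this (shown as a boto3 equivalent of the s3cmd listing, since the rest of this post is Python; the endpoint, credentials and bucket are placeholders):

import boto3

# Listing bucket contents works, so credentials and the endpoint are fine
# for read-style operations (sketch only; placeholder values).
s3 = boto3.client(
    "s3",
    endpoint_url="http://10.77.60.45:8080",
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxxx",
)
for obj in s3.list_objects_v2(Bucket="bucket").get("Contents", []):
    print(obj["Key"])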

Below are my code and the error I get while writing data back. I’m hoping someone can help with this.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .appName("prateek-pyspark-ceph") \
        .config("spark.kubernetes.driver.master", "k8s://https://xxx:6443") \
        .config("spark.kubernetes.namespace", "jupyter") \
        .config("spark.kubernetes.container.image", "10.77.10.16:5050/telco-cdl-spark-executor-3.0.1") \
        .config("spark.kubernetes.container.image.pullPolicy", "Always") \
        .config("spark.kubernetes.container.image.pullSecrets", "gcr") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
        .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
        .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
        .config("spark.hadoop.fs.s3a.access.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.secret.key", "xxxx") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://10.77.60.45", "8080")) \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
        .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
        .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
        .config("spark.hadoop.fs.s3a.fast.upload", "true") \
        .config("spark.eventLog.dir", "s3a://telco-cdl-bucket/spark-event-log/") \
        .config("spark.executor.instances", "1") \
        .config("spark.executor.cores", "3") \
        .config("spark.executor.memory", "55g") \
        .config("spark.eventLog.enabled", "false") \
        .getOrCreate()
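
# Sanity check (optional): read the effective S3A settings back from the
# Hadoop configuration through the JVM gateway (the internal _jsc handle),
# to confirm the spark.hadoop.* options above were actually applied.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
print(hadoop_conf.get("fs.s3a.endpoint"), hadoop_conf.get("fs.s3a.path.style.access"))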

# Read Source Datasets 

musical_data = spark.read.json("s3a://bucket/input-data/Musical_Instruments_data.json")
musical_metadata = spark.read.json("s3a://bucket/input-data/Musical_Instruments_metadata.json")

# Register DataFrames as temporary views

musical_metadata.createOrReplaceTempView("musical_metadata")
musical_data.createOrReplaceTempView("musical_data")

# Top products based on unique user reviews

top_rated = spark.sql("""
select musical_data.asin as product_id,
       count(distinct musical_data.reviewerID) as unique_reviewer_id_count,
       musical_metadata.price as product_price
from musical_data
left outer join musical_metadata
    on musical_data.asin = musical_metadata.asin
group by product_id, product_price
order by unique_reviewer_id_count desc
limit 10
""")

# Display top 10 products

top_rated.show(truncate=False)

# Save output as csv

top_rated.write.format("csv") \
        .option("header", "true") \
        .mode("overwrite") \
        .save("s3a://bucket/output-data/")

# Stop Spark Context to release resources 

spark.stop()

The error thrown while writing the DataFrame:

Py4JJavaError: An error occurred while calling o740.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object  on output-data/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg:InvalidRequest: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
    at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
    at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2786)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2761)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2088)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2021)
    at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1828)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1412)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1374)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5212)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5158)
    at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:398)
    at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6113)
    at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1817)
    at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1777)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1545)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2788)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    ... 44 more
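
From the stack trace, the write fails before any task output is uploaded: FileOutputCommitter.setupJob makes S3A create a zero-byte "directory marker" object for output-data/_temporary/0/ (createFakeDirectory → createEmptyObject), and the RGW endpoint rejects that PUT with 400 InvalidRequest. In case it helps with debugging, here is a minimal sketch that issues the same zero-byte PUT directly against the gateway using boto3 (boto3 is assumed to be available; the endpoint, credentials and bucket are placeholders, not the real cluster values):

import boto3

# Reproduce the failing request outside Spark: a zero-byte object whose
# key ends in "/" (the "fake directory" S3A tries to create).
s3 = boto3.client(
    "s3",
    endpoint_url="http://10.77.60.45:8080",   # placeholder RGW endpoint
    aws_access_key_id="xxxx",                 # placeholder credentials
    aws_secret_access_key="xxxx",
)
s3.put_object(Bucket="bucket", Key="output-data/_temporary/0/", Body=b"")

If this direct call fails with the same InvalidRequest, the problem lies between the client request and RGW rather than in the Spark job itself.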