Support #49639

open

Getting Error while writing Spark Dataframe to Ceph Storage using Spark 3.0.1 (Hadoop 3.2) / Spark 2.4.5 (Hadoop 2.7)

Added by Prateek Dubey about 3 years ago. Updated about 3 years ago.

Status: New
Priority: Normal
Assignee: -
Target version:
% Done: 0%
Tags:
Reviewed:
Affected Versions:
Pull request ID:

Description

In my organisation, we have deployed Ceph to replace HDFS for running our AI/ML workloads with Spark on Kubernetes. The whole deployment is on-premise: the Ceph cluster was set up and imported into Kubernetes using Rook, and the data stored in Ceph is processed with Spark (3.0.1 / 2.4.5).

During my testing with Ceph, I'm able to access Ceph Storage using the S3CMD CLI, the AWS s3api CLI, and Boto3, and I'm also able to read data from Ceph using Spark on Kubernetes. I tested PutObject, GetObject, DeleteObject, etc. with the AWS/S3 CLIs as well as Boto3, and everything works fine. However, I'm getting an error while writing data back to Ceph Storage using Spark. I'm the owner of the bucket and have FULL_CONTROL permissions on it. I also added the following bucket policy to give cephadmin explicit permissions:

bucket_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Sid': 'AllowCephS3Access',
        'Effect': 'Allow',
        'Principal': {"AWS": ["arn:aws:iam:::user/cephadmin"]},
        'Action': ['s3:*'],
        "Resource": [
            "arn:aws:s3:::telco-cdl-bucket/*",
            "arn:aws:s3:::telco-cdl-bucket"
        ]
    }]
}
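
For context, a policy dict like the one above would typically be attached with Boto3 along the lines of the sketch below; the endpoint, credentials, and client setup here are illustrative placeholders rather than values from this report:

import json
import boto3

# Placeholder RGW endpoint and keys; the real values are redacted in this report.
s3 = boto3.client(
    "s3",
    endpoint_url="http://xxxx:8080",
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxxx",
)

# Attach the bucket_policy dict defined above to the bucket.
s3.put_bucket_policy(
    Bucket="telco-cdl-bucket",
    Policy=json.dumps(bucket_policy),
)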

Below is the PySpark code I used to read and write data to and from Ceph Storage with Spark 3.0.1 / Hadoop 3.2. I tested the same code on Spark 2.4.5 / Hadoop 2.7; in both cases the read works fine, but the write does not.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("prateek-pyspark-ceph") \
    .config("spark.kubernetes.driver.master", "k8s://https://xxx:6443") \
    .config("spark.kubernetes.namespace", "jupyter") \
    .config("spark.kubernetes.container.image", "spark-executor-3.0.1") \
    .config("spark.kubernetes.container.image.pullPolicy", "Always") \
    .config("spark.kubernetes.container.image.pullSecrets", "gitlab") \
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
    .config("spark.kubernetes.authenticate.executor.serviceAccountName", "spark") \
    .config("spark.kubernetes.authenticate.submission.caCertFile", "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt") \
    .config("spark.kubernetes.authenticate.submission.oauthTokenFile", "/var/run/secrets/kubernetes.io/serviceaccount/token") \
    .config("spark.hadoop.fs.s3a.access.key", "xxxx") \
    .config("spark.hadoop.fs.s3a.secret.key", "xxxx") \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.endpoint", "{}:{}".format("http://xxxx", "8080")) \
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2") \
    .config("spark.hadoop.fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3A") \
    .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false") \
    .config("spark.hadoop.fs.s3a.fast.upload", "true") \
    .config("spark.executor.instances", "1") \
    .config("spark.executor.cores", "3") \
    .config("spark.executor.memory", "55g") \
    .config("spark.eventLog.enabled", "false") \
    .getOrCreate()

# 1. Read Source Datasets
musical_data = spark.read.json("s3a://ceph-bucket/input-data/Musical_Instruments_data.json")
musical_metadata = spark.read.json("s3a://ceph-bucket/input-data/Musical_Instruments_metadata.json")

# 2. Register dataframes as temp tables
musical_metadata.registerTempTable("musical_metadata")
musical_data.registerTempTable("musical_data")

# 3. Top products based on unique user reviews
top_rated = spark.sql("""
select musical_data.asin as product_id,
       count(distinct musical_data.reviewerID) as unique_reviewer_id_count,
       musical_metadata.price as product_price
from musical_data left outer join musical_metadata
  on musical_data.asin == musical_metadata.asin
group by product_id, product_price
order by unique_reviewer_id_count desc
limit 10
""")

# 4. Save output as csv
top_rated.write.format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save("s3a://ceph-bucket/output-data/")

# 5. Stop Spark Context to release resources
spark.stop()
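
Since plain PutObject calls via Boto3 work against Ceph (as noted above), a direct Boto3 upload to the same output prefix is a quick way to confirm the endpoint and credentials outside of Spark. The snippet below is only a sketch with placeholder endpoint and keys:

import boto3

# Placeholder endpoint and keys; the real values are redacted in this report.
s3 = boto3.client(
    "s3",
    endpoint_url="http://xxxx:8080",
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxxx",
)

# Roughly mirrors the 0-byte "directory marker" PUT that S3A issues during the
# write and that fails from Spark with a 400 InvalidRequest.
s3.put_object(Bucket="ceph-bucket", Key="output-data/_temporary/0/", Body=b"")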

Following is the error I'm getting while writing to Ceph:

Py4JJavaError: An error occurred while calling o740.save.
: org.apache.hadoop.fs.s3a.AWSBadRequestException: PUT 0-byte object on output-data/_temporary/0/: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg:InvalidRequest: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:224)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:111)
at org.apache.hadoop.fs.s3a.Invoker.lambda$retry$3(Invoker.java:265)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:322)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:261)
at org.apache.hadoop.fs.s3a.Invoker.retry(Invoker.java:236)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createEmptyObject(S3AFileSystem.java:2786)
at org.apache.hadoop.fs.s3a.S3AFileSystem.createFakeDirectory(S3AFileSystem.java:2761)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerMkdirs(S3AFileSystem.java:2088)
at org.apache.hadoop.fs.s3a.S3AFileSystem.mkdirs(S3AFileSystem.java:2021)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2275)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:354)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:163)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:168)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:178)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:121)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:963)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:963)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:399)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:288)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: null (Service: Amazon S3; Status Code: 400; Error Code: InvalidRequest; Request ID: tx0000000000000000002b3-00604055fa-1ea62-sg; S3 Extended Request ID: 1ea62-sg-sg; Proxy: null), S3 Extended Request ID: 1ea62-sg-sg
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1828)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1412)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1374)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5212)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5158)
at com.amazonaws.services.s3.AmazonS3Client.access$300(AmazonS3Client.java:398)
at com.amazonaws.services.s3.AmazonS3Client$PutObjectStrategy.invokeServiceCall(AmazonS3Client.java:6113)
at com.amazonaws.services.s3.AmazonS3Client.uploadObject(AmazonS3Client.java:1817)
at com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1777)
at org.apache.hadoop.fs.s3a.S3AFileSystem.putObjectDirect(S3AFileSystem.java:1545)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$createEmptyObject$13(S3AFileSystem.java:2788)
at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
... 44 more

NOTE - The exact same code worked perfectly fine when tested against AWS S3.
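
The stack trace shows the 400 originates in S3AFileSystem.mkdirs() creating an empty "directory marker" object under _temporary/. A minimal sketch (assuming the same SparkSession and S3A configuration as above) to trigger that call directly from PySpark, without going through the DataFrame writer:

# Reuses the already-configured SparkSession; the calls below are the standard
# Hadoop FileSystem API accessed through Py4J.
hadoop_conf = spark._jsc.hadoopConfiguration()
path = spark._jvm.org.apache.hadoop.fs.Path("s3a://ceph-bucket/output-data/_temporary/0/")
fs = path.getFileSystem(hadoop_conf)
fs.mkdirs(path)  # expected to raise the same AWSBadRequestException if the issue reproduces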

Versions -

Spark 3.0.1, Hadoop 3.2, hadoop-aws-3.2.0.jar, aws-java-sdk-1.11.874.jar, aws-java-sdk-bundle-1.11.874.jar

and

Spark 2.4.5, Hadoop 2.7, hadoop-aws-2.7.3.jar, aws-java-sdk-1.7.4.jar, aws-java-sdk-bundle-1.7.4.jar
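
For reference, a hypothetical alternative to shipping the jars above inside the container image would be to let Spark resolve the equivalent Maven artifacts at startup; this is a sketch for the Spark 3.0.1 / Hadoop 3.2 combination, assuming the coordinates below correspond to the listed jars:

from pyspark.sql import SparkSession

# Assumed Maven coordinates for the S3A dependencies; adjust if the jars in the
# image were built differently.
spark = SparkSession.builder \
    .appName("prateek-pyspark-ceph") \
    .config("spark.jars.packages",
            "org.apache.hadoop:hadoop-aws:3.2.0,"
            "com.amazonaws:aws-java-sdk-bundle:1.11.874") \
    .getOrCreate()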

I would really appreciate it if someone from the Ceph community could help me resolve this issue, or point me in a direction that shows what is going wrong. I have been working on this for a week now and am going in circles, unable to fix it.

Actions #1

Updated by Prateek Dubey about 3 years ago

Actions #2

Updated by Prateek Dubey about 3 years ago

Actions #3

Updated by Prateek Dubey about 3 years ago

I tested the same configuration and code in cluster mode via spark-submit, and it worked :)

So I think the issue is specific to client mode, or the access credentials are not being propagated to the executors correctly. Can someone explain how this needs to be run against Ceph in client mode with Spark? On AWS S3 it works perfectly fine.

./spark-submit \
--master k8s://https://xxxx:6443 \
--deploy-mode cluster \
--name prateek-ceph-pyspark \
--conf spark.kubernetes.namespace=jupyter \
--conf spark.executor.instances=1 \
--conf spark.executor.cores=3 \
--conf spark.executor.memory=55g \
--conf spark.kubernetes.container.image=spark-executor-3.0.1 \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=spark \
--conf spark.kubernetes.container.image.pullPolicy=Always \
--conf spark.kubernetes.container.image.pullSecrets=gcr \
--conf spark.kubernetes.authenticate.submission.caCertFile=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
--conf spark.kubernetes.authenticate.submission.oauthTokenFile=/var/run/secrets/kubernetes.io/serviceaccount/token \
--conf spark.hadoop.fs.s3a.access.key=<ACCESS_KEY> \
--conf spark.hadoop.fs.s3a.secret.key=<SECRET_KEY> \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider \
--conf spark.hadoop.fs.s3a.endpoint=http://xxxx:8080 \
--conf spark.hadoop.fs.s3a.connection.ssl.enabled=false \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.eventLog.enabled=false \
s3a://ceph-bucket/scripts/Ceph_PySpark.py
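
A hypothetical sketch of what could be tried in client mode (not a confirmed fix, and not part of the original report): set the same S3A options directly on the Hadoop configuration after the session is created, so the driver and the executors resolve an identical endpoint and credentials.

# Hypothetical client-mode workaround sketch; values are placeholders.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://xxxx:8080")
hadoop_conf.set("fs.s3a.access.key", "<ACCESS_KEY>")
hadoop_conf.set("fs.s3a.secret.key", "<SECRET_KEY>")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")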

Actions #4

Updated by Prateek Dubey about 3 years ago
