Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/modules/spark-k8s/examples/example-spark-connect.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ spec:
productVersion: "3.5.8" # <2>
pullPolicy: IfNotPresent
args:
- "--package org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1" # <3>
- "--packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.11.0" # <3>
server:
podOverrides:
spec:
Expand Down
11 changes: 5 additions & 6 deletions docs/modules/spark-k8s/pages/usage-guide/job-dependencies.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,13 @@ metadata:
spec:
sparkConf:
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.spark_catalog: org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.spark_catalog.type: hive
spark.sql.catalog.local: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.local.type: hadoop
spark.sql.catalog.local.warehouse: /tmp/warehouse
spark.sql.catalog.lakehouse: org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.lakehouse.type: hive
spark.sql.catalog.lakehouse.uri: thrift://hive-iceberg-metastore:9083
spark.sql.defaultCatalog: lakehouse
deps:
packages:
- org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1 # <1>
- org.apache.iceberg:iceberg-spark-runtime-4.1_2.13:1.11.0 # <1>
...
----

Expand Down
14 changes: 7 additions & 7 deletions rust/operator-binary/src/connect/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ pub(crate) fn build_stateful_set(

#[allow(clippy::result_large_err)]
pub(crate) fn command_args(user_args: &[String]) -> Vec<String> {
let mut command = vec![formatdoc! { "
let mut command = formatdoc! { "
containerdebug --output={VOLUME_MOUNT_PATH_LOG}/containerdebug-state.json --loop &

cp {VOLUME_MOUNT_PATH_CONFIG}/{SPARK_DEFAULTS_FILE_NAME} /tmp/spark.properties
Expand All @@ -441,13 +441,13 @@ pub(crate) fn command_args(user_args: &[String]) -> Vec<String> {
/stackable/spark/sbin/start-connect-server.sh \\
--deploy-mode client \\
--master k8s://https://${{KUBERNETES_SERVICE_HOST}}:${{KUBERNETES_SERVICE_PORT_HTTPS}} \\
--properties-file /tmp/spark.properties
" }];
--properties-file /tmp/spark.properties"};

// User provided command line arguments
command.extend_from_slice(user_args);

vec![command.join(" ")]
// Append user-provided command line arguments as continuation lines.
for user_arg in user_args {
command.push_str(&format!(" \\\n{user_arg}"));
}
vec![command]
}

#[allow(clippy::result_large_err)]
Expand Down
2 changes: 2 additions & 0 deletions tests/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ releases:
operatorVersion: 0.0.0-dev
zookeeper:
operatorVersion: 0.0.0-dev
hive:
operatorVersion: 0.0.0-dev
hdfs:
operatorVersion: 0.0.0-dev
hbase:
Expand Down
4 changes: 2 additions & 2 deletions tests/templates/kuttl/iceberg/10-deploy-spark-app.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ spec:
deps:
packages:
{% if test_scenario['values']['spark-iceberg'].startswith("4") %}
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-iceberg'].split('.')[:2]) }}_2.13:1.10.1
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-iceberg'].split('.')[:2]) }}_2.13:{{ test_scenario['values']['iceberg-latest'] }}
{% else %}
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-iceberg'].split('.')[:2]) }}_2.12:1.10.1
- org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-iceberg'].split('.')[:2]) }}_2.12:{{ test_scenario['values']['iceberg-latest'] }}
{% endif %}
volumes:
- name: script
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,30 @@ metadata:
stringData:
accessKey: spark
secretKey: sparkspark
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
name: minio
spec:
host: minio
port: 9000
accessStyle: Path
credentials:
secretClass: minio-credentials-class
{% if test_scenario['values']['s3-use-tls'] == 'true' %}
tls:
verification:
server:
caCert:
secretClass: minio-tls-ca
{% endif %}
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Bucket
metadata:
name: ingest-bucket
spec:
bucketName: ingest-bucket
connection:
reference: minio
12 changes: 12 additions & 0 deletions tests/templates/kuttl/spark-connect/06-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 600
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: postgresql-hive
status:
readyReplicas: 1
replicas: 1
12 changes: 12 additions & 0 deletions tests/templates/kuttl/spark-connect/06-install-hive-postgres.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
timeout: 300
commands:
- script: >-
helm upgrade postgresql-hive
--install
--version=12.5.6
--namespace $NAMESPACE
-f 06_helm-bitnami-postgresql-values.yaml
--repo https://charts.bitnami.com/bitnami postgresql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
---
global:
security:
allowInsecureImages: true # needed starting with Chart version 16.3.0 if modifying images

image:
repository: bitnamilegacy/postgresql

volumePermissions:
enabled: false
image:
repository: bitnamilegacy/os-shell
securityContext:
runAsUser: auto

metrics:
image:
repository: bitnamilegacy/postgres-exporter

primary:
extendedConfiguration: |
password_encryption=md5
podSecurityContext:
{% if test_scenario['values']['openshift'] == 'true' %}
enabled: false
{% else %}
enabled: true
{% endif %}
containerSecurityContext:
enabled: false
resources:
requests:
memory: "512Mi"
cpu: "512m"
limits:
memory: "512Mi"
cpu: "1"

auth:
username: hive
password: hive
database: hive
12 changes: 12 additions & 0 deletions tests/templates/kuttl/spark-connect/07-assert.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestAssert
timeout: 900
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: hive-metastore-default
status:
readyReplicas: 1
replicas: 1
6 changes: 6 additions & 0 deletions tests/templates/kuttl/spark-connect/07-install-hive.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
# We need to replace $NAMESPACE (by KUTTL)
- script: envsubst '$NAMESPACE' < 07_hive.yaml | kubectl apply -n $NAMESPACE -f -
41 changes: 41 additions & 0 deletions tests/templates/kuttl/spark-connect/07_hive.yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
apiVersion: hive.stackable.tech/v1alpha1
kind: HiveCluster
metadata:
name: hive
spec:
image:
{% if test_scenario['values']['hive-iceberg'].find(",") > 0 %}
custom: "{{ test_scenario['values']['hive-iceberg'].split(',')[1] }}"
productVersion: "{{ test_scenario['values']['hive-iceberg'].split(',')[0] }}"
{% else %}
productVersion: "{{ test_scenario['values']['hive-iceberg'] }}"
{% endif %}
pullPolicy: IfNotPresent
clusterConfig:
metadataDatabase:
postgresql:
host: postgresql-hive
database: hive
credentialsSecretName: postgres-credentials
s3:
reference: minio
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
metastore:
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
roleGroups:
default:
replicas: 1
---
apiVersion: v1
kind: Secret
metadata:
name: postgres-credentials
type: Opaque
stringData:
username: hive
password: hive
Original file line number Diff line number Diff line change
@@ -1,31 +1,4 @@
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Connection
metadata:
name: s3-connection
spec:
host: minio
port: 9000
accessStyle: Path
credentials:
secretClass: minio-credentials-class
{% if test_scenario['values']['s3-use-tls'] == 'true' %}
tls:
verification:
server:
caCert:
secretClass: minio-tls-ca
{% endif %}
---
apiVersion: s3.stackable.tech/v1alpha1
kind: S3Bucket
metadata:
name: ingest-bucket
spec:
bucketName: ingest-bucket
connection:
reference: s3-connection
---
apiVersion: v1
kind: ConfigMap
metadata:
Expand Down Expand Up @@ -61,12 +34,18 @@ spec:
pullPolicy: IfNotPresent
{% if lookup('env', 'VECTOR_AGGREGATOR') %}
vectorAggregatorConfigMapName: vector-aggregator-discovery
{% endif %}
args:
{% if test_scenario['values']['spark-connect'].startswith("4") %}
- "--packages org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-connect'].split('.')[:2]) }}_2.13:{{ test_scenario['values']['iceberg-latest'] }}"
{% else %}
- "--packages org.apache.iceberg:iceberg-spark-runtime-{{ ".".join(test_scenario['values']['spark-connect'].split('.')[:2]) }}_2.12:{{ test_scenario['values']['iceberg-latest'] }}"
{% endif %}
connectors:
s3buckets:
- reference: ingest-bucket
s3connection:
reference: s3-connection
reference: minio
server:
jvmArgumentOverrides:
add:
Expand All @@ -82,12 +61,14 @@ spec:
configMap: spark-connect-log-config
configOverrides:
spark-defaults.conf:
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.jars.ivy: /tmp/ivy2
executor:
configOverrides:
spark-defaults.conf:
spark.executor.instances: "1"
spark.executor.memoryOverhead: "1m"
spark.sql.extensions: org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
config:
logging:
enableVectorAgent: {{ lookup('env', 'VECTOR_AGGREGATOR') | length > 0 }}
Expand Down
1 change: 1 addition & 0 deletions tests/templates/kuttl/spark-connect/13-assert.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ commands:
spark.kubernetes.executor.request.cores=1
spark.kubernetes.namespace=__NAMESPACE__
spark.metrics.conf=/stackable/spark/conf/metrics.properties
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.ui.prometheus.enabled=true
template.yaml: |
metadata:
Expand Down
14 changes: 14 additions & 0 deletions tests/templates/kuttl/spark-connect/20-run-connect-client.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ data:
spark = (
SparkSession.builder.appName("S3BucketsExample")
.remote(remote)
# Must be (and already is) set on the SparkConnectServer
# .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkSessionCatalog")
.config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.lakehouse.type", "hive")
.config("spark.sql.catalog.lakehouse.uri", "thrift://hive-metastore:9083")
.config("spark.sql.defaultCatalog", "lakehouse")
.getOrCreate()
)

Expand Down Expand Up @@ -74,6 +81,13 @@ data:
print(f"Writing statistics to {stats_bucket}")
stats.write.mode("overwrite").parquet(stats_bucket)

print(f"Writing and reading Iceberg data")
spark.sql("CREATE SCHEMA IF NOT EXISTS lakehouse.test LOCATION 's3a://lakehouse/test/'")
sdf.writeTo("lakehouse.test.test").using("iceberg").createOrReplace()
data = spark.read.table("lakehouse.test.test")

data.show()

spark.stop()
---
apiVersion: batch/v1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ provisioning:
buckets:
- name: ingest-bucket
- name: stats-bucket
- name: lakehouse
usersExistingSecrets:
- minio-users
resources:
Expand Down
13 changes: 11 additions & 2 deletions tests/test-definition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,19 @@ dimensions:
# No delta-lake release with support for Spark 4.1 yet
# - 4.1.1
# - 3.5.6,oci.stackable.tech/sandbox/spark-k8s:3.5.6-stackable0.0.0-dev
# Note: We keep this list, as new Spark versions are only supported delayed by Iceberg
- name: spark-iceberg
values:
- 3.5.7
- 3.5.8
- 4.0.1
# No iceberg release with support for Spark 4.1 yet
# - 4.1.1
- 4.1.1
- name: iceberg-latest
values:
- 1.11.0
- name: hive-iceberg
values:
- 4.0.0
- name: spark-connect
values:
- 3.5.7
Expand Down Expand Up @@ -112,6 +118,7 @@ tests:
- name: iceberg
dimensions:
- spark-iceberg
- iceberg-latest
- openshift
- name: delta-lake
dimensions:
Expand All @@ -133,6 +140,8 @@ tests:
- name: spark-connect
dimensions:
- spark-connect
- iceberg-latest
- hive-iceberg
- openshift
- s3-use-tls

Expand Down
Loading