
feat: expose services with listener classes #562

Open · wants to merge 17 commits into base: main
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
- Use `--file-log-max-files` (or `FILE_LOG_MAX_FILES`) to limit the number of log files kept.
- Use `--file-log-rotation-period` (or `FILE_LOG_ROTATION_PERIOD`) to configure the frequency of rotation.
- Use `--console-log-format` (or `CONSOLE_LOG_FORMAT`) to set the format to `plain` (default) or `json`.
- Expose history and connect services via listener classes ([#562])

### Changed

@@ -35,6 +36,7 @@ All notable changes to this project will be documented in this file.
[#554]: https://github.com/stackabletech/spark-k8s-operator/pull/554
[#559]: https://github.com/stackabletech/spark-k8s-operator/pull/559
[#560]: https://github.com/stackabletech/spark-k8s-operator/pull/560
[#562]: https://github.com/stackabletech/spark-k8s-operator/pull/562

## [25.3.0] - 2025-03-21

52 changes: 12 additions & 40 deletions deploy/helm/spark-k8s-operator/crds/crds.yaml
@@ -1118,27 +1118,8 @@ spec:
description: A Spark cluster history server component. This resource is managed by the Stackable operator for Apache Spark. Find more information on how to use it in the [operator documentation](https://docs.stackable.tech/home/nightly/spark-k8s/usage-guide/history-server).
properties:
clusterConfig:
default:
listenerClass: cluster-internal
default: {}
description: Global Spark history server configuration that applies to all roles and role groups.
properties:
listenerClass:
default: cluster-internal
description: |-
This field controls which type of Service the Operator creates for this HistoryServer:

* cluster-internal: Use a ClusterIP service

* external-unstable: Use a NodePort service

* external-stable: Use a LoadBalancer service

This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
enum:
- cluster-internal
- external-unstable
- external-stable
type: string
type: object
image:
anyOf:
@@ -1395,6 +1376,9 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1641,6 +1625,9 @@ spec:
cleaner:
nullable: true
type: boolean
listenerClass:
nullable: true
type: string
logging:
default:
containers: {}
@@ -1879,27 +1866,8 @@ spec:
type: string
type: array
clusterConfig:
default:
listenerClass: external-unstable
default: {}
description: Global Spark Connect server configuration that applies to all roles.
properties:
listenerClass:
default: external-unstable
description: |-
This field controls which type of Service the Operator creates for this ConnectServer:

* cluster-internal: Use a ClusterIP service

* external-unstable: Use a NodePort service

* external-stable: Use a LoadBalancer service

This is a temporary solution with the goal to keep yaml manifests forward compatible. In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html> will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
enum:
- cluster-internal
- external-unstable
- external-stable
type: string
type: object
clusterOperation:
default:
@@ -2191,6 +2159,10 @@ spec:
config:
default: {}
properties:
listenerClass:
description: This field controls which [ListenerClass](https://docs.stackable.tech/home/nightly/listener-operator/listenerclass.html) is used to expose the Spark services.
nullable: true
type: string
logging:
default:
containers: {}
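The net effect of the CRD changes above: `listenerClass` moves from the cluster-wide `clusterConfig` (previously a fixed three-value enum) into the per-role-group `config` section as a free-form ListenerClass name. A minimal before/after migration sketch for a history server manifest (illustrative only; required fields such as `image` and `logFileDirectory` are omitted):

```yaml
# Before this PR: service type picked via an enum under clusterConfig
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  clusterConfig:
    listenerClass: external-unstable
---
# After this PR: any ListenerClass name, set under the role config
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
  name: spark-history
spec:
  nodes:
    config:
      listenerClass: external-unstable
```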
11 changes: 11 additions & 0 deletions deploy/helm/spark-k8s-operator/templates/roles.yaml
@@ -133,3 +133,14 @@ rules:
- bind
resourceNames:
- {{ include "operator.name" . }}-clusterrole
- apiGroups:
- listeners.stackable.tech
resources:
- listeners
verbs:
- get
- list
- watch
- patch
- create
- delete
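These verbs are needed because the operator now creates and manages Listener objects for history and connect pods itself. For orientation, a Listener it submits might look roughly like this (a sketch based on the listener-operator CRD; the name and port details are assumptions, not taken from this PR):

```yaml
apiVersion: listeners.stackable.tech/v1alpha1
kind: Listener
metadata:
  name: spark-history-node-default  # assumed: <cluster>-<role>-<rolegroup>
spec:
  className: cluster-internal       # taken from the role's listenerClass setting
  ports:
    - name: http                    # assumed port name
      port: 18080                   # default Spark history server UI port
      protocol: TCP
```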
30 changes: 19 additions & 11 deletions docs/modules/spark-k8s/pages/usage-guide/listenerclass.adoc
@@ -1,18 +1,26 @@
= Service exposition with ListenerClasses
= Service exposition with listener classes
:description: Configure the Spark connect and history services exposure with listener classes: cluster-internal, external-unstable, or external-stable.

The Spark operator deploys SparkApplications, and does not offer a UI or other API, so no services are exposed.
However, the operator can also deploy HistoryServers, which do offer a UI and API.
The operator deploys a service called `<name>-historyserver` (where `<name>` is the name of the spark application) through which the HistoryServer can be reached.
== History services

This service can have three different types: `cluster-internal`, `external-unstable` and `external-stable`.
Read more about the types in the xref:concepts:service-exposition.adoc[service exposition] documentation at platform level.

This is how the ListenerClass is configured:
The operator deploys a xref:listener-operator:listener.adoc[Listener] for each Spark history pod.
By default it is only accessible from within the Kubernetes cluster, but this can be changed by setting `.spec.nodes.config.listenerClass`:

[source,yaml]
----
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkHistoryServer
metadata:
name: spark-history
spec:
clusterConfig:
listenerClass: cluster-internal # <1>
nodes:
config:
listenerClass: external-unstable # <1>
----
<1> The default `cluster-internal` setting.
<1> Specify one of `external-stable`, `external-unstable`, `cluster-internal` (the default setting is `cluster-internal`).

For the example above, the listener operator creates a service named `spark-history-node-default` where `spark-history` is the name of the SparkHistoryServer, `node` is the service role (the only service role available for history servers) and `default` is the role group.

== Connect services

Connect pods can be exposed using listener classes in exactly the same fashion as history servers.
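A minimal connect manifest sketch under the same assumptions (role name `server`, matching the `config.listenerClass` field added to the connect CRD above; required fields such as `image` are omitted):

```yaml
apiVersion: spark.stackable.tech/v1alpha1
kind: SparkConnectServer
metadata:
  name: spark-connect
spec:
  server:
    config:
      listenerClass: external-unstable
```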
58 changes: 33 additions & 25 deletions rust/operator-binary/src/connect/controller.rs
@@ -11,8 +11,8 @@ use stackable_operator::{
},
logging::controller::ReconcilerError,
status::condition::{
compute_conditions, deployment::DeploymentConditionBuilder,
operations::ClusterOperationsConditionBuilder,
compute_conditions, operations::ClusterOperationsConditionBuilder,
statefulset::StatefulSetConditionBuilder,
},
time::Duration,
};
@@ -29,6 +29,14 @@ use crate::{
#[strum_discriminants(derive(IntoStaticStr))]
#[allow(clippy::enum_variant_names)]
pub enum Error {
#[snafu(display("failed to build spark connect listener"))]
BuildListener { source: server::Error },

#[snafu(display("failed to apply spark connect listener"))]
ApplyListener {
source: stackable_operator::cluster_resources::Error,
},

#[snafu(display("failed to serialize connect properties"))]
SerializeProperties { source: common::Error },

@@ -50,8 +58,8 @@ pub enum Error {
#[snafu(display("failed to build spark connect server config map for {name}"))]
BuildServerConfigMap { source: server::Error, name: String },

#[snafu(display("failed to build spark connect deployment"))]
BuildServerDeployment { source: server::Error },
#[snafu(display("failed to build spark connect stateful set"))]
BuildServerStatefulSet { source: server::Error },

#[snafu(display("failed to update status of spark connect server {name}"))]
ApplyStatus {
@@ -62,8 +70,8 @@ pub enum Error {
#[snafu(display("spark connect object has no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to update the connect server deployment"))]
ApplyDeployment {
#[snafu(display("failed to update the connect server stateful set"))]
ApplyStatefulSet {
source: stackable_operator::cluster_resources::Error,
},

@@ -192,21 +200,9 @@ pub async fn reconcile(
.await
.context(ApplyRoleBindingSnafu)?;

// Expose connect server to the outside world
let service = server::build_service(scs, &resolved_product_image.app_version_label, None)
.context(BuildServiceSnafu)?;
cluster_resources
.add(client, service.clone())
.await
.context(ApplyServiceSnafu)?;

// Headless service used by executors connect back to the driver
let service = server::build_service(
scs,
&resolved_product_image.app_version_label,
Some("None".to_string()),
)
.context(BuildServiceSnafu)?;
let service = server::build_internal_service(scs, &resolved_product_image.app_version_label)
.context(BuildServiceSnafu)?;

cluster_resources
.add(client, service.clone())
@@ -275,24 +271,36 @@ pub async fn reconcile(
name: scs.name_unchecked(),
})?;

// ========================================
// Server stateful set
let args = server::command_args(&scs.spec.args);
let deployment = server::build_deployment(
let stateful_set = server::build_stateful_set(
scs,
&server_config,
&resolved_product_image,
&service_account,
&server_config_map,
args,
)
.context(BuildServerDeploymentSnafu)?;
.context(BuildServerStatefulSetSnafu)?;

// ========================================
// Server listener
let listener = server::build_listener(scs, &server_config, &resolved_product_image)
.context(BuildListenerSnafu)?;

cluster_resources
.add(client, listener)
.await
.context(ApplyListenerSnafu)?;

let mut ss_cond_builder = DeploymentConditionBuilder::default();
let mut ss_cond_builder = StatefulSetConditionBuilder::default();

ss_cond_builder.add(
cluster_resources
.add(client, deployment)
.add(client, stateful_set)
.await
.context(ApplyDeploymentSnafu)?,
.context(ApplyStatefulSetSnafu)?,
);

cluster_resources
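`server::build_listener` itself is outside this hunk; a rough sketch of what such a builder typically does in Stackable operators follows (module paths, field names, and the port are assumptions based on `stackable-operator`'s listener support, not code from this PR):

```rust
// Sketch only: verify module paths and fields against the
// stackable-operator version this repo actually pins.
use stackable_operator::{
    builder::meta::ObjectMetaBuilder,
    commons::listener::{Listener, ListenerPort, ListenerSpec},
};

// Hypothetical helper: build a Listener exposing the connect gRPC port,
// with the ListenerClass resolved from the merged role config.
fn build_listener_sketch(name: &str, listener_class: &str) -> Listener {
    Listener {
        metadata: ObjectMetaBuilder::new().name(name).build(),
        spec: ListenerSpec {
            class_name: Some(listener_class.to_owned()),
            ports: Some(vec![ListenerPort {
                name: "grpc".to_owned(),         // assumed port name
                port: 15002,                     // Spark Connect default port
                protocol: Some("TCP".to_owned()),
            }]),
            ..ListenerSpec::default()
        },
        status: None,
    }
}
```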
44 changes: 6 additions & 38 deletions rust/operator-binary/src/connect/crd.rs
@@ -106,21 +106,7 @@ pub mod versioned {

#[derive(Clone, Deserialize, Debug, Default, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SparkConnectServerClusterConfig {
/// This field controls which type of Service the Operator creates for this ConnectServer:
///
/// * cluster-internal: Use a ClusterIP service
///
/// * external-unstable: Use a NodePort service
///
/// * external-stable: Use a LoadBalancer service
///
/// This is a temporary solution with the goal to keep yaml manifests forward compatible.
/// In the future, this setting will control which ListenerClass <https://docs.stackable.tech/home/stable/listener-operator/listenerclass.html>
/// will be used to expose the service, and ListenerClass names will stay the same, allowing for a non-breaking change.
#[serde(default)]
pub listener_class: CurrentlySupportedListenerClasses,
}
pub struct SparkConnectServerClusterConfig {}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
#[fragment_attrs(
@@ -147,6 +133,10 @@ pub mod versioned {
/// This can be shortened by the `maxCertificateLifetime` setting on the SecretClass issuing the TLS certificate.
#[fragment_attrs(serde(default))]
pub requested_secret_lifetime: Option<Duration>,

/// This field controls which [ListenerClass](DOCS_BASE_URL_PLACEHOLDER/listener-operator/listenerclass.html) is used to expose the Spark services.
#[serde(default)]
pub listener_class: String,
}

#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
@@ -178,29 +168,6 @@
}
}

// TODO: Temporary solution until listener-operator is finished
#[derive(Clone, Debug, Default, Display, Deserialize, Eq, JsonSchema, PartialEq, Serialize)]
#[serde(rename_all = "PascalCase")]
pub(crate) enum CurrentlySupportedListenerClasses {
#[serde(rename = "cluster-internal")]
ClusterInternal,
#[default]
#[serde(rename = "external-unstable")]
ExternalUnstable,
#[serde(rename = "external-stable")]
ExternalStable,
}

impl CurrentlySupportedListenerClasses {
pub fn k8s_service_type(&self) -> String {
match self {
CurrentlySupportedListenerClasses::ClusterInternal => "ClusterIP".to_string(),
CurrentlySupportedListenerClasses::ExternalUnstable => "NodePort".to_string(),
CurrentlySupportedListenerClasses::ExternalStable => "LoadBalancer".to_string(),
}
}
}

#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, Debug, Default, JsonSchema, PartialEq, Fragment)]
#[fragment_attrs(
@@ -258,6 +225,7 @@ impl v1alpha1::ServerConfig {
},
logging: product_logging::spec::default_logging(),
requested_secret_lifetime: Some(Self::DEFAULT_CONNECT_SECRET_LIFETIME),
listener_class: Some("cluster-internal".into()),
}
}

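With the enum removed, the default now lives in `default_config` (`listener_class: Some("cluster-internal".into())` above) and reaches the effective config through the usual fragment merge. A condensed, self-contained illustration of that pattern (stand-in types, not the operator's real `ServerConfig`/fragment machinery):

```rust
// Stand-in types illustrating the fragment-merge pattern: a user
// fragment with optional fields is merged over the defaults from
// default_config(), so an unset listenerClass resolves to
// "cluster-internal". These are NOT the operator's real types.
#[derive(Clone, Debug, PartialEq)]
struct ServerConfigFragment {
    listener_class: Option<String>,
}

fn merge(user: ServerConfigFragment, defaults: ServerConfigFragment) -> ServerConfigFragment {
    ServerConfigFragment {
        // a user-provided value wins; otherwise fall back to the default
        listener_class: user.listener_class.or(defaults.listener_class),
    }
}

fn main() {
    let defaults = ServerConfigFragment {
        listener_class: Some("cluster-internal".into()),
    };
    let user = ServerConfigFragment { listener_class: None };
    assert_eq!(
        merge(user, defaults).listener_class.as_deref(),
        Some("cluster-internal")
    );
}
```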