Support flink command line options #260

Open
jto opened this issue Feb 14, 2022 · 0 comments
Labels
enhancement New feature or request

jto commented Feb 14, 2022

The flink command line supports a number of flags, like --classpath.
Those flags need to be passed before the jar name; otherwise, flink simply ignores them.

For example, the following would work:

flink run \
  --class foo.bar.Baz \
  --classpath /path/to/a/custom/artifact.jar \
  myapp.jar

But this would not:

flink run \
  --class foo.bar.Baz \
  myapp.jar \
  --classpath /path/to/a/custom/artifact.jar

The operator won't let you pass those custom flags because there's no way to insert a flag before the job jar. See:

func getDesiredJob(observed *ObservedClusterState) *batchv1.Job {
	var flinkCluster = observed.cluster
	var recorded = flinkCluster.Status
	var jobSpec = flinkCluster.Spec.Job
	var jobStatus = recorded.Components.Job

	if jobSpec == nil {
		return nil
	}

	// When the job should be stopped, keep that state unless update is triggered or the job must be restarted.
	if (shouldStopJob(flinkCluster) || jobStatus.IsStopped()) &&
		(!shouldUpdateJob(observed) && !jobStatus.ShouldRestart(jobSpec)) {
		return nil
	}

	var clusterSpec = flinkCluster.Spec
	var imageSpec = clusterSpec.Image
	var serviceAccount = clusterSpec.ServiceAccountName
	var jobManagerSpec = clusterSpec.JobManager
	var clusterNamespace = flinkCluster.Namespace
	var clusterName = flinkCluster.Name
	var jobName = getJobName(clusterName)
	var jobManagerServiceName = clusterName + "-jobmanager"
	var jobManagerAddress = fmt.Sprintf(
		"%s:%d", jobManagerServiceName, *jobManagerSpec.Ports.UI)

	var podLabels = getClusterLabels(*flinkCluster)
	podLabels = mergeLabels(podLabels, jobManagerSpec.PodLabels)
	var jobLabels = mergeLabels(podLabels, getRevisionHashLabels(&recorded.Revision))

	var jobArgs = []string{"bash", submitJobScriptPath}
	jobArgs = append(jobArgs, "--jobmanager", jobManagerAddress)
	if jobSpec.ClassName != nil {
		jobArgs = append(jobArgs, "--class", *jobSpec.ClassName)
	}

	var fromSavepoint = convertFromSavepoint(jobSpec, jobStatus, &recorded.Revision)
	if fromSavepoint != nil {
		jobArgs = append(jobArgs, "--fromSavepoint", *fromSavepoint)
	}

	if jobSpec.AllowNonRestoredState != nil &&
		*jobSpec.AllowNonRestoredState {
		jobArgs = append(jobArgs, "--allowNonRestoredState")
	}

	if parallelism, err := calJobParallelism(flinkCluster); err == nil {
		jobArgs = append(jobArgs, "--parallelism", fmt.Sprint(parallelism))
	}

	if jobSpec.NoLoggingToStdout != nil &&
		*jobSpec.NoLoggingToStdout {
		jobArgs = append(jobArgs, "--sysoutLogging")
	}

	if jobSpec.Mode != nil {
		switch *jobSpec.Mode {
		case v1beta1.JobModeBlocking:
		case v1beta1.JobModeDetached:
			jobArgs = append(jobArgs, "--detached")
		}
	}

	var securityContext = jobSpec.SecurityContext

	var envVars []corev1.EnvVar

	if jobSpec.JarFile != nil {
		jobArgs = append(jobArgs, getLocalPath(&envVars, "FLINK_JOB_JAR_URI", *jobSpec.JarFile))
	}

	if jobSpec.PyFile != nil {
		jobArgs = append(jobArgs, "--python", getLocalPath(&envVars, "FLINK_JOB_JAR_URI", *jobSpec.PyFile))
	}

	if jobSpec.PyFiles != nil {
		jobArgs = append(jobArgs, "--pyFiles", getLocalPath(&envVars, "FLINK_JOB_PYTHON_FILES_URI", *jobSpec.PyFiles))
	}

	if jobSpec.PyModule != nil {
		jobArgs = append(jobArgs, "--pyModule", *jobSpec.PyModule)
	}

	envVars = append(envVars,
		corev1.EnvVar{
			Name:  "FLINK_JM_ADDR",
			Value: jobManagerAddress,
		})
	jobArgs = append(jobArgs, jobSpec.Args...)

jto added the enhancement label Feb 14, 2022