ERROR DockerEnvironmentFactory: Docker container xxxxx logs when trying to run an Apache Beam pipeline written in Go with the Spark runner

I have a pipeline written in Go that I want to run with the Spark runner; a standalone Spark installation is running on my local machine.

  • Apache Beam 2.56.0

  • Apache Spark 3.2.2

I started the Spark master and a worker from the installation directory with these commands:

# for master
./sbin/start-master.sh -h localhost

# for worker
./sbin/start-worker.sh spark://localhost:7077

Then I started beam_spark3_job_server with /tmp mounted:

docker run -v /tmp:/tmp --net=host apache/beam_spark3_job_server:2.56.0 \
        --spark-master-url=spark://localhost:7077

Now, running the Go project with

go run main.go --runner PortableRunner \
        --endpoint localhost:8099 \
        --environment_type LOOPBACK

works fine, but environment_type is set to LOOPBACK.
So if I remove that flag and run the command again without it (the default is DOCKER):

go run main.go --runner PortableRunner \
        --endpoint localhost:8099

then I get the following on the console:

java.lang.IllegalStateException: No container running for id xxxxx

This differs from the issue described in the thread "No container running for id xxxxxx when running the Apache Beam Go SDK examples", because running docker run with -v already solved the problem of the file missing from /tmp/beam-artifact-staging.

Nevertheless, the problem persists.
Here are some log snippets from Spark:

24/07/02 15:21:37 DEBUG DockerEnvironmentFactory: Creating Docker Container with ID 1-1
24/07/02 15:21:39 DEBUG DockerEnvironmentFactory: Created Docker Container with Container ID 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
24/07/02 15:21:39 INFO GrpcLoggingService: Beam Fn Logging client connected.
24/07/02 15:21:39 DEBUG : Initializing Go harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:39005
24/07/02 15:21:39 DEBUG LocalFileSystem: opening file /tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554
24/07/02 15:21:40 WARN GrpcLoggingService: Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status.asRuntimeException(Status.java:529)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:370)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:359)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/07/02 15:21:42 DEBUG AwsRegionProviderChain: Unable to load region from software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@247a4d99:Unable to contact EC2 metadata service.
24/07/02 15:21:42 DEBUG LocalDiskShuffleMapOutputWriter: Writing shuffle index file for mapId 1 with length 8
24/07/02 15:21:42 DEBUG IndexShuffleBlockResolver: Shuffle index for mapId 1: [0,0,0,0,0,0,0,0]
24/07/02 15:21:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 6019 bytes result sent to driver
24/07/02 15:21:42 DEBUG ExecutorMetricsPoller: stageTCMP: (0, 0) -> 1
24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:
2024/07/02 13:21:39 Provision info:
pipeline_options:{fields:{key:"beam:option:app_name:v1"  value:{string_value:"go-job-1-1719926483620667092"}}  fields:{key:"beam:option:experiments:v1"  value:{list_value:{values:{string_value:"beam_fn_api"}}}}  fields:{key:"beam:option:go_options:v1"  value:{struct_value:{fields:{key:"options"  value:{struct_value:{fields:{key:"endpoint"  value:{string_value:"localhost:8099"}}  fields:{key:"hookOrder"  value:{string_value:"[\"default_remote_logging\"]"}}  fields:{key:"hooks"  value:{string_value:"{\"default_remote_logging\":null}"}}  fields:{key:"job"  value:{string_value:"wordcount"}}  fields:{key:"runner"  value:{string_value:"spark"}}}}}}}}  fields:{key:"beam:option:job_name:v1"  value:{string_value:"go0job0101719926483620667092-root-0702132126-ff3f12ba"}}  fields:{key:"beam:option:options_id:v1"  value:{number_value:2}}  fields:{key:"beam:option:parallelism:v1"  value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1"  value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:spark_master:v1"  value:{string_value:"spark://localhost:7077"}}}  retrieval_token:"go-job-1-1719926483620667092_8d8b0d53-0d18-49dc-908b-a85d0be89cc5"  logging_endpoint:{url:"localhost:36449"}  artifact_endpoint:{url:"localhost:36373"}  control_endpoint:{url:"localhost:43091"}  dependencies:{type_urn:"beam:artifact:type:file:v1"  type_payload:"\n\x84\x01/tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554"  role_urn:"beam:artifact:role:go_worker_binary:v1"}  runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2024/07/02 13:21:40 Downloaded: /tmp/staged/1-worker-1-1719926483620669554 (sha256: 89580cb558dbc92138c20bdb88f8687d7c96386e9f6d0b717b07b68fe9327476, size: 122860883)
24/07/02 15:21:44 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:142)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:81)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
    ... 39 more
    Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4'. stderr: Error response from daemon: cannot kill container: 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4: container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 is not running
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
        at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
        ... 45 more

The most interesting lines are:

24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:


Answer:

Solved

It turned out the problem was how I had structured and written the Go code. Neither Beam, nor Spark, nor Docker was at fault here.
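
For reference, here is a minimal sketch of how a Beam Go pipeline is conventionally laid out; my original main.go is not shown here, so treat this only as an illustration of the structure, not the exact fix, and the --input/--output flags and paths are placeholders. The key point is that flag.Parse() and beam.Init() run at the very start of main, before any pipeline construction, because the same compiled binary staged by the job server is later re-executed inside the apache/beam_go_sdk container as the worker harness.

package main

import (
	"context"
	"flag"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// Placeholder flags for illustration only.
var (
	input  = flag.String("input", "/tmp/input.txt", "Input file (placeholder path).")
	output = flag.String("output", "/tmp/output.txt", "Output file (placeholder path).")
)

func main() {
	// Parse flags first so --runner, --endpoint, --environment_type reach the SDK.
	flag.Parse()
	// beam.Init must run before the pipeline is constructed; when the container
	// re-launches this binary as a worker harness, Init takes over instead of
	// re-running the pipeline submission below.
	beam.Init()

	p := beam.NewPipeline()
	s := p.Root()

	// Trivial read/write pipeline, standing in for the real transforms.
	lines := textio.Read(s, *input)
	textio.Write(s, *output, lines)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}

One way a structural problem like mine can show up: if the staged binary exits or panics before the harness takes over (for example, because work is done before beam.Init()), the SDK container stops almost immediately, and the runner reports exactly this kind of "No container running for id ..." error.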