Параллельный конвейер внутри одного задания потока данных

Я хочу запустить два параллельных конвейера внутри одного задания потока данных на GCP. Я уже создал один конвейер, и он отлично работает, но я хочу использовать другой, не создавая еще одно задание.

Я так много искал ответ, но не смог найти примеров кода :(

Это не работает, если я запускаю это так:

pipe1.run();
pipe2.run();

Он дает мне "Уже есть активное имя задания... Если вы хотите отправить второе задание, попробуйте еще раз установить другое имя, используя --jobName"

🤔 А знаете ли вы, что...
Java - это объектно-ориентированный язык программирования.


2
1 478
1

Ответ:

Решено

Вы можете применить другие входные данные к конвейеру, и это приведет к раздельному конвейеру в одном задании. Например.:

public class ExamplePipeline {

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    options.setRunner(DirectRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
    PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline one:" + c.element());
            c.output(c.element() + " extra message.");
        }

    }));
    linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));

    PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
    linesForPipelineTwo.apply("Pipeline 2 transoform",
            ParDo.of(new DoFn<String, String>() {

        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("Pipeline two:" + c.element());
        }

    }));

    pipeline.run();
}

Как видите, вы можете применить два (или более) отдельных PBegin к конвейеру с несколькими PDone/Sinks. В этом примере "pipeline 1" сбрасывает и записывает вывод в файл, а "pipeline 2" выводит только на экран.

Если вы запустите это с помощью DataflowRunner на GCP, графический интерфейс покажет вам 2 неподключенных «конвейера».