Я хочу запустить два параллельных конвейера внутри одного задания потока данных на GCP. Я уже создал один конвейер, и он отлично работает, но я хочу использовать другой, не создавая еще одно задание.
Я так много искал ответ, но не смог найти примеров кода :(
Это не работает, если я запускаю это так:
pipe1.run();
pipe2.run();
Он дает мне "Уже есть активное имя задания... Если вы хотите отправить второе задание, попробуйте еще раз установить другое имя, используя --jobName
"
🤔 А знаете ли вы, что...
Java - это объектно-ориентированный язык программирования.
Вы можете применить другие входные данные к конвейеру, и это приведет к раздельному конвейеру в одном задании. Например.:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
options.setRunner(DirectRunner.class);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> linesForPipelineOne = pipeline.apply(Create.of("A1", "B1"));
PCollection<String> linesToWriteFromPipelineOne = linesForPipelineOne.apply("Pipeline 1 transform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline one:" + c.element());
c.output(c.element() + " extra message.");
}
}));
linesToWriteFromPipelineOne.apply((TextIO.write().to("file.txt")));
PCollection<String> linesForPipelineTwo = pipeline.apply(Create.of("A2", "B2"));
linesForPipelineTwo.apply("Pipeline 2 transoform",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Pipeline two:" + c.element());
}
}));
pipeline.run();
}
Как видите, вы можете применить два (или более) отдельных PBegin к конвейеру с несколькими PDone/Sinks. В этом примере "pipeline 1"
сбрасывает и записывает вывод в файл, а "pipeline 2"
выводит только на экран.
Если вы запустите это с помощью DataflowRunner
на GCP, графический интерфейс покажет вам 2 неподключенных «конвейера».