#!/usr/bin/env python3
"""Minimal Apache Beam test pipeline submitted to a remote portable job server.

Creates a handful of string elements and prints them on the workers, using
the PortableRunner against an EXTERNAL SDK-harness environment (e.g. a Flink
cluster fronted by a Beam job server).
"""
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Address of the Beam job server (portable runner front-end).
# NOTE(review): hardcoded IP — consider taking this from an env var or CLI flag.
job_server = "3.84.135.13"

pipeline_options = {
    # MUST BE PortableRunner when submitting through a standalone job server.
    "runner": "PortableRunner",
    "job_name": "ftd_beam_test",
    "job_endpoint": f"{job_server}:8099",
    "artifact_endpoint": f"{job_server}:8098",
    # EXTERNAL: workers attach to a separately launched SDK harness that
    # listens at environment_config; swap for LOOPBACK/DOCKER for local runs.
    "environment_type": "EXTERNAL",
    "environment_config": "localhost:50000",
    "streaming": True,
    "parallelism": 2,
    "experiments": ["use_deprecated_read"],
    "checkpointing_interval": "60000",  # milliseconds
}

options = PipelineOptions([], **pipeline_options)


def main() -> None:
    """Build and run the test pipeline; blocks until the job finishes."""
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements"
            >> beam.Create(["Hello", "World", "Fred", "was", "here", "twice", "Fred"])
            | "Print Elements" >> beam.Map(print)
        )


if __name__ == "__main__":
    main()