#!/usr/bin/env python3
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Host running the Beam job server (job/artifact endpoints below).
job_server = "3.84.135.13"

# expansion_service = "localhost:8096"
# Set up this host to point to 127.0.0.1 in /etc/hosts
# bootstrap_servers = "host.docker.internal:9092"
# kafka_consumer_group_id = "kafka_echo"
# input_topic = "echo-input"
# output_topic = "echo-output"

pipeline_options = {
    # MUST BE PortableRunner
    "runner": "PortableRunner",
    "job_name": "ftd_beam_test",
    "job_endpoint": f"{job_server}:8099",
    "artifact_endpoint": f"{job_server}:8098",
    "environment_type": "EXTERNAL",
    # Address of the external SDK worker pool that executes the Python code.
    "environment_config": "localhost:50000",
    "streaming": True,
    "parallelism": 2,
    "experiments": ["use_deprecated_read"],
    "checkpointing_interval": "60000",
}
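
# The job/artifact endpoints above use the Beam Flink job server's default
# ports (8099 for jobs, 8098 for artifacts). A rough sketch of starting that
# job server on the job_server host (image tag illustrative, assuming a Flink
# master on the same host at 8081):
#
#   docker run --net=host apache/beam_flink1.16_job_server:latest \
#       --flink-master=localhost:8081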
options = PipelineOptions([], **pipeline_options)

# Earlier attempts, kept for reference:
# options = PipelineOptions(
#     [
#         "--runner=FlinkRunner",
#         "--flink_version=1.16",
#         "--job_endpoint=3.95.212.26:8081",
#         # "--environment_type=LOOPBACK"  # loopback for local only?
#         # "--environment_type=DOCKER"
#     ]
# )
# options = PipelineOptions(
#     [
#         "--runner=FlinkRunner",
#         # "--flink_version=1.16",
#         "--job_endpoint=3.84.135.13:8099",
#         # "--environment_type=LOOPBACK"  # loopback for local only?
#         # "--environment_type=DOCKER"
#     ]
# )
# From another example (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb):
# options = PipelineOptions(
#     [
#         "--runner=FlinkRunner",
#         "--flink_version=1.10",
#         "--flink_master=localhost:8081",
#         "--environment_type=EXTERNAL",
#         "--environment_config=localhost:50000",
#     ]
# )
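
# environment_type=EXTERNAL means the Flink task managers hand work to a Beam
# SDK worker pool listening at environment_config (localhost:50000 here), so
# one must already be running on each task manager host. A sketch, with an
# illustrative image tag that should match the local Beam and Python versions:
#
#   docker run --net=host apache/beam_python3.10_sdk:2.48.0 --worker_pool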
with beam.Pipeline(options=options) as pipeline:
    elements = (
        pipeline
        | "Create elements"
        >> beam.Create(["Hello", "World", "Fred", "was", "here", "twice", "Fred"])
        | "Print Elements" >> beam.Map(print)
    )
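
# The commented-out Kafka settings at the top (bootstrap_servers, topics,
# expansion_service) suggest the eventual goal is a Kafka "echo" job. The block
# below is only a sketch of that assumption using Beam's cross-language Kafka
# IO (ReadFromKafka/WriteToKafka), which also needs a running Java expansion
# service at the expansion_service address.
#
# from apache_beam.io.kafka import ReadFromKafka, WriteToKafka
#
# with beam.Pipeline(options=options) as pipeline:
#     _ = (
#         pipeline
#         | "Read from Kafka"
#         >> ReadFromKafka(
#             consumer_config={
#                 "bootstrap.servers": bootstrap_servers,
#                 "group.id": kafka_consumer_group_id,
#             },
#             topics=[input_topic],
#             expansion_service=expansion_service,
#         )
#         | "Echo records" >> beam.Map(lambda kv: kv)
#         | "Write to Kafka"
#         >> WriteToKafka(
#             producer_config={"bootstrap.servers": bootstrap_servers},
#             topic=output_topic,
#             expansion_service=expansion_service,
#         )
#     )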