# beam_test.py (2.0 KB)
  1. #! /usr/bin/env python3
  2. import apache_beam as beam
  3. from apache_beam.options.pipeline_options import PipelineOptions
  4. job_server = "3.84.135.13"
  5. # expansion_service = "localhost:8096"
  6. # Set up this host to point to 127.0.0.1 in /etc/hosts
  7. # bootstrap_servers = "host.docker.internal:9092"
  8. # kafka_consumer_group_id = "kafka_echo"
  9. # input_topic = "echo-input"
  10. # output_topic = "echo-output"
  11. pipeline_options = {
  12. # MUST BE PortableRunner
  13. "runner": "PortableRunner",
  14. "job_name": "ftd_beam_test",
  15. "job_endpoint": f"{job_server}:8099",
  16. "artifact_endpoint": f"{job_server}:8098",
  17. "environment_type": "EXTERNAL",
  18. "environment_config": "localhost:50000",
  19. "streaming": True,
  20. "parallelism": 2,
  21. "experiments": ["use_deprecated_read"],
  22. "checkpointing_interval": "60000",
  23. }
  24. options = PipelineOptions([], **pipeline_options)
  25. # options = PipelineOptions(
  26. # [
  27. # "--runner=FlinkRunner",
  28. # "--flink_version=1.16",
  29. # "--job_endpoint=3.95.212.26:8081",
  30. # # "--environment_type=LOOPBACK" # loopback for local only?
  31. # # "--environment_type=DOCKER"
  32. # ]
  33. # )
  34. # options = PipelineOptions(
  35. # [
  36. # "--runner=FlinkRunner",
  37. # # "--flink_version=1.16",
  38. # "--job_endpoint=3.84.135.13:8099",
  39. # # "--environment_type=LOOPBACK" # loopback for local only?
  40. # # "--environment_type=DOCKER"
  41. # ]
  42. # )
  43. # From another example (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb)
  44. # options = PipelineOptions([
  45. # "--runner=FlinkRunner",
  46. # "--flink_version=1.10",
  47. # "--flink_master=localhost:8081",
  48. # "--environment_type=EXTERNAL",
  49. # "--environment_config=localhost:50000"
  50. # ])
  51. with beam.Pipeline(options=options) as pipeline:
  52. elements = (
  53. pipeline
  54. | "Create elements"
  55. >> beam.Create(["Hello", "World", "Fred", "was", "here", "twice", "Fred"])
  56. | "Print Elements" >> beam.Map(print)
  57. )