Browse Source

beam test

Fred Damstra (Macbook 2015) 2 years ago
parent
commit
d3a80dc4c0
2 changed files with 50 additions and 15 deletions
  1. 49 14
      Workloads/flink/beam_test.py
  2. 1 1
      Workloads/flink/requirements.txt

+ 49 - 14
Workloads/flink/beam_test.py

@@ -2,13 +2,49 @@
 import apache_beam as beam
 from apache_beam.options.pipeline_options import PipelineOptions
 
-options = PipelineOptions([
-    "--runner=FlinkRunner",
-    "--flink_version=1.14",
-    "--job_endpoint=10.42.42.215:8081",
-#    "--environment_type=LOOPBACK" # loopback for local only?
-    "--environment_type=DOCKER"
-])
+job_server = "3.84.135.13"
+# expansion_service = "localhost:8096"
+# Set up this host to point to 127.0.0.1 in /etc/hosts
+# bootstrap_servers = "host.docker.internal:9092"
+# kafka_consumer_group_id = "kafka_echo"
+# input_topic = "echo-input"
+# output_topic = "echo-output"
+
+
+pipeline_options = {
+    # MUST BE PortableRunner
+    "runner": "PortableRunner",
+    "job_name": "ftd_beam_test",
+    "job_endpoint": f"{job_server}:8099",
+    "artifact_endpoint": f"{job_server}:8098",
+    "environment_type": "EXTERNAL",
+    "environment_config": "localhost:50000",
+    "streaming": True,
+    "parallelism": 2,
+    "experiments": ["use_deprecated_read"],
+    "checkpointing_interval": "60000",
+}
+options = PipelineOptions([], **pipeline_options)
+
+
+# options = PipelineOptions(
+#    [
+#        "--runner=FlinkRunner",
+#        "--flink_version=1.16",
+#        "--job_endpoint=3.95.212.26:8081",
+#        #    "--environment_type=LOOPBACK" # loopback for local only?
+#        #    "--environment_type=DOCKER"
+#    ]
+# )
+# options = PipelineOptions(
+#    [
+#        "--runner=FlinkRunner",
+#        #        "--flink_version=1.16",
+#        "--job_endpoint=3.84.135.13:8099",
+#        #    "--environment_type=LOOPBACK" # loopback for local only?
+#        #    "--environment_type=DOCKER"
+#    ]
+# )
 
 # From another example (https://python.plainenglish.io/apache-beam-flink-cluster-kubernetes-python-a1965f37b7cb)
 #    options = PipelineOptions([
@@ -20,11 +56,10 @@ options = PipelineOptions([
 #    ])
 
 
-
-
 with beam.Pipeline(options=options) as pipeline:
-  elements = (
-          pipeline
-          | "Create elements" >> beam.Create(["Hello", "World", "Fred", "was", "here", "twice", "Fred"])
-          | "Print Elements" >> beam.Map(print)
-          )
+    elements = (
+        pipeline
+        | "Create elements"
+        >> beam.Create(["Hello", "World", "Fred", "was", "here", "twice", "Fred"])
+        | "Print Elements" >> beam.Map(print)
+    )

+ 1 - 1
Workloads/flink/requirements.txt

@@ -1 +1 @@
-apache-beam
+apache-beam==2.48.0