Create Amazon Kinesis Data Analytics Application - Tumbling Window
June 13, 2020
amazon-kinesis-data-analytics-java-examples]$ cd TumblingWindow
TumblingWindow]$ mvn package -Dflink.version=1.8.2
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.247 s
[INFO] Finished at: 2020-06-13T12:34:22Z
[INFO] ------------------------------------------------------------------------
TumblingWindow]$ vim src/main/java/com/amazonaws/services/kinesisanalytics/TumblingWindowStreamingJob.java
package com.amazonaws.services.kinesisanalytics;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * A Kinesis Data Analytics for Java application to calculate word count for
 * records in a Kinesis Data Stream using a tumbling window.
 */
public class TumblingWindowStreamingJob {

    private static final String region = "us-west-2";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(
            StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new SimpleStringSchema(), inputProperties));
    }

    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        FlinkKinesisProducer<String> sink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = createSourceFromStaticConfig(env);

        input.flatMap(new Tokenizer())       // Tokenizer for generating words
                .keyBy(0)                    // Logically partition the stream for each word
                .timeWindow(Time.seconds(5)) // Tumbling window definition
                .sum(1)                      // Sum the number of words per partition
                .map(value -> value.f0 + "," + value.f1.toString() + "\n")
                .addSink(createSinkFromStaticConfig());

        env.execute("Word Count");
    }

    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
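The pipeline tokenizes each record from the input stream into lowercase words, partitions the stream by word, counts occurrences over 5-second processing-time tumbling windows, and writes each result to the output stream as a "word,count" line. This is why the output observed later consists of token/count pairs rather than the original records.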
TumblingWindow]$ cd
~]$ python3.8 stock.py
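stock.py itself is not shown here. A minimal sketch of such a producer, assuming it follows the data generator from the AWS tumbling-window tutorial (the stream name, record fields, and region below are assumptions, not the actual script), could look like this:

import datetime
import json
import random

import boto3

# Assumed to match the input stream consumed by the Flink application.
STREAM_NAME = "ExampleInputStream"


def get_data():
    # Each record is a small JSON document; the Flink Tokenizer later splits it
    # into words, so field names like "event_time" and numeric fragments end up
    # as counted tokens.
    return {
        "event_time": datetime.datetime.now().isoformat(),
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "price": round(random.random() * 100, 2),
    }


def generate(stream_name, kinesis_client):
    # Continuously put records into the input Kinesis Data Stream.
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis", region_name="us-west-2"))

With a producer like this, a field name such as event_time appears once per record, so its count reflects the number of records in a 5-second window, which matches the sample output below.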
With a Kinesis Data Firehose delivery stream attached to the output Kinesis Data Stream, data like the following can be observed in the S3 objects that Firehose delivers.
event_time,501
756174,1
748946,1
886957,1
741447,1
733841,1
References
Example: Tumbling Window