Create Amazon Kinesis Data Analytics Application - Tumbling Window
June 13, 2020
amazon-kinesis-data-analytics-java-examples]$ cd TumblingWindow
TumblingWindow]$ mvn package -Dflink.version=1.8.2
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13.247 s
[INFO] Finished at: 2020-06-13T12:34:22Z
[INFO] ------------------------------------------------------------------------
TumblingWindow]$ vim src/main/java/com/amazonaws/services/kinesisanalytics/TumblingWindowStreamingJob.java
package com.amazonaws.services.kinesisanalytics;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
import org.apache.flink.util.Collector;

import java.util.Properties;

/**
 * A Kinesis Data Analytics for Java application to calculate word count for
 * records in a Kinesis Data Stream using a tumbling window.
 */
public class TumblingWindowStreamingJob {

    private static final String region = "us-west-2";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    private static DataStream<String> createSourceFromStaticConfig(
            StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName,
                new SimpleStringSchema(), inputProperties));
    }

    private static FlinkKinesisProducer<String> createSinkFromStaticConfig() {
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(
                new SimpleStringSchema(), outputProperties);
        sink.setDefaultStream(outputStreamName);
        sink.setDefaultPartition("0");
        return sink;
    }

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = createSourceFromStaticConfig(env);

        input.flatMap(new Tokenizer()) // Tokenizer for generating words
                .keyBy(0) // Logically partition the stream for each word
                .timeWindow(Time.seconds(5)) // Tumbling window definition
                .sum(1) // Sum the number of words per partition
                .map(value -> value.f0 + "," + value.f1.toString() + "\n")
                .addSink(createSinkFromStaticConfig());

        env.execute("Word Count");
    }

    public static final class Tokenizer
            implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }
}
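The Kinesis Data Analytics application itself is created from the packaged JAR after uploading it to S3. That step is not shown in this walkthrough; as a rough sketch using the boto3 kinesisanalyticsv2 client, it could look like the following. The application name, service role ARN, bucket ARN, and JAR key below are placeholders, not values from this post.

import boto3

kda = boto3.client("kinesisanalyticsv2", region_name="us-west-2")

# Placeholder names/ARNs; replace with your own role, bucket, and JAR key.
kda.create_application(
    ApplicationName="TumblingWindowApp",
    RuntimeEnvironment="FLINK-1_8",  # matches the flink.version used by mvn package
    ServiceExecutionRole="arn:aws:iam::123456789012:role/kda-service-role",
    ApplicationConfiguration={
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::my-kda-code-bucket",
                    "FileKey": "tumbling-window-app-1.0.jar",
                }
            },
            "CodeContentType": "ZIPFILE",  # used for JAR artifacts as well
        }
    },
)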
TumblingWindow]$ cd
~]$ python3.8 stock.py
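The contents of stock.py are not reproduced here; it is a producer that writes sample records to ExampleInputStream. A minimal sketch of such a producer, modeled on the AWS documentation example, is shown below; the exact record layout is an assumption, chosen because the "event_time" key matches the counts seen in the Firehose output further down.

import datetime
import json
import random

import boto3

STREAM_NAME = "ExampleInputStream"  # must match inputStreamName in the Flink job


def get_data():
    # Random stock-ticker style record; tokenizing its JSON text is what
    # produces word counts such as "event_time,<count>" downstream.
    return {
        "event_time": datetime.datetime.now().isoformat(),
        "ticker": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
        "price": round(random.random() * 100, 2),
    }


def generate(stream_name, kinesis_client):
    # Write records to the input stream indefinitely.
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey",
        )


if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis", region_name="us-west-2"))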
With a Kinesis Data Firehose delivery stream attached to the output Kinesis data stream, data like the following could be observed in the S3 objects generated by Firehose:
event_time,501
756174,1
748946,1
886957,1
741447,1
733841,1
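The delivery stream that writes those objects reads from ExampleOutputStream. One way to create it is with boto3, sketched below; the delivery stream name, role ARNs, and bucket ARN are placeholders and the roles must allow reading the Kinesis stream and writing to the bucket.

import boto3

firehose = boto3.client("firehose", region_name="us-west-2")

# Placeholder names/ARNs; adjust account ID, roles, and bucket to your setup.
firehose.create_delivery_stream(
    DeliveryStreamName="example-output-to-s3",
    DeliveryStreamType="KinesisStreamAsSource",
    KinesisStreamSourceConfiguration={
        "KinesisStreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/ExampleOutputStream",
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-read-role",
    },
    ExtendedS3DestinationConfiguration={
        "RoleARN": "arn:aws:iam::123456789012:role/firehose-write-role",
        "BucketARN": "arn:aws:s3:::my-firehose-output-bucket",
    },
)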
References
- Example: Tumbling Window, Amazon Kinesis Data Analytics for Java Applications Developer Guide