Fault-tolerance demo & reconfiguration - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… Boston University 2020 … When the JobManager fails, all tasks are automatically cancelled. The new JobManager performs the following steps: 1. It requests the storage locations from ZooKeeper … • … scaling actions, predict their effects, and decide which and when to apply • Allocate new resources, spawn new processes, or release unused resources; safely terminate processes • Adjust dataflow …
41 pages | 4.09 MB | 1 year ago
Course introduction - CS 591 K1: Data Stream Processing and Analytics Spring 2020
• know when to use stream processing vs. other technology • be able to comprehensively compare features and processing guarantees of streaming systems • be proficient in using Apache Flink and Kafka … transfers of €1000 to a "fake account" until either you're out of money or the activity is detected. • Features to detect fraudulent activity like this: • The transaction amount. • The number of recent (e…
34 pages | 2.53 MB | 1 year ago
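The fraud scenario in this entry (repeated €1000 transfers to a "fake account") lends itself to a compact per-account rule. Below is a minimal, hypothetical sketch in plain Java, not taken from the course material: it flags an account once it issues a threshold number of consecutive transfers of the same amount. All names (`RepeatedTransferDetector`, `observe`) are ours.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: flag an account after `threshold` consecutive
// transfers of the same amount, mirroring the repeated-€1000 pattern.
public class RepeatedTransferDetector {
    private final int threshold;
    private final Map<String, Integer> runLength = new HashMap<>();
    private final Map<String, Double> lastAmount = new HashMap<>();

    public RepeatedTransferDetector(int threshold) {
        this.threshold = threshold;
    }

    /** Returns true when the account's run of identical amounts reaches the threshold. */
    public boolean observe(String account, double amount) {
        Double prev = lastAmount.put(account, amount);
        int run = (prev != null && prev == amount)
                ? runLength.getOrDefault(account, 1) + 1
                : 1;
        runLength.put(account, run);
        return run >= threshold;
    }
}
```

A real detector would combine several of the listed features (amount, recency, location); this sketch isolates just the repetition signal.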
Scalable Stream Processing - Spark Streaming and Flink
… at which streaming data will be divided into batches. val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) ▶ It can also be created from an existing SparkContext object. val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1)) ▶ StreamingContext is the main entry point of all Spark …
113 pages | 1.22 MB | 1 year ago
Streaming in Apache Flink
… are either ◦ public, or ◦ have a default getter and setter. Tuple2<String, Integer> person = new Tuple2<>("Fred", 35); // zero-based index! String name = person.f0; Integer age = person.f1; … {}; public Person(String name, Integer age) { … }; } Person person = new Person("Fred Flintstone", 35); Setup • https://training.ververica.com/devEnvSetup.html • Datasets: DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...)); DataStream<TaxiRide> enrichedNYCRides = rides .filter(new RideCleansing.NYCFilter()) .map(new Enrichment()); enrichedNYCRides …
45 pages | 3.00 MB | 1 year ago
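The snippet above quotes Flink's POJO rules: a public class with a public no-argument constructor, whose fields are either public or exposed through default-style getters and setters. A plain-Java sketch of a class satisfying those rules, assuming the `Person` shape used in the snippet (no Flink dependency needed to illustrate the structure):

```java
// Sketch of a class meeting the POJO requirements quoted above.
public class Person {
    public String name;     // public field: usable directly
    private Integer age;    // private field: needs getter/setter

    public Person() {}      // public no-arg constructor is required

    public Person(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public Integer getAge() { return age; }
    public void setAge(Integer age) { this.age = age; }
}
```

Types that do not meet these rules fall back to a generic serializer, so keeping event classes POJO-shaped is a cheap performance win.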
Windows and triggers - CS 591 K1: Data Stream Processing and Analytics Spring 2020
val env = StreamExecutionEnvironment.getExecutionEnvironment val sensorData = env.addSource(new SensorSource) val maxTemp = sensorData .map(r => Reading(r.id, r.time, (r.temp - 32) * (5… … .process(new TemperatureAverager) val avgTemp = sensorData .keyBy(_.id) // shortcut for .window(TumblingEventTimeWindows.of(size)) .timeWindow(Time.seconds(1)) .process(new TemperatureAverager) … every 15 minutes .window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) .process(new TemperatureAverager) val slidingAvgTemp = sensorData .keyBy(_.id) // shortcut for …
35 pages | 444.84 KB | 1 year ago
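Tumbling assigners such as `TumblingEventTimeWindows.of(size)` bucket each event by truncating its timestamp down to a multiple of the window size. A small stand-alone sketch of that arithmetic (our own helper names, not Flink code):

```java
// Bucketing arithmetic behind tumbling windows: an event with timestamp
// ts (in ms) falls into the window [start, start + size) where
// start = ts - (ts % size). Helper names are ours.
public class Tumbling {
    public static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    public static long windowEnd(long ts, long size) {
        return windowStart(ts, size) + size;
    }
}
```

Sliding windows generalize this: each event belongs to `size / slide` overlapping windows whose starts are spaced `slide` apart.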
Graph streaming algorithms - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… a bitcoin transaction, a packet routed from a source to a destination • Vertex events: a new product, a new movie, a user … Vasiliki Kalavri | Boston University 2020 … stream: events indicate edge additions or deletions. At t+1, the graph is obtained by inserting a new edge or deleting an existing edge (u, v) to E(t+1). If any of u, v do not already exist in V(t) … step: For each vertex • choose the min of neighbors' component IDs and own component ID as the new ID • if the component ID changed since the last iteration, notify neighbors …
72 pages | 7.77 MB | 1 year ago
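The min-propagation step quoted above (each vertex repeatedly adopts the minimum of its own and its neighbors' component IDs until nothing changes) can be sketched on a static adjacency map as follows. This is an illustrative plain-Java version over a fixed graph, not the streaming implementation from the lecture; vertex IDs double as initial labels.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Iterative min-label propagation for connected components.
// Assumes every vertex appears as a key in the adjacency map.
public class ConnectedComponents {
    public static Map<Integer, Integer> run(Map<Integer, List<Integer>> adj) {
        Map<Integer, Integer> label = new HashMap<>();
        for (Integer v : adj.keySet()) {
            label.put(v, v); // own ID as initial component ID
        }
        boolean changed = true;
        while (changed) {           // repeat until a fixed point is reached
            changed = false;
            for (Map.Entry<Integer, List<Integer>> e : adj.entrySet()) {
                int min = label.get(e.getKey());
                for (int n : e.getValue()) {
                    min = Math.min(min, label.get(n));
                }
                if (min < label.get(e.getKey())) {
                    label.put(e.getKey(), min);
                    changed = true; // "notify neighbors" in the next sweep
                }
            }
        }
        return label;
    }
}
```

In the streaming setting the same idea runs incrementally as edges arrive, with label changes propagated as messages rather than full sweeps.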
Streaming languages and operator semantics - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… traditional SQL aggregates • A non-blocking query operator can produce answers incrementally as new input records arrive • projection, selection, union … bound, and the upper bound advances for every new event • all events since 1/1/2019 • Sliding windows have fixed size but both their bounds advance for new events • last 10 events or events in the last … Streaming operators take sequences (streams) as input and return sequences (streams) as output: for each new input tuple in S, G adds zero, one, or several tuples to the output. Let Gj(S) be the cumulative …
53 pages | 532.37 KB | 1 year ago
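One window shape mentioned above, "last 10 events", is a count-based sliding window: both bounds advance with every arrival, so the window always holds the most recent N elements. A minimal sketch (our own `LastN` helper, not from the lecture code):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Count-based sliding window: keeps only the most recent n events.
public class LastN {
    private final int n;
    private final ArrayDeque<Integer> window = new ArrayDeque<>();

    public LastN(int n) {
        this.n = n;
    }

    /** Add one event; both window bounds advance once the buffer is full. */
    public List<Integer> add(int event) {
        window.addLast(event);          // upper bound advances
        if (window.size() > n) {
            window.removeFirst();       // lower bound advances
        }
        return new ArrayList<>(window); // current window contents
    }
}
```

A landmark window, by contrast, would never call `removeFirst()`: its lower bound stays fixed while only the upper bound moves.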
Introduction to Apache Flink and Apache Kafka - CS 591 K1: Data Stream Processing and Analytics Spring 2020
val env = StreamExecutionEnvironment.getExecutionEnvironment val sensorData = env.addSource(new SensorSource) val maxTemp = sensorData .map(r => Reading(r.id, r.time, (r.temp - 32) * (5.0/9 …
26 pages | 3.33 MB | 1 year ago
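The `map` in this snippet converts Fahrenheit readings to Celsius with `(r.temp - 32) * (5.0 / 9)`. The same arithmetic as a stand-alone function, for reference (helper name is ours):

```java
// Fahrenheit-to-Celsius conversion used by the map in the snippet above.
public class TempConvert {
    public static double toCelsius(double fahrenheit) {
        return (fahrenheit - 32) * (5.0 / 9);
    }
}
```

Note the `5.0` literal: with integer division (`5 / 9`) the factor would silently become zero.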
State management - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… checkpointPath: String = ??? // configure path for checkpoints on the remote filesystem val backend = new RocksDBStateBackend(checkpointPath) // configure the state backend env.setStateBackend(backend) … Configuring alerts: DataStream[(String, Double, Double)] = keyedData .flatMap(new TemperatureAlertFunction(1.7)) … Using state in Flink … KeyedStream … State access inside the flatMap: def open(parameters: Configuration): Unit = { // create state descriptor val lastTempDescriptor = new ValueStateDescriptor[Double]("lastTemp", classOf[Double]) // obtain the state handle lastTempState …
24 pages | 914.13 KB | 1 year ago
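The `ValueState` in this snippet keeps one `lastTemp` per sensor key and the surrounding function raises an alert when a reading jumps by more than a threshold (1.7 above). A hand-rolled sketch of that behavior, with a `HashMap` standing in for Flink's keyed state backend (class and method names are ours):

```java
import java.util.HashMap;
import java.util.Map;

// Emulates per-key ValueState with a HashMap: one "lastTemp" per sensor,
// alert when the jump from the previous reading exceeds the threshold.
public class TemperatureAlert {
    private final double threshold;
    private final Map<String, Double> lastTemp = new HashMap<>(); // per-key state

    public TemperatureAlert(double threshold) {
        this.threshold = threshold;
    }

    /** Returns true if the reading differs from the key's previous one by more than threshold. */
    public boolean update(String sensorId, double temp) {
        Double prev = lastTemp.put(sensorId, temp); // read old value, store new one
        return prev != null && Math.abs(temp - prev) > threshold;
    }
}
```

What Flink adds over this toy version is exactly what the entry is about: the state backend checkpoints `lastTemp` and restores it per key after a failure.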
Notions of time and progress - CS 591 K1: Data Stream Processing and Analytics Spring 2020
… 1977 1980 1983 1999 2002 2005 2015 • Episode IV: A New Hope • Episode V: The Empire Strikes Back • Episode VI: Return of the Jedi • Episode I: The … timestamp override def getCurrentWatermark: Watermark = { // generate watermark with 1 min tolerance new Watermark(maxTs - bound) } override def extractTimestamp(r: Reading, prevTS: Long): Long = { // update … extractedTS: Long): Watermark = { if (r.id == "sensor_1") { // emit watermark if reading is from sensor_1 new Watermark(extractedTS - bound) } else { // do not emit a watermark null } } override def extractTimestamp(r: …
22 pages | 2.22 MB | 1 year ago
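The periodic watermark logic in this snippet tracks the largest timestamp seen so far and emits a watermark trailing it by a fixed bound (one minute of tolerance). A stand-alone sketch of the same bookkeeping, without the Flink `Watermark` type (class name is ours):

```java
// Bounded-out-of-orderness watermarking: track the max event timestamp
// and report a watermark that trails it by a fixed bound (in ms).
public class BoundedWatermarks {
    private final long bound;             // tolerated lateness in ms
    private long maxTs = Long.MIN_VALUE;  // largest timestamp seen so far

    public BoundedWatermarks(long bound) {
        this.bound = bound;
    }

    /** Record an event timestamp, keeping the running maximum. */
    public long extractTimestamp(long ts) {
        maxTs = Math.max(maxTs, ts);
        return ts;
    }

    /** Current watermark: events at or below this timestamp are no longer expected. */
    public long currentWatermark() {
        return maxTs - bound;
    }
}
```

Out-of-order events do not move the watermark backwards: only a new maximum timestamp advances it, which is why `maxTs` rather than the latest timestamp is used.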
19 items in total