---
title: "StanfordOnline CSX0002 -- Mining Massive Datasets"
author: "John HHU"
date: "2022-12-30"
output: html_document
---
```{r setup, include=FALSE}
knitr::opts_chunk$set(echo = TRUE)
```
## R Markdown
This is an R Markdown document. Markdown is a simple formatting syntax for authoring HTML, PDF, and MS Word documents. For more details on using R Markdown see <http://rmarkdown.rstudio.com>.
When you click the **Knit** button a document will be generated that includes both content as well as the output of any embedded R code chunks within the document. You can embed an R code chunk like this:
```{r cars}
summary(cars)
```
## Including Plots
You can also embed plots, for example:
```{r pressure, echo=FALSE}
plot(pressure)
```
Note that the `echo = FALSE` parameter was added to the code chunk to prevent printing of the R code that generated the plot.
## Course / Welcome / Basic Information About This MOOC
# README


# Pre-Course Survey


## Course / Module 1: MapReduce / Outline of Module 1
# README

http://www.mmds.org/
http://infolab.stanford.edu/~ullman/mmds/book0n.pdf
## Course / Module 1: MapReduce / MapReduce
# 1. Distributed File Systems (15:50)
Welcome to Mining Massive Datasets.
I'm Anand Rajaraman and today's topic is Map-Reduce.
In the last few years Map-Reduce has emerged as a leading paradigm for
mining really massive data sets.
But before we get into Map-Reduce proper, let's spend a few minutes trying to
understand why we need Map-Reduce in the first place.
Let's start with the basics.
Now we're all familiar with the basic computational model of CPU and
memory, right?
The algorithm runs on the CPU, and accesses data that's in memory.
Now we may need to bring the data in from disk into memory, but once the data is in memory, it fits there fully.
So you don't need to access the disk again, and the algorithm just runs on the data that's in memory.
Now, this is a familiar model that we use to implement all kinds of algorithms, machine learning, statistics, and pretty much everything else.
All right?
Now, what happens when the data is so big that it can't all fit in memory at the same time?
That's where data mining comes in.
Classical data mining algorithms look at the disk in addition to the CPU and memory.
The data's on disk, and you can only bring a portion of the data into memory at a time.
You process it in batches and write results back to disk.
This is the realm of classical data mining algorithms.
But sometimes even this is not sufficient.
Let's look at an example.
So think about Google, crawling and indexing the web, right?
Let's say Google has crawled 10 billion web pages, and let's further say that the average size of a web page is 20 KB.
These are representative numbers from real life.
Now if you take ten billion webpages, each of 20 KB, you have a total data set size of 200 TB.
When you have 200 TB, let's assume that we're using the classical computational model, the classical data mining model: all this data is stored on a single disk, and it has to be read and processed by a single CPU.
Now the fundamental limitation here is the bandwidth,
the data bandwidth between the disk and the CPU.
The data has to be read from the disk into the CPU, and the disk read bandwidth for most modern SATA disks is, as a representative number, around 50 MB a second.
So we can read data at 50 MB a second.
How long does it take to read 200 TB at 50 MB a second?
You can do some simple math, and the answer is 4 million seconds, which is more than 46 days.
This is an awfully long time, and it's just the time to read the data into memory.
To do something useful with the data is going to take even longer.
So clearly this is unacceptable; you can't take forty-six days just to read the data.
So you need a better solution.
Now the obvious thing you think of is that you can split the data into chunks, and you can have multiple disks and CPUs.
You stripe the data across multiple disks, and you can read and process it in parallel on multiple CPUs.
That will cut this time down by a lot.
For example, if you had 1,000 disks and CPUs working completely in parallel, you could do the job in 4 million divided by 1,000, which is 4,000 seconds.
And that's just about an hour, which is a very acceptable time.
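A quick sanity check of these numbers in R (using decimal units, so 1 TB = 10^12 bytes and 1 MB = 10^6 bytes):

```{r read-time-estimate}
total_bytes <- 200e12        # 200 TB of crawled pages
read_rate   <- 50e6          # 50 MB per second from a single disk
secs_single <- total_bytes / read_rate
secs_single                  # 4 million seconds
secs_single / 86400          # about 46 days
secs_single / 1000           # split across 1,000 disks/CPUs: 4,000 seconds
secs_single / 1000 / 3600    # roughly an hour
```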
So this is the fundamental idea behind cluster computing.
And the tiered architecture that has emerged for cluster computing is something like this.
You have racks consisting of commodity Linux nodes.
You go with commodity Linux nodes because they are very cheap, and you can buy thousands and thousands of them and rack them up.
You have many of these racks.
Each rack has 16 to 64 of these commodity Linux nodes, and these nodes are connected by a switch.
The switch in a rack is typically a gigabit switch, so there's 1 Gbps bandwidth between any pair of nodes in the rack.
Of course 16 to 64 nodes is not sufficient, so you have multiple racks, and the racks themselves are connected by backbone switches.
The backbone is a higher-bandwidth switch that can do two to ten gigabits per second between racks.
So we have 16 to 64 nodes in a rack, you rack up multiple racks, and you get a data center.
This is the standard architecture that has emerged over the last few years for storing and mining very large data sets.
Now, once you have this kind of cluster, it doesn't solve the problem completely, because cluster computing comes with its own challenges.
But before we get there, let's get an idea of the scale.
In 2011 somebody estimated that Google had a million machines, a million nodes like this, stacked up somewhat like this.
That gives you a sense of the scale of modern data centers and clusters.
Here's a picture of what it looks like inside a data center: what you see there are the backs of the racks, and you can see the connections between the racks.
Now, once you have such a big cluster,
you actually have to do computations on the cluster.
Right?
And cluster computing comes with its own challenges.
The first and most major challenge is that nodes can fail.
Right?
Now a single node doesn't fail that often.
If you just connect a node and let it stay up, it can probably stay up for three years without failing.
Three years is about 1,000 days, so a once-in-1,000-days failure isn't such a big deal.
But now imagine that you have 1,000 servers in a cluster, and assume that these servers fail independently of each other.
You're going to get approximately one failure a day, which still isn't such a big deal; you can probably deal with it.
But now imagine something on the scale of Google, which has a million servers in its cluster.
If you have a million servers, you're going to get 1,000 failures per day.
Now, 1,000 failures per day is a lot, and you need some kind of infrastructure to deal with that kind of failure rate.
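The same back-of-the-envelope arithmetic in R, assuming (as stated above) that each node fails independently about once per 1,000 days:

```{r failure-rate}
failures_per_node_per_day <- 1 / 1000   # one failure per node per ~1,000 days
failures_per_node_per_day * 1e3         # 1,000-node cluster: about 1 failure per day
failures_per_node_per_day * 1e6         # 1,000,000-node cluster: about 1,000 failures per day
```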
Failures on that scale introduce two kinds of problems.
The first problem is that if nodes are going to fail and you're going to store your data on these nodes, how do you keep the data stored persistently?
What does this mean?
Persistence means that once you store the data, you're guaranteed you can read it again.
But if the node on which you stored the data fails, then you can't read the data; you might even lose the data.
So how do you keep the data stored persistently if these nodes can fail?
The second problem is one of availability.
Let's say you're running a computation that is analyzing massive amounts of data.
It's chugging along and has run half way through the computation, and at this critical point a couple of nodes fail, and those nodes had data that is necessary for the computation.
How do we deal with this problem?
In the simplest case you may have to go back and restart the computation all over again, but if you restart it, nodes might fail again while the computation is running.
So you kind of need an infrastructure that can hide these kinds of node failures and let the computation run to completion even if nodes fail.
The second challenge of cluster computing is that the network itself can become a bottleneck.
Remember, there is 1 Gbps of network bandwidth available between individual nodes in a rack, and a smaller per-node bandwidth available between racks.
If you have 10 TB of data and you have to move it across a 1 Gbps network connection, that takes approximately a day; you can do the math and figure that out.
A complex computation might need to move a lot of data, and that can slow the computation down.
So you need a framework that doesn't move data around so much while it's doing the computation.
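Checking that claim in R (decimal units again, with 1 Gbps = 10^9 bits per second):

```{r network-transfer}
bytes    <- 10e12                 # 10 TB to move
bits     <- bytes * 8
link_bps <- 1e9                   # 1 Gbps link
bits / link_bps                   # 80,000 seconds
bits / link_bps / 86400           # just under a day
```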
The third problem is that distributed programming can be really really hard.
Even sophisticated programmers find it hard to write distributed programs
correctly and avoid race conditions and various kinds of complications.
So we need a simple programming model that hides most of the complexity of distributed programming and makes it easy to write algorithms that can mine very massive data sets.
We've looked at three problems that we face when we're dealing with cluster computing, and Map-Reduce addresses all three of these challenges.
First of all, the first problem that we saw was one of persistence and availability when nodes can fail.
The Map-Reduce model addresses this problem by storing data redundantly on multiple nodes.
The same data is stored on multiple nodes, so that even if you lose one of those nodes, the data is still available on another node.
The second problem that we saw was one of network bottlenecks, and this happens when you move data around a lot.
What the Map-Reduce model does is move the computation close to the data and avoid copying data around the network, and this minimizes the network bottleneck problem.
And thirdly, the Map-Reduce model provides a very simple programming model that hides the complexity of all the underlying magic.
So let's look at each of these pieces in turn.
The first piece is the redundant storage infrastructure.
Now redundant storage is provided by what's called a distributed file system.
Now, a distributed file system is a file system that stores data across a cluster, but stores each piece of data multiple times.
The distributed file system provides a global file namespace, and it provides redundancy and availability.
There are multiple implementations of distributed file systems.
Google's GFS, or Google File System, is one example; Hadoop's HDFS is another, and these are the two most popular distributed file systems out there.
The typical usage pattern that these distributed file systems are optimized for is huge files, in the hundreds of GB to TB range.
But even though the files are really huge, the data is very rarely updated in place.
Once data is written it is read very often, but when it's updated, it's updated through appends; it's never updated in place.
For example, imagine the Google scenario once again.
When Google encounters a new webpage, it adds the webpage to a repository.
It doesn't ever go and update the content of a webpage that it has already crawled.
So the typical usage pattern consists of writing the data once, reading it multiple times, and appending to it occasionally.
Let's go under the hood of a distributed file system to see how it actually works.
Data is kept in chunks that are spread across machines.
If you take any file, the file is divided into chunks, and these chunks are spread across multiple machines.
The machines themselves are called chunk servers in this context.
Here's an example.
There are multiple chunk servers: chunk server 1, 2, 3, and 4.
And here's file 1, which is divided into six chunks in this case: C0, C1, C2, C3, C4 and C5.
As you can see, four of the chunks happen to be on chunk server 1, one of them is on chunk server 2, and one of them is on chunk server 3.
Now this is not sufficient.
You actually have to store multiple copies of each of these chunks, so we replicate them: here is a copy of C1 on chunk server 2, a copy of C2 on chunk server 3, and so on.
Each chunk in this case is replicated twice.
And if you look carefully, you'll see that replicas of a chunk are never on the same chunk server; they're always on different chunk servers.
So C1 has one replica on chunk server 1 and one on chunk server 2; C0 has one on chunk server 1 and one on chunk server N, and so on.
And here is another file, D.
D has two chunks, D0 and D1, and it's also replicated twice and stored on different chunk servers.
So you split files into chunks and store them on these chunk servers.
Now it turns out that the chunk servers also act as compute servers.
Whenever your computation has to access data, that computation is actually scheduled on the chunk server that contains the data.
This way you avoid moving data to where the computation needs to run; instead, you move the computation to where the data is.
And that's how you avoid unnecessary data movement in the system.
This will become clearer when we look at some examples.
To summarize: each file is split into contiguous chunks, and the chunks are typically 16 to 64 MB in size.
Each chunk is replicated; in our example we saw each chunk replicated twice, but it could be 2x or 3x replication, and 3x is the most common.
And we saw that the replicas of a chunk are kept on different chunk servers.
But when you replicate 3x, the system usually makes an effort to keep at least one replica in an entirely different rack if possible.
Why do we do that?
The most common scenario is that a single node fails, but it's also possible that the switch on a rack fails, and when the switch on a rack fails, the entire rack becomes inaccessible.
If all the replicas of a chunk are in that one rack, then the whole chunk becomes inaccessible.
So if you keep replicas of a chunk on different racks, then even if a switch fails you can still access that chunk.
So the system tries to make sure that the replicas of a chunk are kept on different racks.
The second component of a distributed file system is a master node.
It's called a master node in the Google File System and a Name Node in Hadoop's HDFS.
The master node stores metadata about where the files are stored.
For example, it will know that file 1 is divided into six chunks, and it will record the locations of each of the six chunks and the locations of their replicas.
The master node itself may be replicated, because otherwise it might become a single point of failure.
The final component of a distributed file system is a client library.
When a client, or an algorithm that needs to access the data, tries to access a file, it goes through the client library.
The client library talks to the master and finds the chunk servers that actually store the chunks.
Once that's done, the client connects directly to the chunk servers, where it can access the data without going through the master node.
So the data access actually happens in a peer-to-peer fashion without going through the master node.
# 2. The MapReduce Computational Model (22:04)
Welcome back to Mining of Massive Datasets.
We're going to continue our lecture on MapReduce, and take a look at the MapReduce computational model.
Before we look at the actual MapReduce programming model, let's do a warm-up task.
Imagine you have a huge text document, maybe terabytes long, and you want to count the number of times each distinct word appears in the file.
For example, we want to find out that the word 'the' appears 10 million times and the word 'apple' appears 433 times.
Some sample applications of this kind of toy example in real life: if you have a big web server log and you want to find out how often each URL is accessed, that could be a sample application; or it may be analyzing term statistics for a search engine.
But for now, let's just imagine that we have this one big file, a huge text document, and our task is to count the number of times each distinct word appears in that file.
Let's look at two cases.
The first case is that the file itself is too large for memory, because remember we said it's a big, big file, but imagine that there are few enough distinct words in it that all the word-count pairs actually fit in memory.
How do you solve the problem in this case?
Well, it turns out that a very simple approach works.
You just build a hash table, indexed by word, and the hash table entry for each word stores the count of the number of times that word appears.
The first time you see a word, you add an entry to the hash table with that word and set the count to 1; every subsequent time you see the word, you increment the count by one.
You make a single sweep through the file, and at the end of that, you have the word-count pairs for every unique word that appears in the file.
This is a simple program that all of us have written many, many times in some context or another.
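As a rough illustration, here is a minimal R sketch of that in-memory approach, using an environment as the hash table; the sample document is made up:

```{r wordcount-hash}
count_words <- function(words) {
  counts <- new.env(hash = TRUE)          # environment used as a hash table keyed by word
  for (w in words) {
    if (exists(w, envir = counts, inherits = FALSE)) {
      counts[[w]] <- counts[[w]] + 1L     # seen before: increment the count
    } else {
      counts[[w]] <- 1L                   # first time we see the word: initialize to 1
    }
  }
  unlist(as.list(counts))                 # named vector of word counts
}

doc <- c("the", "crew", "of", "the", "space", "shuttle", "the")
count_words(doc)
```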
Now, let's make it a little bit more complicated.
Let's imagine that even the word-count pairs don't fit in memory: the file is too big to fit in memory, and there are so many distinct words in the file that you can't even hold all the distinct words in memory.
How do you go about solving the problem in this case?
Well, you can try to write some kind of complicated code, but I'm lazy, so I like to use Unix command-line tools to do this.
Here's how I would go about it.
This is the Unix command-line way of doing it: `words(doc.txt) | sort | uniq -c`.
Here the command `words` is a little script that goes through doc.txt, which is the big text file, and outputs the words in it one per line.
Once those words are output, I can pipe them to `sort`, which sorts the output, and once you sort it, all occurrences of the same word come together.
Then you can pipe it to another handy little utility called `uniq`, and one of the nifty features of `uniq` is the `-c` option.
What `uniq -c` does is take a run of occurrences of the same word and count them.
So the output of this is going to be word-count pairs.
I'm sure many of you have done something like this, and if you have, you've actually done something that's like MapReduce.
This pipeline actually captures the essence of MapReduce.
And the nice thing about this kind of implementation is that it's very naturally parallelizable, as we'll see in a moment.
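For comparison, the same sort-then-count-runs idea can be sketched in R: `sort()` plays the role of the Unix `sort`, and `rle()` (run-length encoding) plays the role of `uniq -c`, counting each run of identical adjacent words.

```{r sort-uniq-in-r}
words <- c("the", "crew", "of", "the", "space", "shuttle", "the")
runs  <- rle(sort(words))                    # sorting brings equal words together; rle counts each run
data.frame(word = runs$values, count = runs$lengths)
```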
So let's look at an overall view of MapReduce using this example.
The first step that we did: we took the document, which was our input, and we wrote a script called words that output one word per line.
This is what's called a Map function in MapReduce.
The Map function scans the input file record-at-a-time, and for each record it pulls out something that you care about; in this case, it was words.
You can output zero, one, or multiple things for each record, and the things that you output are called keys.
The second step is to group by key, and this is what the sort step was doing: it grouped all occurrences of the same key together.
The third step is the `uniq -c` step; that's the Reduce piece of MapReduce.
The reducer looks at all the values with the same key and then runs some kind of function on them.
In this case it counted the number of times each key occurred, but it could be something much more complicated.
And once it does that kind of analysis, it has an answer, which it then writes out.
So this is MapReduce in a nutshell.
The outline of this computation actually stays the same for any MapReduce computation; what changes is the Map function and the Reduce function, which you adapt to fit the problem that you're actually solving.
In this case, for word count, the Map and Reduce functions were quite simple.
In some other problems the Map and Reduce functions might be more complicated.
Here's another way of looking at it.
You start with a bunch of key-value pairs; here k stands for key and v stands for value.
The Map step takes the key-value pairs and maps them to intermediate key-value pairs.
For example, you run Map on the first key-value pair (k, v) and it outputs two intermediate key-value pairs.
The intermediate key-value pairs need not have the same key as the input key-value pairs; they could have different keys, and there could be multiple of them.
And although the values all look the same here, they both say v, the values could be different as well.
Notice that in this case we started with one input key-value pair, and the Map function produced multiple intermediate key-value pairs.
So there can be zero, one, or multiple intermediate key-value pairs for each input key-value pair.
Now let's do it again for the second key-value pair.
Apply the Map function, and it turns out that in this case we get one intermediate key-value pair in the output.
And so on: we run through the entire input file, apply the Map function to each input record, and create intermediate key-value pairs.
The next step is to take these intermediate key-value pairs and group them by key, so all the intermediate key-value pairs that have the same key are grouped together.
It turns out there are three values with the first key, two values for the second key, and so on.
They all get grouped together, and this is done by sorting by key and then grouping together the values for the same key.
These are all different values, although I use the same symbol v here.
Once you have these key-value groups, the final step is the reducer.
The reducer takes a single key-value group as input, and it produces an output that has the same key but combines the values for that key into a single value.
For example, it could add up all the values, or multiply them, or take the average, or do something more complicated with all of the values for a given key.
Finally, it outputs a single value for the key.
When you apply the reducer to the second key-value group, you get another output, and so on.
Once you apply the reducer to all the intermediate key-value groups, you get the final output.
More formally, the input to MapReduce is a set of key-value pairs, and the programmer has to specify two methods.
The first method is the Map method.
The Map method takes an input key-value pair and produces a set of intermediate key-value pairs, zero or more of them, and there is one Map call for every input key-value pair.
The Reduce function takes an intermediate key-value group; the intermediate key-value group consists of a key and a set of values for that key.
The output can again consist of zero, one, or multiple key-value pairs.
The key is the same as the input key, but the value is obtained by combining the input values in some manner.
For example, you might add up the input values, and that could be the output v'' here.
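One compact way to write the two signatures just described:

$$\textit{Map}(k, v) \;\to\; \langle k', v' \rangle^{*}$$
$$\textit{Reduce}(k', \langle v' \rangle^{*}) \;\to\; \langle k', v'' \rangle^{*}$$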
Let's look at the word count example and run it through the MapReduce process again.
Here's our big document.
I hope you can see the text of the document, but it doesn't matter; you can see that there are words in there.
We're going to take this big document, and we're going to take the Map function that's provided by the programmer.
The Map function reads the input and produces a set of key-value pairs; in this case, each word is going to be a key, and the value is going to be the number 1.
So, for example, (the, 1), (crew, 1), and so on, and when the word 'the' appears again there's another (the, 1), and so on.
These are the intermediate key-value pairs that are produced by the Map function.
The next step is the group-by-key step, which collects together all pairs with the same key.
We can see that there are two intermediate tuples with the key 'crew', and those are collected together; there's one with the word 'space', there are three with the word 'the', and so on.
They're all sorted and collected together.
The final step is the Reduce step.
The Reduce step combines all the values: it adds together the two 1s from 'crew' and figures out that there are two occurrences of the word 'crew'.
'Space' has 1.
There are three tuples with a 1 for 'the'; they're all added together, and the output is 3, and so on.
So this is a schematic of the MapReduce word counting example.
Now, of course, this whole example doesn't run on a single node.
The data is actually distributed across multiple input nodes, so let's take that into account.
Here's the data, divided across multiple nodes.
Let's say the first portion of the file is chunk one, and it's on one node; the second portion of the file is chunk two, which is on a different node; the third portion is chunk three, and the fourth portion is chunk four, and each of these is on a different node.
Now the Map tasks are going to run on each of these four different nodes.
There's going to be a Map task run on chunk one that just looks at the first portion of the file, a Map task run on chunk two that just looks at the second portion of the file, and so on.
The outputs of those Map tasks will therefore be produced on four different nodes: here is the first chunk of Map output, the second chunk of Map output on another node, the third chunk of Map output on a third node, and the fourth chunk of Map output on yet another node.
The outputs of the Map functions are therefore spread across multiple nodes.
What the system then does is copy the Map outputs onto a single node, and you can see the data from all four nodes flowing into this single node here.
Once the data has flowed to the single node, the system can sort it by key and do the final Reduce step.
Now it's a little bit trickier than this, unfortunately, because you may not want to move all the data from all the Map nodes, and there's going to be a lot of it, into a single Reduce node and sort it there; that might be a lot of sorting.
So in practice you use multiple Reduce nodes as well.
When you run a MapReduce job, you can tell the system to use a certain number of Reduce nodes.
Let's say you tell the system in this case to use three Reduce nodes.
Then the MapReduce system is smart enough to split the output of the Map across the three Reduce nodes, and it makes sure that for any given key, in this case 'the', all instances, regardless of which Map node they start out from, always end up at the same Reduce node.
So all instances of 'the', whether they started from Map node one or Map node two, ended up at Reduce node two in this case, and all instances of the word 'crew', regardless of whether they started from Map node one or Map node four, ended up at Reduce node one.
This is done by using a hash function: the system hashes each Map key and determines a single Reduce node to ship that tuple to.
This ensures that all tuples with the same key end up at the same Reduce node.
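A minimal sketch of such a partition function in R; the hash used here (summing character codes modulo the number of reducers) is purely illustrative, and real systems use a stronger hash:

```{r partition-function}
partition <- function(key, R) {
  sum(utf8ToInt(key)) %% R + 1    # hash the key, map it to one of R reduce nodes (1..R)
}

# The same key always lands on the same reduce node:
sapply(c("the", "crew", "space", "shuttle"), partition, R = 3)
```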
Once tuples end up at a Reduce node, they get sorted as before on each Reduce node, and the result is now created on multiple Reduce nodes.
For example, the result for 'crew' is now on Reduce node one, the result for 'the' is on Reduce node two, and the results for 'shuttle' and 'recently' are on Reduce node three.
So the final result is actually spread across three nodes in the system, which is perfectly fine because you're dealing with a distributed file system that knows your file is spread across three nodes of the system.
You can still access it as a single file in your client, and the system knows to access the data from those three independent nodes.
One final point before we move on from this slide is that all the magic in MapReduce is implemented to use, as far as possible, only sequential scans of disk as opposed to random accesses.
If you think a little bit carefully about all the steps I mentioned, how the Map function is applied to the input file record by record, how the sorting is done, and so on, a moment's thought will make it apparent that you can implement all of this using only sequential reads of disk and never random accesses.
This is super important, because sequential reads are much, much more efficient than random accesses to disk.
If you recall your basics of database systems, it takes much, much longer to do random seeks than to do a sequential scan of a file.
That's why the whole MapReduce system is built around doing only sequential reads of files and never random accesses.
Here is the actual pseudocode for word count using MapReduce.
Remember, the programmer is required to provide two functions, a Map function and a Reduce function.
This is the Map function right here.
The Map function takes a key and a value, and its output has to be a set of intermediate key-value pairs.
The key in this case is a document name and the value is the text of the document.
The Map function itself is very simple in this case: it scans the input document, and for each word in the input document it emits that word and the number 1.
So it's a tuple whose key is the word and whose value is the number 1.
And here's the Reduce function.
The Reduce function, remember, takes a key and a set of values, where the values all correspond to the same key.
In this case, it just iterates through all the values and sums them up.
The output has the same key, and the value is the sum.
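The slide's pseudocode isn't reproduced here, but roughly the same two functions, plus a tiny single-machine driver that emulates the group-by-key step, might look like this in R (all names and the sample documents are illustrative):

```{r wordcount-mapreduce}
# Map: (document name, document text) -> list of (word, 1) pairs
map_fn <- function(doc_name, doc_text) {
  words <- unlist(strsplit(doc_text, "\\s+"))
  lapply(words, function(w) list(key = w, value = 1L))
}

# Reduce: (word, list of counts) -> (word, total count)
reduce_fn <- function(key, values) {
  list(key = key, value = sum(unlist(values)))
}

# Tiny driver that emulates map -> group by key -> reduce on one machine
run_mapreduce <- function(docs, map_fn, reduce_fn) {
  pairs   <- unlist(lapply(names(docs), function(d) map_fn(d, docs[[d]])), recursive = FALSE)
  keys    <- vapply(pairs, `[[`, character(1), "key")
  grouped <- split(lapply(pairs, `[[`, "value"), keys)   # group the values by key
  lapply(names(grouped), function(k) reduce_fn(k, grouped[[k]]))
}

docs <- c(doc1 = "the crew of the space shuttle", doc2 = "the shuttle launched")
run_mapreduce(docs, map_fn, reduce_fn)
```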
We've looked at a very simple example, word count, using MapReduce.
Now let's look at a couple more examples.
Here's another example.
Suppose we have a large web corpus that we've crawled, and we have a metadata file for it, where each record in the metadata file looks like this: it has a URL, the size of the file, the date, and various other pieces of data.
Now the problem is, for each host we want to find the total number of bytes, not for each URL, but for each host.
Remember, there can be many URLs with the same host name in the crawl, and you want to find the number of bytes associated with each host, not with each URL.
Clearly the number of bytes associated with a host is just the sum of the number of bytes associated with all the URLs for that host, and this is very easy to implement in MapReduce.
The Map function just looks at each record, takes the URL of the record, and outputs the hostname of the URL along with the size.
The Reduce function just sums the sizes for each host, and at the end of it, you will have the total size for each host.
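Continuing the same illustrative R sketch, with made-up metadata records of the form described above:

```{r host-bytes-mapreduce}
# Map: one metadata record -> (hostname of the URL, size in bytes)
map_host <- function(record) {
  host <- sub("^https?://([^/]+).*", "\\1", record$url)   # pull the hostname out of the URL
  list(key = host, value = record$size)
}

# Reduce: (hostname, list of sizes) -> (hostname, total bytes)
reduce_host <- function(key, values) list(key = key, value = sum(unlist(values)))

records <- list(
  list(url = "http://example.com/a.html",   size = 20000),
  list(url = "http://example.com/b.html",   size = 15000),
  list(url = "http://other.org/index.html", size = 5000)
)

pairs   <- lapply(records, map_host)
keys    <- vapply(pairs, `[[`, character(1), "key")
grouped <- split(lapply(pairs, `[[`, "value"), keys)
lapply(names(grouped), function(k) reduce_host(k, grouped[[k]]))
```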
Here's another example.
Let's say you're building a language model.
You have a large collection of documents, and this language model for some reason requires the count of every unique 5-word sequence that occurs in a large corpus of documents.
Earlier we looked at counting each unique word; this example asks for each 5-word sequence.
It turns out that the solution is not very different; just the Map function differs.
The Map function goes through each document and outputs every 5-word sequence in the document, and the Reduce function just combines those counts, adds them up, and then you have the output.
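Only the Map function changes relative to word count; a sketch of a mapper that emits every 5-word sequence (the reducer would be the same summing function as before):

```{r fiveword-map}
# Map: document text -> one (5-word sequence, 1) pair per position in the document
map_5grams <- function(doc_text) {
  words <- unlist(strsplit(doc_text, "\\s+"))
  if (length(words) < 5) return(list())
  lapply(seq_len(length(words) - 4), function(i) {
    list(key = paste(words[i:(i + 4)], collapse = " "), value = 1L)
  })
}

map_5grams("the quick brown fox jumps over the lazy dog")
```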
So I hope these simple examples illustrate how MapReduce works.
In the next section, we are going to understand how the underlying system actually implements some of the magic that makes MapReduce work.
# 3. Scheduling and Data Flow (12:43)
Welcome back to Mining of Massive Datasets.
In the previous section we studied the Map-Reduce model and how to solve some simple problems using Map-Reduce.
In this section, we're going to go under the hood of a Map-Reduce system and understand how it actually works.
Just to refresh your memory, a simple Map-Reduce computation has three steps.
In the Map step, you take a big document which is divided into chunks, and you run a Map process on each chunk.
The Map process goes through each record in that chunk and outputs intermediate key-value pairs for each record in that chunk.
The second step, which is a group-by step, groups by key: you bring together all the values for the same key.
And the third step is the Reduce step: you apply a reducer to each intermediate key-value group and create the final output.
Now, here's a schematic of how it actually works in a distributed system; the previous schematic showed how it worked in a centralized system.
In a distributed system, you have multiple nodes, and Map and Reduce tasks are running in parallel on multiple nodes.
A few chunks of the input file might be on node 1, a few chunks on node 2, and a few chunks on node 3.
You have Map tasks running on each of those nodes, producing intermediate key-value pairs on each of those nodes.
Once the intermediate key-value pairs are produced, the underlying Map-Reduce system uses a partitioning function, which is just a hash function.
The Map-Reduce system applies the hash function to each intermediate key, and the hash function tells the Map-Reduce system which Reduce node to send that key-value pair to.
This ensures that all pairs with the same key, whether they come from Map task 1, 2, or 3, end up being sent to the same Reduce task.
So in this case, the key k4, regardless of where it started from, whether at task 1, 2, or 3, always ends up at Reduce task 1, and the key k1 always ends up at Reduce task 2.
Once a Reduce task has received input from all the Map tasks, that is, once all the Map tasks have completed, you can start the Reduce tasks.
The Reduce task's first job is to sort its input and group it together by key.
In this case, there are three values associated with the key k4, and they're all grouped together.
Once that is done, the Reduce task runs the Reduce function, which is provided by the programmer, on each such group and creates the final output.
Okay.
So remember, the programmer provides two functions, Map and Reduce, and
specifies the input file.
The Map-Reduce environment has to take care of a bunch of things.
It takes care of partitioning the input data; scheduling the program's execution on a set of machines, figuring out where the Map tasks run, where the Reduce tasks run, and so on; and performing the intermediate group-by-key step.
While all this is going on, some nodes may fail, and the environment makes sure that the node failures are hidden from the program.
Finally, the Map-Reduce environment also manages all the required inter-machine communication.
So we're going to take a look at exactly what's going on inside.
Let's look at the data flow associated with Map-Reduce.
The input and the final output of a Map-Reduce program are stored on the distributed file system, and the scheduler tries to schedule each Map task close to the physical storage location of the input data.
What that means is this: recall that the input data is a file, the file is divided into chunks, and there are replicas of the chunks on different chunk servers.
The Map-Reduce system tries to schedule each Map task on a chunk server that holds a copy of the corresponding chunk, so there's no actual data copy associated with the Map step of the Map-Reduce program.
Now, the intermediate results are not stored in the distributed file system but in the local file system of the Map and Reduce workers.
What are intermediate results?
An intermediate result could be the output of a Map step, or something generated while in the process of computing the Reduce.
Why are such intermediate results not stored in the distributed file system?
It turns out that there's some overhead to storing data in the distributed file system: remember, multiple replicas of the data need to be made, so there's a lot of copying and network shuffling involved in storing new data in the distributed file system.
So whenever possible, intermediate results are stored in the local file system of the Map and Reduce workers, rather than in the distributed file system, to avoid more network traffic.
And finally, as you'll see in future examples, the output of a Map-Reduce task often becomes the input to another Map-Reduce task.
The master node takes care of all the coordination aspects of a Map-Reduce job.
The master node associates a task status with each task; a task is either a Map task or a Reduce task, and each task has a status flag which can be idle, in progress, or completed.
The master schedules idle tasks whenever workers become available: whenever there is a free node available for scheduling tasks, the master goes through its queue of idle tasks and schedules an idle task on that worker.
When a Map task completes, it sends the master the locations and sizes of the R intermediate files that it creates.
Why R intermediate files?
There's one intermediate file created for each reducer, because the output of the mapper has to be shipped to each of the reducers depending on the key value, so there are R intermediate files, one for each reducer.
Whenever a Map task completes, it stores the R intermediate files on its local file system and lets the master know what the names of those files are.
The master pushes this information to the reducers.
Once the reducers know that all the Map tasks are completed, they copy the intermediate files from each of the Map tasks, and then they can proceed with their work.
The master also periodically pings the workers to detect whether a worker has failed, and if a worker has failed, the master has to do something.
Let's see what that something is.
If a Map worker fails, then all the Map tasks that were scheduled on that Map worker may have failed.
The tricky thing is that the output of a Map task is written to the local file system of the Map worker, so if a Map worker node fails, then all intermediate output created by all the Map tasks that ran on that worker is lost.
So what the master does is reset to idle the status of every task that was either completed or in progress on that worker.
All those tasks eventually need to be done, and they will eventually be rescheduled on other workers.
If a Reduce worker fails, on the other hand, only the in-progress tasks are set to idle.
The tasks that have actually been completed by the Reduce worker don't need to be set to idle, because the output of the Reduce worker is final output, and it's written to the distributed file system, not to the local file system of the Reduce worker.
Since the output is written to the distributed file system, it is not lost even if the Reduce worker fails.
So only in-progress tasks need to be set to idle, while completed tasks don't need to be redone.
Once again, the idle Reduce tasks will be restarted on other workers eventually.
What happens if the master fails?
If the master node fails, then the Map-Reduce task is aborted, the client is notified, and the client can then do something like restarting the Map-Reduce task.
This is the one scenario where the task has to be restarted from scratch, because the master is typically not replicated in the Map-Reduce system.
You might think that this is a big deal, that a master failure means the Map-Reduce task is aborted and has to be restarted.
But remember, node failures are actually rather rare: recall that a node fails once every three years, or once every 1,000 days, and the master is a single node, so the master failing is quite an uncommon occurrence.
Since there are multiple workers associated with a Map-Reduce task, it's much more likely that one of the many workers fails rather than the master.
The final question to think about is: how many Map and how many Reduce tasks do we need?
Suppose there are M Map tasks and R Reduce tasks; our goal is to determine M and R.
This is part of the input given to the Map-Reduce system to let it know how many tasks it needs to schedule.
The rule of thumb is to make M much larger than the number of nodes in the cluster.
You might think that it's sufficient to have one Map task per node of the cluster, but in fact the rule of thumb is to have one Map task per DFS chunk.
The reason for this is simple.
Imagine that there is one Map task per node in the cluster, and during processing a node fails.
If a node fails, then that Map task needs to be rescheduled on another node in the cluster when one becomes available.
Since all the other nodes are busy processing, one of those nodes has to complete its own Map task before the failed Map task can be scheduled on it, and so the entire computation is slowed down by the time it takes to redo the failed Map task.
If, instead of one Map task on a given node, there are many small Map tasks on a given node, and that node fails, then those Map tasks can be spread across all the available nodes, and the entire job will complete much faster.
On the other hand, the number of reducers R is usually smaller than M, and is usually even smaller than the total number of nodes in the system.
This is because the output file is spread across R nodes, where R is the number of reducers, and it's usually convenient to have the output spread across a small number of nodes rather than a large number of nodes.
So usually R is set to a smaller value than M.
# 4. Combiners and Partition Functions (12:17) [Advanced]
Welcome back to Mining of Massive Datasets.
In the previous lectures we studied the basic Map-Reduce model, and then we looked at how it's actually implemented.
In this lecture, we're going to look at a couple of refinements to the basic Map-Reduce model that can make it run a bit faster.
The first refinement we're going to look at is combiners.
One of the things that you may have noticed in the previous examples was that the Map task will produce many key-value pairs with the same key.
For example, a popular word like 'the' will occur in millions and millions of key-value pairs.
Remember that the Map tasks are actually running in parallel on multiple worker nodes, and the key-value pairs from each Map node have to be shipped to the reducer nodes.
If you imagine a word like 'the' on node 1, Map task 1 is probably going to see a few thousand occurrences of the word 'the', and Map task 2 is going to see a few thousand occurrences of the word 'the', and so on.
So the output of Map task 1 will have, let's say, 1,000 tuples with the key 'the' and a value of one, and all these tuples will have to be shipped over to, let's say, Reduce task 1.
Now, shipping a thousand tuples, all of whose keys are 'the' and all of whose values are one, is a lot of network overhead.
You can save some of this network overhead by doing an intermediate sum in the Map worker.
For example, instead of sending a thousand tuples that each have the key 'the' and the value one, you can send a single tuple that has the key 'the' and the value 1,000.
Here's a mapper and the mapper is this is about code example again.
The mapper the we, has b occurring once, c occurring once,
d occurring once, e occurring once.
D occurring once and b occurring, once again.
And now the, the tuple b occurs two times here having in the output of this mapper.
So, l,
*the combiner which is another function that is provided by the programmer.*
*Combines the two occurrences of B, and produce a single tuple,*
*B comma two which is then shipped, shipped over to the reducer.*
Since, we have two tuples of the form B1 being shipped over to the reducer,
a single tuple of the form B2 gets shipped over to the reducer.
And this way much less data needs to be copied and, and shuffled.
So the combiner is also supplied by the programmer.
The programmer provides a function combine(k, list-of-v) → v2: the input to the combiner is a key and a list of values, and the output is a single value.
Instead of a whole bunch of tuples with key k being shipped off to a reducer, just a single tuple with key k and value v2 is shipped off to the reducer.
Usually the combiner is the same function as the reducer; for example, if the reducer adds up its input values, the combiner does the same thing as well.
works only if the reduce function is commutative and associative.
Let's look at a couple of examples to see what what I'm saying here.
So for example, let's say the, the reduce function is a sum function.
You want to add up all the input values, as in the count example.
Now the, the sum function actually is commutative and
associative; by which we mean, that, a plus b.
Is b is the same as b plus a.
And a plus b plus c is the same as a plus b plus c.
This is the first property is the commutative property.
And the second property the associative property.
And because sum satisfies both these properties sum can be used as
a combiner as well as a reducer.
What that really means,
is that if you have a lot of values that need to be summed.
All these values need to be added up.
We can break it up into two pieces.
You you can sum up the first piece.
You can sum up the second piece.
And then you can sum up the, the two intermediate results and you'll get the,
you'll get the proper, you'll get the same, same answer.
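A quick numerical illustration of why this matters (this example is mine, not from the lecture): for sum, combining pieces first and then reducing gives the same answer as reducing everything at once, whereas a reduce like averaging does not decompose this way in general.

```{r combiner-correctness}
values <- c(3, 1, 4, 1, 5, 9, 2, 6)

sum(values)                                   # reduce everything at once: 31
sum(sum(values[1:4]), sum(values[5:8]))       # combine two pieces, then reduce: also 31

mean(values)                                  # 3.875
mean(c(mean(values[1:3]), mean(values[4:8]))) # average of averages over unequal pieces differs
```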