主要内容

使用MapReduce进行简单的数据划分

这个例子展示了如何提取大型数据集的子集。

子集设置或执行查询有两个方面。一种是选择数据集中变量(列)的子集。另一种方法是选择观察结果的子集或行。

在本例中,变量的选择发生在数据存储的定义中。(map函数可以执行进一步的变量子选择,但这不在本例的范围内)。在本例中,map函数的作用是执行观察结果的选择。reduce函数的作用是将每次调用提取的子集记录连接到map函数。这种方法假定数据集在Map阶段之后可以装入内存。

准备数据

方法创建数据存储airlinesmall.csv数据集。这个12兆字节的数据集包含多家航空公司的29列航班信息,包括到达和起飞时间。这个例子使用了数据中29个变量中的15个。

ds = tabularTextDatastore(“airlinesmall.csv”“TreatAsMissing”“NA”);ds。SelectedVariableNames = ds。变量名([1 2 5 9 12 13 15 16 17 ....18 20 21 25 26 27]);ds。SelectedVariableNames
ans =1连接单元格第1至4列{'年'}{'月'}{'DepTime'} {'UniqueCarrier'}第5至8列{'ActualElapsedTime'} {'CRSElapsedTime'} {'ArrDelay'} {'DepDelay'}第9至13列{'Origin'} {'Dest'} {'TaxiIn'} {'TaxiOut'} {'CarrierDelay'}第14至15列{'WeatherDelay'} {'NASDelay'}

数据存储处理“NA”值,并将缺少的值替换为默认值。此外,SelectedVariableNames属性允许您仅处理感兴趣的指定变量,您可以使用这些变量进行验证预览

预览(ds)
ans =8×15表年月DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay起源Dest TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay  ____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______ ______ _______ ____________ ____________ ________ 1987 10 642{“PS”}53 57 8 12{“宽松”}{‘SJC}南南南南南1987 10 1021{“PS”}63 56 8 1{‘SJC}{“钻”}南南南南南1987 10 2055{“PS”}83 82 21 20{‘圣’}{SMF的}南南南南南198710 1332 {'PS'} 59 58 13 12 {'BUR'} {'SJC'} NaN NaN NaN NaN NaN 1987 10 629 {'PS'} 77 72 4 -1 {'SMF'} {'LAX'} NaN NaN NaN NaN NaN 1987 10 1446 {'PS'} 61 65 59 63 {'LAX'} {'SJC'} NaN NaN NaN NaN NaN 1987 10 928 {'PS'} 84 79 3 -2 {'SAN'} {'SFO'} NaN NaN NaN NaN NaN 1987 10 859 {'PS'} 155 143 11 -1 {'SEA'} {'LAX'} NaN NaN NaN NaN NaN

MapReduce运行

mapreduce函数需要map函数和reduce函数作为输入。映射器接收数据块并输出中间结果。减速器读取中间结果并产生最终结果。

类所描述的变量的表SelectedVariableNames属性。然后,绘图器提取出从登机口推回后有大量延误的航班。具体来说,它可以识别持续时间超过计划持续时间2.5倍的航班。映射器忽略了1995年之前的航班,因为本例中一些感兴趣的变量在该年之前没有收集到。

显示map函数文件。

函数subsettingMapper(data, ~, intermKVStore)选择1995年及以后飞行时间特别长的航班飞行时间百分比(包括在停机坪的时间和在机场的时间)%空气)。Idx = data。Year > 1994 &(数据。ActualElapsedTime- data.CRSElapsedTime)...> 1.50 * data.CRSElapsedTime;intermVal = data(idx,:);添加(intermKVStore“零”, intermVal);结束

减速器接收从映射器获得的子集观测数据,并简单地将它们连接到单个表中。减速器返回一个键(相对没有意义)和一个值(连接的表)。

显示reduce函数文件。

函数subsettingReducer(~, intermValList, outKVStore)%从列表中获取所有中间结果outVal = {};hasnext(intermValList) outVal = [outVal;getnext (intermValList)];结束注意,此方法假定连接的中间值整个数据的%子集)适合内存。添加(outKVStore“零”, outVal);结束

使用mapreduce要将map和reduce函数应用到数据存储,ds

result = mapreduce(ds, @subsettingMapper, @subsettingReducer);
******************************** * MAPREDUCE的进展  * ******************************** 地图地图16%减少0% 32% 0%减少0%减少0%地图48%减少0% 65%减少0%地图81%减少0% 97%减少0%地图100%减少0% 100%减少100%

mapreduce返回一个输出数据存储,结果,文件在当前文件夹。

显示结果

在从数据集中提取的前10个变量中寻找模式。这些变量确定了航空公司、目的地和到达机场,以及一些基本的延误信息。

R = readall(结果);tbl = r.Value{1};台(:1:10)
ans =37×10表年月DepTime UniqueCarrier ActualElapsedTime CRSElapsedTime ArrDelay DepDelay起源服务台  ____ _____ _______ _____________ _________________ ______________ ________ ________ _______ _______ 1995 6 1601{‘我们’}162 58 118 14{机场“BWI’}{‘坑’}1996 6 1834{‘公司’}241 75 220 54{“网络成瘾”}{“英文文宣写作研习营”}1997 1 730{“DL”}110 43 137 70 {ATL的}{“GSP”}1997 4 1715 {UA的}152 57 243 148 {}' IND '{'奥德}1997 9 2232{“西北”}143 115 22{的DTW}{‘而言不啻}1997 10 1419{‘公司’}196 58 157 19 {DFW的}{‘IAH} 1998 32156{“DL”}139 49 146 56{“泰”}{ATL的}1998 10 1803{“西北”}291 81 213 3 {MSP的}{奥德的}2000 5 830{的WN} 140 55 85 0{“木豆”}{“侯”}2000 8 1630{‘公司’}357 123 244 10{“英文文宣写作研习营”}{“此时此地”}2002 6 1759{‘我们’}260 67 192 1{“达到”}{“bo”}2003 3 1214 {XE的}214 84 124 6 {GPT的}{‘IAH} 2003 3 604 {XE的}175 60 114 1{“融通”}{‘IAH} 2003 4 1556 {MQ的}142 52 182 92 {PIA的}{奥德的}2003 5 1954{‘我们’}127 48 78 1{‘RDU}{“此时此地”}2003 7 1250 {FL的}261 95 166 0 {ATL的}{“网络成瘾”}⋮

请看第一个记录,一架美国航空公司的航班比预定起飞时间晚了14分钟离开登机口,并晚了118分钟到达。这架飞机在被推到登机口后延误了104分钟ActualElapsedTime而且CRSElapsedTime

有一个异常记录。2006年2月,捷蓝航空的一架航班起飞时间为凌晨3点24分,飞行时间为1650分钟,但抵达时间仅延误415分钟。这可能是数据输入错误。

除此之外,关于这些异常延误的航班何时何地发生,并没有明确的模式。没有哪家航空公司、一年中的哪个时段、一天中的哪个时段或某个机场占主导地位。一些直观的模式,如冬季的奥黑尔(ORD),当然是存在的。

延迟模式

从1995年开始,航空公司系统性能数据开始包括测量航班在滑行阶段发生了多少延误。然后,在2003年,数据也开始包括某些延迟的原因。

仔细检查这两个变量。

台(:,(1、7、8、11:结束])
ans =37×8表年ArrDelay DepDelay TaxiIn TaxiOut CarrierDelay WeatherDelay NASDelay  ____ ________ ________ ______ _______ ____________ ____________ ________ 1995 118 220 7 101南南南1996 54 12 180南南南1997 137 70 2 12南南南南南南1997 243 148 4 1997 115 22 4 98南南南1997 157 95年6 19日南南南1998 146 56 9 47南南南1998 213 3 11 205南南南2000 85 0 5 51南南南2000 244 273南南南4 2002 192 1 6 217南南南2003 124 6 13 131南南南2003 114 -1 8 106 NaN NaN NaN 2003 182 92 9 106 NaN NaN NaN 2003 78 -1 5 90 NaN NaN NaN 2003 166 0 11 170 0 0 166 ⋮

对于这些异常延误的航班,绝大多数延误发生在起飞时,在停机坪上。此外,延误的主要原因是NASDelay.NAS延误是指国家航空主管部门对飞往机场的航班实施的延误,该机场预计无法在航班预定到达时间处理所有预定抵达的航班。在任何给定时间生效的NAS延迟程序发布在https://www.fly.faa.gov/ois/

最好的情况是,当实施NAS延误时,登机只是延迟。这样的延误会显示为起飞延误。然而,对于本例所选择的大多数航班,延误大多发生在离开登机口后,导致出租车延误。

重新运行MapReduce

前面的map函数在函数文件中硬连接了子集设置标准。必须为任何新的查询编写一个新的地图函数,比如某一天从旧金山出发的航班。

通过从映射函数定义中分离出子集设置标准,并使用匿名函数为每个查询配置映射器,泛型映射器可以具有更强的适应性。这个泛型映射器使用第四个输入参数提供所需的查询变量。

显示通用映射函数文件。

函数subsettingMapperGeneric(data, ~, intermKVStore, subsetter) intermKey =“零”;intermVal = data(subsetter(data),:);添加(intermKVStore intermKey intermVal);结束

创建一个匿名函数,执行与硬编码相同的行选择subsettingMapper

inFlightDelay150percent =...@(数据)的数据。1994年>年&...(data.ActualElapsedTime-data.CRSElapsedTime) 1.50*data.CRSElapsedTime;

mapreduce函数要求map和reduce函数接受恰好三个输入,使用另一个匿名函数指定映射器的第四个输入,subsettingMapperGeneric.随后,您可以使用这个匿名函数来调用subsettingMapperGeneric只使用三个参数(第四个是隐式的)。

configuredMapper =...@(data, info, intermKVStore) subsettingMapperGeneric(data, info, intermKVStore)...intermKVStore inFlightDelay150percent);

使用mapreduce将通用map函数应用到输入数据存储。

result2 = mapreduce(ds, configuredMapper, @subsettingReducer);
******************************** * MAPREDUCE的进展  * ******************************** 地图地图16%减少0% 32% 0%减少0%减少0%地图48%减少0% 65%减少0%地图81%减少0% 97%减少0%地图100%减少0% 100%减少100%

mapreduce返回一个输出数据存储,result2,文件在当前文件夹。

验证结果

确认泛型映射器获得与使用硬连接的子集逻辑相同的结果。

R2 = readall(result2);tbl2 = r2.Value{1};如果Isequaln (tbl, tbl2) disp(“与可配置映射器的结果相同。”其他的disp (“哎呀,回到画板上。”结束
与可配置映射器的结果相同。

本地函数

这里列出的是map和reduce函数mapreduce适用于数据。

函数subsettingMapper(data, ~, intermKVStore)选择1995年及以后飞行时间特别长的航班飞行时间百分比(包括在停机坪的时间和在机场的时间)%空气)。Idx = data。Year > 1994 &(数据。ActualElapsedTime- data.CRSElapsedTime)...> 1.50 * data.CRSElapsedTime;intermVal = data(idx,:);添加(intermKVStore“零”, intermVal);结束%-------------------------------------------------------------------------函数subsettingReducer(~, intermValList, outKVStore)%从列表中获取所有中间结果outVal = {};hasnext(intermValList) outVal = [outVal;getnext (intermValList)];结束注意,此方法假定连接的中间值整个数据的%子集)适合内存。添加(outKVStore“零”, outVal);结束%-------------------------------------------------------------------------函数subsettingMapperGeneric(data, ~, intermKVStore, subsetter) intermKey =“零”;intermVal = data(subsetter(data),:);添加(intermKVStore intermKey intermVal);结束%-------------------------------------------------------------------------

另请参阅

|

相关的话题