-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmapreduce_pipeline.py
More file actions
executable file
·301 lines (250 loc) · 9.39 KB
/
mapreduce_pipeline.py
File metadata and controls
executable file
·301 lines (250 loc) · 9.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
#!/usr/bin/env python
#
# Copyright 2011 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipelines for mapreduce library."""
from __future__ import with_statement
# Public API of this module. MapperPipeline, ShufflePipeline and
# CleanupPipeline are not defined here: they are aliases bound further down
# to objects from the mapper_pipeline and shuffler modules.
__all__ = [
"CleanupPipeline",
"MapPipeline",
"MapperPipeline",
"MapreducePipeline",
"ReducePipeline",
"ShufflePipeline",
]
import base64
import pickle
from mapreduce.lib import pipeline
from mapreduce.lib.pipeline import common as pipeline_common
from mapreduce.lib import files
from mapreduce.lib.files import file_service_pb
from mapreduce import base_handler
from mapreduce import context
from mapreduce import errors
from mapreduce import input_readers
from mapreduce import mapper_pipeline
from mapreduce import operation
from mapreduce import output_writers
from mapreduce import shuffler
from mapreduce import util
# MapperPipeline is defined in mapper_pipeline.py only to break the dependency
# cycle with shuffler.py. Re-export it here under its public name.
MapperPipeline = mapper_pipeline.MapperPipeline
# Re-export the shuffle stage so it is reachable through this module
# (it is listed in __all__).
ShufflePipeline = shuffler.ShufflePipeline
# Public alias for the private mapper_pipeline._CleanupPipeline.
CleanupPipeline = mapper_pipeline._CleanupPipeline
class MapPipeline(base_handler.PipelineBase):
    """Map stage of a MapReduce job.

    Iterates over the input reader and emits the data in key/value format so
    that the shuffler can consume it.

    Args:
      job_name: mapreduce job name as string.
      mapper_spec: specification of map handler function as string.
      input_reader_spec: input reader specification as string.
      params: mapper and input reader parameters as dict.
      shards: number of shards to start as int.

    Returns:
      list of filenames written to by this mapper, one for each shard.
    """

    def run(self,
            job_name,
            mapper_spec,
            input_reader_spec,
            params,
            shards=None):
        # The child job is named after the parent job with a "-map" suffix.
        map_job_name = job_name + "-map"
        # The output writer is fixed: map output is always written through
        # the key/value blobstore writer from the output_writers module.
        writer_spec = (
            output_writers.__name__ + ".KeyValueBlobstoreOutputWriter")
        yield MapperPipeline(
            map_job_name,
            mapper_spec,
            input_reader_spec,
            output_writer_spec=writer_spec,
            params=params,
            shards=shards)
class _ReducerReader(input_readers.RecordsReader):
    """Reader to read KeyValues records files from Files API.

    Each record is parsed as a file_service_pb.KeyValues proto. Values for a
    key may be spread over several consecutive records (marked partial); the
    reader accumulates them in current_key / current_values and yields a
    (key, values) tuple once a non-partial record for that key is seen.
    The accumulation state is included in the serialized reader state so a
    key that is only partially read survives a checkpoint.
    """

    expand_parameters = True

    def __init__(self, filenames, position):
        """Initializes the reader with no key currently being accumulated.

        Args:
          filenames: passed through to the RecordsReader constructor.
          position: passed through to the RecordsReader constructor.
        """
        super(_ReducerReader, self).__init__(filenames, position)
        # Key whose values are currently being accumulated, or None.
        self.current_key = None
        # Values accumulated so far for current_key, or None.
        self.current_values = None

    def __iter__(self):
        """Iterates over the records, grouping values by key.

        Yields:
          (key, values) once the last (non-partial) record of a key has been
          consumed, or input_readers.ALLOW_CHECKPOINT after a partial record.

        Raises:
          AssertionError: if a record's key differs from the key that is
            currently being accumulated.
          errors.BadCombinerOutputError: if the combiner returned something
            other than a generator.
        """
        ctx = context.get()
        combiner = None

        # The combiner, if any, is configured through the mapper params of
        # the current mapreduce spec.
        if ctx:
            combiner_spec = ctx.mapreduce_spec.mapper.params.get(
                "combiner_spec")
            if combiner_spec:
                combiner = util.handler_for_name(combiner_spec)

        for binary_record in super(_ReducerReader, self).__iter__():
            proto = file_service_pb.KeyValues()
            proto.ParseFromString(binary_record)

            if self.current_key is None:
                # First record of a new key: start a fresh accumulation.
                self.current_key = proto.key()
                self.current_values = []
            elif proto.key() != self.current_key:
                # Raised explicitly rather than via `assert` so the check is
                # not stripped when running under `python -O`; otherwise
                # values of different keys would be merged silently.
                raise AssertionError(
                    "inconsistent key sequence. Expected %s but got %s" %
                    (self.current_key, proto.key()))

            if combiner:
                combiner_result = combiner(
                    self.current_key, proto.value_list(), self.current_values)

                if not util.is_generator(combiner_result):
                    raise errors.BadCombinerOutputError(
                        "Combiner %s should yield values instead of returning them (%s)" %
                        (combiner, combiner_result))

                # Rebind to a new list; the combiner generator still holds
                # the previous list that was passed to it above.
                self.current_values = []
                for value in combiner_result:
                    if isinstance(value, operation.Operation):
                        # Operations are executed against the context
                        # instead of being collected as values.
                        value(ctx)
                    else:
                        # with combiner current values always come from
                        # combiner
                        self.current_values.append(value)
            else:
                # without combiner we just accumulate values.
                self.current_values.extend(proto.value_list())

            if not proto.partial():
                key = self.current_key
                values = self.current_values
                # This is final value, don't try to serialize it: reset the
                # state before yielding so to_json() sees no pending key.
                self.current_key = None
                self.current_values = None
                yield (key, values)
            else:
                yield input_readers.ALLOW_CHECKPOINT

    @staticmethod
    def encode_data(data):
        """Encodes the given data, which may include raw bytes.

        Works around limitations in JSON encoding, which cannot handle raw
        bytes.

        Args:
          data: object to encode; it is pickled and then base64-encoded.

        Returns:
          The base64 encoding of the pickled data.
        """
        # TODO(user): Use something less slow/ugly.
        return base64.b64encode(pickle.dumps(data))

    @staticmethod
    def decode_data(data):
        """Decodes data encoded with the encode_data function.

        Args:
          data: value previously produced by encode_data.

        Returns:
          The original object.
        """
        # NOTE(security): pickle.loads can execute arbitrary code. This is
        # only acceptable while the serialized reader state comes from a
        # trusted source; never feed externally supplied data through here.
        return pickle.loads(base64.b64decode(data))

    def to_json(self):
        """Returns an input shard state for the remaining inputs.

        The state of the superclass is extended with the encoded current_key
        and current_values.

        Returns:
          A json-izable version of the remaining InputReader.
        """
        result = super(_ReducerReader, self).to_json()
        result["current_key"] = _ReducerReader.encode_data(self.current_key)
        result["current_values"] = _ReducerReader.encode_data(
            self.current_values)
        return result

    @classmethod
    def from_json(cls, json):
        """Creates an instance of the InputReader for the given shard state.

        Args:
          json: The InputReader state as a dict-like object. Must contain the
            "current_key" and "current_values" entries written by to_json.

        Returns:
          An instance of the InputReader configured using the values of json.
        """
        result = super(_ReducerReader, cls).from_json(json)
        result.current_key = _ReducerReader.decode_data(json["current_key"])
        result.current_values = _ReducerReader.decode_data(
            json["current_values"])
        return result
class ReducePipeline(base_handler.PipelineBase):
    """Reduce stage of a MapReduce job.

    Merge-reads the input files and runs the reducer function on them.

    Args:
      job_name: mapreduce job name as string.
      reducer_spec: specification of reduce function.
      output_writer_spec: specification of output write to use with reduce
        function.
      params: mapper parameters to use as dict.
      filenames: list of filenames to reduce.
      combiner_spec: Optional. Specification of a combine function. If not
        supplied, no combine step will take place. The combine function takes
        a key, list of values and list of previously combined results. It
        yields combined values that might be processed by another combiner
        call, but will eventually end up in reducer. The combiner output key
        is assumed to be the same as the input key.
      shards: Optional. Number of output shards. Defaults to the number of
        input files.

    Returns:
      filenames from output writer.
    """

    def run(self,
            job_name,
            reducer_spec,
            output_writer_spec,
            params,
            filenames,
            combiner_spec=None,
            shards=None):
        # Work on a copy so the caller's params dict is never modified.
        reduce_params = dict(params or {})
        reduce_params["files"] = filenames
        if combiner_spec:
            reduce_params["combiner_spec"] = combiner_spec

        # TODO(user): Test this
        if shards is None:
            shards = len(filenames)

        # The reduce stage is a mapper job whose input reader is this
        # module's _ReducerReader.
        reduce_job_name = job_name + "-reduce"
        reader_spec = __name__ + "._ReducerReader"
        yield mapper_pipeline.MapperPipeline(
            reduce_job_name,
            reducer_spec,
            reader_spec,
            output_writer_spec,
            reduce_params,
            shards=shards)
class MapreducePipeline(base_handler.PipelineBase):
    """Pipeline that runs a complete MapReduce job.

    Chains the map, shuffle and reduce stages, then cleans up the
    intermediate files of the map and shuffle stages after the reduce stage.

    Args:
      job_name: job name as string.
      mapper_spec: specification of mapper to use.
      reducer_spec: specification of reducer to use.
      input_reader_spec: specification of input reader to read data from.
      output_writer_spec: specification of output writer to save reduce
        output to.
      mapper_params: parameters to use for mapper phase.
      reducer_params: parameters to use for reduce phase.
      shards: number of shards to use as int.
      combiner_spec: Optional. Specification of a combine function. If not
        supplied, no combine step will take place. The combine function takes
        a key, list of values and list of previously combined results. It
        yields combined values that might be processed by another combiner
        call, but will eventually end up in reducer. The combiner output key
        is assumed to be the same as the input key.

    Returns:
      filenames from output writer.
    """

    def run(self,
            job_name,
            mapper_spec,
            reducer_spec,
            input_reader_spec,
            output_writer_spec=None,
            mapper_params=None,
            reducer_params=None,
            shards=None,
            combiner_spec=None):
        # Map stage. Note that shards is forwarded to this stage only.
        map_output = yield MapPipeline(
            job_name,
            mapper_spec,
            input_reader_spec,
            params=mapper_params,
            shards=shards)

        # Shuffle stage consumes the output of the map stage.
        shuffle_output = yield ShufflePipeline(job_name, map_output)

        # Reduce stage consumes the output of the shuffle stage.
        reduce_output = yield ReducePipeline(
            job_name,
            reducer_spec,
            output_writer_spec,
            reducer_params,
            shuffle_output,
            combiner_spec=combiner_spec)

        # Intermediate files may only be removed once the reducer is done.
        with pipeline.After(reduce_output):
            temp_files = yield pipeline_common.Extend(
                map_output, shuffle_output)
            yield CleanupPipeline(temp_files)

        yield pipeline_common.Return(reduce_output)