Hazelcast - 地图缩减和聚合
MapReduce 是一种计算模型,当您拥有大量数据并且需要多台机器(即分布式环境)来计算数据时,它对于数据处理非常有用。它涉及将数据“映射”为键值对,然后“减少”,即对这些键进行分组并对值执行操作。
鉴于 Hazelcast 在设计时就考虑到了分布式环境,因此实现 Map-Reduce 框架是自然而然的事情。
让我们通过一个例子来看看如何做到这一点。
例如,假设我们有关于一辆汽车(品牌和车号)以及该车车主的数据。
Honda-9235, John Hyundai-235, Alice Honda-935, Bob Mercedes-235, Janice Honda-925, Catnis Hyundai-1925, Jane
现在,我们必须计算出每个品牌的汽车数量,即现代、本田等。
例子
让我们尝试使用 MapReduce 来找出答案 -
package com.example.demo;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.core.IMap;
import com.hazelcast.mapreduce.Context;
import com.hazelcast.mapreduce.Job;
import com.hazelcast.mapreduce.JobTracker;
import com.hazelcast.mapreduce.KeyValueSource;
import com.hazelcast.mapreduce.Mapper;
import com.hazelcast.mapreduce.Reducer;
import com.hazelcast.mapreduce.ReducerFactory;
public class MapReduce {
public static void main(String[] args) throws ExecutionException,
InterruptedException {
try {
// create two Hazelcast instances
HazelcastInstance hzMember = Hazelcast.newHazelcastInstance();
Hazelcast.newHazelcastInstance();
IMap<String, String> vehicleOwnerMap=hzMember.getMap("vehicleOwnerMap");
vehicleOwnerMap.put("Honda-9235", "John");
vehicleOwnerMap.putc"Hyundai-235", "Alice");
vehicleOwnerMap.put("Honda-935", "Bob");
vehicleOwnerMap.put("Mercedes-235", "Janice");
vehicleOwnerMap.put("Honda-925", "Catnis");
vehicleOwnerMap.put("Hyundai-1925", "Jane");
KeyValueSource<String, String> kvs=KeyValueSource.fromMap(vehicleOwnerMap);
JobTracker tracker = hzMember.getJobTracker("vehicleBrandJob");
Job<String, String> job = tracker.newJob(kvs);
ICompletableFuture<Map<String, Integer>> myMapReduceFuture =
job.mapper(new BrandMapper())
.reducer(new BrandReducerFactory()).submit();
Map<String, Integer&g; result = myMapReduceFuture.get();
System.out.println("Final output: " + result);
} finally {
Hazelcast.shutdownAll();
}
}
private static class BrandMapper implements Mapper<String, String, String, Integer> {
@Override
public void map(String key, String value, Context<String, Integer>
context) {
context.emit(key.split("-", 0)[0], 1);
}
}
private static class BrandReducerFactory implements ReducerFactory<String, Integer, Integer> {
@Override
public Reducer<Integer, Integer> newReducer(String key) {
return new BrandReducer();
}
}
private static class BrandReducer extends Reducer<Integer, Integer> {
private AtomicInteger count = new AtomicInteger(0);
@Override
public void reduce(Integer value) {
count.addAndGet(value);
}
@Override
public Integer finalizeReduce() {
return count.get();
}
}
}
让我们尝试理解这段代码 -
- 我们创建 Hazelcast 成员。在示例中,我们有一个成员,但也可以有多个成员。
我们使用虚拟数据创建一个地图,并用它创建一个键值存储。
我们创建一个 Map-Reduce 作业并要求它使用键值存储作为数据。
然后我们将作业提交到集群并等待完成。
映射器创建一个键,即从原始键中提取品牌信息并将值设置为 1,然后将该信息作为 KV 发送到减速器。
减速器简单地对值进行求和,并根据键(即品牌名称)对数据进行分组。
输出
代码的输出 -
Final output: {Mercedes=1, Hyundai=2, Honda=3}