Chapter 8. Distributed Executor Service

Table of Contents

8.1. Distributed Execution
8.2. Execution Cancellation
8.3. Execution Callback

One of the coolest features of Java 1.5 is the Executor framework, which lets you asynchronously execute tasks: logical units of work such as a database query, a complex calculation, or image rendering. A nice way of executing such tasks is to run them asynchronously and do other things in the meantime. When ready, you get the result and move on. If a task takes longer than expected, you may consider canceling it. In the Java Executor framework, tasks are implemented as java.util.concurrent.Callable and java.lang.Runnable.

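import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.HazelcastInstanceAware;
import java.io.Serializable;
import java.util.concurrent.Callable;

public class Echo implements Callable<String>, Serializable, HazelcastInstanceAware {
    String input = null;
    // Set by Hazelcast on the executing member; transient, so it is not serialized with the task.
    private transient HazelcastInstance hazelcastInstance;

    public Echo() {
    }

    public Echo(String input) {
        this.input = input;
    }

    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) {
        this.hazelcastInstance = hazelcastInstance;
    }

    public String call() {
        // Report which member ran the task, followed by the input.
        return hazelcastInstance.getCluster().getLocalMember().toString() + ":" + input;
    }
}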

The Echo callable above, for instance, returns the local member and the input passed in from its call() method. Remember that getCluster().getLocalMember() returns the local member and that its toString() returns the member's address (IP + port) in String form, so the example shows which member actually executed the code. Of course, the call() method can do and return anything you like. Executing a task with the Executor framework is straightforward: obtain an ExecutorService instance, generally via Executors, and submit the task, which returns a Future. After submitting the task, you don't have to wait for execution to complete; you can process other things and, when ready, use the Future object to retrieve the result, as shown in the code below.

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(new Echo("myinput"));
// while it is executing, do some useful stuff
// when ready, get the result of your execution
String result = future.get();
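
The submit-then-collect pattern above can be tried without a cluster. The following is a minimal sketch using only the JDK; LocalEchoDemo, LocalEcho and runEcho are hypothetical names introduced for illustration, and LocalEcho reports the executing thread's name where Echo reports the cluster member. It also shows one way to give up on a slow task: wait with a timeout and cancel the Future if the result has not arrived.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class LocalEchoDemo {

    // Hypothetical stand-in for Echo that needs no Hazelcast classes:
    // it reports the executing thread's name instead of the cluster member.
    static class LocalEcho implements Callable<String> {
        private final String input;

        LocalEcho(String input) {
            this.input = input;
        }

        @Override
        public String call() {
            return Thread.currentThread().getName() + ":" + input;
        }
    }

    // Submits the task, waits at most timeoutMillis for the result,
    // and cancels the task (returning null) if it has not finished by then.
    static String runEcho(String input, long timeoutMillis) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        Future<String> future = executorService.submit(new LocalEcho(input));
        // ... do other useful work here while the task runs ...
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task if it is still running
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executorService.shutdown(); // let the worker thread terminate
        }
    }

    public static void main(String[] args) {
        System.out.println(runEcho("myinput", 1000));
    }
}
```

Returning null on timeout keeps the sketch short; real code might prefer to rethrow or return a default value.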

8.1. Distributed Execution

The distributed executor service is a distributed implementation of java.util.concurrent.ExecutorService. It allows you to execute your code in the cluster. In this chapter, all the code samples are based on the Echo class above. Please note that the Echo class is Serializable, since a task may have to be sent to another member for execution. You can ask Hazelcast to execute your code (Runnable, Callable):

  • on a specific cluster member you choose.

  • on the member owning the key you choose.

  • on the member Hazelcast will pick.

  • on all or a subset of the cluster members.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.Member;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

// Note: Hazelcast.newHazelcastInstance() starts a new cluster member on every call;
// a real application would normally create one instance and reuse it.

// Executes the task on the given member.
public void echoOnTheMember(String input, Member member) throws Exception {
    Callable<String> task = new Echo(input);
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IExecutorService executorService = hz.getExecutorService("default");
    Future<String> future = executorService.submitToMember(task, member);
    String echoResult = future.get();
}

// Executes the task on the member that owns the given key.
public void echoOnTheMemberOwningTheKey(String input, Object key) throws Exception {
    Callable<String> task = new Echo(input);
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IExecutorService executorService = hz.getExecutorService("default");
    Future<String> future = executorService.submitToKeyOwner(task, key);
    String echoResult = future.get();
}

// Executes the task on a member picked by Hazelcast.
public void echoOnSomewhere(String input) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IExecutorService executorService = hz.getExecutorService("default");
    Future<String> future = executorService.submit(new Echo(input));
    String echoResult = future.get();
}

// Executes the task on each of the given members; one Future per member.
public void echoOnMembers(String input, Set<Member> members) throws Exception {
    HazelcastInstance hz = Hazelcast.newHazelcastInstance();
    IExecutorService executorService = hz.getExecutorService("default");
    Map<Member, Future<String>> futures = executorService.submitToMembers(new Echo(input), members);
    for (Future<String> future : futures.values()) {
        String echoResult = future.get();
        // ...
    }
}

Note that you can obtain the set of cluster members by calling getCluster().getMembers() on your HazelcastInstance, for example hz.getCluster().getMembers() in the samples above.
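
Because a task may run on a different member than the one that submitted it, the task object has to survive being turned into bytes and rebuilt, which is why Echo is Serializable. The following is a minimal sketch of that round trip using plain Java serialization and only the JDK. SerializableTaskDemo, EchoTask and roundTripAndCall are hypothetical names introduced for illustration, and the sketch makes no claim about how Hazelcast itself encodes tasks on the wire.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.Callable;

class SerializableTaskDemo {

    // Hypothetical task shaped like Echo, but without Hazelcast dependencies.
    static class EchoTask implements Callable<String>, Serializable {
        private static final long serialVersionUID = 1L;
        private final String input;

        EchoTask(String input) {
            this.input = input;
        }

        @Override
        public String call() {
            return "echo:" + input;
        }
    }

    // Writes the task to a byte array, as if it were sent over the network.
    static byte[] toBytes(Serializable task) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(task);
        }
        return bytes.toByteArray();
    }

    // Rebuilds the task from bytes, as the receiving side would.
    @SuppressWarnings("unchecked")
    static Callable<String> fromBytes(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (Callable<String>) in.readObject();
        }
    }

    // Serializes a task, deserializes the copy, and runs the copy.
    static String roundTripAndCall(String input) {
        try {
            Callable<String> copy = fromBytes(toBytes(new EchoTask(input)));
            return copy.call();
        } catch (Exception e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTripAndCall("myinput")); // prints "echo:myinput"
    }
}
```

If a task held a field whose type is not serializable, writeObject would fail with a NotSerializableException, so such fields need to be transient or replaced with serializable data.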