In the previous sections for the CounterService
example, we started CounterService
as part of a HazelcastInstance startup.
Now, let's connect the Counter
interface to CounterService
and perform a remote call to the cluster member hosting the counter data. Then, we will return a dummy result.
Remote calls are performed via a proxy in Hazelcast. Proxies expose the methods at the client side. Once a method is called, proxy creates an operation object, sends this object to the cluster member responsible from executing that operation, and then sends the result.
Making Counter a Distributed Object
First, we need to make the Counter
interface a distributed object by extending the DistributedObject
interface, as shown below.
import com.hazelcast.core.DistributedObject;
public interface Counter extends DistributedObject {
int inc(int amount);
}
Implementing ManagedService and RemoteService
Now, we need to make the CounterService
class implement not only the ManagedService
interface, but also the interface com.hazelcast.spi.RemoteService
. This way, a client will be able to get a handle of a counter proxy. You can read the source code for RemoteService here.
import com.hazelcast.core.DistributedObject;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.RemoteService;
import java.util.Properties;
public class CounterService implements ManagedService, RemoteService {
public static final String NAME = "CounterService";
private NodeEngine nodeEngine;
@Override
public DistributedObject createDistributedObject(String objectName) {
return new CounterProxy(objectName, nodeEngine, this);
}
@Override
public void destroyDistributedObject(String objectName) {
// for the time being a no-op, but in the later examples this will be implemented
}
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
this.nodeEngine = nodeEngine;
}
@Override
public void shutdown(boolean terminate) {
}
@Override
public void reset() {
}
}
The CounterProxy
returned by the method createDistributedObject
is a local representation to (potentially) remote managed data and logic.
NOTE: Note that caching and removing the proxy instance are done outside of this service.
Implementing CounterProxy
Now, it is time to implement the CounterProxy
as shown below. CounterProxy
extends AbstractDistributedObject, source code here.
import com.hazelcast.spi.AbstractDistributedObject;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.ExceptionUtil;
import java.util.concurrent.Future;
public class CounterProxy extends AbstractDistributedObject<CounterService> implements Counter {
private final String name;
public CounterProxy(String name, NodeEngine nodeEngine, CounterService counterService) {
super(nodeEngine, counterService);
this.name = name;
}
@Override
public String getServiceName() {
return CounterService.NAME;
}
@Override
public String getName() {
return name;
}
@Override
public int inc(int amount) {
NodeEngine nodeEngine = getNodeEngine();
IncOperation operation = new IncOperation(name, amount);
int partitionId = nodeEngine.getPartitionService().getPartitionId(name);
InvocationBuilder builder = nodeEngine.getOperationService()
.createInvocationBuilder(CounterService.NAME, operation, partitionId);
try {
final Future<Integer> future = builder.invoke();
return future.get();
} catch (Exception e) {
throw ExceptionUtil.rethrow(e);
}
}
}
CounterProxy
is a local representation of remote data/functionality. It does not include the counter state. Therefore, the method inc
should be invoked on the cluster member hosting the real counter. You can invoke it using Hazelcast SPI; then it will send the operations to the correct member and return the results.
Let's dig deeper into the method inc
.
- First, we create
IncOperation
with a givenname
andamount
. - Then, we get the partition ID based on the
name
; by this way, all operations for a given name will result in the same partition ID. - Then, we create an
InvocationBuilder
where the connection between operation and partition is made. - Finally, we invoke the
InvocationBuilder
and wait for its result. This waiting is performed with afuture.get()
. In our case, timeout is not important. However, it is a good practice to use a timeout for a real system since operations should complete in a certain amount of time.
Dealing with Exceptions
Hazelcast's ExceptionUtil
is a good solution when it comes to dealing with execution exceptions. When the execution of the operation fails with an exception, an ExecutionException
is thrown and handled with the method ExceptionUtil.rethrow(Throwable)
.
If it is an InterruptedException
, we have two options: either propagate the exception or just use the ExceptionUtil.rethrow
for all exceptions. Please see the example code below.
try {
final Future<Integer> future = invocation.invoke();
return future.get();
} catch(InterruptedException e){
throw e;
} catch(Exception e){
throw ExceptionUtil.rethrow(e);
}
Implementing the PartitionAwareOperation Interface
Now, let's write the IncOperation
. It implements the PartitionAwareOperation
interface, meaning that it will be executed on the partition that hosts the counter. See the PartitionAwareOperation source code here.
The method run
does the actual execution. Since IncOperation
will return a response, the method returnsResponse
returns true
. If your method is asynchronous and does not need to return a response, it is better to return false
since it will be faster. The actual response is stored in the field returnValue
; retrieve it with the method getResponse
.
There are two more methods in this code: writeInternal
and readInternal
. Since IncOperation
needs to be serialized, these two methods are overridden, and hence, objectId
and amount
are serialized and available when those operations are executed.
For the deserialization, note that the operation must have a no-arg constructor.
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.PartitionAwareOperation;
import java.io.IOException;
class IncOperation extends Operation implements PartitionAwareOperation {
private String objectId;
private int amount, returnValue;
// Important to have a no-arg constructor for deserialization
public IncOperation() {
}
public IncOperation(String objectId, int amount) {
this.amount = amount;
this.objectId = objectId;
}
@Override
public void run() throws Exception {
System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
returnValue = 0;
}
@Override
public Object getResponse() {
return returnValue;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeUTF(objectId);
out.writeInt(amount);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
objectId = in.readUTF();
amount = in.readInt();
}
}
Running the Code
Now, let's run our code.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import java.util.UUID;
public class Member {
public static void main(String[] args) {
HazelcastInstance[] instances = new HazelcastInstance[2];
for (int k = 0; k < instances.length; k++)
instances[k] = Hazelcast.newHazelcastInstance();
Counter[] counters = new Counter[4];
for (int k = 0; k < counters.length; k++)
counters[k] = instances[0].getDistributedObject(CounterService.NAME, k+"counter");
for (Counter counter : counters)
System.out.println(counter.inc(1));
System.out.println("Finished");
System.exit(0);
}
}
Once run, you will see the output as below.
Executing 0counter.inc() on: Address[192.168.1.103]:5702
0
Executing 1counter.inc() on: Address[192.168.1.103]:5702
0
Executing 2counter.inc() on: Address[192.168.1.103]:5701
0
Executing 3counter.inc() on: Address[192.168.1.103]:5701
0
Finished
Note that counters are stored in different cluster members. Also note that increment is not active for now since the value remains as 0.
Until now, we have performed the basics to get this up and running. In the next section, we will make a real counter, cache the proxy instances and deal with proxy instance destruction.