In the previous sections of the CounterService example, we started CounterService as part of the HazelcastInstance startup. Now, let's connect the Counter interface to CounterService and perform a remote call to the cluster member hosting the counter data. For now, the call will return a dummy result.
Remote calls are performed via a proxy in Hazelcast. Proxies expose the methods on the client side. Once a method is called, the proxy creates an operation object, sends this object to the cluster member responsible for executing that operation, and then returns the result.
First, we need to make the Counter interface a distributed object by extending the DistributedObject interface, as shown below.
import com.hazelcast.core.DistributedObject;
public interface Counter extends DistributedObject {
int inc(int amount);
}
Now, we need to make the CounterService class implement not only the ManagedService interface, but also the com.hazelcast.spi.RemoteService interface. This way, a client will be able to get a handle of a counter proxy.
import com.hazelcast.core.DistributedObject;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.RemoteService;
import java.util.Properties;
public class CounterService implements ManagedService, RemoteService {
public static final String NAME = "CounterService";
private NodeEngine nodeEngine;
@Override
public DistributedObject createDistributedObject(String objectName) {
return new CounterProxy(objectName, nodeEngine, this);
}
@Override
public void destroyDistributedObject(String objectName) {
// for the time being a no-op, but in the later examples this will be implemented
}
@Override
public void init(NodeEngine nodeEngine, Properties properties) {
this.nodeEngine = nodeEngine;
}
@Override
public void shutdown(boolean terminate) {
}
@Override
public void reset() {
}
}
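With RemoteService implemented, a member (or client) can obtain a counter proxy by service name and object name. Below is a minimal usage sketch, not part of the original example: it assumes the service registration from the previous sections and the proxy and operation implemented later in this section; the class name GetCounterSketch and the object name "counter1" are arbitrary. The Member example at the end of this section does the same thing for several counters.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class GetCounterSketch {
    public static void main(String[] args) {
        // Assumes CounterService is registered in the Hazelcast configuration
        HazelcastInstance hz = Hazelcast.newHazelcastInstance();
        // Look up a proxy by service name and object name
        Counter counter = hz.getDistributedObject(CounterService.NAME, "counter1");
        System.out.println(counter.inc(1));
        hz.shutdown();
    }
}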
The CounterProxy returned by the method createDistributedObject is a local representation of (potentially) remote managed data and logic.
NOTE: Caching and removing the proxy instance are done outside of this service.
Now, it is time to implement the CounterProxy as shown below.
import com.hazelcast.spi.AbstractDistributedObject;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.ExceptionUtil;
import java.util.concurrent.Future;
public class CounterProxy extends AbstractDistributedObject<CounterService> implements Counter {
private final String name;
public CounterProxy(String name, NodeEngine nodeEngine, CounterService counterService) {
super(nodeEngine, counterService);
this.name = name;
}
@Override
public String getServiceName() {
return CounterService.NAME;
}
@Override
public String getName() {
return name;
}
@Override
public int inc(int amount) {
NodeEngine nodeEngine = getNodeEngine();
IncOperation operation = new IncOperation(name, amount);
int partitionId = nodeEngine.getPartitionService().getPartitionId(name);
InvocationBuilder builder = nodeEngine.getOperationService()
.createInvocationBuilder(CounterService.NAME, operation, partitionId);
try {
final Future<Integer> future = builder.invoke();
return future.get();
} catch (Exception e) {
throw ExceptionUtil.rethrow(e);
}
}
}
CounterProxy is a local representation of remote data/functionality; it does not contain the counter state itself. Therefore, the method inc should be invoked on the cluster member hosting the real counter. You can invoke it using the Hazelcast SPI, which sends the operation to the correct member and returns the result.
Let's dig deeper into the method inc:

- First, we create an IncOperation with the given name and amount.
- Then, we get the partition ID based on the name; this way, all operations for a given name result in the same partition ID.
- Then, we create an InvocationBuilder, where the connection between the operation and the partition is made.
- Finally, we invoke the InvocationBuilder and wait for its result with future.get(). In our case, a timeout is not important. However, it is good practice to use a timeout in a real system, since operations should complete within a certain amount of time; see the sketch after the exception-handling discussion below.

Hazelcast's ExceptionUtil is a good solution when it comes to dealing with execution exceptions. When the execution of the operation fails with an exception, an ExecutionException is thrown and handled with the method ExceptionUtil.rethrow(Throwable).
If it is an InterruptedException, we have two options: either propagate the exception or simply use ExceptionUtil.rethrow for all exceptions. See the sample below.
try {
    final Future<Integer> future = builder.invoke();
    return future.get();
} catch (InterruptedException e) {
    // Option 1: propagate the interrupt (the method must then declare InterruptedException)
    throw e;
} catch (Exception e) {
    // Option 2: let ExceptionUtil deal with everything else
    throw ExceptionUtil.rethrow(e);
}
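As noted above, it is good practice to bound the wait in a real system. The sketch below is a variation of the same invocation with a timeout; the 10-second value is an arbitrary choice for illustration, and a TimeoutException is rethrown like any other failure.

try {
    final Future<Integer> future = builder.invoke();
    // Wait at most 10 seconds instead of blocking indefinitely
    return future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
    throw ExceptionUtil.rethrow(e);
}

This variant requires the additional import java.util.concurrent.TimeUnit.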
Now, let's write the IncOperation. It implements the PartitionAwareOperation interface, meaning that it will be executed on the partition that hosts the counter.
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.PartitionAwareOperation;
import java.io.IOException;
class IncOperation extends AbstractOperation implements PartitionAwareOperation {
private String objectId;
private int amount, returnValue;
// Important to have a no-arg constructor for deserialization
public IncOperation() {
}
public IncOperation(String objectId, int amount) {
this.amount = amount;
this.objectId = objectId;
}
@Override
public void run() throws Exception {
System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
returnValue = 0;
}
@Override
public boolean returnsResponse() {
return true;
}
@Override
public Object getResponse() {
return returnValue;
}
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException {
super.writeInternal(out);
out.writeUTF(objectId);
out.writeInt(amount);
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException {
super.readInternal(in);
objectId = in.readUTF();
amount = in.readInt();
}
}
The method run does the actual execution. Since IncOperation returns a response, the method returnsResponse returns true. If your operation is fire-and-forget and does not need to return a response, it is better to return false since that is faster (a minimal sketch of such an operation follows the serialization notes below). The actual response is stored in the field returnValue; you can retrieve it with the method getResponse.
There are two more methods in the above code: writeInternal and readInternal. Since IncOperation needs to be serialized, these two methods must be overridden so that objectId and amount are serialized and available when the operation is executed. For deserialization, note that the operation must have a no-arg constructor.
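As mentioned earlier, an operation that does not need to send a response can return false from returnsResponse. The following is a minimal sketch of such a fire-and-forget operation; the class name and the log message are only for illustration.

import com.hazelcast.spi.AbstractOperation;

class FireAndForgetOperation extends AbstractOperation {

    @Override
    public void run() throws Exception {
        // Do the work; nothing is sent back to the caller
        System.out.println("Executing fire-and-forget work on: " + getNodeEngine().getThisAddress());
    }

    @Override
    public boolean returnsResponse() {
        // Skipping the response makes the invocation cheaper
        return false;
    }
}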
Now, let's run our code.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
public class Member {
public static void main(String[] args) {
HazelcastInstance[] instances = new HazelcastInstance[2];
for (int k = 0; k < instances.length; k++)
instances[k] = Hazelcast.newHazelcastInstance();
Counter[] counters = new Counter[4];
for (int k = 0; k < counters.length; k++)
counters[k] = instances[0].getDistributedObject(CounterService.NAME, k+"counter");
for (Counter counter : counters)
System.out.println(counter.inc(1));
System.out.println("Finished");
System.exit(0);
}
}
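Note that this example only works if CounterService is registered with the Hazelcast instances, as was done in the previous sections. If you prefer programmatic configuration over the declarative <service> entry in hazelcast.xml, the registration might look like the sketch below; the class and method names follow the Hazelcast 3.x configuration API, and the class name MemberWithProgrammaticConfig is arbitrary.

import com.hazelcast.config.Config;
import com.hazelcast.config.ServiceConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

public class MemberWithProgrammaticConfig {
    public static void main(String[] args) {
        Config config = new Config();
        // Register CounterService so that getDistributedObject(CounterService.NAME, ...) can find it
        ServiceConfig serviceConfig = new ServiceConfig()
                .setEnabled(true)
                .setName(CounterService.NAME)
                .setClassName(CounterService.class.getName());
        config.getServicesConfig().addServiceConfig(serviceConfig);
        HazelcastInstance hz = Hazelcast.newHazelcastInstance(config);
    }
}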
Once you run the Member class, you will see output similar to the following; the addresses and member assignments will differ on your machine.
Executing 0counter.inc() on: Address[192.168.1.103]:5702
0
Executing 1counter.inc() on: Address[192.168.1.103]:5702
0
Executing 2counter.inc() on: Address[192.168.1.103]:5701
0
Executing 3counter.inc() on: Address[192.168.1.103]:5701
0
Finished
Note that the counters are stored on different cluster members. Also note that the increment is not implemented yet, which is why the value remains 0.
Until now, we have covered the basics needed to get this up and running. In the next section, we will build a real counter, cache the proxy instances, and deal with proxy destruction.