Placing a Remote Call - Proxy

In the previous sections for the CounterService example, we started CounterService as part of a HazelcastInstance startup.

Now, let's connect the Counter interface to CounterService and perform a remote call to the cluster member hosting the counter data. Then, we will return a dummy result.

Remote calls are performed via a proxy in Hazelcast. Proxies expose the methods at the client side. Once a method is called, proxy creates an operation object, sends this object to the cluster member responsible from executing that operation, and then sends the result.

Making Counter a Distributed Object

First, we need to make the Counter interface a distributed object by extending the DistributedObject interface, as shown below.

import com.hazelcast.core.DistributedObject;

public interface Counter extends DistributedObject {
    int inc(int amount);
}

Implementing ManagedService and RemoteService

Now, we need to make the CounterService class implement not only the ManagedService interface, but also the interface com.hazelcast.spi.RemoteService. This way, a client will be able to get a handle of a counter proxy.

import com.hazelcast.core.DistributedObject;
import com.hazelcast.spi.ManagedService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.RemoteService;

import java.util.Properties;

public class CounterService implements ManagedService, RemoteService {
    public static final String NAME = "CounterService";

    private NodeEngine nodeEngine;

    @Override
    public DistributedObject createDistributedObject(String objectName) {
        return new CounterProxy(objectName, nodeEngine, this);
    }

    @Override
    public void destroyDistributedObject(String objectName) {
        // for the time being a no-op, but in the later examples this will be implemented
    }

    @Override
    public void init(NodeEngine nodeEngine, Properties properties) {
        this.nodeEngine = nodeEngine;
    }

    @Override
    public void shutdown(boolean terminate) {
    }

    @Override
    public void reset() {
    }
}

The CounterProxy returned by the method createDistributedObject is a local representation to (potentially) remote managed data and logic.

image NOTE: Note that caching and removing the proxy instance are done outside of this service.

Implementing CounterProxy

Now, it is time to implement the CounterProxy as shown below.

import com.hazelcast.spi.AbstractDistributedObject;
import com.hazelcast.spi.InvocationBuilder;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.util.ExceptionUtil;

import java.util.concurrent.Future;

public class CounterProxy extends AbstractDistributedObject<CounterService> implements Counter {
    private final String name;

    public CounterProxy(String name, NodeEngine nodeEngine, CounterService counterService) {
        super(nodeEngine, counterService);
        this.name = name;
    }

    @Override
    public String getServiceName() {
        return CounterService.NAME;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public int inc(int amount) {
        NodeEngine nodeEngine = getNodeEngine();
        IncOperation operation = new IncOperation(name, amount);
        int partitionId = nodeEngine.getPartitionService().getPartitionId(name);
        InvocationBuilder builder = nodeEngine.getOperationService()
                .createInvocationBuilder(CounterService.NAME, operation, partitionId);
        try {
            final Future<Integer> future = builder.invoke();
            return future.get();
        } catch (Exception e) {
            throw ExceptionUtil.rethrow(e);
        }
    }
}

CounterProxy is a local representation of remote data/functionality. It does not include the counter state. Therefore, the method inc should be invoked on the cluster member hosting the real counter. You can invoke it using Hazelcast SPI; then it will send the operations to the correct member and return the results.

Let's dig deeper into the method inc.

  • First, we create IncOperation with a given name and amount.
  • Then, we get the partition ID based on the name; by this way, all operations for a given name will result in the same partition ID.
  • Then, we create an InvocationBuilder where the connection between operation and partition is made.
  • Finally, we invoke the InvocationBuilder and wait for its result. This waiting is performed with a future.get(). In our case, timeout is not important. However, it is a good practice to use a timeout for a real system since operations should complete in a certain amount of time.

Dealing with Exceptions

Hazelcast's ExceptionUtil is a good solution when it comes to dealing with execution exceptions. When the execution of the operation fails with an exception, an ExecutionException is thrown and handled with the method ExceptionUtil.rethrow(Throwable).

If it is an InterruptedException, we have two options: Either propagating the exception or just using the ExceptionUtil.rethrow for all exceptions. Please see below sample.

  try {
     final Future<Integer> future = invocation.invoke();
     return future.get();
  } catch(InterruptedException e){
     throw e;
  } catch(Exception e){
     throw ExceptionUtil.rethrow(e);
  }

Implementing the PartitionAwareOperation Interface

Now, let's write the IncOperation. It implements PartitionAwareOperation interface, meaning that it will be executed on the partition that hosts the counter.

import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.spi.AbstractOperation;
import com.hazelcast.spi.PartitionAwareOperation;

import java.io.IOException;

class IncOperation extends AbstractOperation implements PartitionAwareOperation {
    private String objectId;
    private int amount, returnValue;

    // Important to have a no-arg constructor for deserialization
    public IncOperation() {
    }

    public IncOperation(String objectId, int amount) {
        this.amount = amount;
        this.objectId = objectId;
    }

    @Override
    public void run() throws Exception {
        System.out.println("Executing " + objectId + ".inc() on: " + getNodeEngine().getThisAddress());
        returnValue = 0;
    }

    @Override
    public boolean returnsResponse() {
        return true;
    }

    @Override
    public Object getResponse() {
        return returnValue;
    }

    @Override
    protected void writeInternal(ObjectDataOutput out) throws IOException {
        super.writeInternal(out);
        out.writeUTF(objectId);
        out.writeInt(amount);
    }

    @Override
    protected void readInternal(ObjectDataInput in) throws IOException {
        super.readInternal(in);
        objectId = in.readUTF();
        amount = in.readInt();
    }
}

The method run does the actual execution. Since IncOperation will return a response, the method returnsResponse returns true. If your method is asynchronous and does not need to return a response, it is better to return false since it will be faster. The actual response is stored in the field returnValue; you can retrieve it with the method getResponse.

There are two more methods in the above code: writeInternal and readInternal. Since IncOperation needs to be serialized, these two methods should be overwritten, and hence, objectId and amount will be serialized and available when those operations are executed.

For the deserialization, note that the operation must have a no-arg constructor.

Running the Code

Now, let's run our code.

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;

import java.util.UUID;

public class Member {
    public static void main(String[] args) {
        HazelcastInstance[] instances = new HazelcastInstance[2];
        for (int k = 0; k < instances.length; k++)
            instances[k] = Hazelcast.newHazelcastInstance();

        Counter[] counters = new Counter[4];
        for (int k = 0; k < counters.length; k++)
            counters[k] = instances[0].getDistributedObject(CounterService.NAME, k+"counter");

        for (Counter counter : counters)
            System.out.println(counter.inc(1));

        System.out.println("Finished");
        System.exit(0);
    }
}

Once run, you will see the output as below.

Executing 0counter.inc() on: Address[192.168.1.103]:5702

0

Executing 1counter.inc() on: Address[192.168.1.103]:5702

0

Executing 2counter.inc() on: Address[192.168.1.103]:5701

0

Executing 3counter.inc() on: Address[192.168.1.103]:5701

0

Finished

Note that counters are stored in different cluster members. Also note that increment is not active for now since the value remains as 0.

Until now, we have performed the basics to get this up and running. In the next section, we will make a real counter, cache the proxy instances and deal with proxy instance destruction.