16.3. Internals 4: Distributed Map

Hazelcast distributed map is a peer-to-peer, partitioned implementation, so entries put into the map are distributed almost evenly across the existing members. Entries are partitioned according to their keys.

Every key is owned by a member, so every key-aware operation, such as put, remove, and get, is routed to the member owning the key.
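
For example, using the same static Hazelcast API as the rest of this section (the map name "customers" and the key "1" are just illustrations):

IMap map = Hazelcast.getMap("customers");
// Each of these operations is routed to the member that owns the key "1".
map.put("1", "Joe");
Object value = map.get("1");
map.remove("1");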

Q. How does Hazelcast determine the owner of a key?

Hazelcast creates a fixed number of virtual partitions (blocks). The partition count is 271 by default. Each key falls into one of these partitions, and each partition is owned/managed by a member. The oldest member of the cluster assigns the partition ownerships and lets every member know who owns which partitions, so at any given time each member knows the owner of each partition. Hazelcast converts your key object to com.hazelcast.nio.Data and then calculates the owning partition: partition-of-the-key = hash(keyData) % PARTITION_COUNT. Since each member (JVM) knows the owner of each partition, each member can find out which member owns a given key.
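
As a rough illustration of that formula (this is only a sketch; the real implementation hashes the serialized com.hazelcast.nio.Data form of the key, not Object.hashCode()), finding the partition of a key boils down to:

// Conceptual sketch only, not the actual Hazelcast code.
static final int PARTITION_COUNT = 271; // the default

static int partitionOfKey(byte[] keyData) {
   // Mask the sign bit so the result is always in [0, PARTITION_COUNT).
   return (java.util.Arrays.hashCode(keyData) & 0x7FFFFFFF) % PARTITION_COUNT;
}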

Q. Can I get the owner of a key?

Yes. Use the Partition API to get the partition that your key falls into and then get the owner of that partition. Note that the owner of a partition can change over time as new members join or existing members leave the cluster.

PartitionService partitionService = Hazelcast.getPartitionService();
// Find the partition that the key falls into, then ask for its current owner.
Partition partition = partitionService.getPartition(key);
Member ownerMember = partition.getOwner();

Locally owned entries can be obtained by calling map.localKeySet().
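
For example (the map name "orders" is just an illustration):

IMap map = Hazelcast.getMap("orders");
// Keys whose owning partitions currently reside on this member.
Set localKeys = map.localKeySet();
System.out.println("This member owns " + localKeys.size() + " keys locally");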

Q. What happens when a new member joins?

Just like any other member in the cluster, the oldest member knows who owns which partition, and what the oldest member knows is always authoritative. The oldest member is also responsible for redistributing partition ownerships when a new member joins: it reassigns some of the partitions from the existing members to the new member, trying to move the least amount of data possible. The new ownership information of all partitions is then sent to all members.

Notice that the new ownership information may not reach every member at the same time, and the cluster never stops responding to user map operations, even during joins. If a member routes an operation to the wrong member, the target member tells the caller to redo the operation.

If one of a member's partitions is assigned to the new member, that member sends all entries of the partition to the new member (migrating the entries). Eventually, every member in the cluster owns almost the same number of partitions and almost the same number of entries. Also, eventually every member knows the owner of each partition (and of each key).
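
To get a feel for the numbers, the following hypothetical sketch prints roughly how many of the default 271 partitions each member ends up owning for a few cluster sizes:

// Hypothetical sketch: each member owns roughly PARTITION_COUNT / memberCount partitions.
int partitionCount = 271;
for (int memberCount = 1; memberCount <= 4; memberCount++) {
   System.out.println(memberCount + " member(s): ~" + (partitionCount / memberCount) + " partitions each");
}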

You can listen for migration events. MigrationEvent contains the partitionId, oldOwner, and newOwner information.

PartitionService partitionService = Hazelcast.getPartitionService();
partitionService.addMigrationListener(new MigrationListener() {

   public void migrationStarted(MigrationEvent migrationEvent) {
      // Called when the entries of a partition start moving to their new owner.
      System.out.println(migrationEvent);
   }

   public void migrationCompleted(MigrationEvent migrationEvent) {
      // Called when the partition has been fully migrated to its new owner.
      System.out.println(migrationEvent);
   }
});

Q. How about distributed set and list?

Both distributed set and list are implemented on top of distributed map. The underlying distributed map doesn't hold values; it only knows the keys, so items added to either a list or a set are treated as keys. Unlike distributed set, distributed list can have duplicate items: if an existing item is added again, the copyCount of the entry (com.hazelcast.impl.ConcurrentMapManager.Record) is incremented. Also note that index-based methods of distributed list, such as List.get(index) and List.indexOf(Object), are not supported, because keeping distributed indexes of list items would be too costly to be worth implementing.
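
Conceptually (this is only a sketch, not the actual com.hazelcast.impl.ConcurrentMapManager.Record class), you can picture the underlying structure as a map from item to copy count, where set semantics ignore duplicates and list semantics increment the count:

// Conceptual sketch only: items are the keys, there are no values.
java.util.Map<Object, Integer> items = new java.util.HashMap<Object, Integer>();

// Set semantics: adding an existing item has no effect.
void addToSet(Object item) {
   if (!items.containsKey(item)) {
      items.put(item, 1);
   }
}

// List semantics: adding an existing item increments its copy count,
// so duplicates are represented without storing the item twice.
void addToList(Object item) {
   Integer copyCount = items.get(item);
   items.put(item, copyCount == null ? 1 : copyCount + 1);
}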

Check out the com.hazelcast.impl.ConcurrentMapManager class for the implementation. As you will see, the implementation is lock-free because ConcurrentMapManager is a singleton and is processed by only one thread, the ServiceThread.
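
The single-threaded design is what makes the lock-free approach possible: every operation becomes a task that the one thread processes sequentially, so the internal data structures are never accessed concurrently. A very rough sketch of the pattern (not the actual ServiceThread code):

// Rough sketch of single-threaded processing, not the actual ServiceThread implementation.
final java.util.concurrent.BlockingQueue<Runnable> operations =
      new java.util.concurrent.LinkedBlockingQueue<Runnable>();

Thread serviceThread = new Thread(new Runnable() {
   public void run() {
      while (!Thread.currentThread().isInterrupted()) {
         try {
            // Operations run one at a time, so no locks are needed
            // on the data structures they touch.
            operations.take().run();
         } catch (InterruptedException e) {
            return;
         }
      }
   }
}, "ServiceThread");
serviceThread.start();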