2.3. Distributed Map

Just like queue and set, Hazelcast will partition your map entries; and almost evenly distribute onto all Hazelcast members. Distributed maps have 1 backup (replica-count) by default so that if a member goes down, we don't lose data. Backup operations are synchronous so when a map.put(key, value) returns, it is guaranteed that the entry is replicated to one other node. For the reads, it is also guaranteed that map.get(key) returns the latest value of the entry. Consistency is strictly enforced.

import com.hazelcast.core.Hazelcast;
import java.util.Map;
import java.util.Collection;

Map<String, Customer> mapCustomers = Hazelcast.getMap("customers");
mapCustomers.put("1", new Customer("Joe", "Smith"));
mapCustomers.put("2", new Customer("Ali", "Selam"));
mapCustomers.put("3", new Customer("Avi", "Noyan"));

Collection<Customer> colCustomers = mapCustomers.values();
for (Customer customer : colCustomers) {
    // process customer
}

Hazelcast.getMap() actually returns com.hazelcast.core.IMap which extends java.util.concurrent.ConcurrentMap interface. So methods like ConcurrentMap.putIfAbsent(key,value) and ConcurrentMap.replace(key,value) can be used on distributed map as shown in the example below.

import com.hazelcast.core.Hazelcast;
import java.util.concurrent.ConcurrentMap;

Customer getCustomer (String id) {
    ConcurrentMap<String, Customer> map = Hazelcast.getMap("customers");
    Customer customer = map.get(id);
    if (customer == null) {
        customer = new Customer (id);
        customer = map.putIfAbsent(id, customer);
    }
    return customer;
}               

public boolean updateCustomer (Customer customer) {
    ConcurrentMap<String, Customer> map = Hazelcast.getMap("customers");
    return (map.replace(customer.getId(), customer) != null);            
}
                
public boolean removeCustomer (Customer customer) {
    ConcurrentMap<String, Customer> map = Hazelcast.getMap("customers");
    return map.remove(customer.getId(), customer) );           
}                                  
        

All ConcurrentMap operations such as put and remove might wait if the key is locked by another thread in the local or remote JVM, but they will eventually return with success. ConcurrentMap operations never throwjava.util.ConcurrentModificationException.

Also see:

2.3.1. Backups

Hazelcast will distribute map entries onto multiple JVMs (cluster members). Each JVM holds some portion of the data but we don't want to lose data when a member JVM crashes. To provide data-safety, Hazelcast allows you to specify the number of backup copies you want to have. That way data on a JVM will be copied onto other JVM(s). Hazelcast supports both sync and async backups. Sync backups block operations until backups are successfully copied to backups nodes (or deleted from backup nodes in case of remove) and acknowledgements are received. In contrast, async backups do not block operations, they are fire & forget and do not require acknowledgements. By default, Hazelcast will have one sync backup copy. If backup count >= 1, then each member will carry both owned entries and backup copies of other member(s). So for the map.get(key) call, it is possible that calling member has backup copy of that key but by default, map.get(key) will always read the value from the actual owner of the key for consistency. It is possible to enable backup reads by changing the configuration. Enabling backup reads will give you greater performance.

<hazelcast>
    ...
    <map name="default">
        <!--
            Number of sync-backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. Valid numbers are 0 (no backup), 1, 2, 3.
        -->
        <backup-count>1</backup-count>

        <!--
            Number of async-backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. Valid numbers are 0 (no backup), 1, 2, 3.
        -->
        <async-backup-count>1</async-backup-count>

        <!--
            Can we read the local backup entries? Default value is false for
            strong consistency. Being able to read backup data will give you
            greater performance.
        -->
        <read-backup-data>false</read-backup-data>

        ...
    </map>
</hazelcast>

2.3.2. Eviction

Hazelcast also supports policy based eviction for distributed map. Currently supported eviction policies are LRU (Least Recently Used) and LFU (Least Frequently Used). This feature enables Hazelcast to be used as a distributed cache. If time-to-live-seconds is not 0 then entries older than time-to-live-seconds value will get evicted, regardless of the eviction policy set. Here is a sample configuration for eviction:

<hazelcast>
    ...
    <map name="default">
        <!--
            Number of backups. If 1 is set as the backup-count for example,
            then all entries of the map will be copied to another JVM for
            fail-safety. Valid numbers are 0 (no backup), 1, 2, 3.
        -->
        <backup-count>1</backup-count>

        <!--
            Maximum number of seconds for each entry to stay in the map. Entries that are
            older than <time-to-live-seconds> and not updated for <time-to-live-seconds>
            will get automatically evicted from the map.
            Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
        -->
        <time-to-live-seconds>0</time-to-live-seconds>

        <!--
            Maximum number of seconds for each entry to stay idle in the map. Entries that are
            idle(not touched) for more than <max-idle-seconds> will get
            automatically evicted from the map.
            Entry is touched if get, put or containsKey is called.
            Any integer between 0 and Integer.MAX_VALUE.
            0 means infinite. Default is 0.
        -->
        <max-idle-seconds>0</max-idle-seconds>

        <!--
            Valid values are:
            NONE (no extra eviction, <time-to-live-seconds> may still apply),
            LRU  (Least Recently Used),
            LFU  (Least Frequently Used).
            NONE is the default.
            Regardless of the eviction policy used, <time-to-live-seconds> will still apply. 
        -->
        <eviction-policy>LRU</eviction-policy>

        <!--
            Maximum size of the map. When max size is reached,
            map is evicted based on the policy defined.
            Any integer between 0 and Integer.MAX_VALUE. 0 means
            Integer.MAX_VALUE. Default is 0.
        -->
        <max-size policy="cluster_wide_map_size">5000</max-size>

        <!--
            When max. size is reached, specified percentage of
            the map will be evicted. Any integer between 0 and 100.
            If 25 is set for example, 25% of the entries will
            get evicted.
        -->
        <eviction-percentage>25</eviction-percentage>
       <!--
            Specifies when eviction will be started. Default value is 3. 
           So every 3 (+up to 5 for performance reasons) seconds 
           eviction will be kicked of. Eviction is costly operation, setting 
           this number too low, can decrease the performance. 
       -->
      <eviction-delay-seconds>3</eviction-delay-seconds>
    </map>
</hazelcast>

Max-Size Policies

There are 5 defined policies can be used in max-size configuration.

  1. cluster_wide_map_size: Cluster-wide total max map size (default policy).

    <max-size policy="cluster_wide_map_size">50000</max-size>

  2. map_size_per_jvm: Max map size per JVM.

    <max-size policy="map_size_per_jvm">5000</max-size>

  3. partitions_wide_map_size: Partitions (default 271) wide max map size.

    <max-size policy="partitions_wide_map_size">27100</max-size>

  4. used_heap_size: Max used heap size in MB (mega-bytes) per JVM.

    <max-size policy="used_heap_size">4096</max-size>

  5. used_heap_percentage: Max used heap size percentage per JVM.

    <max-size policy="used_heap_percentage">75</max-size>

2.3.3. Persistence

Hazelcast allows you to load and store the distributed map entries from/to a persistent datastore such as relational database. If a loader implementation is provided, when get(key) is called, if the map entry doesn't exist in-memory then Hazelcast will call your loader implementation to load the entry from a datastore. If a store implementation is provided, when put(key,value) is called, Hazelcast will call your store implementation to store the entry into a datastore. Hazelcast can call your implementation to store the entries synchronously (write-through) with no-delay or asynchronously (write-behind) with delay and it is defined by the write-delay-seconds value in the configuration.

If it is write-through, when the map.put(key,value) call returns, you can be sure that

  • MapStore.store(key,value) is successfully called so the entry is persisted.

  • In-Memory entry is updated

  • In-Memory backup copies are successfully created on other JVMs (if backup-count is greater than 0)

If it is write-behind, when the map.put(key,value) call returns, you can be sure that

  • In-Memory entry is updated

  • In-Memory backup copies are successfully created on other JVMs (if backup-count is greater than 0)

  • The entry is marked as dirty so that after write-delay-seconds, it can be persisted.

Same behavior goes for the remove(key and MapStore.delete(key). If MapStore throws an exception then the exception will be propagated back to the original put or remove call in the form of RuntimeException. When write-through is used, Hazelcast will call MapStore.store(key,value) and MapStore.delete(key) for each entry update. When write-behind is used, Hazelcast will callMapStore.store(map), and MapStore.delete(collection) to do all writes in a single call. Also note that your MapStore or MapLoader implementation should not use Hazelcast Map/Queue/MultiMap/List/Set operations. Your implementation should only work with your data store. Otherwise you may get into deadlock situations.

Here is a sample configuration:

<hazelcast>
    ...
    <map name="default">
        ...
        <map-store enabled="true">
            <!--
               Name of the class implementing MapLoader and/or MapStore.
               The class should implement at least of these interfaces and
               contain no-argument constructor. Note that the inner classes are not supported.
            -->
            <class-name>com.hazelcast.examples.DummyStore</class-name>
            <!--
               Number of seconds to delay to call the MapStore.store(key, value).
               If the value is zero then it is write-through so MapStore.store(key, value)
               will be called as soon as the entry is updated.
               Otherwise it is write-behind so updates will be stored after write-delay-seconds
               value by calling Hazelcast.storeAll(map). Default value is 0.
            -->
            <write-delay-seconds>0</write-delay-seconds>
        </map-store>
    </map>
</hazelcast>

Initialization on startup:

As of 1.9.3 MapLoader has the new MapLoader.loadAllKeys API. It is used for pre-populating the in-memory map when the map is first touched/used. If MapLoader.loadAllKeys returns NULL then nothing will be loaded. Your MapLoader.loadAllKeys implementation can return all or some of the keys. You may select and return only the hot keys, for instance. Also note that this is the fastest way of pre-populating the map as Hazelcast will optimize the loading process by having each node loading owned portion of the entries.

Here is MapLoader initialization flow;

  1. When getMap() first called from any node, initialization starts

  2. Hazelcast will call MapLoader.loadAllKeys() to get all your keys on each node

  3. Each node will figure out the list of keys it owns

  4. Each node will load all its owned keys by calling MapLoader.loadAll(keys)

  5. Each node puts its owned entries into the map by calling IMap.putTransient(key,value)

2.3.4. Query

Hazelcast partitions your data and spreads across cluster of servers. You can surely iterate over the map entries and look for certain entries you are interested in but this is not very efficient as you will have to bring entire entry set and iterate locally. Instead, Hazelcast allows you to run distributed queries on your distributed map.

Let's say you have a "employee" map containing values of Employee objects:

import java.io.Serializable;

public class Employee implements Serializable {
private String name;
private int age;
private boolean active;
private double salary;

public Employee(String name, int age, boolean live, double price) {
    this.name = name;
    this.age = age;
    this.active = live;
    this.salary = price;
}

public Employee() {
}

public String getName() {
    return name;
}

public int getAge() {
    return age;
}

public double getSalary() {
    return salary;
}

public boolean isActive() {
    return active;
}
}

Now you are looking for the employees who are active and with age less than 30. Hazelcast allows you to find these entries in two different ways:

Distributed SQL Query

SqlPredicate takes regular SQL where clause. Here is an example:

import com.hazelcast.core.IMap;
import com.hazelcast.query.SqlPredicate;

IMap map = Hazelcast.getMap("employee");

Set<Employee> employees = (Set<Employee>) map.values(new SqlPredicate("active AND age < 30"));

Supported SQL syntax:

  • AND/OR

    • <expression> AND <expression> AND <expression>...

      • active AND age>30

      • active=false OR age = 45 OR name = 'Joe'

      • active AND (age >20 OR salary < 60000)

  • =, !=, <, <=, >, >=

    • <expression> = value

      • age <= 30

      • name ="Joe"

      • salary != 50000

  • BETWEEN

    • <attribute> [NOT] BETWEEN <value1> AND <value2>

      • age BETWEEN 20 AND 33 (same as age >=20 AND age<=33)

      • age NOT BETWEEN 30 AND 40 (same as age <30 OR age>40)

  • LIKE

    • <attribute> [NOT] LIKE 'expression'

      % (percentage sign) is placeholder for many characters, _ (underscore) is placeholder for only one character.

      • name LIKE 'Jo%' (true for 'Joe', 'Josh', 'Joseph' etc.)

      • name LIKE 'Jo_' (true for 'Joe'; false for 'Josh')

      • name NOT LIKE 'Jo_' (true for 'Josh'; false for 'Joe')

      • name LIKE 'J_s%' (true for 'Josh', 'Joseph'; false 'John', 'Joe')

  • IN

    • <attribute> [NOT] IN (val1, val2, ...)

      • age IN (20, 30, 40)

      • age NOT IN (60, 70)

Examples:

  • active AND (salary >= 50000 OR (age NOT BETWEEN 20 AND 30))

  • age IN (20, 30, 40) AND salary BETWEEN (50000, 80000)

Criteria API

If SQL is not enough or programmable queries are preferred then JPA criteria like API can be used. Here is an example:

import com.hazelcast.core.IMap;
import com.hazelcast.query.Predicate;
import com.hazelcast.query.PredicateBuilder;
import com.hazelcast.query.EntryObject;

IMap map = Hazelcast.getMap("employee");

EntryObject e = new PredicateBuilder().getEntryObject();
Predicate predicate = e.is("active").and(e.get("age").lessThan(30));

Set<Employee> employees = (Set<Employee>) map.values(predicate);

Indexing

Hazelcast distributed queries will run on each member in parallel and only results will return the caller. When a query runs on a member, Hazelcast will iterate through the entire owned entries and find the matching ones. Can we make this even faster? Yes by indexing the mostly queried fields. Just like you would do for your database. Of course, indexing will add overhead for each write operation but queries will be a lot faster. If you are querying your map a lot then make sure to add indexes for most frequently queried fields. So if your active and age < 30 query, for example, is used a lot then make sure you add index for active and age fields. Here is how:

IMap imap = Hazelcast.getMap("employees");
imap.addIndex("age", true);        // ordered, since we have ranged queries for this field
imap.addIndex("active", false);    // not ordered, because boolean field cannot have range

API IMap.addIndex(fieldName, ordered) is used for adding index. For a each indexed field, if you have -ranged- queries such asage>30, age BETWEEN 40 AND 60 then ordered parameter should betrue, otherwise set it tofalse.

2.3.5. Near Cache

Map entries in Hazelcast are partitioned across the cluster. Imagine that you are reading key k so many times and k is owned by another member in your cluster. Each map.get(k) will be a remote operation; lots of network trips. If you have a map that is read-mostly then you should consider creating a Near Cache for the map so that reads can be much faster and consume less network traffic. All these benefits don't come free. When using near cache, you should consider the following issues:

  • JVM will have to hold extra cached data so it will increase the memory consumption.

  • If invalidation is turned on and entries are updated frequently, then invalidations will be costly.

  • Near cache breaks the strong consistency guarantees; you might be reading stale data.

Near cache is highly recommended for the maps that are read-mostly. Here is a near-cache configuration for a map :

<hazelcast>
    ...
    <map name="my-read-mostly-map">
        ...
        <near-cache>
            <!--
                Maximum size of the near cache. When max size is reached,
                cache is evicted based on the policy defined.
                Any integer between 0 and Integer.MAX_VALUE. 0 means
                Integer.MAX_VALUE. Default is 0.
            -->
            <max-size>5000</max-size>
            <!--
                Maximum number of seconds for each entry to stay in the near cache. Entries that are
                older than <time-to-live-seconds> will get automatically evicted from the near cache.
                Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
            -->
            <time-to-live-seconds>0</time-to-live-seconds>

            <!--
                Maximum number of seconds each entry can stay in the near cache as untouched (not-read).
                Entries that are not read (touched) more than <max-idle-seconds> value will get removed
                from the near cache.
                Any integer between 0 and Integer.MAX_VALUE. 0 means
                Integer.MAX_VALUE. Default is 0.
            -->
            <max-idle-seconds>60</max-idle-seconds>

            <!--
                Valid values are:
                NONE (no extra eviction, <time-to-live-seconds> may still apply),
                LRU  (Least Recently Used),
                LFU  (Least Frequently Used).
                NONE is the default.
                Regardless of the eviction policy used, <time-to-live-seconds> will still apply.
            -->
            <eviction-policy>LRU</eviction-policy>

            <!--
                Should the cached entries get evicted if the entries are changed (updated or removed).
                true of false. Default is true.
            -->
            <invalidate-on-change>true</invalidate-on-change>

        </near-cache>
    </map>
</hazelcast>

2.3.6. Entry Statistics

Hazelcast keeps extra information about each map entry such as creationTime, lastUpdateTime, lastAccessTime, number of hits, version, and this information is exposed to the developer via IMap.getMapEntry(key) call. Here is an example:

import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.MapEntry;

MapEntry entry = Hazelcast.getMap("quotes").getMapEntry("1");
System.out.println ("size in memory  : " + entry.getCost();
System.out.println ("creationTime    : " + entry.getCreationTime();
System.out.println ("expirationTime  : " + entry.getExpirationTime();
System.out.println ("number of hits  : " + entry.getHits();
System.out.println ("lastAccessedTime: " + entry.getLastAccessTime();
System.out.println ("lastUpdateTime  : " + entry.getLastUpdateTime();
System.out.println ("version         : " + entry.getVersion();
System.out.println ("isValid         : " + entry.isValid();
System.out.println ("key             : " + entry.getKey();
System.out.println ("value           : " + entry.getValue();
System.out.println ("oldValue        : " + entry.setValue(newValue);

2.3.7. Indexing

Map entries can be indexed to be able to query faster. These indexes can be created using IMap API. But this usage has a limitation; all indexes must be created before any value is put into map. Sometimes by design adding an index to map may be impossible before any value is added. For example if a map has MapLoader that loads entries during map creation, then adding indexes to map becomes meaningless. To solve this problem, Hazelcast introduces defining IMap indexes in configuration.

  • Hazelcast XML configuration

    <map name="default">
        ...
        <indexes>
            <index ordered="false">name</index>
            <index ordered="true">age</index>
        </indexes>
    </map>

  • Config API

                            mapConfig.addMapIndexConfig(new MapIndexConfig("name", false));
                            mapConfig.addMapIndexConfig(new MapIndexConfig("age", true));
                        

  • Spring XML configuration

    <hz:map name="default">
        <hz:indexes>
            <hz:index attribute="name"/>
            <hz:index attribute="age" ordered="true"/>
        </hz:indexes>
    </hz:map>

To learn about wildcard configuration feature, see Wildcard Configuration page.