Chapter 2. Distributed Data Structures

Table of Contents

2.1. Distributed Queue
2.2. Distributed Topic
2.3. Distributed Map
2.3.1. Backups
2.3.2. Eviction
2.3.3. Persistence
2.3.4. Query
2.3.5. Near Cache
2.3.6. Entry Statistics
2.4. Distributed MultiMap
2.5. Distributed Set
2.6. Distributed List
2.7. Distributed Lock
2.8. Distributed Events

Common Features of all Hazelcast Data Structures:

This manual is for an old version of Hazelcast IMDG; use the latest stable version.

Here is how you can retrieve existing data structure instances (map, queue, set, lock, topic, etc.) and how you can listen for instance events to get notified when an instance is created or destroyed.

import java.util.Collection;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.Instance;
import com.hazelcast.core.InstanceEvent;
import com.hazelcast.core.InstanceListener;

public class Sample implements InstanceListener {
    public static void main(String[] args) {
        Sample sample = new Sample();

        Hazelcast.addInstanceListener(sample);

        Collection<Instance> instances = Hazelcast.getInstances();
        for (Instance instance : instances) {
            System.out.println(instance.getInstanceType() + "," + instance.getId());
        }
    }

    public void instanceCreated(InstanceEvent event) {
        Instance instance = event.getInstance();
        System.out.println("Created " + instance.getInstanceType() + "," + instance.getId());
    }

    public void instanceDestroyed(InstanceEvent event) {
        Instance instance = event.getInstance();
        System.out.println("Destroyed " + instance.getInstanceType() + "," + instance.getId());
    }
}

2.1. Distributed Queue

The Hazelcast distributed queue is an implementation of java.util.concurrent.BlockingQueue.

import com.hazelcast.core.Hazelcast;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

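BlockingQueue<MyTask> q = Hazelcast.getQueue("tasks");
// put() and take() block until space or an item is available
q.put(new MyTask());
MyTask task = q.take();

// timed variants give up after the timeout: offer() returns false, poll() returns null
boolean offered = q.offer(new MyTask(), 10, TimeUnit.SECONDS);
task = q.poll(5, TimeUnit.SECONDS);
if (task != null) {
    // process task
}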

If you have 10 million tasks in your "tasks" queue and you are running that code on 10 JVMs (or servers), then each server holds roughly 1 million task objects (plus backups). FIFO ordering applies to all queue operations cluster-wide. User objects that are enqueued or dequeued (such as MyTask in the example above) have to be Serializable. The maximum capacity per JVM and the TTL (time to live) for a queue can be configured as shown in the example below.

<hazelcast>
    ...
    <queue name="tasks">
        <!--
            Maximum size of the queue. When a JVM's local queue size reaches the maximum,
            all put/offer operations will get blocked until the queue size
            of the JVM goes down below the maximum.
            Any integer between 0 and Integer.MAX_VALUE. 0 means Integer.MAX_VALUE. Default is 0.
        -->
        <max-size-per-jvm>10000</max-size-per-jvm>

        <!--
            Maximum number of seconds for each item to stay in the queue. Items that are
            not consumed in <time-to-live-seconds> will get automatically evicted from the queue.
            Any integer between 0 and Integer.MAX_VALUE. 0 means infinite. Default is 0.
        -->
        <time-to-live-seconds>0</time-to-live-seconds>
    </queue>
</hazelcast>
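
Because the distributed queue implements java.util.concurrent.BlockingQueue, the timed offer/poll behaviour of a full queue can be tried locally without a cluster. The following is a minimal sketch, not Hazelcast code: it uses the JDK's LinkedBlockingQueue with a capacity of 2 as a stand-in for a queue that has reached its maximum size. The MyTask class and the BoundedQueueSketch name are hypothetical, and the sketch does not model distribution, backups, or time-to-live eviction.

```java
import java.io.Serializable;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical task type; objects placed in a distributed queue must be Serializable.
class MyTask implements Serializable {
    private static final long serialVersionUID = 1L;
    final int id;

    MyTask(int id) {
        this.id = id;
    }
}

class BoundedQueueSketch {
    // Exercises a full queue and returns a summary of what happened.
    static String demo() {
        // Local stand-in (assumption): capacity 2 plays the role of the configured maximum.
        BlockingQueue<MyTask> q = new LinkedBlockingQueue<MyTask>(2);
        try {
            boolean first = q.offer(new MyTask(1), 100, TimeUnit.MILLISECONDS);
            boolean second = q.offer(new MyTask(2), 100, TimeUnit.MILLISECONDS);
            // The queue is full, so this offer waits for the timeout and then gives up.
            boolean third = q.offer(new MyTask(3), 100, TimeUnit.MILLISECONDS);
            // FIFO: the first task offered is the first one polled.
            MyTask head = q.poll(100, TimeUnit.MILLISECONDS);
            // Space has been freed, so the retry succeeds.
            boolean retry = q.offer(new MyTask(3), 100, TimeUnit.MILLISECONDS);
            return first + "," + second + "," + third + "," + head.id + "," + retry;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting on the queue", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,true,false,1,true"
    }
}
```

Checking the boolean returned by offer, as in the retry above, is what lets a producer back off instead of blocking indefinitely the way put would.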

As of version 1.9.3, distributed queues are backed by distributed maps. Thus, queues can have custom backup counts and persistent storage. Hazelcast generates a cluster-wide unique id for each element in the queue. Sample configuration:

<hazelcast>
    ...
    <queue name="tasks">
        <!--
            Maximum size of the queue. When a JVM's local queue size reaches the maximum,
            all put/offer operations will get blocked until the queue size
            of the JVM goes down below the maximum.
            Any integer between 0 and Integer.MAX_VALUE. 0 means Integer.MAX_VALUE. Default is 0.
        -->
        <max-size-per-jvm>10000</max-size-per-jvm>
        
        <!--
            Name of the map configuration that will be used for the backing distributed
            map for this queue.
        -->
        <backing-map-ref>queue-map</backing-map-ref>
    </queue>

    <map name="queue-map">
        <backup-count>1</backup-count>
        <map-store enabled="true">
            <class-name>com.your.company.storage.DBMapStore</class-name>
            <write-delay-seconds>0</write-delay-seconds>
        </map-store>
        ...
    </map>
</hazelcast>

If the backing map has no map-store defined, your distributed queue will be in-memory only. If the backing map has a map-store defined, Hazelcast will call your MapStore implementation to persist queue elements, so even if you reboot your cluster, Hazelcast will rebuild the queue with its content. When implementing a MapStore for the backing map, note that the type of the key is always Long and the values are the elements you place into the queue. For instance, make sure MapStore.loadAllKeys returns Set<Long>.
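
The Long-keyed shape of such a store can be sketched without the Hazelcast library. The class below is a hypothetical in-memory stand-in, not a real MapStore: its method names are assumed to mirror the MapStore interface (store, storeAll, delete, deleteAll, load, loadAll, loadAllKeys) and should be checked against the Hazelcast version in use. A real implementation would declare that it implements the interface and would write to a database instead of a map.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a queue's MapStore: keys are always Long,
// values are whatever element type is placed into the queue.
class InMemoryQueueStore<V> {
    // Plays the role of the database table in this sketch.
    private final Map<Long, V> rows = new ConcurrentHashMap<Long, V>();

    public void store(Long key, V value) {
        rows.put(key, value);
    }

    public void storeAll(Map<Long, V> entries) {
        rows.putAll(entries);
    }

    public void delete(Long key) {
        rows.remove(key);
    }

    public void deleteAll(Collection<Long> keys) {
        for (Long key : keys) {
            rows.remove(key);
        }
    }

    public V load(Long key) {
        return rows.get(key);
    }

    public Map<Long, V> loadAll(Collection<Long> keys) {
        Map<Long, V> result = new HashMap<Long, V>();
        for (Long key : keys) {
            V value = rows.get(key);
            if (value != null) {
                result.put(key, value);
            }
        }
        return result;
    }

    // Returns Set<Long>, matching the key type of the backing map.
    public Set<Long> loadAllKeys() {
        return new TreeSet<Long>(rows.keySet());
    }
}
```

Returning the keys in a sorted set is a choice made for this sketch only; the text above requires just that the keys are of type Long.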

To learn about the wildcard configuration feature, see the Wildcard Configuration page.