EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.operation.impl]

COVERAGE SUMMARY FOR SOURCE FILE [OperationTrackerImpl.java]

nameclass, %method, %block, %line, %
OperationTrackerImpl.java100% (1/1)100% (9/9)73%  (285/391)77%  (54.4/71)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class OperationTrackerImpl100% (1/1)100% (9/9)73%  (285/391)77%  (54.4/71)
waitForQuery (Long): void 100% (1/1)49%  (40/81)48%  (6.7/14)
handle (PersistenceEvent): void 100% (1/1)53%  (27/51)58%  (7/12)
startCommit (int): Long 100% (1/1)80%  (75/94)86%  (12/14)
endCommit (int, Long, Long): void 100% (1/1)80%  (88/110)86%  (13.7/16)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
OperationTrackerImpl (): void 100% (1/1)100% (17/17)100% (5/5)
getMutex (): Object 100% (1/1)100% (3/3)100% (1/1)
init (Map): void 100% (1/1)100% (16/16)100% (4/4)
release (): void 100% (1/1)100% (15/15)100% (4/4)

1/**
2 * Copyright (C) 2006 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.operation.impl;
20 
21import hu.netmind.beankeeper.event.EventDispatcher;
22import hu.netmind.beankeeper.event.PersistenceEventListener;
23import hu.netmind.beankeeper.event.PersistenceEvent;
24import hu.netmind.beankeeper.serial.SerialTracker;
25import hu.netmind.beankeeper.operation.OperationTracker;
26import hu.netmind.beankeeper.node.NodeManager;
27import hu.netmind.beankeeper.node.event.RemoteStateChangeEvent;
28import org.apache.log4j.Logger;
29import java.util.*;
30 
31/**
32 * This class tracks current commits and queries. The public methods are all server side methods.
33 * @author Brautigam Robert
34 * @version Revision: $Revision$
35 */
36public class OperationTrackerImpl implements OperationTracker,PersistenceEventListener
37{
38   private static Logger logger = Logger.getLogger(OperationTrackerImpl.class);
39   
40   private Object mutex = new Object();
41   private HashMap commitsByIndex;
42   private SortedSet commitSerials;
43 
44   private EventDispatcher eventDispatcher = null; // Injected
45   private NodeManager nodeManager = null; // Injected
46   private SerialTracker serialTracker = null; // Injected
47 
48   /**
49    * Initialize.
50    */
51   public void init(Map parameters)
52   {
53      commitsByIndex = new HashMap();
54      commitSerials = new TreeSet();
55      eventDispatcher.registerListener(this,EventDispatcher.PRI_SYSTEM_LOW);
56   }
57 
58   /**
59    * Release all resources associated with this service.
60    */
61   public void release()
62   {
63      eventDispatcher.unregisterListener(this);
64      commitsByIndex = new HashMap();
65      commitSerials = new TreeSet();
66   }
67 
68   /**
69    * Get the operations mutex. This method is not usable remotely.
70    */
71   public Object getMutex()
72   {
73      return mutex;
74   }
75 
76   public Long startCommit(int index)
77   {
78      // Make it a server call
79      if ( nodeManager.getRole() == NodeManager.NodeRole.CLIENT )
80      {
81         return (Long) nodeManager.callServer(OperationTracker.class.getName(),
82               "startCommit",new Class[] { int.class },
83               new Object[] { index });
84      }
85      synchronized ( getMutex() )
86      {
87         // Get serial and register it
88         Long serial = serialTracker.getNextSerial();
89         Set serials = (Set) commitsByIndex.get(new Integer(index));
90         if ( serials == null )
91         {
92            serials = new HashSet();
93            commitsByIndex.put(new Integer(index),serials);
94         }
95         serials.add(serial);
96         commitSerials.add(serial);
97         // Return serial
98         if ( logger.isDebugEnabled() )
99            logger.debug("starting commit lock for node "+index+", serial: "+serial);
100         return serial;
101      }
102   }
103 
104   public void endCommit(int index, Long serial, Long txSerial)
105   {
106      // Make it a server call
107      if ( nodeManager.getRole() == NodeManager.NodeRole.CLIENT )
108      {
109         nodeManager.callServer(OperationTracker.class.getName(),
110               "endCommit",new Class[] { int.class, Long.class, Long.class },
111               new Object[] { index, serial, txSerial });
112         return;
113      }
114      // Local
115      if ( logger.isDebugEnabled() )
116         logger.debug("ending commit lock for node "+index+", serial: "+serial);
117      synchronized ( getMutex() )
118      {
119         // Remove the serial
120         Set serials = (Set) commitsByIndex.get(new Integer(index));
121         if ( serials == null )
122            return;
123         serials.remove(serial);
124         if ( serials.size() == 0 )
125            commitsByIndex.remove(new Integer(index));
126         commitSerials.remove(serial);
127         // Notify waiting queries to re-check commit serials
128         getMutex().notifyAll();
129      }
130   }
131 
132   public void handle(PersistenceEvent event)
133   {
134      if ( event instanceof RemoteStateChangeEvent )
135      {
136         // If a remote node disconnected, then remove all the current
137         // operations of that node.
138         int index = ((RemoteStateChangeEvent) event).getNodeId();
139         synchronized ( getMutex() )
140         {
141            Set serials = (Set) commitsByIndex.remove(new Integer(index));
142            if ( serials == null )
143               return;
144            Iterator serialIterator = serials.iterator();
145            while ( serialIterator.hasNext() )
146               commitSerials.remove(serialIterator.next());
147            // Notify waiting queries to re-check commit serials
148            getMutex().notifyAll();
149         }
150      }
151   }
152 
153   /**
154    * This method must block until all commits which are lower
155    * serials are finished.
156    */
157   public void waitForQuery(Long serial)
158   {
159      // Make it a server call
160      if ( nodeManager.getRole() == NodeManager.NodeRole.CLIENT )
161      {
162         nodeManager.callServer(OperationTracker.class.getName(),
163               "waitForQuery",new Class[] { Long.class },
164               new Object[] { serial });
165         return;
166      }
167      // Local
168      synchronized ( getMutex() )
169      {
170         // Get the lowest commit serial. If that is higher than this
171         // serial, than all commits are finished before this query.
172         Long lowest = null;
173         while ( (!commitSerials.isEmpty()) && ((lowest=(Long) commitSerials.first()).compareTo(serial) <= 0) )
174         {
175            if ( logger.isDebugEnabled() )
176               logger.debug("serial '"+serial+"' blocked by: "+lowest);
177            try
178            {
179               // Wait until some commit finishes
180               getMutex().wait();
181            } catch ( InterruptedException e ) {
182               logger.warn("query wait was interrupted",e);
183            }
184         }
185      }
186   }
187}
188 
189 

[all classes][hu.netmind.beankeeper.operation.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov