EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.modification.impl]

COVERAGE SUMMARY FOR SOURCE FILE [ModificationTrackerImpl.java]

nameclass, %method, %block, %line, %
ModificationTrackerImpl.java100% (2/2)90%  (18/20)73%  (790/1087)80%  (164.7/206)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class ModificationTrackerImpl$ModificationEntry100% (1/1)75%  (3/4)63%  (17/27)75%  (3/4)
equals (Object): boolean 0%   (0/1)0%   (0/10)0%   (0/1)
ModificationTrackerImpl$ModificationEntry (ModificationTrackerImpl): void 100% (1/1)100% (6/6)100% (1/1)
compareTo (Object): int 100% (1/1)100% (7/7)100% (1/1)
hashCode (): int 100% (1/1)100% (4/4)100% (1/1)
     
class ModificationTrackerImpl100% (1/1)94%  (15/16)73%  (773/1060)80%  (162.7/203)
isCurrent (Object): boolean 0%   (0/1)0%   (0/7)0%   (0/1)
hasChanged (Long, Class, Long): boolean 100% (1/1)43%  (113/261)44%  (17/39)
isCurrent (Class, Long): boolean 100% (1/1)67%  (84/125)82%  (18/22)
isCurrent (PersistenceMetaData): boolean 100% (1/1)69%  (78/113)76%  (16/21)
changeCandidates (List, Long, Long): void 100% (1/1)75%  (150/201)79%  (29.4/37)
endTransaction (Long): void 100% (1/1)94%  (74/79)95%  (12.4/13)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
ModificationTrackerImpl (): void 100% (1/1)100% (55/55)100% (16/16)
configurationChanged (ConfigurationEvent): void 100% (1/1)100% (11/11)100% (3/3)
configurationReload (): void 100% (1/1)100% (18/18)100% (3/3)
handle (PersistenceEvent): void 100% (1/1)100% (33/33)100% (8/8)
init (Map): void 100% (1/1)100% (12/12)100% (4/4)
modifyClassEntries (Class, ModificationTrackerImpl$ModificationEntry, boolean... 100% (1/1)100% (65/65)100% (16/16)
modifyClassEntries (ModificationTrackerImpl$ModificationEntry, boolean): void 100% (1/1)100% (7/7)100% (2/2)
release (): void 100% (1/1)100% (37/37)100% (9/9)
setLastSerial (Class, Long): void 100% (1/1)100% (32/32)100% (8/8)

1/**
2 * Copyright (C) 2009 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.modification.impl;
20 
21import hu.netmind.beankeeper.model.*;
22import hu.netmind.beankeeper.event.EventDispatcher;
23import hu.netmind.beankeeper.event.PersistenceEventListener;
24import hu.netmind.beankeeper.event.PersistenceEvent;
25import hu.netmind.beankeeper.transaction.event.RemoteTransactionPostCommitEvent;
26import hu.netmind.beankeeper.store.event.ObjectsFinalizationEvent;
27import hu.netmind.beankeeper.modification.ModificationTracker;
28import hu.netmind.beankeeper.object.PersistenceMetaData;
29import hu.netmind.beankeeper.object.Identifier;
30import hu.netmind.beankeeper.object.ObjectTracker;
31import hu.netmind.beankeeper.node.NodeManager;
32import hu.netmind.beankeeper.operation.OperationTracker;
33import hu.netmind.beankeeper.query.QueryService;
34import java.util.*;
35import org.apache.log4j.Logger;
36import org.apache.commons.configuration.event.ConfigurationEvent;
37import hu.netmind.beankeeper.config.ExtendedConfigurationListener;
38import hu.netmind.beankeeper.config.ConfigurationTracker;
39 
40/**
41 * This class tracks changes to objects. It caches the most recent changes, but
42 * also determines whether an object changed by reading the database, if the
43 * cache can't give a definite answer.
44 * @author Robert Brautigam
45 * @version CVS Revision: $Revision$
46 */
47public class ModificationTrackerImpl implements ModificationTracker, 
48      ExtendedConfigurationListener, PersistenceEventListener
49{
50   private long MAX_ENTRY_AGE = 30*60*1000;
51   private int MAX_ENTRY_ITEMS = 10000;
52   private static Logger logger = Logger.getLogger(ModificationTrackerImpl.class);
53 
54   private Map entriesById = new HashMap();
55   private SortedSet entriesByDate = new TreeSet();
56   private Map entriesByTxSerial = new HashMap();
57   private Map entriesByClass = new HashMap();
58   private Map lastSerialsByClass = new HashMap();
59   private Long lastCacheRemovedSerial = null;
60 
61   private EventDispatcher eventDispatcher = null; // Injected
62   private ConfigurationTracker configurationTracker = null; // Injected
63   private OperationTracker operationTracker = null; // Injected
64   private NodeManager nodeManager = null; // Injected
65   private ObjectTracker objectTracker = null; // Injected
66   private QueryService queryService = null; // Injected
67 
68   public void init(Map parameters)
69   {
70      eventDispatcher.registerListener(this,EventDispatcher.PRI_SYSTEM_LOW);
71      // Config
72      configurationReload();
73      configurationTracker.addListener(this);
74   }
75 
76   public void release()
77   {
78      entriesById = new HashMap();
79      entriesByDate = new TreeSet();
80      entriesByTxSerial = new HashMap();
81      entriesByClass = new HashMap();
82      lastSerialsByClass = new HashMap();
83      lastCacheRemovedSerial = null;
84      eventDispatcher.unregisterListener(this);
85      configurationTracker.removeListener(this);
86   }
87 
88   private void modifyClassEntries(ModificationEntry entry, boolean add)
89   {
90      modifyClassEntries(entry.objectClass,entry,add);
91   }
92 
93   private void modifyClassEntries(Class currentClass, ModificationEntry entry, boolean add)
94   {
95      if ( currentClass == null )
96         return;
97      // Do current class
98      Set entries = (Set) entriesByClass.get(currentClass);
99      if ( entries == null )
100      {
101         entries = new HashSet();
102         entriesByClass.put(currentClass,entries);
103      }
104      if ( add )
105         entries.add(entry);
106      else
107         entries.remove(entry);
108      if ( entries.size() == 0 )
109         entriesByClass.remove(currentClass);
110      // Do superclass
111      modifyClassEntries(currentClass.getSuperclass(),entry,add);
112      // Do interfaces
113      Class interfaces[] = currentClass.getInterfaces();
114      for ( int i=0; i<interfaces.length; i++ )
115         modifyClassEntries(interfaces[i],entry,add);
116   }
117 
118   /**
119    * This method registers "potential" change serials for a given id. The
120    * change becomes final only when the transaction's commit ends.
121    * @param meta The list of metainformation for objects that changed.
122    * @param endSerial The serial on which these changes went into database.
123    * @param txSerial The tx identifier for later tracking, whether the tx committed.
124    */
125   private void changeCandidates(List<PersistenceMetaData> metas, Long endSerial, Long txSerial)
126   {
127      synchronized ( operationTracker.getMutex() )
128      {
129         // Clear obsolate data
130         while ( ((!entriesByDate.isEmpty()) && ( System.currentTimeMillis()-
131                  ((ModificationEntry) entriesByDate.first()).entryDate.getTime() > MAX_ENTRY_AGE )) ||
132              (entriesByDate.size() > MAX_ENTRY_ITEMS) )
133         {
134            // The first entry is obsolate, so clear it from the cache
135            ModificationEntry entry = (ModificationEntry) entriesByDate.first();
136            entriesById.remove(entry.id);
137            entriesByDate.remove(entry);
138            if ( entry.potentialTxSerial != null )
139               entriesByTxSerial.remove(entry.potentialTxSerial);
140            if ( entry.lastChangeSerial != null )
141               lastCacheRemovedSerial = entry.lastChangeSerial;
142            modifyClassEntries(entry,false); // Remove
143         }
144         // Add all data to internal structures
145         if ( logger.isDebugEnabled() )
146            logger.debug("changed following objects: "+metas+", with endserial: "+endSerial+", tx: "+txSerial);
147         ArrayList entries = new ArrayList();
148         entriesByTxSerial.put(txSerial,entries);
149         for ( int i=0; i<metas.size(); i++ )
150         {
151            PersistenceMetaData meta = (PersistenceMetaData) metas.get(i);
152            // Search or create entry
153            ModificationEntry entry = (ModificationEntry) entriesById.get(meta.getPersistenceId());
154            if ( entry == null )
155            {
156               entry = new ModificationEntry();
157               entry.id = meta.getPersistenceId();
158               entry.objectClass = meta.getObjectClass();
159               entry.entryDate = new Date();
160               // Update data structure
161               entriesById.put(entry.id,entry);
162               entriesByDate.add(entry);
163               modifyClassEntries(entry,true);
164            }
165            entry.potentialChangeSerial=endSerial;
166            if ( (entry.potentialTxSerial != null) && (!txSerial.equals(entry.potentialTxSerial)) )
167            {
168               // This entry was part of an old transaction, which was
169               // unfinished (endCommit was not received). So delete from
170               // it's list.
171               List obsolateTxList = (List) entriesByTxSerial.get(entry.potentialTxSerial);
172               if ( obsolateTxList != null )
173               {
174                  obsolateTxList.remove(entry);
175                  if ( obsolateTxList.isEmpty() )
176                     entriesByTxSerial.remove(entry.potentialTxSerial);
177               }
178            }
179            entry.potentialTxSerial=txSerial;
180            entries.add(entry);
181         }
182      }
183   }
184 
185   /**
186    * Set definite last modification serial for a class, and for all superclasses
187    * and interfaces.
188    */
189   private void setLastSerial(Class currentClass, Long serial)
190   {
191      if ( currentClass == null )
192         return;
193      // Handle class
194      lastSerialsByClass.put(currentClass,serial);
195      // Superclass
196      setLastSerial(currentClass.getSuperclass(),serial);
197      // Interfaces
198      Class interfaces[] = currentClass.getInterfaces();
199      for ( int i=0; i<interfaces.length; i++ )
200         setLastSerial(interfaces[i],serial);
201   }
202 
203   public void handle(PersistenceEvent event)
204   {
205      // Handle end of transactions
206      if ( event instanceof RemoteTransactionPostCommitEvent )
207      {
208         if ( nodeManager.getRole() == NodeManager.NodeRole.SERVER )
209            endTransaction(((RemoteTransactionPostCommitEvent) event).getTxSerial());
210      }
211      if ( event instanceof ObjectsFinalizationEvent )
212      {
213         if ( nodeManager.getRole() == NodeManager.NodeRole.SERVER )
214         {
215            ObjectsFinalizationEvent finEvent = (ObjectsFinalizationEvent) event;
216            changeCandidates(finEvent.getMetas(),finEvent.getSerial(),finEvent.getTxSerial());
217         }
218      }
219   }
220 
221   /**
222    * Ends a transaction, which means it finalizes the entries associated
223    * with the given transaction.
224    */
225   private void endTransaction(Long txSerial)
226   {
227      synchronized ( operationTracker.getMutex() )
228      {
229         // Get and remove entries from tx map
230         List entries = (List) entriesByTxSerial.remove(txSerial);
231         if ( entries == null )
232         {
233            logger.debug("tx "+txSerial+" contained no changes, closing tx.");
234            return; // No ids, probably it became obsolate
235         }
236         logger.debug("activating "+entries.size()+" changes for tx: "+txSerial);
237         // Go through entries of ids and actualize them
238         for ( int i=0; i<entries.size(); i++ )
239         {
240            ModificationEntry entry = (ModificationEntry) entries.get(i);
241            entry.lastChangeSerial = entry.potentialChangeSerial;
242            entry.potentialTxSerial = null;
243            // Insert into the 'persistent' class serial table.
244            setLastSerial(entry.objectClass,entry.lastChangeSerial);
245         }
246      }
247   }
248 
249   /**
250    * Return whether the given object changed since the given serial.
251    */
252   private boolean hasChanged(Long id, Class objectClass, Long serial)
253   {
254      // Try to get a modification entry if there is one for that object
255      ModificationEntry entry = (ModificationEntry) entriesById.get(id);
256      // If there was no entry, then there was no modification
257      // since the last removed serial. If the query serial is newer than
258      // the last removed serial, then the object must be current.
259      if ( (entry==null) && ((lastCacheRemovedSerial==null) || 
260               (serial.longValue() > lastCacheRemovedSerial.longValue())) )
261      {
262         logger.debug("there was no modification yet for this class in cache, "+
263               "but the serial is newer the cache's start serial, so object is current.");
264         return false;
265      }
266      // If there was no entry, and the query serial is old, we must check the
267      // database whether that object is current. Note, that at this point the
268      // object should be locked, or else this query can not tell for sure.
269      if ( (entry==null) && (lastCacheRemovedSerial!=null) && 
270            (serial.longValue() <= lastCacheRemovedSerial.longValue()) )
271      {
272         // Query the object now
273         Object current = queryService.findSingle("find obj("+objectClass.getName()+") where obj = ?", new Object[] { new Identifier(id) });
274         if ( current == null )
275         {
276            logger.debug("object does not exists, so it's unchanged: "+id);
277            return false; // Object does not yet exist globally
278         }
279         PersistenceMetaData persistenceMeta = objectTracker.getMetaData(current);
280         if ( persistenceMeta.getPersistenceStart().longValue() <= serial.longValue() )
281         {
282            logger.debug("object is old, but database said it is current.");
283            return false;
284         } else {
285            logger.debug("object is old, and database said it is also not current.");
286            return true;
287         }
288      }
289      // If there is an enty, but it has potentialTxSerial, that means
290      // it was in a transaction, which started commiting, but never finished.
291      // The client disconnected, and the locks were relinquished. In this
292      // case, we must query the database to make sure the transaction commited.
293      if ( entry.potentialTxSerial!=null )
294      {
295         // Query the object now
296         Object current = queryService.findSingle("find obj("+objectClass.getName()+") where obj = ?",new Object[] { new Identifier(id) });
297         if ( current == null )
298         {
299            logger.debug("unfinished transaction found, but object not found in database: "+id);
300            return false; // Object does not yet exist globally
301         }
302         PersistenceMetaData persistenceMeta = objectTracker.getMetaData(current);
303         // If the metadata's serial equals the potentialChangeSerial, then commit
304         // the transaction manually.
305         if ( persistenceMeta.getPersistenceStart().longValue() >= entry.potentialChangeSerial.longValue() )
306         {
307            logger.info("transaction commit not received for serial: "+entry.potentialTxSerial+", but transaction commited according to database.");
308            // Transaction was committed successfully
309            endTransaction(entry.potentialTxSerial);
310         } else {
311            logger.info("transaction commit not received for serial: "+entry.potentialTxSerial+", and transaction failed according to database.");
312            // Originating transaction failed. Remove those entries which do
313            // not have a lastChangeSerial, because those were not modified yet.
314            List entries = (List) entriesByTxSerial.remove(entry.potentialTxSerial);
315            for ( int i=0; (entries!=null) && (i<entries.size()); i++ )
316            {
317               ModificationEntry removeEntry = (ModificationEntry) entries.get(i);
318               entriesById.remove(removeEntry.id);
319               entriesByDate.remove(removeEntry);
320               modifyClassEntries(removeEntry,false);
321            }
322         }
323         // Now compare the query serial with the object start serial
324         if ( persistenceMeta.getPersistenceStart().longValue() <= serial.longValue() )
325         {
326            logger.debug("unfinished transaction is resolved, current object did not change.");
327            return false;
328         }
329      }
330      // Default is to compare entry's last change serial with the query date.
331      // If the lastChangeSerial is not yet present in the entry, that means
332      // the modification already started, but not yet commited. This means it is
333      // not modified.
334      if ( (entry.lastChangeSerial==null) || (entry.lastChangeSerial.longValue() <= serial.longValue()) )
335      {
336         logger.debug("there was a change for that object in cache, but the object is newer. "+
337               "Last change was on: "+entry.lastChangeSerial+", current object selected: "+serial);
338         return false;
339      }
340      // Last default is changed
341      logger.debug("object was not current, fallthrough return");
342      return true;
343   }
344 
345   /**
346    * Returns whether the given object instance is a current representation
347    * of that object identity among all nodes. When any object is saved, all
348    * of it's attributes will be saved as-is, to protect data integrity. If any
349    * other thread or node already modified that identity (database row), then
350    * those modifications will be overwritten. This method returns if the
351    * object given has any such modifications since it was queried from the
352    * database. Objects not yet in the database are considered 'current'.
353    * @return True, if object is current, false otherwise.
354    */
355   public boolean isCurrent(Object obj)
356   {
357      return isCurrent(objectTracker.getMetaData(obj));
358   }
359 
360   /**
361    * Returns whether the given class changed since the given serial. A class changed,
362    * if any objects of the given class were changed.
363    */
364   public boolean isCurrent(Class cl, Long serial)
365   {
366      logger.debug("making sure class "+cl+" is current at: "+serial);
367      if ( serial == null )
368         return true; // No serial given, this is always current
369      // Make it a server call
370      if ( nodeManager.getRole() == NodeManager.NodeRole.CLIENT )
371      {
372         return (Boolean) nodeManager.callServer(ClassTracker.class.getName(),
373               "isCurrent",new Class[] { Class.class, long.class },
374               new Object[] { cl, serial });
375      }
376      // Local
377      synchronized ( operationTracker.getMutex() )
378      {
379         // We must check each modification
380         // which concerns this class, whether they are newer than the given serial.
381         Set entries = (Set) entriesByClass.get(cl);
382         if ( entries != null )
383         {
384            // If the entries are not null, then check whether there was a valid
385            // modification.
386            Iterator entriesIterator = entries.iterator();
387            while ( entriesIterator.hasNext() )
388            {
389               ModificationEntry entry = (ModificationEntry) entriesIterator.next();
390               if ( hasChanged(entry.id,entry.objectClass,serial) )
391               {
392                  logger.debug("meta is a class, and there was an object of this class newer.");
393                  return false; // Change of object is newer, so class changed
394               }
395            }
396         }
397         // There were no modifications registered by object changes, but
398         // the class still could have changed before, so check change date.
399         Long lastTableSerial = (Long) lastSerialsByClass.get(cl);
400         if ( (lastTableSerial==null) || (lastTableSerial.longValue() < serial) )
401         {
402            logger.debug("class modification table says class is current.");
403            return true;
404         }
405         // Fallback (not current).
406         logger.debug("all checks failed, object is not current");
407         return false;
408      }
409   }
410 
411   /**
412    * Internal remote call for objects.
413    */
414   public boolean isCurrent(PersistenceMetaData meta)
415   {
416      // Make it a server call
417      if ( nodeManager.getRole() == NodeManager.NodeRole.CLIENT )
418      {
419         return (Boolean) nodeManager.callServer(ClassTracker.class.getName(),
420               "isCurrent",new Class[] { PersistenceMetaData.class },
421               new Object[] { meta });
422      }
423      // If this is not a persisted object, return current
424      if ( meta.getPersistenceId()==null )
425         return true; 
426      // Make checks for object
427      synchronized ( operationTracker.getMutex() )
428      {
429         logger.debug("making sure meta: "+meta+" is current.");
430         // If the startdate is not set, then object is not yet saved, it is current.
431         if ( meta.getPersistenceStart()==null )
432         {
433            logger.debug("object is not yet persistent, so it is current.");
434            return true;
435         }
436         // If the end serial is set, then the object is definitely not current
437         // (it is deleted)! Implementation note: object is only deleted, if
438         // the endserial is in the past, but we don't know the actual serial,
439         // so the workaround is to see, if endserial is less then Long.MAX_VALUE,
440         // assuming that there are no "future" deleted objects.
441         if ( (meta.getPersistenceEnd()!=null) &&
442           (meta.getPersistenceEnd().longValue() < Long.MAX_VALUE) )
443         {
444            logger.debug("object has enddate set, it will not be considered current.");
445            return false;
446         }
447         // If the query date is not set, then we have no data to match to, 
448         // return NOT current.
449         if ( meta.getLastCurrentSerial() == null )
450         {
451            logger.debug("metadata contained no query serial, returning 'current'.");
452            return false;
453         }
454         // Test whether entry indicates a change since the query of the meta we received
455         if ( ! hasChanged(meta.getPersistenceId(),meta.getObjectClass(),meta.getLastCurrentSerial()) )
456         {
457            logger.debug("meta represents an object, and haschanged reports not changed.");
458            return true;
459         }
460         // Fallback (not current).
461         logger.debug("all checks failed, object is not current");
462         return false;
463      }
464   }
465 
466   public class ModificationEntry implements Comparable
467   {
468      public Long id;
469      public Class objectClass;
470      public Date entryDate;
471      public Long lastChangeSerial;
472      public Long potentialChangeSerial;
473      public Long potentialTxSerial;
474 
475      public int compareTo(Object entry)
476      {
477         return entryDate.compareTo(((ModificationEntry)entry).entryDate);
478      }
479 
480      public int hashCode()
481      {
482         return id.hashCode();
483      }
484 
485      public boolean equals(Object o)
486      {
487         return id==((ModificationEntry) o).id;
488      }
489   }
490 
491   public void configurationChanged(ConfigurationEvent event)
492   {
493      if ( (event.getPropertyName()!=null) && 
494            (event.getPropertyName().startsWith("beankeeper.cache.modification_")) )
495         configurationReload();
496   }
497 
498   public void configurationReload()
499   {
500      MAX_ENTRY_AGE = configurationTracker.getConfiguration().
501         getInt("beankeeper.cache.modification_max_age",30*60*1000);
502      MAX_ENTRY_ITEMS = configurationTracker.getConfiguration().
503         getInt("beankeeper.cache.modification_max_items",10000);
504   }
505}
506 
507 

[all classes][hu.netmind.beankeeper.modification.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov