EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.cache.impl]

COVERAGE SUMMARY FOR SOURCE FILE [MinimalResultsCache.java]

nameclass, %method, %block, %line, %
MinimalResultsCache.java67%  (2/3)94%  (17/18)84%  (817/977)93%  (190.2/204)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class MinimalResultsCache100% (1/1)93%  (14/15)83%  (799/959)93%  (189.2/203)
getCache (): Cache 0%   (0/1)0%   (0/3)0%   (0/1)
updateEntries (String, Long): void 100% (1/1)70%  (50/71)86%  (10.4/12)
getEntry (QueryStatement, Limits): SearchResult 100% (1/1)76%  (121/159)91%  (27.2/30)
addEntry (QueryStatement, Limits, SearchResult): void 100% (1/1)81%  (251/309)92%  (58/63)
clear (): void 100% (1/1)83%  (48/58)94%  (11.3/12)
updateEntries (List, Long): void 100% (1/1)84%  (78/93)92%  (18.4/20)
removeEntry (MinimalResultsCache$CacheEntry): void 100% (1/1)89%  (78/88)95%  (16.2/17)
init (Map): void 100% (1/1)90%  (45/50)99%  (11.9/12)
<static initializer> 100% (1/1)100% (12/12)100% (5/5)
MinimalResultsCache (): void 100% (1/1)100% (26/26)100% (9/9)
configurationChanged (ConfigurationEvent): void 100% (1/1)100% (11/11)100% (3/3)
configurationReload (): void 100% (1/1)100% (30/30)100% (5/5)
getRepresentation (QueryStatement, Limits): String 100% (1/1)100% (16/16)100% (3/3)
handle (PersistenceEvent): void 100% (1/1)100% (18/18)100% (6/6)
release (): void 100% (1/1)100% (15/15)100% (5/5)
     
class MinimalResultsCache$10%   (0/1)100% (0/0)100% (0/0)100% (0/0)
     
class MinimalResultsCache$CacheEntry100% (1/1)100% (3/3)100% (18/18)100% (2/2)
MinimalResultsCache$CacheEntry (MinimalResultsCache): void 100% (1/1)100% (6/6)100% (1/1)
MinimalResultsCache$CacheEntry (MinimalResultsCache, MinimalResultsCache$1): ... 100% (1/1)100% (4/4)100% (1/1)
compareTo (Object): int 100% (1/1)100% (8/8)100% (1/1)

1/**
2 * Copyright (C) 2006 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.cache.impl;
20 
21import hu.netmind.beankeeper.config.ConfigurationTracker;
22import hu.netmind.beankeeper.config.ExtendedConfigurationListener;
23import hu.netmind.beankeeper.management.ManagementTracker;
24import hu.netmind.beankeeper.parser.*;
25import hu.netmind.beankeeper.event.EventDispatcher;
26import hu.netmind.beankeeper.event.PersistenceEventListener;
27import hu.netmind.beankeeper.event.PersistenceEvent;
28import hu.netmind.beankeeper.model.*;
29import hu.netmind.beankeeper.cache.ResultsCache;
30import hu.netmind.beankeeper.db.Limits;
31import hu.netmind.beankeeper.db.SearchResult;
32import hu.netmind.beankeeper.object.Identifier;
33import hu.netmind.beankeeper.object.PersistenceMetaData;
34import hu.netmind.beankeeper.serial.Serial;
35import hu.netmind.beankeeper.schema.SchemaManager;
36import hu.netmind.beankeeper.store.event.ObjectsFinalizationEvent;
37import hu.netmind.beankeeper.node.event.NodeStateChangeEvent;
38import org.apache.log4j.Logger;
39import java.util.*;
40import org.apache.commons.configuration.event.ConfigurationEvent;
41 
42/**
43 * This is an implementation of an intelligent, configurationless
44 * read-only cache with change detection.<br>
45 * The main design point is that it does not require any configuration
46 * from the user. It's task is to cache result sets up to a previously
47 * given deadline, and when that is reached, clear from cache. When the
48 * same result is referenced, the deadline may be moved further into the future.
49 * Memory management is dynamic. When a resultset arives into the cache,
50 * it is <strong>always</strong> cached, but if the cache detects, that
51 * there is "not enough memory" (see below) left, it may clear some entries before their
52 * their deadline is reached.<br>
53 * So basically one does not have to configure the size of the cache because
54 * it assumes that if a resultset was not recalled in a given timeframe, 
55 * the overhead of selecting from database is acceptable (rather than
56 * always using a predetemined size for the cache, hoping to achieve more
57 * cache hits). Also, memory adapts to usage: When the load is low, 
58 * it is more likely, that only a few resultsets are in the cache, because
59 * they expire, and are not likely to be hit anyway. But if the load rises,
60 * more and more results get into the cache, the likelyhood of a hit also
61 * rises, together with the memory allocation.<br>
62 * The cache determines whether there is enough memory by checking the 
63 * raw bytes free, and also computes the ratio of allocated vs. free memory. 
64 * If this ratio is below a given threshold, then there is enough memory. The
65 * theory is, that the Java VM will allocate more heap when this ratio
66 * is sufficiently small (usually around 60-70%). This leaves two cases:<br>
67 * <ul>
68 *    <li>If the cache's ratio is less than the VM's, than the cache will
69 *    not force the VM to allocate more space, which in turn means, that
70 *    the cache will not grow, although the VM could allocate more memory.</li>
71 *    <li>If this ratio is more than the VM's, than the cache will potentially
72 *    force the VM to allocate new memory, potentially eating the memory
73 *    away from more important tasks.</li>
74 * </ul>
75 * The cache uses the first non-agressive algorithm. The cache itself will not
76 * cause the VM to allocate more heap, but if the application uses more memory
77 * the cache will use proportionally more memory for it's own cause. Note:
78 * The VM tries to maintain free/used ratio between appr. 30-70%.<br>
79 * This cache specializes to store only current searches' result (rather than
80 * current <strong>and</strong> historical results). This is because in the
81 * case of current results, the cache can effectively compute the interval
82 * the result is valid. When determining whether a statement's result is in
83 * the cache, the cache searches all entries (which are all current), and if
84 * the statements serial is above or equals to the result's start serial
85 * (the serial of the first query which caused the entry to be created),
86 * then the result is valid for that query. If a query is received for which
87 * the result may depend on changes inside the transaction, which are not
88 * yet visible to the other transactions, then this query is not handled. This
89 * is mainly because handling transaction-dependent result sets would be
90 * a large overhead for the cache, with little benefit if at all.<br>
91 * In effect, cache hits will occur mostly, when the same non-historical 
92 * query, for a common table (not frequently changed) is run multiple times 
93 * in short period of time.
94 * @author Brautigam Robert
95 * @version Revision: $Revision$
96 */
97public class MinimalResultsCache 
98   implements ResultsCache, ExtendedConfigurationListener, PersistenceEventListener
99{
100   private static Logger logger = Logger.getLogger(MinimalResultsCache.class);
101   
102   private static int MIN_FREE_BYTES = 512*1024; // Min free memory in bytes
103   private static int MIN_FREE_RATE = 60; // Min free memory in percentage to total allocated
104   private static int FREE_RATE = 2; // How many entries to free for a single entry if needed
105   private static long EXPIRATION_INTERVAL = 1*60*1000; // Expiration in millis
106 
107   private SortedSet entriesByExpiration; // The cache entries sorted by expiration
108   private Map entriesByRepresentation; // Entries by statement representation
109   private Map entriesByTables; // Entries by table names
110   private Object cacheMutex = new Object(); // Mutex for cache
111   private Long startSerial; // The serial on which the cache started
112   private Map serialsByTables; // Last modification serials of tables
113 
114   private Cache cache = null;
115   private ConfigurationTracker configurationTracker = null; // Injected
116   private EventDispatcher eventDispatcher = null; // Injected
117   private ManagementTracker managementTracker = null; // Injected
118   private ClassTracker classTracker = null; // Injected
119   private SchemaManager schemaManager = null; // Injected
120   
121   public void init(Map parameters)
122   {
123      this.startSerial=null;
124      this.cache=new Cache(this);
125      clear();
126      // Configure
127      configurationReload();
128      configurationTracker.addListener(this);
129      // Initialize
130      synchronized ( cacheMutex )
131      {
132         clear();
133         // TODO: this should be refreshed when node manager reconnects
134         startSerial = Serial.getSerial(new Date()).getValue();
135      }
136      // Listen for updates
137      eventDispatcher.registerListener(this);
138      // Register mbean
139      managementTracker.registerBean("Cache",cache);
140   }
141 
142   public void release()
143   {
144      clear();
145      managementTracker.deregisterBean("Cache");
146      eventDispatcher.unregisterListener(this);
147      configurationTracker.removeListener(this);
148   }
149 
150   Cache getCache()
151   {
152      return cache;
153   }
154 
155   private String getRepresentation(QueryStatement stmt, Limits limits)
156   {
157      if ( limits != null )
158         return stmt.getStaticRepresentation()+limits.toString();
159      else
160         return stmt.getStaticRepresentation();
161   }
162   
163   /**
164    * Get an entry from the cache.
165    * @param stmt The statement to look for.
166    * @param limits The limits of the query.
167    * @return A SearchResult object if the query was cached, null otherwise.
168    */
169   public SearchResult getEntry(QueryStatement stmt, Limits limits)
170   {
171      // Check whether entry was modified in the same transaction. Only
172      // those results are cached, which are global.
173      if ( stmt.getTimeControl().isApplyTransaction() )
174         return null;
175      // Get entry
176      String rep = getRepresentation(stmt,limits);
177      if ( logger.isDebugEnabled() )
178         logger.debug("searching in cache for: "+rep+", entries: "+entriesByExpiration.size());
179      if ( (rep==null) || ("".equals(rep)) )
180         return null;
181      CacheEntry entry = null;
182      synchronized ( cacheMutex )
183      {
184         entry = (CacheEntry) entriesByRepresentation.get(rep);
185      }
186      if ( entry == null )
187      {
188         // Cache miss
189         synchronized ( cache )
190         {
191            cache.setMissCount(cache.getMissCount()+1);
192         }
193         return null;
194      }
195      // Check whether query is after result became active
196      if ( entry.startSerial > stmt.getTimeControl().getSerial().longValue() )
197         return null;
198      // All OK, result is valid set statistics
199      synchronized ( cacheMutex )
200      {
201         entriesByExpiration.remove(entry); // Remove, because it will be re-ordered
202         entry.accessCount++;
203         entry.lastAccess = System.currentTimeMillis();
204         entry.expiration += EXPIRATION_INTERVAL;
205         entriesByExpiration.add(entry);
206      }
207      // Return with cache hit
208      logger.debug("cache HIT.");
209      synchronized ( cache )
210      {
211         cache.setHitCount(cache.getHitCount()+1);
212      }
213      return entry.result;
214   }
215 
216   /**
217    * Remove an entry from cache.
218    */
219   private void removeEntry(CacheEntry entry)
220   {
221      synchronized ( cacheMutex )
222      {
223         entriesByExpiration.remove(entry);
224         entriesByRepresentation.remove(entry.representation);
225         Iterator tableIterator = entry.tables.iterator();
226         while ( tableIterator.hasNext() )
227         {
228            String tableName = (String) tableIterator.next();
229            Set tableEntries = (Set) entriesByTables.get(tableName); // This shouldn't be null
230            tableEntries.remove(entry);
231            if ( tableEntries.size() == 0 )
232               entriesByTables.remove(tableName);
233         }
234         // Remove from management bean
235         synchronized ( cache )
236         {
237            cache.setResultCount(cache.getResultCount()-1);
238            cache.setObjectCount(cache.getObjectCount()-entry.result.getResult().size());
239         }
240      }
241   }
242 
243   /**
244    * Add an entry to the cache.
245    * @param stmt The statement source of result.
246    * @param limits The limits of result.
247    * @param result The SearchResult object.
248    */
249   public void addEntry(QueryStatement stmt, Limits limits, SearchResult result)
250   {
251      // Check whether entry was modified in the same transaction. Only
252      // those results are cached, which are global.
253      if ( stmt.getTimeControl().isApplyTransaction() )
254         return;
255      // Rep
256      String rep = getRepresentation(stmt,limits);
257      if ( logger.isDebugEnabled() )
258         logger.debug("adding to cache: "+rep+", entries: "+entriesByExpiration.size());
259      if ( (rep==null) || ("".equals(rep)) )
260         return;
261      // First, determine how many entries to free. By default, all expired
262      // entries are freed, but if there is not enough memory, entries
263      // can be forced to be removed.
264      int forceFreeResultsCount = 0; // By default none are forced
265      long freeMem = Runtime.getRuntime().freeMemory();
266      long totalMem = Runtime.getRuntime().totalMemory();
267      if ( (freeMem<MIN_FREE_BYTES) || (100.0*freeMem/totalMem>MIN_FREE_RATE) )
268      {
269         if ( logger.isDebugEnabled() )
270            logger.debug("not enough memory to cache, free: "+freeMem+", total: "+totalMem);
271         // Not enough memory, set force free count
272         forceFreeResultsCount = result.getResult().size()*FREE_RATE+1;
273      }
274      // Free entries
275      long currentTime = System.currentTimeMillis();
276      long lastExpiration = currentTime;
277      while ( ((forceFreeResultsCount>0) || (lastExpiration<currentTime)) && 
278         (entriesByExpiration.size()>0) )
279      {
280         // Get top entry
281         CacheEntry entry = null;
282         synchronized ( cacheMutex )
283         {
284            entry = (CacheEntry) entriesByExpiration.first();
285         }
286         // Set indicators
287         lastExpiration = entry.expiration;
288         forceFreeResultsCount -= entry.result.getResult().size();
289         // Free it
290         removeEntry(entry);
291      }
292      if ( logger.isDebugEnabled() )
293         logger.debug("cache entries after free: "+entriesByExpiration.size());
294      // Create new entry
295      CacheEntry entry = new CacheEntry();
296      entry.representation=rep;
297      entry.result=result;
298      entry.accessCount=0;
299      entry.firstAccess=currentTime;
300      entry.lastAccess=currentTime;
301      entry.expiration=currentTime+EXPIRATION_INTERVAL;
302      entry.tables=stmt.computeTables();
303      entry.startSerial=stmt.getTimeControl().getSerial().longValue();
304      // Add new entry to cache
305      synchronized ( cacheMutex )
306      {
307         // Determine whether entry is current (all table
308         // modifications are previous to entry)
309         Iterator tableIterator = entry.tables.iterator();
310         while ( tableIterator.hasNext() )
311         {
312            String tableName = (String) tableIterator.next();
313            Long lastModificationSerial = (Long) serialsByTables.get(tableName);
314            if ( lastModificationSerial == null )
315               lastModificationSerial = startSerial;
316            if ( lastModificationSerial.longValue() > entry.startSerial )
317               return; // Table is newer than query, so query is historical
318         }
319         // Add to maps
320         entriesByExpiration.add(entry);
321         entriesByRepresentation.put(entry.representation,entry);
322         // Add to table indexed map
323         tableIterator = entry.tables.iterator();
324         while ( tableIterator.hasNext() )
325         {
326            String tableName = (String) tableIterator.next();
327            Set tableEntries = (Set) entriesByTables.get(tableName);
328            if ( tableEntries == null )
329            {
330               tableEntries = new HashSet();
331               entriesByTables.put(tableName,tableEntries);
332            }
333            tableEntries.add(entry);
334         }
335         // Add to management bean
336         synchronized ( cache )
337         {
338            cache.setResultCount(cache.getResultCount()+1);
339            cache.setObjectCount(cache.getObjectCount()+entry.result.getResult().size());
340         }
341      }
342   }
343 
344   /**
345    * Clear the cache.
346    */
347   public void clear()
348   {
349      logger.debug("clearing the cache");
350      synchronized ( cacheMutex )
351      {
352         entriesByExpiration = new TreeSet();
353         entriesByRepresentation = new HashMap();
354         entriesByTables = new HashMap();
355         serialsByTables = new HashMap();
356         // Clear management bean
357         synchronized ( cache )
358         {
359            cache.setResultCount(0);
360            cache.setObjectCount(0);
361         }
362      }
363   }
364 
365   public void handle(PersistenceEvent event)
366   {
367      if ( event instanceof ObjectsFinalizationEvent )
368      {
369         // All object finalizations are handled, even
370         // from remote nodes, to maintain a fair cache
371         ObjectsFinalizationEvent finEvent = (ObjectsFinalizationEvent) event;
372         updateEntries(finEvent.getMetas(),finEvent.getSerial());
373      }
374      if ( event instanceof NodeStateChangeEvent )
375      {
376         // To be sure, we clear this cache on each state change
377         clear();
378      }
379   }
380 
381   /**
382    * Update the tables for given ids.
383    */
384   public void updateEntries(List<PersistenceMetaData> metas, Long modifySerial)
385   {
386      if ( logger.isDebugEnabled() )
387         logger.debug("cache will be updated because following metas were modified: "+metas);
388      // Assemble all tables which changed according
389      // to ids.
390      Set tableNames = new HashSet();
391      Iterator metaIterator = metas.iterator();
392      while ( metaIterator.hasNext() )
393      {
394         Identifier id = new Identifier(((PersistenceMetaData)metaIterator.next()).getPersistenceId());
395         ClassEntry entry = classTracker.getClassEntry(id.getClassId());
396         ClassInfo info = classTracker.getClassInfo(entry);
397         // Get all supertables too, because those all got modified potentially
398         while ( (entry!=null) && (info.isStorable()) )
399         {
400            tableNames.add(schemaManager.getTableName(entry));
401            entry = entry.getSuperEntry();
402            if ( entry != null )
403               info = classTracker.getClassInfo(entry);
404         }
405      }
406      // Go through all tables an update cache
407      synchronized ( cacheMutex )
408      {
409         Iterator tableNameIterator = tableNames.iterator();
410         while ( tableNameIterator.hasNext() )
411            updateEntries((String) tableNameIterator.next(),modifySerial);
412      }
413   }
414 
415   /**
416    * Tell the cache, that a table was updated. If an object is updated,
417    * the old resultsets could be theoretically kept, with an other time
418    * control, but empirically that does not add to cache hits, because more
419    * often, only current resultsets are selected.
420    * @param tableName The table to update.
421    * @param modifySerial The modification serial of table.
422    */
423   private void updateEntries(String tableName, Long modifySerial)
424   {
425      // Update table
426      synchronized ( cacheMutex )
427      {
428         serialsByTables.put(tableName,modifySerial);
429         // Get entries
430         Set entries = null;
431         entries = (Set) entriesByTables.get(tableName);
432         if ( entries != null )
433         {
434            // Remove all entries 
435            for ( CacheEntry entry : new HashSet<CacheEntry>(entries) )
436               removeEntry(entry);
437            entriesByTables.remove(tableName);
438            if ( logger.isDebugEnabled() )
439               logger.debug("updated cache table '"+tableName+"', entry count: "+entriesByExpiration.size());
440         }
441      }
442   }
443 
444   /**
445    * This is a single cache entry.
446    */
447   private class CacheEntry implements Comparable
448   {
449      // Statistics
450      public int accessCount;
451      public long firstAccess;
452      public long lastAccess;
453      public long expiration;
454 
455      // Data
456      public String representation;
457      public Set tables;
458      public SearchResult result;
459 
460      // Valid markers
461      public long startSerial; // Maximum of touched table last changed serials
462 
463      public int compareTo(Object obj)
464      {
465         return (int) (expiration - ((CacheEntry) obj).expiration);
466      }
467   }
468 
469   public void configurationChanged(ConfigurationEvent event)
470   {
471      if ( (event.getPropertyName()!=null) && 
472            (event.getPropertyName().startsWith("beankeeper.cache")) )
473         configurationReload();
474   }
475 
476   public void configurationReload()
477   {
478      MIN_FREE_BYTES = configurationTracker.getConfiguration().
479         getInt("beankeeper.cache.min_free_bytes",512*1024);
480      MIN_FREE_RATE = configurationTracker.getConfiguration().
481         getInt("beankeeper.cache.min_free_rate",60);
482      FREE_RATE = configurationTracker.getConfiguration().
483         getInt("beankeeper.cache.force_free_rate",2);
484      EXPIRATION_INTERVAL = configurationTracker.getConfiguration().
485         getInt("beankeeper.cache.expiration",60*1000);
486   }
487}
488 
489 

[all classes][hu.netmind.beankeeper.cache.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov