EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.transaction.impl]

COVERAGE SUMMARY FOR SOURCE FILE [TransactionTrackerImpl.java]

nameclass, %method, %block, %line, %
TransactionTrackerImpl.java100% (2/2)84%  (31/37)70%  (565/810)78%  (123.8/159)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class TransactionTrackerImpl$TransactionImpl100% (1/1)78%  (18/23)67%  (202/303)76%  (52.8/69)
access$000 (TransactionTrackerImpl$TransactionImpl): Exception 0%   (0/1)0%   (0/3)0%   (0/1)
getAllocateTrace (): Exception 0%   (0/1)0%   (0/3)0%   (0/1)
getTransactionIsolation (): int 0%   (0/1)0%   (0/10)0%   (0/3)
isRollbackOnly (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
setTransactionIsolation (int): void 0%   (0/1)0%   (0/20)0%   (0/5)
rollback (): void 100% (1/1)47%  (24/51)80%  (8/10)
commit (): void 100% (1/1)58%  (45/78)83%  (15/18)
equals (Object): boolean 100% (1/1)86%  (12/14)67%  (2/3)
TransactionTrackerImpl$TransactionImpl (TransactionTrackerImpl, Long, Integer... 100% (1/1)100% (44/44)100% (11/11)
access$100 (TransactionTrackerImpl$TransactionImpl, Connection): void 100% (1/1)100% (4/4)100% (1/1)
access$200 (TransactionTrackerImpl$TransactionImpl, Long): void 100% (1/1)100% (4/4)100% (1/1)
access$300 (TransactionTrackerImpl$TransactionImpl): Integer 100% (1/1)100% (3/3)100% (1/1)
begin (): void 100% (1/1)100% (18/18)100% (3/3)
getConnection (): Connection 100% (1/1)100% (3/3)100% (1/1)
getEndSerial (): Long 100% (1/1)100% (3/3)100% (1/1)
getSerial (): Long 100% (1/1)100% (3/3)100% (1/1)
getServerId (): Integer 100% (1/1)100% (3/3)100% (1/1)
getStats (): TransactionStatistics 100% (1/1)100% (3/3)100% (1/1)
hashCode (): int 100% (1/1)100% (4/4)100% (1/1)
markRollbackOnly (): void 100% (1/1)100% (4/4)100% (2/2)
setConnection (Connection): void 100% (1/1)100% (4/4)100% (2/2)
setEndSerial (Long): void 100% (1/1)100% (4/4)100% (2/2)
toString (): String 100% (1/1)100% (17/17)100% (1/1)
     
class TransactionTrackerImpl100% (1/1)93%  (13/14)72%  (363/507)79%  (71.9/91)
getStackTrace (Exception): String 0%   (0/1)0%   (0/15)0%   (0/4)
removeTransaction (TransactionTrackerImpl$TransactionImpl): void 100% (1/1)49%  (47/96)72%  (10.9/15)
rollbackInternal (TransactionTrackerImpl$TransactionImpl): void 100% (1/1)56%  (29/52)70%  (7.7/11)
hasTransaction (Long): boolean 100% (1/1)71%  (12/17)67%  (2/3)
getTransaction (int): Transaction 100% (1/1)74%  (81/109)89%  (17.8/20)
commitInternal (TransactionTrackerImpl$TransactionImpl): void 100% (1/1)85%  (132/156)80%  (18.5/23)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
TransactionTrackerImpl (): void 100% (1/1)100% (18/18)100% (7/7)
access$400 (): Logger 100% (1/1)100% (2/2)100% (1/1)
access$500 (TransactionTrackerImpl, TransactionTrackerImpl$TransactionImpl): ... 100% (1/1)100% (4/4)100% (1/1)
access$600 (TransactionTrackerImpl, TransactionTrackerImpl$TransactionImpl): ... 100% (1/1)100% (4/4)100% (1/1)
init (Map): void 100% (1/1)100% (16/16)100% (4/4)
notifyTransactionCommitted (Long): void 100% (1/1)100% (13/13)100% (2/2)
release (): void 100% (1/1)100% (1/1)100% (1/1)

1/**
2 * Copyright (C) 2006 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.transaction.impl;
20 
21import java.util.*;
22import java.sql.Connection;
23import org.apache.log4j.Logger;
24import java.io.IOException;
25import java.io.StringWriter;
26import java.io.PrintWriter;
27import hu.netmind.beankeeper.common.StoreException;
28import hu.netmind.beankeeper.transaction.*;
29import hu.netmind.beankeeper.transaction.event.*;
30import hu.netmind.beankeeper.node.NodeManager;
31import hu.netmind.beankeeper.serial.SerialTracker;
32import hu.netmind.beankeeper.db.Database;
33import hu.netmind.beankeeper.operation.OperationTracker;
34import hu.netmind.beankeeper.event.EventDispatcher;
35 
36/**
37 * This class keeps track of all transaction objects allocated.
38 * @author Brautigam Robert
39 * @version Revision: $Revision$
40 */
41public class TransactionTrackerImpl implements TransactionTracker
42{
43   private static Logger logger = Logger.getLogger(TransactionTrackerImpl.class);
44   
45   private ThreadLocal transactions;
46   private LinkedList allTransactions;
47   private HashSet allTransactionIds;
48 
49   private SerialTracker serialTracker = null; // Injected
50   private NodeManager nodeManager = null; // Injected
51   private Database database = null; // Injected
52   private OperationTracker operationTracker = null; // Injected
53   private EventDispatcher eventDispatcher = null; // Injected
54 
55   public void init(Map parameters)
56   {
57      transactions = new ThreadLocal();
58      allTransactions = new LinkedList();
59      allTransactionIds = new HashSet();
60   }
61 
62   public void release()
63   {
64   }
65 
66   public boolean hasTransaction(Long serial)
67   {
68      synchronized ( allTransactions )
69      {
70         return allTransactionIds.contains(serial);
71      }
72   }
73 
74   /**
75    * Get a transaction. Following modes are supported:
76    * <ul>
77    *    <li>TX_REQUIRED: A new transaction is allocated if no current
78    *    transaction exists, otherwise the current transaction is returned.</li>
79    *    <li>TX_NEW: A new transaction is allocated either way.</li>
80    *    <li>TX_OPTIONAL: If there is a current transaction, that is returned,
81    *    null otherwise.</li>
82    * </ul>
83    * Note, that each transaction can support multiple levels of begin-commit
84    * blocks. Each transaction only commits/rollsback is the most outer
85    * block is commited/rolled back.
86    */
87   public Transaction getTransaction(int mode)
88   {
89      LinkedList list = (LinkedList) transactions.get();
90      if ( list == null )
91      {
92         // No list yet, initialize threadlocal
93         list = new LinkedList();
94         transactions.set(list);
95      }
96      if ( (list.size()==0) && (mode==TX_OPTIONAL) )
97         return null;
98      if ( (list.size() == 0) || (mode==TX_NEW) )
99      {
100         // No transaction, or new is explicitly required
101         Long txSerial = serialTracker.getNextSerial();
102         TransactionImpl transaction = new TransactionImpl(txSerial,
103               nodeManager.getServerId());
104         if ( logger.isDebugEnabled() )
105            logger.debug("transaction created: "+transaction);
106         if ( logger.isTraceEnabled() )
107            logger.trace("transaction allocation trace: "+getStackTrace(transaction.getAllocateTrace()));
108         transaction.setConnection(database.getConnectionSource().getConnection());
109         list.add(transaction);
110         synchronized ( allTransactions )
111         {
112            allTransactions.add(transaction);
113            allTransactionIds.add(transaction.getSerial());
114         }
115      }
116      return (Transaction) list.getLast();
117   }
118 
119   /**
120    * Commit a transaction. This is called from a Transaction object.
121    * @throws Exception If commit was unsuccessful.
122    */
123   private void commitInternal(TransactionImpl transaction)
124      throws Exception
125   {
126      if ( logger.isDebugEnabled() )
127         logger.debug("commiting transaction: "+transaction);
128      // Send notifications of commit, if there was an
129      // exception, then skip to rollback
130      eventDispatcher.notify(new TransactionCommittingEvent(transaction));
131      // We need a serial for the transaction to end. This serial
132      // will denote the commit itself. The beginning of a commit
133      // must be marked, because while the commit is running, no
134      // query can execute which has a higher serial, because this
135      // would mean the query will change once this commit finishes.
136      Long endSerial = operationTracker.startCommit(
137               nodeManager.getId());
138      transaction.setEndSerial(endSerial);
139      try
140      {
141         // Check whether we are on the same server still as when the transaction begun.
142         // This is important for the transaction to be synchronized with other nodes.
143         int currentServerId = nodeManager.getServerId();
144         int transactionServerId = transaction.getServerId();
145         if ( currentServerId != transactionServerId )
146            throw new StoreException("there was a reconnect during the transaction, rolling back to preserve consistency, current server id: "+
147                  currentServerId+" vs. "+transactionServerId);
148         // Notify listeners, that the end serial is ready. If
149         // error occurs in this, rollback is invoked.
150         eventDispatcher.notify(new TransactionCommitEndingEvent(transaction));
151         // Commit physically. Note if this fails, rollback is still invoked,
152         // which will close the transaction
153         transaction.getConnection().commit();
154         removeTransaction(transaction);
155      } finally {
156         // Whatever happens, disengage the commit semaphore. An error in this
157         // should not cause the commit to fail, so we catch the exception here.
158         try
159         {
160            operationTracker.endCommit(
161                  nodeManager.getId(),transaction.getEndSerial(),transaction.getSerial());
162            // Give warning if we're on different server now, this could mean,
163            // although not probable, that queries on the new server became inconsistent
164            if ( nodeManager.getServerId() != transaction.getServerId() )
165               logger.warn("commit ended on different server than it begun, check for possible database inconsitencies");
166         } catch ( Exception e ) {
167            logger.warn("commit lock could not be disengaged, check database for possible inconsitencies",e);
168         }
169      }
170      // Send local notifications
171      eventDispatcher.notifyAll(new TransactionCommittedEvent(transaction));
172      // Send remote notifications
173      nodeManager.callAll(TransactionTracker.class.getName(),"notifyTransactionCommitted",
174            new Class[] { Long.class }, new Object[] { transaction.getSerial() });
175   }
176 
177   public void notifyTransactionCommitted(Long txSerial)
178   {
179      eventDispatcher.notifyAll(new RemoteTransactionPostCommitEvent(
180               nodeManager.getId(),txSerial));
181   }
182 
183   /**
184    * Roll back a transaction. This is called from the Transaction object.
185    * Note, that this method will never fail.
186    */
187   private void rollbackInternal(TransactionImpl transaction)
188   {
189      if ( logger.isDebugEnabled() )
190         logger.debug("rolling back transaction: "+transaction);
191      // Send notifications of rollback, disregard exceptions
192      eventDispatcher.notifyAll(new TransactionRollingbackEvent(transaction));
193      // Physical rollback
194      Connection connection = transaction.getConnection();
195      try
196      {
197         connection.rollback();
198      } catch ( Exception e ) {
199         throw new StoreException("could not roll back transaction",e);
200      } finally {
201         // Remove transaction from queue
202         removeTransaction(transaction);
203      }
204      // Send notifications of rollback, disregard exceptions
205      eventDispatcher.notifyAll(new TransactionRolledbackEvent(transaction));
206   }
207 
208   /**
209    * Remove transaction from queue. Also, if this transaction is not
210    * toplevel, then throw exception.
211    */
212   private void removeTransaction(TransactionImpl transaction)
213   {
214      // Remove from threadlocal list
215      LinkedList list = (LinkedList) transactions.get();
216      if ( (list == null) || (list.size()==0) )
217         throw new StoreException("no transactions present, and tried to use one.");
218      if ( ! list.getLast().equals(transaction) )
219      {
220         TransactionImpl top = (TransactionImpl) list.getLast();
221         logger.warn("possible transaction leak, tried to commit/rollback transaction, which was not the currenct transaction, investigate! Top was: "+
222               top+", but tried to close: "+transaction+". "+
223               "Now follows the allocation trace of top and the transaction to be closed: \n"+
224               getStackTrace(top.getAllocateTrace())+"\n"+getStackTrace(transaction.getAllocateTrace()));
225         throw new StoreException("tried to commit/rollback a transaction which is not the current transaction, possible transaction leak!");
226      }
227      // Give back connection
228      database.getConnectionSource().releaseConnection(transaction.getConnection());
229      transaction.setConnection(null);
230      // Remove from list
231      list.removeLast();
232      // Remove from global list
233      synchronized ( allTransactions )
234      {
235         allTransactions.remove(transaction);
236         allTransactionIds.remove(transaction.getSerial());
237      }
238   }
239 
240   private String getStackTrace(Exception e)
241   {
242      StringWriter trace = new StringWriter();
243      PrintWriter writer = new PrintWriter(trace);
244      e.printStackTrace(writer);
245      return trace.toString();
246   }
247 
248   /**
249    * This is a high-level transaction object used throughout the persistence
250    * layer. Also, it can store attributes, and register listeners.
251    */
252   private class TransactionImpl extends HashMap implements Transaction
253   {
254      private Logger statsLogger = Logger.getLogger(Transaction.class.getName()+".stats");
255 
256      private Connection connection;
257      private boolean rollbackOnly = false;
258      private int depth = 0;
259      private Long serial;
260      private Long endSerial;
261      private Integer serverId;
262      
263      // Statistical attributes
264      private TransactionStatistics stats;
265      private Exception allocateTrace = null;
266 
267      public TransactionImpl(Long serial, Integer serverId)
268      {
269         super();
270         this.serial=serial;
271         this.serverId=serverId;
272         stats=new TransactionStatistics();
273         allocateTrace = new Exception("trace");
274      }
275 
276      private Integer getServerId()
277      {
278         return serverId;
279      }
280 
281      private Exception getAllocateTrace()
282      {
283         return allocateTrace;
284      }
285 
286      public int getTransactionIsolation()
287      {
288         try
289         {
290            return connection.getTransactionIsolation();
291         } catch ( Exception e ) {
292            throw new StoreException("unable to determine transaction isolation level.");
293         }
294      }
295        
296      public void setTransactionIsolation(int level)
297      {
298         try
299         {
300            connection.setTransactionIsolation(level);
301         } catch ( Exception e ) {
302            throw new StoreException("cannot set transaction isolation level: "+level,e);
303         }
304      }
305 
306      public Connection getConnection()
307      {
308         return connection;
309      }
310      private void setConnection(Connection connection)
311      {
312         this.connection=connection;
313      }
314 
315      /**
316       * Start the transaction. A transaction must be always start with
317       * the call to <code>begin()</code>. All subsequent calls to this method
318       * will increase the transaction depth, and to commit the transaction
319       * exactly that many <code>commit()</code> and <code>rollback()</code>
320       * calls must occur.
321       */
322      public void begin()
323      {
324         // Increase depth
325         logger.debug("transaction begins at depth: "+depth);
326         depth++;
327      }
328      
329      /**
330       * Commit a transaction.
331       */
332      public void commit()
333      {
334         if ( logger.isDebugEnabled() )
335            logger.debug("transaction "+this+" commits at depth: "+depth);
336         if ( statsLogger.isDebugEnabled() )
337            statsLogger.debug("transaction stats in commit: "+stats);
338         depth--;
339         if ( depth > 0 )
340            return;
341         if ( rollbackOnly )
342         {
343            rollbackInternal(this);
344         } else {
345            try
346            {
347               commitInternal(this);
348            } catch ( Exception e ) {
349               logger.error("commit threw exception, rolling back");
350               rollbackInternal(this);
351               if ( e instanceof StoreException )
352                  throw (StoreException) e;
353               else
354                  throw new StoreException("rolling back in commit because of exception",e);
355            }
356         }
357      }
358 
359      /**
360       * Rollback this transaction.
361       */
362      public void rollback()
363      {
364         if ( logger.isDebugEnabled() )
365            logger.debug("transaction "+this+" rolls back at depth: "+depth);
366         if ( statsLogger.isDebugEnabled() )
367            statsLogger.debug("transaction stats in rollback: "+stats);
368         depth--;
369         if ( depth > 0 )
370         {
371            markRollbackOnly();
372            return;
373         }
374         rollbackInternal(this);
375      }
376 
377      /**
378       * Mark this transaction as rollback only. This means the next call
379       * to either commit or rollback will rollback either way.
380       */
381      public void markRollbackOnly()
382      {
383         rollbackOnly=true;
384      }
385 
386      /**
387       * Returns whether this transaction was marked "rollback only".
388       */
389      public boolean isRollbackOnly()
390      {
391         return rollbackOnly;
392      }
393 
394      /**
395       * Get the serial of this transaction.
396       */
397      public Long getSerial()
398      {
399         return serial;
400      }
401 
402      public TransactionStatistics getStats()
403      {
404         return stats;
405      }
406 
407      public String toString()
408      {
409         return "[Tx: "+serial+" ("+serverId+")]";
410      }
411 
412      public Long getEndSerial()
413      {
414         return endSerial;
415      }
416      private void setEndSerial(Long endSerial)
417      {
418         this.endSerial=endSerial;
419      }
420 
421      public int hashCode()
422      {
423         return serial.intValue();
424      }
425 
426      public boolean equals(Object o)
427      {
428         if ( (o==null) || (!(o instanceof TransactionImpl)) )
429            return false;
430         return serial.equals(((TransactionImpl)o).serial);
431      }
432   }
433 
434}
435 
436 

[all classes][hu.netmind.beankeeper.transaction.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov