EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.node.impl]

COVERAGE SUMMARY FOR SOURCE FILE [NodeManagerImpl.java]

nameclass, %method, %block, %line, %
NodeManagerImpl.java100% (2/2)89%  (25/28)71%  (1010/1422)73%  (218.2/300)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class NodeManagerImpl$NodeEntry100% (1/1)25%  (1/4)7%   (3/43)17%  (1/6)
equals (Object): boolean 0%   (0/1)0%   (0/15)0%   (0/3)
hashCode (): int 0%   (0/1)0%   (0/3)0%   (0/1)
toString (): String 0%   (0/1)0%   (0/22)0%   (0/1)
NodeManagerImpl$NodeEntry (): void 100% (1/1)100% (3/3)100% (1/1)
     
class NodeManagerImpl100% (1/1)100% (24/24)73%  (1007/1379)74%  (218.2/295)
clearNodeList (List): void 100% (1/1)26%  (16/61)29%  (4.7/16)
callLocal (String, String, Class [], Object []): Object 100% (1/1)28%  (19/67)33%  (4/12)
isAlive (String, int): boolean 100% (1/1)37%  (39/105)52%  (11/21)
callAll (String, String, Class [], Object []): void 100% (1/1)53%  (37/70)59%  (6.5/11)
connect (String, int): Socket 100% (1/1)59%  (59/100)43%  (10/23)
getId (): Integer 100% (1/1)67%  (10/15)67%  (2/3)
release (): void 100% (1/1)71%  (12/17)71%  (5/7)
getServerId (): Integer 100% (1/1)72%  (13/18)75%  (3/4)
initialize (): void 100% (1/1)73%  (128/175)75%  (26.9/36)
ensureDisconnect (NodeClient): void 100% (1/1)76%  (25/33)82%  (6.5/8)
getRole (): NodeManager$NodeRole 100% (1/1)76%  (16/21)83%  (5/6)
connect (): void 100% (1/1)80%  (113/141)83%  (23.3/28)
configurationChanged (ConfigurationEvent): void 100% (1/1)82%  (9/11)67%  (2/3)
getHostAddresses (): String 100% (1/1)84%  (73/87)78%  (18/23)
callServer (String, String, Class [], Object []): Object 100% (1/1)87%  (46/53)75%  (9/12)
clearNode (): void 100% (1/1)89%  (63/71)99%  (11.9/12)
ensureState (NodeManager$NodeState): void 100% (1/1)96%  (134/139)98%  (25.4/26)
<static initializer> 100% (1/1)100% (8/8)100% (3/3)
NodeManagerImpl (): void 100% (1/1)100% (32/32)100% (11/11)
changeState (NodeManager$NodeState): void 100% (1/1)100% (16/16)100% (3/3)
configurationReload (): void 100% (1/1)100% (15/15)100% (3/3)
getState (): NodeManager$NodeState 100% (1/1)100% (3/3)100% (1/1)
init (Map): void 100% (1/1)100% (10/10)100% (4/4)
loadNodeList (Transaction, int): List 100% (1/1)100% (111/111)100% (19/19)

1/**
2 * Copyright (C) 2006 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.node.impl;
20 
21import hu.netmind.beankeeper.service.Service;
22import hu.netmind.beankeeper.service.StoreContext;
23import hu.netmind.beankeeper.common.StoreException;
24import hu.netmind.beankeeper.parser.*;
25import hu.netmind.beankeeper.event.EventDispatcher;
26import hu.netmind.beankeeper.config.ExtendedConfigurationListener;
27import hu.netmind.beankeeper.config.ConfigurationTracker;
28import hu.netmind.beankeeper.node.NodeManager;
29import hu.netmind.beankeeper.node.event.NodeStateChangeEvent;
30import hu.netmind.beankeeper.node.NodeManager.NodeState;
31import hu.netmind.beankeeper.node.NodeManager.NodeRole;
32import hu.netmind.beankeeper.transaction.Transaction;
33import hu.netmind.beankeeper.transaction.InternalTransactionTracker;
34import hu.netmind.beankeeper.db.SearchResult;
35import hu.netmind.beankeeper.db.Database;
36import java.lang.reflect.Method;
37import java.lang.reflect.InvocationTargetException;
38import java.util.*;
39import java.net.*;
40import java.sql.Connection;
41import java.sql.PreparedStatement;
42import org.apache.log4j.Logger;
43import org.apache.commons.configuration.event.ConfigurationEvent;
44 
45/**
46 * This manager enables the Store to function on a peer-to-peer
47 * fashion with other Store instances which are pointed to the same
48 * database. The class takes care of all IP communication related
49 * work, such as reconnecting, communication protocoll, etc. All
50 * synchronization points must occur through this manager, which guarantees
51 * synchronization across all other Store instances.
52 * @author Brautigam Robert
53 * @version Revision: $Revision$
54 */
55public class NodeManagerImpl implements NodeManager, ExtendedConfigurationListener
56{
57   private static Logger logger = Logger.getLogger(NodeManagerImpl.class);
58 
59   public static int CLIENT_RECONNECT_TRIES = 2;
60   private static int SOCKET_CONNECT_TIMEOUT = 3000;
61 
62   private int index = 0;
63   private int serverIndex;
64   private String ips;
65   private NodeServer server;
66   private NodeClient client;
67 
68   private boolean running = true;
69   private NodeState state = NodeState.OFFLINE;
70   private Object stateMutex = new Object();
71 
72   private ConfigurationTracker configurationTracker = null; // Injected
73   private EventDispatcher eventDispatcher = null; // Injected
74   private Database database = null; // Injected
75   private StoreContext context = null; // Injected
76   private InternalTransactionTracker transactionTracker = null; // Injected
77 
78   /**
79    * Construct node manager, establish identity, and make
80    * initial connection.
81    */
82   public void init(Map parameters)
83   {
84      // Configure
85      configurationReload();
86      configurationTracker.addListener(this);
87      // Initialize node, so we have a node id
88      ensureState(NodeState.INITIALIZED);
89   }
90 
91   public void release()
92   {
93      logger.debug("closing node manager.");
94      // Release all resources of state
95      try
96      {
97         ensureState(NodeState.UNINITIALIZED);
98      } catch ( Exception e ) {
99         logger.error("error while shutting down node manager",e);
100      }
101      configurationTracker.removeListener(this);
102   }
103 
104   public NodeState getState()
105   {
106      return state;
107   }
108 
109   public Integer getServerId()
110   {
111      ensureState(NodeState.CONNECTED);
112      if ( getState().getLevel() < NodeState.CONNECTED.getLevel() )
113         throw new StoreException("node not yet connected, can not determine server id");
114      return serverIndex;
115   }
116 
117   public Integer getId()
118   {
119      if ( getState().getLevel() < NodeState.INITIALIZED.getLevel() )
120         throw new StoreException("node not yet initialized, identity is not known");
121      return index;
122   }
123 
124   public NodeRole getRole()
125   {
126      ensureState(NodeState.CONNECTED);
127      if ( getState().getLevel() < NodeState.CONNECTED.getLevel() )
128         throw new StoreException("node not yet connected, can not determine role");
129      if ( client == null )
130         return NodeRole.SERVER;
131      else
132         return NodeRole.CLIENT;
133   }
134 
135   /**
136    * Get the server addresses from interfaces.
137    */
138   public static String getHostAddresses()
139   {
140      try
141      {
142         Enumeration interfaceEnumeration = NetworkInterface.getNetworkInterfaces();
143         // Copy from enumeration to addresses vector, but filter loopback addresses
144         ArrayList addresses = new ArrayList();
145         while ( interfaceEnumeration.hasMoreElements() )
146         {
147            NetworkInterface intf = (NetworkInterface) interfaceEnumeration.nextElement();
148            // Remove loopback addresses
149            Enumeration addressEnumeration = intf.getInetAddresses();
150            while ( addressEnumeration.hasMoreElements() )
151            {
152               InetAddress address = (InetAddress) addressEnumeration.nextElement();
153               // Insert to addresses only if not loopback and not link local
154               if ( (! address.isLoopbackAddress()) && (! address.isLinkLocalAddress()) )
155                  addresses.add(address);
156            }
157         }
158         // Pick one address from the remaining address space
159         logger.debug("server available local addresses: "+addresses);
160         // Now, multiple addresses are in the list, so copy all of them
161         // into the result string.
162         StringBuffer ips = new StringBuffer();
163         for ( int i=0; i<addresses.size(); i++ )
164         {
165            InetAddress address = (InetAddress) addresses.get(i);
166            if ( ips.length() > 0 )
167               ips.append(",");
168            ips.append(address.getHostAddress());
169         }
170         return ips.toString();
171      } catch ( StoreException e ) {
172         throw e;
173      } catch ( Exception e ) {
174         throw new StoreException("exception while determining server address",e);
175      }
176   }
177 
178   /**
179    * Determine if an address is available.
180    */
181   public static boolean isAlive(String ips, int port)
182   {
183      if ( logger.isDebugEnabled() )
184         logger.debug("trying to reach: "+ips+":"+port);
185      try
186      {
187         if ( "".equals(ips) )
188            ips = InetAddress.getLocalHost().getHostAddress();
189      } catch ( Exception e ) {
190         throw new StoreException("can not determine local adapter, but there is another node, which would need to be contacted.",e);
191      }
192      StringTokenizer tokens = new StringTokenizer(ips,",");
193      while ( tokens.hasMoreTokens() )
194      {
195         String ip = tokens.nextToken();
196         if ( logger.isDebugEnabled() )
197            logger.debug("determining whether '"+ip+":"+port+"' is alive...");
198         try
199         {
200            Socket socket = new Socket();
201            socket.connect(new InetSocketAddress(ip,port),SOCKET_CONNECT_TIMEOUT);
202            socket.close();
203            return true; // Success, so return true
204         } catch ( Exception e ) {
205            logger.debug("unreachable node at '"+ip+":"+port+", "+e.getMessage());
206         }
207      }
208      logger.debug("could not reach any of the ips given for node");
209      return false;
210   }
211 
212   /**
213    * Make the reflection call locally, for the local services.
214    */
215   public Object callLocal(String service, String method, Class[] parameterTypes,
216         Object[] parameters)
217   {
218      ensureState(NodeState.CONNECTED);
219      // Get the service, and with the help of relfection, call the named
220      // method
221      Service serviceObject = context.getService(service);
222      try
223      {
224         Method methodObject = serviceObject.getClass().getMethod(method, parameterTypes);
225         return methodObject.invoke(serviceObject,parameters);
226      } catch ( InvocationTargetException e ) {
227         if ( e.getCause() instanceof StoreException )
228            throw (StoreException) e.getCause();
229         throw new StoreException("received exception while invoking '"+method+"', of service: "+service,e.getCause());
230      } catch ( StoreException e ) {
231         throw e;
232      } catch ( Exception e ) {
233         throw new StoreException("could not invoke method '"+method+"', of service: "+service,e);
234      }
235   }
236 
237   /**
238    * Make a potentially remote call to the given service, with given
239    * parameters. This call will always go to the server
240    * node for execution. If this is the server node, then
241    * the service will be called locally. 
242    * If this node is a client, and the communication fails
243    * to the server, the communication is retried once.
244    * @return The object that was returned from RPC.
245    */
246   public Object callServer(String service, String method, Class[] parameterTypes,
247         Object[] parameters)
248   {
249      for ( int tries = 0; tries<CLIENT_RECONNECT_TRIES; tries++ )
250      {
251         try
252         {
253            if ( getRole() == NodeRole.SERVER )
254            {
255               // This node is the server, then call the method locally
256               return callLocal(service,method,parameterTypes,parameters);
257            } else {
258               // This is a client node, so we must pass the call to server
259               CallResponse response = (CallResponse) client.sendAndWaitForResponse(new CallMessage(
260                        service,method,parameterTypes,parameters,false));
261               if ( response.getException() != null )
262                  throw response.getException();
263               return response.getReturnValue();
264            }
265         } catch ( CommException e ) {
266            // If all retries are exhausted, just throw the exception
267            if ( tries+1 >= CLIENT_RECONNECT_TRIES )
268               throw e;
269            logger.debug("communication error, retrying client call");
270         }
271      }
272      // Code shouldn't reach this
273      return null;
274   }
275 
276   /**
277    * Call this method on all nodes, including where the call originated.
278    * Note, broadcast calls do not have return value. It is guaranteed however,
279    * that any given node either received the call, or fell off the node
280    * network. It is not guaranteed however, that all calls were successful
281    * in their respective nodes.
282    */
283   public void callAll(String service, String method, Class[] parameterTypes,
284         Object[] parameters)
285   {
286      if ( getRole() == NodeRole.SERVER )
287      {
288         // We are the server, broadcast to all
289         server.broadcast(new CallMessage(service, method, parameterTypes,
290                  parameters,false));
291         // Now call locally
292         try
293         {
294            callLocal(service,method,parameterTypes,parameters);
295         } catch ( StoreException e ) {
296            logger.warn("local call after broadcast call failed for service method: "+service+"."+method,e);
297         }
298      } else {
299         // We are client, send the call all message to server
300         try
301         {
302            client.sendAndWaitForResponse(new CallMessage(service,method,parameterTypes,
303                     parameters,true));
304         } catch ( StoreException e ) {
305            logger.warn("exception when sending broadcast call: "+service+"."+method,e);
306         }
307      }
308   }
309 
310   /**
311    * Change the state physically to the given state.
312    */
313   private void changeState(NodeState newState)
314   {
315      eventDispatcher.notifyAll(
316            new NodeStateChangeEvent(index,state,newState));
317      state = newState;
318   }
319 
320   /**
321    * Ensure that the client given is properly disconnected.
322    * This is called by clients to ensure that the node knows about
323    * the disconnect.
324    */
325   void ensureDisconnect(NodeClient client)
326   {
327      synchronized ( stateMutex )
328      {
329         // Check if the client is current (can happen, that
330         // clients disconnect later due to threading, while another
331         // client is already present)
332         if ( client != this.client )
333            return;
334         // Ensure that the maximal level is INITIALIZED
335         if ( state.getLevel() <= NodeState.INITIALIZED.getLevel() )
336            return; // State is less or equal
337         ensureState(NodeState.INITIALIZED);
338      }
339   }
340 
341   /**
342    * Ensure that, if possible the given state is reached. This method
343    * makes all the necessary calls of state changes up and down.
344    */
345   void ensureState(NodeState newState)
346   {
347      synchronized ( stateMutex )
348      {
349         logger.debug("ensuring state: "+newState+", current state: "+state);
350         if ( state == newState )
351            return;
352         // Separate two events: When state is increased, and when
353         // state is decreased.
354         if ( newState.getLevel() > state.getLevel() )
355         {
356            // Node is offline, set to uninitialized (requires no additional
357            // operations)
358            if ( (state==NodeState.OFFLINE) && 
359                  (newState.getLevel()>NodeState.OFFLINE.getLevel()) )
360               changeState(NodeState.UNINITIALIZED);
361            // Trying to initialize
362            if ( (state==NodeState.UNINITIALIZED) && 
363                  (newState.getLevel()>NodeState.UNINITIALIZED.getLevel()) )
364            {
365               // Initialize (determine identity, launch server)
366               initialize();
367               // Set state
368               changeState(NodeState.INITIALIZED);
369            }
370            // Trying to connect
371            if ( (state==NodeState.INITIALIZED) && 
372                  (newState.getLevel()>NodeState.INITIALIZED.getLevel()) )
373            {
374               // Initialized, now determine server and connect to it.
375               connect();
376               changeState(NodeState.CONNECTED);
377            }
378         } else {
379            // State is decreased
380            if ( (state==NodeState.CONNECTED) && 
381                  (newState.getLevel()<NodeState.CONNECTED.getLevel()) )
382            {
383               // Set state
384               changeState(NodeState.INITIALIZED);
385               // Disconnect client or server, whichever is used
386               if ( client != null )
387               {
388                  client.close();
389                  client = null;
390               } 
391            }
392            if ( (state==NodeState.INITIALIZED) && 
393                  (newState.getLevel()<NodeState.INITIALIZED.getLevel()) )
394            {
395               // Set state
396               changeState(NodeState.UNINITIALIZED);
397               // Remove identity from database
398               clearNode();
399               // Shutdown server
400               server.close();
401               server = null;
402            }
403         }
404      }
405      logger.debug("state: "+state+", successfully established, requested was: "+newState);
406   }
407 
408   /**
409    * Load the node list into a list.
410    */
411   private List loadNodeList(Transaction transaction, int searchIndex)
412   {
413      ArrayList resultList = new ArrayList();
414      ArrayList orderByList = new ArrayList();
415      orderByList.add(new OrderBy(
416               new ReferenceTerm("persistence_nodes",null,"nodeindex"),
417               OrderBy.ASCENDING));
418      Expression expr = null;
419      if ( searchIndex > 0 )
420      {
421         expr = new Expression();
422         expr.add(new ReferenceTerm("persistence_nodes",null,"nodeindex"));
423         expr.add("<");
424         expr.add(new ConstantTerm(new Integer(searchIndex)));
425      }
426      QueryStatement stmt = new QueryStatement("persistence_nodes",expr,orderByList);
427      SearchResult result = database.search(transaction,stmt,null);
428      for ( int i=0; i<result.getResult().size(); i++ )
429      {
430         Map attributes = (Map) result.getResult().get(i);
431         NodeEntry entry = new NodeEntry();
432         entry.ips=(String) attributes.get("ips");
433         entry.port=((Number) attributes.get("command_port")).intValue();
434         entry.index=((Number) attributes.get("nodeindex")).intValue();
435         resultList.add(entry);
436      }
437      return resultList;
438   }
439 
440   /**
441    * Connect this nodes to the node network. This method reads all nodes from the node
442    * table which are below this node, determines the first node from that which is
443    * alive. This method may block until the status of a node is clear. If there are no
444    * suitable nodes, this node becomes the server.
445    */
446   private void connect()
447   {
448      logger.debug("node connecting to server...");
449      // New client
450      client = null;
451      List<NodeEntry> nodeList = null;
452      // Reload server node list, check if some nodes disapeared,
453      // so we don't have to check those
454      Transaction transaction = transactionTracker.getTransaction();
455      transaction.begin();
456      try
457      {
458         nodeList = loadNodeList(transaction,index);
459      } catch ( Exception e ) {
460         logger.error("could not load node list",e);
461         transaction.markRollbackOnly();
462      } finally {
463         transaction.commit();
464      }
465      // Try to select a server node from list (the first alive)
466      NodeEntry serverNode = null;
467      for ( int i=0; (i<nodeList.size()) && (serverNode==null); i++ )
468      {
469         NodeEntry entry = (NodeEntry) nodeList.get(i);
470         if ( isAlive(entry.ips,entry.port) )
471            serverNode = entry;
472      }
473      // If server node is found, connect to that, else make this a server
474      if ( serverNode == null )
475      {
476         logger.debug("node will be the appointed server.");
477         // We are the server, so clear all nodes that are dead
478         clearNodeList(nodeList);
479         serverIndex = index;
480      } else {
481         logger.debug("determined to be client node, server is: "+serverNode.ips+":"+serverNode.port);
482         // If server node is not null, we should connect to it
483         Socket socket = connect(serverNode.ips,serverNode.port);
484         client = new NodeClient(this,socket,index,serverNode.index);
485         // If connection is established, send and wait for the first
486         // "init" message
487         CommResponse response = client.sendAndWaitForResponse(new InitMessage(getId()));
488         if ( response.getResponseCode() != CommResponse.ACTION_SUCCESS )
489            throw new StoreException("server was not ready to accept commands, response code: "+
490                  response.getResponseCode());
491         serverIndex = serverNode.index;
492      }
493   }
494 
495   /**
496    * Connect to server. This does not need to be synchronized, because it is
497    * called from node manager state update, which is synchronized.
498    */
499   private Socket connect(String hostips, int hostport)
500   {
501      // Cook ips
502      try
503      {
504         if ( "".equals(hostips) )
505            hostips = InetAddress.getLocalHost().getHostAddress();
506      } catch ( Exception e ) {
507         throw new StoreException("can not determine local adapter, but there is another node, which would need to be contacted.",e);
508      }
509      // Connect
510      logger.debug("(re)connecting to server: "+hostips+":"+hostport);
511      try
512      {
513         // Connect physically
514         StringTokenizer tokens = new StringTokenizer(hostips,",");
515         while ( tokens.hasMoreTokens() )
516         {
517            try
518            {
519               String ip = tokens.nextToken();
520               Socket socket = new Socket();
521               socket.connect(new InetSocketAddress(ip,hostport),SOCKET_CONNECT_TIMEOUT);
522               logger.debug("established connection with: "+ip+", out of "+hostips);
523               return socket;
524            } catch ( Exception e ) {
525               if ( ! tokens.hasMoreTokens() )
526                  throw e; // If no more ips, throw exception
527            }
528         }
529      } catch ( StoreException e ) {
530         throw e;
531      } catch ( Exception e ) {
532         throw new StoreException("exception while trying to connect "+hostips+":"+hostport,e);
533      }
534      return null;
535   }
536 
537   /**
538    * Clear this node from the database.
539    */
540   private void clearNode()
541   {
542      logger.debug("clearing node from database: "+index);
543      Connection conn = database.getConnectionSource().getConnection();
544      try
545      {
546         PreparedStatement pstmt = conn.prepareStatement("delete from nodes where nodeindex = "+index);
547         pstmt.executeUpdate();
548         pstmt.close();
549         conn.commit();
550      } catch ( Exception e ) {
551         logger.error("error while clearing node: "+index);
552      } finally {
553         database.getConnectionSource().releaseConnection(conn);
554      }
555      logger.debug("node cleared from database.");
556   }
557 
558   /**
559    * Clear node list. This is called as part of the initialization
560    * process, if all previous node entries in the database
561    * are dead.
562    */
563   private void clearNodeList(List<NodeEntry> nodeList)
564   {
565      Transaction transaction = transactionTracker.getTransaction();
566      transaction.begin();
567      try
568      {
569         // Go through all nodes on our list, and remove all dead ones
570         for ( int i=0; i<nodeList.size(); i++ )
571         {
572            NodeEntry entry = (NodeEntry) nodeList.get(i);
573            Map attrs = new HashMap();
574            attrs.put("nodeindex",new Integer(entry.index));
575            database.remove(transaction,"persistence_nodes",attrs);
576         }
577      } catch ( StoreException e ) {
578         transaction.markRollbackOnly();
579         throw e;
580      } catch ( Exception e ) {
581         transaction.markRollbackOnly();
582         throw new StoreException("exception while deleting dead nodes from database",e);
583      } finally {
584         transaction.commit();
585      }
586   }
587 
588   /**
589    * Initialize node identity.
590    */
591   private void initialize()
592   {
593      logger.debug("node initializing...");
594      Transaction transaction = transactionTracker.getTransaction();
595      transaction.begin();
596      try
597      {
598         // First ensure that table exists
599         HashMap tableAttrs = new HashMap();
600         tableAttrs.put("nodeindex",Integer.class);
601         tableAttrs.put("ips",String.class);
602         tableAttrs.put("command_port",Integer.class);
603         ArrayList tableKeys = new ArrayList();
604         tableKeys.add("nodeindex");
605         database.ensureTable(transaction,"persistence_nodes",
606               tableAttrs,tableKeys,true);
607         // Load nodes table
608         List<NodeEntry> nodeList = loadNodeList(transaction,0);
609         // Determine identity
610         index = 1;
611         if ( nodeList.size() > 0 )
612            index = 1 + ((NodeEntry) nodeList.get(nodeList.size()-1)).index;
613         // Start the node server
614         server = new NodeServer(this,eventDispatcher,index);
615         ips = getHostAddresses();
616         server.bind();
617         int port = server.getServerSocket().getLocalPort();
618         // Insert my index, port and ip to the nodes table. Note, duplicate
619         // indices will not be allowed because of primary key.
620         if ( logger.isDebugEnabled() )
621            logger.debug("node identity determined, index: "+index+", address: "+ips+":"+port);
622         Map attrs = new HashMap();
623         attrs.put("nodeindex",new Integer(index));
624         attrs.put("ips",ips);
625         attrs.put("command_port",new Integer(port));
626         database.insert(transaction,"persistence_nodes",attrs);
627      } catch ( StoreException e ) {
628         logger.fatal("could not initialize node subsystem.",e);
629         transaction.markRollbackOnly();
630         throw e;
631      } catch ( Throwable e ) {
632         logger.fatal("could not initialize node subsystem.",e);
633         transaction.markRollbackOnly();
634         throw new StoreException("unexcepted error",e);
635      } finally {
636         transaction.commit();
637      }
638   }
639 
640   public static class NodeEntry
641   {
642      public String ips;
643      public int port;
644      public int index;
645 
646      public int hashCode()
647      {
648         return index;
649      }
650 
651      public boolean equals(Object obj)
652      {
653         if ( ! (obj instanceof NodeEntry) )
654            return false;
655         return index == ((NodeEntry)obj).index;
656      }
657      
658      public String toString()
659      {
660         return "[Node: "+ips+":"+port+", index: "+index+"]";
661      }
662   }
663 
664   /**
665    * If anything changed, reload the configuration values.
666    */
667   public void configurationChanged(ConfigurationEvent event)
668   {
669      if ( (event.getPropertyName()!=null) && 
670            (event.getPropertyName().startsWith("beankeeper.n")) )
671         configurationReload();
672   }
673 
674   /**
675    * Just read in the new configuration values, and use them from now on.
676    */
677   public void configurationReload()
678   {
679      CLIENT_RECONNECT_TRIES = configurationTracker.getConfiguration().
680         getInt("beankeeper.node.client_reconnect_tries",2);
681      SOCKET_CONNECT_TIMEOUT = configurationTracker.getConfiguration().
682         getInt("beankeeper.net.connect_timeout",3000);
683   }
684}
685 
686 

[all classes][hu.netmind.beankeeper.node.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov