| 1 | /** |
| 2 | * Copyright (C) 2006 NetMind Consulting Bt. |
| 3 | * |
| 4 | * This library is free software; you can redistribute it and/or |
| 5 | * modify it under the terms of the GNU Lesser General Public |
| 6 | * License as published by the Free Software Foundation; either |
| 7 | * version 3 of the License, or (at your option) any later version. |
| 8 | * |
| 9 | * This library is distributed in the hope that it will be useful, |
| 10 | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 12 | * Lesser General Public License for more details. |
| 13 | * |
| 14 | * You should have received a copy of the GNU Lesser General Public |
| 15 | * License along with this library; if not, write to the Free Software |
| 16 | * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA |
| 17 | */ |
| 18 | |
| 19 | package hu.netmind.beankeeper.node.impl; |
| 20 | |
| 21 | import hu.netmind.beankeeper.service.Service; |
| 22 | import hu.netmind.beankeeper.service.StoreContext; |
| 23 | import hu.netmind.beankeeper.common.StoreException; |
| 24 | import hu.netmind.beankeeper.parser.*; |
| 25 | import hu.netmind.beankeeper.event.EventDispatcher; |
| 26 | import hu.netmind.beankeeper.config.ExtendedConfigurationListener; |
| 27 | import hu.netmind.beankeeper.config.ConfigurationTracker; |
| 28 | import hu.netmind.beankeeper.node.NodeManager; |
| 29 | import hu.netmind.beankeeper.node.event.NodeStateChangeEvent; |
| 30 | import hu.netmind.beankeeper.node.NodeManager.NodeState; |
| 31 | import hu.netmind.beankeeper.node.NodeManager.NodeRole; |
| 32 | import hu.netmind.beankeeper.transaction.Transaction; |
| 33 | import hu.netmind.beankeeper.transaction.InternalTransactionTracker; |
| 34 | import hu.netmind.beankeeper.db.SearchResult; |
| 35 | import hu.netmind.beankeeper.db.Database; |
| 36 | import java.lang.reflect.Method; |
| 37 | import java.lang.reflect.InvocationTargetException; |
| 38 | import java.util.*; |
| 39 | import java.net.*; |
| 40 | import java.sql.Connection; |
| 41 | import java.sql.PreparedStatement; |
| 42 | import org.apache.log4j.Logger; |
| 43 | import org.apache.commons.configuration.event.ConfigurationEvent; |
| 44 | |
| 45 | /** |
| 46 | * This manager enables the Store to function in a peer-to-peer |
| 47 | * fashion with other Store instances which are pointed to the same |
| 48 | * database. The class takes care of all IP communication related |
| 49 | * work, such as reconnecting, communication protocol, etc. All |
| 50 | * synchronization points must occur through this manager, which guarantees |
| 51 | * synchronization across all other Store instances. |
| 52 | * @author Brautigam Robert |
| 53 | * @version Revision: $Revision$ |
| 54 | */ |
| 55 | public class NodeManagerImpl implements NodeManager, ExtendedConfigurationListener |
| 56 | { |
| 57 | private static Logger logger = Logger.getLogger(NodeManagerImpl.class); |
| 58 | |
| 59 | public static int CLIENT_RECONNECT_TRIES = 2; |
| 60 | private static int SOCKET_CONNECT_TIMEOUT = 3000; |
| 61 | |
| 62 | private int index = 0; |
| 63 | private int serverIndex; |
| 64 | private String ips; |
| 65 | private NodeServer server; |
| 66 | private NodeClient client; |
| 67 | |
| 68 | private boolean running = true; |
| 69 | private NodeState state = NodeState.OFFLINE; |
| 70 | private Object stateMutex = new Object(); |
| 71 | |
| 72 | private ConfigurationTracker configurationTracker = null; // Injected |
| 73 | private EventDispatcher eventDispatcher = null; // Injected |
| 74 | private Database database = null; // Injected |
| 75 | private StoreContext context = null; // Injected |
| 76 | private InternalTransactionTracker transactionTracker = null; // Injected |
| 77 | |
| 78 | /** |
| 79 | * Construct node manager, establish identity, and make |
| 80 | * initial connection. |
| 81 | */ |
| 82 | public void init(Map parameters) |
| 83 | { |
| 84 | // Configure |
| 85 | configurationReload(); |
| 86 | configurationTracker.addListener(this); |
| 87 | // Initialize node, so we have a node id |
| 88 | ensureState(NodeState.INITIALIZED); |
| 89 | } |
| 90 | |
| 91 | public void release() |
| 92 | { |
| 93 | logger.debug("closing node manager."); |
| 94 | // Release all resources of state |
| 95 | try |
| 96 | { |
| 97 | ensureState(NodeState.UNINITIALIZED); |
| 98 | } catch ( Exception e ) { |
| 99 | logger.error("error while shutting down node manager",e); |
| 100 | } |
| 101 | configurationTracker.removeListener(this); |
| 102 | } |
| 103 | |
| 104 | public NodeState getState() |
| 105 | { |
| 106 | return state; |
| 107 | } |
| 108 | |
| 109 | public Integer getServerId() |
| 110 | { |
| 111 | ensureState(NodeState.CONNECTED); |
| 112 | if ( getState().getLevel() < NodeState.CONNECTED.getLevel() ) |
| 113 | throw new StoreException("node not yet connected, can not determine server id"); |
| 114 | return serverIndex; |
| 115 | } |
| 116 | |
| 117 | public Integer getId() |
| 118 | { |
| 119 | if ( getState().getLevel() < NodeState.INITIALIZED.getLevel() ) |
| 120 | throw new StoreException("node not yet initialized, identity is not known"); |
| 121 | return index; |
| 122 | } |
| 123 | |
| 124 | public NodeRole getRole() |
| 125 | { |
| 126 | ensureState(NodeState.CONNECTED); |
| 127 | if ( getState().getLevel() < NodeState.CONNECTED.getLevel() ) |
| 128 | throw new StoreException("node not yet connected, can not determine role"); |
| 129 | if ( client == null ) |
| 130 | return NodeRole.SERVER; |
| 131 | else |
| 132 | return NodeRole.CLIENT; |
| 133 | } |
| 134 | |
| 135 | /** |
| 136 | * Get the server addresses from interfaces. |
| 137 | */ |
| 138 | public static String getHostAddresses() |
| 139 | { |
| 140 | try |
| 141 | { |
| 142 | Enumeration interfaceEnumeration = NetworkInterface.getNetworkInterfaces(); |
| 143 | // Copy from enumeration to addresses vector, but filter loopback addresses |
| 144 | ArrayList addresses = new ArrayList(); |
| 145 | while ( interfaceEnumeration.hasMoreElements() ) |
| 146 | { |
| 147 | NetworkInterface intf = (NetworkInterface) interfaceEnumeration.nextElement(); |
| 148 | // Remove loopback addresses |
| 149 | Enumeration addressEnumeration = intf.getInetAddresses(); |
| 150 | while ( addressEnumeration.hasMoreElements() ) |
| 151 | { |
| 152 | InetAddress address = (InetAddress) addressEnumeration.nextElement(); |
| 153 | // Insert to addresses only if not loopback and not link local |
| 154 | if ( (! address.isLoopbackAddress()) && (! address.isLinkLocalAddress()) ) |
| 155 | addresses.add(address); |
| 156 | } |
| 157 | } |
| 158 | // Pick one address from the remaining address space |
| 159 | logger.debug("server available local addresses: "+addresses); |
| 160 | // Now, multiple addresses are in the list, so copy all of them |
| 161 | // into the result string. |
| 162 | StringBuffer ips = new StringBuffer(); |
| 163 | for ( int i=0; i<addresses.size(); i++ ) |
| 164 | { |
| 165 | InetAddress address = (InetAddress) addresses.get(i); |
| 166 | if ( ips.length() > 0 ) |
| 167 | ips.append(","); |
| 168 | ips.append(address.getHostAddress()); |
| 169 | } |
| 170 | return ips.toString(); |
| 171 | } catch ( StoreException e ) { |
| 172 | throw e; |
| 173 | } catch ( Exception e ) { |
| 174 | throw new StoreException("exception while determining server address",e); |
| 175 | } |
| 176 | } |
| 177 | |
| 178 | /** |
| 179 | * Determine if an address is available. |
| 180 | */ |
| 181 | public static boolean isAlive(String ips, int port) |
| 182 | { |
| 183 | if ( logger.isDebugEnabled() ) |
| 184 | logger.debug("trying to reach: "+ips+":"+port); |
| 185 | try |
| 186 | { |
| 187 | if ( "".equals(ips) ) |
| 188 | ips = InetAddress.getLocalHost().getHostAddress(); |
| 189 | } catch ( Exception e ) { |
| 190 | throw new StoreException("can not determine local adapter, but there is another node, which would need to be contacted.",e); |
| 191 | } |
| 192 | StringTokenizer tokens = new StringTokenizer(ips,","); |
| 193 | while ( tokens.hasMoreTokens() ) |
| 194 | { |
| 195 | String ip = tokens.nextToken(); |
| 196 | if ( logger.isDebugEnabled() ) |
| 197 | logger.debug("determining whether '"+ip+":"+port+"' is alive..."); |
| 198 | try |
| 199 | { |
| 200 | Socket socket = new Socket(); |
| 201 | socket.connect(new InetSocketAddress(ip,port),SOCKET_CONNECT_TIMEOUT); |
| 202 | socket.close(); |
| 203 | return true; // Success, so return true |
| 204 | } catch ( Exception e ) { |
| 205 | logger.debug("unreachable node at '"+ip+":"+port+", "+e.getMessage()); |
| 206 | } |
| 207 | } |
| 208 | logger.debug("could not reach any of the ips given for node"); |
| 209 | return false; |
| 210 | } |
| 211 | |
| 212 | /** |
| 213 | * Make the reflection call locally, for the local services. |
| 214 | */ |
| 215 | public Object callLocal(String service, String method, Class[] parameterTypes, |
| 216 | Object[] parameters) |
| 217 | { |
| 218 | ensureState(NodeState.CONNECTED); |
| 219 | // Get the service, and with the help of relfection, call the named |
| 220 | // method |
| 221 | Service serviceObject = context.getService(service); |
| 222 | try |
| 223 | { |
| 224 | Method methodObject = serviceObject.getClass().getMethod(method, parameterTypes); |
| 225 | return methodObject.invoke(serviceObject,parameters); |
| 226 | } catch ( InvocationTargetException e ) { |
| 227 | if ( e.getCause() instanceof StoreException ) |
| 228 | throw (StoreException) e.getCause(); |
| 229 | throw new StoreException("received exception while invoking '"+method+"', of service: "+service,e.getCause()); |
| 230 | } catch ( StoreException e ) { |
| 231 | throw e; |
| 232 | } catch ( Exception e ) { |
| 233 | throw new StoreException("could not invoke method '"+method+"', of service: "+service,e); |
| 234 | } |
| 235 | } |
| 236 | |
| 237 | /** |
| 238 | * Make a potentially remote call to the given service, with given |
| 239 | * parameters. This call will always go to the server |
| 240 | * node for execution. If this is the server node, then |
| 241 | * the service will be called locally. |
| 242 | * If this node is a client, and the communication fails |
| 243 | * to the server, the communication is retried once. |
| 244 | * @return The object that was returned from RPC. |
| 245 | */ |
| 246 | public Object callServer(String service, String method, Class[] parameterTypes, |
| 247 | Object[] parameters) |
| 248 | { |
| 249 | for ( int tries = 0; tries<CLIENT_RECONNECT_TRIES; tries++ ) |
| 250 | { |
| 251 | try |
| 252 | { |
| 253 | if ( getRole() == NodeRole.SERVER ) |
| 254 | { |
| 255 | // This node is the server, then call the method locally |
| 256 | return callLocal(service,method,parameterTypes,parameters); |
| 257 | } else { |
| 258 | // This is a client node, so we must pass the call to server |
| 259 | CallResponse response = (CallResponse) client.sendAndWaitForResponse(new CallMessage( |
| 260 | service,method,parameterTypes,parameters,false)); |
| 261 | if ( response.getException() != null ) |
| 262 | throw response.getException(); |
| 263 | return response.getReturnValue(); |
| 264 | } |
| 265 | } catch ( CommException e ) { |
| 266 | // If all retries are exhausted, just throw the exception |
| 267 | if ( tries+1 >= CLIENT_RECONNECT_TRIES ) |
| 268 | throw e; |
| 269 | logger.debug("communication error, retrying client call"); |
| 270 | } |
| 271 | } |
| 272 | // Code shouldn't reach this |
| 273 | return null; |
| 274 | } |
| 275 | |
| 276 | /** |
| 277 | * Call this method on all nodes, including where the call originated. |
| 278 | * Note, broadcast calls do not have return value. It is guaranteed however, |
| 279 | * that any given node either received the call, or fell off the node |
| 280 | * network. It is not guaranteed however, that all calls were successful |
| 281 | * in their respective nodes. |
| 282 | */ |
| 283 | public void callAll(String service, String method, Class[] parameterTypes, |
| 284 | Object[] parameters) |
| 285 | { |
| 286 | if ( getRole() == NodeRole.SERVER ) |
| 287 | { |
| 288 | // We are the server, broadcast to all |
| 289 | server.broadcast(new CallMessage(service, method, parameterTypes, |
| 290 | parameters,false)); |
| 291 | // Now call locally |
| 292 | try |
| 293 | { |
| 294 | callLocal(service,method,parameterTypes,parameters); |
| 295 | } catch ( StoreException e ) { |
| 296 | logger.warn("local call after broadcast call failed for service method: "+service+"."+method,e); |
| 297 | } |
| 298 | } else { |
| 299 | // We are client, send the call all message to server |
| 300 | try |
| 301 | { |
| 302 | client.sendAndWaitForResponse(new CallMessage(service,method,parameterTypes, |
| 303 | parameters,true)); |
| 304 | } catch ( StoreException e ) { |
| 305 | logger.warn("exception when sending broadcast call: "+service+"."+method,e); |
| 306 | } |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | /** |
| 311 | * Change the state physically to the given state. |
| 312 | */ |
| 313 | private void changeState(NodeState newState) |
| 314 | { |
| 315 | eventDispatcher.notifyAll( |
| 316 | new NodeStateChangeEvent(index,state,newState)); |
| 317 | state = newState; |
| 318 | } |
| 319 | |
| 320 | /** |
| 321 | * Ensure that the client given is properly disconnected. |
| 322 | * This is called by clients to ensure that the node knows about |
| 323 | * the disconnect. |
| 324 | */ |
| 325 | void ensureDisconnect(NodeClient client) |
| 326 | { |
| 327 | synchronized ( stateMutex ) |
| 328 | { |
| 329 | // Check if the client is current (can happen, that |
| 330 | // clients disconnect later due to threading, while another |
| 331 | // client is already present) |
| 332 | if ( client != this.client ) |
| 333 | return; |
| 334 | // Ensure that the maximal level is INITIALIZED |
| 335 | if ( state.getLevel() <= NodeState.INITIALIZED.getLevel() ) |
| 336 | return; // State is less or equal |
| 337 | ensureState(NodeState.INITIALIZED); |
| 338 | } |
| 339 | } |
| 340 | |
| 341 | /** |
| 342 | * Ensure that, if possible the given state is reached. This method |
| 343 | * makes all the necessary calls of state changes up and down. |
| 344 | */ |
| 345 | void ensureState(NodeState newState) |
| 346 | { |
| 347 | synchronized ( stateMutex ) |
| 348 | { |
| 349 | logger.debug("ensuring state: "+newState+", current state: "+state); |
| 350 | if ( state == newState ) |
| 351 | return; |
| 352 | // Separate two events: When state is increased, and when |
| 353 | // state is decreased. |
| 354 | if ( newState.getLevel() > state.getLevel() ) |
| 355 | { |
| 356 | // Node is offline, set to uninitialized (requires no additional |
| 357 | // operations) |
| 358 | if ( (state==NodeState.OFFLINE) && |
| 359 | (newState.getLevel()>NodeState.OFFLINE.getLevel()) ) |
| 360 | changeState(NodeState.UNINITIALIZED); |
| 361 | // Trying to initialize |
| 362 | if ( (state==NodeState.UNINITIALIZED) && |
| 363 | (newState.getLevel()>NodeState.UNINITIALIZED.getLevel()) ) |
| 364 | { |
| 365 | // Initialize (determine identity, launch server) |
| 366 | initialize(); |
| 367 | // Set state |
| 368 | changeState(NodeState.INITIALIZED); |
| 369 | } |
| 370 | // Trying to connect |
| 371 | if ( (state==NodeState.INITIALIZED) && |
| 372 | (newState.getLevel()>NodeState.INITIALIZED.getLevel()) ) |
| 373 | { |
| 374 | // Initialized, now determine server and connect to it. |
| 375 | connect(); |
| 376 | changeState(NodeState.CONNECTED); |
| 377 | } |
| 378 | } else { |
| 379 | // State is decreased |
| 380 | if ( (state==NodeState.CONNECTED) && |
| 381 | (newState.getLevel()<NodeState.CONNECTED.getLevel()) ) |
| 382 | { |
| 383 | // Set state |
| 384 | changeState(NodeState.INITIALIZED); |
| 385 | // Disconnect client or server, whichever is used |
| 386 | if ( client != null ) |
| 387 | { |
| 388 | client.close(); |
| 389 | client = null; |
| 390 | } |
| 391 | } |
| 392 | if ( (state==NodeState.INITIALIZED) && |
| 393 | (newState.getLevel()<NodeState.INITIALIZED.getLevel()) ) |
| 394 | { |
| 395 | // Set state |
| 396 | changeState(NodeState.UNINITIALIZED); |
| 397 | // Remove identity from database |
| 398 | clearNode(); |
| 399 | // Shutdown server |
| 400 | server.close(); |
| 401 | server = null; |
| 402 | } |
| 403 | } |
| 404 | } |
| 405 | logger.debug("state: "+state+", successfully established, requested was: "+newState); |
| 406 | } |
| 407 | |
| 408 | /** |
| 409 | * Load the node list into a list. |
| 410 | */ |
| 411 | private List loadNodeList(Transaction transaction, int searchIndex) |
| 412 | { |
| 413 | ArrayList resultList = new ArrayList(); |
| 414 | ArrayList orderByList = new ArrayList(); |
| 415 | orderByList.add(new OrderBy( |
| 416 | new ReferenceTerm("persistence_nodes",null,"nodeindex"), |
| 417 | OrderBy.ASCENDING)); |
| 418 | Expression expr = null; |
| 419 | if ( searchIndex > 0 ) |
| 420 | { |
| 421 | expr = new Expression(); |
| 422 | expr.add(new ReferenceTerm("persistence_nodes",null,"nodeindex")); |
| 423 | expr.add("<"); |
| 424 | expr.add(new ConstantTerm(new Integer(searchIndex))); |
| 425 | } |
| 426 | QueryStatement stmt = new QueryStatement("persistence_nodes",expr,orderByList); |
| 427 | SearchResult result = database.search(transaction,stmt,null); |
| 428 | for ( int i=0; i<result.getResult().size(); i++ ) |
| 429 | { |
| 430 | Map attributes = (Map) result.getResult().get(i); |
| 431 | NodeEntry entry = new NodeEntry(); |
| 432 | entry.ips=(String) attributes.get("ips"); |
| 433 | entry.port=((Number) attributes.get("command_port")).intValue(); |
| 434 | entry.index=((Number) attributes.get("nodeindex")).intValue(); |
| 435 | resultList.add(entry); |
| 436 | } |
| 437 | return resultList; |
| 438 | } |
| 439 | |
| 440 | /** |
| 441 | * Connect this nodes to the node network. This method reads all nodes from the node |
| 442 | * table which are below this node, determines the first node from that which is |
| 443 | * alive. This method may block until the status of a node is clear. If there are no |
| 444 | * suitable nodes, this node becomes the server. |
| 445 | */ |
| 446 | private void connect() |
| 447 | { |
| 448 | logger.debug("node connecting to server..."); |
| 449 | // New client |
| 450 | client = null; |
| 451 | List<NodeEntry> nodeList = null; |
| 452 | // Reload server node list, check if some nodes disapeared, |
| 453 | // so we don't have to check those |
| 454 | Transaction transaction = transactionTracker.getTransaction(); |
| 455 | transaction.begin(); |
| 456 | try |
| 457 | { |
| 458 | nodeList = loadNodeList(transaction,index); |
| 459 | } catch ( Exception e ) { |
| 460 | logger.error("could not load node list",e); |
| 461 | transaction.markRollbackOnly(); |
| 462 | } finally { |
| 463 | transaction.commit(); |
| 464 | } |
| 465 | // Try to select a server node from list (the first alive) |
| 466 | NodeEntry serverNode = null; |
| 467 | for ( int i=0; (i<nodeList.size()) && (serverNode==null); i++ ) |
| 468 | { |
| 469 | NodeEntry entry = (NodeEntry) nodeList.get(i); |
| 470 | if ( isAlive(entry.ips,entry.port) ) |
| 471 | serverNode = entry; |
| 472 | } |
| 473 | // If server node is found, connect to that, else make this a server |
| 474 | if ( serverNode == null ) |
| 475 | { |
| 476 | logger.debug("node will be the appointed server."); |
| 477 | // We are the server, so clear all nodes that are dead |
| 478 | clearNodeList(nodeList); |
| 479 | serverIndex = index; |
| 480 | } else { |
| 481 | logger.debug("determined to be client node, server is: "+serverNode.ips+":"+serverNode.port); |
| 482 | // If server node is not null, we should connect to it |
| 483 | Socket socket = connect(serverNode.ips,serverNode.port); |
| 484 | client = new NodeClient(this,socket,index,serverNode.index); |
| 485 | // If connection is established, send and wait for the first |
| 486 | // "init" message |
| 487 | CommResponse response = client.sendAndWaitForResponse(new InitMessage(getId())); |
| 488 | if ( response.getResponseCode() != CommResponse.ACTION_SUCCESS ) |
| 489 | throw new StoreException("server was not ready to accept commands, response code: "+ |
| 490 | response.getResponseCode()); |
| 491 | serverIndex = serverNode.index; |
| 492 | } |
| 493 | } |
| 494 | |
| 495 | /** |
| 496 | * Connect to server. This does not need to be synchronized, because it is |
| 497 | * called from node manager state update, which is synchronized. |
| 498 | */ |
| 499 | private Socket connect(String hostips, int hostport) |
| 500 | { |
| 501 | // Cook ips |
| 502 | try |
| 503 | { |
| 504 | if ( "".equals(hostips) ) |
| 505 | hostips = InetAddress.getLocalHost().getHostAddress(); |
| 506 | } catch ( Exception e ) { |
| 507 | throw new StoreException("can not determine local adapter, but there is another node, which would need to be contacted.",e); |
| 508 | } |
| 509 | // Connect |
| 510 | logger.debug("(re)connecting to server: "+hostips+":"+hostport); |
| 511 | try |
| 512 | { |
| 513 | // Connect physically |
| 514 | StringTokenizer tokens = new StringTokenizer(hostips,","); |
| 515 | while ( tokens.hasMoreTokens() ) |
| 516 | { |
| 517 | try |
| 518 | { |
| 519 | String ip = tokens.nextToken(); |
| 520 | Socket socket = new Socket(); |
| 521 | socket.connect(new InetSocketAddress(ip,hostport),SOCKET_CONNECT_TIMEOUT); |
| 522 | logger.debug("established connection with: "+ip+", out of "+hostips); |
| 523 | return socket; |
| 524 | } catch ( Exception e ) { |
| 525 | if ( ! tokens.hasMoreTokens() ) |
| 526 | throw e; // If no more ips, throw exception |
| 527 | } |
| 528 | } |
| 529 | } catch ( StoreException e ) { |
| 530 | throw e; |
| 531 | } catch ( Exception e ) { |
| 532 | throw new StoreException("exception while trying to connect "+hostips+":"+hostport,e); |
| 533 | } |
| 534 | return null; |
| 535 | } |
| 536 | |
| 537 | /** |
| 538 | * Clear this node from the database. |
| 539 | */ |
| 540 | private void clearNode() |
| 541 | { |
| 542 | logger.debug("clearing node from database: "+index); |
| 543 | Connection conn = database.getConnectionSource().getConnection(); |
| 544 | try |
| 545 | { |
| 546 | PreparedStatement pstmt = conn.prepareStatement("delete from nodes where nodeindex = "+index); |
| 547 | pstmt.executeUpdate(); |
| 548 | pstmt.close(); |
| 549 | conn.commit(); |
| 550 | } catch ( Exception e ) { |
| 551 | logger.error("error while clearing node: "+index); |
| 552 | } finally { |
| 553 | database.getConnectionSource().releaseConnection(conn); |
| 554 | } |
| 555 | logger.debug("node cleared from database."); |
| 556 | } |
| 557 | |
| 558 | /** |
| 559 | * Clear node list. This is called as part of the initialization |
| 560 | * process, if all previous node entries in the database |
| 561 | * are dead. |
| 562 | */ |
| 563 | private void clearNodeList(List<NodeEntry> nodeList) |
| 564 | { |
| 565 | Transaction transaction = transactionTracker.getTransaction(); |
| 566 | transaction.begin(); |
| 567 | try |
| 568 | { |
| 569 | // Go through all nodes on our list, and remove all dead ones |
| 570 | for ( int i=0; i<nodeList.size(); i++ ) |
| 571 | { |
| 572 | NodeEntry entry = (NodeEntry) nodeList.get(i); |
| 573 | Map attrs = new HashMap(); |
| 574 | attrs.put("nodeindex",new Integer(entry.index)); |
| 575 | database.remove(transaction,"persistence_nodes",attrs); |
| 576 | } |
| 577 | } catch ( StoreException e ) { |
| 578 | transaction.markRollbackOnly(); |
| 579 | throw e; |
| 580 | } catch ( Exception e ) { |
| 581 | transaction.markRollbackOnly(); |
| 582 | throw new StoreException("exception while deleting dead nodes from database",e); |
| 583 | } finally { |
| 584 | transaction.commit(); |
| 585 | } |
| 586 | } |
| 587 | |
| 588 | /** |
| 589 | * Initialize node identity. |
| 590 | */ |
| 591 | private void initialize() |
| 592 | { |
| 593 | logger.debug("node initializing..."); |
| 594 | Transaction transaction = transactionTracker.getTransaction(); |
| 595 | transaction.begin(); |
| 596 | try |
| 597 | { |
| 598 | // First ensure that table exists |
| 599 | HashMap tableAttrs = new HashMap(); |
| 600 | tableAttrs.put("nodeindex",Integer.class); |
| 601 | tableAttrs.put("ips",String.class); |
| 602 | tableAttrs.put("command_port",Integer.class); |
| 603 | ArrayList tableKeys = new ArrayList(); |
| 604 | tableKeys.add("nodeindex"); |
| 605 | database.ensureTable(transaction,"persistence_nodes", |
| 606 | tableAttrs,tableKeys,true); |
| 607 | // Load nodes table |
| 608 | List<NodeEntry> nodeList = loadNodeList(transaction,0); |
| 609 | // Determine identity |
| 610 | index = 1; |
| 611 | if ( nodeList.size() > 0 ) |
| 612 | index = 1 + ((NodeEntry) nodeList.get(nodeList.size()-1)).index; |
| 613 | // Start the node server |
| 614 | server = new NodeServer(this,eventDispatcher,index); |
| 615 | ips = getHostAddresses(); |
| 616 | server.bind(); |
| 617 | int port = server.getServerSocket().getLocalPort(); |
| 618 | // Insert my index, port and ip to the nodes table. Note, duplicate |
| 619 | // indices will not be allowed because of primary key. |
| 620 | if ( logger.isDebugEnabled() ) |
| 621 | logger.debug("node identity determined, index: "+index+", address: "+ips+":"+port); |
| 622 | Map attrs = new HashMap(); |
| 623 | attrs.put("nodeindex",new Integer(index)); |
| 624 | attrs.put("ips",ips); |
| 625 | attrs.put("command_port",new Integer(port)); |
| 626 | database.insert(transaction,"persistence_nodes",attrs); |
| 627 | } catch ( StoreException e ) { |
| 628 | logger.fatal("could not initialize node subsystem.",e); |
| 629 | transaction.markRollbackOnly(); |
| 630 | throw e; |
| 631 | } catch ( Throwable e ) { |
| 632 | logger.fatal("could not initialize node subsystem.",e); |
| 633 | transaction.markRollbackOnly(); |
| 634 | throw new StoreException("unexcepted error",e); |
| 635 | } finally { |
| 636 | transaction.commit(); |
| 637 | } |
| 638 | } |
| 639 | |
| 640 | public static class NodeEntry |
| 641 | { |
| 642 | public String ips; |
| 643 | public int port; |
| 644 | public int index; |
| 645 | |
| 646 | public int hashCode() |
| 647 | { |
| 648 | return index; |
| 649 | } |
| 650 | |
| 651 | public boolean equals(Object obj) |
| 652 | { |
| 653 | if ( ! (obj instanceof NodeEntry) ) |
| 654 | return false; |
| 655 | return index == ((NodeEntry)obj).index; |
| 656 | } |
| 657 | |
| 658 | public String toString() |
| 659 | { |
| 660 | return "[Node: "+ips+":"+port+", index: "+index+"]"; |
| 661 | } |
| 662 | } |
| 663 | |
| 664 | /** |
| 665 | * If anything changed, reload the configuration values. |
| 666 | */ |
| 667 | public void configurationChanged(ConfigurationEvent event) |
| 668 | { |
| 669 | if ( (event.getPropertyName()!=null) && |
| 670 | (event.getPropertyName().startsWith("beankeeper.n")) ) |
| 671 | configurationReload(); |
| 672 | } |
| 673 | |
| 674 | /** |
| 675 | * Just read in the new configuration values, and use them from now on. |
| 676 | */ |
| 677 | public void configurationReload() |
| 678 | { |
| 679 | CLIENT_RECONNECT_TRIES = configurationTracker.getConfiguration(). |
| 680 | getInt("beankeeper.node.client_reconnect_tries",2); |
| 681 | SOCKET_CONNECT_TIMEOUT = configurationTracker.getConfiguration(). |
| 682 | getInt("beankeeper.net.connect_timeout",3000); |
| 683 | } |
| 684 | } |
| 685 | |
| 686 | |