EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.node.impl]

COVERAGE SUMMARY FOR SOURCE FILE [NodeServer.java]

nameclass, %method, %block, %line, %
NodeServer.java100% (2/2)82%  (14/17)66%  (375/564)74%  (91.6/123)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class NodeServer$ClientHandler100% (1/1)75%  (3/4)64%  (138/215)69%  (29.8/43)
toString (): String 0%   (0/1)0%   (0/12)0%   (0/1)
onIncoming (CommObject): void 100% (1/1)62%  (88/143)66%  (19/29)
close (): void 100% (1/1)76%  (32/42)78%  (7.8/10)
NodeServer$ClientHandler (NodeServer, Socket): void 100% (1/1)100% (18/18)100% (3/3)
     
class NodeServer100% (1/1)85%  (11/13)68%  (237/349)77%  (61.8/80)
access$300 (): Logger 0%   (0/1)0%   (0/2)0%   (0/1)
setServerSocket (ServerSocket): void 0%   (0/1)0%   (0/4)0%   (0/2)
close (): void 100% (1/1)55%  (41/74)69%  (11.8/17)
broadcast (CommObject): void 100% (1/1)56%  (53/94)74%  (14.8/20)
run (): void 100% (1/1)72%  (64/89)81%  (15.5/19)
bind (): void 100% (1/1)82%  (31/38)78%  (7/9)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
NodeServer (NodeManagerImpl, EventDispatcher, int): void 100% (1/1)100% (29/29)100% (10/10)
access$000 (NodeServer): int 100% (1/1)100% (3/3)100% (1/1)
access$100 (NodeServer): ArrayList 100% (1/1)100% (3/3)100% (1/1)
access$200 (NodeServer): EventDispatcher 100% (1/1)100% (3/3)100% (1/1)
access$400 (NodeServer): NodeManagerImpl 100% (1/1)100% (3/3)100% (1/1)
getServerSocket (): ServerSocket 100% (1/1)100% (3/3)100% (1/1)

1/**
2 * Copyright (C) 2006 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.node.impl;
20 
21import hu.netmind.beankeeper.common.StoreException;
22import hu.netmind.beankeeper.node.NodeManager.NodeRole;
23import hu.netmind.beankeeper.node.NodeManager.NodeState;
24import hu.netmind.beankeeper.node.event.RemoteStateChangeEvent;
25import hu.netmind.beankeeper.event.EventDispatcher;
26import java.io.*;
27import java.net.*;
28import java.util.*;
29import org.apache.log4j.Logger;
30 
31/**
32 * This class is the implementation of the node's server component. It has a dual purpose,
33 * wait for clients to connect and talk with clients, but this is only activated if this
34 * node becomes the server node. If it's a client node, this implementation only allocates
35 * the server port, but turns down clients who try to connect.
36 * @author Brautigam Robert
37 * @version Revision: $Revision$
38 */
39public class NodeServer implements Runnable
40{
41   private static Logger logger = Logger.getLogger(NodeServer.class);
42   private ServerSocket serverSocket;
43   private ArrayList clientHandlers = new ArrayList();
44   private boolean running = true;
45   private int nodeIndex = 0;
46 
47   private NodeManagerImpl nodeManager = null;
48   private EventDispatcher eventDispatcher = null;
49 
50   public NodeServer(NodeManagerImpl nodeManager, EventDispatcher eventDispatcher, int nodeIndex)
51   {
52      this.nodeManager=nodeManager;
53      this.eventDispatcher=eventDispatcher;
54      this.nodeIndex=nodeIndex;
55   }
56 
57   public ServerSocket getServerSocket()
58   {
59      return serverSocket;
60   }
61   private void setServerSocket(ServerSocket serverSocket)
62   {
63      this.serverSocket=serverSocket;
64   }
65 
66   /**
67    * Broadcast a message to all connected clients. Messages to clients
68    * do not have answers.
69    */
70   public void broadcast(CommObject obj)
71   {
72      if ( logger.isDebugEnabled() )
73         logger.debug("broadcasting message: "+obj);
74      // Safe-copy all clients
75      LinkedList<ClientHandler> handlers = null;
76      synchronized ( clientHandlers )
77      {
78         handlers = new LinkedList<ClientHandler>(clientHandlers);
79      }
80      // Broadcast message. First, all requests are sent in batch,
81      // and then all responses are waited for. This way each node
82      // can work on the message in parallel.
83      Iterator<ClientHandler> handlerIterator = handlers.iterator();
84      while ( handlerIterator.hasNext() )
85      {
86         ClientHandler handler = handlerIterator.next();
87         try
88         {
89            handler.send(obj);
90         } catch ( Exception e ) {
91            handlerIterator.remove();
92            logger.warn("could not deliver message '"+obj+"' to client.",e);
93         }
94      }
95      // So now watch for client to receive responses to those requests
96      for ( ClientHandler handler : handlers )
97      {
98         if ( logger.isDebugEnabled() )
99            logger.debug("waiting for broadcast response from: "+handler);
100         handler.waitForResponse(obj);
101      }
102   }
103 
104   /**
105    * Setup server and bind to a random port.
106    */
107   public void bind()
108   {
109      try
110      {
111         // Setup channel
112         serverSocket = new ServerSocket(0); // Choose an ip:port
113         // Start listening
114         Thread listenerThread = new Thread(this);
115         listenerThread.setName("BeanKeeper Accept (Node: "+nodeIndex+")");
116         listenerThread.setDaemon(true);
117         listenerThread.start();
118      } catch ( Exception e ) {
119         throw new StoreException("exception while binding to server port",e);
120      }
121   }
122 
123   /**
124    * Disconnect the server.
125    */
126   public void close()
127   {
128      // Deactivate server
129      List<ClientHandler> handlers = null;
130      synchronized ( clientHandlers )
131      {
132         handlers = new ArrayList<ClientHandler>(clientHandlers);
133      }
134      for ( ClientHandler handler : handlers )
135      {
136         if ( logger.isDebugEnabled() )
137            logger.debug("closing server node handler: "+handler);
138         try
139         {
140            handler.close();
141         } catch ( Exception e ) {
142            logger.debug("couldn't close handler "+handler+", continuing");
143         }
144      }
145      // Set to not running
146      running = false;
147      // Close server socket
148      try
149      {
150         serverSocket.close();
151      } catch ( Exception e ) {
152         logger.error("error while disconnecting server socket",e);
153      }
154   }
155 
156   /**
157    * Run listener thread. This thread accepts incoming connections,
158    * and incoming socket data.
159    */
160   public void run()
161   {
162      try
163      {
164         while ( running )
165         {
166            // Listen for socket
167            Socket socket = serverSocket.accept();
168            logger.debug("server received connect from client...");
169            // Create client handler
170            try
171            {
172               ClientHandler handler = new ClientHandler(socket);
173               synchronized ( clientHandlers )
174               {
175                  clientHandlers.add(handler);
176               }
177            } catch ( StoreException e ) {
178               // Probably just testing connection, don't worry
179               logger.debug("handler couldn't be created because of: "+e.getMessage());
180            }
181         }
182         logger.debug("node server normal shutdown");
183      } catch ( Exception e ) {
184         if ( running )
185            logger.warn("server socket threw error",e);
186         else
187            logger.debug("server socket was shutdown: "+e.getMessage());
188      } finally {
189         // Node manager should return to uninitialized
190         nodeManager.ensureState(NodeState.UNINITIALIZED);
191      }
192   }
193 
194   /**
195    * Handle socket communications.
196    */
197   public class ClientHandler extends NetEndpoint
198   {
199      private int index;
200 
201      public ClientHandler(Socket socket)
202      {
203         super(socket,"BeanKeeper Client Handler (Node: "+nodeIndex+")");
204      }
205 
206      public String toString()
207      {
208         return "[Handler for node: "+index+"]";
209      }
210 
211      /**
212       * Override close to send events and to deregister from node server.
213       */
214      public void close()
215      {
216         super.close();
217         // Remove from socket handlers
218         synchronized ( clientHandlers )
219         {
220            clientHandlers.remove(this);
221         }
222         // Send disconnect event
223         try
224         {
225            if ( index != 0 ) // If a real client was on
226               eventDispatcher.notifyAll(
227                     new RemoteStateChangeEvent(index));
228         } catch ( StoreException e ) {
229            // Service was no longer available, no problem,
230            // don't have to handle the event then anyway
231            logger.debug("could not send out disconnect event, probably already shut down",e);
232         }
233      }
234 
235      /**
236       * Handle objects from clients.
237       */
238      public void onIncoming(CommObject obj)
239      {
240         try
241         {
242            // Initialization message from a client
243            if ( obj instanceof InitMessage )
244            {
245               index = ((InitMessage) obj).getNodeId();
246               // If not active, maybe the client node detected earlier
247               // that the current server node is out, so check with
248               // a re-connect. If the server node did not die, we lose
249               // our cache and current transactions! But if the server
250               // node really died, we reconnect and accept the client's
251               // init request immediately.
252               if ( nodeManager.getRole() == NodeRole.CLIENT )
253               {
254                  logger.info("server got init request from client: "+index+
255                        " but was not yet activated, try to reconnect to server to check, whether it's available");
256                  nodeManager.ensureState(NodeState.INITIALIZED);
257                  nodeManager.ensureState(NodeState.CONNECTED);
258               }
259               // If active, accept init request
260               CommResponse response = null;
261               if ( nodeManager.getRole() == NodeRole.CLIENT )
262                  response = new CommResponse(obj,CommResponse.SERVER_INACTIVE); // Server not active
263               else
264                  response = new CommResponse(obj,CommResponse.ACTION_SUCCESS); // Ok
265               send(response);
266            }
267            // Message to call some method
268            if ( obj instanceof CallMessage )
269            {
270               CallMessage msg = (CallMessage) obj;
271               CommResponse response = null;
272               if ( msg.isBroadcast() )
273               {
274                  // This is a broadcast and we're the server, so
275                  // broadcast the signal, and return ok
276                  nodeManager.callAll(msg.getService(),msg.getMethod(),
277                        msg.getParameterTypes(),msg.getParameters());
278                  response = new CallResponse(obj,null,null);
279               } else {
280                  try
281                  {
282                     // Execute the call locally
283                     Object result = nodeManager.callLocal(msg.getService(),msg.getMethod(),
284                           msg.getParameterTypes(),msg.getParameters());
285                     response = new CallResponse(obj,result,null);
286                  } catch ( StoreException e ) {
287                     logger.warn("called operation yielded: ",e);
288                     response = new CallResponse(obj,null,e);
289 
290                  }
291               }
292               send(response);
293            }
294         } catch ( StoreException e ) {
295            logger.warn("message request was not successful.",e);
296            send(new CallResponse(obj,null,e));
297         }
298      }
299   }
300   
301}
302 
303 

[all classes][hu.netmind.beankeeper.node.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov