EMMA Coverage Report (generated Sun May 02 20:42:29 CEST 2010)
[all classes][hu.netmind.beankeeper.node.impl]

COVERAGE SUMMARY FOR SOURCE FILE [NetEndpoint.java]

nameclass, %method, %block, %line, %
NetEndpoint.java100% (3/3)96%  (23/24)69%  (538/777)80%  (107.3/134)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class NetEndpoint$ProcessingHandler100% (1/1)100% (2/2)42%  (96/226)59%  (16.5/28)
run (): void 100% (1/1)41%  (90/220)57%  (15.5/27)
NetEndpoint$ProcessingHandler (NetEndpoint): void 100% (1/1)100% (6/6)100% (1/1)
     
class NetEndpoint$IncomingHandler100% (1/1)100% (2/2)74%  (149/200)83%  (28.3/34)
run (): void 100% (1/1)74%  (143/194)83%  (27.3/33)
NetEndpoint$IncomingHandler (NetEndpoint): void 100% (1/1)100% (6/6)100% (1/1)
     
class NetEndpoint100% (1/1)95%  (19/20)83%  (293/351)87%  (62.5/72)
onIncoming (CommObject): void 0%   (0/1)0%   (0/1)0%   (0/1)
waitForResponse (CommObject): CommResponse 100% (1/1)67%  (66/98)73%  (13.1/18)
close (): void 100% (1/1)71%  (37/52)82%  (12.4/15)
send (CommObject): void 100% (1/1)81%  (43/53)92%  (11/12)
<static initializer> 100% (1/1)100% (4/4)100% (1/1)
NetEndpoint (Socket, String): void 100% (1/1)100% (90/90)100% (20/20)
access$000 (NetEndpoint): boolean 100% (1/1)100% (3/3)100% (1/1)
access$100 (NetEndpoint): ObjectInputStream 100% (1/1)100% (3/3)100% (1/1)
access$200 (): Logger 100% (1/1)100% (2/2)100% (1/1)
access$300 (NetEndpoint): List 100% (1/1)100% (3/3)100% (1/1)
access$400 (NetEndpoint): List 100% (1/1)100% (3/3)100% (1/1)
access$500 (NetEndpoint): List 100% (1/1)100% (3/3)100% (1/1)
access$600 (NetEndpoint): Object 100% (1/1)100% (3/3)100% (1/1)
access$700 (NetEndpoint): int 100% (1/1)100% (3/3)100% (1/1)
access$708 (NetEndpoint): int 100% (1/1)100% (8/8)100% (1/1)
access$710 (NetEndpoint): int 100% (1/1)100% (8/8)100% (1/1)
access$800 (NetEndpoint): String 100% (1/1)100% (3/3)100% (1/1)
isConnected (): boolean 100% (1/1)100% (3/3)100% (1/1)
onError (): void 100% (1/1)100% (1/1)100% (1/1)
sendAndWaitForResponse (CommObject): CommResponse 100% (1/1)100% (7/7)100% (2/2)

1/**
2 * Copyright (C) 2009 NetMind Consulting Bt.
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 3 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 */
18 
19package hu.netmind.beankeeper.node.impl;
20 
21import hu.netmind.beankeeper.common.StoreException;
22import java.net.*;
23import java.io.*;
24import java.util.*;
25import org.apache.log4j.Logger;
26 
27/**
28 * A common implementation of an endpoint that handles socket communication
29 * outgoing and incoming objects. The incoming messages are handled by
30 * multiple threads.
31 * @author Brautigam Robert
32 * @version Revision: $Revision$
33 */
34public class NetEndpoint
35{
36   private static Logger logger = Logger.getLogger(NetEndpoint.class);
37   private static final int PROCESSOR_IDLE_TTL = 10*60*1000;
38   private static final int RESPONSE_TTL = 60*1000;
39 
40   private Object processorMutex = new Object();
41   private int processorCount = 0;
42   private Socket socket;
43   private ObjectOutputStream oOutput;
44   private ObjectInputStream oInput;
45   private boolean connected = true;
46   private List<CommObject> incomingQueue = new LinkedList<CommObject>();
47   private List<CommResponse> responseQueue = new LinkedList<CommResponse>();
48   private List<Long> responseQueueDates = new LinkedList<Long>();
49   private String threadNamePrefix = "";
50 
51   /**
52    * Create the object with a socket that will be handled.
53    */
54   public NetEndpoint(Socket socket, String threadNamePrefix)
55   {
56      this.socket=socket;
57      this.threadNamePrefix=threadNamePrefix;
58      // Establish streams and listener
59      try
60      {
61         // Streams
62         oOutput = new ObjectOutputStream(socket.getOutputStream());
63         oInput = new ObjectInputStream(socket.getInputStream());
64         // The listener will get new messages and insert them into
65         // the incoming queue.
66         Thread listenerThread = new Thread(new IncomingHandler());
67         listenerThread.setName(threadNamePrefix+" - Incoming");
68         listenerThread.setDaemon(true);
69         listenerThread.start();
70      } catch ( Exception e ) {
71         throw new CommException("error establishing net endpoint to: "+socket,e);
72      }
73   }
74 
75   /**
76    * Return whether endpoint is supposed to be connected.
77    * @return True if endpoint should be connected, false is it should
78    * be not yet connected, or it's closed.
79    */
80   public boolean isConnected()
81   {
82      return connected;
83   }
84 
85   /**
86    * Close this endpoint and release all resources.
87    */
88   public void close()
89   {
90      logger.debug("closing net endpoint");
91      if ( ! connected )
92         return;
93      connected=false;
94      try
95      {
96         // Disconnect
97         socket.close();
98      } catch ( Exception e ) {
99         logger.error("error while closing endpoint",e);
100      }
101      // Wake up incoming listener
102      synchronized ( incomingQueue )
103      {
104         incomingQueue.notifyAll();
105      }
106      // Wake up response waiting
107      synchronized ( responseQueue )
108      {
109         responseQueue.notifyAll();
110      }
111   }
112 
113   /**
114    * Override this method to handle error conditions in communication.
115    */
116   public void onError()
117   {
118   }
119 
120   /**
121    * Override this method to receive messages. Messages are delivered in
122    * a dedicated thread.
123    */
124   public void onIncoming(CommObject obj)
125   {
126   }
127 
128   /**
129    * Send and wait for response.
130    */
131   public CommResponse sendAndWaitForResponse(CommObject obj)
132   {
133      send(obj);
134      return waitForResponse(obj);
135   }
136 
137   /**
138    * Send a message to peer endpoint without waiting for anwser.
139    */
140   public void send(CommObject obj)
141   {
142      try
143      {
144         if ( logger.isDebugEnabled() )
145            logger.debug("sending object: "+obj);
146         synchronized ( oOutput )
147         {
148            oOutput.writeObject(obj);
149         }
150      } catch ( Exception e ) {
151         CommException wrapper = new CommException("communication error while sending: "+obj,e);
152         if ( isConnected() )
153            onError();
154         throw wrapper;
155      }
156   }
157 
158   /**
159    * Wait for a response for the given message.
160    */
161   public CommResponse waitForResponse(CommObject obj)
162   {
163      if ( logger.isDebugEnabled() )
164         logger.debug("waiting for response to object: "+obj);
165      try
166      {
167         synchronized ( responseQueue )
168         {
169            while ( connected )
170            {
171               // Search for the appropriate message
172               for ( int i=0; i<responseQueue.size(); i++ )
173               {
174                  CommResponse response = responseQueue.get(i);
175                  if ( response.getSessionId() == obj.getSessionId() )
176                  {
177                     responseQueueDates.remove(i);
178                     responseQueue.remove(i);
179                     if ( logger.isDebugEnabled() )
180                        logger.debug("wait terminated, response: "+response);
181                     return response;
182                  }
183               }
184               // If not found, wait for new entries
185               responseQueue.wait();
186            }
187         }
188      } catch ( InterruptedException e ) {
189         logger.debug("wait interrupted",e);
190      }
191      throw new CommException("there was no answer from server for message: "+obj);
192   }
193 
194   /**
195    * Handles incoming messages.
196    */
197   public class IncomingHandler implements Runnable
198   {
199      public void run()
200      {
201         try
202         {
203            CommObject obj = null;
204            while ( (connected) && ((obj=(CommObject)oInput.readObject()) != null) )
205            {
206               if ( logger.isDebugEnabled() )
207                  logger.debug("received object: "+obj);
208               // Place the message in a queue
209               if ( obj instanceof CommResponse )
210               {
211                  synchronized ( responseQueue )
212                  {
213                     long currentTime = System.currentTimeMillis();
214                     // Check for obsolete entries
215                     while ( (!responseQueueDates.isEmpty()) &&
216                        (responseQueueDates.get(0) + RESPONSE_TTL < currentTime) )
217                     {
218                        responseQueue.remove(0);
219                        responseQueueDates.remove(0);
220                     }
221                     // Add response to queue
222                     responseQueue.add((CommResponse)obj);
223                     responseQueueDates.add(currentTime);
224                     responseQueue.notifyAll();
225                  }
226               } else {
227                  synchronized ( incomingQueue )
228                  {
229                     incomingQueue.add(obj);
230                     incomingQueue.notify();
231                  }
232               }
233               // Now see whether we need more threads to handle messages.
234               // Note, the message could be at this point already under
235               // processing, but it is preferred to have one additional
236               // thread than one less than required.
237               synchronized ( processorMutex )
238               {
239                  if ( processorCount <= 0 )
240                  {
241                     Thread processingThread = new Thread(new ProcessingHandler());
242                     processingThread.setName(threadNamePrefix+" - Processor");
243                     processingThread.start();
244                     processorCount++;
245                  }
246               }
247            }
248         } catch ( Exception e ) {
249            // If not called from disconnect() then call error handler
250            if ( isConnected() )
251            {
252               logger.warn("error while listening for incoming messages, probably peer closed",e);
253               close();
254               onError();
255            } else {
256               logger.debug("endpoint was disconnected: "+e.getMessage());
257            }
258         }
259      }
260   }
261 
262   /**
263    * Process incoming objects.
264    */
265   public class ProcessingHandler implements Runnable
266   {
267      public void run()
268      {
269         try
270         {
271            while ( connected )
272            {
273               // Get a message from the queue
274               CommObject obj = null;
275               synchronized ( incomingQueue )
276               {
277                  while ( (connected) && (incomingQueue.isEmpty()) )
278                     incomingQueue.wait(PROCESSOR_IDLE_TTL);
279                  if ( incomingQueue.isEmpty() )
280                  {
281                     // This means either we are disconnected, or were idle
282                     return; // So exit
283                  }
284                  obj = incomingQueue.remove(0);
285               }
286               // Indicate less processors for now
287               synchronized ( processorMutex )
288               {
289                  processorCount--;
290               }
291               // Process message
292               try
293               {
294                  onIncoming(obj);
295               } catch ( StoreException e ) {
296                  logger.error("processing handler threw",e);
297               } finally {
298                  // Indicate processor as back to pool
299                  synchronized ( processorMutex )
300                  {
301                     processorCount++;
302                  }
303               }
304            }
305         } catch ( InterruptedException e ) {
306            logger.debug("endpoint processor interrupted, exiting",e);
307         } finally {
308            // Finally remove from free processor count
309            synchronized ( processorMutex )
310            {
311               processorCount--;
312            }
313         }
314      }
315   }
316 
317}
318 
319 

[all classes][hu.netmind.beankeeper.node.impl]
EMMA 2.0.5312debian (C) Vladimir Roubtsov