XML Feeds

.

[java-dev] Trying to create a mxj socket listener

Nick Rothwell nick at cassiel.com
Tue Oct 30 14:53:20 MDT 2007


On 30 Oct 2007, at 20:40, Timothy Dalbey wrote:

> First the SBOD then crash. Is this because of a threading issue?

Probably: I've written socket listeners for MXJ without any problems.

Here's a hack to an O'Reilly demo I did some years ago to demo how to  
build a TCP/IP server for MXJ. (Apologies to everyone else for the  
noise...)

---

// This example is from _Java Examples in a Nutshell_. (http:// 
www.oreilly.com)
// Copyright (c) 1997 by David Flanagan
// This example is provided WITHOUT ANY WARRANTY either expressed or  
implied.
// You may study, use, modify, and distribute it for non-commercial  
purposes.
// For any commercial use, see http://www.davidflanagan.com/javaexamples

package net.loadbang.mxj.demo;

import java.io.*;
import java.net.*;
import java.util.*;
import com.cycling74.max.*;

/**
* This class is a generic framework for a flexible, multi-threaded  
server.
* It listens on any number of specified ports, and, when it receives a
* connection on a port, passes input and output streams to a  
specified Service
* object which provides the actual service.  It can limit the number of
* concurrent connections, and logs activity to a specified stream.
**/
public class Server extends MaxObject {
   /**
    * A main() method for running the server as a standalone program.
    * The command-line arguments to the program should be pairs of  
servicenames
    * and port numbers.  For each pair, the program will dynamically  
load the
    * named Service class, instantiate it, and tell the server to  
provide that
    * Service on the specified port.  The special -control argument  
should be
    * followed by a password and port, and will start special server  
control
    * service running on the specified port, protected by the specified
    * password.
    **/
	
	/*
   public static void main(String[] args) {
     try {
       if (args.length < 2)  // Check number of arguments
         throw new IllegalArgumentException("Must start at least one  
service");

       // Create a Server object that uses standard out as its log and
       // has a limit of ten concurrent connections at once.
       Server s = new Server(System.out, 10);

       // Parse the argument list
       int i = 0;
       while(i < args.length) {
         if (args[i].equals("-control")) {    // Handle the -control  
argument
           i++;
           String password = args[i++];
           int port = Integer.parseInt(args[i++]);
           s.addService(new Control(s, password), port);  // add  
control service
         }
         else {
           // Otherwise start a named service on the specified port.
           // Dynamically load and instantiate a class that  
implements Service.
           String serviceName = args[i++];
           Class serviceClass = Class.forName(serviceName); //  
dynamic load
           Service service = (Service)serviceClass.newInstance(); //  
instantiate
           int port = Integer.parseInt(args[i++]);
           s.addService(service, port);
         }
       }
     }
     catch (Exception e) { // Display a message if anything goes wrong
       System.err.println("Server: " + e);
       System.err.println("Usage: java Server [-control <password>  
<port>] " +
                          "[<servicename> <port> ... ]");
       System.exit(1);
     }
   }*/

	public Server() {
		createInfoOutlet(false);
		declareTypedIO("A","M");
		init(null, 10);
	}
	
	public void serve(String name, int port)
		throws ClassNotFoundException,
			   InstantiationException,
			   IllegalAccessException,
			   IOException
	{
         Class serviceClass = Class.forName(name); // dynamic load
         Service service = (Service) serviceClass.newInstance(); //  
instantiate
	    addService(service, port);
	}


   // This is the state for the server
   ConnectionManager connectionManager; // The ConnectionManager object
   Hashtable<Integer, Listener> services;      // The current  
services and their ports
   ThreadGroup threadGroup;             // The threadgroup for all  
our threads
   PrintWriter logStream;               // Where we send our logging  
output to

   /**
    * This is the Server() constructor.  It must be passed a stream
    * to send log output to (may be null), and the limit on the  
number of
    * concurrent connections.  It creates and starts a ConnectionManager
    * thread which enforces this limit on connections.
    **/
   private void init(OutputStream logStream, int maxConnections) {
     log("Starting server");
     threadGroup = new ThreadGroup("Server");
     connectionManager = new ConnectionManager(threadGroup,  
maxConnections);
     connectionManager.start();
     services = new Hashtable<Integer, Listener>();
   }

   /** Write the specified string to the log */
   protected synchronized void log(String s) {
       outlet(0, "[" + s + "]");
     }

   /** Write the specified object to the log */
   protected void log(Object o) { log(o.toString()); }


   /**
    * This method makes the server start providing a new service.
    * It runs the specified Service object on the specified port.
    **/
   public void addService(Service service, int port) throws  
IOException {
     Integer key = new Integer(port);  // the hashtable key
     // Check whether a service is already on that port
     if (services.get(key) != null)
       throw new IllegalArgumentException("Port " + port + " already  
in use.");
     // Create a Listener object to listen for connections on the port
     Listener listener = new Listener(threadGroup, port, service);
     // Store it in the hashtable
     services.put(key, listener);
     // Log it
     log("Starting service " + service.getClass().getName() +
         " on port " + port);
     // Start the listener running.
     listener.start();
   }

   /**
    * This method makes the server stop providing a service on a port.
    * It does not terminate any pending connections to that service,  
merely
    * causes the server to stop accepting new connections
    **/
   public void removeService(int port) {
     Integer key = new Integer(port);  // hashtable key
     // Look up the Listener object for the port in the hashtable of  
services
     final Listener listener = (Listener) services.get(key);
     if (listener == null) return;
     // Ask the listener to stop
     listener.pleaseStop();
     // Remove it from the hashtable
     services.remove(key);
     // And log it.
     log("Stopping service " + listener.service.getClass().getName() +
         " on port " + port);
   }

   /**
    * This nested Thread subclass is a "listener".  It listens for  
connections
    * on a specified port (using a ServerSocket) and when it gets a  
connection
    * request, it calls a method of the ConnectionManager to accept  
(or reject)
    * the connection.  There is one Listener for each Service being  
provided
    * by the Server.  The Listener passes the Server object to the
    * ConnectionManager, but doesn't do anything with it itself.
    */
   public class Listener extends Thread {
     ServerSocket listen_socket;   // The socket we listen for  
connections on
     int port;                     // The port we're listening on
     Service service;              // The service to provide on that  
port
     boolean stop = false;         // Whether we've been asked to stop

     /**
      * The Listener constructor creates a thread for itself in the  
specified
      * threadgroup.  It creates a ServerSocket to listen for  
connections
      * on the specified port.  It arranges for the ServerSocket to be
      * interruptible, so that services can be removed from the server.
      **/
     public Listener(ThreadGroup group, int port, Service service)
          throws IOException {
       super(group, "Listener:" + port);
       listen_socket = new ServerSocket(port);
       // give the socket a non-zero timeout so accept() can be  
interrupted
       listen_socket.setSoTimeout(600000);
       this.port = port;
       this.service = service;
     }

     /** This is the nice way to get a Listener to stop accepting  
connections */
     public void pleaseStop() {
       this.stop = true;   // set the stop flag
       this.interrupt();   // and make the accept() call stop blocking
     }

     /**
      * A Listener is a Thread, and this is its body.
      * Wait for connection requests, accept them, and pass the  
socket on
      * to the ConnectionManager object of this server
      **/
     @Override
	public void run() {
       while(!stop) {      // loop until we're asked to stop.
         try {
           Socket client = listen_socket.accept();
           connectionManager.addConnection(client, service);
         }
         catch (InterruptedIOException e) {}
         catch (IOException e) {log(e);}
       }
     }
   }
   /**
    * This nested class  manages client connections for the server.
    * It maintains a list of current connections and enforces the
    * maximum connection limit.  It creates a separate thread (one per
    * server) that sits around and wait()s to be notify()'d that a  
connection
    * has terminated.  When this happens, it updates the list of  
connections.
    **/
   public class ConnectionManager extends Thread {
     int maxConnections;  // The maximum number of allowed connections
     Vector<Connection> connections;  // The current list of connections

     /**
      * Create a ConnectionManager in the specified thread group to  
enforce
      * the specified maximum connection limit.  Make it a daemon  
thread so
      * the interpreter won't wait around for it to exit.
      **/
     public ConnectionManager(ThreadGroup group, int maxConnections) {
       super(group, "ConnectionManager");
       this.setDaemon(true);
       this.maxConnections = maxConnections;
       connections = new Vector<Connection>(maxConnections);
       log("Starting connection manager.  Max connections: " +  
maxConnections);
     }

     /**
      * This is the method that Listener objects call when they accept a
      * connection from a client.  It either creates a Connection object
      * for the connection and adds it to the list of current  
connections,
      * or, if the limit on connections has been reached, it closes the
      * connection.
      **/
     synchronized void addConnection(Socket s, Service service) {
       // If the connection limit has been reached
       if (connections.size() >= maxConnections) {
         try {
           PrintWriter out = new PrintWriter(s.getOutputStream());
           // Then tell the client it is being rejected.
           out.println("Connection refused; " +
                       "server has reached maximum number of clients.");
           out.flush();
           // And close the connection to the rejected client.
           s.close();
           // And log it, of course
           log("Connection refused to " + s.getInetAddress 
().getHostAddress() +
               ":" + s.getPort() + ": max connections reached.");
         } catch (IOException e) {log(e);}
       }
       else {  // Otherwise, if the limit has not been reached
         // Create a Connection thread to handle this connection
         Connection c = new Connection(s, service);
         // Add it to the list of current connections
         connections.addElement(c);
         // Log this new connection
         log("Connected to " + s.getInetAddress().getHostAddress() +
             ":" + s.getPort() + " on port " + s.getLocalPort() +
             " for service " + service.getClass().getName());
         // And start the Connection thread running to provide the  
service
         c.start();
       }
     }

     /**
      * A Connection object calls this method just before it exits.
      * This method uses notify() to tell the ConnectionManager thread
      * to wake up and delete the thread that has exited.
      **/
     public synchronized void endConnection() { this.notify(); }

     /** Change the current connection limit */
     public synchronized void setMaxConnections(int max)  
{ maxConnections=max; }

     /**
      * Output the current list of connections to the specified stream.
      * This method is used by the Control service defined below.
      **/
     public synchronized void printConnections(PrintWriter out) {
       for(int i = 0; i < connections.size(); i++) {
         Connection c = (Connection)connections.elementAt(i);
         out.println("CONNECTED TO " +
                     c.client.getInetAddress().getHostAddress() + ":" +
                     c.client.getPort() + " ON PORT " +  
c.client.getLocalPort()+
                     " FOR SERVICE " + c.service.getClass().getName());
       }
     }

     /**
      * The ConnectionManager is a thread, and this is the body of that
      * thread.  While the ConnectionManager methods above are called  
by other
      * threads, this method is run in its own thread.  The job of  
this thread
      * is to keep the list of connections up to date by removing  
connections
      * that are no longer alive.  It uses wait() to block until  
notify()'d by
      * the endConnection() method.
      **/
     @Override
	public void run() {
       while(true) {  // infinite loop
         // Check through the list of connections, removing dead ones
         for(int i = 0; i < connections.size(); i++) {
           Connection c = (Connection)connections.elementAt(i);
           if (!c.isAlive()) {
             connections.removeElementAt(i);
             log("Connection to " + c.client.getInetAddress 
().getHostAddress() +
                 ":" + c.client.getPort() + " closed.");
           }
         }
         // Now wait to be notify()'d that a connection has exited
         // When we wake up we'll check the list of connections again.
         try { synchronized(this) { this.wait(); } }
         catch(InterruptedException e) {}
       }
     }
   }

   /**
    * This class is a subclass of Thread that handles an individual  
connection
    * between a client and a Service provided by this server.   
Because each
    * such connection has a thread of its own, each Service can have  
multiple
    * connections pending at once.  Despite all the other threads in  
use, this
    * is the key feature that makes this a multi-threaded server  
implementation.
    **/
   public class Connection extends Thread {
     Socket client;     // The socket to talk to the client through
     Service service;   // The service being provided to that client

     /**
      * This constructor just saves some state and calls the superclass
      * constructor to create a thread to handle the connection.   
Connection
      * objects are created by Listener threads.  These threads are  
part of
      * the server's ThreadGroup, so all Connection threads are part  
of that
      * group, too.
      **/
     public Connection(Socket client, Service service) {
       super("Server.Connection:" + client.getInetAddress 
().getHostAddress() +
             ":" + client.getPort());
       this.client = client;
       this.service = service;
     }

     /**
      * This is the body of each and every Connection thread.
      * All it does is pass the client input and output streams to the
      * serve() method of the specified Service object.  That method
      * is responsible for reading from and writing to those streams to
      * provide the actual service.  Recall that the Service object  
has been
      * passed from the Server.addService() method to a Listener object
      * to the ConnectionManager.addConnection() to this Connection  
object,
      * and is now finally getting used to provide the service.
      * Note that just before this thread exits it calls the
      * ConnectionManager.endConnection() method to wake up the
      * ConnectionManager thread so that it can remove this Connection
      * from its list of active connections.
      **/
     @Override
	public void run() {
       try {
         InputStream in = client.getInputStream();
         OutputStream out = client.getOutputStream();
         service.serve(in, out, itsLogger);
       }
       catch (IOException e) {log(e);}
       finally { connectionManager.endConnection(); }
     }
   }

   /** A logging (outlet) interface. */

   public interface Logger {
       public void log(String line);
   }
   /**
    * Here is the Service interface that we have seen so much of.
    * It defines only a single method which is invoked to provide the  
service.
    * serve() will be passed an input stream and an output stream to  
the client.
    * It should do whatever it wants with them, and should close them  
before
    * returning.
    *
    * All connections through the same port to this service share a  
single
    * Service object.  Thus, any state local to an individual  
connection must
    * be stored in local variables within the serve() method.  State  
that should
    * be global to all connections on the same port should be stored in
    * instance variables of the Service class.  If the same Service  
is running
    * on more than one port, there will typically be different  
Service instances
    * for each port.  Data that should be global to all connections  
on any port
    * should be stored in static variables.
    *
    * Note that implementations of this interface must have a no- 
argument
    * constructor if they are to be dynamically instantiated by the  
main()
    * method of the Server class.
    **/
   public interface Service {
     public void serve(InputStream in, OutputStream out, Logger  
logger) throws IOException;
   }

   private Logger itsLogger = new Logger() {
       public void log(String line) {
           outlet(0, line);
       }
   };

   /**
    * This service is an HTTP mirror, just like the HttpMirror class
    * implemented earlier in this chapter.  It echos back the client's
    * HTTP request
    **/
   public static class HTTPMirror implements Service {
     public void serve(InputStream i, OutputStream o, Logger logger)  
throws IOException {
       BufferedReader in = new BufferedReader(new InputStreamReader(i));
       PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
       out.println("HTTP/1.0 200 ");
       out.println("Content-Type: text/plain");
       out.println();
       String line;
       while((line = in.readLine()) != null) {
         if (line.length() == 0) break;
         out.println(line);
       }
       out.close();
       in.close();
     }
   }

   /**
    * This service demonstrates how to maintain state across connections
    * by saving it in instance variables and using synchronized  
access to
    * those variables.  It maintains a count of how many clients have  
connected
    * and tells each client what number it is
    **/
   public static class UniqueID implements Service {
     public int id=0;
     public synchronized int nextId() { return id++; }
     public void serve(InputStream i, OutputStream o, Logger logger)  
throws IOException {
       PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
       out.println("You are client #: " + nextId());
       out.close();
       i.close();
     }
   }

   /**
    * This is a non-trivial service.  It implements a command-based  
protocol
    * that gives password-protected runtime control over the  
operation of the
    * server.  See the main() method of the Server class to see how this
    * service is started.
    *
    * The recognized commands are:
    *   password:   give password; authorization is required for most  
commands
    *   add:        dynamically add a named service on a specified port
    *   remove:     dynamically remove the service running on a  
specified port
    *   max:        change the current maximum connection limit.
    *   status:     display current services, connections, and  
connection limit
    *   help:       display a help message
    *   quit:       disconnect
    *
    * This service displays a prompt, and sends all of its output to  
the user
    * in capital letters.  Only one client is allowed to connect to  
this service
    * at a time.
    **/
   public static class Control implements Service {
     Server server;             // The server we control
     String password;           // The password we require
     boolean connected = false; // Whether a client is already  
connected to us
     /**
      * Create a new Control service.  It will control the specified  
Server
      * object, and will require the specified password for  
authorization
      * Note that this Service does not have a no argument  
constructor, which
      * means that it cannot be dynamically instantiated and added as  
the other,
      * generic services above can be.
      **/
     public Control(Server server, String password) {
       this.server = server;
       this.password = password;
     }

     /**
      * This is the serve method that provides the service.  It reads  
a line
      * the client, and uses java.util.StringTokenizer to parse it into
      * commands and arguments.  It does various things depending on the
      * command.
      **/
     public void serve(InputStream i, OutputStream o, Logger logger)  
throws IOException {
       // Setup the streams
       BufferedReader in = new BufferedReader(new InputStreamReader(i));
       PrintWriter out = new PrintWriter(new OutputStreamWriter(o));
       String line;
       boolean authorized = false;  // Has the user has given the  
password yet?

       // If there is already a client connected to this service,  
display a
       // message to this client and close the connection.  We use a
       // synchronized block to prevent a race condition.
       synchronized(this) {
         if (connected) {
           out.println("ONLY ONE CONTROL CONNECTION ALLOWED AT A  
TIME.");
           out.close();
           return;
         }
         else connected = true;
       }

       for(;;) {  // infinite loop
         out.print("> ");           // Display a prompt
         out.flush();               // Make it appear right away
         line = in.readLine();      // Get the user's input
         if (line == null) break;   // Quit if we get EOF.
         try {
           // Use a StringTokenizer to parse the user's command
           StringTokenizer t = new StringTokenizer(line);
           if (!t.hasMoreTokens()) continue;  // if input was blank line
           // Get the first word of the input and convert to lower case
           String command = t.nextToken().toLowerCase();
           // Now compare it to each of the possible commands, doing the
           // appropriate thing for each command
           if (command.equals("password")) {       // Password command
             String p = t.nextToken();             // Get the next  
word of input
             if (p.equals(this.password)) {        // Does it equal  
the password
               out.println("OK");                  // Say so
               authorized = true;                  // Grant  
authorization
             }
             else out.println("INVALID PASSWORD"); // Otherwise fail
           }
           else if (command.equals("add")) {       // Add Service  
command
             // Check whether password has been given
             if (!authorized) out.println("PASSWORD REQUIRED");
             else {
               // Get the name of the service and try to dynamically  
load
               // and instantiate it.  Exceptions will be handled below
               String serviceName = t.nextToken();
               Class serviceClass = Class.forName(serviceName);
               Service service;
               try { service = (Service) serviceClass.newInstance(); }
               catch (NoSuchMethodError e) {
                 throw new IllegalArgumentException("Service must  
have a " +
                                                    "no-argument  
constructor");
               }
               int port = Integer.parseInt(t.nextToken());
               // If no exceptions occurred, add the service
               server.addService(service, port);
               out.println("SERVICE ADDED");      // acknowledge
             }
           }
           else if (command.equals("remove")) {   // Remove service  
command
             if (!authorized) out.println("PASSWORD REQUIRED");
             else {
               int port = Integer.parseInt(t.nextToken());  // get port
               server.removeService(port);     // remove the service  
on it
               out.println("SERVICE REMOVED"); // acknowledge
             }
           }
           else if (command.equals("max")) {      // Set max  
connection limit
             if (!authorized) out.println("PASSWORD REQUIRED");
             else {
               int max = Integer.parseInt(t.nextToken());        //  
get limit
               server.connectionManager.setMaxConnections(max);  //  
set limit
               out.println("MAX CONNECTIONS CHANGED");           //  
acknowledge
             }
           }
           else if (command.equals("status")) {    // Status Display  
command
             if (!authorized) out.println("PASSWORD REQUIRED");
             else {
               // Display a list of all services currently running
               Enumeration keys = server.services.keys();
               while(keys.hasMoreElements()) {
                 Integer port = (Integer) keys.nextElement();
                 Listener listener = (Listener)server.services.get 
(port);
                 out.println("SERVICE " + listener.service.getClass 
().getName()+
                             " ON PORT " + port);
               }
               // Display a list of all current connections
               server.connectionManager.printConnections(out);
               // Display the current connection limit
               out.println("MAX CONNECTIONS: " +
                           server.connectionManager.maxConnections);
             }
           }
           else if (command.equals("help")) {            // Help command
             // Display command syntax.  Password not required
             out.println("COMMANDS:\n" +
                         "\tpassword <password>\n" +
                         "\tadd <service> <port>\n" +
                         "\tremove <port>\n" +
                         "\tmax <max-connections>\n" +
                         "\tstatus\n" +
                         "\thelp\n" +
                         "\tquit");
           }
           else if (command.equals("quit")) break;    // Quit  
command.  Exit.
           else out.println("UNRECOGNIZED COMMAND");  // Unknown  
command error
         }
         catch (Exception e) {
           // If an exception occurred during the command, print an  
error
           // message, then output details of the exception.
           out.println("EXCEPTION WHILE PARSING OR EXECUTING COMMAND:");
           out.println(e);
         }
       }
       // Finally, when the loop command loop ends, close the streams
       // and set our connected flag to false so that other clients can
       // now connect.
       out.close();
       in.close();
       connected = false;
     }
   }
}


Nick Rothwell / Cassiel.com Limited
www.cassiel.com
www.myspace.com/cassieldotcom
www.last.fm/music/cassiel
www.loadbang.net





More information about the java-dev mailing list