/* * Salamander Transport Layer * * Michael J. Willis (violator@umich.edu) * * $Workfile: SalConnection.java $ * $Author: rmalan $ * $Date: 1997/07/16 14:20:39 $ * $Revision: 1.7 $ */ package Salamander; import java.net.*; import java.io.*; import java.util.*; /** * SalConnection is used to communicate with the Salamander transport * layer in order to receive data from a Salamander supplier. Information * pertaining to available and currently connected services (instruments) * can also be obtained with this class. * The constructor of SalConnection is private; obtain a SalConnection object * using the getSalConnection() method. */ public class SalConnection { /** * The host to which we are connected */ String m_hostName; /** * The port to which we are connected */ int m_TCPPort; /** * The UDP port to use for TCP request message */ int m_UDPPort; /** * Unique key values returned from the server; used to identify * the server uniquely */ int m_key; /** * Table in which the keys are service names that are currently subscribed, * and the value is the SalDataQueue object that should be called when a * message is received. */ Hashtable m_flowTable; /** * InputStream from the server. */ DataInputStream m_inputStream; /** * OutputStream to the server */ DataOutputStream m_outputStream; /** * Socket connection to the server */ Socket m_hostSocket; /** * Thread for dispatcher. */ SalPacketDispatcher m_dispatcher; /** * Do we feedback everything? */ boolean m_feedback; /** * Connects to the Salamander host identified by the hostName and UDPPort * given, and returns a SalConnection object for the connection. * @hostName The fully qualified name of the Salamander host machine. * @UDPPort The UDP port to use to connect to the host machine. * @returns A SalConnection object that is connected to the host, or null * if a connection could not be made. */ public static SalConnection getSalConnection( String hostName, int UDPPort ) { trace("getSalConnection("+hostName+","+UDPPort+")"); // check params if (null == hostName) { error("hostName is null in getSalConnection()"); return null; } if (0 == UDPPort) { error("UDPPort is 0 in getSalConnection()"); } // Instantiate the connection object SalConnection con = new SalConnection(); // Connect to the given host and port. if ( !con.connect(hostName, UDPPort) ) { error("Could not connect to "+hostName+" on port "+UDPPort); con = null; } return con; } /** * Connects to the given Salamander host using default UDP port */ public static SalConnection getSalConnection( String hostName ) { return getSalConnection(hostName, SAL.getDefaultUDPPort() ); } /** * Connects to the default host and port */ public static SalConnection getSalConnection() { return getSalConnection( SAL.getDefaultHostName(), SAL.getDefaultUDPPort() ); } /** * Constructor initializes internal state only. Use one of the connect() * methods to connect this object to a remote host. * Constructor is intentionally private; use SalConnection.getSalConnection() * to obtain an object. */ SalConnection() { m_flowTable = new Hashtable(); m_feedback = false; } /** * Connects the SalConnection object to the given host and UDP port. * @param hostName The name of the host to which to connect * @param UDPPort The UDP port to which to connect. * @returns true on success. */ boolean connect(String hostName, int UDPPort ) { if ( null == hostName ) { error("hostName is null in connect()"); return false; } if (0 == UDPPort) { error("UDPPort is 0 in connect()"); return false; } m_hostName = hostName; m_UDPPort = UDPPort; // Use UDP to request a TCP assignment from the host machine m_TCPPort = requestTCPPort( SAL.getDefaultTimeout()); if ( 0 == m_TCPPort ) { error("Could not obtain TCP port from host "+hostName); return false; } // Connect to the TCP port provided try { m_hostSocket = new Socket(m_hostName, m_TCPPort); } catch (Exception e) { error("Exception connecting to "+m_hostName+"("+m_TCPPort+"): "+e.getMessage()); return false; } // get input/output streams try { m_inputStream = new DataInputStream( m_hostSocket.getInputStream() ); m_outputStream = new DataOutputStream(m_hostSocket.getOutputStream()); // Write our key... This is junk! m_outputStream.writeInt (321456); } catch (Exception e) { error("Could not create input/output streams: "+e.getMessage()); return false; } //m_outputStream = new DataOutputStream( System.out ); // start up the reader thread m_dispatcher = new SalPacketDispatcher(this, SAL.getInterfaceVersion()); m_dispatcher.start(); return true; } /** * Connects the SalConnection object to the given host using the default hostName * and port. * @throws IOException If a connection could not be made. */ boolean connect() { return connect( SAL.getDefaultHostName(), SAL.getDefaultUDPPort() ); } /** * @returns The name of the remote host serving data to this SalConnction * object. Note that this is most likely not the host on which any supplied * data stream originates. */ public String getHost() { return m_hostName; } /** * @returns The port to which this SalConnection object is connected on * the remote host. */ public int getPort() { return m_TCPPort; } /** * Used to obtain all currently available services. * @returns An enumeration object containing a SalProperties object for * each available service. */ Enumeration getAvailableServices() { return null; } /** * Used to obtain all currently available services fitting the criteria * specified in the given SalProperties object. * Criteria are specified in the SalProperties object by using * regular expressions. * For example, to obtain information on all services having a name * beginning with "Radar", you would use the following: * * SalProperties prop = new SalProperties; * prop.setName("Radar*"); * Enumeration e = SalConnectionObject.getAvailableServices(prop); * @param props The SalProperties object containing information used to * decide when a service matches given criteria. * @returns An Enumeration object containing a SalProperties object for * each available service matching the given criteria. */ Enumeration getAvailableServices(SalProperties props) { error("SalConnection.getAvailableServices() not implemented."); return null; } /** * Used to obtain the properties for an available service given * the service's name. * Shorthand method to avoid overhead of calling getAvailableServices() * and pulling the SalProperties object out of the Enumeration. * @param serviceName The fully qualified name of the service whose * SalProperties are to be obtained. * @returns A SalProperties object containing for the given service, or * null<\em> if the service is not available. */ SalProperties getServiceProperties(String serviceName) { error("SalConnection.getServiceProperties() not implemented."); return null; } /** * Called when a client wishes to begin recieving data. * Any data packets that flow through the Salamander server that * match the criteria contained in the SalProperties object will be * forwarded to the object implementing the SalDataQueue. * To restrict the data flow to all data from a given service name, * use the other beginDataFlow() method. * @param props The SalProperties object representing criteria for * data packets that will be sent. * @param queue An object implementing SalDataQueue to receive each * data packet. * @returns a SalDataFlowTicket object that can be used to close this * data flow, as well as to obtain information about this data flow. * @throws SalException if data flow could not be instantiated. */ public SalDataFlowTicket startDataFlow(SalProperties props, SalDataQueue queue) throws SalException { error("SalConnection.startDataFlow(SalProperties, SalDataQueue) not implemented.\n"+ "Use SalConnection.startDataflow(String, SalDataQueue).\n"); return null; } /** * Called by a client to begin receiving all data packets from a given * named service. * For more control over what data to obtain, use the other beginDataFlow() * method. * @param serviceName The fully qualified name of the service to retrieve * data packets from. * @param queue An object implementing SalDataQueue to receive each * data packet. * @returns a SalDataFlowTicket object that can be used to close this * data flow, as well as to obtain information about this data flow. * @throws SalException if data flow could not be instantiated. */ public SalDataFlowTicket startDataFlow(String serviceName, SalDataQueue queue) throws SalException { trace("Beginning data flow for service: "+serviceName); // assume successful request, and put this in the flow table m_flowTable.put(serviceName, queue); if (serviceName.equals(SalProperties.ALL_SALAMANDER_DATA)) { m_feedback = true; } else { // send the request packet to the server SalPacket packet = new SalPacket(); packet.setMethod(SAL.SUBSCRIBE); packet.setName(serviceName); dispatch(packet); } return null; } /** * Stops the data flow whose Ticket is specified. * @param ticket The SalDataFlowTicket object that was returned from * startDataFlow(). */ public void stopDataFlow(SalDataFlowTicket ticket) { } /** * returns a SalDataFlowStats object with information on the data flow * whose ticket is passed. This object can contain information such * as total bytes sent, time of last send, etc. * @param ticket The SalDataFlowTicket obtained from startDataFlow() * @returns a SlaDataFlowStats object. */ SalDataFlowStats getDataFlowStats(SalDataFlowTicket ticket) { return null; } /** * Constructs a UDP DatagramPacket for requesting the proper * TCP port from the server * @returns a DatagramPacket object ready to be sent */ DatagramPacket buildTCPRequestPacket() { InetAddress iAddr; ByteArrayOutputStream os = new ByteArrayOutputStream(); DataOutputStream dos; try { dos = new DataOutputStream( os ); dos.writeInt(SAL.QUERY_TCP_PORT); } catch (IOException e) { error(e.getMessage()); } byte[] arr = os.toByteArray(); try { iAddr = InetAddress.getByName(m_hostName); } catch (UnknownHostException e) { error(e.getMessage()); return null; } trace("iNetAddress of host is "+iAddr); return new DatagramPacket( arr, arr.length, iAddr, m_UDPPort ); } /** * requests a TCP port assignment from the server. Handles details of * creating, sending, and receiving the necessary UDP packets. * Currently, no time out is installed. * @returns The TCP port obtained, or 0 on failure. */ int requestTCPPort(int maxTimeout) { DatagramSocket socket = null; int port=0; if (null == m_hostName) { error("m_hostName is null in requestTCPPort()"); return 0; } if (0 == m_UDPPort) { error("m_UDPPort is 0 in requestTCPPort()"); return 0; } // create the TCP request packet byte[] recvBuf = new byte[256]; DatagramPacket inPacket = new DatagramPacket (recvBuf, 12); DatagramPacket outPacket = buildTCPRequestPacket(); try { // bind to the socket socket = new DatagramSocket (); } catch (java.io.IOException e) { error("Could not create datagram socket in requestTCPPort()"); return 0; } // send it into the wild blue trace("Sending TCP request packet..."); trace("Address: "+outPacket.getAddress()+"; Port: "+outPacket.getPort()+"; Length: "+outPacket.getLength()); try { int timeout; // Set to true so if it falls through it will exit. boolean timeoutHappened = true; String version = System.getProperty("java.version"); System.out.println (version); if (version.startsWith("1.0")) { // Stuck with old yucky jdk, just blast 'em. int i; for (i = 0; i < 10; i++) { System.out.println("+"); socket.send(outPacket); } socket.receive(inPacket); timeoutHappened = false; } else { for (timeout = 1; timeout < maxTimeout; timeout *= 2) { // send request System.out.println("*"); socket.send (outPacket); socket.setSoTimeout (timeout * 1000); // get response timeoutHappened = false; try { socket.receive (inPacket); } catch (InterruptedIOException e) { timeoutHappened = true; } if (timeoutHappened == false) { break; } } } if (timeoutHappened == true) { error("Salamander connect timed out in requestTCPPort()"); return 0; } byte[] data = inPacket.getData(); trace("TCP request reply received. Length: "+inPacket.getLength()); trace("Sender: "+inPacket.getAddress()); trace("Port: "+inPacket.getPort()); DataInputStream in = new DataInputStream( new ByteArrayInputStream(data, 0, data.length) ); port = in.readShort(); // get the port number trace("Got TCP Port: "+port); in.readShort(); // read out filler m_key = in.readInt(); // read key trace("Got key1: "+m_key); } catch (Exception e) { error("Could not obtain TCP port: "+e.getMessage()); port = 0; } return port; } static void trace(String s) { SAL.trace("SalConnection: "+s); } static void error(String s) { SAL.error("SalConnection: "+s); } public void dispatch(SalPacket p) { trace("SalConnection.dispatch()"); try { m_outputStream.writeInt(m_key); m_outputStream.writeInt(SAL.getInterfaceVersion()); m_outputStream.writeInt(p.getPropLength()); p.serialize(m_outputStream); } catch (IOException e) { error("Exception serializing packet in dispatch()"); } } }