/*
* Salamander Transport Layer
*
* Michael J. Willis (violator@umich.edu)
*
* $Workfile: SalConnection.java $
* $Author: rmalan $
* $Date: 1997/07/16 14:20:39 $
* $Revision: 1.7 $
*/
package Salamander;
import java.net.*;
import java.io.*;
import java.util.*;
/**
* SalConnection is used to communicate with the Salamander transport
* layer in order to receive data from a Salamander supplier. Information
* pertaining to available and currently connected services (instruments)
* can also be obtained with this class.
* The constructor of SalConnection is private; obtain a SalConnection object
* using the getSalConnection() method.
*/
public class SalConnection {
/**
* The host to which we are connected
*/
String m_hostName;
/**
* The port to which we are connected
*/
int m_TCPPort;
/**
* The UDP port to use for TCP request message
*/
int m_UDPPort;
/**
* Unique key values returned from the server; used to identify
* the server uniquely
*/
int m_key;
/**
* Table in which the keys are service names that are currently subscribed,
* and the value is the SalDataQueue object that should be called when a
* message is received.
*/
Hashtable m_flowTable;
/**
* InputStream from the server.
*/
DataInputStream m_inputStream;
/**
* OutputStream to the server
*/
DataOutputStream m_outputStream;
/**
* Socket connection to the server
*/
Socket m_hostSocket;
/**
* Thread for dispatcher.
*/
SalPacketDispatcher m_dispatcher;
/**
* Do we feedback everything?
*/
boolean m_feedback;
/**
* Connects to the Salamander host identified by the hostName and UDPPort
* given, and returns a SalConnection object for the connection.
* @hostName The fully qualified name of the Salamander host machine.
* @UDPPort The UDP port to use to connect to the host machine.
* @returns A SalConnection object that is connected to the host, or null
* if a connection could not be made.
*/
public static SalConnection getSalConnection( String hostName, int UDPPort ) {
trace("getSalConnection("+hostName+","+UDPPort+")");
// check params
if (null == hostName) {
error("hostName is null in getSalConnection()");
return null;
}
if (0 == UDPPort) {
error("UDPPort is 0 in getSalConnection()");
}
// Instantiate the connection object
SalConnection con = new SalConnection();
// Connect to the given host and port.
if ( !con.connect(hostName, UDPPort) ) {
error("Could not connect to "+hostName+" on port "+UDPPort);
con = null;
}
return con;
}
/**
* Connects to the given Salamander host using default UDP port
*/
public static SalConnection getSalConnection( String hostName ) {
return getSalConnection(hostName, SAL.getDefaultUDPPort() );
}
/**
* Connects to the default host and port
*/
public static SalConnection getSalConnection() {
return getSalConnection( SAL.getDefaultHostName(), SAL.getDefaultUDPPort() );
}
/**
* Constructor initializes internal state only. Use one of the connect()
* methods to connect this object to a remote host.
* Constructor is intentionally private; use SalConnection.getSalConnection()
* to obtain an object.
*/
SalConnection() {
m_flowTable = new Hashtable();
m_feedback = false;
}
/**
* Connects the SalConnection object to the given host and UDP port.
* @param hostName The name of the host to which to connect
* @param UDPPort The UDP port to which to connect.
* @returns true on success.
*/
boolean connect(String hostName, int UDPPort ) {
if ( null == hostName ) {
error("hostName is null in connect()");
return false;
}
if (0 == UDPPort) {
error("UDPPort is 0 in connect()");
return false;
}
m_hostName = hostName;
m_UDPPort = UDPPort;
// Use UDP to request a TCP assignment from the host machine
m_TCPPort = requestTCPPort( SAL.getDefaultTimeout());
if ( 0 == m_TCPPort ) {
error("Could not obtain TCP port from host "+hostName);
return false;
}
// Connect to the TCP port provided
try {
m_hostSocket = new Socket(m_hostName, m_TCPPort);
} catch (Exception e) {
error("Exception connecting to "+m_hostName+"("+m_TCPPort+"): "+e.getMessage());
return false;
}
// get input/output streams
try {
m_inputStream = new DataInputStream( m_hostSocket.getInputStream() );
m_outputStream = new DataOutputStream(m_hostSocket.getOutputStream());
// Write our key... This is junk!
m_outputStream.writeInt (321456);
} catch (Exception e) {
error("Could not create input/output streams: "+e.getMessage());
return false;
}
//m_outputStream = new DataOutputStream( System.out );
// start up the reader thread
m_dispatcher = new SalPacketDispatcher(this,
SAL.getInterfaceVersion());
m_dispatcher.start();
return true;
}
/**
* Connects the SalConnection object to the given host using the default hostName
* and port.
* @throws IOException If a connection could not be made.
*/
boolean connect() {
return connect( SAL.getDefaultHostName(), SAL.getDefaultUDPPort() );
}
/**
* @returns The name of the remote host serving data to this SalConnction
* object. Note that this is most likely not the host on which any supplied
* data stream originates.
*/
public String getHost() {
return m_hostName;
}
/**
* @returns The port to which this SalConnection object is connected on
* the remote host.
*/
public int getPort() {
return m_TCPPort;
}
/**
* Used to obtain all currently available services.
* @returns An enumeration object containing a SalProperties object for
* each available service.
*/
Enumeration getAvailableServices() {
return null;
}
/**
* Used to obtain all currently available services fitting the criteria
* specified in the given SalProperties object.
* Criteria are specified in the SalProperties object by using
* regular expressions.
* For example, to obtain information on all services having a name
* beginning with "Radar", you would use the following:
*
* SalProperties prop = new SalProperties;
* prop.setName("Radar*");
* Enumeration e = SalConnectionObject.getAvailableServices(prop);
* @param props The SalProperties object containing information used to
* decide when a service matches given criteria.
* @returns An Enumeration object containing a SalProperties object for
* each available service matching the given criteria.
*/
Enumeration getAvailableServices(SalProperties props) {
error("SalConnection.getAvailableServices() not implemented.");
return null;
}
/**
* Used to obtain the properties for an available service given
* the service's name.
* Shorthand method to avoid overhead of calling getAvailableServices()
* and pulling the SalProperties object out of the Enumeration.
* @param serviceName The fully qualified name of the service whose
* SalProperties are to be obtained.
* @returns A SalProperties object containing for the given service, or
* null<\em> if the service is not available.
*/
SalProperties getServiceProperties(String serviceName) {
error("SalConnection.getServiceProperties() not implemented.");
return null;
}
/**
* Called when a client wishes to begin recieving data.
* Any data packets that flow through the Salamander server that
* match the criteria contained in the SalProperties object will be
* forwarded to the object implementing the SalDataQueue.
* To restrict the data flow to all data from a given service name,
* use the other beginDataFlow() method.
* @param props The SalProperties object representing criteria for
* data packets that will be sent.
* @param queue An object implementing SalDataQueue to receive each
* data packet.
* @returns a SalDataFlowTicket object that can be used to close this
* data flow, as well as to obtain information about this data flow.
* @throws SalException if data flow could not be instantiated.
*/
public SalDataFlowTicket startDataFlow(SalProperties props, SalDataQueue queue)
throws SalException {
error("SalConnection.startDataFlow(SalProperties, SalDataQueue) not implemented.\n"+
"Use SalConnection.startDataflow(String, SalDataQueue).\n");
return null;
}
/**
* Called by a client to begin receiving all data packets from a given
* named service.
* For more control over what data to obtain, use the other beginDataFlow()
* method.
* @param serviceName The fully qualified name of the service to retrieve
* data packets from.
* @param queue An object implementing SalDataQueue to receive each
* data packet.
* @returns a SalDataFlowTicket object that can be used to close this
* data flow, as well as to obtain information about this data flow.
* @throws SalException if data flow could not be instantiated.
*/
public SalDataFlowTicket startDataFlow(String serviceName, SalDataQueue queue)
throws SalException {
trace("Beginning data flow for service: "+serviceName);
// assume successful request, and put this in the flow table
m_flowTable.put(serviceName, queue);
if (serviceName.equals(SalProperties.ALL_SALAMANDER_DATA)) {
m_feedback = true;
} else {
// send the request packet to the server
SalPacket packet = new SalPacket();
packet.setMethod(SAL.SUBSCRIBE);
packet.setName(serviceName);
dispatch(packet);
}
return null;
}
/**
* Stops the data flow whose Ticket is specified.
* @param ticket The SalDataFlowTicket object that was returned from
* startDataFlow().
*/
public void stopDataFlow(SalDataFlowTicket ticket) {
}
/**
* returns a SalDataFlowStats object with information on the data flow
* whose ticket is passed. This object can contain information such
* as total bytes sent, time of last send, etc.
* @param ticket The SalDataFlowTicket obtained from startDataFlow()
* @returns a SlaDataFlowStats object.
*/
SalDataFlowStats getDataFlowStats(SalDataFlowTicket ticket) {
return null;
}
/**
* Constructs a UDP DatagramPacket for requesting the proper
* TCP port from the server
* @returns a DatagramPacket object ready to be sent
*/
DatagramPacket buildTCPRequestPacket() {
InetAddress iAddr;
ByteArrayOutputStream os = new ByteArrayOutputStream();
DataOutputStream dos;
try {
dos = new DataOutputStream( os );
dos.writeInt(SAL.QUERY_TCP_PORT);
} catch (IOException e) {
error(e.getMessage());
}
byte[] arr = os.toByteArray();
try {
iAddr = InetAddress.getByName(m_hostName);
} catch (UnknownHostException e) {
error(e.getMessage());
return null;
}
trace("iNetAddress of host is "+iAddr);
return new DatagramPacket( arr, arr.length, iAddr, m_UDPPort );
}
/**
* requests a TCP port assignment from the server. Handles details of
* creating, sending, and receiving the necessary UDP packets.
* Currently, no time out is installed.
* @returns The TCP port obtained, or 0 on failure.
*/
int requestTCPPort(int maxTimeout) {
DatagramSocket socket = null;
int port=0;
if (null == m_hostName) {
error("m_hostName is null in requestTCPPort()");
return 0;
}
if (0 == m_UDPPort) {
error("m_UDPPort is 0 in requestTCPPort()");
return 0;
}
// create the TCP request packet
byte[] recvBuf = new byte[256];
DatagramPacket inPacket = new DatagramPacket (recvBuf, 12);
DatagramPacket outPacket = buildTCPRequestPacket();
try {
// bind to the socket
socket = new DatagramSocket ();
}
catch (java.io.IOException e)
{
error("Could not create datagram socket in requestTCPPort()");
return 0;
}
// send it into the wild blue
trace("Sending TCP request packet...");
trace("Address: "+outPacket.getAddress()+"; Port: "+outPacket.getPort()+"; Length: "+outPacket.getLength());
try
{
int timeout;
// Set to true so if it falls through it will exit.
boolean timeoutHappened = true;
String version = System.getProperty("java.version");
System.out.println (version);
if (version.startsWith("1.0")) {
// Stuck with old yucky jdk, just blast 'em.
int i;
for (i = 0; i < 10; i++) {
System.out.println("+");
socket.send(outPacket);
}
socket.receive(inPacket);
timeoutHappened = false;
} else {
for (timeout = 1; timeout < maxTimeout; timeout *= 2) {
// send request
System.out.println("*");
socket.send (outPacket);
socket.setSoTimeout (timeout * 1000);
// get response
timeoutHappened = false;
try
{
socket.receive (inPacket);
}
catch (InterruptedIOException e) {
timeoutHappened = true;
}
if (timeoutHappened == false) {
break;
}
}
}
if (timeoutHappened == true) {
error("Salamander connect timed out in requestTCPPort()");
return 0;
}
byte[] data = inPacket.getData();
trace("TCP request reply received. Length: "+inPacket.getLength());
trace("Sender: "+inPacket.getAddress());
trace("Port: "+inPacket.getPort());
DataInputStream in = new DataInputStream( new ByteArrayInputStream(data, 0, data.length) );
port = in.readShort(); // get the port number
trace("Got TCP Port: "+port);
in.readShort(); // read out filler
m_key = in.readInt(); // read key
trace("Got key1: "+m_key);
} catch (Exception e) {
error("Could not obtain TCP port: "+e.getMessage());
port = 0;
}
return port;
}
static void trace(String s) {
SAL.trace("SalConnection: "+s);
}
static void error(String s) {
SAL.error("SalConnection: "+s);
}
public void dispatch(SalPacket p) {
trace("SalConnection.dispatch()");
try {
m_outputStream.writeInt(m_key);
m_outputStream.writeInt(SAL.getInterfaceVersion());
m_outputStream.writeInt(p.getPropLength());
p.serialize(m_outputStream);
} catch (IOException e) {
error("Exception serializing packet in dispatch()");
}
}
}