ECHO Fixes

git-svn-id: http://svn.igniterealtime.org/svn/repos/smack/trunk@7473 b35dd754-fafc-0310-a699-88a17e54d16e
This commit is contained in:
Thiago Camargo 2007-03-12 19:09:19 +00:00 committed by thiago
parent a078606ab4
commit 3787c7a8af
9 changed files with 159 additions and 203 deletions

View File

@ -124,7 +124,7 @@ public abstract class JingleSession extends JingleNegotiator {
* @param jingleMediaManager the jingleMediaManager
*/
protected JingleSession(XMPPConnection conn, String initiator, String responder,
String sessionid, JingleMediaManager jingleMediaManager) {
String sessionid, JingleMediaManager jingleMediaManager) {
super(conn);
this.mediaNeg = null;
@ -389,25 +389,21 @@ public abstract class JingleSession extends JingleNegotiator {
if (invalidState()) {
throw new IllegalStateException(
"Illegal state in dispatch packet in Session manager.");
}
else {
} else {
if (iq == null) {
// If there is no input packet, then we must be inviting...
jout = getState().eventInvite();
}
else {
} else {
if (iq.getType().equals(IQ.Type.ERROR)) {
// Process errors
getState().eventError(iq);
}
else if (iq.getType().equals(IQ.Type.RESULT)) {
} else if (iq.getType().equals(IQ.Type.RESULT)) {
// Process ACKs
if (isExpectedId(iq.getPacketID())) {
jout = getState().eventAck(iq);
removeExpectedId(iq.getPacketID());
}
}
else if (iq instanceof Jingle) {
} else if (iq instanceof Jingle) {
// It is not an error: it is a Jingle packet...
Jingle jin = (Jingle) iq;
Jingle.Action action = jin.getAction();
@ -415,22 +411,17 @@ public abstract class JingleSession extends JingleNegotiator {
if (action != null) {
if (action.equals(Jingle.Action.SESSIONACCEPT)) {
jout = getState().eventAccept(jin);
}
else if (action.equals(Jingle.Action.SESSIONINFO)) {
} else if (action.equals(Jingle.Action.SESSIONINFO)) {
jout = getState().eventInfo(jin);
}
else if (action.equals(Jingle.Action.SESSIONINITIATE)) {
} else if (action.equals(Jingle.Action.SESSIONINITIATE)) {
if (getState() != null)
jout = getState().eventInitiate(jin);
}
else if (action.equals(Jingle.Action.SESSIONREDIRECT)) {
} else if (action.equals(Jingle.Action.SESSIONREDIRECT)) {
jout = getState().eventRedirect(jin);
}
else if (action.equals(Jingle.Action.SESSIONTERMINATE)) {
} else if (action.equals(Jingle.Action.SESSIONTERMINATE)) {
jout = getState().eventTerminate(jin);
}
}
else {
} else {
jout = errorMalformedStanza(iq);
}
}
@ -515,7 +506,7 @@ public abstract class JingleSession extends JingleNegotiator {
* @return the new Jingle packet
*/
private Jingle sendJingleParts(IQ iq, Jingle jSes, Jingle jDesc,
Jingle jTrans) {
Jingle jTrans) {
Jingle response = null;
if (jSes != null) {
@ -523,8 +514,7 @@ public abstract class JingleSession extends JingleNegotiator {
jSes.addTransports(jTrans.getTransportsList());
response = sendFormattedJingle(iq, jSes);
}
else {
} else {
// If we don't have a valid session message, then we must send
// separated messages for transport and jmf...
if (jDesc != null) {
@ -590,8 +580,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (jout.getTo() == null) {
if (iq != null) {
jout.setTo(iq.getFrom());
}
else {
} else {
jout.setTo(other);
}
}
@ -599,8 +588,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (jout.getFrom() == null) {
if (iq != null) {
jout.setFrom(iq.getTo());
}
else {
} else {
jout.setFrom(me);
}
}
@ -693,8 +681,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (jda.size() > 1) {
throw new XMPPException(
"Unsupported feature: the number of accepted content descriptions is greater than 1.");
}
else if (jda.size() == 1) {
} else if (jda.size() == 1) {
JingleContentDescription jd = (JingleContentDescription) jda.get(0);
if (jd.getJinglePayloadTypesCount() > 1) {
throw new XMPPException(
@ -726,15 +713,13 @@ public abstract class JingleSession extends JingleNegotiator {
if (jta.size() > 1) {
throw new XMPPException(
"Unsupported feature: the number of accepted transports is greater than 1.");
}
else if (jta.size() == 1) {
} else if (jta.size() == 1) {
org.jivesoftware.smackx.packet.JingleTransport jt = (org.jivesoftware.smackx.packet.JingleTransport) jta.get(0);
if (jt.getCandidatesCount() > 1) {
throw new XMPPException(
"Unsupported feature: the number of accepted transport candidates is greater than 1.");
}
else if (jt.getCandidatesCount() == 1) {
} else if (jt.getCandidatesCount() == 1) {
JingleTransportCandidate jtc = (JingleTransportCandidate) jt
.getCandidatesList().get(0);
acceptedLocalCandidate = jtc.getMediaTransport();
@ -776,8 +761,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (other.initiator != null) {
return false;
}
}
else if (!initiator.equals(other.initiator)) {
} else if (!initiator.equals(other.initiator)) {
//Todo check behavior
// return false;
}
@ -786,8 +770,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (other.responder != null) {
return false;
}
}
else if (!responder.equals(other.responder)) {
} else if (!responder.equals(other.responder)) {
return false;
}
@ -795,8 +778,7 @@ public abstract class JingleSession extends JingleNegotiator {
if (other.sid != null) {
return false;
}
}
else if (!sid.equals(other.sid)) {
} else if (!sid.equals(other.sid)) {
return false;
}
@ -940,14 +922,12 @@ public abstract class JingleSession extends JingleNegotiator {
System.out.println("Ignored Jingle(INI): " + iq.toXML());
return false;
}
}
else {
} else {
// We accept some non-Jingle IQ packets: ERRORs and ACKs
if (iq.getType().equals(IQ.Type.SET)) {
System.out.println("Ignored Jingle(TYPE): " + iq.toXML());
return false;
}
else if (iq.getType().equals(IQ.Type.GET)) {
} else if (iq.getType().equals(IQ.Type.GET)) {
System.out.println("Ignored Jingle(TYPE): " + iq.toXML());
return false;
}
@ -1013,6 +993,9 @@ public abstract class JingleSession extends JingleNegotiator {
* Trigger a session closed event.
*/
protected void triggerSessionClosed(String reason) {
for (TransportCandidate candidate : this.getTransportNeg().getOfferedCandidates())
candidate.removeCandidateEcho();
ArrayList listeners = getListenersList();
Iterator iter = listeners.iterator();
while (iter.hasNext()) {
@ -1033,6 +1016,8 @@ public abstract class JingleSession extends JingleNegotiator {
* Trigger a session closed event due to an error.
*/
protected void triggerSessionClosedOnError(XMPPException exc) {
for (TransportCandidate candidate : this.getTransportNeg().getOfferedCandidates())
candidate.removeCandidateEcho();
ArrayList listeners = getListenersList();
Iterator iter = listeners.iterator();
while (iter.hasNext()) {
@ -1053,7 +1038,7 @@ public abstract class JingleSession extends JingleNegotiator {
* Trigger a session established event.
*/
protected void triggerSessionEstablished(PayloadType pt,
TransportCandidate rc, TransportCandidate lc) {
TransportCandidate rc, TransportCandidate lc) {
ArrayList listeners = getListenersList();
Iterator iter = listeners.iterator();
while (iter.hasNext()) {
@ -1109,6 +1094,8 @@ public abstract class JingleSession extends JingleNegotiator {
sli.sessionDeclined(reason, this);
}
}
for (TransportCandidate candidate : this.getTransportNeg().getOfferedCandidates())
candidate.removeCandidateEcho();
}
/**
@ -1164,7 +1151,7 @@ public abstract class JingleSession extends JingleNegotiator {
* @return The created IQ packet.
*/
public static IQ createIQ(String ID, String to, String from,
IQ.Type type) {
IQ.Type type) {
IQ iqPacket = new IQ() {
public String getChildElementXML() {
return null;
@ -1190,7 +1177,7 @@ public abstract class JingleSession extends JingleNegotiator {
* @return The created IQ packet.
*/
public static IQ createError(String ID, String to, String from,
int errCode, String errStr) {
int errCode, String errStr) {
IQ iqError = createIQ(ID, to, from, IQ.Type.ERROR);
XMPPError error = new XMPPError(new XMPPError.Condition(errStr));

View File

@ -250,6 +250,8 @@ public class ICECandidate extends TransportCandidate implements Comparable {
//TODO candidate is being checked trigger
//candidatesChecking.add(cand);
final ICECandidate checkingCandidate = this;
Thread checkThread = new Thread(new Runnable() {
public void run() {
@ -264,8 +266,8 @@ public class ICECandidate extends TransportCandidate implements Comparable {
}
ResultListener resultListener = new ResultListener() {
public void testFinished(TestResult testResult) {
if (testResult.isReachable()) {
public void testFinished(TestResult testResult, TransportCandidate candidate) {
if (testResult.isReachable() && checkingCandidate.equals(candidate)) {
result.setResult(true);
}
}
@ -276,11 +278,11 @@ public class ICECandidate extends TransportCandidate implements Comparable {
if (echo != null) {
if (candidate instanceof ICECandidate) {
ICECandidate iceCandidate = (ICECandidate) candidate;
if (!iceCandidate.getType().equals("relay")) {
if (iceCandidate.getType().equals(getType())) {
try {
echo.addResultListener(resultListener);
InetAddress address = InetAddress.getByName(getIp());
echo.testASync(address, getPort());
echo.testASync(checkingCandidate, getPassword());
}
catch (UnknownHostException e) {
e.printStackTrace();
@ -416,5 +418,6 @@ public class ICECandidate extends TransportCandidate implements Comparable {
}
return 0;
}
}

View File

@ -102,7 +102,7 @@ public class ICEResolver extends TransportResolver {
else
typeString = "host";
TransportCandidate transportCandidate = new ICECandidate(candidate.getAddress().getInetAddress().getHostAddress(), 1, candidate.getNetwork(), "1", candidate.getPort(), "1", candidate.getPriority(), typeString);
TransportCandidate transportCandidate = new ICECandidate(candidate.getAddress().getInetAddress().getHostAddress(), 1, candidate.getNetwork(), String.valueOf(Math.abs(random.nextLong())), candidate.getPort(), "1", candidate.getPriority(), typeString);
transportCandidate.setLocalIp(candidate.getBase().getAddress().getInetAddress().getHostAddress());
transportCandidate.setPort(getFreePort());
try {
@ -143,11 +143,11 @@ public class ICEResolver extends TransportResolver {
RTPBridge rtpBridge = RTPBridge.getRTPBridge(connection, String.valueOf(sid));
TransportCandidate localCandidate = new ICECandidate(
rtpBridge.getIp(), 1, network, "1", rtpBridge.getPortA(), "1", 0, "relay");
rtpBridge.getIp(), 1, network, String.valueOf(Math.abs(random.nextLong())), rtpBridge.getPortA(), "1", 0, "relay");
localCandidate.setLocalIp(localIp);
TransportCandidate remoteCandidate = new ICECandidate(
rtpBridge.getIp(), 1, network, "1", rtpBridge.getPortB(), "1", 0, "relay");
rtpBridge.getIp(), 1, network, String.valueOf(Math.abs(random.nextLong())), rtpBridge.getPortB(), "1", 0, "relay");
remoteCandidate.setLocalIp(localIp);
localCandidate.setSymmetric(remoteCandidate);

View File

@ -27,6 +27,6 @@ package org.jivesoftware.smackx.jingle.nat;
*/
public interface ResultListener {
public void testFinished(TestResult result);
public void testFinished(TestResult result, TransportCandidate candidate);
}

View File

@ -54,12 +54,14 @@ package org.jivesoftware.smackx.jingle.nat;
/**
* Result of an ECHO Test
*
*
* @author Thiago Camargo
*/
public class TestResult {
private boolean result = false;
private String ip = null;
private int port = 0;
public boolean isReachable() {
return result;
@ -68,5 +70,21 @@ public class TestResult {
public void setResult(boolean result) {
this.result = result;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
}

View File

@ -56,6 +56,7 @@ import org.jivesoftware.smack.XMPPConnection;
import org.jivesoftware.smackx.jingle.JingleSession;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.*;
import java.util.ArrayList;
import java.util.List;
@ -660,42 +661,37 @@ public abstract class TransportCandidate {
List<ResultListener> resultListeners = new ArrayList<ResultListener>();
boolean enabled = true;
boolean ended = false;
long tries = 10;
long tries = 2;
TransportCandidate candidate = null;
public CandidateEcho(TransportCandidate candidate, JingleSession session) throws UnknownHostException, SocketException {
this.socket = new DatagramSocket(candidate.getPort(), InetAddress.getByName(candidate.getLocalIp()));
this.localUser = session.getInitiator();
this.remoteUser = session.getResponder();
this.id = session.getSid();
this.candidate = candidate;
int keySplitIndex = ((int) Math.ceil(((float) id.length()) / 2));
String local = id.substring(0, keySplitIndex) + ";" + localUser;
String remote = id.substring(keySplitIndex) + ";" + remoteUser;
int size = 4 + localUser.length() * 2 + (id.length() - keySplitIndex) * 2;
ByteBuffer bufLocal = ByteBuffer.allocate(size);
// Create a character ByteBuffer Wrap
CharBuffer cbufLocal = bufLocal.asCharBuffer();
cbufLocal.append(id.substring(0, keySplitIndex));
cbufLocal.append(';');
cbufLocal.append(localUser);
try {
if (session.getConnection().getUser().equals(session.getInitiator())) {
size = 4 + remoteUser.length() * 2 + keySplitIndex * 2;
ByteBuffer bufRemote = ByteBuffer.allocate(size);
// Create a character ByteBuffer Wrap
CharBuffer cbufRemote = bufRemote.asCharBuffer();
cbufRemote.append(id.substring(keySplitIndex));
cbufRemote.append(';');
cbufRemote.append(remoteUser);
if (session.getConnection().getUser().equals(session.getInitiator())) {
this.send = bufLocal.array();
this.receive = bufRemote.array();
this.send = local.getBytes("UTF-8");
this.receive = remote.getBytes("UTF-8");
}
else {
this.receive = local.getBytes("UTF-8");
this.send = remote.getBytes("UTF-8");
}
}
else {
this.receive = bufLocal.array();
this.send = bufRemote.array();
catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public void run() {
@ -703,26 +699,51 @@ public abstract class TransportCandidate {
System.out.println("Listening for ECHO: " + socket.getLocalAddress().getHostAddress() + ":" + socket.getLocalPort());
while (true) {
DatagramPacket packet = new DatagramPacket(new byte[this.receive.length], this.receive.length);
DatagramPacket packet = new DatagramPacket(new byte[150], 150);
socket.receive(packet);
//System.out.println("ECHO Packet Received in: " + socket.getLocalAddress().getHostAddress() + ":" + socket.getLocalPort() + " From: " + packet.getAddress().getHostAddress() + ":" + packet.getPort());
boolean reply = false;
ByteBuffer buf = ByteBuffer.wrap(packet.getData());
byte[] content = new byte[packet.getLength()];
buf = buf.get(content, 0, packet.getLength());
packet.setData(content);
for (DatagramListener listener : listeners) {
listener.datagramReceived(packet);
reply = listener.datagramReceived(packet);
if (reply) break;
}
packet.setAddress(packet.getAddress());
packet.setPort(packet.getPort());
long delay = 1000 / tries / 2;
long delay = 200 / tries / 2;
if (delay < 0) delay = 10;
if (Arrays.equals(packet.getData(), receive))
String str[] = new String(packet.getData(), "UTF-8").split(";");
String pass = str[0];
String address[] = str[1].split(":");
String ip = address[0];
String port = address[1];
if (pass.equals(candidate.getPassword())) {
byte[] cont = null;
try {
cont = (password + ";" + candidate.getIp() + ":" + candidate.getPort()).getBytes("UTF-8");
}
catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
packet.setData(cont);
packet.setLength(cont.length);
packet.setAddress(InetAddress.getByName(ip));
packet.setPort(Integer.parseInt(port));
for (int i = 0; i < tries; i++) {
packet.setData(send);
packet.setLength(send.length);
socket.send(packet);
if (!enabled) break;
try {
@ -732,6 +753,7 @@ public abstract class TransportCandidate {
e.printStackTrace();
}
}
}
}
}
catch (UnknownHostException uhe) {
@ -753,66 +775,12 @@ public abstract class TransportCandidate {
socket.close();
}
private void fireTestResult(TestResult testResult) {
private void fireTestResult(TestResult testResult, TransportCandidate candidate) {
for (ResultListener resultListener : resultListeners)
resultListener.testFinished(testResult);
resultListener.testFinished(testResult, candidate);
}
public boolean test(final InetAddress address, final int port) {
return test(address, port, 2000);
}
public boolean test(final InetAddress address, final int port, int timeout) {
ended = false;
final TestResult testResult = new TestResult();
DatagramListener listener = new DatagramListener() {
public boolean datagramReceived(DatagramPacket datagramPacket) {
if (datagramPacket.getAddress().equals(address) && datagramPacket.getPort() == port) {
if (Arrays.equals(datagramPacket.getData(), receive)) {
testResult.setResult(true);
ended = true;
return true;
}
}
return false;
}
};
this.addListener(listener);
DatagramPacket packet = new DatagramPacket(send, send.length);
packet.setAddress(address);
packet.setPort(port);
long delay = timeout / tries;
if (delay < 0) delay = 10;
try {
for (int i = 0; i < tries; i++) {
socket.send(packet);
if (ended) break;
try {
Thread.sleep(delay);
}
catch (InterruptedException e) {
e.printStackTrace();
}
}
}
catch (IOException e) {
// Do Nothing
}
this.removeListener(listener);
return testResult.isReachable();
}
public void testASync(final InetAddress address, final int port) {
public void testASync(final TransportCandidate candidate, final String password) {
Thread thread = new Thread(new Runnable() {
@ -820,25 +788,52 @@ public abstract class TransportCandidate {
DatagramListener listener = new DatagramListener() {
public boolean datagramReceived(DatagramPacket datagramPacket) {
if (datagramPacket.getAddress().equals(address) && datagramPacket.getPort() == port) {
if (Arrays.equals(datagramPacket.getData(), receive)) {
try {
String str[] = new String(datagramPacket.getData(), "UTF-8").split(";");
String pass = str[0];
String addr[] = str[1].split(":");
String ip = addr[0];
String pt = addr[1];
if (pass.equals(password) && candidate.getIp().indexOf(ip) != -1 && candidate.getPort() == Integer.parseInt(pt)) {
System.out.println("Result OK:" + candidate.getIp() + ":" + candidate.getPort());
TestResult testResult = new TestResult();
testResult.setResult(true);
fireTestResult(testResult);
ended = true;
fireTestResult(testResult, candidate);
return true;
}
}
catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Result Wrong Data:" + datagramPacket.getAddress().getHostAddress() + ":" + datagramPacket.getPort());
return false;
}
};
addListener(listener);
DatagramPacket packet = new DatagramPacket(send, send.length);
byte[] content = null;
try {
content = new String(password + ";" + getIp() + ":" + getPort()).getBytes("UTF-8");
}
catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
packet.setAddress(address);
packet.setPort(port);
DatagramPacket packet = new DatagramPacket(content, content.length);
try {
packet.setAddress(InetAddress.getByName(candidate.getIp()));
}
catch (UnknownHostException e) {
e.printStackTrace();
}
packet.setPort(candidate.getPort());
long delay = 200;
@ -858,6 +853,13 @@ public abstract class TransportCandidate {
// Do Nothing
}
try {
Thread.sleep(2000);
}
catch (InterruptedException e) {
e.printStackTrace();
}
removeListener(listener);
}
});

View File

@ -285,7 +285,7 @@ public abstract class TransportNegotiator extends JingleNegotiator {
int totalTime = (CANDIDATES_ACCEPT_PERIOD + TransportResolver.CHECK_TIMEOUT);
int tries = (int) Math.ceil(totalTime / 1000);
for (int i = 0; i < tries - 2; i++) {
for (int i = 0; i < tries - 1; i++) {
try {
Thread.sleep(1000);
}

View File

@ -349,44 +349,6 @@ public class STUNResolverTest extends SmackTestCase {
}
}
public void testEcho() {
TransportCandidate.Fixed c1 = new TransportCandidate.Fixed("localhost", 22222);
TransportCandidate.Fixed c2 = new TransportCandidate.Fixed("localhost", 22444);
try {
c1.addCandidateEcho(null);
c2.addCandidateEcho(null);
try {
Thread.sleep(100);
}
catch (InterruptedException e) {
e.printStackTrace();
}
TransportCandidate.CandidateEcho ce1 = c1.getCandidateEcho();
TransportCandidate.CandidateEcho ce2 = c2.getCandidateEcho();
for (int i = 0; i < 10; i++) {
assertTrue(ce1.test(InetAddress.getByName("localhost"), 22444, 100));
System.out.println("Bind OK");
}
for (int i = 0; i < 10; i++) {
assertTrue(ce2.test(InetAddress.getByName("localhost"), 22222, 100));
System.out.println("Bind OK");
}
}
catch (SocketException e) {
e.printStackTrace();
}
catch (UnknownHostException e) {
e.printStackTrace();
}
}
protected int getMaxConnections() {
return 2;
}

View File

@ -57,22 +57,6 @@ public class TransportCandidateTest extends SmackTestCase {
assertEquals(candList.get(candList.size() - 1), candH);
}
public void testEcho(){
TransportCandidate tc = new TransportCandidate("localhost",10222){
};
try {
tc.addCandidateEcho(null);
assertTrue(tc.getCandidateEcho().test(InetAddress.getByName("localhost"),10020));
}
catch (SocketException e) {
e.printStackTrace();
}
catch (UnknownHostException e) {
e.printStackTrace();
}
}
protected int getMaxConnections() {
return 0;
}