mirror of
https://codeberg.org/Mercury-IM/Smack
synced 2024-11-26 00:02:06 +01:00
[ibb] Ensure InBandBytestreamManager is a singleton
InBandBytestreamManager followed an unusual pattern: Within the connectionTermianted() callback, it would remove itself from the 'managers' map. This allowed for multiple instances of an InBandBytestreamManager to exist for the same connection, causing all kinds of issues. This fixes the issue by changing InBandBytestreamManager to use the Smack-idiomatic pattern used by managers. We also do no longer reset the listeners if the connection is termianted, as listeners (and handlers) typically persist until they are explicitly removed by the user. As positive side-effect, the number of indeterministic unit-tests, caused by using Thread.sleep(), is reduced. The executor service in InitiationListener was also removed, because the IQ handler is already called asynchronously to the connections main loop. Thanks to Anno van Vliet for reporting this.
This commit is contained in:
parent
0eeb89409a
commit
ed02bcf0d4
3 changed files with 41 additions and 138 deletions
|
@ -110,22 +110,6 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
|
||||||
public void connectionCreated(final XMPPConnection connection) {
|
public void connectionCreated(final XMPPConnection connection) {
|
||||||
// create the manager for this connection
|
// create the manager for this connection
|
||||||
InBandBytestreamManager.getByteStreamManager(connection);
|
InBandBytestreamManager.getByteStreamManager(connection);
|
||||||
|
|
||||||
// register shutdown listener
|
|
||||||
connection.addConnectionListener(new AbstractConnectionClosedListener() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connectionTerminated() {
|
|
||||||
InBandBytestreamManager.getByteStreamManager(connection).disableService();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void connected(XMPPConnection connection) {
|
|
||||||
InBandBytestreamManager.getByteStreamManager(connection);
|
|
||||||
}
|
|
||||||
|
|
||||||
});
|
|
||||||
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -206,6 +190,15 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
|
||||||
private InBandBytestreamManager(XMPPConnection connection) {
|
private InBandBytestreamManager(XMPPConnection connection) {
|
||||||
super(connection);
|
super(connection);
|
||||||
|
|
||||||
|
connection.addConnectionListener(new AbstractConnectionClosedListener() {
|
||||||
|
@Override
|
||||||
|
public void connectionTerminated() {
|
||||||
|
// reset internal status
|
||||||
|
InBandBytestreamManager.this.sessions.clear();
|
||||||
|
InBandBytestreamManager.this.ignoredBytestreamRequests.clear();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// register bytestream open packet listener
|
// register bytestream open packet listener
|
||||||
this.initiationListener = new InitiationListener(this);
|
this.initiationListener = new InitiationListener(this);
|
||||||
connection.registerIQRequestHandler(initiationListener);
|
connection.registerIQRequestHandler(initiationListener);
|
||||||
|
@ -453,19 +446,6 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
|
||||||
connection().sendStanza(error);
|
connection().sendStanza(error);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream open
|
|
||||||
* request is rejected because its block size is greater than the maximum allowed block size.
|
|
||||||
*
|
|
||||||
* @param request IQ stanza that should be answered with a resource-constraint error
|
|
||||||
* @throws NotConnectedException if the XMPP connection is not connected.
|
|
||||||
* @throws InterruptedException if the calling thread was interrupted.
|
|
||||||
*/
|
|
||||||
protected void replyResourceConstraintPacket(IQ request) throws NotConnectedException, InterruptedException {
|
|
||||||
IQ error = IQ.createErrorResponse(request, StanzaError.Condition.resource_constraint);
|
|
||||||
connection().sendStanza(error);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream
|
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream
|
||||||
* session could not be found.
|
* session could not be found.
|
||||||
|
@ -539,30 +519,4 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
|
||||||
return ignoredBytestreamRequests;
|
return ignoredBytestreamRequests;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Disables the InBandBytestreamManager by removing its stanza listeners and resetting its
|
|
||||||
* internal status, which includes removing this instance from the managers map.
|
|
||||||
*/
|
|
||||||
private void disableService() {
|
|
||||||
final XMPPConnection connection = connection();
|
|
||||||
|
|
||||||
// remove manager from static managers map
|
|
||||||
managers.remove(connection);
|
|
||||||
|
|
||||||
// remove all listeners registered by this manager
|
|
||||||
connection.unregisterIQRequestHandler(initiationListener);
|
|
||||||
connection.unregisterIQRequestHandler(dataListener);
|
|
||||||
connection.unregisterIQRequestHandler(closeListener);
|
|
||||||
|
|
||||||
// shutdown threads
|
|
||||||
this.initiationListener.shutdown();
|
|
||||||
|
|
||||||
// reset internal status
|
|
||||||
this.userListeners.clear();
|
|
||||||
this.allRequestListeners.clear();
|
|
||||||
this.sessions.clear();
|
|
||||||
this.ignoredBytestreamRequests.clear();
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,15 +16,9 @@
|
||||||
*/
|
*/
|
||||||
package org.jivesoftware.smackx.bytestreams.ibb;
|
package org.jivesoftware.smackx.bytestreams.ibb;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
|
||||||
import java.util.concurrent.Executors;
|
|
||||||
import java.util.logging.Level;
|
|
||||||
import java.util.logging.Logger;
|
|
||||||
|
|
||||||
import org.jivesoftware.smack.SmackException.NotConnectedException;
|
|
||||||
import org.jivesoftware.smack.iqrequest.AbstractIqRequestHandler;
|
import org.jivesoftware.smack.iqrequest.AbstractIqRequestHandler;
|
||||||
import org.jivesoftware.smack.packet.IQ;
|
import org.jivesoftware.smack.packet.IQ;
|
||||||
import org.jivesoftware.smack.packet.Stanza;
|
import org.jivesoftware.smack.packet.StanzaError;
|
||||||
|
|
||||||
import org.jivesoftware.smackx.bytestreams.BytestreamListener;
|
import org.jivesoftware.smackx.bytestreams.BytestreamListener;
|
||||||
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
|
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
|
||||||
|
@ -44,14 +38,10 @@ import org.jivesoftware.smackx.filetransfer.StreamNegotiator;
|
||||||
* @author Henning Staib
|
* @author Henning Staib
|
||||||
*/
|
*/
|
||||||
class InitiationListener extends AbstractIqRequestHandler {
|
class InitiationListener extends AbstractIqRequestHandler {
|
||||||
private static final Logger LOGGER = Logger.getLogger(InitiationListener.class.getName());
|
|
||||||
|
|
||||||
/* manager containing the listeners and the XMPP connection */
|
/* manager containing the listeners and the XMPP connection */
|
||||||
private final InBandBytestreamManager manager;
|
private final InBandBytestreamManager manager;
|
||||||
|
|
||||||
/* executor service to process incoming requests concurrently */
|
|
||||||
private final ExecutorService initiationListenerExecutor;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor.
|
* Constructor.
|
||||||
*
|
*
|
||||||
|
@ -60,40 +50,29 @@ class InitiationListener extends AbstractIqRequestHandler {
|
||||||
protected InitiationListener(InBandBytestreamManager manager) {
|
protected InitiationListener(InBandBytestreamManager manager) {
|
||||||
super(Open.ELEMENT, Open.NAMESPACE, IQ.Type.set, Mode.async);
|
super(Open.ELEMENT, Open.NAMESPACE, IQ.Type.set, Mode.async);
|
||||||
this.manager = manager;
|
this.manager = manager;
|
||||||
initiationListenerExecutor = Executors.newCachedThreadPool();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public IQ handleIQRequest(final IQ packet) {
|
public IQ handleIQRequest(final IQ iqRequest) {
|
||||||
initiationListenerExecutor.execute(new Runnable() {
|
Open ibbRequest = (Open) iqRequest;
|
||||||
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
processRequest(packet);
|
|
||||||
}
|
|
||||||
catch (InterruptedException | NotConnectedException e) {
|
|
||||||
LOGGER.log(Level.WARNING, "proccessRequest", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void processRequest(Stanza packet) throws NotConnectedException, InterruptedException {
|
|
||||||
Open ibbRequest = (Open) packet;
|
|
||||||
|
|
||||||
|
int blockSize = ibbRequest.getBlockSize();
|
||||||
|
int maximumBlockSize = manager.getMaximumBlockSize();
|
||||||
// validate that block size is within allowed range
|
// validate that block size is within allowed range
|
||||||
if (ibbRequest.getBlockSize() > this.manager.getMaximumBlockSize()) {
|
if (blockSize > maximumBlockSize) {
|
||||||
this.manager.replyResourceConstraintPacket(ibbRequest);
|
StanzaError error = StanzaError.getBuilder().setCondition(StanzaError.Condition.resource_constraint)
|
||||||
return;
|
.setDescriptiveEnText("Requests block size of " + blockSize + " exceeds maximum block size of "
|
||||||
|
+ maximumBlockSize)
|
||||||
|
.build();
|
||||||
|
return IQ.createErrorResponse(iqRequest, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
StreamNegotiator.signal(ibbRequest.getFrom().toString() + '\t' + ibbRequest.getSessionID(), ibbRequest);
|
StreamNegotiator.signal(ibbRequest.getFrom().toString() + '\t' + ibbRequest.getSessionID(), ibbRequest);
|
||||||
|
|
||||||
// ignore request if in ignore list
|
// ignore request if in ignore list
|
||||||
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID()))
|
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID())) {
|
||||||
return;
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// build bytestream request from packet
|
// build bytestream request from packet
|
||||||
InBandBytestreamRequest request = new InBandBytestreamRequest(this.manager, ibbRequest);
|
InBandBytestreamRequest request = new InBandBytestreamRequest(this.manager, ibbRequest);
|
||||||
|
@ -102,7 +81,6 @@ class InitiationListener extends AbstractIqRequestHandler {
|
||||||
BytestreamListener userListener = this.manager.getUserListener(ibbRequest.getFrom());
|
BytestreamListener userListener = this.manager.getUserListener(ibbRequest.getFrom());
|
||||||
if (userListener != null) {
|
if (userListener != null) {
|
||||||
userListener.incomingBytestreamRequest(request);
|
userListener.incomingBytestreamRequest(request);
|
||||||
|
|
||||||
}
|
}
|
||||||
else if (!this.manager.getAllRequestListeners().isEmpty()) {
|
else if (!this.manager.getAllRequestListeners().isEmpty()) {
|
||||||
/*
|
/*
|
||||||
|
@ -111,21 +89,16 @@ class InitiationListener extends AbstractIqRequestHandler {
|
||||||
for (BytestreamListener listener : this.manager.getAllRequestListeners()) {
|
for (BytestreamListener listener : this.manager.getAllRequestListeners()) {
|
||||||
listener.incomingBytestreamRequest(request);
|
listener.incomingBytestreamRequest(request);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
/*
|
StanzaError error = StanzaError.getBuilder()
|
||||||
* if there is no listener for this initiation request, reply with reject message
|
.setCondition(StanzaError.Condition.not_acceptable)
|
||||||
*/
|
.setDescriptiveEnText("No file-transfer listeners registered")
|
||||||
this.manager.replyRejectPacket(ibbRequest);
|
.build();
|
||||||
}
|
return IQ.createErrorResponse(iqRequest, error);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
return null;
|
||||||
* Shuts down the listeners executor service.
|
|
||||||
*/
|
|
||||||
protected void shutdown() {
|
|
||||||
this.initiationListenerExecutor.shutdownNow();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -83,23 +83,14 @@ public class InitiationListenerTest extends SmackTestSuite {
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void shouldRespondWithError() throws Exception {
|
public void shouldRespondWithError() throws Exception {
|
||||||
|
|
||||||
// run the listener with the initiation packet
|
// run the listener with the initiation packet
|
||||||
initiationListener.handleIQRequest(initBytestream);
|
IQ response = initiationListener.handleIQRequest(initBytestream);
|
||||||
|
|
||||||
// wait because packet is processed in an extra thread
|
|
||||||
Thread.sleep(200);
|
|
||||||
|
|
||||||
// capture reply to the In-Band Bytestream open request
|
|
||||||
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
|
|
||||||
verify(connection).sendStanza(argument.capture());
|
|
||||||
|
|
||||||
// assert that reply is the correct error packet
|
// assert that reply is the correct error packet
|
||||||
assertEquals(initiatorJID, argument.getValue().getTo());
|
assertEquals(initiatorJID, response.getTo());
|
||||||
assertEquals(IQ.Type.error, argument.getValue().getType());
|
assertEquals(IQ.Type.error, response.getType());
|
||||||
assertEquals(StanzaError.Condition.not_acceptable,
|
assertEquals(StanzaError.Condition.not_acceptable,
|
||||||
argument.getValue().getError().getCondition());
|
response.getError().getCondition());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -113,21 +104,13 @@ public class InitiationListenerTest extends SmackTestSuite {
|
||||||
byteStreamManager.setMaximumBlockSize(1024);
|
byteStreamManager.setMaximumBlockSize(1024);
|
||||||
|
|
||||||
// run the listener with the initiation packet
|
// run the listener with the initiation packet
|
||||||
initiationListener.handleIQRequest(initBytestream);
|
IQ response = initiationListener.handleIQRequest(initBytestream);
|
||||||
|
|
||||||
// wait because packet is processed in an extra thread
|
|
||||||
Thread.sleep(200);
|
|
||||||
|
|
||||||
// capture reply to the In-Band Bytestream open request
|
|
||||||
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
|
|
||||||
verify(connection).sendStanza(argument.capture());
|
|
||||||
|
|
||||||
// assert that reply is the correct error packet
|
// assert that reply is the correct error packet
|
||||||
assertEquals(initiatorJID, argument.getValue().getTo());
|
assertEquals(initiatorJID, response.getTo());
|
||||||
assertEquals(IQ.Type.error, argument.getValue().getType());
|
assertEquals(IQ.Type.error, response.getType());
|
||||||
assertEquals(StanzaError.Condition.resource_constraint,
|
assertEquals(StanzaError.Condition.resource_constraint,
|
||||||
argument.getValue().getError().getCondition());
|
response.getError().getCondition());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -199,24 +182,17 @@ public class InitiationListenerTest extends SmackTestSuite {
|
||||||
byteStreamManager.addIncomingBytestreamListener(listener, JidCreate.from("other_" + initiatorJID));
|
byteStreamManager.addIncomingBytestreamListener(listener, JidCreate.from("other_" + initiatorJID));
|
||||||
|
|
||||||
// run the listener with the initiation packet
|
// run the listener with the initiation packet
|
||||||
initiationListener.handleIQRequest(initBytestream);
|
IQ response = initiationListener.handleIQRequest(initBytestream);
|
||||||
|
|
||||||
// wait because packet is processed in an extra thread
|
|
||||||
Thread.sleep(200);
|
|
||||||
|
|
||||||
// assert listener is not called
|
// assert listener is not called
|
||||||
ArgumentCaptor<BytestreamRequest> byteStreamRequest = ArgumentCaptor.forClass(BytestreamRequest.class);
|
ArgumentCaptor<BytestreamRequest> byteStreamRequest = ArgumentCaptor.forClass(BytestreamRequest.class);
|
||||||
verify(listener, never()).incomingBytestreamRequest(byteStreamRequest.capture());
|
verify(listener, never()).incomingBytestreamRequest(byteStreamRequest.capture());
|
||||||
|
|
||||||
// capture reply to the In-Band Bytestream open request
|
|
||||||
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
|
|
||||||
verify(connection).sendStanza(argument.capture());
|
|
||||||
|
|
||||||
// assert that reply is the correct error packet
|
// assert that reply is the correct error packet
|
||||||
assertEquals(initiatorJID, argument.getValue().getTo());
|
assertEquals(initiatorJID, response.getTo());
|
||||||
assertEquals(IQ.Type.error, argument.getValue().getType());
|
assertEquals(IQ.Type.error, response.getType());
|
||||||
assertEquals(StanzaError.Condition.not_acceptable,
|
assertEquals(StanzaError.Condition.not_acceptable,
|
||||||
argument.getValue().getError().getCondition());
|
response.getError().getCondition());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in a new issue