Merge pull request #404 from Flowdalic/ibb-manager-fix

[ibb] Ensure InBandBytestreamManager is a singleton
This commit is contained in:
Florian Schmaus 2020-07-06 11:21:00 +02:00 committed by GitHub
commit c80afbbce7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 41 additions and 138 deletions

View File

@ -110,22 +110,6 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
public void connectionCreated(final XMPPConnection connection) {
// create the manager for this connection
InBandBytestreamManager.getByteStreamManager(connection);
// register shutdown listener
connection.addConnectionListener(new AbstractConnectionClosedListener() {
@Override
public void connectionTerminated() {
InBandBytestreamManager.getByteStreamManager(connection).disableService();
}
@Override
public void connected(XMPPConnection connection) {
InBandBytestreamManager.getByteStreamManager(connection);
}
});
}
});
}
@ -206,6 +190,15 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
private InBandBytestreamManager(XMPPConnection connection) {
super(connection);
connection.addConnectionListener(new AbstractConnectionClosedListener() {
@Override
public void connectionTerminated() {
// reset internal status
InBandBytestreamManager.this.sessions.clear();
InBandBytestreamManager.this.ignoredBytestreamRequests.clear();
}
});
// register bytestream open packet listener
this.initiationListener = new InitiationListener(this);
connection.registerIQRequestHandler(initiationListener);
@ -453,19 +446,6 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
connection().sendStanza(error);
}
/**
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream open
* request is rejected because its block size is greater than the maximum allowed block size.
*
* @param request IQ stanza that should be answered with a resource-constraint error
* @throws NotConnectedException if the XMPP connection is not connected.
* @throws InterruptedException if the calling thread was interrupted.
*/
protected void replyResourceConstraintPacket(IQ request) throws NotConnectedException, InterruptedException {
IQ error = IQ.createErrorResponse(request, StanzaError.Condition.resource_constraint);
connection().sendStanza(error);
}
/**
* Responses to the given IQ packet's sender with an XMPP error that an In-Band Bytestream
* session could not be found.
@ -539,30 +519,4 @@ public final class InBandBytestreamManager extends Manager implements Bytestream
return ignoredBytestreamRequests;
}
/**
* Disables the InBandBytestreamManager by removing its stanza listeners and resetting its
* internal status, which includes removing this instance from the managers map.
*/
private void disableService() {
final XMPPConnection connection = connection();
// remove manager from static managers map
managers.remove(connection);
// remove all listeners registered by this manager
connection.unregisterIQRequestHandler(initiationListener);
connection.unregisterIQRequestHandler(dataListener);
connection.unregisterIQRequestHandler(closeListener);
// shutdown threads
this.initiationListener.shutdown();
// reset internal status
this.userListeners.clear();
this.allRequestListeners.clear();
this.sessions.clear();
this.ignoredBytestreamRequests.clear();
}
}

View File

@ -16,15 +16,9 @@
*/
package org.jivesoftware.smackx.bytestreams.ibb;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jivesoftware.smack.SmackException.NotConnectedException;
import org.jivesoftware.smack.iqrequest.AbstractIqRequestHandler;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Stanza;
import org.jivesoftware.smack.packet.StanzaError;
import org.jivesoftware.smackx.bytestreams.BytestreamListener;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
@ -44,14 +38,10 @@ import org.jivesoftware.smackx.filetransfer.StreamNegotiator;
* @author Henning Staib
*/
class InitiationListener extends AbstractIqRequestHandler {
private static final Logger LOGGER = Logger.getLogger(InitiationListener.class.getName());
/* manager containing the listeners and the XMPP connection */
private final InBandBytestreamManager manager;
/* executor service to process incoming requests concurrently */
private final ExecutorService initiationListenerExecutor;
/**
* Constructor.
*
@ -60,40 +50,29 @@ class InitiationListener extends AbstractIqRequestHandler {
protected InitiationListener(InBandBytestreamManager manager) {
super(Open.ELEMENT, Open.NAMESPACE, IQ.Type.set, Mode.async);
this.manager = manager;
initiationListenerExecutor = Executors.newCachedThreadPool();
}
@Override
public IQ handleIQRequest(final IQ packet) {
initiationListenerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
processRequest(packet);
}
catch (InterruptedException | NotConnectedException e) {
LOGGER.log(Level.WARNING, "proccessRequest", e);
}
}
});
return null;
}
private void processRequest(Stanza packet) throws NotConnectedException, InterruptedException {
Open ibbRequest = (Open) packet;
public IQ handleIQRequest(final IQ iqRequest) {
Open ibbRequest = (Open) iqRequest;
int blockSize = ibbRequest.getBlockSize();
int maximumBlockSize = manager.getMaximumBlockSize();
// validate that block size is within allowed range
if (ibbRequest.getBlockSize() > this.manager.getMaximumBlockSize()) {
this.manager.replyResourceConstraintPacket(ibbRequest);
return;
if (blockSize > maximumBlockSize) {
StanzaError error = StanzaError.getBuilder().setCondition(StanzaError.Condition.resource_constraint)
.setDescriptiveEnText("Requests block size of " + blockSize + " exceeds maximum block size of "
+ maximumBlockSize)
.build();
return IQ.createErrorResponse(iqRequest, error);
}
StreamNegotiator.signal(ibbRequest.getFrom().toString() + '\t' + ibbRequest.getSessionID(), ibbRequest);
// ignore request if in ignore list
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID()))
return;
if (this.manager.getIgnoredBytestreamRequests().remove(ibbRequest.getSessionID())) {
return null;
}
// build bytestream request from packet
InBandBytestreamRequest request = new InBandBytestreamRequest(this.manager, ibbRequest);
@ -102,7 +81,6 @@ class InitiationListener extends AbstractIqRequestHandler {
BytestreamListener userListener = this.manager.getUserListener(ibbRequest.getFrom());
if (userListener != null) {
userListener.incomingBytestreamRequest(request);
}
else if (!this.manager.getAllRequestListeners().isEmpty()) {
/*
@ -111,21 +89,16 @@ class InitiationListener extends AbstractIqRequestHandler {
for (BytestreamListener listener : this.manager.getAllRequestListeners()) {
listener.incomingBytestreamRequest(request);
}
}
else {
/*
* if there is no listener for this initiation request, reply with reject message
*/
this.manager.replyRejectPacket(ibbRequest);
StanzaError error = StanzaError.getBuilder()
.setCondition(StanzaError.Condition.not_acceptable)
.setDescriptiveEnText("No file-transfer listeners registered")
.build();
return IQ.createErrorResponse(iqRequest, error);
}
}
/**
* Shuts down the listeners executor service.
*/
protected void shutdown() {
this.initiationListenerExecutor.shutdownNow();
return null;
}
}

View File

@ -83,23 +83,14 @@ public class InitiationListenerTest extends SmackTestSuite {
*/
@Test
public void shouldRespondWithError() throws Exception {
// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);
// wait because packet is processed in an extra thread
Thread.sleep(200);
// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());
IQ response = initiationListener.handleIQRequest(initBytestream);
// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.not_acceptable,
argument.getValue().getError().getCondition());
response.getError().getCondition());
}
/**
@ -113,21 +104,13 @@ public class InitiationListenerTest extends SmackTestSuite {
byteStreamManager.setMaximumBlockSize(1024);
// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);
// wait because packet is processed in an extra thread
Thread.sleep(200);
// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());
IQ response = initiationListener.handleIQRequest(initBytestream);
// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.resource_constraint,
argument.getValue().getError().getCondition());
response.getError().getCondition());
}
/**
@ -199,24 +182,17 @@ public class InitiationListenerTest extends SmackTestSuite {
byteStreamManager.addIncomingBytestreamListener(listener, JidCreate.from("other_" + initiatorJID));
// run the listener with the initiation packet
initiationListener.handleIQRequest(initBytestream);
// wait because packet is processed in an extra thread
Thread.sleep(200);
IQ response = initiationListener.handleIQRequest(initBytestream);
// assert listener is not called
ArgumentCaptor<BytestreamRequest> byteStreamRequest = ArgumentCaptor.forClass(BytestreamRequest.class);
verify(listener, never()).incomingBytestreamRequest(byteStreamRequest.capture());
// capture reply to the In-Band Bytestream open request
ArgumentCaptor<IQ> argument = ArgumentCaptor.forClass(IQ.class);
verify(connection).sendStanza(argument.capture());
// assert that reply is the correct error packet
assertEquals(initiatorJID, argument.getValue().getTo());
assertEquals(IQ.Type.error, argument.getValue().getType());
assertEquals(initiatorJID, response.getTo());
assertEquals(IQ.Type.error, response.getType());
assertEquals(StanzaError.Condition.not_acceptable,
argument.getValue().getError().getCondition());
response.getError().getCondition());
}
/**