Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP Use Streams #654

Open
wants to merge 2 commits into
base: pr/prepare_streams
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 57 additions & 21 deletions core/src/saros/negotiation/ArchiveIncomingProjectNegotiation.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package saros.negotiation;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smackx.filetransfer.IncomingFileTransfer;
import saros.SarosPluginContext;
import saros.exceptions.LocalCancellationException;
import saros.exceptions.SarosCancellationException;
import saros.filesystem.IChecksumCache;
Expand All @@ -18,11 +21,14 @@
import saros.monitoring.IProgressMonitor;
import saros.monitoring.SubProgressMonitor;
import saros.negotiation.NegotiationTools.CancelOption;
import saros.net.IConnectionManager;
import saros.net.IReceiver;
import saros.net.IStreamConnection;
import saros.net.ITransmitter;
import saros.net.xmpp.JID;
import saros.net.xmpp.XMPPConnectionService;
import saros.observables.FileReplacementInProgressObservable;
import saros.repackaged.picocontainer.annotations.Inject;
import saros.session.ISarosSession;
import saros.session.ISarosSessionManager;
import saros.util.CoreUtils;
Expand All @@ -35,6 +41,9 @@ public class ArchiveIncomingProjectNegotiation extends AbstractIncomingProjectNe

private static final Logger LOG = Logger.getLogger(ArchiveIncomingProjectNegotiation.class);

// TODO move to factory
@Inject private IConnectionManager connectionManager;

public ArchiveIncomingProjectNegotiation(
final JID peer, //
final String negotiationID, //
Expand All @@ -60,6 +69,9 @@ public ArchiveIncomingProjectNegotiation(
connectionService,
transmitter,
receiver);

// FIXME remove
SarosPluginContext.initComponent(this);
}

@Override
Expand All @@ -73,22 +85,20 @@ protected void transfer(

// the host do not send an archive if we do not need any files
if (filesMissing) {
receiveAndUnpackArchive(projectMapping, transferListener, monitor);
receiveAndUnpackArchive(projectMapping, monitor);
}
}

/** Receives the archive with all missing files and unpacks it. */
private void receiveAndUnpackArchive(
final Map<String, IProject> localProjectMapping,
final TransferListener archiveTransferListener,
final IProgressMonitor monitor)
final Map<String, IProject> localProjectMapping, final IProgressMonitor monitor)
throws IOException, SarosCancellationException {

// waiting for the big archive to come in

monitor.beginTask(null, 100);

File archiveFile = receiveArchive(archiveTransferListener, new SubProgressMonitor(monitor, 50));
File archiveFile = receiveArchive(new SubProgressMonitor(monitor, 50));

/*
* FIXME at this point it makes no sense to report the cancellation to
Expand Down Expand Up @@ -146,37 +156,63 @@ private void unpackArchive(
// TODO: now add the checksums into the cache
}

private File receiveArchive(TransferListener archiveTransferListener, IProgressMonitor monitor)
private File receiveArchive(IProgressMonitor monitor)
throws IOException, SarosCancellationException {

monitor.beginTask("Receiving archive file...", 100);
LOG.debug("waiting for incoming archive stream request");

monitor.subTask("Host is compressing project files. Waiting for the archive file...");

awaitTransferRequest();
LOG.debug("connecting to " + getPeer() + " to receive archive file");

monitor.subTask("Receiving archive file...");
monitor.subTask("Connecting to " + getPeer().getName() + "...");

LOG.debug(this + " : receiving archive");

IncomingFileTransfer transfer = archiveTransferListener.getRequest().accept();
IStreamConnection connection =
connectionManager.connectStream(TRANSFER_ID_PREFIX + getID(), getPeer());

File archiveFile = File.createTempFile("saros_archive_" + System.currentTimeMillis(), null);

OutputStream out = null;

boolean transferFailed = true;

try {
transfer.recieveFile(archiveFile);

monitorFileTransfer(transfer, monitor);
out = new FileOutputStream(archiveFile);

connection.setReadTimeout(60 * 60 * 1000);
monitor.subTask("Host is compressing project files. Waiting for the archive file...");

DataInputStream dis = new DataInputStream(connection.getInputStream());

long remainingDataSize = dis.readLong();

monitor.subTask("Receiving archive file...");

LOG.debug(this + " : receiving archive");

final byte buffer[] = new byte[BUFFER_SIZE];

while (remainingDataSize > 0) {
int read = dis.read(buffer);

if (read == -1) break;

out.write(buffer, 0, read);
remainingDataSize -= read;

checkCancellation(CancelOption.NOTIFY_PEER);
}

if (remainingDataSize > 0)
localCancel(
"The receiving of the archive file was not successful.", CancelOption.NOTIFY_PEER);

transferFailed = false;
} catch (XMPPException e) {
throw new IOException(e.getMessage(), e.getCause());
} finally {
if (transferFailed && !archiveFile.delete()) {
LOG.warn("Could not clean up archive file " + archiveFile.getAbsolutePath());
}

IOUtils.closeQuietly(out);
connection.close();
}

monitor.done();
Expand Down
101 changes: 91 additions & 10 deletions core/src/saros/negotiation/ArchiveOutgoingProjectNegotiation.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package saros.negotiation;

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smackx.filetransfer.OutgoingFileTransfer;
import saros.SarosPluginContext;
import saros.editor.IEditorManager;
import saros.exceptions.LocalCancellationException;
import saros.exceptions.OperationCanceledException;
Expand All @@ -18,10 +21,14 @@
import saros.filesystem.IWorkspace;
import saros.monitoring.IProgressMonitor;
import saros.negotiation.NegotiationTools.CancelOption;
import saros.net.IConnectionManager;
import saros.net.IReceiver;
import saros.net.IStreamConnection;
import saros.net.IStreamConnectionListener;
import saros.net.ITransmitter;
import saros.net.xmpp.JID;
import saros.net.xmpp.XMPPConnectionService;
import saros.repackaged.picocontainer.annotations.Inject;
import saros.session.ISarosSession;
import saros.session.ISarosSessionManager;
import saros.session.User;
Expand All @@ -36,6 +43,33 @@ public class ArchiveOutgoingProjectNegotiation extends AbstractOutgoingProjectNe
private static final Logger LOG = Logger.getLogger(ArchiveOutgoingProjectNegotiation.class);
private File zipArchive = null;

// TODO move to factory
@Inject private IConnectionManager connectionManager;

private IStreamConnection connection;

private boolean awaitConnection = true;

private final IStreamConnectionListener streamConnectionListener =
new IStreamConnectionListener() {

@Override
public boolean streamConnectionEstablished(String id, IStreamConnection connection) {

synchronized (ArchiveOutgoingProjectNegotiation.this) {
if (!awaitConnection) return false;

if (!(TRANSFER_ID_PREFIX + getID()).equals(id)) return false;

ArchiveOutgoingProjectNegotiation.this.connection = connection;

ArchiveOutgoingProjectNegotiation.this.notifyAll();
}

return true;
}
};

public ArchiveOutgoingProjectNegotiation( //
final JID peer, //
final ProjectSharingData projects, //
Expand All @@ -59,13 +93,18 @@ public ArchiveOutgoingProjectNegotiation( //
connectionService,
transmitter,
receiver);

// FIXME remove
SarosPluginContext.initComponent(this);
}

@Override
protected void setup(IProgressMonitor monitor) throws IOException {
if (fileTransferManager == null)
// FIXME: the logic will try to send this to the remote contact
throw new IOException("not connected to a XMPP server");

connectionManager.addStreamConnectionListener(streamConnectionListener);
}

@Override
Expand Down Expand Up @@ -113,6 +152,7 @@ protected void transfer(IProgressMonitor monitor, List<FileList> fileLists)
protected void cleanup(IProgressMonitor monitor) {
if (zipArchive != null && !zipArchive.delete())
LOG.warn("could not delete archive file: " + zipArchive.getAbsolutePath());
connectionManager.addStreamConnectionListener(streamConnectionListener);
super.cleanup(monitor);
}

Expand Down Expand Up @@ -200,19 +240,60 @@ private void sendArchive(
File archive, JID remoteContact, String transferID, IProgressMonitor monitor)
throws SarosCancellationException, IOException {

LOG.debug(this + " : sending archive");
LOG.debug(this + " : waiting for remote connection");
monitor.beginTask("Sending archive file...", 100);

assert fileTransferManager != null;
final long timeout = 60 * 1000;

long currentTime = System.currentTimeMillis();

synchronized (this) {
while (System.currentTimeMillis() - currentTime < timeout) {
if (connection != null) break;

if (monitor.isCanceled()) {
awaitConnection = false;
checkCancellation(CancelOption.NOTIFY_PEER);
}

try {
wait(1000);
} catch (InterruptedException e) {
awaitConnection = false;
Thread.currentThread().interrupt();
this.localCancel("Negotiation got internally interrupted.", CancelOption.NOTIFY_PEER);
break;
}
}

awaitConnection = false;
}

assert connection != null;

DataOutputStream out = null;
InputStream in = null;

try {
OutgoingFileTransfer transfer =
fileTransferManager.createOutgoingFileTransfer(remoteContact.toString());

transfer.sendFile(archive, transferID);
monitorFileTransfer(transfer, monitor);
} catch (XMPPException e) {
throw new IOException(e.getMessage(), e);
in = new FileInputStream(archive);

long fileSize = archive.length();

out = new DataOutputStream(connection.getOutputStream());

out.writeLong(fileSize);
final byte buffer[] = new byte[BUFFER_SIZE];

int read = 0;

while ((read = in.read(buffer)) != -1) {
out.write(buffer, 0, read);
checkCancellation(CancelOption.NOTIFY_PEER);
}
} finally {
connection.close();
IOUtils.closeQuietly(in);
}

monitor.done();
Expand Down
2 changes: 2 additions & 0 deletions core/src/saros/negotiation/ProjectNegotiation.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ public abstract class ProjectNegotiation extends Negotiation {
*/
protected FileTransferManager fileTransferManager;

protected static final int BUFFER_SIZE = 32 * 1024;

public ProjectNegotiation(
final String id,
final JID peer,
Expand Down
21 changes: 13 additions & 8 deletions core/src/saros/net/IConnectionManager.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package saros.net;

import java.io.IOException;
import saros.net.internal.IByteStreamConnection;
import saros.net.internal.IConnection;
import saros.net.stream.StreamMode;
import saros.net.xmpp.JID;

Expand All @@ -21,24 +21,29 @@ public interface IConnectionManager {
*/
public void setServices(int serviceMask);

public void addStreamConnectionListener(final IStreamConnectionListener listener);

public void removeStreamConnectionListener(final IStreamConnectionListener listener);

public IStreamConnection connectStream(String id, Object address) throws IOException;
/** @deprecated */
@Deprecated
public IByteStreamConnection connect(JID peer) throws IOException;
public IConnection connect(Object address) throws IOException;

public IByteStreamConnection connect(String connectionID, JID peer) throws IOException;
public IConnection connect(String connectionID, Object address) throws IOException;

/**
* @deprecated Disconnects {@link IByteStreamConnection} with the specified peer
* @param peer {@link JID} of the peer to disconnect the {@link IByteStreamConnection}
* @deprecated Disconnects with the specified address
* @param address
*/
@Deprecated
public boolean closeConnection(JID peer);
public boolean closeConnection(Object address);

public boolean closeConnection(String connectionIdentifier, JID peer);
public boolean closeConnection(String connectionIdentifier, Object address);

/** @deprecated */
@Deprecated
public StreamMode getTransferMode(JID jid);

public StreamMode getTransferMode(String connectionID, JID jid);
public StreamMode getTransferMode(String connectionID, Object address);
}
17 changes: 17 additions & 0 deletions core/src/saros/net/IStreamConnection.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package saros.net;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import saros.net.internal.IConnection;

public interface IStreamConnection extends IConnection {

public InputStream getInputStream() throws IOException;

public OutputStream getOutputStream() throws IOException;

public int getReadTimeout() throws IOException;

public void setReadTimeout(int timeout) throws IOException;
}
Loading