diff --git a/.gitignore b/.gitignore new file mode 100755 index 00000000..20a55bae --- /dev/null +++ b/.gitignore @@ -0,0 +1,23 @@ +*.class +*~ +.*.swp +.*.swo +.loadpath +.buildpath +.project +.settings +.classpath +.metadata +.idea +*.iml +*.ipr +*.iws +nbproject +.DS_Store +target +test-output +nbactions.xml +samples/gwt-demo/src/main/webapp/WEB-INF/classes/ +samples/gwt-chat/src/main/webapp/WEB-INF/classes/ +samples/gwt-conn-share/src/main/webapp/WEB-INF/classes/ +atlassian-ide-plugin.xml \ No newline at end of file diff --git a/pom.xml b/pom.xml new file mode 100644 index 00000000..ca35de97 --- /dev/null +++ b/pom.xml @@ -0,0 +1,281 @@ + + + + org.sonatype.oss + oss-parent + 5 + + 4.0.0 + org.atmosphere + atmosphere-client + atmosphere-client + 1.0.0-SNAPSHOT + jar + + Atmosphere Client Library + + http://github.com/Atmosphere/atmosphere + + scm:git:git@github.com:Atmosphere/atmosphere-client.git + scm:git:git@github.com:Atmosphere/atmosphere-client.git + http://github.com/Atmosphere/atmosphere-client + + + 2.2.0 + + + + jfarcand + Jeanfrancois Arcand + jfarcand@apache.org + + + + + Apache License 2.0 + http://www.apache.org/licenses/LICENSE-2.0.html + repo + + + + + com.ning + async-http-client + 1.7.6 + + + org.atmosphere + atmosphere-runtime + 1.0.3 + + + + src/main/java + src/test/java + + + + org.apache.maven.wagon + wagon-ssh-external + 1.0-beta-6 + + + install + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8.1 + + true + 1.6 + UTF-8 + 1g + + http://java.sun.com/javase/6/docs/api/ + + + + + attach-javadocs + verify + + jar + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 2.3.2 + + ${source.property} + ${target.property} + 1024m + + + + org.apache.maven.plugins + maven-shade-plugin + 1.2.1 + + + package + + shade + + + true + all + + + commons-codec:commons-codec + commons-lang:commons-lang + commons-logging:commons-logging + junit:junit + log4j:log4j + commons-httpclient:commons-httpclient + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + + + + + + + + + + + org.apache.felix + maven-bundle-plugin + 2.3.4 + true + + META-INF + + $(replace;$(project.version);-SNAPSHOT;.$(tstamp;yyyyMMdd-HHmm)) + + jfarcand + + * + + + org.jfarcand.*;version="$(replace;$(project.version);-SNAPSHOT;"")" + + + + + + osgi-bundle + package + + bundle + + + + + + org.apache.maven.plugins + maven-resources-plugin + 2.4.3 + + UTF-8 + + + + org.apache.maven.plugins + maven-release-plugin + 2.1 + + + org.apache.maven.plugins + maven-source-plugin + 2.1.2 + + + attach-sources + verify + + jar-no-fork + + + + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + testCompile + + + + + + -Xmx384m + + + -target:jvm-1.5 + -deprecation + + + + run-scalatest + org.scalatest.tools.Runner + + -p + ${project.build.testOutputDirectory} + + + -Xmx512m + + + + + + + + + target/site + + + org.scala-tools + maven-scala-plugin + + + org.apache.maven.plugins + maven-javadoc-plugin + 2.8.1 + + true + 1.6 + UTF-8 + 1g + + http://java.sun.com/javase/6/docs/api/ + + ${sun.boot.class.path} + com.google.doclava.Doclava + false + -J-Xmx1024m + + com.google.doclava + doclava + 1.0.3 + + + -hdf project.name "${project.name} ${project.version}" + -d ${project.reporting.outputDirectory}/apidocs + + + + + + + + sonatype-nexus-staging + Sonatype Release + http://oss.sonatype.org/service/local/staging/deploy/maven2 + + + sonatype-nexus-snapshots + sonatype-nexus-snapshots + ${distMgmtSnapshotsUrl} + + + + http://oss.sonatype.org/content/repositories/snapshots + true + 1.6 + 1.6 + + + diff --git a/src/main/java/org/atmosphere/client/AtmosphereClientFactory.java b/src/main/java/org/atmosphere/client/AtmosphereClientFactory.java new file mode 100644 index 00000000..184130ae --- /dev/null +++ b/src/main/java/org/atmosphere/client/AtmosphereClientFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +public class AtmosphereClientFactory { + + private final AtmosphereClientFactory factory = new AtmosphereClientFactory(); + + + + +} diff --git a/src/main/java/org/atmosphere/client/Client.java b/src/main/java/org/atmosphere/client/Client.java new file mode 100644 index 00000000..6f332cfb --- /dev/null +++ b/src/main/java/org/atmosphere/client/Client.java @@ -0,0 +1,27 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import java.io.IOException; + +public interface Client { + + Client open(Options options); + + Future fire(Request request) throws IOException; + + Client close(); +} diff --git a/src/main/java/org/atmosphere/client/Decoder.java b/src/main/java/org/atmosphere/client/Decoder.java new file mode 100644 index 00000000..5fd6ecad --- /dev/null +++ b/src/main/java/org/atmosphere/client/Decoder.java @@ -0,0 +1,22 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +public interface Decoder { + + T decode(U s); + +} diff --git a/src/main/java/org/atmosphere/client/DefaultClient.java b/src/main/java/org/atmosphere/client/DefaultClient.java new file mode 100644 index 00000000..42c1ea21 --- /dev/null +++ b/src/main/java/org/atmosphere/client/DefaultClient.java @@ -0,0 +1,336 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import com.ning.http.client.AsyncCompletionHandler; +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.AsyncHttpClientConfig; +import com.ning.http.client.HttpResponseBodyPart; +import com.ning.http.client.HttpResponseHeaders; +import com.ning.http.client.HttpResponseStatus; +import com.ning.http.client.RequestBuilder; +import com.ning.http.client.Response; +import com.ning.http.client.websocket.WebSocket; +import com.ning.http.client.websocket.WebSocketTextListener; +import com.ning.http.client.websocket.WebSocketUpgradeHandler; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class DefaultClient implements Client { + + private AsyncHttpClient asyncHttpClient; + + protected DefaultClient() { + } + + public DefaultClient open(Options options) { + AsyncHttpClientConfig.Builder config = new AsyncHttpClientConfig.Builder(); + this.asyncHttpClient = new AsyncHttpClient(config.build()); + return this; + } + + public Future fire(Request request) throws IOException { + RequestBuilder r = new RequestBuilder(); + + List transports = getTransport(request.transport()); + + Transport primary = transports.get(0); + // TODO: Fallback implementation + + Future f; + if (primary.name().equals(Request.TRANSPORT.WEBSOCKET)) { + java.util.concurrent.Future w = asyncHttpClient.prepareRequest(r.build()).execute( + new WebSocketUpgradeHandler.Builder().addWebSocketListener((WebSocketTextListener) primary).build()); + try { + f = new Future(new SocketImpl(request, w.get(), primary)); + } catch (Exception e) { + throw new IOException(e); + } + } else { + java.util.concurrent.Future s = asyncHttpClient.prepareRequest(r.build()).execute( + (AsyncCompletionHandler) primary); + + // TODO: This is no garantee the connection has been established. + f = new Future(new SocketImpl(request, asyncHttpClient, primary)); + } + + primary.future(f); + return f; + } + + public DefaultClient close() { + if (asyncHttpClient != null) { + asyncHttpClient.closeAsynchronously(); + } + return this; + } + + protected List getTransport(List t) { + + List transports = new ArrayList(); + + if (t.equals(Request.TRANSPORT.WEBSOCKET)) { + transports.add(new WebSocketTransport()); + } else if (t.equals(Request.TRANSPORT.SSE)) { + transports.add(new SSETransport()); + } else if (t.equals(Request.TRANSPORT.LONG_POLLING)) { + transports.add(new LongPollingTransport()); + } else if (t.equals(Request.TRANSPORT.STREAMING)) { + transports.add(new StreamTransport()); + } + return transports; + } + + private final static class WebSocketTransport implements WebSocketTextListener, Transport { + + Future f; + Function messageFunction = new NoOpsFunction(); + Function errorFunction = new NoOpsFunction(); + Function closeFunction = new NoOpsFunction(); + Function openFunction = new NoOpsFunction(); + + @Override + public Transport future(Future f) { + this.f = f; + return this; + } + + @Override + public Transport injectFunction(Socket.EVENT event, Function function) { + return null; + } + + @Override + public void onMessage(String message) { + messageFunction.on((T) message); + } + + @Override + public void onFragment(String fragment, boolean last) { + + } + + @Override + public void onOpen(WebSocket websocket) { + f.done(); + openFunction.on((T)""); + } + + @Override + public void onClose(WebSocket websocket) { + closeFunction.on((T)""); + } + + @Override + public void onError(Throwable t) { + f.cancel(true); + closeFunction.on((T)t); + } + + @Override + public Request.TRANSPORT name() { + return Request.TRANSPORT.WEBSOCKET; + } + } + + private final static class StreamTransport extends AsyncCompletionHandler implements Transport { + + Future f; + Function messageFunction = new NoOpsFunction(); + Function errorFunction = new NoOpsFunction(); + Function closeFunction = new NoOpsFunction(); + Function openFunction = new NoOpsFunction(); + + @Override + public Transport future(Future f) { + this.f = f; + return this; + } + + @Override + public Transport injectFunction(Socket.EVENT event, Function function) { + return null; + } + + /** + * {@inheritDoc} + */ + public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Exception { + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onStatusReceived(final HttpResponseStatus status) throws Exception { + f.done(); + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception { + return STATE.CONTINUE; + } + + @Override + public String onCompleted(Response response) throws Exception { + return ""; + } + + @Override + public Request.TRANSPORT name() { + return Request.TRANSPORT.STREAMING; + } + + /** + * {@inheritDoc} + */ + public void onThrowable(Throwable t) { + f.cancel(true); + } + + } + + private final static class SSETransport extends AsyncCompletionHandler implements Transport { + + Future f; + Function messageFunction = new NoOpsFunction(); + Function errorFunction = new NoOpsFunction(); + Function closeFunction = new NoOpsFunction(); + Function openFunction = new NoOpsFunction(); + + @Override + public Transport future(Future f) { + this.f = f; + return this; + } + + @Override + public Transport injectFunction(Socket.EVENT event, Function function) { + return null; + } + + /** + * {@inheritDoc} + */ + public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Exception { + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onStatusReceived(final HttpResponseStatus status) throws Exception { + f.done(); + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception { + return STATE.CONTINUE; + } + + @Override + public String onCompleted(Response response) throws Exception { + return ""; + } + + @Override + public Request.TRANSPORT name() { + return Request.TRANSPORT.SSE; + } + + /** + * {@inheritDoc} + */ + public void onThrowable(Throwable t) { + f.cancel(true); + } + } + + private final static class LongPollingTransport extends AsyncCompletionHandler implements Transport { + + Future f; + Function messageFunction = new NoOpsFunction(); + Function errorFunction = new NoOpsFunction(); + Function closeFunction = new NoOpsFunction(); + Function openFunction = new NoOpsFunction(); + + @Override + public Transport future(Future f) { + this.f = f; + return this; + } + + @Override + public Transport injectFunction(Socket.EVENT event, Function function) { + return null; + } + + /** + * {@inheritDoc} + */ + public STATE onBodyPartReceived(final HttpResponseBodyPart content) throws Exception { + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onStatusReceived(final HttpResponseStatus status) throws Exception { + f.done(); + return STATE.CONTINUE; + } + + /** + * {@inheritDoc} + */ + public STATE onHeadersReceived(final HttpResponseHeaders headers) throws Exception { + return STATE.CONTINUE; + } + + @Override + public String onCompleted(Response response) throws Exception { + return ""; + } + + @Override + public Request.TRANSPORT name() { + return Request.TRANSPORT.LONG_POLLING; + } + + /** + * {@inheritDoc} + */ + public void onThrowable(Throwable t) { + f.cancel(true); + } + } + + private final static class NoOpsFunction implements Function{ + + @Override + public void on(Object o) { + + } + } +} diff --git a/src/main/java/org/atmosphere/client/Encoder.java b/src/main/java/org/atmosphere/client/Encoder.java new file mode 100644 index 00000000..32e84a85 --- /dev/null +++ b/src/main/java/org/atmosphere/client/Encoder.java @@ -0,0 +1,41 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +public interface Encoder { + + EncodedMessage encode(EncodedMessage s); + + public static class EncodedMessage { + + private final Class encoding; + private final Object message; + + public EncodedMessage(Class encoding, Object message) { + this.encoding = encoding; + this.message = message; + } + + public Class encoding(){ + return encoding; + } + + public Object message(){ + return message; + } + } + +} diff --git a/src/main/java/org/atmosphere/client/Function.java b/src/main/java/org/atmosphere/client/Function.java new file mode 100644 index 00000000..3660c15a --- /dev/null +++ b/src/main/java/org/atmosphere/client/Function.java @@ -0,0 +1,22 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +public interface Function { + + void on(T t); + +} diff --git a/src/main/java/org/atmosphere/client/Future.java b/src/main/java/org/atmosphere/client/Future.java new file mode 100644 index 00000000..fe821035 --- /dev/null +++ b/src/main/java/org/atmosphere/client/Future.java @@ -0,0 +1,71 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class Future implements java.util.concurrent.Future { + + private final Socket socket; + private final CountDownLatch latch = new CountDownLatch(1); + private final AtomicBoolean done = new AtomicBoolean(false); + + public Future(Socket socket) { + this.socket = socket; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + latch.countDown(); + return true; + } + + @Override + public boolean isCancelled() { + return latch.getCount() == 0; + } + + @Override + public boolean isDone() { + return done.get(); + } + + protected Future done(){ + done.set(true); + latch.countDown(); + return this; + } + + @Override + public Socket get() throws InterruptedException, ExecutionException { + latch.await(); + return socket; + } + + @Override + public Socket get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + latch.await(timeout, unit); + return socket; + } + + protected Socket socket() { + return socket; + } +} diff --git a/src/main/java/org/atmosphere/client/Options.java b/src/main/java/org/atmosphere/client/Options.java new file mode 100644 index 00000000..8ba77f5f --- /dev/null +++ b/src/main/java/org/atmosphere/client/Options.java @@ -0,0 +1,19 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +public class Options { +} diff --git a/src/main/java/org/atmosphere/client/Request.java b/src/main/java/org/atmosphere/client/Request.java new file mode 100644 index 00000000..f66d078a --- /dev/null +++ b/src/main/java/org/atmosphere/client/Request.java @@ -0,0 +1,44 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +/** + * @author Jeanfrancois Arcand + */ +public interface Request { + + public enum METHOD {GET, POST, TRACE, PUT, DELETE, OPTIONS} + public enum TRANSPORT {WEBSOCKET, SSE, STREAMING, LONG_POLLING} + + List transport(); + + METHOD method(); + + Map> headers(); + + Map> queryString(); + + List encoders(); + + List decoders(); + + URI uri(); + +} diff --git a/src/main/java/org/atmosphere/client/RequestImpl.java b/src/main/java/org/atmosphere/client/RequestImpl.java new file mode 100644 index 00000000..55f6a6ad --- /dev/null +++ b/src/main/java/org/atmosphere/client/RequestImpl.java @@ -0,0 +1,130 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RequestImpl implements Request { + + private final Builder builder; + + private RequestImpl(Builder builder) { + this.builder = builder; + } + + @Override + public List transport() { + return builder.transports; + } + + @Override + public METHOD method() { + return builder.method; + } + + @Override + public Map> headers() { + return builder.headers; + } + + @Override + public Map> queryString() { + return builder.queryString; + } + + @Override + public List encoders() { + return builder.encoders; + } + + @Override + public List decoders() { + return builder.decoders; + } + + @Override + public URI uri() { + return builder.uri; + } + + public final static class Builder { + + private final List transports = new ArrayList(); + private METHOD method = METHOD.GET; + private URI uri = URI.create("http://localhost:8080"); + private final List encoders = new ArrayList(); + private final List decoders = new ArrayList(); + private final Map> headers = new HashMap>(); + private final Map> queryString = new HashMap>(); + + public Builder() { + } + + public Builder transport(TRANSPORT t) { + transports.add(t); + return this; + } + + public Builder method(METHOD method) { + this.method = method; + return this; + } + + public Builder uri(URI uri) { + this.uri = uri; + return this; + } + + public Builder encoder(Encoder e) { + encoders.add(e); + return this; + } + + public Builder decoder(Decoder d) { + decoders.add(d); + return this; + } + + public Builder header(String name, String value) { + List l = headers.get(name); + if (l == null) { + l = new ArrayList(); + } + l.add(value); + headers.put(name, l); + return this; + } + + public Builder queryString(String name, String value) { + List l = queryString.get(name); + if (l == null) { + l = new ArrayList(); + } + l.add(value); + queryString.put(name, l); + return this; + } + + protected RequestImpl build(){ + return new RequestImpl(this); + } + } + +} diff --git a/src/main/java/org/atmosphere/client/Socket.java b/src/main/java/org/atmosphere/client/Socket.java new file mode 100644 index 00000000..e9018cb0 --- /dev/null +++ b/src/main/java/org/atmosphere/client/Socket.java @@ -0,0 +1,31 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import java.io.IOException; + +/** + * @author Jeanfrancois Arcand + */ +public interface Socket { + + enum EVENT {CLOSE, ERROR, HEADER, STATUS, MESSAGE} + + Future send(Object data) throws IOException; + + Socket on(EVENT type, Function function); + +} diff --git a/src/main/java/org/atmosphere/client/SocketImpl.java b/src/main/java/org/atmosphere/client/SocketImpl.java new file mode 100644 index 00000000..67efabe7 --- /dev/null +++ b/src/main/java/org/atmosphere/client/SocketImpl.java @@ -0,0 +1,137 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +import com.ning.http.client.AsyncHttpClient; +import com.ning.http.client.websocket.WebSocket; +import org.atmosphere.client.util.ReaderInputStream; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.io.StringWriter; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +public class SocketImpl implements Socket { + + private final Request request; + private final ConcurrentHashMap functions = new ConcurrentHashMap(); + private final InternalSocket socket; + + public SocketImpl(Request request, AsyncHttpClient asyncHttpClient, Transport transport) { + this.request = request; + this.socket = new InternalSocket(asyncHttpClient); + } + + public SocketImpl(Request request, WebSocket webSocket, Transport transport) { + this.request = request; + this.socket = new InternalSocket(webSocket); + } + + public Future send(Object data) throws IOException { + socket.write(request, data); + return new Future(this); + } + + public Socket on(EVENT type, Function function) { + functions.put(type, function); + return this; + } + + private final static class InternalSocket { + + private final WebSocket webSocket; + private final AsyncHttpClient asyncHttpClient; + + public InternalSocket(WebSocket webSocket) { + this.webSocket = webSocket; + this.asyncHttpClient = null; + } + + public InternalSocket(AsyncHttpClient asyncHttpClient) { + this.webSocket = null; + this.asyncHttpClient = asyncHttpClient; + } + + public InternalSocket write(Request request, Object data) throws IOException { + + // Execute encoder + Encoder.EncodedMessage em = executeEncoders(request.encoders(), data); + if (webSocket != null) { + if (InputStream.class.isAssignableFrom(em.encoding())) { + InputStream is = (InputStream) em.message(); + ByteArrayOutputStream bs = new ByteArrayOutputStream(); + //TODO: We need to stream directly, in AHC! + byte[] buffer = new byte[8192]; + int n = 0; + while (-1 != (n = is.read(buffer))) { + bs.write(buffer, 0, n); + } + webSocket.sendMessage(bs.toByteArray()); + } else if (Reader.class.isAssignableFrom(em.encoding())) { + Reader is = (Reader) em.message(); + StringWriter bs = new StringWriter(); + //TODO: We need to stream directly, in AHC! + char[] chars = new char[8192]; + int n = 0; + while (-1 != (n = is.read(chars))) { + bs.write(chars, 0, n); + } + webSocket.sendTextMessage(bs.getBuffer().toString()); + } else if (String.class.isAssignableFrom(em.encoding())) { + webSocket.sendTextMessage(em.message().toString()); + } else if (byte[].class.isAssignableFrom(em.encoding())) { + webSocket.sendMessage((byte[]) em.message()); + } else { + throw new IllegalStateException("No Encoder for " + data); + } + } else { + if (InputStream.class.isAssignableFrom(em.encoding())) { + //TODO: Allow reading the response. + asyncHttpClient.preparePost(request.uri().toURL().toString()) + .setBody((InputStream) em.message()).execute(); + } else if (Reader.class.isAssignableFrom(em.encoding())) { + asyncHttpClient.preparePost(request.uri().toURL().toString()) + .setBody(new ReaderInputStream((Reader) em.message())).execute(); + return this; + } else if (String.class.isAssignableFrom(em.encoding())) { + asyncHttpClient.preparePost(request.uri().toURL().toString()).setBody((String) em.message()).execute(); + } else if (byte[].class.isAssignableFrom(em.encoding())) { + asyncHttpClient.preparePost(request.uri().toURL().toString()).setBody((byte[]) em.message()).execute(); + } else { + throw new IllegalStateException("No Encoder for " + data); + } + } + return this; + } + + + private Encoder.EncodedMessage executeEncoders(List encoders, Object data) { + // TODO This won't work, we need the type. + Encoder.EncodedMessage em = new Encoder.EncodedMessage(data.getClass(), data); + + for (Encoder e : encoders) { + em = e.encode(em); + } + return em; + } + + } + + +} diff --git a/src/main/java/org/atmosphere/client/Transport.java b/src/main/java/org/atmosphere/client/Transport.java new file mode 100644 index 00000000..ff8310f4 --- /dev/null +++ b/src/main/java/org/atmosphere/client/Transport.java @@ -0,0 +1,28 @@ +/* + * Copyright 2012 Jeanfrancois Arcand + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.atmosphere.client; + +/** + * @author Jeanfrancois Arcand + */ +public interface Transport { + + Request.TRANSPORT name(); + + Transport future(Future f); + + Transport injectFunction(Socket.EVENT event, Function function); +} diff --git a/src/main/java/org/atmosphere/client/util/ReaderInputStream.java b/src/main/java/org/atmosphere/client/util/ReaderInputStream.java new file mode 100644 index 00000000..da0bfb61 --- /dev/null +++ b/src/main/java/org/atmosphere/client/util/ReaderInputStream.java @@ -0,0 +1,294 @@ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.atmosphere.client.util; + + +import java.io.IOException; +import java.io.InputStream; +import java.io.Reader; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.Charset; +import java.nio.charset.CharsetEncoder; +import java.nio.charset.CoderResult; +import java.nio.charset.CodingErrorAction; + +/** + * {@link InputStream} implementation that reads a character stream from a {@link Reader} + * and transforms it to a byte stream using a specified charset encoding. The stream + * is transformed using a {@link CharsetEncoder} object, guaranteeing that all charset + * encodings supported by the JRE are handled correctly. In particular for charsets such as + * UTF-16, the implementation ensures that one and only one byte order marker + * is produced. + *

+ * Since in general it is not possible to predict the number of characters to be read from the + * {@link Reader} to satisfy a read request on the {@link ReaderInputStream}, all reads from + * the {@link Reader} are buffered. There is therefore no well defined correlation + * between the current position of the {@link Reader} and that of the {@link ReaderInputStream}. + * This also implies that in general there is no need to wrap the underlying {@link Reader} + * in a {@link java.io.BufferedReader}. + *

+ * {@link ReaderInputStream} implements the inverse transformation of {@link java.io.InputStreamReader}; + * in the following example, reading from in2 would return the same byte + * sequence as reading from in (provided that the initial byte sequence is legal + * with respect to the charset encoding): + *

+ * InputStream in = ...
+ * Charset cs = ...
+ * InputStreamReader reader = new InputStreamReader(in, cs);
+ * ReaderInputStream in2 = new ReaderInputStream(reader, cs);
+ * {@link ReaderInputStream} implements the same transformation as {@link java.io.OutputStreamWriter}, + * except that the control flow is reversed: both classes transform a character stream + * into a byte stream, but {@link java.io.OutputStreamWriter} pushes data to the underlying stream, + * while {@link ReaderInputStream} pulls it from the underlying stream. + *

+ * Note that while there are use cases where there is no alternative to using + * this class, very often the need to use this class is an indication of a flaw + * in the design of the code. This class is typically used in situations where an existing + * API only accepts an {@link InputStream}, but where the most natural way to produce the data + * is as a character stream, i.e. by providing a {@link Reader} instance. An example of a situation + * where this problem may appear is when implementing the {@link javax.activation.DataSource} + * interface from the Java Activation Framework. + *

+ * Given the fact that the {@link Reader} class doesn't provide any way to predict whether the next + * read operation will block or not, it is not possible to provide a meaningful + * implementation of the {@link InputStream#available()} method. A call to this method + * will always return 0. Also, this class doesn't support {@link InputStream#mark(int)}. + *

+ * Instances of {@link ReaderInputStream} are not thread safe. + * * + * @since 2.0 + */ +public class ReaderInputStream extends InputStream { + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final Reader reader; + private final CharsetEncoder encoder; + + /** + * CharBuffer used as input for the decoder. It should be reasonably + * large as we read data from the underlying Reader into this buffer. + */ + private final CharBuffer encoderIn; + + /** + * ByteBuffer used as output for the decoder. This buffer can be small + * as it is only used to transfer data from the decoder to the + * buffer provided by the caller. + */ + private final ByteBuffer encoderOut; + + private CoderResult lastCoderResult; + private boolean endOfInput; + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param encoder the charset encoder + * @since 2.1 + */ + public ReaderInputStream(Reader reader, CharsetEncoder encoder) { + this(reader, encoder, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param encoder the charset encoder + * @param bufferSize the size of the input buffer in number of characters + * @since 2.1 + */ + public ReaderInputStream(Reader reader, CharsetEncoder encoder, int bufferSize) { + this.reader = reader; + this.encoder = encoder; + this.encoderIn = CharBuffer.allocate(bufferSize); + this.encoderIn.flip(); + this.encoderOut = ByteBuffer.allocate(128); + this.encoderOut.flip(); + } + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param charset the charset encoding + * @param bufferSize the size of the input buffer in number of characters + */ + public ReaderInputStream(Reader reader, Charset charset, int bufferSize) { + this(reader, + charset.newEncoder() + .onMalformedInput(CodingErrorAction.REPLACE) + .onUnmappableCharacter(CodingErrorAction.REPLACE), + bufferSize); + } + + /** + * Construct a new {@link ReaderInputStream} with a default input buffer size of + * 1024 characters. + * + * @param reader the target {@link Reader} + * @param charset the charset encoding + */ + public ReaderInputStream(Reader reader, Charset charset) { + this(reader, charset, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a new {@link ReaderInputStream}. + * + * @param reader the target {@link Reader} + * @param charsetName the name of the charset encoding + * @param bufferSize the size of the input buffer in number of characters + */ + public ReaderInputStream(Reader reader, String charsetName, int bufferSize) { + this(reader, Charset.forName(charsetName), bufferSize); + } + + /** + * Construct a new {@link ReaderInputStream} with a default input buffer size of + * 1024 characters. + * + * @param reader the target {@link Reader} + * @param charsetName the name of the charset encoding + */ + public ReaderInputStream(Reader reader, String charsetName) { + this(reader, charsetName, DEFAULT_BUFFER_SIZE); + } + + /** + * Construct a new {@link ReaderInputStream} that uses the default character encoding + * with a default input buffer size of 1024 characters. + * + * @param reader the target {@link Reader} + */ + public ReaderInputStream(Reader reader) { + this(reader, Charset.defaultCharset()); + } + + /** + * Fills the internal char buffer from the reader. + * + * @throws IOException + * If an I/O error occurs + */ + private void fillBuffer() throws IOException { + if (!endOfInput && (lastCoderResult == null || lastCoderResult.isUnderflow())) { + encoderIn.compact(); + int position = encoderIn.position(); + // We don't use Reader#read(CharBuffer) here because it is more efficient + // to write directly to the underlying char array (the default implementation + // copies data to a temporary char array). + int c = reader.read(encoderIn.array(), position, encoderIn.remaining()); + if (c == -1) { + endOfInput = true; + } else { + encoderIn.position(position+c); + } + encoderIn.flip(); + } + encoderOut.compact(); + lastCoderResult = encoder.encode(encoderIn, encoderOut, endOfInput); + encoderOut.flip(); + } + + /** + * Read the specified number of bytes into an array. + * + * @param b the byte array to read into + * @param off the offset to start reading bytes into + * @param len the number of bytes to read + * @return the number of bytes read or -1 + * if the end of the stream has been reached + * @throws IOException if an I/O error occurs + */ + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (b == null) { + throw new NullPointerException("Byte array must not be null"); + } + if (len < 0 || off < 0 || (off + len) > b.length) { + throw new IndexOutOfBoundsException("Array Size=" + b.length + + ", offset=" + off + ", length=" + len); + } + int read = 0; + if (len == 0) { + return 0; // Always return 0 if len == 0 + } + while (len > 0) { + if (encoderOut.hasRemaining()) { + int c = Math.min(encoderOut.remaining(), len); + encoderOut.get(b, off, c); + off += c; + len -= c; + read += c; + } else { + fillBuffer(); + if (endOfInput && !encoderOut.hasRemaining()) { + break; + } + } + } + return read == 0 && endOfInput ? -1 : read; + } + + /** + * Read the specified number of bytes into an array. + * + * @param b the byte array to read into + * @return the number of bytes read or -1 + * if the end of the stream has been reached + * @throws IOException if an I/O error occurs + */ + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } + + /** + * Read a single byte. + * + * @return either the byte read or -1 if the end of the stream + * has been reached + * @throws IOException if an I/O error occurs + */ + @Override + public int read() throws IOException { + for (;;) { + if (encoderOut.hasRemaining()) { + return encoderOut.get() & 0xFF; + } else { + fillBuffer(); + if (endOfInput && !encoderOut.hasRemaining()) { + return -1; + } + } + } + } + + /** + * Close the stream. This method will cause the underlying {@link Reader} + * to be closed. + * @throws IOException if an I/O error occurs + */ + @Override + public void close() throws IOException { + reader.close(); + } +}