Skip to content

Commit

Permalink
much more work on an example, I think I know what we could do
Browse files Browse the repository at this point in the history
  • Loading branch information
jadlr committed May 8, 2015
1 parent 39988b3 commit 7d9414b
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 6 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ scalaVersion := "2.11.6"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % "2.3.10",
"com.typesafe.akka" %% "akka-stream-experimental" % "1.0-RC2",
"com.typesafe.akka" %% "akka-http-scala-experimental" % "1.0-RC2",
"org.scala-lang" % "scala-reflect" % "2.11.6",
"org.scala-lang.modules" %% "scala-xml" % "1.0.3",
"com.typesafe.akka" %% "akka-testkit" % "2.3.10" % "test",
Expand Down
2 changes: 2 additions & 0 deletions example/resources/web/css/app.css
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/* The main css of this app */

15 changes: 15 additions & 0 deletions example/resources/web/index.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Wikipedia Update Dashboard</title>
<link rel="stylesheet" type="text/css" href="css/app.css">
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.4/jquery.min.js"></script>
<script type="application/javascript" src="js/app.js"></script>
</head>
<body>
<h1>Wikipedia Updates</h1>
<ul class="wikipedia-updates">
</ul>
</body>
</html>
22 changes: 22 additions & 0 deletions example/resources/web/js/app.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
$(function() {

if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}

if (window.WebSocket) {
var l = window.location;
var wsUrl = ((l.protocol === "https:") ? "wss://" : "ws://") + l.hostname + (((l.port != 80) && (l.port != 443)) ? ":" + l.port : "") + l.pathname + "socket";
var socket = new WebSocket(wsUrl);
socket.onmessage = function (event) {
$(".wikipedia-updates").append('<li>' + event.data + '</li>');
};
socket.onopen = function (_event) {
console.log("ready to rumble :)");
};

} else {
console.log("You don't have websockets :(")
}

});
41 changes: 41 additions & 0 deletions example/scala/io/trsc/reactive/irc/example/ConnectionHandler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package io.trsc.reactive.irc.example

import akka.actor.{ActorRef, ActorSystem, Props}
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.http.scaladsl.server.Directives
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._

object ConnectionHandler {
def apply(broadcastSource: Source[BroadcastMessage, _])(implicit sys: ActorSystem) = new ConnectionHandler(broadcastSource).flow
}

class ConnectionHandler(broadcastSource: Source[BroadcastMessage, _])(implicit sys: ActorSystem) extends Directives {

private val connectionSupervisor = sys.actorOf(Props[ConnectionSupervisor])

def flow = Flow(wrappedSupervisorSink, clientOutSource)(Keep.right) { implicit builder =>
(supervisor, out) =>
import akka.stream.scaladsl.FlowGraph.Implicits._

val broadcast = builder.add(broadcastSource)
val collectTextMessage = builder.add(collectTextMessageFlow)
val convertToIncomingMessage = builder.add(convertToIncomingMessageFlow)
val merge = builder.add(Merge[ConnectionEvent](3))

broadcast ~> merge.in(0)
collectTextMessage ~> convertToIncomingMessage ~> merge.in(1)
builder.matValue ~> convertActorRefToNewConnection ~> merge.in(2); merge ~> supervisor

(collectTextMessage.inlet, out.outlet)
}

private val wrappedSupervisorSink = Sink.actorRef[ConnectionEvent](connectionSupervisor, ConnectionClosed)
private val clientOutSource = Source.actorRef[Message](1, OverflowStrategy.fail)

private val collectTextMessageFlow = Flow[Message] collect { case TextMessage.Strict(msg) => msg }
private val convertToIncomingMessageFlow = Flow[String] map { IncomingMessage }

private val convertActorRefToNewConnection = Flow[ActorRef] map { NewConnection }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.trsc.reactive.irc.example

import akka.actor.{Terminated, ActorRef, Actor}
import akka.http.scaladsl.model.ws.TextMessage

sealed trait ConnectionEvent
case object ConnectionClosed extends ConnectionEvent
case class NewConnection(actorRef: ActorRef) extends ConnectionEvent
case class IncomingMessage(msg: String) extends ConnectionEvent
case class BroadcastMessage(msg: String) extends ConnectionEvent

class ConnectionSupervisor extends Actor {

var subscribers = Set.empty[ActorRef]

def receive = {
case NewConnection(subscriber) =>
context.watch(subscriber)
subscribers += subscriber
println("subscription added")
case BroadcastMessage(msg) =>
subscribers foreach { _ ! TextMessage.Strict(msg) }
println("broadcasted: " + msg)
case IncomingMessage(_) => println("TODO: we could have the client register for diff languages")
case ConnectionClosed => println("TODO: proper logging")
case Terminated(subscriber) =>
subscribers -= subscriber
println("subscription terminated")
}

}
20 changes: 20 additions & 0 deletions example/scala/io/trsc/reactive/irc/example/Routes.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.trsc.reactive.irc.example

import akka.http.scaladsl.model.ws.Message
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Flow

object Routes extends Directives {

def apply(handler: Flow[Message, Message, Any]) =
get {
pathSingleSlash {
getFromResource("web/index.html")
} ~
path("socket") {
handleWebsocketMessages(handler)
} ~
getFromResourceDirectory("web")
}

}
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package io.trsc.reactive.irc.example

import akka.actor.{Props, ActorSystem}
import akka.actor.{ActorSystem, Props}
import akka.stream.ActorFlowMaterializer
import akka.stream.actor.ActorSubscriberMessage.OnNext
import akka.stream.actor.{ActorSubscriber, OneByOneRequestStrategy, RequestStrategy}
import akka.stream.scaladsl.{Sink, Flow}
import akka.stream.scaladsl.{Flow, Sink}
import akka.stream.stage.{Context, PushStage, SyncDirective}
import io.trsc.reactive.irc.ReactiveIRC
import io.trsc.reactive.irc.protocol.IrcMessage

import scala.collection._
import scala.util.Try

object WikipediaUpdates extends App {
Expand Down Expand Up @@ -41,12 +42,22 @@ class ChangedLinesExtractor extends PushStage[String, Int] {

class CountingActor extends ActorSubscriber {

private var count = 0
private var queue: mutable.Queue[(Int, Long)] = mutable.Queue.empty

private var added = 0
private var removed = 0

protected def requestStrategy: RequestStrategy = OneByOneRequestStrategy

def receive = {
case OnNext(lines: Int) => count = count + lines; println(s"lines added to wikipedia since start: $count")
case OnNext(lines: Int) if lines < 0 => removed = removed + lines; printValues
case OnNext(lines: Int) if lines > 0 => added = added + lines; printValues
case _ => printValues
}

}
def printValues {
println(s"lines added: $added")
println(s"lines removed: $removed")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.trsc.reactive.irc.example

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorFlowMaterializer
import io.trsc.reactive.irc.ReactiveIRC
import io.trsc.reactive.irc.protocol.IrcMessage

import scala.util.{Failure, Success}

object WikipediaUpdatesServer extends App {

implicit val system = ActorSystem("wikipedia-updates-system")
import system.dispatcher
implicit val materializer = ActorFlowMaterializer()

val broadcastSource = ReactiveIRC.listen("irc.wikimedia.org", 6667, "reactive-example", "#en.wikipedia" :: Nil) collect {
case IrcMessage(_, _, params) => params.last
} map { BroadcastMessage }

val interface = "localhost"
val port = args.headOption.map(_.toInt).getOrElse(7117)

Http().bindAndHandle(Routes(ConnectionHandler(broadcastSource)), interface, port).onComplete {
case Success(binding)
val localAddress = binding.localAddress
println(s"Started WikipediaUpdatesServer on ${localAddress.getHostName}:${localAddress.getPort}")
case Failure(e)
println(s"Failed to start WikipediaUpdatesServer with ${e.getMessage}")
system.shutdown()
}
}
2 changes: 1 addition & 1 deletion src/main/scala/io/trsc/reactive/irc/ReactiveIRC.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object ReactiveIRC {
val splitMessages = builder.add(new UnzipIrcMessages)

registeringSource ~> merge ~> convertToByteString ~> log ~> connection ~> decodeIrcMessages ~> splitMessages.in
merge.preferred <~ systemMessageHandler <~ logSys <~ splitMessages.systemMessages
merge.preferred <~ systemMessageHandler <~ logSys <~ splitMessages.systemMessages

splitMessages.channelMessages
}
Expand Down

0 comments on commit 7d9414b

Please sign in to comment.