Skip to content

Commit

Permalink
Further improvements to the event-loop.
Browse files Browse the repository at this point in the history
Signed-off-by: Stelian Ionescu <[email protected]>
  • Loading branch information
sionescu committed Aug 13, 2007
1 parent 407bea9 commit 4aef463
Show file tree
Hide file tree
Showing 6 changed files with 182 additions and 73 deletions.
5 changes: 3 additions & 2 deletions io.event.asd
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
:author "Stelian Ionescu <[email protected]>"
:maintainer "Stelian Ionescu <[email protected]>"
:licence "LLGPL-2.1"
:depends-on (:io.multiplex :net.sockets)
:depends-on (:io.streams :io.multiplex :net.sockets)
:pathname (merge-pathnames (make-pathname :directory '(:relative "io.event"))
*load-truename*)
:serial t
:components
((:file "pkgdcl")
(:file "io-buffer")
(:file "io-channel")
(:file "io-protocol")))
(:file "io-protocol")
(:file "server-factory")))
10 changes: 5 additions & 5 deletions io.event/io-buffer.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
;;; Boston, MA 02110-1301, USA

(in-package :io.multiplex)
(in-package :io.event)

(defclass io-buffer ()
((data :accessor data-of)
(size :accessor size-of)
(start :initarg :start :accessor start-of)
(end :initarg :end :accessor end-of))
((data :accessor data-of)
(size :accessor size-of)
(start :initarg :start :accessor start-of)
(end :initarg :end :accessor end-of))
(:default-initargs :start 0 :end 0))

(defvar *default-buffer-size* 4096)
Expand Down
148 changes: 86 additions & 62 deletions io.event/io-channel.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,32 @@
;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
;;; Boston, MA 02110-1301, USA

(in-package :io.multiplex)
(in-package :io.event)

;;;; IO-Channel

;;; FIXME: for the moment channels are read/write
;;; this will probably have to change

(defclass io-channel ()
((protocol :accessor protocol-of)))
((event-loop :initarg :event-loop
:accessor event-loop-of)
(protocol :accessor protocol-of)
(read-handler :accessor read-handler-of)
(write-handler :accessor write-handler-of)
(error-handler :accessor error-handler-of)))

(defconstant +default-read-window-size+ 8192)

(defclass io-buffered-channel (io-channel)
((read-buffer :accessor read-buffer-of)
(read-buffered-p :initarg :read-buffered-p
:accessor read-buffered-p)
(read-window-size :initarg :read-window-size
:accessor read-window-size-of)
(write-buffer :accessor write-buffer-of)
(write-buffered-p :initarg :write-buffered-p
:accessor write-buffered-p))
(:default-initargs
:read-buffered-p t
:write-buffered-p t
:read-window-size +default-read-window-size+))
((read-buffer :accessor read-buffer-of)
(read-buffered-p :initarg :read-buffered-p
:accessor read-buffered-p)
(read-window-size :initarg :read-window-size
:accessor read-window-size-of)
(write-buffer :accessor write-buffer-of)
(write-buffered-p :initarg :write-buffered-p
:accessor write-buffered-p))
(:default-initargs :read-buffered-p t
:write-buffered-p t
:read-window-size +default-read-window-size+))

(defmethod initialize-instance :after ((channel io-buffered-channel) &key
read-buffer-size write-buffer-size)
Expand All @@ -58,61 +59,84 @@

;;;; Socket-Transport

(defclass socket-transport (io-buffered-channel)
((socket :accessor socket-of)))

;;; FIXME: apply not good
(defmethod initialize-instance ((transport socket-transport)
&rest args)
(apply #'make-socket args))

(defclass tcp-transport (socket-transport)
((status :initform :unconnected
:accessor status-of)))
(defclass socket-transport (io-channel)
((socket :initarg :socket :accessor socket-of)))

(defclass tcp-transport (io-buffered-channel socket-transport)
((status :initform :unconnected
:accessor status-of)))

(defmethod initialize-instance :after ((transport tcp-transport) &key)
(setf (read-handler-of transport)
(add-fd (event-loop-of transport)
(fd-of (socket-of transport))
:read
#'(lambda (fd event)
(declare (ignore fd event))
(on-transport-readable transport))))
(setf (write-handler-of transport)
(add-fd (event-loop-of transport)
(fd-of (socket-of transport))
:write
#'(lambda (fd event)
(declare (ignore fd event))
(on-transport-writable transport))))
(setf (error-handler-of transport)
(add-fd (event-loop-of transport)
(fd-of (socket-of transport))
:error
#'(lambda (fd event)
(declare (ignore fd event))
(on-transport-error transport)))))

(defclass udp-transport (socket-transport) ())

(defgeneric read-bytes (transport))
(defgeneric on-transport-readable (transport))

(defgeneric write-bytes (transport bytes &key start end))
(defgeneric on-transport-writable (transport))

(defmethod read-bytes ((c tcp-transport))
(with-accessors ((rb read-buffer-of)
(sock socket-of)
(defgeneric on-transport-error (transport))

(defmethod on-transport-readable ((c tcp-transport))
(with-accessors ((sock socket-of)
(proto protocol-of)
(status status-of)) c
(when (eq status :unconnected)
(on-connection-ready proto))
(multiple-value-bind (buf byte-num)
(handler-case
;; append to the buffer
(socket-receive (data-of rb) sock
:start (end-of rb)
:end (size-of rb))
;; either a spurious event, or the socket has
;; just connected and there is no data to receive
(nix:ewouldblock ()
(return-from read-bytes))
;; FIXME: perhaps we might be a little more sophisticated here
(socket-error (err)
(on-connection-lost proto err)))
(assert (eq status :connected))
(let ((buffer (make-array +default-read-window-size+
:element-type '(unsigned-byte 8)))
(byte-num 0))
(declare (type unsigned-byte byte-num))
(handler-case
(setf (values buffer byte-num) (socket-receive buffer sock))
;; a spurious event !
(nix:ewouldblock ()
(error "Got a transport-readable event but recv() returned EWOULDBLOCK !"))
;; FIXME: perhaps we might be a little more sophisticated here
(socket-error (err)
(setf status :disconnected)
(on-connection-lost proto err)))
(cond
;; EOF
((zerop byte-num)
(setf status :disconnected)
(on-connection-end proto))
;; good data
((plusp byte-num)
;; increment the end pointer of the buffer
(incf (end-of rb) byte-num)
;; FIXME: we're both buffering the data *and* calling
;; ON-MESSAGE-RECEIVED with an array displaced to
;; the data just received. perhaps we should separate the two
(on-message-received
proto
(make-array byte-num :element-type '(unsigned-byte 8)
:displaced-to (data-of rb)
:displaced-index-offset )))))))

(defmethod write-bytes ((c tcp-transport)
bytes &key start end)
)
(on-message-received proto buffer))))))

;;; FIXME: deal with full write kernel buffers
(defmethod on-transport-writable ((c tcp-transport))
(with-accessors ((sock socket-of)
(proto protocol-of)
(status status-of)) c
;; not exactly complete: infact subsequent :WRITE
;; events must be handled
(when (eq status :unconnected)
(on-connection-made proto)
(setf status :connected))))

;;; FIXME: complete it
(defmethod on-transport-error ((c tcp-transport))
(let ((error-code (get-socket-option (socket-of c)
:error)))
))
17 changes: 14 additions & 3 deletions io.event/io-protocol.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,26 @@
;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
;;; Boston, MA 02110-1301, USA

(in-package :io.multiplex)
(in-package :io.event)

(defclass io-protocol ()
((transport :accessor transport-of)))
((transport :initarg :transport
:accessor transport-of)))

(defgeneric on-connection-ready (protocol))
(defclass stream-protocol () ())

(defgeneric on-protocol-start (protocol))

(defgeneric on-protocol-stop (protocol))

(defgeneric on-connection-made (protocol))

(defgeneric on-connection-lost (protocol reason))

(defgeneric on-connection-end (protocol))

(defgeneric on-message-received (protocol message))

(defclass datagram-protocol () ())

(defgeneric on-datagram-received (protocol datagram address))
2 changes: 1 addition & 1 deletion io.event/pkgdcl.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

(defpackage :io.event
(:nicknames #:evie)
(:use #:common-lisp :io.multiplex :net.sockets)
(:use #:common-lisp :io.streams :io.multiplex :net.sockets)
(:export
;; classes
))
73 changes: 73 additions & 0 deletions io.event/server-factory.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Indent-tabs-mode: NIL -*-
;;;
;;; server-factory.lisp - TCP server factories.
;;;
;;; Copyright (C) 2007, Stelian Ionescu <[email protected]>
;;;
;;; This code is free software; you can redistribute it and/or
;;; modify it under the terms of the version 2.1 of
;;; the GNU Lesser General Public License as published by
;;; the Free Software Foundation, as clarified by the
;;; preamble found here:
;;; http://opensource.franz.com/preamble.html
;;;
;;; This program is distributed in the hope that it will be useful,
;;; but WITHOUT ANY WARRANTY; without even the implied warranty of
;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
;;; GNU General Public License for more details.
;;;
;;; You should have received a copy of the GNU Lesser General
;;; Public License along with this library; if not, write to the
;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
;;; Boston, MA 02110-1301, USA

(in-package :io.event)

;;; Server factory

(defclass server-factory ()
((protocol :initarg :protocol
:accessor protocol-of)))

(defgeneric on-connection-received (factory event-base socket))

(defmethod on-connection-received ((factory server-factory)
(event-loop event-base)
(socket active-socket))
)

;;; Event LOOP

(defclass event-loop (event-base)
((sockets :initform (make-hash-table :test #'eql)
:accessor sockets-of)
(protocols :initform (make-hash-table :test #'eql)
:accessor protocols-of)))

(defgeneric listen-tcp (event-loop &key host port factory))

(defmethod listen-tcp ((event-loop event-loop)
&key host port factory)
(check-type host address)
(check-type port (unsigned-byte 16))
(check-type factory server-factory)
(let ((socket (make-socket :family (address-type host)
:type :stream :connect :passive
:local-host host :local-port port)))
(setf (fd-non-blocking socket) t)
(setf (gethash (fd-of socket) (sockets-of event-loop)) socket)
(add-fd event-loop (fd-of socket) :read
#'(lambda (fd event)
(ecase event
(:read
(let ((peer (accept-connection socket)))
(when peer
(let* ((transport (make-instance 'tcp-transport
:event-loop event-loop
:socket socket))
(protocol (make-instance (protocol-of factory)
:transport transport)))
(setf (gethash (fd-of peer) (protocols-of event-loop))
(cons peer protocol))))))
(:error
(error "Got an error on the server socket: ~A~%" socket)))))))

0 comments on commit 4aef463

Please sign in to comment.