From 4aef46313af4d974ac6de2712ee1a8c58d38fd95 Mon Sep 17 00:00:00 2001 From: Stelian Ionescu Date: Mon, 13 Aug 2007 22:49:20 +0100 Subject: [PATCH] Further improvements to the event-loop. Signed-off-by: Stelian Ionescu --- io.event.asd | 5 +- io.event/io-buffer.lisp | 10 +-- io.event/io-channel.lisp | 148 ++++++++++++++++++++--------------- io.event/io-protocol.lisp | 17 +++- io.event/pkgdcl.lisp | 2 +- io.event/server-factory.lisp | 73 +++++++++++++++++ 6 files changed, 182 insertions(+), 73 deletions(-) create mode 100644 io.event/server-factory.lisp diff --git a/io.event.asd b/io.event.asd index 0f300ae6..b544ccf8 100644 --- a/io.event.asd +++ b/io.event.asd @@ -26,7 +26,7 @@ :author "Stelian Ionescu " :maintainer "Stelian Ionescu " :licence "LLGPL-2.1" - :depends-on (:io.multiplex :net.sockets) + :depends-on (:io.streams :io.multiplex :net.sockets) :pathname (merge-pathnames (make-pathname :directory '(:relative "io.event")) *load-truename*) :serial t @@ -34,4 +34,5 @@ ((:file "pkgdcl") (:file "io-buffer") (:file "io-channel") - (:file "io-protocol"))) + (:file "io-protocol") + (:file "server-factory"))) diff --git a/io.event/io-buffer.lisp b/io.event/io-buffer.lisp index 9437c7f5..d1b5f31e 100644 --- a/io.event/io-buffer.lisp +++ b/io.event/io-buffer.lisp @@ -21,13 +21,13 @@ ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, ;;; Boston, MA 02110-1301, USA -(in-package :io.multiplex) +(in-package :io.event) (defclass io-buffer () - ((data :accessor data-of) - (size :accessor size-of) - (start :initarg :start :accessor start-of) - (end :initarg :end :accessor end-of)) + ((data :accessor data-of) + (size :accessor size-of) + (start :initarg :start :accessor start-of) + (end :initarg :end :accessor end-of)) (:default-initargs :start 0 :end 0)) (defvar *default-buffer-size* 4096) diff --git a/io.event/io-channel.lisp b/io.event/io-channel.lisp index 264be282..142443ec 100644 --- a/io.event/io-channel.lisp +++ b/io.event/io-channel.lisp @@ -21,31 +21,32 @@ ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, ;;; Boston, MA 02110-1301, USA -(in-package :io.multiplex) +(in-package :io.event) ;;;; IO-Channel -;;; FIXME: for the moment channels are read/write -;;; this will probably have to change - (defclass io-channel () - ((protocol :accessor protocol-of))) + ((event-loop :initarg :event-loop + :accessor event-loop-of) + (protocol :accessor protocol-of) + (read-handler :accessor read-handler-of) + (write-handler :accessor write-handler-of) + (error-handler :accessor error-handler-of))) (defconstant +default-read-window-size+ 8192) (defclass io-buffered-channel (io-channel) - ((read-buffer :accessor read-buffer-of) - (read-buffered-p :initarg :read-buffered-p - :accessor read-buffered-p) - (read-window-size :initarg :read-window-size - :accessor read-window-size-of) - (write-buffer :accessor write-buffer-of) - (write-buffered-p :initarg :write-buffered-p - :accessor write-buffered-p)) - (:default-initargs - :read-buffered-p t - :write-buffered-p t - :read-window-size +default-read-window-size+)) + ((read-buffer :accessor read-buffer-of) + (read-buffered-p :initarg :read-buffered-p + :accessor read-buffered-p) + (read-window-size :initarg :read-window-size + :accessor read-window-size-of) + (write-buffer :accessor write-buffer-of) + (write-buffered-p :initarg :write-buffered-p + :accessor write-buffered-p)) + (:default-initargs :read-buffered-p t + :write-buffered-p t + :read-window-size +default-read-window-size+)) (defmethod initialize-instance :after ((channel io-buffered-channel) &key read-buffer-size write-buffer-size) @@ -58,61 +59,84 @@ ;;;; Socket-Transport -(defclass socket-transport (io-buffered-channel) - ((socket :accessor socket-of))) - -;;; FIXME: apply not good -(defmethod initialize-instance ((transport socket-transport) - &rest args) - (apply #'make-socket args)) - -(defclass tcp-transport (socket-transport) - ((status :initform :unconnected - :accessor status-of))) +(defclass socket-transport (io-channel) + ((socket :initarg :socket :accessor socket-of))) + +(defclass tcp-transport (io-buffered-channel socket-transport) + ((status :initform :unconnected + :accessor status-of))) + +(defmethod initialize-instance :after ((transport tcp-transport) &key) + (setf (read-handler-of transport) + (add-fd (event-loop-of transport) + (fd-of (socket-of transport)) + :read + #'(lambda (fd event) + (declare (ignore fd event)) + (on-transport-readable transport)))) + (setf (write-handler-of transport) + (add-fd (event-loop-of transport) + (fd-of (socket-of transport)) + :write + #'(lambda (fd event) + (declare (ignore fd event)) + (on-transport-writable transport)))) + (setf (error-handler-of transport) + (add-fd (event-loop-of transport) + (fd-of (socket-of transport)) + :error + #'(lambda (fd event) + (declare (ignore fd event)) + (on-transport-error transport))))) (defclass udp-transport (socket-transport) ()) -(defgeneric read-bytes (transport)) +(defgeneric on-transport-readable (transport)) -(defgeneric write-bytes (transport bytes &key start end)) +(defgeneric on-transport-writable (transport)) -(defmethod read-bytes ((c tcp-transport)) - (with-accessors ((rb read-buffer-of) - (sock socket-of) +(defgeneric on-transport-error (transport)) + +(defmethod on-transport-readable ((c tcp-transport)) + (with-accessors ((sock socket-of) (proto protocol-of) (status status-of)) c - (when (eq status :unconnected) - (on-connection-ready proto)) - (multiple-value-bind (buf byte-num) - (handler-case - ;; append to the buffer - (socket-receive (data-of rb) sock - :start (end-of rb) - :end (size-of rb)) - ;; either a spurious event, or the socket has - ;; just connected and there is no data to receive - (nix:ewouldblock () - (return-from read-bytes)) - ;; FIXME: perhaps we might be a little more sophisticated here - (socket-error (err) - (on-connection-lost proto err))) + (assert (eq status :connected)) + (let ((buffer (make-array +default-read-window-size+ + :element-type '(unsigned-byte 8))) + (byte-num 0)) + (declare (type unsigned-byte byte-num)) + (handler-case + (setf (values buffer byte-num) (socket-receive buffer sock)) + ;; a spurious event ! + (nix:ewouldblock () + (error "Got a transport-readable event but recv() returned EWOULDBLOCK !")) + ;; FIXME: perhaps we might be a little more sophisticated here + (socket-error (err) + (setf status :disconnected) + (on-connection-lost proto err))) (cond ;; EOF ((zerop byte-num) + (setf status :disconnected) (on-connection-end proto)) ;; good data ((plusp byte-num) - ;; increment the end pointer of the buffer - (incf (end-of rb) byte-num) - ;; FIXME: we're both buffering the data *and* calling - ;; ON-MESSAGE-RECEIVED with an array displaced to - ;; the data just received. perhaps we should separate the two - (on-message-received - proto - (make-array byte-num :element-type '(unsigned-byte 8) - :displaced-to (data-of rb) - :displaced-index-offset ))))))) - -(defmethod write-bytes ((c tcp-transport) - bytes &key start end) - ) + (on-message-received proto buffer)))))) + +;;; FIXME: deal with full write kernel buffers +(defmethod on-transport-writable ((c tcp-transport)) + (with-accessors ((sock socket-of) + (proto protocol-of) + (status status-of)) c + ;; not exactly complete: infact subsequent :WRITE + ;; events must be handled + (when (eq status :unconnected) + (on-connection-made proto) + (setf status :connected)))) + +;;; FIXME: complete it +(defmethod on-transport-error ((c tcp-transport)) + (let ((error-code (get-socket-option (socket-of c) + :error))) + )) diff --git a/io.event/io-protocol.lisp b/io.event/io-protocol.lisp index 94cc21c2..ed5f755f 100644 --- a/io.event/io-protocol.lisp +++ b/io.event/io-protocol.lisp @@ -21,15 +21,26 @@ ;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, ;;; Boston, MA 02110-1301, USA -(in-package :io.multiplex) +(in-package :io.event) (defclass io-protocol () - ((transport :accessor transport-of))) + ((transport :initarg :transport + :accessor transport-of))) -(defgeneric on-connection-ready (protocol)) +(defclass stream-protocol () ()) + +(defgeneric on-protocol-start (protocol)) + +(defgeneric on-protocol-stop (protocol)) + +(defgeneric on-connection-made (protocol)) (defgeneric on-connection-lost (protocol reason)) (defgeneric on-connection-end (protocol)) (defgeneric on-message-received (protocol message)) + +(defclass datagram-protocol () ()) + +(defgeneric on-datagram-received (protocol datagram address)) diff --git a/io.event/pkgdcl.lisp b/io.event/pkgdcl.lisp index f0ffe483..bcc8bddd 100644 --- a/io.event/pkgdcl.lisp +++ b/io.event/pkgdcl.lisp @@ -25,7 +25,7 @@ (defpackage :io.event (:nicknames #:evie) - (:use #:common-lisp :io.multiplex :net.sockets) + (:use #:common-lisp :io.streams :io.multiplex :net.sockets) (:export ;; classes )) diff --git a/io.event/server-factory.lisp b/io.event/server-factory.lisp new file mode 100644 index 00000000..f9bf1d10 --- /dev/null +++ b/io.event/server-factory.lisp @@ -0,0 +1,73 @@ +;;;; -*- Mode: Lisp; Syntax: ANSI-Common-Lisp; Indent-tabs-mode: NIL -*- +;;; +;;; server-factory.lisp - TCP server factories. +;;; +;;; Copyright (C) 2007, Stelian Ionescu +;;; +;;; This code is free software; you can redistribute it and/or +;;; modify it under the terms of the version 2.1 of +;;; the GNU Lesser General Public License as published by +;;; the Free Software Foundation, as clarified by the +;;; preamble found here: +;;; http://opensource.franz.com/preamble.html +;;; +;;; This program is distributed in the hope that it will be useful, +;;; but WITHOUT ANY WARRANTY; without even the implied warranty of +;;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +;;; GNU General Public License for more details. +;;; +;;; You should have received a copy of the GNU Lesser General +;;; Public License along with this library; if not, write to the +;;; Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, +;;; Boston, MA 02110-1301, USA + +(in-package :io.event) + +;;; Server factory + +(defclass server-factory () + ((protocol :initarg :protocol + :accessor protocol-of))) + +(defgeneric on-connection-received (factory event-base socket)) + +(defmethod on-connection-received ((factory server-factory) + (event-loop event-base) + (socket active-socket)) + ) + +;;; Event LOOP + +(defclass event-loop (event-base) + ((sockets :initform (make-hash-table :test #'eql) + :accessor sockets-of) + (protocols :initform (make-hash-table :test #'eql) + :accessor protocols-of))) + +(defgeneric listen-tcp (event-loop &key host port factory)) + +(defmethod listen-tcp ((event-loop event-loop) + &key host port factory) + (check-type host address) + (check-type port (unsigned-byte 16)) + (check-type factory server-factory) + (let ((socket (make-socket :family (address-type host) + :type :stream :connect :passive + :local-host host :local-port port))) + (setf (fd-non-blocking socket) t) + (setf (gethash (fd-of socket) (sockets-of event-loop)) socket) + (add-fd event-loop (fd-of socket) :read + #'(lambda (fd event) + (ecase event + (:read + (let ((peer (accept-connection socket))) + (when peer + (let* ((transport (make-instance 'tcp-transport + :event-loop event-loop + :socket socket)) + (protocol (make-instance (protocol-of factory) + :transport transport))) + (setf (gethash (fd-of peer) (protocols-of event-loop)) + (cons peer protocol)))))) + (:error + (error "Got an error on the server socket: ~A~%" socket)))))))