Skip to content

Commit

Permalink
Merge pull request #260 from lightpanda-io/tick
Browse files Browse the repository at this point in the history
loop: add IO methods and remove Tick usage
  • Loading branch information
francisbouvier authored Nov 21, 2024
2 parents 2ddca8b + d0e006b commit d115261
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 96 deletions.
169 changes: 74 additions & 95 deletions src/loop.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ pub const SingleThreaded = struct {
cbk_error: bool = false,

const Self = @This();
pub const Completion = IO.Completion;

pub const ConnectError = IO.ConnectError;
pub const RecvError = IO.RecvError;
pub const SendError = IO.SendError;

pub fn init(alloc: std.mem.Allocator) !Self {
const io = try alloc.create(IO);
Expand All @@ -66,7 +71,7 @@ pub const SingleThreaded = struct {
// on the go when they are executed (ie. nested I/O events).
pub fn run(self: *Self) !void {
while (self.eventsNb() > 0) {
try self.io.tick();
try self.io.run_for_ns(10 * std.time.ns_per_ms);
// at each iteration we might have new events registred by previous callbacks
}
// TODO: return instead immediatly on the first JS callback error
Expand Down Expand Up @@ -97,8 +102,8 @@ pub const SingleThreaded = struct {
self.alloc.destroy(ctx);
}

// Callback-based APIs
// -------------------
// JS callbacks APIs
// -----------------

// Timeout

Expand Down Expand Up @@ -207,107 +212,81 @@ pub const SingleThreaded = struct {
}
}

// Yield
pub fn Yield(comptime Ctx: type) type {
// TODO check ctx interface funcs:
// - onYield(ctx: *Ctx, ?anyerror) void
return struct {
const YieldImpl = @This();
const Loop = Self;

loop: *Loop,
ctx: *Ctx,
completion: IO.Completion,

pub fn init(loop: *Loop) YieldImpl {
return .{
.loop = loop,
.completion = undefined,
.ctx = undefined,
};
}

pub fn tick(self: *YieldImpl) !void {
return try self.loop.io.tick();
}
// IO callbacks APIs
// -----------------

pub fn yield(self: *YieldImpl, ctx: *Ctx) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.timeout(*YieldImpl, self, YieldImpl.yieldCbk, &self.completion, 0);
}
// Connect

fn yieldCbk(self: *YieldImpl, _: *IO.Completion, result: IO.TimeoutError!void) void {
_ = self.loop.removeEvent();
_ = result catch |err| return self.ctx.onYield(err);
return self.ctx.onYield(null);
}
};
pub fn connect(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, _: *Completion, res: ConnectError!void) void,
socket: std.posix.socket_t,
address: std.net.Address,
) void {
const old_events_nb = self.addEvent();
self.io.connect(*Ctx, ctx, cbk, completion, socket, address);
if (builtin.is_test) {
report("start connect {d}", .{old_events_nb + 1});
}
}

// Network
pub fn Network(comptime Ctx: type) type {

// TODO check ctx interface funcs:
// - onConnect(ctx: *Ctx, ?anyerror) void
// - onReceive(ctx: *Ctx, usize, ?anyerror) void
// - onSend(ctx: *Ctx, usize, ?anyerror) void

return struct {
const NetworkImpl = @This();
const Loop = Self;

loop: *Loop,
ctx: *Ctx,
completion: IO.Completion,

pub fn init(loop: *Loop) NetworkImpl {
return .{
.loop = loop,
.completion = undefined,
.ctx = undefined,
};
}

pub fn tick(self: *NetworkImpl) !void {
return try self.loop.io.tick();
}
pub fn onConnect(self: *Self, _: ConnectError!void) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("connect done, remaining events: {d}", .{old_events_nb - 1});
}
}

pub fn connect(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, address: std.net.Address) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.connect(*NetworkImpl, self, NetworkImpl.connectCbk, &self.completion, socket, address);
}
// Send

fn connectCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.ConnectError!void) void {
_ = self.loop.removeEvent();
_ = result catch |err| return self.ctx.onConnect(err);
return self.ctx.onConnect(null);
}
pub fn send(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: SendError!usize) void,
socket: std.posix.socket_t,
buf: []const u8,
) void {
const old_events_nb = self.addEvent();
self.io.send(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start send {d}", .{old_events_nb + 1});
}
}

pub fn receive(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, buffer: []u8) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.recv(*NetworkImpl, self, NetworkImpl.receiveCbk, &self.completion, socket, buffer);
}
pub fn onSend(self: *Self, _: SendError!usize) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("send done, remaining events: {d}", .{old_events_nb - 1});
}
}

fn receiveCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.RecvError!usize) void {
_ = self.loop.removeEvent();
const ln = result catch |err| return self.ctx.onReceive(0, err);
return self.ctx.onReceive(ln, null);
}
// Recv

pub fn send(self: *NetworkImpl, ctx: *Ctx, socket: std.posix.socket_t, buffer: []const u8) void {
self.ctx = ctx;
_ = self.loop.addEvent();
self.loop.io.send(*NetworkImpl, self, NetworkImpl.sendCbk, &self.completion, socket, buffer);
}
pub fn recv(
self: *Self,
comptime Ctx: type,
ctx: *Ctx,
completion: *Completion,
comptime cbk: fn (ctx: *Ctx, completion: *Completion, res: RecvError!usize) void,
socket: std.posix.socket_t,
buf: []u8,
) void {
const old_events_nb = self.addEvent();
self.io.recv(*Ctx, ctx, cbk, completion, socket, buf);
if (builtin.is_test) {
report("start recv {d}", .{old_events_nb + 1});
}
}

fn sendCbk(self: *NetworkImpl, _: *IO.Completion, result: IO.SendError!usize) void {
_ = self.loop.removeEvent();
const ln = result catch |err| return self.ctx.onSend(0, err);
return self.ctx.onSend(ln, null);
}
};
pub fn onRecv(self: *Self, _: RecvError!usize) void {
const old_events_nb = self.removeEvent();
if (builtin.is_test) {
report("recv done, remaining events: {d}", .{old_events_nb - 1});
}
}
};
2 changes: 1 addition & 1 deletion src/shell.zig
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub fn shellExec(
// - user input command from repl
// - JS callbacks events from scripts
while (true) {
try loop.io.tick();
try loop.io.run_for_ns(10 * std.time.ns_per_ms);
if (loop.cbk_error) {
if (try try_catch.exception(alloc, js_env.*)) |msg| {
defer alloc.free(msg);
Expand Down

0 comments on commit d115261

Please sign in to comment.