Skip to content

Commit 9bd7c2b

Browse files
author
Martin Jambon
committed
Some progress toward maintaining a connection to Slack. Still buggy,
receiving and "EOF error".
1 parent 67f1693 commit 9bd7c2b

File tree

5 files changed

+319
-188
lines changed

5 files changed

+319
-188
lines changed

OMakefile

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
1+
# A service that maintains websocket connections to Slack,
2+
# allowing us to receive notifications on what's happening
3+
# on a channel (Slack RTM API).
4+
15
OCAML_LIBS = $(OCAML_LIBS_slack-ws)
26

37
FILES[] =
48
slack_ws_t
59
slack_ws_j
610
slack_ws_v
711
slack_ws_access
12+
# Connection manager
13+
slack_ws_conn
14+
# Message handler
815
slack_ws
16+
# HTTP service to receive notifications from other internal components
917
slack_ws_http_serv
18+
# Start the service
1019
slack_ws_main
1120

1221
Atdgen(slack_ws, -j-std)

slack_ws.atd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@ type slack_ts = string wrap <ocaml module="Slack_api_ts">
66
type teamid = string wrap <ocaml module="Teamid">
77
type uid = string wrap <ocaml module="Uid">
88

9+
type type_only = {
10+
type_ <json name="type">: string;
11+
}
12+
913
type latest = {
1014
uid: uid;
1115
teamid: teamid;

slack_ws.ml

Lines changed: 16 additions & 187 deletions
Original file line numberDiff line numberDiff line change
@@ -1,193 +1,22 @@
1-
2-
open Lwt
3-
open Log
4-
51
(*
6-
- open websocket
7-
- post to elb
8-
- save timestamp of latest event
9-
- update socket connection
2+
Handler for incoming events:
3+
- respond to json pings
4+
- forward other incoming events to our http server
105
*)
116

12-
type connection = {
13-
conn_id: Slack_api_teamid.t;
14-
push: string -> unit Lwt.t;
15-
close: unit -> unit Lwt.t;
16-
mutable waiting_for_pong: (bool Lwt.t * unit Lwt.u) option ref;
17-
}
18-
19-
let connections = Hashtbl.create 10
20-
21-
let remove_connection id =
22-
try
23-
let {close} = Hashtbl.find connections id in
24-
Hashtbl.remove connections id;
25-
catch close
26-
(fun e ->
27-
logf `Error "Can't close websocket connection: %s" (string_of_exn e);
28-
return ()
29-
)
30-
with Not_found ->
31-
return ()
32-
33-
let replace_connection x =
34-
async (fun () -> remove_connection x.conn_id);
35-
Hashtbl.add connections x.conn_id x
36-
37-
let react input_handler waiting_for_pong send frame =
38-
let open Websocket_lwt.Frame in
39-
match frame.opcode with
40-
| Opcode.Ping ->
41-
send (Websocket_lwt.Frame.create ~opcode:Opcode.Pong ()) >>= fun () ->
42-
return true
43-
44-
| Opcode.Close ->
45-
(if String.length frame.content >= 2 then
46-
send (
47-
Websocket_lwt.Frame.create ~opcode:Opcode.Close
48-
~content:(String.sub frame.content 0 2) ()
49-
)
50-
else
51-
send (Websocket_lwt.Frame.close 1000)
52-
) >>= fun () ->
53-
return false
54-
55-
| Opcode.Pong ->
56-
(match !waiting_for_pong with
57-
| None -> ()
58-
| Some (_, awakener) ->
59-
waiting_for_pong := None;
60-
Lwt.wakeup awakener ()
61-
);
62-
return true
63-
64-
| Opcode.Text
65-
| Opcode.Binary ->
66-
input_handler frame.content >>= fun () ->
67-
return true
68-
69-
| Opcode.Continuation
70-
| Opcode.Ctrl _
71-
| Opcode.Nonctrl _ ->
72-
send (Websocket_lwt.Frame.close 1002) >>= fun () ->
73-
return false
74-
75-
let create_websocket_connection ws_url input_handler waiting_for_pong =
76-
let open Websocket_lwt.Frame in
77-
let orig_uri = Uri.of_string ws_url in
78-
let uri = Uri.with_scheme orig_uri (Some "https") in
79-
Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system >>= fun endp ->
80-
let ctx = Conduit_lwt_unix.default_ctx in
81-
Conduit_lwt_unix.endp_to_client ~ctx endp >>= fun client ->
82-
Websocket_lwt.with_connection ~ctx client uri >>= fun (recv, send) ->
83-
84-
let rec loop () =
85-
recv () >>= fun frame ->
86-
react input_handler waiting_for_pong send frame >>= function
87-
| true -> loop ()
88-
| false -> return ()
89-
in
90-
let close () =
91-
logf `Info "Sending a close frame.";
92-
send (Websocket_lwt.Frame.close 1000)
93-
in
94-
let push content =
95-
send (Websocket_lwt.Frame.create ~content ())
96-
in
97-
async loop;
98-
return (push, close)
99-
100-
let create_connection slack_teamid input_handler =
101-
Slack.get_auth slack_teamid >>= function
102-
| None ->
103-
logf `Error "Cannot connect to Slack for team %s"
104-
(Slack_api_teamid.to_string slack_teamid);
105-
Http_exn.bad_request
106-
`Slack_authentication_missing
107-
"Slack authentication missing"
108-
| Some auth ->
109-
Slack_api.rtm_start auth.Slack_api_t.access_token >>= fun x ->
110-
let ws_url = x.Slack_api_t.url in
111-
let waiting_for_pong = ref None in
112-
create_websocket_connection ws_url
113-
input_handler waiting_for_pong >>= fun (push, close) ->
114-
let conn = {
115-
conn_id = slack_teamid;
116-
push;
117-
close;
118-
waiting_for_pong;
119-
} in
120-
return conn
121-
122-
let get_connection slack_teamid =
123-
try Some (Hashtbl.find connections slack_teamid)
124-
with Not_found -> None
125-
126-
let obtain_connection slack_teamid input_handler =
127-
let mutex_key = "slack-ws:" ^ Slack_api_teamid.to_string slack_teamid in
128-
Redis_mutex.with_mutex ~atime:30. ~ltime:60 mutex_key (fun () ->
129-
match get_connection slack_teamid with
130-
| Some x -> return x
131-
| None ->
132-
create_connection slack_teamid input_handler >>= fun conn ->
133-
replace_connection conn;
134-
return conn
135-
)
136-
7+
open Slack_ws_t
8+
open Slack_ws_conn
1379
(*
138-
Send a ping and wait for a pong response for up to 10 seconds.
139-
*)
140-
let check_connection x =
141-
match !(x.waiting_for_pong) with
142-
| Some (result, _) ->
143-
result
144-
| None ->
145-
Apputil_error.catch_and_report "Slack Wwebsocket ping"
146-
(fun () ->
147-
let waiter, awakener = Lwt.wait () in
148-
let success = waiter >>= fun () -> return true in
149-
let failure = Lwt_unix.sleep 10. >>= fun () -> return false in
150-
let result = Lwt.pick [success; failure] in
151-
x.waiting_for_pong := Some (result, awakener);
152-
result
153-
)
154-
(fun e ->
155-
logf `Error "Websocket ping error: %s" (string_of_exn e);
156-
return false
10+
let make_input_handler forward =
11+
fun send event_json ->
12+
match parse_event event_json with
13+
| Pong ->
14+
(match !waiting_for_pong with
15+
| None -> ()
16+
| Some (_, awakener) ->
17+
waiting_for_pong := None;
18+
Lwt.wakeup awakener ()
15719
)
158-
>>= function
159-
| true ->
160-
return true
161-
| false ->
162-
logf `Error "Websocket %s didn't respond to ping request"
163-
(Slack_api_teamid.to_string x.conn_id);
164-
remove_connection x.conn_id >>= fun () ->
165-
return false
166-
167-
let check_slack_team_connection slack_teamid =
168-
match get_connection slack_teamid with
169-
| None -> return false
170-
| Some x -> check_connection x
171-
172-
let get_slack_address esper_teamid =
173-
User_team.get esper_teamid >>= fun team ->
174-
User_preferences.get team >>= fun p ->
175-
match p.Api_t.pref_slack_address with
176-
| None -> Http_exn.bad_request `Slack_not_configured "Slack not configured"
177-
| Some x -> return x
178-
179-
let check_esper_team_connection teamid =
180-
get_slack_address teamid >>= fun x ->
181-
check_slack_team_connection x.Api_t.slack_teamid
182-
183-
(*
184-
Testing:
185-
186-
- log into Esper as the desired user
187-
- produce Slack auth URL: Slack.app_auth_url ()
188-
- visit that URL and follow the oauth steps
189-
- grab the auth token from the table slack_app_auth
20+
| Forward event_json ->
21+
forward event_json
19022
*)
191-
192-
let init () =
193-
return ()

0 commit comments

Comments
 (0)