Skip to content

Commit

Permalink
Fix recallonbusy (#314)
Browse files Browse the repository at this point in the history
NethServer/dev#7010

* freepbx: log recallonbusy using stdout and stderr

* Show reload button when recall on busy default is changed

* bump version to force database init

* Rewrite of recallonbusy to avoid using AMIClient python library

* make new recallonbusy executable

* enable recallonbusy by default

---------

Co-authored-by: Matteo Valentini <[email protected]>
  • Loading branch information
Stell0 and Amygos authored Oct 8, 2024
1 parent aa8e6f2 commit c261e33
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 176 deletions.
2 changes: 1 addition & 1 deletion freepbx/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ COPY var/www/html/freepbx/admin/brand/* /var/www/html/freepbx/admin/brand/

RUN ln -sf /var/lib/asterisk/bin/fwconsole /usr/bin/fwconsole

RUN pip install mysql.connector asterisk-ami configparser
RUN pip install mysql.connector configparser


# Use PHP development ini configuration and enable logging on syslog
Expand Down
377 changes: 204 additions & 173 deletions freepbx/usr/sbin/recallonbusy
Original file line number Diff line number Diff line change
@@ -1,179 +1,210 @@
#!/usr/bin/python3

#
# Copyright (C) 2021 Nethesis S.r.l.
# http://www.nethesis.it - [email protected]
#
# This script is part of NethVoice.
#
# NethVoice is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# NethVoice is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with NethVoice. If not, see COPYING.
#

import syslog
"""Asterisk AMI client to recall on busy extensions."""

"""
Copyright (C) 2024 Nethesis S.r.l.
SPDX-License-Identifier: GPL-3.0-or-later
"""



import socket
import sys
import time
from asterisk.ami import AMIClient,SimpleAction,AutoReconnect
import threading
import configparser
#import threading
import json
import os
import time
import re

DEFAULT_CONFIG = {
'Host': 'localhost',
'Port': 5038,
'Pidfile' : '/run/recallonbusy.pid',
'Username': 'recallonbusy',
'Secret': '',
'Debug' : True,
'CheckInterval': 60,
}

# read config file
config = configparser.ConfigParser(DEFAULT_CONFIG)
config.read('/etc/asterisk/recallonbusy.cfg')
CONFIG = {}

for key,value in DEFAULT_CONFIG.items():
try:
if key in ['Debug']:
CONFIG[key] = config.getboolean('recallonbusy',key)
elif key in ['Port','CheckInterval']:
CONFIG[key] = config.getint('recallonbusy',key)
else:
CONFIG[key] = config.get('recallonbusy',key)
except:
CONFIG[key] = DEFAULT_CONFIG[key]

def log_debug(msg):
global CONFIG
if CONFIG['Debug'] == True:
syslog.syslog(syslog.LOG_INFO,'recallonbusy: %s' % msg)

def get_extension_state(extension):
global device_state_map
# get all devices
if re.match(r'9[0-9][0-9]{2,}$',extension):
mainextension = re.sub(r'^9[0-9]([0-9]{2,})$', r'\1', extension)
else:
mainextension = extension

extensions_states = {}
for ext,state in device_state_map.items():
if ext == mainextension or re.match(r'9[0-9]'+mainextension+'$',ext):
extensions_states[ext] = state

res_state = 'UNKNOWN'
for ext,state in extensions_states.items():
if state == 'INUSE' or state == 'RINGING':
res_state = state
break;
elif state == 'NOT_INUSE' and (res_state == 'UNKNOWN' or res_state == 'UNAVAILABLE'):
res_state = state

log_debug('Final extension '+ extension + ' state: ' + res_state + ' ' + json.dumps(extensions_states))
return res_state


# This is called every time AMI event is emitted
def event_listener(source, event):
global client
global device_state_map
if event.name == 'DeviceStateChange':
if re.match(r'PJSIP/([0-9]*)$', str(event['Device'])):
if 'State' in event:
extension = re.sub(r'PJSIP/([0-9]*)$', r'\1', str(event['Device']))
log_debug('DeviceStateChange '+ extension + ' -> '+ event['State'])
mainextension = re.sub(r'^9[0-9]([0-9]{2,})$', r'\1', extension)
device_state_map[extension] = event['State']
if get_extension_state(extension) == 'NOT_INUSE':
# Ask for waiting ROB for this extension
timeid = time.time()
actionid='dbget'+str(timeid)
action = SimpleAction(
'DBGet',
ActionID=actionid,
Family='ROB',
Key=mainextension
)
client.send_action(action)
elif event.name == 'DBGetResponse':
if 'dbget' in event['ActionID'] and event['Val'] != '':
waiting_extensions = event['Val'].split('&')
ext_to_call = event['Key']
log_debug('ext_to_call: '+ext_to_call+ ' waiting_extensions: '+ str(waiting_extensions))
if get_extension_state(ext_to_call) == 'NOT_INUSE':
for waiting_extension in waiting_extensions:
if get_extension_state(waiting_extension) == 'NOT_INUSE':
log_debug('waiting_extension '+ waiting_extension + 'is NOT_INUSE, generating the call...')
# launch call
action = SimpleAction(
'Originate',
Channel='Local/'+waiting_extension+'@from-internal',
Timeout=150000,
CallerID=ext_to_call,
Context='from-internal',
Priority=1,
Exten=ext_to_call
)
client.send_action(action)
waiting_extensions.remove(waiting_extension)
break
new_waiting_extensions_string = '&'.join(waiting_extensions)
# Write new astdb string
timeid = time.time()
actionid='dbput'+str(timeid)
action = SimpleAction(
'DBPut',
ActionID=actionid,
Family='ROB',
Key=ext_to_call,
Val=new_waiting_extensions_string
)
client.send_action(action)

else :
log_debug('Unknow event: '+ str(event))

def ami_client_connect_and_login(address,port,username,secret):
global client
try:
client = AMIClient(address=CONFIG['Host'],port=CONFIG['Port'])
#AutoReconnect(client)
client.login(username=CONFIG['Username'],secret=CONFIG['Secret'])
client.add_event_listener(event_listener, white_list=['DeviceStateChange','DBGetResponse'])
log_debug('AMI client connected')
return True
except Exception as err:
syslog.syslog(syslog.LOG_ERR,'AMI client ERROR: %s' % str(err))
return False

device_state_map = {}

connected = False

#MAIN LOOP
while True:
try:
if not connected:
connected = ami_client_connect_and_login(address=CONFIG['Host'],port=CONFIG['Port'],username=CONFIG['Username'],secret=CONFIG['Secret'])
if connected:
# Ask for device state map updates
client.send_action(SimpleAction('DeviceStateList'))

except Exception as e:
syslog.syslog(syslog.LOG_ERR,"Error: " + str(e))
connected = False

time.sleep(CONFIG['CheckInterval'])

class AMIClient:
def __init__(self, config_path='/etc/asterisk/recallonbusy.cfg'):
self.config_path = config_path
self._load_config()
self.sock = None
self.buffer = ''
self.event_listeners = []
self.lock = threading.Lock()
self.debug = self.config.getboolean('recallonbusy', 'Debug', fallback=False)
self.check_interval = self.config.getint('recallonbusy', 'CheckInterval', fallback=20)
self.actions = {}

def _load_config(self):
"""Load configuration from the specified file."""
self.config = configparser.ConfigParser()
if not os.path.exists(self.config_path):
raise FileNotFoundError(f"Configuration file not found: {self.config_path}")
self.config.read(self.config_path)
self.host = self.config.get('recallonbusy', 'Host', fallback='localhost')
self.port = self.config.getint('recallonbusy', 'Port', fallback=5038)
self.username = self.config.get('recallonbusy', 'Username', fallback='')
self.secret = self.config.get('recallonbusy', 'Secret', fallback='')

def connect(self):
"""Establish a connection to the AMI."""
print(f"[recallonbusy] Connecting to {self.host}:{self.port}")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self._login()
threading.Thread(target=self._listen, daemon=True).start()

def _login(self):
"""Send the login action to AMI."""
action = (
f"Action: Login\r\n"
f"Username: {self.username}\r\n"
f"Secret: {self.secret}\r\n\r\n"
)
self.sock.sendall(action.encode())
if self.debug:
print("[recallonbusy] DEBUG: Sent login action")

def _listen(self):
"""Listen for incoming data from AMI."""
if self.debug:
print("[recallonbusy] DEBUG: Started listener thread")
while True:
try:
data = self.sock.recv(4096).decode()
if not data:
break
self.buffer += data
while '\r\n\r\n' in self.buffer:
raw_event, self.buffer = self.buffer.split('\r\n\r\n', 1)
event = self._parse_event(raw_event)
self._handle_event(event)
except Exception as e:
print(f"[recallonbusy] ERROR in listener thread: {e}", file=sys.stderr)
break

def _parse_event(self, data):
"""Parse raw event data into a dictionary."""
event = {}
lines = data.strip().split('\r\n')
for line in lines:
if ': ' in line:
key, value = line.split(': ', 1)
event[key.strip()] = value.strip()
if self.debug: # Print all events
print(f"[recallonbusy] DEBUG: Parsed event: {event}")
return event

def _handle_event(self, event):
"""Handle an event by notifying listeners."""
with self.lock:
for listener in self.event_listeners:
listener(event)

def add_event_listener(self, listener):
"""Add a listener for AMI events."""
with self.lock:
self.event_listeners.append(listener)

def remove_event_listener(self, listener):
"""Remove a listener for AMI events."""
with self.lock:
if listener in self.event_listeners:
self.event_listeners.remove(listener)

def send_action(self, action_dict):
"""Send an action to the AMI."""
action = ''.join(f"{key}: {value}\r\n" for key, value in action_dict.items())
action += '\r\n'
self.sock.sendall(action.encode())
if self.debug:
print(f"[recallonbusy] DEBUG: Sent action: {action_dict}")

def close(self):
"""Close the connection to the AMI."""
if self.sock:
self.sock.close()
self.sock = None
if self.debug:
print("[recallonbusy] DEBUG: Closed connection to AMI")


if __name__ == '__main__':
device_state_map = {}
def device_state_change_event_listener(event):
"""Handle incoming DeviceStateChange AMI events."""
global device_state_map
if not 'Event' in event or event['Event'] != 'DeviceStateChange':
return
if client.debug:
print(f"[recallonbusy] DEBUG: Hanlding event: {event}")

if 'State' in event and 'Device' in event and re.match(r'^PJSIP/[0-9]{2,}$', str(event['Device'])) :
mainextension = re.sub(r'^PJSIP/9[0-9]([0-9]+)$|^PJSIP/([^9][0-9]+)$', r'\1\2', str(event['Device']))

# update extension state in map
device_state_map[mainextension] = event['State']

# If the device is not in use, check if there are any waiting extensions
if event['State'] == 'NOT_INUSE':
if client.debug:
print(f'[recallonbusy] DEBUG: Device state changed: {mainextension} is {event["State"]} checking for waiting extensions')
# Ask Asterisk DB if there are any extension waiting for this extension
# Use mainextension as ActionID to recognize the response
client.send_action({
'Action': 'DBGet',
'ActionID': f'{mainextension}_get_waiting',
'Family': 'ROB',
'Key': mainextension
})

def db_get_response_event_listener(event):
"""Handle incoming DBGetResponse AMI events."""
global device_state_map
if not 'Event' in event or event['Event'] != 'DBGetResponse':
return
if client.debug:
print(f"[recallonbusy] DEBUG: Hanlding event: {event}")

if 'Family' in event and event['Family'] == 'ROB' and 'Key' in event and 'Val' in event and event['Val'] != '' and 'ActionID' in event and re.match(r'^[0-9]+_get_waiting$', str(event['ActionID'])):
mainextension = re.sub(r'^([0-9]+)_get_waiting$', r'\1', str(event['ActionID']))
waiting_extensions = event['Val'].split('&')
if client.debug:
print(f'[recallonbusy] DEBUG: Waiting extensions for {mainextension}: {waiting_extensions}')
# Call the first waiting extension
for waiting_extension in waiting_extensions:
waiting_extension_state = device_state_map.get(waiting_extension)
if waiting_extension_state in ['NOT_INUSE', 'UNKNOWN']:
print(f'[recallonbusy] Calling waiting extension {waiting_extension}')
# Call the waiting extension
client.send_action({
'Action': 'Originate',
'Channel': f'Local/{waiting_extension}@from-internal',
'Context': 'from-internal',
'Timeout': 150000,
'CallerID': mainextension,
'Exten': mainextension,
'Priority': 1
})
# Remove the waiting extension from the list
waiting_extensions.remove(waiting_extension)
# Update the waiting extensions list in Asterisk DB
client.send_action({
'Action': 'DBPut',
'Family': 'ROB',
'Key': mainextension,
'Val': '&'.join(waiting_extensions)
})
break
elif client.debug:
print(f'[recallonbusy] DEBUG: Skipping waiting extension {waiting_extension}: {waiting_extension_state}')


client = AMIClient('/etc/asterisk/recallonbusy.cfg')
client.add_event_listener(device_state_change_event_listener)
client.add_event_listener(db_get_response_event_listener)
client.connect()

while True:
try:
client.send_action({'Action': 'DeviceStateList'})
except socket.error as e:
# Reconnect to Asterisk if there is a socket error
print(f"[recallonbusy] ERROR sending action: {e}", file=sys.stderr)
client.connect()
finally:
time.sleep(client.check_interval)
Loading

0 comments on commit c261e33

Please sign in to comment.