Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recallonbusy #314

Merged
merged 8 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion freepbx/Containerfile
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ COPY var/www/html/freepbx/admin/brand/* /var/www/html/freepbx/admin/brand/

RUN ln -sf /var/lib/asterisk/bin/fwconsole /usr/bin/fwconsole

RUN pip install mysql.connector asterisk-ami configparser
RUN pip install mysql.connector configparser


# Use PHP development ini configuration and enable logging on syslog
Expand Down
377 changes: 204 additions & 173 deletions freepbx/usr/sbin/recallonbusy
Original file line number Diff line number Diff line change
@@ -1,179 +1,210 @@
#!/usr/bin/python3

#
# Copyright (C) 2021 Nethesis S.r.l.
# http://www.nethesis.it - [email protected]
#
# This script is part of NethVoice.
#
# NethVoice is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License,
# or any later version.
#
# NethVoice is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with NethVoice. If not, see COPYING.
#

import syslog
"""Asterisk AMI client to recall on busy extensions."""

"""
Copyright (C) 2024 Nethesis S.r.l.
SPDX-License-Identifier: GPL-3.0-or-later
"""



import socket
import sys
import time
from asterisk.ami import AMIClient,SimpleAction,AutoReconnect
import threading
import configparser
#import threading
import json
import os
import time
import re

DEFAULT_CONFIG = {
'Host': 'localhost',
'Port': 5038,
'Pidfile' : '/run/recallonbusy.pid',
'Username': 'recallonbusy',
'Secret': '',
'Debug' : True,
'CheckInterval': 60,
}

# read config file
config = configparser.ConfigParser(DEFAULT_CONFIG)
config.read('/etc/asterisk/recallonbusy.cfg')
CONFIG = {}

for key,value in DEFAULT_CONFIG.items():
try:
if key in ['Debug']:
CONFIG[key] = config.getboolean('recallonbusy',key)
elif key in ['Port','CheckInterval']:
CONFIG[key] = config.getint('recallonbusy',key)
else:
CONFIG[key] = config.get('recallonbusy',key)
except:
CONFIG[key] = DEFAULT_CONFIG[key]

def log_debug(msg):
global CONFIG
if CONFIG['Debug'] == True:
syslog.syslog(syslog.LOG_INFO,'recallonbusy: %s' % msg)

def get_extension_state(extension):
global device_state_map
# get all devices
if re.match(r'9[0-9][0-9]{2,}$',extension):
mainextension = re.sub(r'^9[0-9]([0-9]{2,})$', r'\1', extension)
else:
mainextension = extension

extensions_states = {}
for ext,state in device_state_map.items():
if ext == mainextension or re.match(r'9[0-9]'+mainextension+'$',ext):
extensions_states[ext] = state

res_state = 'UNKNOWN'
for ext,state in extensions_states.items():
if state == 'INUSE' or state == 'RINGING':
res_state = state
break;
elif state == 'NOT_INUSE' and (res_state == 'UNKNOWN' or res_state == 'UNAVAILABLE'):
res_state = state

log_debug('Final extension '+ extension + ' state: ' + res_state + ' ' + json.dumps(extensions_states))
return res_state


# This is called every time AMI event is emitted
def event_listener(source, event):
global client
global device_state_map
if event.name == 'DeviceStateChange':
if re.match(r'PJSIP/([0-9]*)$', str(event['Device'])):
if 'State' in event:
extension = re.sub(r'PJSIP/([0-9]*)$', r'\1', str(event['Device']))
log_debug('DeviceStateChange '+ extension + ' -> '+ event['State'])
mainextension = re.sub(r'^9[0-9]([0-9]{2,})$', r'\1', extension)
device_state_map[extension] = event['State']
if get_extension_state(extension) == 'NOT_INUSE':
# Ask for waiting ROB for this extension
timeid = time.time()
actionid='dbget'+str(timeid)
action = SimpleAction(
'DBGet',
ActionID=actionid,
Family='ROB',
Key=mainextension
)
client.send_action(action)
elif event.name == 'DBGetResponse':
if 'dbget' in event['ActionID'] and event['Val'] != '':
waiting_extensions = event['Val'].split('&')
ext_to_call = event['Key']
log_debug('ext_to_call: '+ext_to_call+ ' waiting_extensions: '+ str(waiting_extensions))
if get_extension_state(ext_to_call) == 'NOT_INUSE':
for waiting_extension in waiting_extensions:
if get_extension_state(waiting_extension) == 'NOT_INUSE':
log_debug('waiting_extension '+ waiting_extension + 'is NOT_INUSE, generating the call...')
# launch call
action = SimpleAction(
'Originate',
Channel='Local/'+waiting_extension+'@from-internal',
Timeout=150000,
CallerID=ext_to_call,
Context='from-internal',
Priority=1,
Exten=ext_to_call
)
client.send_action(action)
waiting_extensions.remove(waiting_extension)
break
new_waiting_extensions_string = '&'.join(waiting_extensions)
# Write new astdb string
timeid = time.time()
actionid='dbput'+str(timeid)
action = SimpleAction(
'DBPut',
ActionID=actionid,
Family='ROB',
Key=ext_to_call,
Val=new_waiting_extensions_string
)
client.send_action(action)

else :
log_debug('Unknow event: '+ str(event))

def ami_client_connect_and_login(address,port,username,secret):
global client
try:
client = AMIClient(address=CONFIG['Host'],port=CONFIG['Port'])
#AutoReconnect(client)
client.login(username=CONFIG['Username'],secret=CONFIG['Secret'])
client.add_event_listener(event_listener, white_list=['DeviceStateChange','DBGetResponse'])
log_debug('AMI client connected')
return True
except Exception as err:
syslog.syslog(syslog.LOG_ERR,'AMI client ERROR: %s' % str(err))
return False

device_state_map = {}

connected = False

#MAIN LOOP
while True:
try:
if not connected:
connected = ami_client_connect_and_login(address=CONFIG['Host'],port=CONFIG['Port'],username=CONFIG['Username'],secret=CONFIG['Secret'])
if connected:
# Ask for device state map updates
client.send_action(SimpleAction('DeviceStateList'))

except Exception as e:
syslog.syslog(syslog.LOG_ERR,"Error: " + str(e))
connected = False

time.sleep(CONFIG['CheckInterval'])

class AMIClient:
def __init__(self, config_path='/etc/asterisk/recallonbusy.cfg'):
self.config_path = config_path
self._load_config()
self.sock = None
self.buffer = ''
self.event_listeners = []
self.lock = threading.Lock()
self.debug = self.config.getboolean('recallonbusy', 'Debug', fallback=False)
self.check_interval = self.config.getint('recallonbusy', 'CheckInterval', fallback=20)
self.actions = {}

def _load_config(self):
"""Load configuration from the specified file."""
self.config = configparser.ConfigParser()
if not os.path.exists(self.config_path):
raise FileNotFoundError(f"Configuration file not found: {self.config_path}")
self.config.read(self.config_path)
self.host = self.config.get('recallonbusy', 'Host', fallback='localhost')
self.port = self.config.getint('recallonbusy', 'Port', fallback=5038)
self.username = self.config.get('recallonbusy', 'Username', fallback='')
self.secret = self.config.get('recallonbusy', 'Secret', fallback='')

def connect(self):
"""Establish a connection to the AMI."""
print(f"[recallonbusy] Connecting to {self.host}:{self.port}")
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.connect((self.host, self.port))
self._login()
threading.Thread(target=self._listen, daemon=True).start()

def _login(self):
"""Send the login action to AMI."""
action = (
f"Action: Login\r\n"
f"Username: {self.username}\r\n"
f"Secret: {self.secret}\r\n\r\n"
)
self.sock.sendall(action.encode())
if self.debug:
print("[recallonbusy] DEBUG: Sent login action")

def _listen(self):
"""Listen for incoming data from AMI."""
if self.debug:
print("[recallonbusy] DEBUG: Started listener thread")
while True:
try:
data = self.sock.recv(4096).decode()
if not data:
break
self.buffer += data
while '\r\n\r\n' in self.buffer:
raw_event, self.buffer = self.buffer.split('\r\n\r\n', 1)
event = self._parse_event(raw_event)
self._handle_event(event)
except Exception as e:
print(f"[recallonbusy] ERROR in listener thread: {e}", file=sys.stderr)
break

def _parse_event(self, data):
"""Parse raw event data into a dictionary."""
event = {}
lines = data.strip().split('\r\n')
for line in lines:
if ': ' in line:
key, value = line.split(': ', 1)
event[key.strip()] = value.strip()
if self.debug: # Print all events
print(f"[recallonbusy] DEBUG: Parsed event: {event}")
return event

def _handle_event(self, event):
"""Handle an event by notifying listeners."""
with self.lock:
for listener in self.event_listeners:
listener(event)

def add_event_listener(self, listener):
"""Add a listener for AMI events."""
with self.lock:
self.event_listeners.append(listener)

def remove_event_listener(self, listener):
"""Remove a listener for AMI events."""
with self.lock:
if listener in self.event_listeners:
self.event_listeners.remove(listener)

def send_action(self, action_dict):
"""Send an action to the AMI."""
action = ''.join(f"{key}: {value}\r\n" for key, value in action_dict.items())
action += '\r\n'
self.sock.sendall(action.encode())
if self.debug:
print(f"[recallonbusy] DEBUG: Sent action: {action_dict}")

def close(self):
"""Close the connection to the AMI."""
if self.sock:
self.sock.close()
self.sock = None
if self.debug:
print("[recallonbusy] DEBUG: Closed connection to AMI")


if __name__ == '__main__':
device_state_map = {}
def device_state_change_event_listener(event):
"""Handle incoming DeviceStateChange AMI events."""
global device_state_map
if not 'Event' in event or event['Event'] != 'DeviceStateChange':
return
if client.debug:
print(f"[recallonbusy] DEBUG: Hanlding event: {event}")

if 'State' in event and 'Device' in event and re.match(r'^PJSIP/[0-9]{2,}$', str(event['Device'])) :
mainextension = re.sub(r'^PJSIP/9[0-9]([0-9]+)$|^PJSIP/([^9][0-9]+)$', r'\1\2', str(event['Device']))

# update extension state in map
device_state_map[mainextension] = event['State']

# If the device is not in use, check if there are any waiting extensions
if event['State'] == 'NOT_INUSE':
if client.debug:
print(f'[recallonbusy] DEBUG: Device state changed: {mainextension} is {event["State"]} checking for waiting extensions')
# Ask Asterisk DB if there are any extension waiting for this extension
# Use mainextension as ActionID to recognize the response
client.send_action({
'Action': 'DBGet',
'ActionID': f'{mainextension}_get_waiting',
'Family': 'ROB',
'Key': mainextension
})

def db_get_response_event_listener(event):
"""Handle incoming DBGetResponse AMI events."""
global device_state_map
if not 'Event' in event or event['Event'] != 'DBGetResponse':
return
if client.debug:
print(f"[recallonbusy] DEBUG: Hanlding event: {event}")

if 'Family' in event and event['Family'] == 'ROB' and 'Key' in event and 'Val' in event and event['Val'] != '' and 'ActionID' in event and re.match(r'^[0-9]+_get_waiting$', str(event['ActionID'])):
mainextension = re.sub(r'^([0-9]+)_get_waiting$', r'\1', str(event['ActionID']))
waiting_extensions = event['Val'].split('&')
if client.debug:
print(f'[recallonbusy] DEBUG: Waiting extensions for {mainextension}: {waiting_extensions}')
# Call the first waiting extension
for waiting_extension in waiting_extensions:
waiting_extension_state = device_state_map.get(waiting_extension)
if waiting_extension_state in ['NOT_INUSE', 'UNKNOWN']:
print(f'[recallonbusy] Calling waiting extension {waiting_extension}')
# Call the waiting extension
client.send_action({
'Action': 'Originate',
'Channel': f'Local/{waiting_extension}@from-internal',
'Context': 'from-internal',
'Timeout': 150000,
'CallerID': mainextension,
'Exten': mainextension,
'Priority': 1
})
# Remove the waiting extension from the list
waiting_extensions.remove(waiting_extension)
# Update the waiting extensions list in Asterisk DB
client.send_action({
'Action': 'DBPut',
'Family': 'ROB',
'Key': mainextension,
'Val': '&'.join(waiting_extensions)
})
break
elif client.debug:
print(f'[recallonbusy] DEBUG: Skipping waiting extension {waiting_extension}: {waiting_extension_state}')


client = AMIClient('/etc/asterisk/recallonbusy.cfg')
client.add_event_listener(device_state_change_event_listener)
client.add_event_listener(db_get_response_event_listener)
client.connect()

while True:
try:
client.send_action({'Action': 'DeviceStateList'})
except socket.error as e:
# Reconnect to Asterisk if there is a socket error
print(f"[recallonbusy] ERROR sending action: {e}", file=sys.stderr)
client.connect()
finally:
time.sleep(client.check_interval)
Loading
Loading