This patch adds a beanstalk CDR backend.

Beanstalkd is a simple to use job queue. It provides a means to
create multiple job queues called "tubes". Each tube can store
multiple jobs, with varying priorities with the queue. Queue
processing is available via a simple TCP socket or via well defined
libraries, avaialble at
https://github.com/kr/beanstalkd/wiki/client-libraries

This module is based upon the beanstalk-client library, available
for download at: https://github.com/deepfryed/beanstalk-client

Change-Id: I5fe4089a34ab3b39230786d9bbfddafa56715f48
This commit is contained in:
Nir Simionovich 2017-10-16 17:46:02 -04:00
parent 00d1c7ddd2
commit 4559cd0e28
7 changed files with 449 additions and 0 deletions

View File

@ -1,5 +1,6 @@
ALSA=@PBX_ALSA@
BLUETOOTH=@PBX_BLUETOOTH@
BEANSTALK=@PBX_BEANSTALK@
COROSYNC=@PBX_COROSYNC@
CRYPTO=@PBX_CRYPTO@
BFD=@PBX_BFD@

270
cdr/cdr_beanstalkd.c Normal file
View File

@ -0,0 +1,270 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2017
*
* Nir Simionovich <nirs@greenfieldtech.net>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*!
* \file
* \brief Asterisk Beanstalkd CDR records.
*
* This module requires the beanstalk-client library, avaialble from
* https://github.com/deepfryed/beanstalk-client
*
* See also
* \arg \ref AstCDR
* \ingroup cdr_drivers
*/
/*! \li \ref cdr_beanstalkd.c uses the configuration file \ref cdr_beanstalkd.conf
* \addtogroup configuration_file Configuration Files
*/
/*!
* \page cdr_beanstalkd.conf cdr_beanstalkd.conf
* \verbinclude cdr_beanstalkd.conf.sample
*/
/*** MODULEINFO
<depend>beanstalk</depend>
<support_level>extended</support_level>
***/
#include "asterisk.h"
#include <time.h>
#include <stdio.h>
#include "beanstalk.h"
#include "asterisk/channel.h"
#include "asterisk/cdr.h"
#include "asterisk/module.h"
#include "asterisk/utils.h"
#include "asterisk/manager.h"
#include "asterisk/config.h"
#include "asterisk/pbx.h"
#include "asterisk/json.h"
#define DATE_FORMAT "%Y-%m-%d %T"
#define CONF_FILE "cdr_beanstalkd.conf"
#define BEANSTALK_JOB_SIZE 4096
#define BEANSTALK_JOB_PRIORITY 99
#define BEANSTALK_JOB_TTR 60
#define BEANSTALK_JOB_DELAY 0
#define DEFAULT_BEANSTALK_HOST "127.0.0.1"
#define DEFAULT_BEANSTALK_PORT 11300
#define DEFAULT_BEANSTALK_TUBE "asterisk-cdr"
static const char name[] = "cdr_beanstalkd";
static int enablecdr = 0;
static char *bs_host;
static int bs_port;
static char *bs_tube;
static int priority;
AST_RWLOCK_DEFINE_STATIC(config_lock);
static int beanstalk_put(struct ast_cdr *cdr);
static int load_config(int reload) {
char *cat = NULL;
struct ast_config *cfg;
struct ast_variable *v;
struct ast_flags config_flags = {reload ? CONFIG_FLAG_FILEUNCHANGED : 0};
int newenablecdr = 0;
cfg = ast_config_load(CONF_FILE, config_flags);
if (cfg == CONFIG_STATUS_FILEUNCHANGED) {
return 0;
}
if (cfg == CONFIG_STATUS_FILEINVALID) {
ast_log(LOG_ERROR, "Config file '%s' could not be parsed\n", CONF_FILE);
return -1;
}
if (!cfg) {
/* Standard configuration */
ast_log(LOG_WARNING, "Failed to load configuration file. Module not activated.\n");
if (enablecdr) {
ast_cdr_backend_suspend(name);
}
enablecdr = 0;
return -1;
}
if (reload) {
ast_rwlock_wrlock(&config_lock);
ast_free(bs_host);
ast_free(bs_tube);
}
/* Bootstrap the default configuration */
bs_host = ast_strdup(DEFAULT_BEANSTALK_HOST);
bs_port = DEFAULT_BEANSTALK_PORT;
bs_tube = ast_strdup(DEFAULT_BEANSTALK_TUBE);
priority = BEANSTALK_JOB_PRIORITY;
while ((cat = ast_category_browse(cfg, cat))) {
if (!strcasecmp(cat, "general")) {
v = ast_variable_browse(cfg, cat);
while (v) {
if (!strcasecmp(v->name, "enabled")) {
newenablecdr = ast_true(v->value);
} else if (!strcasecmp(v->name, "host")) {
ast_free(bs_host);
bs_host = ast_strdup(v->value);
} else if (!strcasecmp(v->name, "port")) {
bs_port = atoi(v->value);
} else if (!strcasecmp(v->name, "tube")) {
ast_free(bs_tube);
bs_tube = ast_strdup(v->value);
} else if (!strcasecmp(v->name, "priority")) {
priority = atoi(v->value);
}
v = v->next;
}
}
}
if (reload) {
ast_rwlock_unlock(&config_lock);
}
ast_config_destroy(cfg);
if (!newenablecdr) {
ast_cdr_backend_suspend(name);
} else if (newenablecdr) {
ast_cdr_backend_unsuspend(name);
ast_log(LOG_NOTICE, "Added beanstalkd server %s at port %d with tube %s", bs_host, bs_port, bs_tube);
}
enablecdr = newenablecdr;
return 0;
}
static int beanstalk_put(struct ast_cdr *cdr) {
struct ast_tm timeresult;
char strAnswerTime[80] = "";
char strStartTime[80];
char strEndTime[80];
char *cdr_buffer;
int bs_id;
int bs_socket;
struct ast_json *t_cdr_json;
if (!enablecdr) {
return 0;
}
ast_rwlock_rdlock(&config_lock);
bs_socket = bs_connect(bs_host, bs_port);
if (bs_use(bs_socket, bs_tube) != BS_STATUS_OK) {
ast_log(LOG_ERROR, "Connection to Beanstalk tube %s @ %s:%d had failed", bs_tube, bs_host, bs_port);
ast_rwlock_unlock(&config_lock);
return 0;
}
ast_localtime(&cdr->start, &timeresult, NULL);
ast_strftime(strStartTime, sizeof(strStartTime), DATE_FORMAT, &timeresult);
if (cdr->answer.tv_sec) {
ast_localtime(&cdr->answer, &timeresult, NULL);
ast_strftime(strAnswerTime, sizeof(strAnswerTime), DATE_FORMAT, &timeresult);
}
ast_localtime(&cdr->end, &timeresult, NULL);
ast_strftime(strEndTime, sizeof(strEndTime), DATE_FORMAT, &timeresult);
ast_rwlock_unlock(&config_lock);
t_cdr_json = ast_json_pack("{s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:s, s:i, s:i, s:s, s:s, s:s, s:s}",
"AccountCode", S_OR(cdr->accountcode, ""),
"Source", S_OR(cdr->src, ""),
"Destination", S_OR(cdr->dst, ""),
"DestinationContext", S_OR(cdr->dcontext, ""),
"CallerID", S_OR(cdr->clid, ""),
"Channel", S_OR(cdr->channel, ""),
"DestinationChannel", S_OR(cdr->dstchannel, ""),
"LastApplication", S_OR(cdr->lastapp, ""),
"LastData", S_OR(cdr->lastdata, ""),
"StartTime", S_OR(strStartTime, ""),
"AnswerTime", S_OR(strAnswerTime, ""),
"EndTime", S_OR(strEndTime, ""),
"Duration", cdr->duration,
"Billsec", cdr->billsec,
"Disposition", S_OR(ast_cdr_disp2str(cdr->disposition), ""),
"AMAFlags", S_OR(ast_channel_amaflags2string(cdr->amaflags), ""),
"UniqueID", S_OR(cdr->uniqueid, ""),
"UserField", S_OR(cdr->userfield, ""));
cdr_buffer = ast_json_dump_string(t_cdr_json);
ast_json_unref(t_cdr_json);
bs_id = bs_put(bs_socket, priority, BEANSTALK_JOB_DELAY, BEANSTALK_JOB_TTR, cdr_buffer, strlen(cdr_buffer));
if (bs_id > 0) {
ast_log(LOG_DEBUG, "Successfully created job %d with %s\n", bs_id, cdr_buffer);
} else {
ast_log(LOG_ERROR, "CDR job creation failed for %s\n", cdr_buffer);
}
bs_disconnect(bs_socket);
ast_json_free(cdr_buffer);
return 0;
}
static int unload_module(void) {
if (ast_cdr_unregister(name)) {
return -1;
}
ast_free(bs_host);
ast_free(bs_tube);
return 0;
}
static int load_module(void) {
if (ast_cdr_register(name, "Asterisk CDR Beanstalkd Backend", beanstalk_put)) {
return AST_MODULE_LOAD_DECLINE;
}
if (load_config(0)) {
ast_cdr_unregister(name);
return AST_MODULE_LOAD_DECLINE;
}
return AST_MODULE_LOAD_SUCCESS;
}
static int reload(void) {
return load_config(1);
}
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Asterisk Beanstalkd CDR Backend",
.support_level = AST_MODULE_SUPPORT_EXTENDED,
.load = load_module,
.unload = unload_module,
.reload = reload,
.load_pri = AST_MODPRI_CDR_DRIVER,
);

View File

@ -0,0 +1,26 @@
;
; Asterisk Call Management CDR via Beanstalkd job queue
;
; Beanstalkd is a simple job queue server, that is highly versatile and simple to use.
; Beanstalkd includes the capability of using multiple queues at the same time, with priorities.
;
; This module requires that your server has the beanstalk-client library installed. The library
; can be downloaded from - https://github.com/deepfryed/beanstalk-client
;
[general]
;enabled = yes
;host = 127.0.0.1 ; Specify the remote IP address of the Beanstalkd server
;port = 11300 ; Specify the remote PORT of the the Beanstalkd server
;tube = asterisk-cdr ; Specify the default CDR job queue to use
;priority = 99 ; Specify the default job priority for the queue. This parameter is useful when building
; platform with multiple Asterisk servers, that are used for different functions. For example,
; none billable CDR records can be inserted with a priority of 99, while billable ones be
; inserted with a priority of 1

143
configure vendored
View File

@ -991,6 +991,10 @@ PBX_PJSIP_DLG_CREATE_UAS_AND_INC_LOCK
PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_DIR
PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_INCLUDE
PJSIP_DLG_CREATE_UAS_AND_INC_LOCK_LIB
PBX_BEANSTALK
BEANSTALK_DIR
BEANSTALK_INCLUDE
BEANSTALK_LIB
PBX_PGSQL
PGSQL_DIR
PGSQL_INCLUDE
@ -1430,6 +1434,7 @@ with_opus
with_osptk
with_oss
with_postgres
with_beanstalk
with_pjproject
with_popt
with_portaudio
@ -2182,6 +2187,7 @@ Optional Packages:
--with-osptk=PATH use OSP Toolkit files in PATH
--with-oss=PATH use Open Sound System files in PATH
--with-postgres=PATH use PostgreSQL files in PATH
--with-beanstalk=PATH use Beanstalk Job Queue files in PATH
--with-pjproject=PATH use PJPROJECT files in PATH
--with-popt=PATH use popt files in PATH
--with-portaudio=PATH use PortAudio files in PATH
@ -11421,6 +11427,38 @@ fi
BEANSTALK_DESCRIP="Beanstalk Job Queue"
BEANSTALK_OPTION="beanstalk"
PBX_BEANSTALK=0
# Check whether --with-beanstalk was given.
if test "${with_beanstalk+set}" = set; then :
withval=$with_beanstalk;
case ${withval} in
n|no)
USE_BEANSTALK=no
# -1 is a magic value used by menuselect to know that the package
# was disabled, other than 'not found'
PBX_BEANSTALK=-1
;;
y|ye|yes)
ac_mandatory_list="${ac_mandatory_list} BEANSTALK"
;;
*)
BEANSTALK_DIR="${withval}"
ac_mandatory_list="${ac_mandatory_list} BEANSTALK"
;;
esac
fi
if test "x${PBX_PJPROJECT}" != "x1" ; then
PJPROJECT_DESCRIP="PJPROJECT"
@ -25043,6 +25081,111 @@ fi
if test "x${PBX_BEANSTALK}" != "x1" -a "${USE_BEANSTALK}" != "no"; then
pbxlibdir=""
# if --with-BEANSTALK=DIR has been specified, use it.
if test "x${BEANSTALK_DIR}" != "x"; then
if test -d ${BEANSTALK_DIR}/lib; then
pbxlibdir="-L${BEANSTALK_DIR}/lib"
else
pbxlibdir="-L${BEANSTALK_DIR}"
fi
fi
pbxfuncname="bs_version"
if test "x${pbxfuncname}" = "x" ; then # empty lib, assume only headers
AST_BEANSTALK_FOUND=yes
else
ast_ext_lib_check_save_CFLAGS="${CFLAGS}"
CFLAGS="${CFLAGS} "
as_ac_Lib=`$as_echo "ac_cv_lib_beanstalk_${pbxfuncname}" | $as_tr_sh`
{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for ${pbxfuncname} in -lbeanstalk" >&5
$as_echo_n "checking for ${pbxfuncname} in -lbeanstalk... " >&6; }
if eval \${$as_ac_Lib+:} false; then :
$as_echo_n "(cached) " >&6
else
ac_check_lib_save_LIBS=$LIBS
LIBS="-lbeanstalk ${pbxlibdir} $LIBS"
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
/* end confdefs.h. */
/* Override any GCC internal prototype to avoid an error.
Use char because int might match the return type of a GCC
builtin and then its argument prototype would still apply. */
#ifdef __cplusplus
extern "C"
#endif
char ${pbxfuncname} ();
int
main ()
{
return ${pbxfuncname} ();
;
return 0;
}
_ACEOF
if ac_fn_c_try_link "$LINENO"; then :
eval "$as_ac_Lib=yes"
else
eval "$as_ac_Lib=no"
fi
rm -f core conftest.err conftest.$ac_objext \
conftest$ac_exeext conftest.$ac_ext
LIBS=$ac_check_lib_save_LIBS
fi
eval ac_res=\$$as_ac_Lib
{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_res" >&5
$as_echo "$ac_res" >&6; }
if eval test \"x\$"$as_ac_Lib"\" = x"yes"; then :
AST_BEANSTALK_FOUND=yes
else
AST_BEANSTALK_FOUND=no
fi
CFLAGS="${ast_ext_lib_check_save_CFLAGS}"
fi
# now check for the header.
if test "${AST_BEANSTALK_FOUND}" = "yes"; then
BEANSTALK_LIB="${pbxlibdir} -lbeanstalk "
# if --with-BEANSTALK=DIR has been specified, use it.
if test "x${BEANSTALK_DIR}" != "x"; then
BEANSTALK_INCLUDE="-I${BEANSTALK_DIR}/include"
fi
BEANSTALK_INCLUDE="${BEANSTALK_INCLUDE} "
if test "xbeanstalk.h" = "x" ; then # no header, assume found
BEANSTALK_HEADER_FOUND="1"
else # check for the header
ast_ext_lib_check_saved_CPPFLAGS="${CPPFLAGS}"
CPPFLAGS="${CPPFLAGS} ${BEANSTALK_INCLUDE}"
ac_fn_c_check_header_mongrel "$LINENO" "beanstalk.h" "ac_cv_header_beanstalk_h" "$ac_includes_default"
if test "x$ac_cv_header_beanstalk_h" = xyes; then :
BEANSTALK_HEADER_FOUND=1
else
BEANSTALK_HEADER_FOUND=0
fi
CPPFLAGS="${ast_ext_lib_check_saved_CPPFLAGS}"
fi
if test "x${BEANSTALK_HEADER_FOUND}" = "x0" ; then
BEANSTALK_LIB=""
BEANSTALK_INCLUDE=""
else
if test "x${pbxfuncname}" = "x" ; then # only checking headers -> no library
BEANSTALK_LIB=""
fi
PBX_BEANSTALK=1
cat >>confdefs.h <<_ACEOF
#define HAVE_BEANSTALK 1
_ACEOF
fi
fi
fi
# possible places for oss definitions
if test "x${PBX_OSS}" != "x1" -a "${USE_OSS}" != "no"; then

View File

@ -526,6 +526,7 @@ AST_EXT_LIB_SETUP([OPUS], [Opus], [opus])
AST_EXT_LIB_SETUP([OSPTK], [OSP Toolkit], [osptk])
AST_EXT_LIB_SETUP([OSS], [Open Sound System], [oss])
AST_EXT_LIB_SETUP([PGSQL], [PostgreSQL], [postgres])
AST_EXT_LIB_SETUP([BEANSTALK], [Beanstalk Job Queue], [beanstalk])
if test "x${PBX_PJPROJECT}" != "x1" ; then
AST_EXT_LIB_SETUP([PJPROJECT], [PJPROJECT], [pjproject])
@ -2170,6 +2171,8 @@ AST_EXT_LIB_CHECK([BKTR], [c], [backtrace], [execinfo.h])
AST_EXT_LIB_CHECK([BLUETOOTH], [bluetooth], [ba2str], [bluetooth/bluetooth.h])
AST_EXT_LIB_CHECK([BEANSTALK], [beanstalk], [bs_version], [beanstalk.h])
# possible places for oss definitions
AST_EXT_LIB_CHECK([OSS], [ossaudio], [], [linux/soundcard.h])
AST_EXT_LIB_CHECK([OSS], [ossaudio], [], [sys/soundcard.h])

View File

@ -115,6 +115,9 @@
attribute. */
#undef HAVE_ATTRIBUTE_warn_unused_result
/* Define to 1 if you have the Beanstalk Job Queue library. */
#undef HAVE_BEANSTALK
/* Define to 1 if you have the Debug symbol decoding library. */
#undef HAVE_BFD

View File

@ -383,3 +383,6 @@ TIMERFD_INCLUDE=@TIMERFD_INCLUDE@
SNDFILE_INCLUDE=@SNDFILE_INCLUDE@
SNDFILE_LIB=@SNDFILE_LIB@
BEANSTALK_INCLUDE=@BEANSTALK_INCLUDE@
BEANSTALK_LIB=@BEANSTALK_LIB@